diff options
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 25 |
1 files changed, 15 insertions, 10 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 8ff0c169e..8a24604e1 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -2,13 +2,14 @@ import * as Bull from 'bull' | |||
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { Redis } from '../redis' | 4 | import { Redis } from '../redis' |
5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' | 5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' |
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
9 | import { EmailPayload, processEmail } from './handlers/email' | 9 | import { EmailPayload, processEmail } from './handlers/email' |
10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' | 10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' | ||
12 | 13 | ||
13 | type CreateJobArgument = | 14 | type CreateJobArgument = |
14 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 15 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -17,7 +18,8 @@ type CreateJobArgument = | |||
17 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | | 18 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | |
18 | { type: 'video-file-import', payload: VideoFileImportPayload } | | 19 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
19 | { type: 'video-file', payload: VideoFilePayload } | | 20 | { type: 'video-file', payload: VideoFilePayload } | |
20 | { type: 'email', payload: EmailPayload } | 21 | { type: 'email', payload: EmailPayload } | |
22 | { type: 'video-import', payload: VideoImportPayload } | ||
21 | 23 | ||
22 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 24 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
23 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 25 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
@@ -26,7 +28,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
26 | 'activitypub-follow': processActivityPubFollow, | 28 | 'activitypub-follow': processActivityPubFollow, |
27 | 'video-file-import': processVideoFileImport, | 29 | 'video-file-import': processVideoFileImport, |
28 | 'video-file': processVideoFile, | 30 | 'video-file': processVideoFile, |
29 | 'email': processEmail | 31 | 'email': processEmail, |
32 | 'video-import': processVideoImport | ||
30 | } | 33 | } |
31 | 34 | ||
32 | const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { | 35 | const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { |
@@ -43,7 +46,8 @@ const jobTypes: JobType[] = [ | |||
43 | 'activitypub-http-unicast', | 46 | 'activitypub-http-unicast', |
44 | 'email', | 47 | 'email', |
45 | 'video-file', | 48 | 'video-file', |
46 | 'video-file-import' | 49 | 'video-file-import', |
50 | 'video-import' | ||
47 | ] | 51 | ] |
48 | 52 | ||
49 | class JobQueue { | 53 | class JobQueue { |
@@ -75,7 +79,11 @@ class JobQueue { | |||
75 | const handler = handlers[handlerName] | 79 | const handler = handlers[handlerName] |
76 | 80 | ||
77 | queue.process(JOB_CONCURRENCY[handlerName], handler) | 81 | queue.process(JOB_CONCURRENCY[handlerName], handler) |
78 | .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) | 82 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) |
83 | |||
84 | queue.on('failed', (job, err) => { | ||
85 | logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) | ||
86 | }) | ||
79 | 87 | ||
80 | queue.on('error', err => { | 88 | queue.on('error', err => { |
81 | logger.error('Error in job queue %s.', handlerName, { err }) | 89 | logger.error('Error in job queue %s.', handlerName, { err }) |
@@ -102,11 +110,8 @@ class JobQueue { | |||
102 | 110 | ||
103 | const jobArgs: Bull.JobOptions = { | 111 | const jobArgs: Bull.JobOptions = { |
104 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 112 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
105 | attempts: JOB_ATTEMPTS[obj.type] | 113 | attempts: JOB_ATTEMPTS[obj.type], |
106 | } | 114 | timeout: JOB_TTL[obj.type] |
107 | |||
108 | if (jobsWithRequestTimeout[obj.type] === true) { | ||
109 | jobArgs.timeout = JOB_REQUEST_TTL | ||
110 | } | 115 | } |
111 | 116 | ||
112 | return queue.add(obj.payload, jobArgs) | 117 | return queue.add(obj.payload, jobArgs) |