aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts25
1 files changed, 15 insertions, 10 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 8ff0c169e..8a24604e1 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -2,13 +2,14 @@ import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { Redis } from '../redis' 4import { Redis } from '../redis'
5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' 5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
9import { EmailPayload, processEmail } from './handlers/email' 9import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' 10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
12 13
13type CreateJobArgument = 14type CreateJobArgument =
14 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 15 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -17,7 +18,8 @@ type CreateJobArgument =
17 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | 18 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
18 { type: 'video-file-import', payload: VideoFileImportPayload } | 19 { type: 'video-file-import', payload: VideoFileImportPayload } |
19 { type: 'video-file', payload: VideoFilePayload } | 20 { type: 'video-file', payload: VideoFilePayload } |
20 { type: 'email', payload: EmailPayload } 21 { type: 'email', payload: EmailPayload } |
22 { type: 'video-import', payload: VideoImportPayload }
21 23
22const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 24const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
23 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 25 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
@@ -26,7 +28,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
26 'activitypub-follow': processActivityPubFollow, 28 'activitypub-follow': processActivityPubFollow,
27 'video-file-import': processVideoFileImport, 29 'video-file-import': processVideoFileImport,
28 'video-file': processVideoFile, 30 'video-file': processVideoFile,
29 'email': processEmail 31 'email': processEmail,
32 'video-import': processVideoImport
30} 33}
31 34
32const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { 35const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
@@ -43,7 +46,8 @@ const jobTypes: JobType[] = [
43 'activitypub-http-unicast', 46 'activitypub-http-unicast',
44 'email', 47 'email',
45 'video-file', 48 'video-file',
46 'video-file-import' 49 'video-file-import',
50 'video-import'
47] 51]
48 52
49class JobQueue { 53class JobQueue {
@@ -75,7 +79,11 @@ class JobQueue {
75 const handler = handlers[handlerName] 79 const handler = handlers[handlerName]
76 80
77 queue.process(JOB_CONCURRENCY[handlerName], handler) 81 queue.process(JOB_CONCURRENCY[handlerName], handler)
78 .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) 82 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
83
84 queue.on('failed', (job, err) => {
85 logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
86 })
79 87
80 queue.on('error', err => { 88 queue.on('error', err => {
81 logger.error('Error in job queue %s.', handlerName, { err }) 89 logger.error('Error in job queue %s.', handlerName, { err })
@@ -102,11 +110,8 @@ class JobQueue {
102 110
103 const jobArgs: Bull.JobOptions = { 111 const jobArgs: Bull.JobOptions = {
104 backoff: { delay: 60 * 1000, type: 'exponential' }, 112 backoff: { delay: 60 * 1000, type: 'exponential' },
105 attempts: JOB_ATTEMPTS[obj.type] 113 attempts: JOB_ATTEMPTS[obj.type],
106 } 114 timeout: JOB_TTL[obj.type]
107
108 if (jobsWithRequestTimeout[obj.type] === true) {
109 jobArgs.timeout = JOB_REQUEST_TTL
110 } 115 }
111 116
112 return queue.add(obj.payload, jobArgs) 117 return queue.add(obj.payload, jobArgs)