diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 10 |
1 files changed, 4 insertions, 6 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ffd948b5f..8a24604e1 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -2,7 +2,7 @@ import * as Bull from 'bull' | |||
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { Redis } from '../redis' | 4 | import { Redis } from '../redis' |
5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' | 5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' |
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
@@ -79,6 +79,7 @@ class JobQueue { | |||
79 | const handler = handlers[handlerName] | 79 | const handler = handlers[handlerName] |
80 | 80 | ||
81 | queue.process(JOB_CONCURRENCY[handlerName], handler) | 81 | queue.process(JOB_CONCURRENCY[handlerName], handler) |
82 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | ||
82 | 83 | ||
83 | queue.on('failed', (job, err) => { | 84 | queue.on('failed', (job, err) => { |
84 | logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) | 85 | logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) |
@@ -109,11 +110,8 @@ class JobQueue { | |||
109 | 110 | ||
110 | const jobArgs: Bull.JobOptions = { | 111 | const jobArgs: Bull.JobOptions = { |
111 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 112 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
112 | attempts: JOB_ATTEMPTS[obj.type] | 113 | attempts: JOB_ATTEMPTS[obj.type], |
113 | } | 114 | timeout: JOB_TTL[obj.type] |
114 | |||
115 | if (jobsWithRequestTimeout[obj.type] === true) { | ||
116 | jobArgs.timeout = JOB_REQUEST_TTL | ||
117 | } | 115 | } |
118 | 116 | ||
119 | return queue.add(obj.payload, jobArgs) | 117 | return queue.add(obj.payload, jobArgs) |