diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-02 15:29:00 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-02 15:50:05 +0200 |
commit | 22df69fdecf299c8be6acaa25f086249ea9a0085 (patch) | |
tree | e8c7e21c18fb42bb74b54f2eab1509c3d93a380d /server/lib/job-queue/job-queue.ts | |
parent | 7a9e420a02434e4f16c99e7d58da9075dff25d15 (diff) | |
download | PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.gz PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.zst PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.zip |
Add job queue hooks
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e55d2e7c2..0ae325f4d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -24,6 +24,7 @@ import { | |||
24 | } from '../../../shared/models' | 24 | } from '../../../shared/models' |
25 | import { logger } from '../../helpers/logger' | 25 | import { logger } from '../../helpers/logger' |
26 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 26 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
27 | import { Hooks } from '../plugins/hooks' | ||
27 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | 28 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
28 | import { processActivityPubFollow } from './handlers/activitypub-follow' | 29 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
29 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 30 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
@@ -157,8 +158,11 @@ class JobQueue { | |||
157 | 158 | ||
158 | const handler = handlers[handlerName] | 159 | const handler = handlers[handlerName] |
159 | 160 | ||
160 | queue.process(this.getJobConcurrency(handlerName), handler) | 161 | queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { |
161 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 162 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) |
163 | |||
164 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') | ||
165 | }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | ||
162 | 166 | ||
163 | queue.on('failed', (job, err) => { | 167 | queue.on('failed', (job, err) => { |
164 | const logLevel = silentFailure.has(handlerName) | 168 | const logLevel = silentFailure.has(handlerName) |