aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-02 15:29:00 +0200
committerChocobozzz <me@florianbigard.com>2022-08-02 15:50:05 +0200
commit22df69fdecf299c8be6acaa25f086249ea9a0085 (patch)
treee8c7e21c18fb42bb74b54f2eab1509c3d93a380d /server/lib/job-queue/job-queue.ts
parent7a9e420a02434e4f16c99e7d58da9075dff25d15 (diff)
downloadPeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.gz
PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.zst
PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.zip
Add job queue hooks
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts8
1 files changed, 6 insertions, 2 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index e55d2e7c2..0ae325f4d 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -24,6 +24,7 @@ import {
24} from '../../../shared/models' 24} from '../../../shared/models'
25import { logger } from '../../helpers/logger' 25import { logger } from '../../helpers/logger'
26import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 26import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
27import { Hooks } from '../plugins/hooks'
27import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 28import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
28import { processActivityPubFollow } from './handlers/activitypub-follow' 29import { processActivityPubFollow } from './handlers/activitypub-follow'
29import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 30import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
@@ -157,8 +158,11 @@ class JobQueue {
157 158
158 const handler = handlers[handlerName] 159 const handler = handlers[handlerName]
159 160
160 queue.process(this.getJobConcurrency(handlerName), handler) 161 queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => {
161 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 162 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
163
164 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
165 }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
162 166
163 queue.on('failed', (job, err) => { 167 queue.on('failed', (job, err) => {
164 const logLevel = silentFailure.has(handlerName) 168 const logLevel = silentFailure.has(handlerName)