From 22df69fdecf299c8be6acaa25f086249ea9a0085 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 2 Aug 2022 15:29:00 +0200 Subject: Add job queue hooks --- server/lib/job-queue/job-queue.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'server/lib/job-queue/job-queue.ts') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e55d2e7c2..0ae325f4d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -24,6 +24,7 @@ import { } from '../../../shared/models' import { logger } from '../../helpers/logger' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { Hooks } from '../plugins/hooks' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' @@ -157,8 +158,11 @@ class JobQueue { const handler = handlers[handlerName] - queue.process(this.getJobConcurrency(handlerName), handler) - .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job) => { + const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) + + return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') + }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) queue.on('failed', (job, err) => { const logLevel = silentFailure.has(handlerName) -- cgit v1.2.3