diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-02 15:29:00 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-02 15:50:05 +0200 |
commit | 22df69fdecf299c8be6acaa25f086249ea9a0085 (patch) | |
tree | e8c7e21c18fb42bb74b54f2eab1509c3d93a380d /server/lib | |
parent | 7a9e420a02434e4f16c99e7d58da9075dff25d15 (diff) | |
download | PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.gz PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.zst PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.zip |
Add job queue hooks
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 8 | ||||
-rw-r--r-- | server/lib/plugins/hooks.ts | 4 | ||||
-rw-r--r-- | server/lib/plugins/plugin-helpers-builder.ts | 4 | ||||
-rw-r--r-- | server/lib/plugins/plugin-manager.ts | 8 |
4 files changed, 18 insertions, 6 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e55d2e7c2..0ae325f4d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -24,6 +24,7 @@ import { | |||
24 | } from '../../../shared/models' | 24 | } from '../../../shared/models' |
25 | import { logger } from '../../helpers/logger' | 25 | import { logger } from '../../helpers/logger' |
26 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 26 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
27 | import { Hooks } from '../plugins/hooks' | ||
27 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | 28 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
28 | import { processActivityPubFollow } from './handlers/activitypub-follow' | 29 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
29 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 30 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
@@ -157,8 +158,11 @@ class JobQueue { | |||
157 | 158 | ||
158 | const handler = handlers[handlerName] | 159 | const handler = handlers[handlerName] |
159 | 160 | ||
160 | queue.process(this.getJobConcurrency(handlerName), handler) | 161 | queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { |
161 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 162 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) |
163 | |||
164 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') | ||
165 | }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | ||
162 | 166 | ||
163 | queue.on('failed', (job, err) => { | 167 | queue.on('failed', (job, err) => { |
164 | const logLevel = silentFailure.has(handlerName) | 168 | const logLevel = silentFailure.has(handlerName) |
diff --git a/server/lib/plugins/hooks.ts b/server/lib/plugins/hooks.ts index 327aaece2..694527c12 100644 --- a/server/lib/plugins/hooks.ts +++ b/server/lib/plugins/hooks.ts | |||
@@ -8,8 +8,8 @@ type RawFunction <U, T> = (params: U) => T | |||
8 | 8 | ||
9 | // Helpers to run hooks | 9 | // Helpers to run hooks |
10 | const Hooks = { | 10 | const Hooks = { |
11 | wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U) => { | 11 | wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U, context?: any) => { |
12 | return PluginManager.Instance.runHook(hookName, result) | 12 | return PluginManager.Instance.runHook(hookName, result, context) |
13 | }, | 13 | }, |
14 | 14 | ||
15 | wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { | 15 | wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { |
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts index b76c0a8a4..4e799b3d4 100644 --- a/server/lib/plugins/plugin-helpers-builder.ts +++ b/server/lib/plugins/plugin-helpers-builder.ts | |||
@@ -220,6 +220,10 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) { | |||
220 | 220 | ||
221 | function buildUserHelpers () { | 221 | function buildUserHelpers () { |
222 | return { | 222 | return { |
223 | loadById: (id: number) => { | ||
224 | return UserModel.loadByIdFull(id) | ||
225 | }, | ||
226 | |||
223 | getAuthUser: (res: express.Response) => { | 227 | getAuthUser: (res: express.Response) => { |
224 | const user = res.locals.oauth?.token?.User | 228 | const user = res.locals.oauth?.token?.User |
225 | if (!user) return undefined | 229 | if (!user) return undefined |
diff --git a/server/lib/plugins/plugin-manager.ts b/server/lib/plugins/plugin-manager.ts index c21ebd0c5..a706df1e0 100644 --- a/server/lib/plugins/plugin-manager.ts +++ b/server/lib/plugins/plugin-manager.ts | |||
@@ -215,8 +215,12 @@ export class PluginManager implements ServerHook { | |||
215 | for (const hook of this.hooks[hookName]) { | 215 | for (const hook of this.hooks[hookName]) { |
216 | logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) | 216 | logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) |
217 | 217 | ||
218 | result = await internalRunHook(hook.handler, hookType, result, params, err => { | 218 | result = await internalRunHook({ |
219 | logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) | 219 | handler: hook.handler, |
220 | hookType, | ||
221 | result, | ||
222 | params, | ||
223 | onError: err => { logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) } | ||
220 | }) | 224 | }) |
221 | } | 225 | } |
222 | 226 | ||