aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-02 15:29:00 +0200
committerChocobozzz <me@florianbigard.com>2022-08-02 15:50:05 +0200
commit22df69fdecf299c8be6acaa25f086249ea9a0085 (patch)
treee8c7e21c18fb42bb74b54f2eab1509c3d93a380d /server/lib
parent7a9e420a02434e4f16c99e7d58da9075dff25d15 (diff)
downloadPeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.gz
PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.zst
PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.zip
Add job queue hooks
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/job-queue/job-queue.ts8
-rw-r--r--server/lib/plugins/hooks.ts4
-rw-r--r--server/lib/plugins/plugin-helpers-builder.ts4
-rw-r--r--server/lib/plugins/plugin-manager.ts8
4 files changed, 18 insertions, 6 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index e55d2e7c2..0ae325f4d 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -24,6 +24,7 @@ import {
24} from '../../../shared/models' 24} from '../../../shared/models'
25import { logger } from '../../helpers/logger' 25import { logger } from '../../helpers/logger'
26import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 26import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
27import { Hooks } from '../plugins/hooks'
27import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 28import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
28import { processActivityPubFollow } from './handlers/activitypub-follow' 29import { processActivityPubFollow } from './handlers/activitypub-follow'
29import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 30import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
@@ -157,8 +158,11 @@ class JobQueue {
157 158
158 const handler = handlers[handlerName] 159 const handler = handlers[handlerName]
159 160
160 queue.process(this.getJobConcurrency(handlerName), handler) 161 queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => {
161 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 162 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
163
164 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
165 }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
162 166
163 queue.on('failed', (job, err) => { 167 queue.on('failed', (job, err) => {
164 const logLevel = silentFailure.has(handlerName) 168 const logLevel = silentFailure.has(handlerName)
diff --git a/server/lib/plugins/hooks.ts b/server/lib/plugins/hooks.ts
index 327aaece2..694527c12 100644
--- a/server/lib/plugins/hooks.ts
+++ b/server/lib/plugins/hooks.ts
@@ -8,8 +8,8 @@ type RawFunction <U, T> = (params: U) => T
8 8
9// Helpers to run hooks 9// Helpers to run hooks
10const Hooks = { 10const Hooks = {
11 wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U) => { 11 wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U, context?: any) => {
12 return PluginManager.Instance.runHook(hookName, result) 12 return PluginManager.Instance.runHook(hookName, result, context)
13 }, 13 },
14 14
15 wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { 15 wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => {
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts
index b76c0a8a4..4e799b3d4 100644
--- a/server/lib/plugins/plugin-helpers-builder.ts
+++ b/server/lib/plugins/plugin-helpers-builder.ts
@@ -220,6 +220,10 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) {
220 220
221function buildUserHelpers () { 221function buildUserHelpers () {
222 return { 222 return {
223 loadById: (id: number) => {
224 return UserModel.loadByIdFull(id)
225 },
226
223 getAuthUser: (res: express.Response) => { 227 getAuthUser: (res: express.Response) => {
224 const user = res.locals.oauth?.token?.User 228 const user = res.locals.oauth?.token?.User
225 if (!user) return undefined 229 if (!user) return undefined
diff --git a/server/lib/plugins/plugin-manager.ts b/server/lib/plugins/plugin-manager.ts
index c21ebd0c5..a706df1e0 100644
--- a/server/lib/plugins/plugin-manager.ts
+++ b/server/lib/plugins/plugin-manager.ts
@@ -215,8 +215,12 @@ export class PluginManager implements ServerHook {
215 for (const hook of this.hooks[hookName]) { 215 for (const hook of this.hooks[hookName]) {
216 logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) 216 logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName)
217 217
218 result = await internalRunHook(hook.handler, hookType, result, params, err => { 218 result = await internalRunHook({
219 logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) 219 handler: hook.handler,
220 hookType,
221 result,
222 params,
223 onError: err => { logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) }
220 }) 224 })
221 } 225 }
222 226