From 22df69fdecf299c8be6acaa25f086249ea9a0085 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 2 Aug 2022 15:29:00 +0200 Subject: [PATCH] Add job queue hooks --- client/src/root-helpers/plugins-manager.ts | 10 ++++- server/lib/job-queue/job-queue.ts | 8 +++- server/lib/plugins/hooks.ts | 4 +- server/lib/plugins/plugin-helpers-builder.ts | 4 ++ server/lib/plugins/plugin-manager.ts | 8 +++- .../peertube-plugin-test-four/main.js | 10 +++++ .../fixtures/peertube-plugin-test/main.js | 26 ++++++++++- server/tests/plugins/filter-hooks.ts | 45 +++++++++++++++++++ server/tests/plugins/plugin-helpers.ts | 23 ++++++++++ .../plugins/register-server-option.model.ts | 17 +++---- shared/core-utils/plugins/hooks.ts | 17 +++++-- .../plugins/server/server-hook.model.ts | 5 ++- 12 files changed, 151 insertions(+), 26 deletions(-) diff --git a/client/src/root-helpers/plugins-manager.ts b/client/src/root-helpers/plugins-manager.ts index 37a52be72..300f174b4 100644 --- a/client/src/root-helpers/plugins-manager.ts +++ b/client/src/root-helpers/plugins-manager.ts @@ -112,8 +112,14 @@ class PluginsManager { for (const hook of this.hooks[hookName]) { logger.info(`Running hook ${hookName} of plugin ${hook.plugin.name}`) - result = await internalRunHook(hook.handler, hookType, result, params, err => { - logger.error(`Cannot run hook ${hookName} of script ${hook.clientScript.script} of plugin ${hook.plugin.name}`, err) + result = await internalRunHook({ + handler: hook.handler, + hookType, + result, + params, + onError: err => { + logger.error(`Cannot run hook ${hookName} of script ${hook.clientScript.script} of plugin ${hook.plugin.name}`, err) + } }) } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e55d2e7c2..0ae325f4d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -24,6 +24,7 @@ import { } from '../../../shared/models' import { logger } from '../../helpers/logger' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { Hooks } from '../plugins/hooks' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' @@ -157,8 +158,11 @@ class JobQueue { const handler = handlers[handlerName] - queue.process(this.getJobConcurrency(handlerName), handler) - .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job) => { + const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) + + return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') + }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) queue.on('failed', (job, err) => { const logLevel = silentFailure.has(handlerName) diff --git a/server/lib/plugins/hooks.ts b/server/lib/plugins/hooks.ts index 327aaece2..694527c12 100644 --- a/server/lib/plugins/hooks.ts +++ b/server/lib/plugins/hooks.ts @@ -8,8 +8,8 @@ type RawFunction = (params: U) => T // Helpers to run hooks const Hooks = { - wrapObject: (result: T, hookName: U) => { - return PluginManager.Instance.runHook(hookName, result) + wrapObject: (result: T, hookName: U, context?: any) => { + return PluginManager.Instance.runHook(hookName, result, context) }, wrapPromiseFun: async (fun: PromiseFunction, params: U, hookName: V) => { diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts index b76c0a8a4..4e799b3d4 100644 --- a/server/lib/plugins/plugin-helpers-builder.ts +++ b/server/lib/plugins/plugin-helpers-builder.ts @@ -220,6 +220,10 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) { function buildUserHelpers () { return { + loadById: (id: number) => { + return UserModel.loadByIdFull(id) + }, + getAuthUser: (res: express.Response) => { const user = res.locals.oauth?.token?.User if (!user) return undefined diff --git a/server/lib/plugins/plugin-manager.ts b/server/lib/plugins/plugin-manager.ts index c21ebd0c5..a706df1e0 100644 --- a/server/lib/plugins/plugin-manager.ts +++ b/server/lib/plugins/plugin-manager.ts @@ -215,8 +215,12 @@ export class PluginManager implements ServerHook { for (const hook of this.hooks[hookName]) { logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) - result = await internalRunHook(hook.handler, hookType, result, params, err => { - logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) + result = await internalRunHook({ + handler: hook.handler, + hookType, + result, + params, + onError: err => { logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) } }) } diff --git a/server/tests/fixtures/peertube-plugin-test-four/main.js b/server/tests/fixtures/peertube-plugin-test-four/main.js index bff42ff40..5194e3e02 100644 --- a/server/tests/fixtures/peertube-plugin-test-four/main.js +++ b/server/tests/fixtures/peertube-plugin-test-four/main.js @@ -88,6 +88,15 @@ async function register ({ return res.json({ routerRoute }) }) + router.get('/user/:id', async (req, res) => { + const user = await peertubeHelpers.user.loadById(req.params.id) + if (!user) return res.status(404).end() + + return res.json({ + username: user.username + }) + }) + router.get('/user', async (req, res) => { const user = await peertubeHelpers.user.getAuthUser(res) if (!user) return res.sendStatus(404) @@ -97,6 +106,7 @@ async function register ({ const isUser = user.role === 2 return res.json({ + id: user.id, username: user.username, displayName: user.Account.name, isAdmin, diff --git a/server/tests/fixtures/peertube-plugin-test/main.js b/server/tests/fixtures/peertube-plugin-test/main.js index 2f3809215..f62f6a435 100644 --- a/server/tests/fixtures/peertube-plugin-test/main.js +++ b/server/tests/fixtures/peertube-plugin-test/main.js @@ -253,6 +253,27 @@ async function register ({ registerHook, registerSetting, settingsManager, stora } }) + registerHook({ + target: 'filter:job-queue.process.params', + handler: (object, context) => { + peertubeHelpers.logger.debug('TOTO.', { object, context }) + + if (context.type !== 'video-studio-edition') return object + + object.data.tasks = [ + { + name: 'cut', + options: { + start: 0, + end: 1 + } + } + ] + + return object + } + }) + // Upload/import/live attributes for (const target of [ 'filter:api.video.upload.video-attribute.result', @@ -284,7 +305,10 @@ async function register ({ registerHook, registerSetting, settingsManager, stora 'filter:api.search.video-playlists.index.list.result', 'filter:api.overviews.videos.list.params', - 'filter:api.overviews.videos.list.result' + 'filter:api.overviews.videos.list.result', + + 'filter:job-queue.process.params', + 'filter:job-queue.process.result' ] for (const h of filterHooks) { diff --git a/server/tests/plugins/filter-hooks.ts b/server/tests/plugins/filter-hooks.ts index b1f83d201..27b72cf7d 100644 --- a/server/tests/plugins/filter-hooks.ts +++ b/server/tests/plugins/filter-hooks.ts @@ -632,6 +632,51 @@ describe('Test plugin filter hooks', function () { }) + describe('Job queue filters', function () { + let videoUUID: string + + before(async function () { + this.timeout(120_000) + + const { uuid } = await servers[0].videos.quickUpload({ name: 'studio' }) + + const video = await servers[0].videos.get({ id: uuid }) + expect(video.duration).at.least(2) + videoUUID = video.uuid + + await waitJobs(servers) + + await servers[0].config.enableStudio() + }) + + it('Should run filter:job-queue.process.params', async function () { + this.timeout(120_000) + + await servers[0].videoStudio.createEditionTasks({ + videoId: videoUUID, + tasks: [ + { + name: 'add-intro', + options: { + file: 'video_very_short_240p.mp4' + } + } + ] + }) + + await waitJobs(servers) + + await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.params', 1, false) + + const video = await servers[0].videos.get({ id: videoUUID }) + expect(video.duration).at.most(2) + }) + + it('Should run filter:job-queue.process.result', async function () { + await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.result', 1, false) + }) + }) + after(async function () { await cleanupTests(servers) }) diff --git a/server/tests/plugins/plugin-helpers.ts b/server/tests/plugins/plugin-helpers.ts index 5e8d08dff..49807f8f5 100644 --- a/server/tests/plugins/plugin-helpers.ts +++ b/server/tests/plugins/plugin-helpers.ts @@ -110,6 +110,7 @@ describe('Test plugin helpers', function () { }) describe('User', function () { + let rootId: number it('Should not get a user if not authenticated', async function () { await makeGetRequest({ @@ -132,6 +133,28 @@ describe('Test plugin helpers', function () { expect(res.body.isAdmin).to.be.true expect(res.body.isModerator).to.be.false expect(res.body.isUser).to.be.false + + rootId = res.body.id + }) + + it('Should load a user by id', async function () { + { + const res = await makeGetRequest({ + url: servers[0].url, + path: '/plugins/test-four/router/user/' + rootId, + expectedStatus: HttpStatusCode.OK_200 + }) + + expect(res.body.username).to.equal('root') + } + + { + await makeGetRequest({ + url: servers[0].url, + path: '/plugins/test-four/router/user/42', + expectedStatus: HttpStatusCode.NOT_FOUND_404 + }) + } }) }) diff --git a/server/types/plugins/register-server-option.model.ts b/server/types/plugins/register-server-option.model.ts index ff32e8199..fb4f12a4c 100644 --- a/server/types/plugins/register-server-option.model.ts +++ b/server/types/plugins/register-server-option.model.ts @@ -14,10 +14,9 @@ import { RegisterServerSettingOptions, ServerConfig, ThumbnailType, - UserRole, VideoBlacklistCreate } from '@shared/models' -import { MVideoThumbnail } from '../models' +import { MUserDefault, MVideoThumbnail } from '../models' import { RegisterServerAuthExternalOptions, RegisterServerAuthExternalResult, @@ -100,16 +99,10 @@ export type PeerTubeHelpers = { user: { // PeerTube >= 3.2 - getAuthUser: (response: Response) => Promise<{ - id?: string - username: string - email: string - blocked: boolean - role: UserRole - Account: { - name: string - } - } | undefined> + getAuthUser: (response: Response) => Promise + + // PeerTube >= 4.3 + loadById: (id: number) => Promise } } diff --git a/shared/core-utils/plugins/hooks.ts b/shared/core-utils/plugins/hooks.ts index 92cb5ad68..3784969b5 100644 --- a/shared/core-utils/plugins/hooks.ts +++ b/shared/core-utils/plugins/hooks.ts @@ -8,15 +8,24 @@ function getHookType (hookName: string) { return HookType.STATIC } -async function internalRunHook (handler: Function, hookType: HookType, result: T, params: any, onError: (err: Error) => void) { +async function internalRunHook (options: { + handler: Function + hookType: HookType + result: T + params: any + onError: (err: Error) => void +}) { + const { handler, hookType, result, params, onError } = options + try { if (hookType === HookType.FILTER) { const p = handler(result, params) - if (isPromise(p)) result = await p - else result = p + const newResult = isPromise(p) + ? await p + : p - return result + return newResult } // Action/static hooks do not have result value diff --git a/shared/models/plugins/server/server-hook.model.ts b/shared/models/plugins/server/server-hook.model.ts index e64c3bbbc..a8e31f576 100644 --- a/shared/models/plugins/server/server-hook.model.ts +++ b/shared/models/plugins/server/server-hook.model.ts @@ -90,7 +90,10 @@ export const serverFilterHookObject = { // Filter result to check if the embed is allowed for a particular request 'filter:html.embed.video.allowed.result': true, - 'filter:html.embed.video-playlist.allowed.result': true + 'filter:html.embed.video-playlist.allowed.result': true, + + 'filter:job-queue.process.params': true, + 'filter:job-queue.process.result': true } export type ServerFilterHookName = keyof typeof serverFilterHookObject -- 2.41.0