diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-02 15:29:00 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-02 15:50:05 +0200 |
commit | 22df69fdecf299c8be6acaa25f086249ea9a0085 (patch) | |
tree | e8c7e21c18fb42bb74b54f2eab1509c3d93a380d | |
parent | 7a9e420a02434e4f16c99e7d58da9075dff25d15 (diff) | |
download | PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.gz PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.zst PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.zip |
Add job queue hooks
-rw-r--r-- | client/src/root-helpers/plugins-manager.ts | 10 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 8 | ||||
-rw-r--r-- | server/lib/plugins/hooks.ts | 4 | ||||
-rw-r--r-- | server/lib/plugins/plugin-helpers-builder.ts | 4 | ||||
-rw-r--r-- | server/lib/plugins/plugin-manager.ts | 8 | ||||
-rw-r--r-- | server/tests/fixtures/peertube-plugin-test-four/main.js | 10 | ||||
-rw-r--r-- | server/tests/fixtures/peertube-plugin-test/main.js | 26 | ||||
-rw-r--r-- | server/tests/plugins/filter-hooks.ts | 45 | ||||
-rw-r--r-- | server/tests/plugins/plugin-helpers.ts | 23 | ||||
-rw-r--r-- | server/types/plugins/register-server-option.model.ts | 17 | ||||
-rw-r--r-- | shared/core-utils/plugins/hooks.ts | 17 | ||||
-rw-r--r-- | shared/models/plugins/server/server-hook.model.ts | 5 |
12 files changed, 151 insertions, 26 deletions
diff --git a/client/src/root-helpers/plugins-manager.ts b/client/src/root-helpers/plugins-manager.ts index 37a52be72..300f174b4 100644 --- a/client/src/root-helpers/plugins-manager.ts +++ b/client/src/root-helpers/plugins-manager.ts | |||
@@ -112,8 +112,14 @@ class PluginsManager { | |||
112 | for (const hook of this.hooks[hookName]) { | 112 | for (const hook of this.hooks[hookName]) { |
113 | logger.info(`Running hook ${hookName} of plugin ${hook.plugin.name}`) | 113 | logger.info(`Running hook ${hookName} of plugin ${hook.plugin.name}`) |
114 | 114 | ||
115 | result = await internalRunHook(hook.handler, hookType, result, params, err => { | 115 | result = await internalRunHook({ |
116 | logger.error(`Cannot run hook ${hookName} of script ${hook.clientScript.script} of plugin ${hook.plugin.name}`, err) | 116 | handler: hook.handler, |
117 | hookType, | ||
118 | result, | ||
119 | params, | ||
120 | onError: err => { | ||
121 | logger.error(`Cannot run hook ${hookName} of script ${hook.clientScript.script} of plugin ${hook.plugin.name}`, err) | ||
122 | } | ||
117 | }) | 123 | }) |
118 | } | 124 | } |
119 | 125 | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e55d2e7c2..0ae325f4d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -24,6 +24,7 @@ import { | |||
24 | } from '../../../shared/models' | 24 | } from '../../../shared/models' |
25 | import { logger } from '../../helpers/logger' | 25 | import { logger } from '../../helpers/logger' |
26 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 26 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
27 | import { Hooks } from '../plugins/hooks' | ||
27 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | 28 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
28 | import { processActivityPubFollow } from './handlers/activitypub-follow' | 29 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
29 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 30 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
@@ -157,8 +158,11 @@ class JobQueue { | |||
157 | 158 | ||
158 | const handler = handlers[handlerName] | 159 | const handler = handlers[handlerName] |
159 | 160 | ||
160 | queue.process(this.getJobConcurrency(handlerName), handler) | 161 | queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { |
161 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 162 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) |
163 | |||
164 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') | ||
165 | }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | ||
162 | 166 | ||
163 | queue.on('failed', (job, err) => { | 167 | queue.on('failed', (job, err) => { |
164 | const logLevel = silentFailure.has(handlerName) | 168 | const logLevel = silentFailure.has(handlerName) |
diff --git a/server/lib/plugins/hooks.ts b/server/lib/plugins/hooks.ts index 327aaece2..694527c12 100644 --- a/server/lib/plugins/hooks.ts +++ b/server/lib/plugins/hooks.ts | |||
@@ -8,8 +8,8 @@ type RawFunction <U, T> = (params: U) => T | |||
8 | 8 | ||
9 | // Helpers to run hooks | 9 | // Helpers to run hooks |
10 | const Hooks = { | 10 | const Hooks = { |
11 | wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U) => { | 11 | wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U, context?: any) => { |
12 | return PluginManager.Instance.runHook(hookName, result) | 12 | return PluginManager.Instance.runHook(hookName, result, context) |
13 | }, | 13 | }, |
14 | 14 | ||
15 | wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { | 15 | wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { |
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts index b76c0a8a4..4e799b3d4 100644 --- a/server/lib/plugins/plugin-helpers-builder.ts +++ b/server/lib/plugins/plugin-helpers-builder.ts | |||
@@ -220,6 +220,10 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) { | |||
220 | 220 | ||
221 | function buildUserHelpers () { | 221 | function buildUserHelpers () { |
222 | return { | 222 | return { |
223 | loadById: (id: number) => { | ||
224 | return UserModel.loadByIdFull(id) | ||
225 | }, | ||
226 | |||
223 | getAuthUser: (res: express.Response) => { | 227 | getAuthUser: (res: express.Response) => { |
224 | const user = res.locals.oauth?.token?.User | 228 | const user = res.locals.oauth?.token?.User |
225 | if (!user) return undefined | 229 | if (!user) return undefined |
diff --git a/server/lib/plugins/plugin-manager.ts b/server/lib/plugins/plugin-manager.ts index c21ebd0c5..a706df1e0 100644 --- a/server/lib/plugins/plugin-manager.ts +++ b/server/lib/plugins/plugin-manager.ts | |||
@@ -215,8 +215,12 @@ export class PluginManager implements ServerHook { | |||
215 | for (const hook of this.hooks[hookName]) { | 215 | for (const hook of this.hooks[hookName]) { |
216 | logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) | 216 | logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) |
217 | 217 | ||
218 | result = await internalRunHook(hook.handler, hookType, result, params, err => { | 218 | result = await internalRunHook({ |
219 | logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) | 219 | handler: hook.handler, |
220 | hookType, | ||
221 | result, | ||
222 | params, | ||
223 | onError: err => { logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) } | ||
220 | }) | 224 | }) |
221 | } | 225 | } |
222 | 226 | ||
diff --git a/server/tests/fixtures/peertube-plugin-test-four/main.js b/server/tests/fixtures/peertube-plugin-test-four/main.js index bff42ff40..5194e3e02 100644 --- a/server/tests/fixtures/peertube-plugin-test-four/main.js +++ b/server/tests/fixtures/peertube-plugin-test-four/main.js | |||
@@ -88,6 +88,15 @@ async function register ({ | |||
88 | return res.json({ routerRoute }) | 88 | return res.json({ routerRoute }) |
89 | }) | 89 | }) |
90 | 90 | ||
91 | router.get('/user/:id', async (req, res) => { | ||
92 | const user = await peertubeHelpers.user.loadById(req.params.id) | ||
93 | if (!user) return res.status(404).end() | ||
94 | |||
95 | return res.json({ | ||
96 | username: user.username | ||
97 | }) | ||
98 | }) | ||
99 | |||
91 | router.get('/user', async (req, res) => { | 100 | router.get('/user', async (req, res) => { |
92 | const user = await peertubeHelpers.user.getAuthUser(res) | 101 | const user = await peertubeHelpers.user.getAuthUser(res) |
93 | if (!user) return res.sendStatus(404) | 102 | if (!user) return res.sendStatus(404) |
@@ -97,6 +106,7 @@ async function register ({ | |||
97 | const isUser = user.role === 2 | 106 | const isUser = user.role === 2 |
98 | 107 | ||
99 | return res.json({ | 108 | return res.json({ |
109 | id: user.id, | ||
100 | username: user.username, | 110 | username: user.username, |
101 | displayName: user.Account.name, | 111 | displayName: user.Account.name, |
102 | isAdmin, | 112 | isAdmin, |
diff --git a/server/tests/fixtures/peertube-plugin-test/main.js b/server/tests/fixtures/peertube-plugin-test/main.js index 2f3809215..f62f6a435 100644 --- a/server/tests/fixtures/peertube-plugin-test/main.js +++ b/server/tests/fixtures/peertube-plugin-test/main.js | |||
@@ -253,6 +253,27 @@ async function register ({ registerHook, registerSetting, settingsManager, stora | |||
253 | } | 253 | } |
254 | }) | 254 | }) |
255 | 255 | ||
256 | registerHook({ | ||
257 | target: 'filter:job-queue.process.params', | ||
258 | handler: (object, context) => { | ||
259 | peertubeHelpers.logger.debug('TOTO.', { object, context }) | ||
260 | |||
261 | if (context.type !== 'video-studio-edition') return object | ||
262 | |||
263 | object.data.tasks = [ | ||
264 | { | ||
265 | name: 'cut', | ||
266 | options: { | ||
267 | start: 0, | ||
268 | end: 1 | ||
269 | } | ||
270 | } | ||
271 | ] | ||
272 | |||
273 | return object | ||
274 | } | ||
275 | }) | ||
276 | |||
256 | // Upload/import/live attributes | 277 | // Upload/import/live attributes |
257 | for (const target of [ | 278 | for (const target of [ |
258 | 'filter:api.video.upload.video-attribute.result', | 279 | 'filter:api.video.upload.video-attribute.result', |
@@ -284,7 +305,10 @@ async function register ({ registerHook, registerSetting, settingsManager, stora | |||
284 | 'filter:api.search.video-playlists.index.list.result', | 305 | 'filter:api.search.video-playlists.index.list.result', |
285 | 306 | ||
286 | 'filter:api.overviews.videos.list.params', | 307 | 'filter:api.overviews.videos.list.params', |
287 | 'filter:api.overviews.videos.list.result' | 308 | 'filter:api.overviews.videos.list.result', |
309 | |||
310 | 'filter:job-queue.process.params', | ||
311 | 'filter:job-queue.process.result' | ||
288 | ] | 312 | ] |
289 | 313 | ||
290 | for (const h of filterHooks) { | 314 | for (const h of filterHooks) { |
diff --git a/server/tests/plugins/filter-hooks.ts b/server/tests/plugins/filter-hooks.ts index b1f83d201..27b72cf7d 100644 --- a/server/tests/plugins/filter-hooks.ts +++ b/server/tests/plugins/filter-hooks.ts | |||
@@ -632,6 +632,51 @@ describe('Test plugin filter hooks', function () { | |||
632 | 632 | ||
633 | }) | 633 | }) |
634 | 634 | ||
635 | describe('Job queue filters', function () { | ||
636 | let videoUUID: string | ||
637 | |||
638 | before(async function () { | ||
639 | this.timeout(120_000) | ||
640 | |||
641 | const { uuid } = await servers[0].videos.quickUpload({ name: 'studio' }) | ||
642 | |||
643 | const video = await servers[0].videos.get({ id: uuid }) | ||
644 | expect(video.duration).at.least(2) | ||
645 | videoUUID = video.uuid | ||
646 | |||
647 | await waitJobs(servers) | ||
648 | |||
649 | await servers[0].config.enableStudio() | ||
650 | }) | ||
651 | |||
652 | it('Should run filter:job-queue.process.params', async function () { | ||
653 | this.timeout(120_000) | ||
654 | |||
655 | await servers[0].videoStudio.createEditionTasks({ | ||
656 | videoId: videoUUID, | ||
657 | tasks: [ | ||
658 | { | ||
659 | name: 'add-intro', | ||
660 | options: { | ||
661 | file: 'video_very_short_240p.mp4' | ||
662 | } | ||
663 | } | ||
664 | ] | ||
665 | }) | ||
666 | |||
667 | await waitJobs(servers) | ||
668 | |||
669 | await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.params', 1, false) | ||
670 | |||
671 | const video = await servers[0].videos.get({ id: videoUUID }) | ||
672 | expect(video.duration).at.most(2) | ||
673 | }) | ||
674 | |||
675 | it('Should run filter:job-queue.process.result', async function () { | ||
676 | await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.result', 1, false) | ||
677 | }) | ||
678 | }) | ||
679 | |||
635 | after(async function () { | 680 | after(async function () { |
636 | await cleanupTests(servers) | 681 | await cleanupTests(servers) |
637 | }) | 682 | }) |
diff --git a/server/tests/plugins/plugin-helpers.ts b/server/tests/plugins/plugin-helpers.ts index 5e8d08dff..49807f8f5 100644 --- a/server/tests/plugins/plugin-helpers.ts +++ b/server/tests/plugins/plugin-helpers.ts | |||
@@ -110,6 +110,7 @@ describe('Test plugin helpers', function () { | |||
110 | }) | 110 | }) |
111 | 111 | ||
112 | describe('User', function () { | 112 | describe('User', function () { |
113 | let rootId: number | ||
113 | 114 | ||
114 | it('Should not get a user if not authenticated', async function () { | 115 | it('Should not get a user if not authenticated', async function () { |
115 | await makeGetRequest({ | 116 | await makeGetRequest({ |
@@ -132,6 +133,28 @@ describe('Test plugin helpers', function () { | |||
132 | expect(res.body.isAdmin).to.be.true | 133 | expect(res.body.isAdmin).to.be.true |
133 | expect(res.body.isModerator).to.be.false | 134 | expect(res.body.isModerator).to.be.false |
134 | expect(res.body.isUser).to.be.false | 135 | expect(res.body.isUser).to.be.false |
136 | |||
137 | rootId = res.body.id | ||
138 | }) | ||
139 | |||
140 | it('Should load a user by id', async function () { | ||
141 | { | ||
142 | const res = await makeGetRequest({ | ||
143 | url: servers[0].url, | ||
144 | path: '/plugins/test-four/router/user/' + rootId, | ||
145 | expectedStatus: HttpStatusCode.OK_200 | ||
146 | }) | ||
147 | |||
148 | expect(res.body.username).to.equal('root') | ||
149 | } | ||
150 | |||
151 | { | ||
152 | await makeGetRequest({ | ||
153 | url: servers[0].url, | ||
154 | path: '/plugins/test-four/router/user/42', | ||
155 | expectedStatus: HttpStatusCode.NOT_FOUND_404 | ||
156 | }) | ||
157 | } | ||
135 | }) | 158 | }) |
136 | }) | 159 | }) |
137 | 160 | ||
diff --git a/server/types/plugins/register-server-option.model.ts b/server/types/plugins/register-server-option.model.ts index ff32e8199..fb4f12a4c 100644 --- a/server/types/plugins/register-server-option.model.ts +++ b/server/types/plugins/register-server-option.model.ts | |||
@@ -14,10 +14,9 @@ import { | |||
14 | RegisterServerSettingOptions, | 14 | RegisterServerSettingOptions, |
15 | ServerConfig, | 15 | ServerConfig, |
16 | ThumbnailType, | 16 | ThumbnailType, |
17 | UserRole, | ||
18 | VideoBlacklistCreate | 17 | VideoBlacklistCreate |
19 | } from '@shared/models' | 18 | } from '@shared/models' |
20 | import { MVideoThumbnail } from '../models' | 19 | import { MUserDefault, MVideoThumbnail } from '../models' |
21 | import { | 20 | import { |
22 | RegisterServerAuthExternalOptions, | 21 | RegisterServerAuthExternalOptions, |
23 | RegisterServerAuthExternalResult, | 22 | RegisterServerAuthExternalResult, |
@@ -100,16 +99,10 @@ export type PeerTubeHelpers = { | |||
100 | 99 | ||
101 | user: { | 100 | user: { |
102 | // PeerTube >= 3.2 | 101 | // PeerTube >= 3.2 |
103 | getAuthUser: (response: Response) => Promise<{ | 102 | getAuthUser: (response: Response) => Promise<MUserDefault> |
104 | id?: string | 103 | |
105 | username: string | 104 | // PeerTube >= 4.3 |
106 | email: string | 105 | loadById: (id: number) => Promise<MUserDefault> |
107 | blocked: boolean | ||
108 | role: UserRole | ||
109 | Account: { | ||
110 | name: string | ||
111 | } | ||
112 | } | undefined> | ||
113 | } | 106 | } |
114 | } | 107 | } |
115 | 108 | ||
diff --git a/shared/core-utils/plugins/hooks.ts b/shared/core-utils/plugins/hooks.ts index 92cb5ad68..3784969b5 100644 --- a/shared/core-utils/plugins/hooks.ts +++ b/shared/core-utils/plugins/hooks.ts | |||
@@ -8,15 +8,24 @@ function getHookType (hookName: string) { | |||
8 | return HookType.STATIC | 8 | return HookType.STATIC |
9 | } | 9 | } |
10 | 10 | ||
11 | async function internalRunHook <T> (handler: Function, hookType: HookType, result: T, params: any, onError: (err: Error) => void) { | 11 | async function internalRunHook <T> (options: { |
12 | handler: Function | ||
13 | hookType: HookType | ||
14 | result: T | ||
15 | params: any | ||
16 | onError: (err: Error) => void | ||
17 | }) { | ||
18 | const { handler, hookType, result, params, onError } = options | ||
19 | |||
12 | try { | 20 | try { |
13 | if (hookType === HookType.FILTER) { | 21 | if (hookType === HookType.FILTER) { |
14 | const p = handler(result, params) | 22 | const p = handler(result, params) |
15 | 23 | ||
16 | if (isPromise(p)) result = await p | 24 | const newResult = isPromise(p) |
17 | else result = p | 25 | ? await p |
26 | : p | ||
18 | 27 | ||
19 | return result | 28 | return newResult |
20 | } | 29 | } |
21 | 30 | ||
22 | // Action/static hooks do not have result value | 31 | // Action/static hooks do not have result value |
diff --git a/shared/models/plugins/server/server-hook.model.ts b/shared/models/plugins/server/server-hook.model.ts index e64c3bbbc..a8e31f576 100644 --- a/shared/models/plugins/server/server-hook.model.ts +++ b/shared/models/plugins/server/server-hook.model.ts | |||
@@ -90,7 +90,10 @@ export const serverFilterHookObject = { | |||
90 | 90 | ||
91 | // Filter result to check if the embed is allowed for a particular request | 91 | // Filter result to check if the embed is allowed for a particular request |
92 | 'filter:html.embed.video.allowed.result': true, | 92 | 'filter:html.embed.video.allowed.result': true, |
93 | 'filter:html.embed.video-playlist.allowed.result': true | 93 | 'filter:html.embed.video-playlist.allowed.result': true, |
94 | |||
95 | 'filter:job-queue.process.params': true, | ||
96 | 'filter:job-queue.process.result': true | ||
94 | } | 97 | } |
95 | 98 | ||
96 | export type ServerFilterHookName = keyof typeof serverFilterHookObject | 99 | export type ServerFilterHookName = keyof typeof serverFilterHookObject |