aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-02 15:29:00 +0200
committerChocobozzz <me@florianbigard.com>2022-08-02 15:50:05 +0200
commit22df69fdecf299c8be6acaa25f086249ea9a0085 (patch)
treee8c7e21c18fb42bb74b54f2eab1509c3d93a380d /server
parent7a9e420a02434e4f16c99e7d58da9075dff25d15 (diff)
downloadPeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.gz
PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.tar.zst
PeerTube-22df69fdecf299c8be6acaa25f086249ea9a0085.zip
Add job queue hooks
Diffstat (limited to 'server')
-rw-r--r--server/lib/job-queue/job-queue.ts8
-rw-r--r--server/lib/plugins/hooks.ts4
-rw-r--r--server/lib/plugins/plugin-helpers-builder.ts4
-rw-r--r--server/lib/plugins/plugin-manager.ts8
-rw-r--r--server/tests/fixtures/peertube-plugin-test-four/main.js10
-rw-r--r--server/tests/fixtures/peertube-plugin-test/main.js26
-rw-r--r--server/tests/plugins/filter-hooks.ts45
-rw-r--r--server/tests/plugins/plugin-helpers.ts23
-rw-r--r--server/types/plugins/register-server-option.model.ts17
9 files changed, 126 insertions, 19 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index e55d2e7c2..0ae325f4d 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -24,6 +24,7 @@ import {
24} from '../../../shared/models' 24} from '../../../shared/models'
25import { logger } from '../../helpers/logger' 25import { logger } from '../../helpers/logger'
26import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 26import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
27import { Hooks } from '../plugins/hooks'
27import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 28import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
28import { processActivityPubFollow } from './handlers/activitypub-follow' 29import { processActivityPubFollow } from './handlers/activitypub-follow'
29import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 30import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
@@ -157,8 +158,11 @@ class JobQueue {
157 158
158 const handler = handlers[handlerName] 159 const handler = handlers[handlerName]
159 160
160 queue.process(this.getJobConcurrency(handlerName), handler) 161 queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => {
161 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 162 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
163
164 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
165 }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
162 166
163 queue.on('failed', (job, err) => { 167 queue.on('failed', (job, err) => {
164 const logLevel = silentFailure.has(handlerName) 168 const logLevel = silentFailure.has(handlerName)
diff --git a/server/lib/plugins/hooks.ts b/server/lib/plugins/hooks.ts
index 327aaece2..694527c12 100644
--- a/server/lib/plugins/hooks.ts
+++ b/server/lib/plugins/hooks.ts
@@ -8,8 +8,8 @@ type RawFunction <U, T> = (params: U) => T
8 8
9// Helpers to run hooks 9// Helpers to run hooks
10const Hooks = { 10const Hooks = {
11 wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U) => { 11 wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U, context?: any) => {
12 return PluginManager.Instance.runHook(hookName, result) 12 return PluginManager.Instance.runHook(hookName, result, context)
13 }, 13 },
14 14
15 wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { 15 wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => {
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts
index b76c0a8a4..4e799b3d4 100644
--- a/server/lib/plugins/plugin-helpers-builder.ts
+++ b/server/lib/plugins/plugin-helpers-builder.ts
@@ -220,6 +220,10 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) {
220 220
221function buildUserHelpers () { 221function buildUserHelpers () {
222 return { 222 return {
223 loadById: (id: number) => {
224 return UserModel.loadByIdFull(id)
225 },
226
223 getAuthUser: (res: express.Response) => { 227 getAuthUser: (res: express.Response) => {
224 const user = res.locals.oauth?.token?.User 228 const user = res.locals.oauth?.token?.User
225 if (!user) return undefined 229 if (!user) return undefined
diff --git a/server/lib/plugins/plugin-manager.ts b/server/lib/plugins/plugin-manager.ts
index c21ebd0c5..a706df1e0 100644
--- a/server/lib/plugins/plugin-manager.ts
+++ b/server/lib/plugins/plugin-manager.ts
@@ -215,8 +215,12 @@ export class PluginManager implements ServerHook {
215 for (const hook of this.hooks[hookName]) { 215 for (const hook of this.hooks[hookName]) {
216 logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) 216 logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName)
217 217
218 result = await internalRunHook(hook.handler, hookType, result, params, err => { 218 result = await internalRunHook({
219 logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) 219 handler: hook.handler,
220 hookType,
221 result,
222 params,
223 onError: err => { logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) }
220 }) 224 })
221 } 225 }
222 226
diff --git a/server/tests/fixtures/peertube-plugin-test-four/main.js b/server/tests/fixtures/peertube-plugin-test-four/main.js
index bff42ff40..5194e3e02 100644
--- a/server/tests/fixtures/peertube-plugin-test-four/main.js
+++ b/server/tests/fixtures/peertube-plugin-test-four/main.js
@@ -88,6 +88,15 @@ async function register ({
88 return res.json({ routerRoute }) 88 return res.json({ routerRoute })
89 }) 89 })
90 90
91 router.get('/user/:id', async (req, res) => {
92 const user = await peertubeHelpers.user.loadById(req.params.id)
93 if (!user) return res.status(404).end()
94
95 return res.json({
96 username: user.username
97 })
98 })
99
91 router.get('/user', async (req, res) => { 100 router.get('/user', async (req, res) => {
92 const user = await peertubeHelpers.user.getAuthUser(res) 101 const user = await peertubeHelpers.user.getAuthUser(res)
93 if (!user) return res.sendStatus(404) 102 if (!user) return res.sendStatus(404)
@@ -97,6 +106,7 @@ async function register ({
97 const isUser = user.role === 2 106 const isUser = user.role === 2
98 107
99 return res.json({ 108 return res.json({
109 id: user.id,
100 username: user.username, 110 username: user.username,
101 displayName: user.Account.name, 111 displayName: user.Account.name,
102 isAdmin, 112 isAdmin,
diff --git a/server/tests/fixtures/peertube-plugin-test/main.js b/server/tests/fixtures/peertube-plugin-test/main.js
index 2f3809215..f62f6a435 100644
--- a/server/tests/fixtures/peertube-plugin-test/main.js
+++ b/server/tests/fixtures/peertube-plugin-test/main.js
@@ -253,6 +253,27 @@ async function register ({ registerHook, registerSetting, settingsManager, stora
253 } 253 }
254 }) 254 })
255 255
256 registerHook({
257 target: 'filter:job-queue.process.params',
258 handler: (object, context) => {
259 peertubeHelpers.logger.debug('TOTO.', { object, context })
260
261 if (context.type !== 'video-studio-edition') return object
262
263 object.data.tasks = [
264 {
265 name: 'cut',
266 options: {
267 start: 0,
268 end: 1
269 }
270 }
271 ]
272
273 return object
274 }
275 })
276
256 // Upload/import/live attributes 277 // Upload/import/live attributes
257 for (const target of [ 278 for (const target of [
258 'filter:api.video.upload.video-attribute.result', 279 'filter:api.video.upload.video-attribute.result',
@@ -284,7 +305,10 @@ async function register ({ registerHook, registerSetting, settingsManager, stora
284 'filter:api.search.video-playlists.index.list.result', 305 'filter:api.search.video-playlists.index.list.result',
285 306
286 'filter:api.overviews.videos.list.params', 307 'filter:api.overviews.videos.list.params',
287 'filter:api.overviews.videos.list.result' 308 'filter:api.overviews.videos.list.result',
309
310 'filter:job-queue.process.params',
311 'filter:job-queue.process.result'
288 ] 312 ]
289 313
290 for (const h of filterHooks) { 314 for (const h of filterHooks) {
diff --git a/server/tests/plugins/filter-hooks.ts b/server/tests/plugins/filter-hooks.ts
index b1f83d201..27b72cf7d 100644
--- a/server/tests/plugins/filter-hooks.ts
+++ b/server/tests/plugins/filter-hooks.ts
@@ -632,6 +632,51 @@ describe('Test plugin filter hooks', function () {
632 632
633 }) 633 })
634 634
635 describe('Job queue filters', function () {
636 let videoUUID: string
637
638 before(async function () {
639 this.timeout(120_000)
640
641 const { uuid } = await servers[0].videos.quickUpload({ name: 'studio' })
642
643 const video = await servers[0].videos.get({ id: uuid })
644 expect(video.duration).at.least(2)
645 videoUUID = video.uuid
646
647 await waitJobs(servers)
648
649 await servers[0].config.enableStudio()
650 })
651
652 it('Should run filter:job-queue.process.params', async function () {
653 this.timeout(120_000)
654
655 await servers[0].videoStudio.createEditionTasks({
656 videoId: videoUUID,
657 tasks: [
658 {
659 name: 'add-intro',
660 options: {
661 file: 'video_very_short_240p.mp4'
662 }
663 }
664 ]
665 })
666
667 await waitJobs(servers)
668
669 await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.params', 1, false)
670
671 const video = await servers[0].videos.get({ id: videoUUID })
672 expect(video.duration).at.most(2)
673 })
674
675 it('Should run filter:job-queue.process.result', async function () {
676 await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.result', 1, false)
677 })
678 })
679
635 after(async function () { 680 after(async function () {
636 await cleanupTests(servers) 681 await cleanupTests(servers)
637 }) 682 })
diff --git a/server/tests/plugins/plugin-helpers.ts b/server/tests/plugins/plugin-helpers.ts
index 5e8d08dff..49807f8f5 100644
--- a/server/tests/plugins/plugin-helpers.ts
+++ b/server/tests/plugins/plugin-helpers.ts
@@ -110,6 +110,7 @@ describe('Test plugin helpers', function () {
110 }) 110 })
111 111
112 describe('User', function () { 112 describe('User', function () {
113 let rootId: number
113 114
114 it('Should not get a user if not authenticated', async function () { 115 it('Should not get a user if not authenticated', async function () {
115 await makeGetRequest({ 116 await makeGetRequest({
@@ -132,6 +133,28 @@ describe('Test plugin helpers', function () {
132 expect(res.body.isAdmin).to.be.true 133 expect(res.body.isAdmin).to.be.true
133 expect(res.body.isModerator).to.be.false 134 expect(res.body.isModerator).to.be.false
134 expect(res.body.isUser).to.be.false 135 expect(res.body.isUser).to.be.false
136
137 rootId = res.body.id
138 })
139
140 it('Should load a user by id', async function () {
141 {
142 const res = await makeGetRequest({
143 url: servers[0].url,
144 path: '/plugins/test-four/router/user/' + rootId,
145 expectedStatus: HttpStatusCode.OK_200
146 })
147
148 expect(res.body.username).to.equal('root')
149 }
150
151 {
152 await makeGetRequest({
153 url: servers[0].url,
154 path: '/plugins/test-four/router/user/42',
155 expectedStatus: HttpStatusCode.NOT_FOUND_404
156 })
157 }
135 }) 158 })
136 }) 159 })
137 160
diff --git a/server/types/plugins/register-server-option.model.ts b/server/types/plugins/register-server-option.model.ts
index ff32e8199..fb4f12a4c 100644
--- a/server/types/plugins/register-server-option.model.ts
+++ b/server/types/plugins/register-server-option.model.ts
@@ -14,10 +14,9 @@ import {
14 RegisterServerSettingOptions, 14 RegisterServerSettingOptions,
15 ServerConfig, 15 ServerConfig,
16 ThumbnailType, 16 ThumbnailType,
17 UserRole,
18 VideoBlacklistCreate 17 VideoBlacklistCreate
19} from '@shared/models' 18} from '@shared/models'
20import { MVideoThumbnail } from '../models' 19import { MUserDefault, MVideoThumbnail } from '../models'
21import { 20import {
22 RegisterServerAuthExternalOptions, 21 RegisterServerAuthExternalOptions,
23 RegisterServerAuthExternalResult, 22 RegisterServerAuthExternalResult,
@@ -100,16 +99,10 @@ export type PeerTubeHelpers = {
100 99
101 user: { 100 user: {
102 // PeerTube >= 3.2 101 // PeerTube >= 3.2
103 getAuthUser: (response: Response) => Promise<{ 102 getAuthUser: (response: Response) => Promise<MUserDefault>
104 id?: string 103
105 username: string 104 // PeerTube >= 4.3
106 email: string 105 loadById: (id: number) => Promise<MUserDefault>
107 blocked: boolean
108 role: UserRole
109 Account: {
110 name: string
111 }
112 } | undefined>
113 } 106 }
114} 107}
115 108