aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-08 15:48:17 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:18:07 +0200
commitbd911b54b555b11df7e9849cf92d358bccfecf6e (patch)
tree23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server/lib/job-queue/job-queue.ts
parent5a921e7b74910414626bfc9672b857e987e3ebed (diff)
downloadPeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip
Use bullmq job dependency
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts81
1 files changed, 66 insertions, 15 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0cf5d53ce..50d732beb 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,6 @@
1import { 1import {
2 FlowJob,
3 FlowProducer,
2 Job, 4 Job,
3 JobsOptions, 5 JobsOptions,
4 Queue, 6 Queue,
@@ -13,7 +15,7 @@ import {
13import { jobStates } from '@server/helpers/custom-validators/jobs' 15import { jobStates } from '@server/helpers/custom-validators/jobs'
14import { CONFIG } from '@server/initializers/config' 16import { CONFIG } from '@server/initializers/config'
15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils' 18import { pick, timeoutPromise } from '@shared/core-utils'
17import { 19import {
18 ActivitypubFollowPayload, 20 ActivitypubFollowPayload,
19 ActivitypubHttpBroadcastPayload, 21 ActivitypubHttpBroadcastPayload,
@@ -22,10 +24,12 @@ import {
22 ActorKeysPayload, 24 ActorKeysPayload,
23 DeleteResumableUploadMetaFilePayload, 25 DeleteResumableUploadMetaFilePayload,
24 EmailPayload, 26 EmailPayload,
27 FederateVideoPayload,
25 JobState, 28 JobState,
26 JobType, 29 JobType,
27 ManageVideoTorrentPayload, 30 ManageVideoTorrentPayload,
28 MoveObjectStoragePayload, 31 MoveObjectStoragePayload,
32 NotifyPayload,
29 RefreshPayload, 33 RefreshPayload,
30 VideoFileImportPayload, 34 VideoFileImportPayload,
31 VideoImportPayload, 35 VideoImportPayload,
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
45import { refreshAPObject } from './handlers/activitypub-refresher' 49import { refreshAPObject } from './handlers/activitypub-refresher'
46import { processActorKeys } from './handlers/actor-keys' 50import { processActorKeys } from './handlers/actor-keys'
47import { processEmail } from './handlers/email' 51import { processEmail } from './handlers/email'
52import { processFederateVideo } from './handlers/federate-video'
48import { processManageVideoTorrent } from './handlers/manage-video-torrent' 53import { processManageVideoTorrent } from './handlers/manage-video-torrent'
49import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' 54import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
55import { processNotify } from './handlers/notify'
50import { processVideoFileImport } from './handlers/video-file-import' 56import { processVideoFileImport } from './handlers/video-file-import'
51import { processVideoImport } from './handlers/video-import' 57import { processVideoImport } from './handlers/video-import'
52import { processVideoLiveEnding } from './handlers/video-live-ending' 58import { processVideoLiveEnding } from './handlers/video-live-ending'
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
54import { processVideoTranscoding } from './handlers/video-transcoding' 60import { processVideoTranscoding } from './handlers/video-transcoding'
55import { processVideosViewsStats } from './handlers/video-views-stats' 61import { processVideosViewsStats } from './handlers/video-views-stats'
56 62
57type CreateJobArgument = 63export type CreateJobArgument =
58 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 64 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
59 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | 65 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
60 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 66 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
@@ -73,7 +79,9 @@ type CreateJobArgument =
73 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | 79 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
74 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | 80 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
75 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | 81 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
76 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } 82 { type: 'notify', payload: NotifyPayload } |
83 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
84 { type: 'federate-video', payload: FederateVideoPayload }
77 85
78export type CreateJobOptions = { 86export type CreateJobOptions = {
79 delay?: number 87 delay?: number
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
98 'video-redundancy': processVideoRedundancy, 106 'video-redundancy': processVideoRedundancy,
99 'move-to-object-storage': processMoveToObjectStorage, 107 'move-to-object-storage': processMoveToObjectStorage,
100 'manage-video-torrent': processManageVideoTorrent, 108 'manage-video-torrent': processManageVideoTorrent,
101 'video-studio-edition': processVideoStudioEdition 109 'notify': processNotify,
110 'video-studio-edition': processVideoStudioEdition,
111 'federate-video': processFederateVideo
102} 112}
103 113
104const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { 114const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
123 'video-live-ending', 133 'video-live-ending',
124 'move-to-object-storage', 134 'move-to-object-storage',
125 'manage-video-torrent', 135 'manage-video-torrent',
126 'video-studio-edition' 136 'video-studio-edition',
137 'notify',
138 'federate-video'
127] 139]
128 140
129const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) 141const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
@@ -137,6 +149,8 @@ class JobQueue {
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} 149 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {} 150 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139 151
152 private flowProducer: FlowProducer
153
140 private initialized = false 154 private initialized = false
141 private jobRedisPrefix: string 155 private jobRedisPrefix: string
142 156
@@ -157,6 +171,11 @@ class JobQueue {
157 this.buildQueueEvent(handlerName, produceOnly) 171 this.buildQueueEvent(handlerName, produceOnly)
158 } 172 }
159 173
174 this.flowProducer = new FlowProducer({
175 connection: this.getRedisConnection(),
176 prefix: this.jobRedisPrefix
177 })
178
160 this.addRepeatableJobs() 179 this.addRepeatableJobs()
161 } 180 }
162 181
@@ -243,6 +262,8 @@ class JobQueue {
243 } 262 }
244 } 263 }
245 264
265 // ---------------------------------------------------------------------------
266
246 async terminate () { 267 async terminate () {
247 const promises = Object.keys(this.workers) 268 const promises = Object.keys(this.workers)
248 .map(handlerName => { 269 .map(handlerName => {
@@ -278,28 +299,56 @@ class JobQueue {
278 } 299 }
279 } 300 }
280 301
281 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { 302 // ---------------------------------------------------------------------------
282 this.createJobWithPromise(obj, options) 303
283 .catch(err => logger.error('Cannot create job.', { err, obj })) 304 createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
305 this.createJob(options)
306 .catch(err => logger.error('Cannot create job.', { err, options }))
284 } 307 }
285 308
286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 309 async createJob (options: CreateJobArgument & CreateJobOptions) {
287 const queue: Queue = this.queues[obj.type] 310 const queue: Queue = this.queues[options.type]
288 if (queue === undefined) { 311 if (queue === undefined) {
289 logger.error('Unknown queue %s: cannot create job.', obj.type) 312 logger.error('Unknown queue %s: cannot create job.', options.type)
290 return 313 return
291 } 314 }
292 315
293 const jobArgs: JobsOptions = { 316 const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
317
318 return queue.add('job', options.payload, jobOptions)
319 }
320
321 async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
322 let lastJob: FlowJob
323
324 for (const job of jobs) {
325 if (!job) continue
326
327 lastJob = {
328 name: 'job',
329 data: job.payload,
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob
333 ? [ lastJob ]
334 : []
335 }
336 }
337
338 return this.flowProducer.add(lastJob)
339 }
340
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return {
294 backoff: { delay: 60 * 1000, type: 'exponential' }, 343 backoff: { delay: 60 * 1000, type: 'exponential' },
295 attempts: JOB_ATTEMPTS[obj.type], 344 attempts: JOB_ATTEMPTS[type],
296 priority: options.priority, 345 priority: options.priority,
297 delay: options.delay 346 delay: options.delay
298 } 347 }
299
300 return queue.add('job', obj.payload, jobArgs)
301 } 348 }
302 349
350 // ---------------------------------------------------------------------------
351
303 async listForApi (options: { 352 async listForApi (options: {
304 state?: JobState 353 state?: JobState
305 start: number 354 start: number
@@ -367,6 +416,8 @@ class JobQueue {
367 return Promise.all(promises) 416 return Promise.all(promises)
368 } 417 }
369 418
419 // ---------------------------------------------------------------------------
420
370 async removeOldJobs () { 421 async removeOldJobs () {
371 for (const key of Object.keys(this.queues)) { 422 for (const key of Object.keys(this.queues)) {
372 const queue: Queue = this.queues[key] 423 const queue: Queue = this.queues[key]