]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Bumped to version v5.2.1
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
5a921e7b
C
10 Worker,
11 WorkerOptions
12} from 'bullmq'
182082f5 13import { parseDurationToMs } from '@server/helpers/core-utils'
402145b8 14import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 15import { CONFIG } from '@server/initializers/config'
402145b8 16import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 17import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
18import {
19 ActivitypubFollowPayload,
20 ActivitypubHttpBroadcastPayload,
e1c55031
C
21 ActivitypubHttpFetcherPayload,
22 ActivitypubHttpUnicastPayload,
8795d6f2 23 ActorKeysPayload,
2a491182 24 AfterVideoChannelImportPayload,
276250f0 25 DeleteResumableUploadMetaFilePayload,
e1c55031 26 EmailPayload,
bd911b54 27 FederateVideoPayload,
8dc8a34e 28 JobState,
e1c55031 29 JobType,
f012319a 30 ManageVideoTorrentPayload,
0305db28 31 MoveObjectStoragePayload,
bd911b54 32 NotifyPayload,
e1c55031 33 RefreshPayload,
0c9668f7 34 TranscodingJobBuilderPayload,
2a491182 35 VideoChannelImportPayload,
e1c55031
C
36 VideoFileImportPayload,
37 VideoImportPayload,
a5cf76af 38 VideoLiveEndingPayload,
e1c55031 39 VideoRedundancyPayload,
92e66e04 40 VideoStudioEditionPayload,
e1c55031 41 VideoTranscodingPayload
8dc8a34e 42} from '../../../shared/models'
94a5ff8a 43import { logger } from '../../helpers/logger'
182082f5 44import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 45import { Hooks } from '../plugins/hooks'
c3b21b68 46import { Redis } from '../redis'
74d249bc 47import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 48import { processActivityPubFollow } from './handlers/activitypub-follow'
405c83f9 49import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
8dc8a34e
C
50import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
51import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 52import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 53import { processActorKeys } from './handlers/actor-keys'
ab08ab4e 54import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
402145b8 55import { processEmail } from './handlers/email'
bd911b54 56import { processFederateVideo } from './handlers/federate-video'
f012319a 57import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 58import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 59import { processNotify } from './handlers/notify'
0c9668f7 60import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder'
2a491182 61import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 62import { processVideoFileImport } from './handlers/video-file-import'
402145b8 63import { processVideoImport } from './handlers/video-import'
a5cf76af 64import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 65import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 66import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 67import { processVideosViewsStats } from './handlers/video-views-stats'
94a5ff8a 68
/**
 * Discriminated union of every job that can be pushed to the job queue:
 * `type` selects the queue and `payload` is the data stored in the job.
 */
export type CreateJobArgument =
  | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload }
  | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload }
  | { type: 'activitypub-cleaner', payload: {} }
  | { type: 'activitypub-follow', payload: ActivitypubFollowPayload }
  | { type: 'video-file-import', payload: VideoFileImportPayload }
  | { type: 'video-transcoding', payload: VideoTranscodingPayload }
  | { type: 'email', payload: EmailPayload }
  | { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload }
  | { type: 'video-import', payload: VideoImportPayload }
  | { type: 'activitypub-refresher', payload: RefreshPayload }
  | { type: 'videos-views-stats', payload: {} }
  | { type: 'video-live-ending', payload: VideoLiveEndingPayload }
  | { type: 'actor-keys', payload: ActorKeysPayload }
  | { type: 'video-redundancy', payload: VideoRedundancyPayload }
  | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload }
  | { type: 'video-studio-edition', payload: VideoStudioEditionPayload }
  | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload }
  // 'move-to-object-storage' used to be listed twice; one member is enough
  | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
  | { type: 'video-channel-import', payload: VideoChannelImportPayload }
  | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload }
  | { type: 'notify', payload: NotifyPayload }
  | { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 95
/**
 * Optional scheduling settings a caller can attach to a job it creates.
 */
export type CreateJobOptions = {
  // Forwarded as-is to the `delay` option of the created job
  delay?: number

  // Forwarded as-is to the `priority` option of the created job
  priority?: number

  // Only read when the job is created as part of a flow (parent/children jobs)
  failParentOnFailure?: boolean
}
101
// Processor function for each job type: every JobType must have exactly one entry
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-refresher': refreshAPObject,
  'actor-keys': processActorKeys,
  'after-video-channel-import': processAfterVideoChannelImport,
  'email': processEmail,
  'federate-video': processFederateVideo,
  'transcoding-job-builder': processTranscodingJobBuilder,
  'manage-video-torrent': processManageVideoTorrent,
  'move-to-object-storage': processMoveToObjectStorage,
  'notify': processNotify,
  'video-channel-import': processVideoChannelImport,
  'video-file-import': processVideoFileImport,
  'video-import': processVideoImport,
  'video-live-ending': processVideoLiveEnding,
  'video-redundancy': processVideoRedundancy,
  'video-studio-edition': processVideoStudioEdition,
  'video-transcoding': processVideoTranscoding,
  'videos-views-stats': processVideosViewsStats
}
127
32567717
C
// Optional callbacks invoked with the job and its error after a job of the given type failed
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
131
// Every job type known to the queue, used to filter, list and count jobs
const jobTypes: JobType[] = [
  // ActivityPub and actor jobs
  'activitypub-cleaner',
  'activitypub-follow',
  'activitypub-http-broadcast-parallel',
  'activitypub-http-broadcast',
  'activitypub-http-fetcher',
  'activitypub-http-unicast',
  'activitypub-refresher',
  'actor-keys',

  // Miscellaneous jobs
  'after-video-channel-import',
  'email',
  'federate-video',
  'transcoding-job-builder',
  'manage-video-torrent',
  'move-to-object-storage',
  'notify',

  // Video jobs
  'video-channel-import',
  'video-file-import',
  'video-import',
  'video-live-ending',
  'video-redundancy',
  'video-studio-edition',
  'video-transcoding',
  'videos-views-stats'
]
157
941d28cc
C
// Job types whose failures are logged with the 'debug' level instead of 'error'
const silentFailure: Set<JobType> = new Set<JobType>([ 'activitypub-http-unicast' ])
159
94a5ff8a
C
/**
 * Singleton wrapping BullMQ: it owns one Worker, one Queue and one QueueEvents
 * instance per job type, plus a FlowProducer used to create parent/children jobs.
 *
 * Use `JobQueue.Instance`, call `init()` once to build the BullMQ objects,
 * then `start()` to run the workers (they are created with `autorun: false`).
 */
class JobQueue {

  private static instance: JobQueue

  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Build workers, queues, queue events and the flow producer, then register the repeatable jobs.
   * Calling it again is a no-op.
   */
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    // Object.keys() is typed string[]: the keys of `handlers` are exactly the job types
    for (const handlerName of Object.keys(handlers) as JobType[]) {
      this.buildWorker(handlerName)
      this.buildQueue(handlerName)
      this.buildQueueEvent(handlerName)
    }

    this.flowProducer = new FlowProducer({
      connection: Redis.getRedisClientOptions('FlowProducer'),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  /**
   * Create the worker of a job type (not started: `autorun` is false) and attach its
   * 'failed' and 'error' listeners.
   */
  private buildWorker (handlerName: JobType) {
    const workerOptions: WorkerOptions = {
      autorun: false,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: Redis.getRedisClientOptions('Worker'),
      maxStalledCount: 10
    }

    // Run the handler, bounded by the job type TTL when one is configured
    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    // Let plugins filter the job before it is processed and the result after
    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job?.id, handlerName, { payload: job?.data, err })

      // BullMQ types the job of the 'failed' event as possibly undefined
      if (!job) return

      // Every job is added with the name 'job', so the job name cannot be used to find the
      // error handler: use the queue/handler name instead
      const errorHandler = errorHandlers[handlerName]
      if (!errorHandler) return

      errorHandler(job, err)
        .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  /** Create the queue of a job type and log its errors. */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: Redis.getRedisClientOptions('Queue'),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  /** Create the queue events listener of a job type (not started: `autorun` is false) and log its errors. */
  private buildQueueEvent (handlerName: JobType) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueEvent'),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  // ---------------------------------------------------------------------------

  /**
   * Close every worker, queue and queue events listener, then the flow producer.
   *
   * @returns the results of the per job type close calls
   */
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueEvent.close()
        ])
      })

    const results = await Promise.all(promises)

    // The flow producer holds its own Redis connection: release it too (undefined if init() was never called)
    if (this.flowProducer) await this.flowProducer.close()

    return results
  }

  /** Run every worker and queue events listener (they are built with `autorun: false`). */
  start () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.run(),
          queueEvent.run()
        ])
      })

    return Promise.all(promises)
  }

  /** Pause the workers one after the other. */
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  /** Resume every worker. */
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Fire-and-forget variant of `createJob()`: a failure is logged instead of being
   * returned to the caller.
   */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const promise = this.createJob(options)

    // createJob() already logged the error and returned nothing: the queue is unknown
    if (!promise) return

    promise.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Add a job to the queue matching `options.type`.
   *
   * @returns the promise of the created job, or undefined (after logging an error) if the queue does not exist
   */
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Create a flow in which every job is the single child of the next one: the last
   * defined argument becomes the root of the flow. Undefined entries are skipped.
   */
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  /** Create a flow made of one parent job and its direct children. */
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  /** Convert a job description into a flow node, without children. */
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: {
        ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),

        // buildJobOptions() does not forward this option: apply the caller's choice here, true by default
        failParentOnFailure: job.failParentOnFailure ?? true
      }
    }
  }

  /** Build the job options shared by every job of the given type (backoff, attempts, removal policy). */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay,

      ...this.buildJobRemovalOptions(type)
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * List jobs across queues, sorted by timestamp, for a paginated API.
   *
   * @param options.state only keep jobs in this state ('waiting' also includes 'waiting-children'); all states if omitted
   * @param options.start index of the first job to return in the sorted list
   * @param options.count maximum number of jobs to return
   * @param options.asc the sorted list is reversed only when this is exactly false
   * @param options.jobType only look at this queue; all queues if falsy
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const jobType of filteredJobTypes) {
      const queue: Queue = this.queues[jobType]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', jobType)
        continue
      }

      // Any queue may own the whole requested page, so fetch enough jobs from each of them
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1, j2) => j1.timestamp - j2.timestamp)

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Count jobs across queues, using the same state and type filters as `listForApi()`
   * so the total matches the listing.
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counts must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /** States to look at for a state filter: every state when no filter is given. */
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  /** Job types to look at for a type filter: every job type when no filter is given. */
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /** Get the job counts of every queue. */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /** Clean 'completed' then 'failed' jobs of every queue, with a 7 days grace period and a limit of 1000 per call. */
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
      await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
    }
  }

  /** Register the repeatable jobs; a failure to add one is only logged. */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats'],

      ...this.buildJobRemovalOptions('videos-views-stats')
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner'],

        ...this.buildJobRemovalOptions('activitypub-cleaner')
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /** Worker concurrency of a job type: taken from the configuration for transcoding and video imports. */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /** Build the automatic removal policy of completed and failed jobs of a queue. */
  private buildJobRemovalOptions (queueName: string) {
    return {
      removeOnComplete: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,

        count: JOB_REMOVAL_OPTIONS.COUNT
      },
      removeOnFail: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,

        // A number of jobs, not a duration: it must not be divided by 1000
        count: JOB_REMOVAL_OPTIONS.COUNT
      }
    }
  }

  /** The unique JobQueue instance, created on first access. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
526
527// ---------------------------------------------------------------------------
528
529export {
1061c73f 530 jobTypes,
94a5ff8a
C
531 JobQueue
532}