]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Don't inject untrusted input
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
10 QueueScheduler,
11 QueueSchedulerOptions,
12 Worker,
13 WorkerOptions
14} from 'bullmq'
402145b8 15import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 16import { CONFIG } from '@server/initializers/config'
402145b8 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 18import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
19import {
20 ActivitypubFollowPayload,
21 ActivitypubHttpBroadcastPayload,
e1c55031
C
22 ActivitypubHttpFetcherPayload,
23 ActivitypubHttpUnicastPayload,
8795d6f2 24 ActorKeysPayload,
2a491182 25 AfterVideoChannelImportPayload,
276250f0 26 DeleteResumableUploadMetaFilePayload,
e1c55031 27 EmailPayload,
bd911b54 28 FederateVideoPayload,
8dc8a34e 29 JobState,
e1c55031 30 JobType,
f012319a 31 ManageVideoTorrentPayload,
0305db28 32 MoveObjectStoragePayload,
bd911b54 33 NotifyPayload,
e1c55031 34 RefreshPayload,
2a491182 35 VideoChannelImportPayload,
e1c55031
C
36 VideoFileImportPayload,
37 VideoImportPayload,
a5cf76af 38 VideoLiveEndingPayload,
e1c55031 39 VideoRedundancyPayload,
92e66e04 40 VideoStudioEditionPayload,
e1c55031 41 VideoTranscodingPayload
8dc8a34e 42} from '../../../shared/models'
94a5ff8a 43import { logger } from '../../helpers/logger'
74dc3bca 44import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 45import { Hooks } from '../plugins/hooks'
74d249bc 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 47import { processActivityPubFollow } from './handlers/activitypub-follow'
405c83f9 48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
8dc8a34e
C
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 51import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 52import { processActorKeys } from './handlers/actor-keys'
ab08ab4e 53import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
402145b8 54import { processEmail } from './handlers/email'
bd911b54 55import { processFederateVideo } from './handlers/federate-video'
f012319a 56import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 57import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 58import { processNotify } from './handlers/notify'
2a491182 59import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 60import { processVideoFileImport } from './handlers/video-file-import'
402145b8 61import { processVideoImport } from './handlers/video-import'
a5cf76af 62import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 64import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 65import { processVideosViewsStats } from './handlers/video-views-stats'
564b9b55 66import { Redis } from '../redis'
94a5ff8a 67
/**
 * Discriminated union describing a job to enqueue: `type` selects the queue
 * and `payload` is the data handed to that queue's handler.
 */
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  // Legacy spelling kept for backward compatibility: no queue is registered under this
  // name in this file, so creating such a job only logs an "Unknown queue" error
  { type: 'activitypub-http-cleaner', payload: {} } |
  // Name actually registered in `handlers` and `jobTypes`
  { type: 'activitypub-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 93
/**
 * Optional scheduling settings accepted when creating a job.
 * Both values are forwarded unchanged to the BullMQ job options.
 */
export type CreateJobOptions = {
  // Forwarded as the BullMQ `delay` job option (presumably milliseconds — confirm against BullMQ docs)
  delay?: number

  // Forwarded as the BullMQ `priority` job option
  priority?: number
}
98
// Processor function for every job type.
// The key order matters: `JobQueue.init()` iterates over these keys to build one
// worker/queue/scheduler/events listener per job type.
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,
  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  'email': processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,
  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,
  'notify': processNotify,
  'federate-video': processFederateVideo
}
123
32567717
C
// Optional per job type callbacks invoked from the worker "failed" event, in addition to logging
const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
127
94831479
C
// Job types exposed to listing/counting/statistics (also exported by this module)
const jobTypes: JobType[] = [
  'activitypub-follow',
  'activitypub-http-broadcast',
  'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher',
  'activitypub-http-unicast',
  'activitypub-cleaner',
  'email',
  'video-transcoding',
  'video-file-import',
  'video-import',
  'videos-views-stats',
  'activitypub-refresher',
  'video-redundancy',
  'actor-keys',
  'video-live-ending',
  'move-to-object-storage',
  'manage-video-torrent',
  'video-studio-edition',
  'video-channel-import',
  'after-video-channel-import',
  'notify',
  'federate-video'
]
152
941d28cc
C
// Job types whose failures are logged at "debug" level instead of "error"
const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
154
94a5ff8a
C
/**
 * Singleton that owns, for every job type declared in `handlers`, a BullMQ
 * worker, queue, queue scheduler and queue events listener, plus one shared
 * flow producer used to create parent/children job flows.
 *
 * Use `JobQueue.Instance` to get the instance, `init()` to build the BullMQ
 * objects and `start()` to run the workers, schedulers and events listeners
 * (they are all built with `autorun: false`).
 */
class JobQueue {

  private static instance: JobQueue

  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Build every BullMQ object and register the repeatable jobs.
   * Calling it again once initialized is a no-op.
   */
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of (Object.keys(handlers) as JobType[])) {
      this.buildWorker(handlerName)
      this.buildQueue(handlerName)
      this.buildQueueScheduler(handlerName)
      this.buildQueueEvent(handlerName)
    }

    this.flowProducer = new FlowProducer({
      connection: Redis.getRedisClientOptions('FlowProducer'),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  /**
   * Create the worker of a job type: it wraps the handler with the optional
   * `JOB_TTL` timeout and the plugin hooks, and logs failures/errors.
   */
  private buildWorker (handlerName: JobType) {
    const workerOptions: WorkerOptions = {
      autorun: false,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: Redis.getRedisClientOptions('Worker')
    }

    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      // No TTL configured for this job type: let the handler run as long as it needs
      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

      // Error handlers are indexed by job type (the queue name).
      // `job.name` cannot be used here: every job of this file is added with the constant name 'job'
      const errorHandler = errorHandlers[handlerName]

      if (errorHandler) {
        errorHandler(job, err)
          .catch(err => logger.error('Cannot run error handler for job failure %s in queue %s.', job.id, handlerName, { err }))
      }
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  /** Create the queue of a job type, used to add and inspect jobs. */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: Redis.getRedisClientOptions('Queue'),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  /** Create the queue scheduler of a job type. */
  private buildQueueScheduler (handlerName: JobType) {
    const queueSchedulerOptions: QueueSchedulerOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueScheduler'),
      prefix: this.jobRedisPrefix,
      maxStalledCount: 10
    }

    const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
    queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })

    this.queueSchedulers[handlerName] = queueScheduler
  }

  /** Create the queue events listener of a job type. */
  private buildQueueEvent (handlerName: JobType) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueEvent'),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  // ---------------------------------------------------------------------------

  /** Close the worker, queue, scheduler and events listener of every job type. */
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueScheduler.close(),
          queueEvent.close()
        ])
      })

    return Promise.all(promises)
  }

  /** Run the worker, scheduler and events listener of every job type. */
  start () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.run(),
          queueScheduler.run(),
          queueEvent.run()
        ])
      })

    return Promise.all(promises)
  }

  /** Pause every worker, one after the other. */
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  /** Resume every worker. */
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /** Fire-and-forget variant of `createJob()`: failures are only logged. */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    this.createJob(options)
      .catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Add a job in the queue matching `options.type`.
   *
   * The method is `async` so that it always returns a promise, including when the queue
   * is unknown: callers (like `createJobAsync()`) can safely chain `.catch()` on the result.
   *
   * @returns the created job, or `undefined` (after logging an error) if no queue exists for this type
   */
  async createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Chain the provided jobs in a single flow: each job becomes the only child of the next one,
   * so the last defined argument is the root of the flow. `undefined` entries are skipped.
   *
   * NOTE(review): if every entry is `undefined`, `undefined` is passed to the flow producer — confirm callers never do that.
   */
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  /** Create a flow made of a parent job and its direct children. */
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  /** Convert a job description into a flow node (without children). */
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  /** Build the BullMQ options of a job: exponential backoff, attempts from `JOB_ATTEMPTS`, priority and delay. */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * List jobs of one or every job type, sorted by `timestamp` and paginated.
   *
   * @param options.state only list jobs in this state ('waiting' also includes 'waiting-children'); all states if omitted
   * @param options.start index of the first returned job in the merged and sorted list
   * @param options.count maximum number of returned jobs
   * @param options.asc results are reversed (newest first) only when strictly `false`
   * @param options.jobType only list jobs of this type; all types of `jobTypes` if falsy
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const jobType of filteredJobTypes) {
      const queue: Queue = this.queues[jobType]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', jobType)
        continue
      }

      // Fetch from index 0 in each queue: pagination is applied after merging and sorting all the queues
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Count jobs in the provided state (every state of `jobStates` if falsy),
   * for one job type or for all of them.
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = state ? [ state ] : jobStates
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counts must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /** States to request from the queues for a given optional state filter. */
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  /** Job types to inspect for a given optional type filter. */
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /** Get the job counts of every job type listed in `jobTypes`. */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /** Clean completed jobs of every queue, using `JOB_COMPLETED_LIFETIME` as grace period and at most 100 jobs per call. */
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
    }
  }

  /** Register the repeatable jobs; failures are only logged. */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats']
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner']
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /** Worker concurrency of a job type: from the configuration for transcoding/import, from `JOB_CONCURRENCY` otherwise. */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /** Lazily created singleton instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
512
513// ---------------------------------------------------------------------------
514
export {
  jobTypes,
  JobQueue
}