]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Fix changelog of secret config for docker
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
5a921e7b
C
10 Worker,
11 WorkerOptions
12} from 'bullmq'
182082f5 13import { parseDurationToMs } from '@server/helpers/core-utils'
402145b8 14import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 15import { CONFIG } from '@server/initializers/config'
402145b8 16import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 17import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
18import {
19 ActivitypubFollowPayload,
20 ActivitypubHttpBroadcastPayload,
e1c55031
C
21 ActivitypubHttpFetcherPayload,
22 ActivitypubHttpUnicastPayload,
8795d6f2 23 ActorKeysPayload,
2a491182 24 AfterVideoChannelImportPayload,
276250f0 25 DeleteResumableUploadMetaFilePayload,
e1c55031 26 EmailPayload,
bd911b54 27 FederateVideoPayload,
8dc8a34e 28 JobState,
e1c55031 29 JobType,
f012319a 30 ManageVideoTorrentPayload,
0305db28 31 MoveObjectStoragePayload,
bd911b54 32 NotifyPayload,
e1c55031 33 RefreshPayload,
2a491182 34 VideoChannelImportPayload,
e1c55031
C
35 VideoFileImportPayload,
36 VideoImportPayload,
a5cf76af 37 VideoLiveEndingPayload,
e1c55031 38 VideoRedundancyPayload,
92e66e04 39 VideoStudioEditionPayload,
e1c55031 40 VideoTranscodingPayload
8dc8a34e 41} from '../../../shared/models'
94a5ff8a 42import { logger } from '../../helpers/logger'
182082f5 43import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 44import { Hooks } from '../plugins/hooks'
c3b21b68 45import { Redis } from '../redis'
74d249bc 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 47import { processActivityPubFollow } from './handlers/activitypub-follow'
405c83f9 48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
8dc8a34e
C
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 51import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 52import { processActorKeys } from './handlers/actor-keys'
ab08ab4e 53import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
402145b8 54import { processEmail } from './handlers/email'
bd911b54 55import { processFederateVideo } from './handlers/federate-video'
f012319a 56import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 57import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 58import { processNotify } from './handlers/notify'
2a491182 59import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 60import { processVideoFileImport } from './handlers/video-file-import'
402145b8 61import { processVideoImport } from './handlers/video-import'
a5cf76af 62import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 64import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 65import { processVideosViewsStats } from './handlers/video-views-stats'
94a5ff8a 66
/**
 * Discriminated union describing every job that can be submitted to the job queue:
 * `type` selects the queue, `payload` is the data handed to that queue's handler.
 */
export type CreateJobArgument =
  | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload }
  | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload }
  // Name under which the cleaner handler/queue is actually registered in this file
  | { type: 'activitypub-cleaner', payload: {} }
  // NOTE(review): legacy spelling kept so existing callers still compile; no handler/queue
  // with this name is declared in this file, so such a job would hit the "Unknown queue" path
  | { type: 'activitypub-http-cleaner', payload: {} }
  | { type: 'activitypub-follow', payload: ActivitypubFollowPayload }
  | { type: 'video-file-import', payload: VideoFileImportPayload }
  | { type: 'video-transcoding', payload: VideoTranscodingPayload }
  | { type: 'email', payload: EmailPayload }
  | { type: 'video-import', payload: VideoImportPayload }
  | { type: 'activitypub-refresher', payload: RefreshPayload }
  | { type: 'videos-views-stats', payload: {} }
  | { type: 'video-live-ending', payload: VideoLiveEndingPayload }
  | { type: 'actor-keys', payload: ActorKeysPayload }
  | { type: 'video-redundancy', payload: VideoRedundancyPayload }
  | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload }
  | { type: 'video-studio-edition', payload: VideoStudioEditionPayload }
  | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload }
  // Previously listed twice in this union; one member is enough
  | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
  | { type: 'video-channel-import', payload: VideoChannelImportPayload }
  | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload }
  | { type: 'notify', payload: NotifyPayload }
  | { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 92
/**
 * Optional scheduling settings accepted next to a job description.
 * Both values are forwarded unchanged to the BullMQ job options.
 */
export type CreateJobOptions = {
  // Forwarded as the BullMQ `priority` job option
  priority?: number

  // Forwarded as the BullMQ `delay` job option
  delay?: number
}
97
// Processor function of each job type: one worker/queue pair is built per key of this table
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  // ActivityPub
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,

  // Videos, emails and statistics
  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  'email': processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,
  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,
  'notify': processNotify,
  'federate-video': processFederateVideo
}
122
32567717
C
// Optional per job type callbacks, meant to be run after a job of that type failed
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
126
94831479
C
// Every known job type. The order is significant: it is the order in which
// queues are visited when listing jobs and when building statistics
const jobTypes: Array<JobType> = [
  'activitypub-follow',
  'activitypub-http-broadcast',
  'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher',
  'activitypub-http-unicast',
  'activitypub-cleaner',

  'email',

  'video-transcoding',
  'video-file-import',
  'video-import',
  'videos-views-stats',

  'activitypub-refresher',

  'video-redundancy',
  'actor-keys',
  'video-live-ending',
  'move-to-object-storage',
  'manage-video-torrent',
  'video-studio-edition',
  'video-channel-import',
  'after-video-channel-import',

  'notify',
  'federate-video'
]
151
941d28cc
C
// Job types whose failures are logged at "debug" level instead of "error"
const silentFailure: Set<JobType> = new Set([ 'activitypub-http-unicast' ])
153
94a5ff8a
C
/**
 * Singleton owning, for every job type declared in `handlers`, one BullMQ worker,
 * one queue and one queue-events listener, plus a flow producer used to create
 * parent/children job flows.
 *
 * `init()` builds everything; workers and queue events are created with
 * `autorun: false`, so nothing is processed until `start()` is called.
 */
class JobQueue {

  private static instance: JobQueue

  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Builds the worker, queue and queue-events objects of every job type, the flow
   * producer, and registers the repeatable jobs. Calling it again is a no-op.
   */
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    // Object.keys() is typed string[], but the keys of `handlers` are exactly the job types
    const handlerNames = Object.keys(handlers) as JobType[]

    for (const handlerName of handlerNames) {
      this.buildWorker(handlerName)
      this.buildQueue(handlerName)
      this.buildQueueEvent(handlerName)
    }

    this.flowProducer = new FlowProducer({
      connection: Redis.getRedisClientOptions('FlowProducer'),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  /**
   * Creates (without starting it) the worker that processes jobs of the `handlerName` queue.
   *
   * The processor runs the plugin hooks around the registered handler and applies the
   * timeout configured in `JOB_TTL` when there is one.
   */
  private buildWorker (handlerName: JobType) {
    const workerOptions: WorkerOptions = {
      autorun: false,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: Redis.getRedisClientOptions('Worker'),
      maxStalledCount: 10
    }

    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      // No TTL configured for this job type
      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      // BullMQ types the job of this event as possibly undefined: do not dereference it blindly
      const jobId = job ? job.id : undefined
      const payload = job ? job.data : undefined

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', jobId, handlerName, { payload, err })

      // Error handlers are registered per queue (job type). Every job is added with the
      // generic name 'job', so looking them up with job.name would never find anything
      const errorHandler = errorHandlers[handlerName]
      if (!job || !errorHandler) return

      errorHandler(job, err)
        .catch(handlerErr => {
          logger.error('Cannot run error handler for job failure %s in queue %s.', jobId, handlerName, { err: handlerErr })
        })
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  /** Creates the queue used to add and inspect jobs of the `handlerName` type. */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: Redis.getRedisClientOptions('Queue'),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  /** Creates (without starting it) the queue-events listener of the `handlerName` queue. */
  private buildQueueEvent (handlerName: JobType) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueEvent'),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  // ---------------------------------------------------------------------------

  /** Closes the worker, queue and queue-events listener of every job type. */
  async terminate () {
    const promises = (Object.keys(this.workers) as JobType[])
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueEvent.close()
        ])
      })

    return Promise.all(promises)
  }

  /** Runs every worker and queue-events listener (they are built with `autorun: false`). */
  start () {
    const promises = (Object.keys(this.workers) as JobType[])
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.run(),
          queueEvent.run()
        ])
      })

    return Promise.all(promises)
  }

  /** Pauses the workers one after the other, waiting for each pause to complete. */
  async pause () {
    for (const handlerName of Object.keys(this.workers) as JobType[]) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  /** Resumes every worker. */
  resume () {
    for (const handlerName of Object.keys(this.workers) as JobType[]) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Fire-and-forget variant of `createJob()`: failures are logged instead of being
   * reported to the caller.
   */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const creation = this.createJob(options)

    // createJob() returns nothing (after logging) when the queue is unknown:
    // calling .catch() on that result would throw synchronously
    if (!creation) return

    creation.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Adds a job to the queue named `options.type`.
   *
   * @returns the promise returned by the queue, or `undefined` (after logging an error)
   * when no queue exists for this type
   */
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Chains the given jobs in a single flow: each job becomes the parent of the previous
   * one, so the first argument is the innermost child. Falsy entries are skipped.
   */
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  /** Creates a flow made of `parent` and its direct `children`. */
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  /** Converts a job description into the node format expected by the flow producer. */
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  /**
   * Builds the BullMQ options of a job: exponential backoff based on a 60 seconds delay,
   * attempts from `JOB_ATTEMPTS`, caller priority/delay and the removal policy of the queue.
   */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay,

      ...this.buildJobRemovalOptions(type)
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Lists jobs across the selected queues, sorted by timestamp, and returns the
   * `[start, start + count)` window of that merged list.
   *
   * Without `state` every known state is listed; without `jobType` every queue is visited.
   * Results are returned in descending timestamp order only when `asc` is exactly `false`.
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const type of filteredJobTypes) {
      const queue: Queue = this.queues[type]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', type)
        continue
      }

      // Always fetch from the beginning: the requested window is cut after merging all queues
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1, j2) => j1.timestamp - j2.timestamp)

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Counts the jobs of the selected queues that are in the given state (every known
   * state when `state` is falsy).
   *
   * Uses the same state filter as `listForApi()` so totals stay consistent with listings.
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state without counter must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /** Returns the states to query: all of them without filter, otherwise the requested one. */
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  /** Returns the job types to query: all of them without filter, otherwise the known matching one. */
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /** Returns the job counts of every queue, in `jobTypes` order. */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /** Asks every queue to clean its completed then failed jobs, using a 7 days grace period and a limit of 1000. */
  async removeOldJobs () {
    const grace = parseDurationToMs('7 days')

    for (const key of Object.keys(this.queues) as JobType[]) {
      const queue: Queue = this.queues[key]

      await queue.clean(grace, 1000, 'completed')
      await queue.clean(grace, 1000, 'failed')
    }
  }

  /**
   * Registers the repeatable jobs: views statistics always, the ActivityPub cleaner only
   * when enabled in the configuration. Failures are logged, not thrown.
   */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats'],

      ...this.buildJobRemovalOptions('videos-views-stats')
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner'],

        ...this.buildJobRemovalOptions('activitypub-cleaner')
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /** Returns the worker concurrency: from the configuration for transcoding/import, from `JOB_CONCURRENCY` otherwise. */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /** Builds the `removeOnComplete`/`removeOnFail` options of a queue, falling back to the default ages. */
  private buildJobRemovalOptions (queueName: string) {
    return {
      removeOnComplete: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,

        count: JOB_REMOVAL_OPTIONS.COUNT
      },
      removeOnFail: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,

        // NOTE(review): a count is not a duration, so this division looks copied from the
        // line above - confirm whether keeping fewer failed jobs is intended
        count: JOB_REMOVAL_OPTIONS.COUNT / 1000
      }
    }
  }

  /** Lazily created unique instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
516
517// ---------------------------------------------------------------------------
518
519export {
1061c73f 520 jobTypes,
94a5ff8a
C
521 JobQueue
522}