]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Add logger for uploadx
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
10 QueueScheduler,
11 QueueSchedulerOptions,
12 Worker,
13 WorkerOptions
14} from 'bullmq'
402145b8 15import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 16import { CONFIG } from '@server/initializers/config'
402145b8 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 18import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
19import {
20 ActivitypubFollowPayload,
21 ActivitypubHttpBroadcastPayload,
e1c55031
C
22 ActivitypubHttpFetcherPayload,
23 ActivitypubHttpUnicastPayload,
8795d6f2 24 ActorKeysPayload,
2a491182 25 AfterVideoChannelImportPayload,
276250f0 26 DeleteResumableUploadMetaFilePayload,
e1c55031 27 EmailPayload,
bd911b54 28 FederateVideoPayload,
8dc8a34e 29 JobState,
e1c55031 30 JobType,
f012319a 31 ManageVideoTorrentPayload,
0305db28 32 MoveObjectStoragePayload,
bd911b54 33 NotifyPayload,
e1c55031 34 RefreshPayload,
2a491182 35 VideoChannelImportPayload,
e1c55031
C
36 VideoFileImportPayload,
37 VideoImportPayload,
a5cf76af 38 VideoLiveEndingPayload,
e1c55031 39 VideoRedundancyPayload,
92e66e04 40 VideoStudioEditionPayload,
e1c55031 41 VideoTranscodingPayload
8dc8a34e 42} from '../../../shared/models'
94a5ff8a 43import { logger } from '../../helpers/logger'
74dc3bca 44import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 45import { Hooks } from '../plugins/hooks'
74d249bc 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 47import { processActivityPubFollow } from './handlers/activitypub-follow'
405c83f9 48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
8dc8a34e
C
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 51import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 52import { processActorKeys } from './handlers/actor-keys'
ab08ab4e 53import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
402145b8 54import { processEmail } from './handlers/email'
bd911b54 55import { processFederateVideo } from './handlers/federate-video'
f012319a 56import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 57import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 58import { processNotify } from './handlers/notify'
2a491182 59import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 60import { processVideoFileImport } from './handlers/video-file-import'
402145b8 61import { processVideoImport } from './handlers/video-import'
a5cf76af 62import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 64import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 65import { processVideosViewsStats } from './handlers/video-views-stats'
94a5ff8a 66
// Every job that can be submitted to the job queue:
//  - `type` is the name of the queue the job is added to
//  - `payload` is the data stored in the job and given to the handler of that queue
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  // Must be the same name as the 'activitypub-cleaner' key of the handlers map, otherwise no queue matches
  { type: 'activitypub-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 92
// Optional settings that can be given with a job description.
// Both values are copied as is into the job options built for the queue.
export type CreateJobOptions = {
  priority?: number

  delay?: number
}
97
// Function processing the jobs of each job type.
// The keys of this object are also the list of job types for which a worker and a queue are built.
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,
  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  email: processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,
  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,
  notify: processNotify,
  'federate-video': processFederateVideo
}
122
32567717
C
// Optional function to call when a job of the given type fails (not every job type has one)
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
126
94831479
C
// Job types exposed by this module.
// The order of this list is the order used when listing jobs or building statistics across all queues.
const jobTypes: Array<JobType> = [
  'activitypub-follow',
  'activitypub-http-broadcast',
  'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher',
  'activitypub-http-unicast',
  'activitypub-cleaner',
  'email',
  'video-transcoding',
  'video-file-import',
  'video-import',
  'videos-views-stats',
  'activitypub-refresher',
  'video-redundancy',
  'actor-keys',
  'video-live-ending',
  'move-to-object-storage',
  'manage-video-torrent',
  'video-studio-edition',
  'video-channel-import',
  'after-video-channel-import',
  'notify',
  'federate-video'
]
151
941d28cc
C
// Job types whose failures are logged with the 'debug' level instead of the 'error' level
const silentFailure: ReadonlySet<JobType> = new Set<JobType>([ 'activitypub-http-unicast' ])
153
94a5ff8a
C
// Singleton owning the BullMQ objects used to run background jobs:
// a worker, a queue, a queue scheduler and a queue events listener per job type,
// and a flow producer used to create jobs that depend on other jobs.
class JobQueue {

  private static instance: JobQueue

  // All these maps are filled by init(), using the job type as key
  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  // Use JobQueue.Instance to get the singleton
  private constructor () {
  }

  // Build the BullMQ objects of every job type declared in `handlers`, build the flow producer
  // and register the repeatable jobs.
  // Workers, queue schedulers and queue events are created with `autorun: false`: start() runs them.
  // Calling init() again does nothing.
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of (Object.keys(handlers) as JobType[])) {
      this.buildWorker(handlerName)
      this.buildQueue(handlerName)
      this.buildQueueScheduler(handlerName)
      this.buildQueueEvent(handlerName)
    }

    this.flowProducer = new FlowProducer({
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  // Create (without running it) the worker processing the jobs of the `handlerName` queue
  private buildWorker (handlerName: JobType) {
    const workerOptions: WorkerOptions = {
      autorun: false,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: this.getRedisConnection()
    }

    // Run the handler of this job type, wrapped by timeoutPromise() if a TTL is configured for it
    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    // Give the job and the handler result to the plugin hooks
    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

      // Error handlers are registered by job type.
      // Every job is added with the name 'job' so job.name cannot identify the job type:
      // use the job type this worker has been built for
      const errorHandler = errorHandlers[handlerName]
      if (!errorHandler) return

      errorHandler(job, err)
        .catch(handlerErr => {
          logger.error('Cannot run error handler for job failure %s in queue %s.', job.id, handlerName, { err: handlerErr })
        })
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  // Create the queue in which jobs of type `handlerName` are added
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  // Create (without running it) the queue scheduler of the `handlerName` queue
  private buildQueueScheduler (handlerName: JobType) {
    const queueSchedulerOptions: QueueSchedulerOptions = {
      autorun: false,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix,
      maxStalledCount: 10
    }

    const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
    queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })

    this.queueSchedulers[handlerName] = queueScheduler
  }

  // Create (without running it) the events listener of the `handlerName` queue
  private buildQueueEvent (handlerName: JobType) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: false,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  // Redis connection settings, read from the instance configuration, given to every BullMQ object
  private getRedisConnection () {
    return {
      password: CONFIG.REDIS.AUTH,
      db: CONFIG.REDIS.DB,
      host: CONFIG.REDIS.HOSTNAME,
      port: CONFIG.REDIS.PORT,
      path: CONFIG.REDIS.SOCKET
    }
  }

  // ---------------------------------------------------------------------------

  // Close the worker, queue, queue scheduler and queue events of every job type, then the flow producer.
  // Resolves with the results of the per job type close calls.
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueScheduler.close(),
          queueEvent.close()
        ])
      })

    const results = await Promise.all(promises)

    // The flow producer is created by init() with its own connection settings: close it too.
    // It does not exist if init() has not been called
    if (this.flowProducer) await this.flowProducer.close()

    return results
  }

  // Run the workers, queue schedulers and queue events built by init()
  start () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.run(),
          queueScheduler.run(),
          queueEvent.run()
        ])
      })

    return Promise.all(promises)
  }

  // Pause every worker, one after the other
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  // Resume every worker
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  // Create a job without waiting for the result: a creation failure is only logged
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const jobPromise = this.createJob(options)

    // createJob() logs an error and returns nothing if the queue is unknown
    if (!jobPromise) return

    jobPromise.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  // Add a job in the queue named `options.type`.
  // Returns the promise of the queue add() call, or nothing (after logging an error) if the queue does not exist.
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  // Create a flow in which each defined job is the only child of the next defined job of the list.
  // Undefined entries are skipped, so callers can give optional jobs.
  // NOTE(review): at least one entry is expected to be defined, otherwise `undefined` is given to the flow producer
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  // Create a flow made of the `parent` job and its `children` jobs
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  // Convert a job description to the object expected by the flow producer
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  // Build the options of a job: exponential backoff, number of attempts configured for this job type,
  // and the optional priority/delay chosen by the caller
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay
    }
  }

  // ---------------------------------------------------------------------------

  // List jobs, optionally filtered by state and job type.
  // Jobs are fetched from each matching queue, merged, sorted by timestamp (reversed if `asc` is false)
  // and the [ start, start + count [ slice of the merged list is returned.
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const jobType of filteredJobTypes) {
      const queue: Queue = this.queues[jobType]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', jobType)
        continue
      }

      // Pagination is applied after the merge: ask each queue for everything up to the end of the requested page
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  // Count jobs, optionally filtered by state and job type.
  // Uses the same state filter as listForApi() so the total is consistent with the listed jobs.
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counts must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  // States to look for: all of them without filter, or the requested state
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  // Job types to look for: all of them without filter, or the requested job type if it exists
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  // Get the job counts of every job type
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  // Clean the 'completed' jobs of every queue, one queue after the other,
  // giving JOB_COMPLETED_LIFETIME and 100 as first and second arguments of the queue clean() call
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
    }
  }

  // Register the jobs that have a `repeat` option. A registration failure is only logged.
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats']
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner']
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  // Concurrency of the worker: read from the configuration for transcoding and import jobs,
  // from the JOB_CONCURRENCY constant for the other job types
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  // Get the singleton, created on first access
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
521
522// ---------------------------------------------------------------------------
523
524export {
1061c73f 525 jobTypes,
94a5ff8a
C
526 JobQueue
527}