]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Fix import video file lock
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
5a921e7b
C
10 Worker,
11 WorkerOptions
12} from 'bullmq'
182082f5 13import { parseDurationToMs } from '@server/helpers/core-utils'
402145b8 14import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 15import { CONFIG } from '@server/initializers/config'
402145b8 16import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 17import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
18import {
19 ActivitypubFollowPayload,
20 ActivitypubHttpBroadcastPayload,
e1c55031
C
21 ActivitypubHttpFetcherPayload,
22 ActivitypubHttpUnicastPayload,
8795d6f2 23 ActorKeysPayload,
2a491182 24 AfterVideoChannelImportPayload,
276250f0 25 DeleteResumableUploadMetaFilePayload,
e1c55031 26 EmailPayload,
bd911b54 27 FederateVideoPayload,
8dc8a34e 28 JobState,
e1c55031 29 JobType,
f012319a 30 ManageVideoTorrentPayload,
0305db28 31 MoveObjectStoragePayload,
bd911b54 32 NotifyPayload,
e1c55031 33 RefreshPayload,
0c9668f7 34 TranscodingJobBuilderPayload,
2a491182 35 VideoChannelImportPayload,
e1c55031
C
36 VideoFileImportPayload,
37 VideoImportPayload,
a5cf76af 38 VideoLiveEndingPayload,
e1c55031 39 VideoRedundancyPayload,
92e66e04 40 VideoStudioEditionPayload,
e1c55031 41 VideoTranscodingPayload
8dc8a34e 42} from '../../../shared/models'
94a5ff8a 43import { logger } from '../../helpers/logger'
182082f5 44import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 45import { Hooks } from '../plugins/hooks'
c3b21b68 46import { Redis } from '../redis'
74d249bc 47import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 48import { processActivityPubFollow } from './handlers/activitypub-follow'
405c83f9 49import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
8dc8a34e
C
50import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
51import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 52import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 53import { processActorKeys } from './handlers/actor-keys'
ab08ab4e 54import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
402145b8 55import { processEmail } from './handlers/email'
bd911b54 56import { processFederateVideo } from './handlers/federate-video'
f012319a 57import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 58import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 59import { processNotify } from './handlers/notify'
0c9668f7 60import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder'
2a491182 61import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 62import { processVideoFileImport } from './handlers/video-file-import'
402145b8 63import { processVideoImport } from './handlers/video-import'
a5cf76af 64import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 65import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 66import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 67import { processVideosViewsStats } from './handlers/video-views-stats'
94a5ff8a 68
/**
 * Discriminated union describing a job to create: `type` selects the queue
 * (and therefore the handler), `payload` is the data handed to that handler.
 *
 * Each job type appears exactly once.
 */
export type CreateJobArgument =
  | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload }
  | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload }
  | { type: 'activitypub-cleaner', payload: {} }
  | { type: 'activitypub-follow', payload: ActivitypubFollowPayload }
  | { type: 'video-file-import', payload: VideoFileImportPayload }
  | { type: 'video-transcoding', payload: VideoTranscodingPayload }
  | { type: 'email', payload: EmailPayload }
  | { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload }
  | { type: 'video-import', payload: VideoImportPayload }
  | { type: 'activitypub-refresher', payload: RefreshPayload }
  | { type: 'videos-views-stats', payload: {} }
  | { type: 'video-live-ending', payload: VideoLiveEndingPayload }
  | { type: 'actor-keys', payload: ActorKeysPayload }
  | { type: 'video-redundancy', payload: VideoRedundancyPayload }
  | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload }
  | { type: 'video-studio-edition', payload: VideoStudioEditionPayload }
  | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload }
  | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
  | { type: 'video-channel-import', payload: VideoChannelImportPayload }
  | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload }
  | { type: 'notify', payload: NotifyPayload }
  | { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 95
/**
 * Optional scheduling settings accepted alongside a `CreateJobArgument`.
 * Both values are forwarded unchanged to the BullMQ job options of the same name.
 */
export type CreateJobOptions = {
  // Forwarded as the BullMQ `priority` job option
  priority?: number

  // Forwarded as the BullMQ `delay` job option
  delay?: number
}
100
// Processor function of every job type. One worker, queue and queue-events
// object is built per key of this table, in key order.
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  // ActivityPub
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-refresher': refreshAPObject,

  // Miscellaneous
  'actor-keys': processActorKeys,
  'after-video-channel-import': processAfterVideoChannelImport,
  'email': processEmail,
  'federate-video': processFederateVideo,
  'transcoding-job-builder': processTranscodingJobBuilder,
  'manage-video-torrent': processManageVideoTorrent,
  'move-to-object-storage': processMoveToObjectStorage,
  'notify': processNotify,

  // Videos
  'video-channel-import': processVideoChannelImport,
  'video-file-import': processVideoFileImport,
  'video-import': processVideoImport,
  'video-live-ending': processVideoLiveEnding,
  'video-redundancy': processVideoRedundancy,
  'video-studio-edition': processVideoStudioEdition,
  'video-transcoding': processVideoTranscoding,
  'videos-views-stats': processVideosViewsStats
}
126
32567717
C
// Optional per job type callbacks, looked up by the workers' 'failed' listener
// after the failure has been logged
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
130
// Every known job type, in the declaration order of `handlers`.
// Derived from the handlers table (which must contain every JobType) so the
// two can never drift apart when a job type is added or removed.
const jobTypes: JobType[] = Object.keys(handlers) as JobType[]
156
941d28cc
C
// Job types whose failures are logged with the 'debug' level instead of 'error'
const silentFailure = new Set<JobType>([
  'activitypub-http-unicast'
])
158
94a5ff8a
C
/**
 * Singleton wrapping the BullMQ objects used to run background jobs: one
 * worker, one queue and one queue-events object per key of `handlers`, plus a
 * single flow producer for jobs that depend on other jobs.
 *
 * Get the instance with `JobQueue.Instance`, call `init()` to build the BullMQ
 * objects and then `start()` to run the workers.
 */
class JobQueue {

  private static instance: JobQueue

  // BullMQ objects indexed by job type, filled by init()
  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  // Created by init(), used by the job flow methods
  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Builds the worker, queue and queue-events object of every job type, the
   * flow producer, and registers the repeatable jobs.
   * Only the first call has an effect.
   */
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of Object.keys(handlers)) {
      this.buildWorker(handlerName)
      this.buildQueue(handlerName)
      this.buildQueueEvent(handlerName)
    }

    this.flowProducer = new FlowProducer({
      connection: Redis.getRedisClientOptions('FlowProducer'),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  /**
   * Creates the worker of a job type and stores it in `this.workers`.
   * The worker is created with `autorun: false`: `start()` is what calls `run()`.
   */
  private buildWorker (handlerName: JobType) {
    const workerOptions: WorkerOptions = {
      autorun: false,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: Redis.getRedisClientOptions('Worker'),
      maxStalledCount: 10
    }

    // Run the job type handler, wrapped by timeoutPromise() when a TTL is configured for this job type
    const handler = (job: Job) => {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    // Let plugins filter the job before it is processed, and the result of the processing
    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      // BullMQ documents that the job can be undefined here (stalled job already removed),
      // so it must not be dereferenced unconditionally
      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job?.id, handlerName, { payload: job?.data, err })

      // Error handlers are indexed by job type, which is the queue name of this worker.
      // job.name cannot be used: every job of this file is added with the 'job' name
      const errorHandler = errorHandlers[handlerName]
      if (!job || !errorHandler) return

      errorHandler(job, err)
        .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  /**
   * Creates the queue of a job type and stores it in `this.queues`.
   */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: Redis.getRedisClientOptions('Queue'),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  /**
   * Creates the queue-events object of a job type and stores it in `this.queueEvents`.
   * Like the workers it is created with `autorun: false` and run by `start()`.
   */
  private buildQueueEvent (handlerName: JobType) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueEvent'),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  // ---------------------------------------------------------------------------

  /**
   * Closes every worker, queue and queue-events object, then the flow producer.
   *
   * @returns the results of the worker/queue/queue-events `close()` calls, one entry per job type
   */
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueEvent.close()
        ])
      })

    const results = await Promise.all(promises)

    // The flow producer holds its own connection and only exists once init() has been called
    if (this.flowProducer) await this.flowProducer.close()

    return results
  }

  /**
   * Runs every worker and queue-events object built by `init()`.
   */
  start () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.run(),
          queueEvent.run()
        ])
      })

    return Promise.all(promises)
  }

  /**
   * Pauses the workers one after the other, waiting for each `pause()` to complete.
   */
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  /**
   * Resumes every worker.
   */
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Fire-and-forget version of `createJob()`: failures are logged instead of being reported to the caller.
   */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const jobPromise = this.createJob(options)

    // createJob() returns nothing (and already logged the error) when the queue is unknown
    if (!jobPromise) return

    jobPromise.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Adds a job to the queue matching `options.type`.
   *
   * @returns the promise of `queue.add()`, or `undefined` if there is no queue for this job type
   */
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Creates a flow in which each job is declared as the only child of the next one,
   * the last provided job being the root of the flow.
   * Falsy entries are skipped so callers can pass optional jobs inline.
   */
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  /**
   * Creates a flow made of `parent` and of `children` declared as its direct children.
   */
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  /**
   * Converts a job creation argument into a flow node (without children).
   */
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  /**
   * Builds the BullMQ options of a job: exponential backoff starting at 60 seconds,
   * attempts configured for the job type, caller priority/delay and removal policy.
   */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay,

      ...this.buildJobRemovalOptions(type)
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Lists jobs of one or all job types, sorted by timestamp, and returns the
   * `[start, start + count)` slice of the merged result.
   *
   * @param options.state only list jobs in this state ('waiting' also lists 'waiting-children'); all states if omitted
   * @param options.asc results are in descending timestamp order only if this is exactly `false`
   * @param options.jobType only list this job type; all job types if omitted
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const type of filteredJobTypes) {
      const queue: Queue = this.queues[type]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', type)
        continue
      }

      // Any queue may own the requested page: fetch enough jobs from each of them, slice after the global sort
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Counts the jobs of one or all job types, using the same state filter as
   * `listForApi()` so the total matches what the listing can return.
   *
   * @param state only count jobs in this state; all states if falsy
   * @param jobType only count this job type; all job types if omitted
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counts must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /**
   * Returns the states matching a state filter: all of them without filter,
   * else the requested state (plus 'waiting-children' when filtering on 'waiting').
   */
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  /**
   * Returns the job types matching a type filter: all of them without filter.
   */
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /**
   * Returns the job counts of every job type, as reported by each queue.
   */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /**
   * Cleans the 'completed' and 'failed' jobs of every queue, using a 7 days
   * grace period and a limit of 1000 jobs per clean() call.
   */
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
      await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
    }
  }

  /**
   * Registers the repeatable jobs: views stats, and the ActivityPub cleaner if enabled in the configuration.
   * Failures are only logged.
   */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats'],

      ...this.buildJobRemovalOptions('videos-views-stats')
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner'],

        ...this.buildJobRemovalOptions('activitypub-cleaner')
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /**
   * Returns the worker concurrency of a job type: taken from the user configuration
   * for transcoding and video import jobs, from the constants otherwise.
   */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /**
   * Builds the automatic removal policy of completed and failed jobs of a queue,
   * falling back to the default age if none is configured for this queue.
   */
  private buildJobRemovalOptions (queueName: string) {
    return {
      removeOnComplete: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,

        count: JOB_REMOVAL_OPTIONS.COUNT
      },
      removeOnFail: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,

        // NOTE(review): unlike removeOnComplete, the count is divided by 1000 here.
        // Confirm this is intended and not a copy of the seconds conversion above
        count: JOB_REMOVAL_OPTIONS.COUNT / 1000
      }
    }
  }

  // Lazily created unique instance
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
521
522// ---------------------------------------------------------------------------
523
// Public API of this module: the queue singleton class and the list of job types
export { JobQueue, jobTypes }