]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Channel sync (#5135)
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
10 QueueScheduler,
11 QueueSchedulerOptions,
12 Worker,
13 WorkerOptions
14} from 'bullmq'
402145b8 15import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 16import { CONFIG } from '@server/initializers/config'
402145b8 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 18import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
19import {
20 ActivitypubFollowPayload,
21 ActivitypubHttpBroadcastPayload,
e1c55031
C
22 ActivitypubHttpFetcherPayload,
23 ActivitypubHttpUnicastPayload,
8795d6f2 24 ActorKeysPayload,
2a491182 25 AfterVideoChannelImportPayload,
276250f0 26 DeleteResumableUploadMetaFilePayload,
e1c55031 27 EmailPayload,
bd911b54 28 FederateVideoPayload,
8dc8a34e 29 JobState,
e1c55031 30 JobType,
f012319a 31 ManageVideoTorrentPayload,
0305db28 32 MoveObjectStoragePayload,
bd911b54 33 NotifyPayload,
e1c55031 34 RefreshPayload,
2a491182 35 VideoChannelImportPayload,
e1c55031
C
36 VideoFileImportPayload,
37 VideoImportPayload,
a5cf76af 38 VideoLiveEndingPayload,
e1c55031 39 VideoRedundancyPayload,
92e66e04 40 VideoStudioEditionPayload,
e1c55031 41 VideoTranscodingPayload
8dc8a34e 42} from '../../../shared/models'
94a5ff8a 43import { logger } from '../../helpers/logger'
74dc3bca 44import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 45import { Hooks } from '../plugins/hooks'
74d249bc 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 47import { processActivityPubFollow } from './handlers/activitypub-follow'
8dc8a34e
C
48import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 51import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 52import { processActorKeys } from './handlers/actor-keys'
402145b8 53import { processEmail } from './handlers/email'
bd911b54 54import { processFederateVideo } from './handlers/federate-video'
f012319a 55import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 56import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 57import { processNotify } from './handlers/notify'
2a491182 58import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 59import { processVideoFileImport } from './handlers/video-file-import'
402145b8 60import { processVideoImport } from './handlers/video-import'
a5cf76af 61import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 62import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 63import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 64import { processVideosViewsStats } from './handlers/video-views-stats'
2a491182 65import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
94a5ff8a 66
// Describes a job to enqueue: `type` selects the target queue and fixes the shape of `payload`
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  // Name under which the cleaner queue is registered in `handlers` and `jobTypes`
  { type: 'activitypub-cleaner', payload: {} } |
  // Legacy spelling kept so existing callers still type-check.
  // No queue is registered under this name in this file, so creating such a job only logs "Unknown queue"
  { type: 'activitypub-http-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  // Listed once only: the original union repeated this member
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 92
// Optional per-job settings; both values are copied as-is into the job options given to the queue
export type CreateJobOptions = {
  // Copied into the `priority` job option
  priority?: number

  // Copied into the `delay` job option
  delay?: number
}
97
// Processor function for each job type.
// The keys of this object also drive which queues/workers are built by JobQueue.init()
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,
  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  'email': processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,
  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,
  'notify': processNotify,
  'federate-video': processFederateVideo
}
122
32567717
C
// Optional callbacks to run when a job fails, indexed by job type
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
126
94831479
C
// Job types iterated when listing, counting or computing statistics across queues
const jobTypes: Array<JobType> = [
  'activitypub-follow',
  'activitypub-http-broadcast',
  'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher',
  'activitypub-http-unicast',
  'activitypub-cleaner',
  'email',
  'video-transcoding',
  'video-file-import',
  'video-import',
  'videos-views-stats',
  'activitypub-refresher',
  'video-redundancy',
  'actor-keys',
  'video-live-ending',
  'move-to-object-storage',
  'manage-video-torrent',
  'video-studio-edition',
  'video-channel-import',
  'after-video-channel-import',
  'notify',
  'federate-video'
]
151
941d28cc
C
// Job types whose failures are logged at 'debug' level instead of 'error'
const silentFailure = new Set<JobType>([
  'activitypub-http-unicast'
])
153
94a5ff8a
C
// Singleton that builds, for every job type declared in `handlers`, a queue, a worker,
// a queue scheduler and a queue events object, and exposes helpers to create, list and count jobs
class JobQueue {

  private static instance: JobQueue

  // Objects built by init(), indexed by job type
  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  // Used to create jobs that have parent/children relationships
  private flowProducer: FlowProducer

  private initialized = false

  // Prefix given to every queue/worker/scheduler/events/flow producer, set by init()
  private jobRedisPrefix: string

  // Instances are only created through the static `Instance` getter
  private constructor () {
  }

  // Build all queue objects and register the repeatable jobs.
  // Does nothing if it was already called.
  // When `produceOnly` is true, workers, schedulers and events objects are created with `autorun: false`
  init (produceOnly = false) {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of (Object.keys(handlers) as JobType[])) {
      this.buildWorker(handlerName, produceOnly)
      this.buildQueue(handlerName)
      this.buildQueueScheduler(handlerName, produceOnly)
      this.buildQueueEvent(handlerName, produceOnly)
    }

    this.flowProducer = new FlowProducer({
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    })

    this.addRepeatableJobs()
  }

  // Create the worker of a job type, plug its processor and failure/error listeners, and store it
  private buildWorker (handlerName: JobType, produceOnly: boolean) {
    const workerOptions: WorkerOptions = {
      autorun: !produceOnly,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: this.getRedisConnection()
    }

    // Run the handler of this job type, wrapped in a timeout if JOB_TTL defines one
    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    // Let plugins filter the job parameters and the handler result
    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

      // Error handlers are indexed by job type, which is the queue name (handlerName).
      // job.name cannot be used here: every job of this file is added with the name 'job'
      const errorHandler = errorHandlers[handlerName]

      if (errorHandler) {
        errorHandler(job, err)
          .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
      }
    })

    worker.on('error', err => {
      logger.error('Error in job queue %s.', handlerName, { err })
    })

    this.workers[handlerName] = worker
  }

  // Create and store the queue of a job type
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    this.queues[handlerName] = new Queue(handlerName, queueOptions)
  }

  // Create and store the queue scheduler of a job type
  private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
    const queueSchedulerOptions: QueueSchedulerOptions = {
      autorun: !produceOnly,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix,
      maxStalledCount: 10
    }
    this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
  }

  // Create and store the queue events object of a job type
  private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: !produceOnly,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }
    this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
  }

  // Build a fresh connection options object from the Redis section of the configuration
  private getRedisConnection () {
    return {
      password: CONFIG.REDIS.AUTH,
      db: CONFIG.REDIS.DB,
      host: CONFIG.REDIS.HOSTNAME,
      port: CONFIG.REDIS.PORT,
      path: CONFIG.REDIS.SOCKET
    }
  }

  // ---------------------------------------------------------------------------

  // Close the worker, queue, scheduler and events object of every job type
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueScheduler.close(),
          queueEvent.close()
        ])
      })

    return Promise.all(promises)
  }

  // Pause every worker, one after the other
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  // Resume every worker
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  // Fire-and-forget variant of createJob(): a rejection is logged instead of being propagated
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const promise = this.createJob(options)

    // createJob() already logged the error and returned undefined: the queue is unknown
    if (!promise) return

    promise.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  // Add a job to the queue matching `options.type`.
  // Returns the promise of the queue, or undefined (after logging an error) if there is no such queue
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  // Build a flow in which each provided job becomes the only child of the next one, then add it.
  // Falsy entries of `jobs` are skipped
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  // Add a flow made of one parent job and its direct children
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  // Convert a job description into the object expected by the flow producer
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  // Build the job options: exponential backoff, attempts taken from JOB_ATTEMPTS, and the caller priority/delay
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay
    }
  }

  // ---------------------------------------------------------------------------

  // List jobs, optionally filtered by state and/or job type.
  // Jobs fetched from each matching queue are merged and sorted by timestamp
  // (reversed when `asc` is false) before the [start, start + count) slice is returned
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const type of filteredJobTypes) {
      const queue: Queue = this.queues[type]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', type)
        continue
      }

      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  // Count jobs, optionally filtered by state and/or job type.
  // Uses the same state filter as listForApi() so the total matches the listed jobs
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counters must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  // Return the states to query: all of them without filter, else the requested one
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  // Return the job types to query: all of them without filter, else the requested one if it is known
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  // Fetch the job counters of every job type
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  // Clean 'completed' jobs of every queue, using JOB_COMPLETED_LIFETIME and a limit of 100 per call
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
    }
  }

  // Register the repeatable jobs; failures are only logged
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats']
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner']
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  // Transcoding and video import concurrency come from the configuration, others from JOB_CONCURRENCY
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  // Lazily create and return the unique instance
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
494
495// ---------------------------------------------------------------------------
496
497export {
1061c73f 498 jobTypes,
94a5ff8a
C
499 JobQueue
500}