]>
Commit | Line | Data |
---|---|---|
1 | import { | |
2 | FlowJob, | |
3 | FlowProducer, | |
4 | Job, | |
5 | JobsOptions, | |
6 | Queue, | |
7 | QueueEvents, | |
8 | QueueEventsOptions, | |
9 | QueueOptions, | |
10 | QueueScheduler, | |
11 | QueueSchedulerOptions, | |
12 | Worker, | |
13 | WorkerOptions | |
14 | } from 'bullmq' | |
15 | import { jobStates } from '@server/helpers/custom-validators/jobs' | |
16 | import { CONFIG } from '@server/initializers/config' | |
17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | |
18 | import { pick, timeoutPromise } from '@shared/core-utils' | |
19 | import { | |
20 | ActivitypubFollowPayload, | |
21 | ActivitypubHttpBroadcastPayload, | |
22 | ActivitypubHttpFetcherPayload, | |
23 | ActivitypubHttpUnicastPayload, | |
24 | ActorKeysPayload, | |
25 | AfterVideoChannelImportPayload, | |
26 | DeleteResumableUploadMetaFilePayload, | |
27 | EmailPayload, | |
28 | FederateVideoPayload, | |
29 | JobState, | |
30 | JobType, | |
31 | ManageVideoTorrentPayload, | |
32 | MoveObjectStoragePayload, | |
33 | NotifyPayload, | |
34 | RefreshPayload, | |
35 | VideoChannelImportPayload, | |
36 | VideoFileImportPayload, | |
37 | VideoImportPayload, | |
38 | VideoLiveEndingPayload, | |
39 | VideoRedundancyPayload, | |
40 | VideoStudioEditionPayload, | |
41 | VideoTranscodingPayload | |
42 | } from '../../../shared/models' | |
43 | import { logger } from '../../helpers/logger' | |
44 | import { | |
45 | JOB_ATTEMPTS, | |
46 | JOB_CONCURRENCY, | |
47 | JOB_REMOVAL_OPTIONS, | |
48 | JOB_TTL, | |
49 | REPEAT_JOBS, | |
50 | WEBSERVER | |
51 | } from '../../initializers/constants' | |
52 | import { Hooks } from '../plugins/hooks' | |
53 | import { Redis } from '../redis' | |
54 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | |
55 | import { processActivityPubFollow } from './handlers/activitypub-follow' | |
56 | import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' | |
57 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | |
58 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | |
59 | import { refreshAPObject } from './handlers/activitypub-refresher' | |
60 | import { processActorKeys } from './handlers/actor-keys' | |
61 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' | |
62 | import { processEmail } from './handlers/email' | |
63 | import { processFederateVideo } from './handlers/federate-video' | |
64 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | |
65 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | |
66 | import { processNotify } from './handlers/notify' | |
67 | import { processVideoChannelImport } from './handlers/video-channel-import' | |
68 | import { processVideoFileImport } from './handlers/video-file-import' | |
69 | import { processVideoImport } from './handlers/video-import' | |
70 | import { processVideoLiveEnding } from './handlers/video-live-ending' | |
71 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | |
72 | import { processVideoTranscoding } from './handlers/video-transcoding' | |
73 | import { processVideosViewsStats } from './handlers/video-views-stats' | |
74 | import { parseDurationToMs } from '@server/helpers/core-utils' | |
75 | ||
/**
 * Discriminated union describing a job to enqueue: `type` selects the target
 * queue and `payload` is the data stored on the job.
 *
 * NOTE(review): queues are only built for the keys of `handlers`, so
 * 'activitypub-http-cleaner' and 'delete-resumable-upload-meta-file' have no
 * queue in this file: createJob() logs "Unknown queue" and creates nothing for them.
 * The cleaner handler is registered as 'activitypub-cleaner' — confirm whether
 * the name used here is intentional.
 */
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  { type: 'activitypub-http-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  // Listed once: the original union repeated this member further down
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
101 | ||
/**
 * Optional scheduling knobs accepted alongside a CreateJobArgument.
 * Both values are forwarded unchanged as the BullMQ job options of the same name.
 */
export type CreateJobOptions = {
  // Forwarded as the job `priority` option
  priority?: number

  // Forwarded as the job `delay` option (presumably milliseconds, per BullMQ — confirm)
  delay?: number
}
106 | ||
// Processor function for each job type. The keys of this map drive which
// workers/queues/schedulers/events are built by JobQueue.init(), in this order.
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  // ActivityPub
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,

  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  'email': processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,
  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,
  'notify': processNotify,
  'federate-video': processFederateVideo
}
131 | ||
// Optional per job type callback invoked by the worker 'failed' listener,
// receiving the failed job and the error
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
135 | ||
// Every job type, in the order used when listing, counting and building stats
const jobTypes: JobType[] = [
  'activitypub-follow', 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher', 'activitypub-http-unicast', 'activitypub-cleaner',

  'email',

  'video-transcoding', 'video-file-import', 'video-import', 'videos-views-stats',

  'activitypub-refresher',

  'video-redundancy', 'actor-keys', 'video-live-ending',

  'move-to-object-storage', 'manage-video-torrent', 'video-studio-edition',
  'video-channel-import', 'after-video-channel-import',

  'notify', 'federate-video'
]
160 | ||
// Job types whose failures are logged at 'debug' level instead of 'error'
const silentFailure = new Set<JobType>([
  'activitypub-http-unicast'
])
162 | ||
/**
 * Singleton wrapping BullMQ: for every job type declared in `handlers` it owns
 * a Worker, a Queue, a QueueScheduler and a QueueEvents instance, plus a single
 * FlowProducer used to create job trees.
 *
 * Usage: `JobQueue.Instance.init()` builds everything (workers, schedulers and
 * events are created with `autorun: false`), then `start()` runs them.
 */
class JobQueue {

  private static instance: JobQueue

  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Builds the BullMQ objects of every job type, the flow producer, and
   * registers the repeatable jobs. Calling it again is a no-op.
   */
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of (Object.keys(handlers) as JobType[])) {
      this.buildWorker(handlerName)
      this.buildQueue(handlerName)
      this.buildQueueScheduler(handlerName)
      this.buildQueueEvent(handlerName)
    }

    this.flowProducer = new FlowProducer({
      connection: Redis.getRedisClientOptions('FlowProducer'),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  /**
   * Creates (without running) the worker of a job type and wires its
   * 'failed' and 'error' listeners.
   */
  private buildWorker (handlerName: JobType) {
    const workerOptions: WorkerOptions = {
      autorun: false,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: Redis.getRedisClientOptions('Worker')
    }

    // Runs the registered handler, wrapped by timeoutPromise() when a TTL is configured for this job type
    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    // The job and the handler result both go through their 'filter:job-queue.process.*' hook
    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

      // Every job is added under the generic name 'job', so job.name cannot identify the job type:
      // the error handler has to be looked up with the queue (handler) name
      const errorHandler = errorHandlers[handlerName]
      if (!errorHandler) return

      errorHandler(job, err)
        .catch(handlerErr => {
          logger.error('Cannot run error handler for job failure %s in queue %s.', job.id, handlerName, { err: handlerErr })
        })
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  /** Creates the queue of a job type and logs its errors. */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: Redis.getRedisClientOptions('Queue'),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  /** Creates (without running) the queue scheduler of a job type and logs its errors. */
  private buildQueueScheduler (handlerName: JobType) {
    const queueSchedulerOptions: QueueSchedulerOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueScheduler'),
      prefix: this.jobRedisPrefix,
      maxStalledCount: 10
    }

    const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
    queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })

    this.queueSchedulers[handlerName] = queueScheduler
  }

  /** Creates (without running) the queue events listener of a job type and logs its errors. */
  private buildQueueEvent (handlerName: JobType) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: false,
      connection: Redis.getRedisClientOptions('QueueEvent'),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  // ---------------------------------------------------------------------------

  /**
   * Closes the worker, queue, scheduler and events object of every job type.
   * Resolves once all of them are closed.
   */
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueScheduler.close(),
          queueEvent.close()
        ])
      })

    return Promise.all(promises)
  }

  /**
   * Runs the worker, scheduler and events object of every job type
   * (they are all built with `autorun: false`).
   */
  start () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.run(),
          queueScheduler.run(),
          queueEvent.run()
        ])
      })

    return Promise.all(promises)
  }

  /** Pauses every worker, one after the other. */
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  /** Resumes every worker. */
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Fire-and-forget variant of createJob(): a rejected creation is logged
   * instead of being propagated to the caller.
   */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const jobPromise = this.createJob(options)

    // createJob() returns undefined (after logging) when the queue is unknown
    if (!jobPromise) return

    jobPromise.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Adds a job to the queue matching `options.type`.
   *
   * @returns the promise of the created job, or undefined (after logging an
   * error) when no queue exists for this type
   */
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Chains the given jobs in a single flow: each job is registered as the only
   * child of the job that follows it in the argument list. Undefined entries
   * are skipped.
   *
   * NOTE(review): when every entry is undefined, `flowProducer.add()` is
   * called with undefined — callers are expected to pass at least one job.
   */
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  /** Creates a flow made of `parent` and its direct `children`. */
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  /** Converts a job description into a flow node (without children). */
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  /**
   * Builds the BullMQ options of a job: exponential backoff starting at 60 * 1000,
   * the attempts configured for this type, the caller priority/delay and the
   * removal policy of this type.
   */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay,

      ...this.buildJobRemovalOptions(type)
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Lists jobs across the selected queues, sorted by timestamp, then paginated.
   *
   * @param options.state only keep this state (filtering on 'waiting' also includes 'waiting-children'); all states if omitted
   * @param options.start index of the first job to return in the merged list
   * @param options.count maximum number of jobs to return
   * @param options.asc pass false to get the most recent jobs first
   * @param options.jobType only list this job type; all types if omitted
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const type of filteredJobTypes) {
      const queue: Queue = this.queues[type]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', type)
        continue
      }

      // Pagination is applied on the merged list, so each queue has to provide its first "start + count" jobs
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Counts jobs across the selected queues, using the same state filter as
   * listForApi() so totals are consistent with listings (counting 'waiting'
   * also includes 'waiting-children').
   *
   * @param state only count this state; all states if falsy
   * @param jobType only count this job type; all types if omitted
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A missing counter must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /** Returns the job states matching the filter: all of them when `state` is not set. */
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  /** Returns the job types matching the filter: all of them when `jobType` is not set. */
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /** Returns the job counters of every job type. */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /**
   * Cleans 'completed' then 'failed' jobs of every queue, using a grace
   * duration of 7 days and a limit of 1000 per call.
   */
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
      await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
    }
  }

  /**
   * Registers the repeatable jobs: views stats always, ActivityPub cleaner only
   * when remote interactions cleanup is enabled. Failures are logged.
   */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats'],

      ...this.buildJobRemovalOptions('videos-views-stats')
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner'],

        ...this.buildJobRemovalOptions('activitypub-cleaner')
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /** Worker concurrency of a job type: taken from the config for transcoding and imports, from constants otherwise. */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /**
   * Builds the removal policy of completed/failed jobs of a queue: the
   * per-queue age if there is one (the default age otherwise), and the same
   * maximum job count for both.
   */
  private buildJobRemovalOptions (queueName: string) {
    return {
      removeOnComplete: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,

        count: JOB_REMOVAL_OPTIONS.COUNT
      },
      removeOnFail: {
        // Wants seconds
        age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,

        // A number of jobs, not a duration: no unit conversion here
        count: JOB_REMOVAL_OPTIONS.COUNT
      }
    }
  }

  /** Lazily created singleton. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
544 | ||
545 | // --------------------------------------------------------------------------- | |
546 | ||
547 | export { | |
548 | jobTypes, | |
549 | JobQueue | |
550 | } |