]>
Commit | Line | Data |
---|---|---|
1 | import { | |
2 | FlowJob, | |
3 | FlowProducer, | |
4 | Job, | |
5 | JobsOptions, | |
6 | Queue, | |
7 | QueueEvents, | |
8 | QueueEventsOptions, | |
9 | QueueOptions, | |
10 | QueueScheduler, | |
11 | QueueSchedulerOptions, | |
12 | Worker, | |
13 | WorkerOptions | |
14 | } from 'bullmq' | |
15 | import { jobStates } from '@server/helpers/custom-validators/jobs' | |
16 | import { CONFIG } from '@server/initializers/config' | |
17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | |
18 | import { pick, timeoutPromise } from '@shared/core-utils' | |
19 | import { | |
20 | ActivitypubFollowPayload, | |
21 | ActivitypubHttpBroadcastPayload, | |
22 | ActivitypubHttpFetcherPayload, | |
23 | ActivitypubHttpUnicastPayload, | |
24 | ActorKeysPayload, | |
25 | AfterVideoChannelImportPayload, | |
26 | DeleteResumableUploadMetaFilePayload, | |
27 | EmailPayload, | |
28 | FederateVideoPayload, | |
29 | JobState, | |
30 | JobType, | |
31 | ManageVideoTorrentPayload, | |
32 | MoveObjectStoragePayload, | |
33 | NotifyPayload, | |
34 | RefreshPayload, | |
35 | VideoChannelImportPayload, | |
36 | VideoFileImportPayload, | |
37 | VideoImportPayload, | |
38 | VideoLiveEndingPayload, | |
39 | VideoRedundancyPayload, | |
40 | VideoStudioEditionPayload, | |
41 | VideoTranscodingPayload | |
42 | } from '../../../shared/models' | |
43 | import { logger } from '../../helpers/logger' | |
44 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | |
45 | import { Hooks } from '../plugins/hooks' | |
46 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | |
47 | import { processActivityPubFollow } from './handlers/activitypub-follow' | |
48 | import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' | |
49 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | |
50 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | |
51 | import { refreshAPObject } from './handlers/activitypub-refresher' | |
52 | import { processActorKeys } from './handlers/actor-keys' | |
53 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' | |
54 | import { processEmail } from './handlers/email' | |
55 | import { processFederateVideo } from './handlers/federate-video' | |
56 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | |
57 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | |
58 | import { processNotify } from './handlers/notify' | |
59 | import { processVideoChannelImport } from './handlers/video-channel-import' | |
60 | import { processVideoFileImport } from './handlers/video-file-import' | |
61 | import { processVideoImport } from './handlers/video-import' | |
62 | import { processVideoLiveEnding } from './handlers/video-live-ending' | |
63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | |
64 | import { processVideoTranscoding } from './handlers/video-transcoding' | |
65 | import { processVideosViewsStats } from './handlers/video-views-stats' | |
66 | import { Redis } from '../redis' | |
67 | ||
/**
 * Discriminated union of every job that can be pushed to the job queue.
 * `type` is the name of the target queue and `payload` is the data handed to its handler.
 */
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  // Name under which the cleaner queue is actually registered (see `handlers` and `jobTypes`)
  { type: 'activitypub-cleaner', payload: {} } |
  // Legacy spelling kept so existing callers still compile.
  // No queue is registered under this name, so createJob() logs "Unknown queue" for it: prefer 'activitypub-cleaner'
  { type: 'activitypub-http-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
93 | ||
/**
 * Optional scheduling settings accepted next to the job type and payload.
 * Both values are forwarded untouched to the BullMQ job options.
 */
export type CreateJobOptions = {
  // Forwarded as the `priority` job option
  priority?: number

  // Forwarded as the `delay` job option
  delay?: number
}
98 | ||
// Processor to run for each job type. The keys are also the names of the queues built by JobQueue.init()
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,

  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  'email': processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,

  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,

  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,

  'notify': processNotify,
  'federate-video': processFederateVideo
}
123 | ||
// Optional callbacks taking the failed job and its error. Only some job types define one
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
127 | ||
// Every known job type, in the order used when listing, counting and building stats across queues
const jobTypes: JobType[] = [
  'activitypub-follow',
  'activitypub-http-broadcast',
  'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher',
  'activitypub-http-unicast',
  'activitypub-cleaner',

  'email',

  'video-transcoding',
  'video-file-import',
  'video-import',
  'videos-views-stats',

  'activitypub-refresher',

  'video-redundancy',
  'actor-keys',
  'video-live-ending',
  'move-to-object-storage',
  'manage-video-torrent',
  'video-studio-edition',
  'video-channel-import',
  'after-video-channel-import',

  'notify',
  'federate-video'
]
152 | ||
// Job types whose failures are logged with the 'debug' level instead of 'error'
const silentFailure = new Set<JobType>()
silentFailure.add('activitypub-http-unicast')
154 | ||
155 | class JobQueue { | |
156 | ||
157 | private static instance: JobQueue | |
158 | ||
159 | private workers: { [id in JobType]?: Worker } = {} | |
160 | private queues: { [id in JobType]?: Queue } = {} | |
161 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | |
162 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | |
163 | ||
164 | private flowProducer: FlowProducer | |
165 | ||
166 | private initialized = false | |
167 | private jobRedisPrefix: string | |
168 | ||
169 | private constructor () { | |
170 | } | |
171 | ||
/**
 * Build the worker, queue, queue scheduler and queue events object of every job type,
 * create the flow producer and register the repeatable jobs.
 * Only the first call has an effect.
 */
init () {
  // A previous call already did the work
  if (this.initialized) return
  this.initialized = true

  this.jobRedisPrefix = `bull-${WEBSERVER.HOST}`

  const handlerNames = Object.keys(handlers) as JobType[]

  handlerNames.forEach(name => {
    this.buildWorker(name)
    this.buildQueue(name)
    this.buildQueueScheduler(name)
    this.buildQueueEvent(name)
  })

  const flowProducer = new FlowProducer({
    connection: Redis.getRedisClientOptions('FlowProducer'),
    prefix: this.jobRedisPrefix
  })

  flowProducer.on('error', err => {
    logger.error('Error in flow producer', { err })
  })

  this.flowProducer = flowProducer

  this.addRepeatableJobs()
}
194 | ||
/**
 * Build the worker consuming the `handlerName` queue and store it in `this.workers`.
 *
 * The worker is created with `autorun: false`: it only starts processing once `start()` calls `run()` on it.
 * Each job first goes through the 'filter:job-queue.process.params' plugin hook, then the handler is run
 * through the 'filter:job-queue.process.result' hook.
 * When a TTL is configured for this job type, the handler promise is wrapped by `timeoutPromise`.
 *
 * @param handlerName job type, used as the queue name and as the key into `handlers` and `errorHandlers`
 */
private buildWorker (handlerName: JobType) {
  const workerOptions: WorkerOptions = {
    autorun: false,
    concurrency: this.getJobConcurrency(handlerName),
    prefix: this.jobRedisPrefix,
    connection: Redis.getRedisClientOptions('Worker')
  }

  const handler = function (job: Job) {
    const timeout = JOB_TTL[handlerName]
    const p = handlers[handlerName](job)

    // No TTL configured for this job type
    if (!timeout) return p

    return timeoutPromise(p, timeout)
  }

  const processor = async (jobArg: Job<any>) => {
    const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

    return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
  }

  const worker = new Worker(handlerName, processor, workerOptions)

  worker.on('failed', (job, err) => {
    const logLevel = silentFailure.has(handlerName)
      ? 'debug'
      : 'error'

    logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

    // Jobs are always added under the generic name 'job', so `job.name` never matches a job type:
    // the error handler has to be looked up with the queue name
    const errorHandler = errorHandlers[handlerName]
    if (!errorHandler) return

    errorHandler(job, err)
      .catch(err => logger.error('Cannot run error handler for job failure %s in queue %s.', job.id, handlerName, { err }))
  })

  worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

  this.workers[handlerName] = worker
}
237 | ||
// Create the `handlerName` queue, log its errors and keep a reference to it in `this.queues`
private buildQueue (handlerName: JobType) {
  const options: QueueOptions = {
    connection: Redis.getRedisClientOptions('Queue'),
    prefix: this.jobRedisPrefix
  }

  const createdQueue = new Queue(handlerName, options)

  createdQueue.on('error', err => {
    logger.error('Error in job queue %s.', handlerName, { err })
  })

  this.queues[handlerName] = createdQueue
}
249 | ||
// Create the queue scheduler of the `handlerName` queue (with `autorun: false`: start() runs it later),
// log its errors and keep a reference to it in `this.queueSchedulers`
private buildQueueScheduler (handlerName: JobType) {
  const options: QueueSchedulerOptions = {
    autorun: false,
    connection: Redis.getRedisClientOptions('QueueScheduler'),
    prefix: this.jobRedisPrefix,
    maxStalledCount: 10
  }

  const scheduler = new QueueScheduler(handlerName, options)

  scheduler.on('error', err => {
    logger.error('Error in job queue scheduler %s.', handlerName, { err })
  })

  this.queueSchedulers[handlerName] = scheduler
}
263 | ||
// Create the queue events object of the `handlerName` queue (with `autorun: false`: start() runs it later),
// log its errors and keep a reference to it in `this.queueEvents`
private buildQueueEvent (handlerName: JobType) {
  const options: QueueEventsOptions = {
    autorun: false,
    connection: Redis.getRedisClientOptions('QueueEvent'),
    prefix: this.jobRedisPrefix
  }

  const events = new QueueEvents(handlerName, options)

  events.on('error', err => {
    logger.error('Error in job queue events %s.', handlerName, { err })
  })

  this.queueEvents[handlerName] = events
}
276 | ||
277 | // --------------------------------------------------------------------------- | |
278 | ||
/**
 * Close the worker, queue, queue scheduler and queue events object of every job type,
 * then close the flow producer created by init().
 *
 * @returns the results of the per job type close calls, one tuple per job type
 */
async terminate () {
  const promises = Object.keys(this.workers)
    .map(handlerName => {
      const worker: Worker = this.workers[handlerName]
      const queue: Queue = this.queues[handlerName]
      const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
      const queueEvent: QueueEvents = this.queueEvents[handlerName]

      return Promise.all([
        // NOTE(review): `false` is presumably BullMQ's `force` flag (do not force the close) - confirm against the Worker API
        worker.close(false),
        queue.close(),
        queueScheduler.close(),
        queueEvent.close()
      ])
    })

  try {
    return await Promise.all(promises)
  } finally {
    // The flow producer was built with its own Redis connection options and was previously never closed.
    // It is undefined when terminate() is called without a prior init()
    if (this.flowProducer) await this.flowProducer.close()
  }
}
297 | ||
/**
 * Run the worker, queue scheduler and queue events object of every job type.
 * They are all built with `autorun: false`, so nothing runs before this call.
 *
 * @returns a promise settling once every `run()` promise has settled
 */
start () {
  const runAll = (handlerName: string) => {
    const worker: Worker = this.workers[handlerName]
    const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
    const queueEvent: QueueEvents = this.queueEvents[handlerName]

    return Promise.all([
      worker.run(),
      queueScheduler.run(),
      queueEvent.run()
    ])
  }

  const handlerNames = Object.keys(this.workers)

  return Promise.all(handlerNames.map(runAll))
}
314 | ||
// Pause the workers one after the other, waiting for each pause() call to complete
async pause () {
  const workers: Worker[] = Object.keys(this.workers).map(name => this.workers[name])

  for (const worker of workers) {
    await worker.pause()
  }
}
322 | ||
// Resume every worker
resume () {
  Object.keys(this.workers).forEach(name => {
    const worker: Worker = this.workers[name]

    worker.resume()
  })
}
330 | ||
331 | // --------------------------------------------------------------------------- | |
332 | ||
/**
 * Fire-and-forget variant of createJob(): the job is created in the background
 * and a creation failure is only logged.
 *
 * @param options job type, payload and optional priority/delay
 */
createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
  const promise = this.createJob(options)

  // createJob() already logged the error and returned undefined because the queue is unknown:
  // calling .catch() on it would throw a TypeError
  if (!promise) return

  promise.catch(err => logger.error('Cannot create job.', { err, options }))
}
337 | ||
/**
 * Add a job to the queue named after `options.type`.
 *
 * @param options job type, payload and optional priority/delay
 * @returns the promise returned by the queue, or undefined (after logging an error) if the queue is unknown
 */
createJob (options: CreateJobArgument & CreateJobOptions) {
  const queue: Queue = this.queues[options.type]

  if (queue !== undefined) {
    const scheduling = pick(options, [ 'priority', 'delay' ])
    const jobOptions = this.buildJobOptions(options.type as JobType, scheduling)

    return queue.add('job', options.payload, jobOptions)
  }

  logger.error('Unknown queue %s: cannot create job.', options.type)
  return undefined
}
349 | ||
/**
 * Chain the provided jobs into a single flow and add it through the flow producer.
 * Each job becomes the parent of the job listed before it, so the last defined job is the flow root.
 * Falsy entries are skipped.
 *
 * @param jobs jobs to chain, `undefined` entries being ignored
 */
createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
  const rootJob = jobs
    .filter(job => !!job)
    .reduce<FlowJob>((previousJob, job) => {
      const children = previousJob
        ? [ previousJob ]
        : []

      return { ...this.buildJobFlowOption(job), children }
    }, undefined)

  return this.flowProducer.add(rootJob)
}
367 | ||
/**
 * Add a flow made of one parent job and its direct children through the flow producer.
 *
 * @param parent job placed at the root of the flow
 * @param children jobs attached as children of `parent`
 */
createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
  const parentFlow = this.buildJobFlowOption(parent)
  const childFlows = children.map(child => this.buildJobFlowOption(child))

  return this.flowProducer.add({ ...parentFlow, children: childFlows })
}
375 | ||
// Convert a job description into a flow node: generic 'job' name, payload as data, job type as queue name
private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
  const { type, payload } = job
  const scheduling = pick(job, [ 'priority', 'delay' ])

  return {
    name: 'job',
    data: payload,
    queueName: type,
    opts: this.buildJobOptions(type as JobType, scheduling)
  }
}
384 | ||
/**
 * Build the BullMQ options of a job: exponential backoff with a 60 * 1000 delay,
 * the attempts configured for this job type, and the caller's priority/delay.
 */
private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
  const { priority, delay } = options
  const backoffDelay = 60 * 1000

  return {
    backoff: { delay: backoffDelay, type: 'exponential' },
    attempts: JOB_ATTEMPTS[type],
    priority,
    delay
  }
}
393 | ||
394 | // --------------------------------------------------------------------------- | |
395 | ||
/**
 * List jobs across the selected queues, sorted by timestamp, and return the requested page.
 *
 * Each queue is asked for its jobs from index 0 to `start + count`. The merged list is then sorted
 * by ascending timestamp, reversed when `asc` is false, and sliced to `[start, start + count)`.
 *
 * @param options.state only keep jobs in this state (every state if omitted)
 * @param options.start index of the first job of the page
 * @param options.count size of the page
 * @param options.asc sort direction, only an explicit `false` gives the descending order
 * @param options.jobType only look into this queue (every queue if omitted)
 */
async listForApi (options: {
  state?: JobState
  start: number
  count: number
  asc?: boolean
  jobType: JobType
}): Promise<Job[]> {
  const { state, start, count, asc, jobType } = options

  const states = this.buildStateFilter(state)
  const end = start + count

  let collected: Job[] = []

  for (const type of this.buildTypeFilter(jobType)) {
    const queue: Queue = this.queues[type]

    if (queue === undefined) {
      logger.error('Unknown queue %s to list jobs.', type)
      continue
    }

    collected = collected.concat(await queue.getJobs(states, 0, end, asc))
  }

  collected.sort((a: any, b: any) => {
    if (a.timestamp === b.timestamp) return 0

    return a.timestamp < b.timestamp
      ? -1
      : 1
  })

  if (asc === false) collected.reverse()

  return collected.slice(start, end)
}
433 | ||
/**
 * Count the jobs of the selected queues that are in the selected state(s).
 *
 * The state filter is the one used by listForApi(), so the total matches what the listing returns:
 * filtering on 'waiting' also counts 'waiting-children' jobs.
 *
 * @param state only count jobs in this state (every state if falsy)
 * @param jobType only count jobs of this queue (every queue if omitted)
 * @returns the sum of the matching counters of every selected queue
 */
async count (state: JobState, jobType?: JobType): Promise<number> {
  const states = this.buildStateFilter(state)
  const filteredJobTypes = this.buildTypeFilter(jobType)

  let total = 0

  for (const type of filteredJobTypes) {
    const queue = this.queues[type]
    if (queue === undefined) {
      logger.error('Unknown queue %s to count jobs.', type)
      continue
    }

    const counts = await queue.getJobCounts()

    for (const s of states) {
      // A state missing from the counters must not turn the total into NaN
      total += counts[s] || 0
    }
  }

  return total
}
456 | ||
// Build the list of states to query: every state without filter, else the requested one
private buildStateFilter (state?: JobState) {
  if (!state) return jobStates

  // Filtering on waiting jobs also includes parents waiting for their children
  const states: JobState[] = state === 'waiting'
    ? [ state, 'waiting-children' ]
    : [ state ]

  return states
}
467 | ||
// Build the list of job types to query: every known type without filter, else only the matching one
private buildTypeFilter (jobType?: JobType) {
  return jobType
    ? jobTypes.filter(candidate => candidate === jobType)
    : jobTypes
}
473 | ||
// Fetch the job counters of every queue, returned as one `{ jobType, counts }` entry per known job type
async getStats () {
  const statsOf = async (jobType: JobType) => {
    const counts = await this.queues[jobType].getJobCounts()

    return { jobType, counts }
  }

  return Promise.all(jobTypes.map(statsOf))
}
479 | ||
480 | // --------------------------------------------------------------------------- | |
481 | ||
// Clean the 'completed' jobs of every queue, one queue after the other.
// NOTE(review): the arguments presumably map to BullMQ's clean(grace, limit, type), i.e. jobs older than
// JOB_COMPLETED_LIFETIME and at most 100 per call - confirm against the Queue API
async removeOldJobs () {
  const queues: Queue[] = Object.keys(this.queues).map(name => this.queues[name])

  for (const queue of queues) {
    await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
  }
}
488 | ||
// Register the repeatable jobs: views stats always, the ActivityPub cleaner only if enabled in the configuration.
// Failures are only logged
private addRepeatableJobs () {
  const addRepeatable = (type: 'videos-views-stats' | 'activitypub-cleaner') => {
    this.queues[type]
      .add('job', {}, { repeat: REPEAT_JOBS[type] })
      .catch(err => logger.error('Cannot add repeatable job.', { err }))
  }

  addRepeatable('videos-views-stats')

  if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
    addRepeatable('activitypub-cleaner')
  }
}
500 | ||
// Worker concurrency of a job type: transcoding and video import read it from the configuration,
// the other types from the JOB_CONCURRENCY constant
private getJobConcurrency (jobType: JobType) {
  switch (jobType) {
    case 'video-transcoding':
      return CONFIG.TRANSCODING.CONCURRENCY

    case 'video-import':
      return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    default:
      return JOB_CONCURRENCY[jobType]
  }
}
507 | ||
// Singleton accessor: the instance is created on first access, then reused
static get Instance () {
  if (!this.instance) {
    this.instance = new this()
  }

  return this.instance
}
511 | } | |
512 | ||
513 | // --------------------------------------------------------------------------- | |
514 | ||
515 | export { | |
516 | jobTypes, | |
517 | JobQueue | |
518 | } |