]>
Commit | Line | Data |
---|---|---|
5a921e7b | 1 | import { |
bd911b54 C |
2 | FlowJob, |
3 | FlowProducer, | |
5a921e7b C |
4 | Job, |
5 | JobsOptions, | |
6 | Queue, | |
7 | QueueEvents, | |
8 | QueueEventsOptions, | |
9 | QueueOptions, | |
10 | QueueScheduler, | |
11 | QueueSchedulerOptions, | |
12 | Worker, | |
13 | WorkerOptions | |
14 | } from 'bullmq' | |
402145b8 | 15 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
9129b769 | 16 | import { CONFIG } from '@server/initializers/config' |
402145b8 | 17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
bd911b54 | 18 | import { pick, timeoutPromise } from '@shared/core-utils' |
8dc8a34e C |
19 | import { |
20 | ActivitypubFollowPayload, | |
21 | ActivitypubHttpBroadcastPayload, | |
e1c55031 C |
22 | ActivitypubHttpFetcherPayload, |
23 | ActivitypubHttpUnicastPayload, | |
8795d6f2 | 24 | ActorKeysPayload, |
276250f0 | 25 | DeleteResumableUploadMetaFilePayload, |
e1c55031 | 26 | EmailPayload, |
bd911b54 | 27 | FederateVideoPayload, |
8dc8a34e | 28 | JobState, |
e1c55031 | 29 | JobType, |
f012319a | 30 | ManageVideoTorrentPayload, |
0305db28 | 31 | MoveObjectStoragePayload, |
bd911b54 | 32 | NotifyPayload, |
e1c55031 C |
33 | RefreshPayload, |
34 | VideoFileImportPayload, | |
35 | VideoImportPayload, | |
a5cf76af | 36 | VideoLiveEndingPayload, |
e1c55031 | 37 | VideoRedundancyPayload, |
92e66e04 | 38 | VideoStudioEditionPayload, |
e1c55031 | 39 | VideoTranscodingPayload |
8dc8a34e | 40 | } from '../../../shared/models' |
94a5ff8a | 41 | import { logger } from '../../helpers/logger' |
74dc3bca | 42 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
22df69fd | 43 | import { Hooks } from '../plugins/hooks' |
74d249bc | 44 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
402145b8 | 45 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
8dc8a34e C |
46 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
47 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | |
48 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | |
e1c55031 | 49 | import { refreshAPObject } from './handlers/activitypub-refresher' |
8795d6f2 | 50 | import { processActorKeys } from './handlers/actor-keys' |
402145b8 | 51 | import { processEmail } from './handlers/email' |
bd911b54 | 52 | import { processFederateVideo } from './handlers/federate-video' |
f012319a | 53 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
32567717 | 54 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
bd911b54 | 55 | import { processNotify } from './handlers/notify' |
e1c55031 | 56 | import { processVideoFileImport } from './handlers/video-file-import' |
402145b8 | 57 | import { processVideoImport } from './handlers/video-import' |
a5cf76af | 58 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
92e66e04 | 59 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
402145b8 | 60 | import { processVideoTranscoding } from './handlers/video-transcoding' |
51353d9a | 61 | import { processVideosViewsStats } from './handlers/video-views-stats' |
94a5ff8a | 62 | |
/**
 * Argument accepted when creating a job: `type` selects the target queue and
 * `payload` is the data stored on the job for that queue's handler.
 */
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  // Same spelling as the 'activitypub-cleaner' key of `handlers` and `jobTypes`
  { type: 'activitypub-cleaner', payload: {} } |
  // Legacy spelling kept for source compatibility only: no queue is registered under this name
  { type: 'activitypub-http-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a | 85 | |
/**
 * Optional per-job settings a caller may provide next to the job type and payload.
 * Both values are copied as-is into the BullMQ job options.
 */
export type CreateJobOptions = {
  // Copied into the `priority` job option
  priority?: number

  // Copied into the `delay` job option
  delay?: number
}
90 | ||
// Processor function of every job type.
// One worker/queue/scheduler/queue-events set is built per key, in this key order.
type JobHandler = (job: Job) => Promise<any>

const handlers: Record<JobType, JobHandler> = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  // Shares its processor with the non-parallel broadcast queue
  'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,

  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  'email': processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,

  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'notify': processNotify,
  'video-studio-edition': processVideoStudioEdition,
  'federate-video': processFederateVideo
}
113 | ||
// Optional callbacks to run after a job of the given type has failed
type JobErrorHandler = (job: Job, err: any) => Promise<any>

const errorHandlers: Partial<Record<JobType, JobErrorHandler>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
117 | ||
// Job types exposed to listing, counting and stats, in the order they are reported
const jobTypes: JobType[] = [
  // ActivityPub
  'activitypub-follow', 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher', 'activitypub-http-unicast', 'activitypub-cleaner',

  'email',

  // Videos
  'video-transcoding', 'video-file-import', 'video-import', 'videos-views-stats',

  'activitypub-refresher', 'video-redundancy', 'actor-keys', 'video-live-ending',

  'move-to-object-storage', 'manage-video-torrent', 'video-studio-edition',

  'notify', 'federate-video'
]
140 | ||
// Failures of these job types are logged at 'debug' level instead of 'error'
const silentFailure: Set<JobType> = new Set([ 'activitypub-http-unicast' ])
142 | ||
/**
 * Singleton owning the BullMQ objects (worker, queue, queue scheduler, queue events)
 * built for every key of `handlers`, plus a flow producer used for job chains.
 *
 * Get the shared instance with `JobQueue.Instance` and call `init()` once before
 * creating, listing or counting jobs.
 */
class JobQueue {

  private static instance: JobQueue

  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Build the worker, queue, scheduler and queue events of every job type, create the
   * flow producer and register the repeatable jobs. Calling it again is a no-op.
   *
   * @param produceOnly when true, workers, schedulers and queue events are created
   * with `autorun: false`
   */
  init (produceOnly = false) {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of (Object.keys(handlers) as JobType[])) {
      this.buildWorker(handlerName, produceOnly)
      this.buildQueue(handlerName)
      this.buildQueueScheduler(handlerName, produceOnly)
      this.buildQueueEvent(handlerName, produceOnly)
    }

    this.flowProducer = new FlowProducer({
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    })

    this.addRepeatableJobs()
  }

  /**
   * Create the worker of a job type: the processor runs the plugin hooks around the
   * registered handler and applies the `JOB_TTL` timeout when one is defined.
   */
  private buildWorker (handlerName: JobType, produceOnly: boolean) {
    const workerOptions: WorkerOptions = {
      autorun: !produceOnly,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: this.getRedisConnection()
    }

    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      // No TTL configured for this job type
      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

      // Every job is added under the name 'job', so the error handler has to be
      // looked up with the queue name and not with job.name
      const errorHandler = errorHandlers[handlerName]
      if (!errorHandler) return

      errorHandler(job, err)
        .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
    })

    worker.on('error', err => {
      logger.error('Error in job queue %s.', handlerName, { err })
    })

    this.workers[handlerName] = worker
  }

  /** Create the queue used to add, list, count and clean jobs of this type. */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    this.queues[handlerName] = new Queue(handlerName, queueOptions)
  }

  /** Create the queue scheduler of this job type. */
  private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
    const queueSchedulerOptions: QueueSchedulerOptions = {
      autorun: !produceOnly,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix,
      maxStalledCount: 10
    }

    this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
  }

  /** Create the queue events object of this job type (used by `waitJob()`). */
  private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: !produceOnly,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
  }

  /** Redis connection settings read from the server configuration. */
  private getRedisConnection () {
    return {
      password: CONFIG.REDIS.AUTH,
      db: CONFIG.REDIS.DB,
      host: CONFIG.REDIS.HOSTNAME,
      port: CONFIG.REDIS.PORT,
      path: CONFIG.REDIS.SOCKET
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Close every worker, queue, queue scheduler and queue events object, then the
   * flow producer.
   *
   * @returns the results of the per job type close calls
   */
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueScheduler.close(),
          queueEvent.close()
        ])
      })

    try {
      return await Promise.all(promises)
    } finally {
      // The flow producer is created with its own connection settings in init():
      // close it too so it does not outlive the queues
      if (this.flowProducer) await this.flowProducer.close()
    }
  }

  /** Pause every worker, one after the other. */
  async pause () {
    for (const handler of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handler]

      await worker.pause()
    }
  }

  /** Resume every worker. */
  resume () {
    for (const handler of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handler]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /** Fire-and-forget variant of `createJob()`: a creation failure is only logged. */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    this.createJob(options)
        .catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Add a job to the queue named `options.type`.
   *
   * @returns the created job, or undefined (after logging an error) when no queue
   * exists for this type
   */
  async createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Chain the given jobs in a single BullMQ flow: each job is created as the parent
   * of the previous one. BullMQ processes the children of a flow before their parent,
   * so jobs are expected to run in argument order.
   *
   * @param jobs jobs to chain, undefined entries are skipped
   * @returns the created flow, or undefined when there is no job to add
   */
  async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        name: 'job',
        data: job.payload,
        queueName: job.type,
        opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    // Every argument was skipped: do not call the flow producer with an undefined flow
    if (!lastJob) return undefined

    return this.flowProducer.add(lastJob)
  }

  /** Build the BullMQ options of a job: exponential backoff, attempts from `JOB_ATTEMPTS`, and the caller's priority/delay. */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * List jobs for the API.
   *
   * Jobs are fetched from every matching queue, merged, sorted by `timestamp`
   * (reversed when `asc` is false) and then paginated with `start`/`count`.
   *
   * @param options.state only list jobs in this state (all `jobStates` when omitted)
   * @param options.jobType only list this job type (all `jobTypes` when omitted)
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = state ? [ state ] : jobStates
    let results: Job[] = []

    const filteredJobTypes = this.filterJobTypes(jobType)

    for (const type of filteredJobTypes) {
      const queue: Queue = this.queues[type]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', type)
        continue
      }

      // Pagination is applied after the merge, so each queue has to provide
      // its first `start + count` jobs
      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Count jobs in the given state (all `jobStates` when falsy), for one job type or
   * for all of them when `jobType` is omitted.
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = state ? [ state ] : jobStates
    let total = 0

    const filteredJobTypes = this.filterJobTypes(jobType)

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counts object must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /** Job counts of every type listed in `jobTypes`. */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /** Clean the 'completed' jobs of every queue using `JOB_COMPLETED_LIFETIME`, one queue after the other. */
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
    }
  }

  /** Wait until the job is finished, using the queue events object of the job's queue. */
  waitJob (job: Job) {
    return job.waitUntilFinished(this.queueEvents[job.queueName])
  }

  /** Register the repeatable jobs; a failure to add one is only logged. */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats']
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner']
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /** All `jobTypes` when no type is given, otherwise the entries of `jobTypes` equal to it. */
  private filterJobTypes (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /** Worker concurrency: from the configuration for transcoding and video imports, from `JOB_CONCURRENCY` otherwise. */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /** Shared instance, created on first access. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
461 | ||
462 | // --------------------------------------------------------------------------- | |
463 | ||
464 | export { | |
1061c73f | 465 | jobTypes, |
94a5ff8a C |
466 | JobQueue |
467 | } |