]>
Commit | Line | Data |
---|---|---|
5a921e7b | 1 | import { |
bd911b54 C |
2 | FlowJob, |
3 | FlowProducer, | |
5a921e7b C |
4 | Job, |
5 | JobsOptions, | |
6 | Queue, | |
7 | QueueEvents, | |
8 | QueueEventsOptions, | |
9 | QueueOptions, | |
10 | QueueScheduler, | |
11 | QueueSchedulerOptions, | |
12 | Worker, | |
13 | WorkerOptions | |
14 | } from 'bullmq' | |
402145b8 | 15 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
9129b769 | 16 | import { CONFIG } from '@server/initializers/config' |
402145b8 | 17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
bd911b54 | 18 | import { pick, timeoutPromise } from '@shared/core-utils' |
8dc8a34e C |
19 | import { |
20 | ActivitypubFollowPayload, | |
21 | ActivitypubHttpBroadcastPayload, | |
e1c55031 C |
22 | ActivitypubHttpFetcherPayload, |
23 | ActivitypubHttpUnicastPayload, | |
8795d6f2 | 24 | ActorKeysPayload, |
2a491182 | 25 | AfterVideoChannelImportPayload, |
276250f0 | 26 | DeleteResumableUploadMetaFilePayload, |
e1c55031 | 27 | EmailPayload, |
bd911b54 | 28 | FederateVideoPayload, |
8dc8a34e | 29 | JobState, |
e1c55031 | 30 | JobType, |
f012319a | 31 | ManageVideoTorrentPayload, |
0305db28 | 32 | MoveObjectStoragePayload, |
bd911b54 | 33 | NotifyPayload, |
e1c55031 | 34 | RefreshPayload, |
2a491182 | 35 | VideoChannelImportPayload, |
e1c55031 C |
36 | VideoFileImportPayload, |
37 | VideoImportPayload, | |
a5cf76af | 38 | VideoLiveEndingPayload, |
e1c55031 | 39 | VideoRedundancyPayload, |
92e66e04 | 40 | VideoStudioEditionPayload, |
e1c55031 | 41 | VideoTranscodingPayload |
8dc8a34e | 42 | } from '../../../shared/models' |
94a5ff8a | 43 | import { logger } from '../../helpers/logger' |
c3b21b68 C |
44 | import { |
45 | JOB_ATTEMPTS, | |
46 | JOB_CONCURRENCY, | |
47 | JOB_REMOVAL_OPTIONS, | |
48 | JOB_TTL, | |
49 | REPEAT_JOBS, | |
50 | WEBSERVER | |
51 | } from '../../initializers/constants' | |
22df69fd | 52 | import { Hooks } from '../plugins/hooks' |
c3b21b68 | 53 | import { Redis } from '../redis' |
74d249bc | 54 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
402145b8 | 55 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
405c83f9 | 56 | import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' |
8dc8a34e C |
57 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
58 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | |
e1c55031 | 59 | import { refreshAPObject } from './handlers/activitypub-refresher' |
8795d6f2 | 60 | import { processActorKeys } from './handlers/actor-keys' |
ab08ab4e | 61 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' |
402145b8 | 62 | import { processEmail } from './handlers/email' |
bd911b54 | 63 | import { processFederateVideo } from './handlers/federate-video' |
f012319a | 64 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
32567717 | 65 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
bd911b54 | 66 | import { processNotify } from './handlers/notify' |
2a491182 | 67 | import { processVideoChannelImport } from './handlers/video-channel-import' |
e1c55031 | 68 | import { processVideoFileImport } from './handlers/video-file-import' |
402145b8 | 69 | import { processVideoImport } from './handlers/video-import' |
a5cf76af | 70 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
92e66e04 | 71 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
402145b8 | 72 | import { processVideoTranscoding } from './handlers/video-transcoding' |
51353d9a | 73 | import { processVideosViewsStats } from './handlers/video-views-stats' |
c3b21b68 | 74 | import { parseDurationToMs } from '@server/helpers/core-utils' |
94a5ff8a | 75 | |
bd911b54 | 76 | export type CreateJobArgument = |
94a5ff8a | 77 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
f27b7a75 | 78 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
94a5ff8a C |
79 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
80 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | |
74d249bc | 81 | { type: 'activitypub-http-cleaner', payload: {} } | |
5350fd8e | 82 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | |
28be8916 | 83 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
a0327eed | 84 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | |
fbad87b0 | 85 | { type: 'email', payload: EmailPayload } | |
6b616860 | 86 | { type: 'video-import', payload: VideoImportPayload } | |
04b8c3fb | 87 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
51353d9a | 88 | { type: 'videos-views-stats', payload: {} } | |
a5cf76af | 89 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | |
8795d6f2 | 90 | { type: 'actor-keys', payload: ActorKeysPayload } | |
0305db28 | 91 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | |
276250f0 | 92 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | |
92e66e04 | 93 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | |
f012319a | 94 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | |
2a491182 F |
95 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | |
96 | { type: 'video-channel-import', payload: VideoChannelImportPayload } | | |
97 | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } | | |
bd911b54 C |
98 | { type: 'notify', payload: NotifyPayload } | |
99 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | |
100 | { type: 'federate-video', payload: FederateVideoPayload } | |
94a5ff8a | 101 | |
0305db28 | 102 | export type CreateJobOptions = { |
a5cf76af | 103 | delay?: number |
77d7e851 | 104 | priority?: number |
a5cf76af C |
105 | } |
106 | ||
41fb13c3 | 107 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { |
405c83f9 C |
108 | 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, |
109 | 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, | |
94a5ff8a C |
110 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
111 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | |
74d249bc | 112 | 'activitypub-cleaner': processActivityPubCleaner, |
5350fd8e | 113 | 'activitypub-follow': processActivityPubFollow, |
28be8916 | 114 | 'video-file-import': processVideoFileImport, |
a0327eed | 115 | 'video-transcoding': processVideoTranscoding, |
fbad87b0 | 116 | 'email': processEmail, |
6b616860 | 117 | 'video-import': processVideoImport, |
51353d9a | 118 | 'videos-views-stats': processVideosViewsStats, |
b764380a | 119 | 'activitypub-refresher': refreshAPObject, |
a5cf76af | 120 | 'video-live-ending': processVideoLiveEnding, |
8795d6f2 | 121 | 'actor-keys': processActorKeys, |
0305db28 | 122 | 'video-redundancy': processVideoRedundancy, |
c729caf6 | 123 | 'move-to-object-storage': processMoveToObjectStorage, |
f012319a | 124 | 'manage-video-torrent': processManageVideoTorrent, |
bd911b54 | 125 | 'video-studio-edition': processVideoStudioEdition, |
2a491182 F |
126 | 'video-channel-import': processVideoChannelImport, |
127 | 'after-video-channel-import': processAfterVideoChannelImport, | |
128 | 'notify': processNotify, | |
bd911b54 | 129 | 'federate-video': processFederateVideo |
94a5ff8a C |
130 | } |
131 | ||
32567717 C |
132 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
133 | 'move-to-object-storage': onMoveToObjectStorageFailure | |
134 | } | |
135 | ||
94831479 C |
136 | const jobTypes: JobType[] = [ |
137 | 'activitypub-follow', | |
71e3dfda | 138 | 'activitypub-http-broadcast', |
f27b7a75 | 139 | 'activitypub-http-broadcast-parallel', |
71e3dfda | 140 | 'activitypub-http-fetcher', |
94831479 | 141 | 'activitypub-http-unicast', |
74d249bc | 142 | 'activitypub-cleaner', |
94831479 | 143 | 'email', |
a0327eed | 144 | 'video-transcoding', |
fbad87b0 | 145 | 'video-file-import', |
6b616860 | 146 | 'video-import', |
51353d9a | 147 | 'videos-views-stats', |
b764380a | 148 | 'activitypub-refresher', |
a5cf76af | 149 | 'video-redundancy', |
8795d6f2 | 150 | 'actor-keys', |
0305db28 | 151 | 'video-live-ending', |
c729caf6 | 152 | 'move-to-object-storage', |
f012319a | 153 | 'manage-video-torrent', |
bd911b54 | 154 | 'video-studio-edition', |
2a491182 F |
155 | 'video-channel-import', |
156 | 'after-video-channel-import', | |
bd911b54 C |
157 | 'notify', |
158 | 'federate-video' | |
71e3dfda C |
159 | ] |
160 | ||
941d28cc C |
161 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |
162 | ||
94a5ff8a C |
163 | class JobQueue { |
164 | ||
165 | private static instance: JobQueue | |
166 | ||
5a921e7b | 167 | private workers: { [id in JobType]?: Worker } = {} |
41fb13c3 | 168 | private queues: { [id in JobType]?: Queue } = {} |
5a921e7b C |
169 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} |
170 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | |
171 | ||
bd911b54 C |
172 | private flowProducer: FlowProducer |
173 | ||
94a5ff8a | 174 | private initialized = false |
2c29ad4f | 175 | private jobRedisPrefix: string |
94a5ff8a | 176 | |
a1587156 C |
177 | private constructor () { |
178 | } | |
94a5ff8a | 179 | |
4404a7c4 | 180 | init () { |
94a5ff8a C |
181 | // Already initialized |
182 | if (this.initialized === true) return | |
183 | this.initialized = true | |
184 | ||
6dd9de95 | 185 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST |
ff4d2c73 | 186 | |
1c30b112 | 187 | for (const handlerName of Object.keys(handlers)) { |
4404a7c4 | 188 | this.buildWorker(handlerName) |
5a921e7b | 189 | this.buildQueue(handlerName) |
4404a7c4 C |
190 | this.buildQueueScheduler(handlerName) |
191 | this.buildQueueEvent(handlerName) | |
5a921e7b C |
192 | } |
193 | ||
bd911b54 | 194 | this.flowProducer = new FlowProducer({ |
564b9b55 | 195 | connection: Redis.getRedisClientOptions('FlowProducer'), |
bd911b54 C |
196 | prefix: this.jobRedisPrefix |
197 | }) | |
ab08ab4e | 198 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) |
bd911b54 | 199 | |
5a921e7b C |
200 | this.addRepeatableJobs() |
201 | } | |
202 | ||
4404a7c4 | 203 | private buildWorker (handlerName: JobType) { |
5a921e7b | 204 | const workerOptions: WorkerOptions = { |
4404a7c4 | 205 | autorun: false, |
5a921e7b | 206 | concurrency: this.getJobConcurrency(handlerName), |
2c29ad4f | 207 | prefix: this.jobRedisPrefix, |
564b9b55 | 208 | connection: Redis.getRedisClientOptions('Worker') |
94831479 | 209 | } |
ecb4e35f | 210 | |
5a921e7b C |
211 | const handler = function (job: Job) { |
212 | const timeout = JOB_TTL[handlerName] | |
213 | const p = handlers[handlerName](job) | |
e1ab52d7 | 214 | |
5a921e7b | 215 | if (!timeout) return p |
e1ab52d7 | 216 | |
5a921e7b C |
217 | return timeoutPromise(p, timeout) |
218 | } | |
94a5ff8a | 219 | |
5a921e7b C |
220 | const processor = async (jobArg: Job<any>) => { |
221 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) | |
22df69fd | 222 | |
5a921e7b C |
223 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') |
224 | } | |
d7f83948 | 225 | |
5a921e7b | 226 | const worker = new Worker(handlerName, processor, workerOptions) |
941d28cc | 227 | |
5a921e7b C |
228 | worker.on('failed', (job, err) => { |
229 | const logLevel = silentFailure.has(handlerName) | |
230 | ? 'debug' | |
231 | : 'error' | |
32567717 | 232 | |
5a921e7b | 233 | logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err }) |
3df45638 | 234 | |
5a921e7b C |
235 | if (errorHandlers[job.name]) { |
236 | errorHandlers[job.name](job, err) | |
237 | .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) | |
238 | } | |
239 | }) | |
94831479 | 240 | |
ab08ab4e | 241 | worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) }) |
5a921e7b C |
242 | |
243 | this.workers[handlerName] = worker | |
244 | } | |
245 | ||
246 | private buildQueue (handlerName: JobType) { | |
247 | const queueOptions: QueueOptions = { | |
564b9b55 | 248 | connection: Redis.getRedisClientOptions('Queue'), |
5a921e7b | 249 | prefix: this.jobRedisPrefix |
94a5ff8a | 250 | } |
6b616860 | 251 | |
ab08ab4e C |
252 | const queue = new Queue(handlerName, queueOptions) |
253 | queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) }) | |
254 | ||
255 | this.queues[handlerName] = queue | |
5a921e7b C |
256 | } |
257 | ||
4404a7c4 | 258 | private buildQueueScheduler (handlerName: JobType) { |
5a921e7b | 259 | const queueSchedulerOptions: QueueSchedulerOptions = { |
4404a7c4 | 260 | autorun: false, |
564b9b55 | 261 | connection: Redis.getRedisClientOptions('QueueScheduler'), |
5a921e7b C |
262 | prefix: this.jobRedisPrefix, |
263 | maxStalledCount: 10 | |
264 | } | |
ab08ab4e C |
265 | |
266 | const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) | |
267 | queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) | |
268 | ||
269 | this.queueSchedulers[handlerName] = queueScheduler | |
94a5ff8a C |
270 | } |
271 | ||
4404a7c4 | 272 | private buildQueueEvent (handlerName: JobType) { |
5a921e7b | 273 | const queueEventsOptions: QueueEventsOptions = { |
4404a7c4 | 274 | autorun: false, |
564b9b55 | 275 | connection: Redis.getRedisClientOptions('QueueEvent'), |
5a921e7b | 276 | prefix: this.jobRedisPrefix |
14f2b3ad | 277 | } |
ab08ab4e C |
278 | |
279 | const queueEvents = new QueueEvents(handlerName, queueEventsOptions) | |
280 | queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) }) | |
281 | ||
282 | this.queueEvents[handlerName] = queueEvents | |
5a921e7b C |
283 | } |
284 | ||
bd911b54 C |
285 | // --------------------------------------------------------------------------- |
286 | ||
5a921e7b C |
287 | async terminate () { |
288 | const promises = Object.keys(this.workers) | |
289 | .map(handlerName => { | |
290 | const worker: Worker = this.workers[handlerName] | |
291 | const queue: Queue = this.queues[handlerName] | |
292 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | |
293 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | |
294 | ||
295 | return Promise.all([ | |
296 | worker.close(false), | |
297 | queue.close(), | |
298 | queueScheduler.close(), | |
299 | queueEvent.close() | |
300 | ]) | |
301 | }) | |
302 | ||
303 | return Promise.all(promises) | |
14f2b3ad C |
304 | } |
305 | ||
4404a7c4 C |
306 | start () { |
307 | const promises = Object.keys(this.workers) | |
308 | .map(handlerName => { | |
309 | const worker: Worker = this.workers[handlerName] | |
310 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | |
311 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | |
312 | ||
313 | return Promise.all([ | |
314 | worker.run(), | |
315 | queueScheduler.run(), | |
316 | queueEvent.run() | |
317 | ]) | |
318 | }) | |
319 | ||
320 | return Promise.all(promises) | |
321 | } | |
322 | ||
419b520c | 323 | async pause () { |
e2b2c726 C |
324 | for (const handlerName of Object.keys(this.workers)) { |
325 | const worker: Worker = this.workers[handlerName] | |
5a921e7b C |
326 | |
327 | await worker.pause() | |
419b520c C |
328 | } |
329 | } | |
330 | ||
51335c72 | 331 | resume () { |
e2b2c726 C |
332 | for (const handlerName of Object.keys(this.workers)) { |
333 | const worker: Worker = this.workers[handlerName] | |
5a921e7b C |
334 | |
335 | worker.resume() | |
419b520c C |
336 | } |
337 | } | |
338 | ||
bd911b54 C |
339 | // --------------------------------------------------------------------------- |
340 | ||
341 | createJobAsync (options: CreateJobArgument & CreateJobOptions): void { | |
342 | this.createJob(options) | |
343 | .catch(err => logger.error('Cannot create job.', { err, options })) | |
a1587156 C |
344 | } |
345 | ||
2a491182 | 346 | createJob (options: CreateJobArgument & CreateJobOptions) { |
bd911b54 | 347 | const queue: Queue = this.queues[options.type] |
94831479 | 348 | if (queue === undefined) { |
bd911b54 | 349 | logger.error('Unknown queue %s: cannot create job.', options.type) |
a1587156 | 350 | return |
94831479 | 351 | } |
94a5ff8a | 352 | |
bd911b54 C |
353 | const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) |
354 | ||
355 | return queue.add('job', options.payload, jobOptions) | |
356 | } | |
357 | ||
2a491182 | 358 | createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { |
bd911b54 C |
359 | let lastJob: FlowJob |
360 | ||
361 | for (const job of jobs) { | |
362 | if (!job) continue | |
363 | ||
364 | lastJob = { | |
b42c2c7e C |
365 | ...this.buildJobFlowOption(job), |
366 | ||
bd911b54 C |
367 | children: lastJob |
368 | ? [ lastJob ] | |
369 | : [] | |
370 | } | |
371 | } | |
372 | ||
373 | return this.flowProducer.add(lastJob) | |
374 | } | |
375 | ||
2a491182 | 376 | createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { |
b42c2c7e C |
377 | return this.flowProducer.add({ |
378 | ...this.buildJobFlowOption(parent), | |
379 | ||
380 | children: children.map(c => this.buildJobFlowOption(c)) | |
381 | }) | |
382 | } | |
383 | ||
c3b21b68 | 384 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { |
b42c2c7e C |
385 | return { |
386 | name: 'job', | |
387 | data: job.payload, | |
388 | queueName: job.type, | |
389 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) | |
390 | } | |
391 | } | |
392 | ||
bd911b54 C |
393 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { |
394 | return { | |
94831479 | 395 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
bd911b54 | 396 | attempts: JOB_ATTEMPTS[type], |
77d7e851 | 397 | priority: options.priority, |
c3b21b68 C |
398 | delay: options.delay, |
399 | ||
400 | ...this.buildJobRemovalOptions(type) | |
94831479 | 401 | } |
94a5ff8a C |
402 | } |
403 | ||
bd911b54 C |
404 | // --------------------------------------------------------------------------- |
405 | ||
1061c73f | 406 | async listForApi (options: { |
402145b8 | 407 | state?: JobState |
a1587156 C |
408 | start: number |
409 | count: number | |
410 | asc?: boolean | |
1061c73f | 411 | jobType: JobType |
41fb13c3 | 412 | }): Promise<Job[]> { |
402145b8 C |
413 | const { state, start, count, asc, jobType } = options |
414 | ||
e2b2c726 C |
415 | const states = this.buildStateFilter(state) |
416 | const filteredJobTypes = this.buildTypeFilter(jobType) | |
94a5ff8a | 417 | |
e2b2c726 | 418 | let results: Job[] = [] |
1061c73f | 419 | |
1061c73f | 420 | for (const jobType of filteredJobTypes) { |
5a921e7b C |
421 | const queue: Queue = this.queues[jobType] |
422 | ||
94831479 C |
423 | if (queue === undefined) { |
424 | logger.error('Unknown queue %s to list jobs.', jobType) | |
425 | continue | |
426 | } | |
2c29ad4f | 427 | |
402145b8 | 428 | const jobs = await queue.getJobs(states, 0, start + count, asc) |
94831479 C |
429 | results = results.concat(jobs) |
430 | } | |
94a5ff8a | 431 | |
94831479 C |
432 | results.sort((j1: any, j2: any) => { |
433 | if (j1.timestamp < j2.timestamp) return -1 | |
434 | else if (j1.timestamp === j2.timestamp) return 0 | |
94a5ff8a | 435 | |
94831479 | 436 | return 1 |
94a5ff8a | 437 | }) |
94a5ff8a | 438 | |
94831479 | 439 | if (asc === false) results.reverse() |
94a5ff8a | 440 | |
94831479 | 441 | return results.slice(start, start + count) |
94a5ff8a C |
442 | } |
443 | ||
402145b8 C |
444 | async count (state: JobState, jobType?: JobType): Promise<number> { |
445 | const states = state ? [ state ] : jobStates | |
e2b2c726 | 446 | const filteredJobTypes = this.buildTypeFilter(jobType) |
3df45638 | 447 | |
e2b2c726 | 448 | let total = 0 |
1061c73f C |
449 | |
450 | for (const type of filteredJobTypes) { | |
a1587156 | 451 | const queue = this.queues[type] |
94831479 C |
452 | if (queue === undefined) { |
453 | logger.error('Unknown queue %s to count jobs.', type) | |
454 | continue | |
455 | } | |
3df45638 | 456 | |
94831479 | 457 | const counts = await queue.getJobCounts() |
3df45638 | 458 | |
040d6896 RK |
459 | for (const s of states) { |
460 | total += counts[s] | |
461 | } | |
94831479 | 462 | } |
3df45638 | 463 | |
94831479 | 464 | return total |
3df45638 C |
465 | } |
466 | ||
e2b2c726 C |
467 | private buildStateFilter (state?: JobState) { |
468 | if (!state) return jobStates | |
469 | ||
470 | const states = [ state ] | |
471 | ||
472 | // Include parent if filtering on waiting | |
473 | if (state === 'waiting') states.push('waiting-children') | |
474 | ||
475 | return states | |
476 | } | |
477 | ||
478 | private buildTypeFilter (jobType?: JobType) { | |
479 | if (!jobType) return jobTypes | |
480 | ||
481 | return jobTypes.filter(t => t === jobType) | |
482 | } | |
483 | ||
630d0a1b C |
484 | async getStats () { |
485 | const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) | |
486 | ||
487 | return Promise.all(promises) | |
488 | } | |
489 | ||
bd911b54 C |
490 | // --------------------------------------------------------------------------- |
491 | ||
2f5c6b2f | 492 | async removeOldJobs () { |
94831479 | 493 | for (const key of Object.keys(this.queues)) { |
5a921e7b | 494 | const queue: Queue = this.queues[key] |
5949eca7 C |
495 | await queue.clean(parseDurationToMs('7 days'), 1000, 'completed') |
496 | await queue.clean(parseDurationToMs('7 days'), 1000, 'failed') | |
94831479 | 497 | } |
2c29ad4f C |
498 | } |
499 | ||
6b616860 | 500 | private addRepeatableJobs () { |
5a921e7b | 501 | this.queues['videos-views-stats'].add('job', {}, { |
c3b21b68 C |
502 | repeat: REPEAT_JOBS['videos-views-stats'], |
503 | ||
504 | ...this.buildJobRemovalOptions('videos-views-stats') | |
a1587156 | 505 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
74d249bc C |
506 | |
507 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | |
5a921e7b | 508 | this.queues['activitypub-cleaner'].add('job', {}, { |
c3b21b68 C |
509 | repeat: REPEAT_JOBS['activitypub-cleaner'], |
510 | ||
511 | ...this.buildJobRemovalOptions('activitypub-cleaner') | |
74d249bc C |
512 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
513 | } | |
6b616860 C |
514 | } |
515 | ||
9129b769 C |
516 | private getJobConcurrency (jobType: JobType) { |
517 | if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY | |
518 | if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY | |
519 | ||
520 | return JOB_CONCURRENCY[jobType] | |
521 | } | |
522 | ||
c3b21b68 C |
523 | private buildJobRemovalOptions (queueName: string) { |
524 | return { | |
525 | removeOnComplete: { | |
526 | // Wants seconds | |
527 | age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, | |
528 | ||
529 | count: JOB_REMOVAL_OPTIONS.COUNT | |
530 | }, | |
531 | removeOnFail: { | |
532 | // Wants seconds | |
533 | age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, | |
534 | ||
535 | count: JOB_REMOVAL_OPTIONS.COUNT / 1000 | |
536 | } | |
537 | } | |
538 | } | |
539 | ||
94a5ff8a C |
540 | static get Instance () { |
541 | return this.instance || (this.instance = new this()) | |
542 | } | |
543 | } | |
544 | ||
545 | // --------------------------------------------------------------------------- | |
546 | ||
547 | export { | |
1061c73f | 548 | jobTypes, |
94a5ff8a C |
549 | JobQueue |
550 | } |