diff options
author | Chocobozzz <me@florianbigard.com> | 2023-07-31 14:34:36 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-08-11 15:02:33 +0200 |
commit | 3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch) | |
tree | e4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/job-queue/job-queue.ts | |
parent | 04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff) | |
download | PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip |
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge
conflicts, but it's a major step forward:
* Server can be faster at startup because imports() are async and we can
easily lazy import big modules
* Angular doesn't seem to support ES import (with .js extension), so we
had to correctly organize peertube into a monorepo:
* Use yarn workspace feature
* Use typescript reference projects for dependencies
* Shared projects have been moved into "packages", each one is now a
node module (with a dedicated package.json/tsconfig.json)
* server/tools have been moved into apps/ and is now a dedicated app
bundled and published on NPM so users don't have to build peertube
cli tools manually
* server/tests have been moved into packages/ so we don't compile
them every time we want to run the server
* Use isolatedModule option:
* Had to move from const enum to const
(https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums)
* Had to explictely specify "type" imports when used in decorators
* Prefer tsx (that uses esbuild under the hood) instead of ts-node to
load typescript files (tests with mocha or scripts):
* To reduce test complexity as esbuild doesn't support decorator
metadata, we only test server files that do not import server
models
* We still build tests files into js files for a faster CI
* Remove unmaintained peertube CLI import script
* Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 537 |
1 files changed, 0 insertions, 537 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts deleted file mode 100644 index 177bca285..000000000 --- a/server/lib/job-queue/job-queue.ts +++ /dev/null | |||
@@ -1,537 +0,0 @@ | |||
1 | import { | ||
2 | FlowJob, | ||
3 | FlowProducer, | ||
4 | Job, | ||
5 | JobsOptions, | ||
6 | Queue, | ||
7 | QueueEvents, | ||
8 | QueueEventsOptions, | ||
9 | QueueOptions, | ||
10 | Worker, | ||
11 | WorkerOptions | ||
12 | } from 'bullmq' | ||
13 | import { parseDurationToMs } from '@server/helpers/core-utils' | ||
14 | import { jobStates } from '@server/helpers/custom-validators/jobs' | ||
15 | import { CONFIG } from '@server/initializers/config' | ||
16 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | ||
17 | import { pick, timeoutPromise } from '@shared/core-utils' | ||
18 | import { | ||
19 | ActivitypubFollowPayload, | ||
20 | ActivitypubHttpBroadcastPayload, | ||
21 | ActivitypubHttpFetcherPayload, | ||
22 | ActivitypubHttpUnicastPayload, | ||
23 | ActorKeysPayload, | ||
24 | AfterVideoChannelImportPayload, | ||
25 | DeleteResumableUploadMetaFilePayload, | ||
26 | EmailPayload, | ||
27 | FederateVideoPayload, | ||
28 | GenerateStoryboardPayload, | ||
29 | JobState, | ||
30 | JobType, | ||
31 | ManageVideoTorrentPayload, | ||
32 | MoveObjectStoragePayload, | ||
33 | NotifyPayload, | ||
34 | RefreshPayload, | ||
35 | TranscodingJobBuilderPayload, | ||
36 | VideoChannelImportPayload, | ||
37 | VideoFileImportPayload, | ||
38 | VideoImportPayload, | ||
39 | VideoLiveEndingPayload, | ||
40 | VideoRedundancyPayload, | ||
41 | VideoStudioEditionPayload, | ||
42 | VideoTranscodingPayload | ||
43 | } from '../../../shared/models' | ||
44 | import { logger } from '../../helpers/logger' | ||
45 | import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | ||
46 | import { Hooks } from '../plugins/hooks' | ||
47 | import { Redis } from '../redis' | ||
48 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | ||
49 | import { processActivityPubFollow } from './handlers/activitypub-follow' | ||
50 | import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' | ||
51 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | ||
52 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | ||
53 | import { refreshAPObject } from './handlers/activitypub-refresher' | ||
54 | import { processActorKeys } from './handlers/actor-keys' | ||
55 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' | ||
56 | import { processEmail } from './handlers/email' | ||
57 | import { processFederateVideo } from './handlers/federate-video' | ||
58 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | ||
59 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | ||
60 | import { processNotify } from './handlers/notify' | ||
61 | import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' | ||
62 | import { processVideoChannelImport } from './handlers/video-channel-import' | ||
63 | import { processVideoFileImport } from './handlers/video-file-import' | ||
64 | import { processVideoImport } from './handlers/video-import' | ||
65 | import { processVideoLiveEnding } from './handlers/video-live-ending' | ||
66 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | ||
67 | import { processVideoTranscoding } from './handlers/video-transcoding' | ||
68 | import { processVideosViewsStats } from './handlers/video-views-stats' | ||
69 | import { processGenerateStoryboard } from './handlers/generate-storyboard' | ||
70 | |||
71 | export type CreateJobArgument = | ||
72 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | ||
73 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | ||
74 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | ||
75 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | ||
76 | { type: 'activitypub-cleaner', payload: {} } | | ||
77 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | | ||
78 | { type: 'video-file-import', payload: VideoFileImportPayload } | | ||
79 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | | ||
80 | { type: 'email', payload: EmailPayload } | | ||
81 | { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | | ||
82 | { type: 'video-import', payload: VideoImportPayload } | | ||
83 | { type: 'activitypub-refresher', payload: RefreshPayload } | | ||
84 | { type: 'videos-views-stats', payload: {} } | | ||
85 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | ||
86 | { type: 'actor-keys', payload: ActorKeysPayload } | | ||
87 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | | ||
88 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | | ||
89 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | | ||
90 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | | ||
91 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
92 | { type: 'video-channel-import', payload: VideoChannelImportPayload } | | ||
93 | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } | | ||
94 | { type: 'notify', payload: NotifyPayload } | | ||
95 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
96 | { type: 'federate-video', payload: FederateVideoPayload } | | ||
97 | { type: 'generate-video-storyboard', payload: GenerateStoryboardPayload } | ||
98 | |||
99 | export type CreateJobOptions = { | ||
100 | delay?: number | ||
101 | priority?: number | ||
102 | failParentOnFailure?: boolean | ||
103 | } | ||
104 | |||
105 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | ||
106 | 'activitypub-cleaner': processActivityPubCleaner, | ||
107 | 'activitypub-follow': processActivityPubFollow, | ||
108 | 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, | ||
109 | 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, | ||
110 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
111 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
112 | 'activitypub-refresher': refreshAPObject, | ||
113 | 'actor-keys': processActorKeys, | ||
114 | 'after-video-channel-import': processAfterVideoChannelImport, | ||
115 | 'email': processEmail, | ||
116 | 'federate-video': processFederateVideo, | ||
117 | 'transcoding-job-builder': processTranscodingJobBuilder, | ||
118 | 'manage-video-torrent': processManageVideoTorrent, | ||
119 | 'move-to-object-storage': processMoveToObjectStorage, | ||
120 | 'notify': processNotify, | ||
121 | 'video-channel-import': processVideoChannelImport, | ||
122 | 'video-file-import': processVideoFileImport, | ||
123 | 'video-import': processVideoImport, | ||
124 | 'video-live-ending': processVideoLiveEnding, | ||
125 | 'video-redundancy': processVideoRedundancy, | ||
126 | 'video-studio-edition': processVideoStudioEdition, | ||
127 | 'video-transcoding': processVideoTranscoding, | ||
128 | 'videos-views-stats': processVideosViewsStats, | ||
129 | 'generate-video-storyboard': processGenerateStoryboard | ||
130 | } | ||
131 | |||
132 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | ||
133 | 'move-to-object-storage': onMoveToObjectStorageFailure | ||
134 | } | ||
135 | |||
136 | const jobTypes: JobType[] = [ | ||
137 | 'activitypub-cleaner', | ||
138 | 'activitypub-follow', | ||
139 | 'activitypub-http-broadcast-parallel', | ||
140 | 'activitypub-http-broadcast', | ||
141 | 'activitypub-http-fetcher', | ||
142 | 'activitypub-http-unicast', | ||
143 | 'activitypub-refresher', | ||
144 | 'actor-keys', | ||
145 | 'after-video-channel-import', | ||
146 | 'email', | ||
147 | 'federate-video', | ||
148 | 'generate-video-storyboard', | ||
149 | 'manage-video-torrent', | ||
150 | 'move-to-object-storage', | ||
151 | 'notify', | ||
152 | 'transcoding-job-builder', | ||
153 | 'video-channel-import', | ||
154 | 'video-file-import', | ||
155 | 'video-import', | ||
156 | 'video-live-ending', | ||
157 | 'video-redundancy', | ||
158 | 'video-studio-edition', | ||
159 | 'video-transcoding', | ||
160 | 'videos-views-stats' | ||
161 | ] | ||
162 | |||
163 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | ||
164 | |||
165 | class JobQueue { | ||
166 | |||
167 | private static instance: JobQueue | ||
168 | |||
169 | private workers: { [id in JobType]?: Worker } = {} | ||
170 | private queues: { [id in JobType]?: Queue } = {} | ||
171 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | ||
172 | |||
173 | private flowProducer: FlowProducer | ||
174 | |||
175 | private initialized = false | ||
176 | private jobRedisPrefix: string | ||
177 | |||
178 | private constructor () { | ||
179 | } | ||
180 | |||
181 | init () { | ||
182 | // Already initialized | ||
183 | if (this.initialized === true) return | ||
184 | this.initialized = true | ||
185 | |||
186 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST | ||
187 | |||
188 | for (const handlerName of Object.keys(handlers)) { | ||
189 | this.buildWorker(handlerName) | ||
190 | this.buildQueue(handlerName) | ||
191 | this.buildQueueEvent(handlerName) | ||
192 | } | ||
193 | |||
194 | this.flowProducer = new FlowProducer({ | ||
195 | connection: Redis.getRedisClientOptions('FlowProducer'), | ||
196 | prefix: this.jobRedisPrefix | ||
197 | }) | ||
198 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) | ||
199 | |||
200 | this.addRepeatableJobs() | ||
201 | } | ||
202 | |||
203 | private buildWorker (handlerName: JobType) { | ||
204 | const workerOptions: WorkerOptions = { | ||
205 | autorun: false, | ||
206 | concurrency: this.getJobConcurrency(handlerName), | ||
207 | prefix: this.jobRedisPrefix, | ||
208 | connection: Redis.getRedisClientOptions('Worker'), | ||
209 | maxStalledCount: 10 | ||
210 | } | ||
211 | |||
212 | const handler = function (job: Job) { | ||
213 | const timeout = JOB_TTL[handlerName] | ||
214 | const p = handlers[handlerName](job) | ||
215 | |||
216 | if (!timeout) return p | ||
217 | |||
218 | return timeoutPromise(p, timeout) | ||
219 | } | ||
220 | |||
221 | const processor = async (jobArg: Job<any>) => { | ||
222 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) | ||
223 | |||
224 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') | ||
225 | } | ||
226 | |||
227 | const worker = new Worker(handlerName, processor, workerOptions) | ||
228 | |||
229 | worker.on('failed', (job, err) => { | ||
230 | const logLevel = silentFailure.has(handlerName) | ||
231 | ? 'debug' | ||
232 | : 'error' | ||
233 | |||
234 | logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err }) | ||
235 | |||
236 | if (errorHandlers[job.name]) { | ||
237 | errorHandlers[job.name](job, err) | ||
238 | .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) | ||
239 | } | ||
240 | }) | ||
241 | |||
242 | worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) }) | ||
243 | |||
244 | this.workers[handlerName] = worker | ||
245 | } | ||
246 | |||
247 | private buildQueue (handlerName: JobType) { | ||
248 | const queueOptions: QueueOptions = { | ||
249 | connection: Redis.getRedisClientOptions('Queue'), | ||
250 | prefix: this.jobRedisPrefix | ||
251 | } | ||
252 | |||
253 | const queue = new Queue(handlerName, queueOptions) | ||
254 | queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) }) | ||
255 | |||
256 | this.queues[handlerName] = queue | ||
257 | } | ||
258 | |||
259 | private buildQueueEvent (handlerName: JobType) { | ||
260 | const queueEventsOptions: QueueEventsOptions = { | ||
261 | autorun: false, | ||
262 | connection: Redis.getRedisClientOptions('QueueEvent'), | ||
263 | prefix: this.jobRedisPrefix | ||
264 | } | ||
265 | |||
266 | const queueEvents = new QueueEvents(handlerName, queueEventsOptions) | ||
267 | queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) }) | ||
268 | |||
269 | this.queueEvents[handlerName] = queueEvents | ||
270 | } | ||
271 | |||
272 | // --------------------------------------------------------------------------- | ||
273 | |||
274 | async terminate () { | ||
275 | const promises = Object.keys(this.workers) | ||
276 | .map(handlerName => { | ||
277 | const worker: Worker = this.workers[handlerName] | ||
278 | const queue: Queue = this.queues[handlerName] | ||
279 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | ||
280 | |||
281 | return Promise.all([ | ||
282 | worker.close(false), | ||
283 | queue.close(), | ||
284 | queueEvent.close() | ||
285 | ]) | ||
286 | }) | ||
287 | |||
288 | return Promise.all(promises) | ||
289 | } | ||
290 | |||
291 | start () { | ||
292 | const promises = Object.keys(this.workers) | ||
293 | .map(handlerName => { | ||
294 | const worker: Worker = this.workers[handlerName] | ||
295 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | ||
296 | |||
297 | return Promise.all([ | ||
298 | worker.run(), | ||
299 | queueEvent.run() | ||
300 | ]) | ||
301 | }) | ||
302 | |||
303 | return Promise.all(promises) | ||
304 | } | ||
305 | |||
306 | async pause () { | ||
307 | for (const handlerName of Object.keys(this.workers)) { | ||
308 | const worker: Worker = this.workers[handlerName] | ||
309 | |||
310 | await worker.pause() | ||
311 | } | ||
312 | } | ||
313 | |||
314 | resume () { | ||
315 | for (const handlerName of Object.keys(this.workers)) { | ||
316 | const worker: Worker = this.workers[handlerName] | ||
317 | |||
318 | worker.resume() | ||
319 | } | ||
320 | } | ||
321 | |||
322 | // --------------------------------------------------------------------------- | ||
323 | |||
324 | createJobAsync (options: CreateJobArgument & CreateJobOptions): void { | ||
325 | this.createJob(options) | ||
326 | .catch(err => logger.error('Cannot create job.', { err, options })) | ||
327 | } | ||
328 | |||
329 | createJob (options: CreateJobArgument & CreateJobOptions) { | ||
330 | const queue: Queue = this.queues[options.type] | ||
331 | if (queue === undefined) { | ||
332 | logger.error('Unknown queue %s: cannot create job.', options.type) | ||
333 | return | ||
334 | } | ||
335 | |||
336 | const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) | ||
337 | |||
338 | return queue.add('job', options.payload, jobOptions) | ||
339 | } | ||
340 | |||
341 | createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { | ||
342 | let lastJob: FlowJob | ||
343 | |||
344 | for (const job of jobs) { | ||
345 | if (!job) continue | ||
346 | |||
347 | lastJob = { | ||
348 | ...this.buildJobFlowOption(job), | ||
349 | |||
350 | children: lastJob | ||
351 | ? [ lastJob ] | ||
352 | : [] | ||
353 | } | ||
354 | } | ||
355 | |||
356 | return this.flowProducer.add(lastJob) | ||
357 | } | ||
358 | |||
359 | createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { | ||
360 | return this.flowProducer.add({ | ||
361 | ...this.buildJobFlowOption(parent), | ||
362 | |||
363 | children: children.map(c => this.buildJobFlowOption(c)) | ||
364 | }) | ||
365 | } | ||
366 | |||
367 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { | ||
368 | return { | ||
369 | name: 'job', | ||
370 | data: job.payload, | ||
371 | queueName: job.type, | ||
372 | opts: { | ||
373 | failParentOnFailure: true, | ||
374 | |||
375 | ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) | ||
376 | } | ||
377 | } | ||
378 | } | ||
379 | |||
380 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | ||
381 | return { | ||
382 | backoff: { delay: 60 * 1000, type: 'exponential' }, | ||
383 | attempts: JOB_ATTEMPTS[type], | ||
384 | priority: options.priority, | ||
385 | delay: options.delay, | ||
386 | |||
387 | ...this.buildJobRemovalOptions(type) | ||
388 | } | ||
389 | } | ||
390 | |||
391 | // --------------------------------------------------------------------------- | ||
392 | |||
393 | async listForApi (options: { | ||
394 | state?: JobState | ||
395 | start: number | ||
396 | count: number | ||
397 | asc?: boolean | ||
398 | jobType: JobType | ||
399 | }): Promise<Job[]> { | ||
400 | const { state, start, count, asc, jobType } = options | ||
401 | |||
402 | const states = this.buildStateFilter(state) | ||
403 | const filteredJobTypes = this.buildTypeFilter(jobType) | ||
404 | |||
405 | let results: Job[] = [] | ||
406 | |||
407 | for (const jobType of filteredJobTypes) { | ||
408 | const queue: Queue = this.queues[jobType] | ||
409 | |||
410 | if (queue === undefined) { | ||
411 | logger.error('Unknown queue %s to list jobs.', jobType) | ||
412 | continue | ||
413 | } | ||
414 | |||
415 | const jobs = await queue.getJobs(states, 0, start + count, asc) | ||
416 | results = results.concat(jobs) | ||
417 | } | ||
418 | |||
419 | results.sort((j1: any, j2: any) => { | ||
420 | if (j1.timestamp < j2.timestamp) return -1 | ||
421 | else if (j1.timestamp === j2.timestamp) return 0 | ||
422 | |||
423 | return 1 | ||
424 | }) | ||
425 | |||
426 | if (asc === false) results.reverse() | ||
427 | |||
428 | return results.slice(start, start + count) | ||
429 | } | ||
430 | |||
431 | async count (state: JobState, jobType?: JobType): Promise<number> { | ||
432 | const states = state ? [ state ] : jobStates | ||
433 | const filteredJobTypes = this.buildTypeFilter(jobType) | ||
434 | |||
435 | let total = 0 | ||
436 | |||
437 | for (const type of filteredJobTypes) { | ||
438 | const queue = this.queues[type] | ||
439 | if (queue === undefined) { | ||
440 | logger.error('Unknown queue %s to count jobs.', type) | ||
441 | continue | ||
442 | } | ||
443 | |||
444 | const counts = await queue.getJobCounts() | ||
445 | |||
446 | for (const s of states) { | ||
447 | total += counts[s] | ||
448 | } | ||
449 | } | ||
450 | |||
451 | return total | ||
452 | } | ||
453 | |||
454 | private buildStateFilter (state?: JobState) { | ||
455 | if (!state) return jobStates | ||
456 | |||
457 | const states = [ state ] | ||
458 | |||
459 | // Include parent if filtering on waiting | ||
460 | if (state === 'waiting') states.push('waiting-children') | ||
461 | |||
462 | return states | ||
463 | } | ||
464 | |||
465 | private buildTypeFilter (jobType?: JobType) { | ||
466 | if (!jobType) return jobTypes | ||
467 | |||
468 | return jobTypes.filter(t => t === jobType) | ||
469 | } | ||
470 | |||
471 | async getStats () { | ||
472 | const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) | ||
473 | |||
474 | return Promise.all(promises) | ||
475 | } | ||
476 | |||
477 | // --------------------------------------------------------------------------- | ||
478 | |||
479 | async removeOldJobs () { | ||
480 | for (const key of Object.keys(this.queues)) { | ||
481 | const queue: Queue = this.queues[key] | ||
482 | await queue.clean(parseDurationToMs('7 days'), 1000, 'completed') | ||
483 | await queue.clean(parseDurationToMs('7 days'), 1000, 'failed') | ||
484 | } | ||
485 | } | ||
486 | |||
487 | private addRepeatableJobs () { | ||
488 | this.queues['videos-views-stats'].add('job', {}, { | ||
489 | repeat: REPEAT_JOBS['videos-views-stats'], | ||
490 | |||
491 | ...this.buildJobRemovalOptions('videos-views-stats') | ||
492 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | ||
493 | |||
494 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | ||
495 | this.queues['activitypub-cleaner'].add('job', {}, { | ||
496 | repeat: REPEAT_JOBS['activitypub-cleaner'], | ||
497 | |||
498 | ...this.buildJobRemovalOptions('activitypub-cleaner') | ||
499 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | ||
500 | } | ||
501 | } | ||
502 | |||
503 | private getJobConcurrency (jobType: JobType) { | ||
504 | if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY | ||
505 | if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY | ||
506 | |||
507 | return JOB_CONCURRENCY[jobType] | ||
508 | } | ||
509 | |||
510 | private buildJobRemovalOptions (queueName: string) { | ||
511 | return { | ||
512 | removeOnComplete: { | ||
513 | // Wants seconds | ||
514 | age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, | ||
515 | |||
516 | count: JOB_REMOVAL_OPTIONS.COUNT | ||
517 | }, | ||
518 | removeOnFail: { | ||
519 | // Wants seconds | ||
520 | age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, | ||
521 | |||
522 | count: JOB_REMOVAL_OPTIONS.COUNT / 1000 | ||
523 | } | ||
524 | } | ||
525 | } | ||
526 | |||
527 | static get Instance () { | ||
528 | return this.instance || (this.instance = new this()) | ||
529 | } | ||
530 | } | ||
531 | |||
532 | // --------------------------------------------------------------------------- | ||
533 | |||
534 | export { | ||
535 | jobTypes, | ||
536 | JobQueue | ||
537 | } | ||