From 5a921e7b74910414626bfc9672b857e987e3ebed Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 8 Aug 2022 10:42:08 +0200 Subject: Move to bullmq --- .../lib/job-queue/handlers/activitypub-cleaner.ts | 2 +- .../lib/job-queue/handlers/activitypub-follow.ts | 2 +- .../handlers/activitypub-http-broadcast.ts | 2 +- .../job-queue/handlers/activitypub-http-fetcher.ts | 2 +- .../job-queue/handlers/activitypub-http-unicast.ts | 2 +- .../job-queue/handlers/activitypub-refresher.ts | 2 +- server/lib/job-queue/handlers/actor-keys.ts | 2 +- server/lib/job-queue/handlers/email.ts | 2 +- .../lib/job-queue/handlers/manage-video-torrent.ts | 2 +- .../job-queue/handlers/move-to-object-storage.ts | 2 +- server/lib/job-queue/handlers/video-file-import.ts | 2 +- server/lib/job-queue/handlers/video-import.ts | 2 +- server/lib/job-queue/handlers/video-live-ending.ts | 2 +- server/lib/job-queue/handlers/video-redundancy.ts | 2 +- .../lib/job-queue/handlers/video-studio-edition.ts | 2 +- server/lib/job-queue/handlers/video-transcoding.ts | 2 +- server/lib/job-queue/job-queue.ts | 188 +++++++++++++++------ server/lib/transcoding/transcoding.ts | 2 +- 18 files changed, 149 insertions(+), 73 deletions(-) (limited to 'server/lib') diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index 3d7dc6fb9..84c0a2de2 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -1,5 +1,5 @@ import { map } from 'bluebird' -import { Job } from 'bull' +import { Job } from 'bullmq' import { isAnnounceActivityValid, isDislikeActivityValid, diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 2ee98171c..944da5be1 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' import { ActivitypubFollowPayload } from '@shared/models' import { sanitizeHost } from '../../../helpers/core-utils' diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 709e8501f..354c608fb 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,5 +1,5 @@ import { map } from 'bluebird' -import { Job } from 'bull' +import { Job } from 'bullmq' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { ActivitypubHttpBroadcastPayload } from '@shared/models' diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index de533de6c..e0b841887 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 99bcd3e8d..837a597a5 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { ActivitypubHttpUnicastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 92ceed180..600f858a0 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' import { loadVideoByUrl } from '@server/lib/model-loaders' diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 9d5a65376..4a5bad9fb 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' import { ActorModel } from '@server/models/actor/actor' import { ActorKeysPayload } from '@shared/models' diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 6fc1caa84..b5b9475b1 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { EmailPayload } from '@shared/models' import { logger } from '../../../helpers/logger' import { Emailer } from '../../emailer' diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index dfd4e6140..4505ca79e 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 49064052c..d608fd865 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { remove } from 'fs-extra' import { join } from 'path' import { logger, loggerTagsFactory } from '@server/helpers/logger' diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 71c5444af..40c44cf52 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { copy, stat } from 'fs-extra' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { CONFIG } from '@server/initializers/config' diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4cde26aef..e5cd35865 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { move, remove, stat } from 'fs-extra' import { retryTransactionWrapper } from '@server/helpers/database-utils' import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 78d0b2192..79002258c 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { readdir, remove } from 'fs-extra' import { join } from 'path' import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 9cb7a6589..75ab2cd02 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' import { VideoRedundancyPayload } from '@shared/models' import { logger } from '../../../helpers/logger' diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 735150d57..078243538 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { move, remove } from 'fs-extra' import { join } from 'path' import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4e5e97919..8dbae8c42 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' import { Hooks } from '@server/lib/plugins/hooks' import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0ae325f4d..0cf5d53ce 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,7 +1,19 @@ -import Bull, { Job, JobOptions, Queue } from 'bull' +import { + Job, + JobsOptions, + Queue, + QueueEvents, + QueueEventsOptions, + QueueOptions, + QueueScheduler, + QueueSchedulerOptions, + Worker, + WorkerOptions +} from 'bullmq' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { timeoutPromise } from '@shared/core-utils' import { ActivitypubFollowPayload, ActivitypubHttpBroadcastPayload, @@ -120,7 +132,11 @@ class JobQueue { private static instance: JobQueue + private workers: { [id in JobType]?: Worker } = {} private queues: { [id in JobType]?: Queue } = {} + private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} + private queueEvents: { [id in JobType]?: QueueEvents } = {} + private initialized = false private jobRedisPrefix: string @@ -134,75 +150,131 @@ class JobQueue { this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - const queueOptions: Bull.QueueOptions = { + for (const handlerName of (Object.keys(handlers) as JobType[])) { + this.buildWorker(handlerName, produceOnly) + this.buildQueue(handlerName) + this.buildQueueScheduler(handlerName, produceOnly) + this.buildQueueEvent(handlerName, produceOnly) + } + + this.addRepeatableJobs() + } + + private buildWorker (handlerName: JobType, produceOnly: boolean) { + const workerOptions: WorkerOptions = { + autorun: !produceOnly, + concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - redis: { - password: CONFIG.REDIS.AUTH, - db: CONFIG.REDIS.DB, - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - path: CONFIG.REDIS.SOCKET - }, - settings: { - maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts - } + connection: this.getRedisConnection() } - for (const handlerName of (Object.keys(handlers) as JobType[])) { - const queue = new Bull(handlerName, queueOptions) + const handler = function (job: Job) { + const timeout = JOB_TTL[handlerName] + const p = handlers[handlerName](job) - if (produceOnly) { - queue.pause(true) - .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) - } + if (!timeout) return p - const handler = handlers[handlerName] + return timeoutPromise(p, timeout) + } - queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job) => { - const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) + const processor = async (jobArg: Job) => { + const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) - return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') - }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') + } - queue.on('failed', (job, err) => { - const logLevel = silentFailure.has(handlerName) - ? 'debug' - : 'error' + const worker = new Worker(handlerName, processor, workerOptions) - logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) + worker.on('failed', (job, err) => { + const logLevel = silentFailure.has(handlerName) + ? 'debug' + : 'error' - if (errorHandlers[job.name]) { - errorHandlers[job.name](job, err) - .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) - } - }) + logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err }) - queue.on('error', err => { - logger.error('Error in job queue %s.', handlerName, { err }) - }) + if (errorHandlers[job.name]) { + errorHandlers[job.name](job, err) + .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) + } + }) - this.queues[handlerName] = queue + worker.on('error', err => { + logger.error('Error in job queue %s.', handlerName, { err }) + }) + + this.workers[handlerName] = worker + } + + private buildQueue (handlerName: JobType) { + const queueOptions: QueueOptions = { + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix } - this.addRepeatableJobs() + this.queues[handlerName] = new Queue(handlerName, queueOptions) + } + + private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { + const queueSchedulerOptions: QueueSchedulerOptions = { + autorun: !produceOnly, + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix, + maxStalledCount: 10 + } + this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) } - terminate () { - for (const queueName of Object.keys(this.queues)) { - const queue = this.queues[queueName] - queue.close() + private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { + const queueEventsOptions: QueueEventsOptions = { + autorun: !produceOnly, + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix } + this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) + } + + private getRedisConnection () { + return { + password: CONFIG.REDIS.AUTH, + db: CONFIG.REDIS.DB, + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + path: CONFIG.REDIS.SOCKET + } + } + + async terminate () { + const promises = Object.keys(this.workers) + .map(handlerName => { + const worker: Worker = this.workers[handlerName] + const queue: Queue = this.queues[handlerName] + const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] + const queueEvent: QueueEvents = this.queueEvents[handlerName] + + return Promise.all([ + worker.close(false), + queue.close(), + queueScheduler.close(), + queueEvent.close() + ]) + }) + + return Promise.all(promises) } async pause () { - for (const handler of Object.keys(this.queues)) { - await this.queues[handler].pause(true) + for (const handler of Object.keys(this.workers)) { + const worker: Worker = this.workers[handler] + + await worker.pause() } } - async resume () { - for (const handler of Object.keys(this.queues)) { - await this.queues[handler].resume(true) + resume () { + for (const handler of Object.keys(this.workers)) { + const worker: Worker = this.workers[handler] + + worker.resume() } } @@ -211,22 +283,21 @@ class JobQueue { .catch(err => logger.error('Cannot create job.', { err, obj })) } - createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { + async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { const queue: Queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) return } - const jobArgs: JobOptions = { + const jobArgs: JobsOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], - timeout: JOB_TTL[obj.type], priority: options.priority, delay: options.delay } - return queue.add(obj.payload, jobArgs) + return queue.add('job', obj.payload, jobArgs) } async listForApi (options: { @@ -244,7 +315,8 @@ class JobQueue { const filteredJobTypes = this.filterJobTypes(jobType) for (const jobType of filteredJobTypes) { - const queue = this.queues[jobType] + const queue: Queue = this.queues[jobType] + if (queue === undefined) { logger.error('Unknown queue %s to list jobs.', jobType) continue @@ -297,18 +369,22 @@ class JobQueue { async removeOldJobs () { for (const key of Object.keys(this.queues)) { - const queue = this.queues[key] - await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + const queue: Queue = this.queues[key] + await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') } } + waitJob (job: Job) { + return job.waitUntilFinished(this.queueEvents[job.queueName]) + } + private addRepeatableJobs () { - this.queues['videos-views-stats'].add({}, { + this.queues['videos-views-stats'].add('job', {}, { repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { - this.queues['activitypub-cleaner'].add({}, { + this.queues['activitypub-cleaner'].add('job', {}, { repeat: REPEAT_JOBS['activitypub-cleaner'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) } diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts index 070c7ebda..07eee4122 100644 --- a/server/lib/transcoding/transcoding.ts +++ b/server/lib/transcoding/transcoding.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' import { basename, extname as extnameUtil, join } from 'path' import { toEven } from '@server/helpers/core-utils' -- cgit v1.2.3