From 5a921e7b74910414626bfc9672b857e987e3ebed Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 8 Aug 2022 10:42:08 +0200 Subject: [PATCH] Move to bullmq --- package.json | 3 +- server.ts | 1 + server/controllers/api/jobs.ts | 13 +- server/controllers/api/videos/update.ts | 4 +- server/controllers/api/videos/upload.ts | 4 +- server/helpers/custom-validators/jobs.ts | 2 +- server/helpers/ffmpeg/ffmpeg-commons.ts | 4 +- server/helpers/ffmpeg/ffmpeg-vod.ts | 2 +- server/initializers/constants.ts | 4 +- .../job-queue/handlers/activitypub-cleaner.ts | 2 +- .../job-queue/handlers/activitypub-follow.ts | 2 +- .../handlers/activitypub-http-broadcast.ts | 2 +- .../handlers/activitypub-http-fetcher.ts | 2 +- .../handlers/activitypub-http-unicast.ts | 2 +- .../handlers/activitypub-refresher.ts | 2 +- server/lib/job-queue/handlers/actor-keys.ts | 2 +- server/lib/job-queue/handlers/email.ts | 2 +- .../handlers/manage-video-torrent.ts | 2 +- .../handlers/move-to-object-storage.ts | 2 +- .../job-queue/handlers/video-file-import.ts | 2 +- server/lib/job-queue/handlers/video-import.ts | 2 +- .../job-queue/handlers/video-live-ending.ts | 2 +- .../job-queue/handlers/video-redundancy.ts | 2 +- .../handlers/video-studio-edition.ts | 2 +- .../job-queue/handlers/video-transcoding.ts | 2 +- server/lib/job-queue/job-queue.ts | 188 ++++++++++++------ server/lib/transcoding/transcoding.ts | 2 +- shared/core-utils/common/promises.ts | 15 +- shared/models/server/job.model.ts | 6 +- yarn.lock | 59 ++---- 30 files changed, 201 insertions(+), 138 deletions(-) diff --git a/package.json b/package.json index 24924c3da..64faf8355 100644 --- a/package.json +++ b/package.json @@ -109,7 +109,7 @@ "bencode": "^2.0.2", "bittorrent-tracker": "^9.0.0", "bluebird": "^3.5.0", - "bull": "^4.1.0", + "bullmq": "^1.87.0", "bytes": "^3.0.0", "chokidar": "^3.4.2", "commander": "^9.0.0", @@ -183,7 +183,6 @@ "@types/bencode": "^2.0.0", "@types/bluebird": "^3.5.33", "@types/body-parser": "^1.16.3", - "@types/bull": "^3.15.0", "@types/bytes": "^3.0.0", "@types/chai": "^4.0.4", "@types/chai-json-schema": "^1.4.3", diff --git a/server.ts b/server.ts index aaf1ea021..3b9353e2f 100644 --- a/server.ts +++ b/server.ts @@ -352,6 +352,7 @@ async function startApplication () { process.on('exit', () => { JobQueue.Instance.terminate() + .catch(err => logger.error('Cannot terminate job queue.', { err })) }) process.on('SIGINT', () => process.exit(0)) diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index c61b7362f..6a53e3083 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -1,3 +1,4 @@ +import { Job as BullJob } from 'bullmq' import express from 'express' import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' import { isArray } from '../../helpers/custom-validators/misc' @@ -25,7 +26,7 @@ jobsRouter.post('/pause', jobsRouter.post('/resume', authenticate, ensureUserHasRight(UserRight.MANAGE_JOBS), - asyncMiddleware(resumeJobQueue) + resumeJobQueue ) jobsRouter.get('/:state?', @@ -54,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) { return res.sendStatus(HttpStatusCode.NO_CONTENT_204) } -async function resumeJobQueue (req: express.Request, res: express.Response) { - await JobQueue.Instance.resume() +function resumeJobQueue (req: express.Request, res: express.Response) { + JobQueue.Instance.resume() return res.sendStatus(HttpStatusCode.NO_CONTENT_204) } @@ -82,7 +83,7 @@ async function listJobs (req: express.Request, res: express.Response) { return res.json(result) } -async function formatJob (job: any, state?: JobState): Promise { +async function formatJob (job: BullJob, state?: JobState): Promise { const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null @@ -90,9 +91,9 @@ async function formatJob (job: any, state?: JobState): Promise { return { id: job.id, state: state || await job.getState(), - type: job.queue.name as JobType, + type: job.queueName as JobType, data: job.data, - progress: await job.progress(), + progress: job.progress as number, priority: job.opts.priority, error, createdAt: new Date(job.timestamp), diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 65a7321fd..1545a2232 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts @@ -199,7 +199,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await job.finished() + await JobQueue.Instance.waitJob(job) } const hls = video.getHLSPlaylist() @@ -208,7 +208,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await job.finished() + await JobQueue.Instance.waitJob(job) } // Refresh video since files have changed diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 3ce66c9ca..4a9d7b619 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts @@ -17,6 +17,7 @@ import { import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { openapiOperationDoc } from '@server/middlewares/doc' +import { VideoSourceModel } from '@server/models/video/video-source' import { MVideoFile, MVideoFullLight } from '@server/types/models' import { getLowercaseExtension } from '@shared/core-utils' import { isAudioFile, uuidToShort } from '@shared/extra-utils' @@ -44,7 +45,6 @@ import { import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' import { VideoModel } from '../../../models/video/video' import { VideoFileModel } from '../../../models/video/video-file' -import { VideoSourceModel } from '@server/models/video/video-source' const lTags = loggerTagsFactory('api', 'video') const auditLogger = auditLoggerFactory('videos') @@ -270,7 +270,7 @@ async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoF const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await job.finished() + await JobQueue.Instance.waitJob(job) const refreshedVideo = await VideoModel.loadFull(video.id) if (!refreshedVideo) return diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts index f6777ecd5..c168b3e91 100644 --- a/server/helpers/custom-validators/jobs.ts +++ b/server/helpers/custom-validators/jobs.ts @@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models' import { exists } from './misc' import { jobTypes } from '@server/lib/job-queue/job-queue' -const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ] +const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused', 'waiting-children' ] function isValidJobState (value: JobState) { return exists(value) && jobStates.includes(value) diff --git a/server/helpers/ffmpeg/ffmpeg-commons.ts b/server/helpers/ffmpeg/ffmpeg-commons.ts index ee338889c..b01989899 100644 --- a/server/helpers/ffmpeg/ffmpeg-commons.ts +++ b/server/helpers/ffmpeg/ffmpeg-commons.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg' import { execPromise } from '@server/helpers/core-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' @@ -81,7 +81,7 @@ async function runCommand (options: { command.on('progress', progress => { if (!progress.percent) return - job.progress(Math.round(progress.percent)) + job.updateProgress(Math.round(progress.percent)) .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() })) }) } diff --git a/server/helpers/ffmpeg/ffmpeg-vod.ts b/server/helpers/ffmpeg/ffmpeg-vod.ts index f84157e0f..7a81a1313 100644 --- a/server/helpers/ffmpeg/ffmpeg-vod.ts +++ b/server/helpers/ffmpeg/ffmpeg-vod.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { FfmpegCommand } from 'fluent-ffmpeg' import { readFile, writeFile } from 'fs-extra' import { dirname } from 'path' diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 8165a289d..db43c59be 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -1,4 +1,4 @@ -import { CronRepeatOptions, EveryRepeatOptions } from 'bull' +import { RepeatOptions } from 'bullmq' import { randomBytes } from 'crypto' import { invert } from 'lodash' import { join } from 'path' @@ -197,7 +197,7 @@ const JOB_TTL: { [id in JobType]: number } = { 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours } -const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = { +const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { 'videos-views-stats': { cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour }, diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index 3d7dc6fb9..84c0a2de2 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -1,5 +1,5 @@ import { map } from 'bluebird' -import { Job } from 'bull' +import { Job } from 'bullmq' import { isAnnounceActivityValid, isDislikeActivityValid, diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 2ee98171c..944da5be1 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' import { ActivitypubFollowPayload } from '@shared/models' import { sanitizeHost } from '../../../helpers/core-utils' diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 709e8501f..354c608fb 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,5 +1,5 @@ import { map } from 'bluebird' -import { Job } from 'bull' +import { Job } from 'bullmq' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { ActivitypubHttpBroadcastPayload } from '@shared/models' diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index de533de6c..e0b841887 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 99bcd3e8d..837a597a5 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { ActivitypubHttpUnicastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 92ceed180..600f858a0 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' import { loadVideoByUrl } from '@server/lib/model-loaders' diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 9d5a65376..4a5bad9fb 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' import { ActorModel } from '@server/models/actor/actor' import { ActorKeysPayload } from '@shared/models' diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 6fc1caa84..b5b9475b1 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { EmailPayload } from '@shared/models' import { logger } from '../../../helpers/logger' import { Emailer } from '../../emailer' diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index dfd4e6140..4505ca79e 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 49064052c..d608fd865 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { remove } from 'fs-extra' import { join } from 'path' import { logger, loggerTagsFactory } from '@server/helpers/logger' diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 71c5444af..40c44cf52 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { copy, stat } from 'fs-extra' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { CONFIG } from '@server/initializers/config' diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4cde26aef..e5cd35865 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { move, remove, stat } from 'fs-extra' import { retryTransactionWrapper } from '@server/helpers/database-utils' import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 78d0b2192..79002258c 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { readdir, remove } from 'fs-extra' import { join } from 'path' import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 9cb7a6589..75ab2cd02 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' import { VideoRedundancyPayload } from '@shared/models' import { logger } from '../../../helpers/logger' diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 735150d57..078243538 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { move, remove } from 'fs-extra' import { join } from 'path' import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4e5e97919..8dbae8c42 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' import { Hooks } from '@server/lib/plugins/hooks' import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0ae325f4d..0cf5d53ce 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,7 +1,19 @@ -import Bull, { Job, JobOptions, Queue } from 'bull' +import { + Job, + JobsOptions, + Queue, + QueueEvents, + QueueEventsOptions, + QueueOptions, + QueueScheduler, + QueueSchedulerOptions, + Worker, + WorkerOptions +} from 'bullmq' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { timeoutPromise } from '@shared/core-utils' import { ActivitypubFollowPayload, ActivitypubHttpBroadcastPayload, @@ -120,7 +132,11 @@ class JobQueue { private static instance: JobQueue + private workers: { [id in JobType]?: Worker } = {} private queues: { [id in JobType]?: Queue } = {} + private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} + private queueEvents: { [id in JobType]?: QueueEvents } = {} + private initialized = false private jobRedisPrefix: string @@ -134,75 +150,131 @@ class JobQueue { this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - const queueOptions: Bull.QueueOptions = { + for (const handlerName of (Object.keys(handlers) as JobType[])) { + this.buildWorker(handlerName, produceOnly) + this.buildQueue(handlerName) + this.buildQueueScheduler(handlerName, produceOnly) + this.buildQueueEvent(handlerName, produceOnly) + } + + this.addRepeatableJobs() + } + + private buildWorker (handlerName: JobType, produceOnly: boolean) { + const workerOptions: WorkerOptions = { + autorun: !produceOnly, + concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - redis: { - password: CONFIG.REDIS.AUTH, - db: CONFIG.REDIS.DB, - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - path: CONFIG.REDIS.SOCKET - }, - settings: { - maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts - } + connection: this.getRedisConnection() } - for (const handlerName of (Object.keys(handlers) as JobType[])) { - const queue = new Bull(handlerName, queueOptions) + const handler = function (job: Job) { + const timeout = JOB_TTL[handlerName] + const p = handlers[handlerName](job) - if (produceOnly) { - queue.pause(true) - .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) - } + if (!timeout) return p - const handler = handlers[handlerName] + return timeoutPromise(p, timeout) + } - queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job) => { - const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) + const processor = async (jobArg: Job) => { + const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) - return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') - }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') + } - queue.on('failed', (job, err) => { - const logLevel = silentFailure.has(handlerName) - ? 'debug' - : 'error' + const worker = new Worker(handlerName, processor, workerOptions) - logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) + worker.on('failed', (job, err) => { + const logLevel = silentFailure.has(handlerName) + ? 'debug' + : 'error' - if (errorHandlers[job.name]) { - errorHandlers[job.name](job, err) - .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) - } - }) + logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err }) - queue.on('error', err => { - logger.error('Error in job queue %s.', handlerName, { err }) - }) + if (errorHandlers[job.name]) { + errorHandlers[job.name](job, err) + .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) + } + }) - this.queues[handlerName] = queue + worker.on('error', err => { + logger.error('Error in job queue %s.', handlerName, { err }) + }) + + this.workers[handlerName] = worker + } + + private buildQueue (handlerName: JobType) { + const queueOptions: QueueOptions = { + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix } - this.addRepeatableJobs() + this.queues[handlerName] = new Queue(handlerName, queueOptions) + } + + private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { + const queueSchedulerOptions: QueueSchedulerOptions = { + autorun: !produceOnly, + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix, + maxStalledCount: 10 + } + this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) } - terminate () { - for (const queueName of Object.keys(this.queues)) { - const queue = this.queues[queueName] - queue.close() + private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { + const queueEventsOptions: QueueEventsOptions = { + autorun: !produceOnly, + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix } + this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) + } + + private getRedisConnection () { + return { + password: CONFIG.REDIS.AUTH, + db: CONFIG.REDIS.DB, + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + path: CONFIG.REDIS.SOCKET + } + } + + async terminate () { + const promises = Object.keys(this.workers) + .map(handlerName => { + const worker: Worker = this.workers[handlerName] + const queue: Queue = this.queues[handlerName] + const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] + const queueEvent: QueueEvents = this.queueEvents[handlerName] + + return Promise.all([ + worker.close(false), + queue.close(), + queueScheduler.close(), + queueEvent.close() + ]) + }) + + return Promise.all(promises) } async pause () { - for (const handler of Object.keys(this.queues)) { - await this.queues[handler].pause(true) + for (const handler of Object.keys(this.workers)) { + const worker: Worker = this.workers[handler] + + await worker.pause() } } - async resume () { - for (const handler of Object.keys(this.queues)) { - await this.queues[handler].resume(true) + resume () { + for (const handler of Object.keys(this.workers)) { + const worker: Worker = this.workers[handler] + + worker.resume() } } @@ -211,22 +283,21 @@ class JobQueue { .catch(err => logger.error('Cannot create job.', { err, obj })) } - createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { + async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { const queue: Queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) return } - const jobArgs: JobOptions = { + const jobArgs: JobsOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], - timeout: JOB_TTL[obj.type], priority: options.priority, delay: options.delay } - return queue.add(obj.payload, jobArgs) + return queue.add('job', obj.payload, jobArgs) } async listForApi (options: { @@ -244,7 +315,8 @@ class JobQueue { const filteredJobTypes = this.filterJobTypes(jobType) for (const jobType of filteredJobTypes) { - const queue = this.queues[jobType] + const queue: Queue = this.queues[jobType] + if (queue === undefined) { logger.error('Unknown queue %s to list jobs.', jobType) continue @@ -297,18 +369,22 @@ class JobQueue { async removeOldJobs () { for (const key of Object.keys(this.queues)) { - const queue = this.queues[key] - await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + const queue: Queue = this.queues[key] + await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') } } + waitJob (job: Job) { + return job.waitUntilFinished(this.queueEvents[job.queueName]) + } + private addRepeatableJobs () { - this.queues['videos-views-stats'].add({}, { + this.queues['videos-views-stats'].add('job', {}, { repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { - this.queues['activitypub-cleaner'].add({}, { + this.queues['activitypub-cleaner'].add('job', {}, { repeat: REPEAT_JOBS['activitypub-cleaner'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) } diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts index 070c7ebda..07eee4122 100644 --- a/server/lib/transcoding/transcoding.ts +++ b/server/lib/transcoding/transcoding.ts @@ -1,4 +1,4 @@ -import { Job } from 'bull' +import { Job } from 'bullmq' import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' import { basename, extname as extnameUtil, join } from 'path' import { toEven } from '@server/helpers/core-utils' diff --git a/shared/core-utils/common/promises.ts b/shared/core-utils/common/promises.ts index 7ef9d60b6..dc0db9074 100644 --- a/shared/core-utils/common/promises.ts +++ b/shared/core-utils/common/promises.ts @@ -6,7 +6,20 @@ function isCatchable (value: any) { return value && typeof value.catch === 'function' } +function timeoutPromise (promise: Promise, timeoutMs: number) { + let timer: ReturnType + + return Promise.race([ + promise, + + new Promise((_res, rej) => { + timer = setTimeout(() => rej(new Error('Timeout')), timeoutMs) + }) + ]).finally(() => clearTimeout(timer)) +} + export { isPromise, - isCatchable + isCatchable, + timeoutPromise } diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index ac10ea964..a924183f2 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -4,7 +4,7 @@ import { VideoResolution } from '../videos/file/video-resolution.enum' import { VideoStudioTaskCut } from '../videos/studio' import { SendEmailOptions } from './emailer.model' -export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused' +export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused' | 'waiting-children' export type JobType = | 'activitypub-http-unicast' @@ -27,8 +27,8 @@ export type JobType = | 'video-studio-edition' export interface Job { - id: number - state: JobState + id: number | string + state: JobState | 'unknown' type: JobType data: any priority: number diff --git a/yarn.lock b/yarn.lock index db5433be5..d16fd026c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1959,14 +1959,6 @@ "@types/connect" "*" "@types/node" "*" -"@types/bull@^3.15.0": - version "3.15.8" - resolved "https://registry.yarnpkg.com/@types/bull/-/bull-3.15.8.tgz#ae2139f94490d740b37c8da5d828ce75dd82ce7c" - integrity sha512-8DbSPMSsZH5PWPnGEkAZLYgJEH4ghHJNKF7LB6Wr5R0/v6g+Vs+JoaA7kcvLtHE936xg2WpFPkaoaJgExOmKDw== - dependencies: - "@types/ioredis" "*" - "@types/redis" "^2.8.0" - "@types/bytes@^3.0.0": version "3.1.1" resolved "https://registry.yarnpkg.com/@types/bytes/-/bytes-3.1.1.tgz#67a876422e660dc4c10a27f3e5bcfbd5455f01d0" @@ -2100,13 +2092,6 @@ resolved "https://registry.yarnpkg.com/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz#0ea7b61496902b95890dc4c3a116b60cb8dae812" integrity sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ== -"@types/ioredis@*": - version "4.28.10" - resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.10.tgz#40ceb157a4141088d1394bb87c98ed09a75a06ff" - integrity sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ== - dependencies: - "@types/node" "*" - "@types/json-buffer@~3.0.0": version "3.0.0" resolved "https://registry.yarnpkg.com/@types/json-buffer/-/json-buffer-3.0.0.tgz#85c1ff0f0948fc159810d4b5be35bf8c20875f64" @@ -2284,13 +2269,6 @@ resolved "https://registry.yarnpkg.com/@types/range-parser/-/range-parser-1.2.4.tgz#cd667bcfdd025213aafb7ca5915a932590acdcdc" integrity sha512-EEhsLsD6UsDM1yFhAvy0Cjr6VwmpMWqFBCb9w07wVugF7w9nfajxLuVmngTIpgS6svCnm6Vaw+MZhoDCKnOfsw== -"@types/redis@^2.8.0": - version "2.8.32" - resolved "https://registry.yarnpkg.com/@types/redis/-/redis-2.8.32.tgz#1d3430219afbee10f8cfa389dad2571a05ecfb11" - integrity sha512-7jkMKxcGq9p242exlbsVzuJb57KqHRhNl4dHoQu2Y5v9bCAbtIXXH0R3HleSQW4CTOqpHIYUW3t6tpUj4BVQ+w== - dependencies: - "@types/node" "*" - "@types/request@^2.0.3": version "2.48.8" resolved "https://registry.yarnpkg.com/@types/request/-/request-2.48.8.tgz#0b90fde3b655ab50976cb8c5ac00faca22f5a82c" @@ -3178,20 +3156,20 @@ builtins@^5.0.1: dependencies: semver "^7.0.0" -bull@^4.1.0: - version "4.8.4" - resolved "https://registry.yarnpkg.com/bull/-/bull-4.8.4.tgz#c538610492050d5160dbd9180704145f135a0aa9" - integrity sha512-vDNhM/pvfFY3+msulMbqPBdBO7ntKxRZRtMfi3EguVW/Ozo4uez+B81I8ZoDxYCLgSOBfwRuPnFtcv7QNzm4Ew== +bullmq@^1.87.0: + version "1.87.0" + resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.87.0.tgz#e93618302f547239fbb85ee47f7f1f2c3d0c5eef" + integrity sha512-oN44FaiWJDviWBNx3V8o4FQBdHrfVHRwJuYvU4HnWpBVdCKd6HMbKqF+XeuuxcqBPbbf7cl6hThoKZ+9iTCOkA== dependencies: cron-parser "^4.2.1" - debuglog "^1.0.0" get-port "^5.1.1" + glob "^7.2.0" ioredis "^4.28.5" lodash "^4.17.21" - msgpackr "^1.5.2" - p-timeout "^3.2.0" - semver "^7.3.2" - uuid "^8.3.0" + msgpackr "^1.4.6" + semver "^7.3.7" + tslib "^1.14.1" + uuid "^8.3.2" busboy@^1.0.0: version "1.6.0" @@ -3856,11 +3834,6 @@ debug@^3.2.7: dependencies: ms "^2.1.1" -debuglog@^1.0.0: - version "1.0.1" - resolved "https://registry.yarnpkg.com/debuglog/-/debuglog-1.0.1.tgz#aa24ffb9ac3df9a2351837cfb2d279360cd78492" - integrity sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw== - decamelize@^1.2.0: version "1.2.0" resolved "https://registry.yarnpkg.com/decamelize/-/decamelize-1.2.0.tgz#f6534d15148269b20352e7bee26f501f9a191290" @@ -5169,7 +5142,7 @@ glob@7.2.0: once "^1.3.0" path-is-absolute "^1.0.0" -glob@^7.1.3: +glob@^7.1.3, glob@^7.2.0: version "7.2.3" resolved "https://registry.yarnpkg.com/glob/-/glob-7.2.3.tgz#b8df0fb802bbfa8e89bd1d938b4e16578ed44f2b" integrity sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q== @@ -6696,10 +6669,10 @@ msgpackr-extract@^2.0.2: "@msgpackr-extract/msgpackr-extract-linux-x64" "2.0.2" "@msgpackr-extract/msgpackr-extract-win32-x64" "2.0.2" -msgpackr@^1.5.2: - version "1.6.1" - resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.1.tgz#4f3c94d6a5b819b838ffc736eddaf60eba436d20" - integrity sha512-Je+xBEfdjtvA4bKaOv8iRhjC8qX2oJwpYH4f7JrG4uMVJVmnmkAT4pjKdbztKprGj3iwjcxPzb5umVZ02Qq3tA== +msgpackr@^1.4.6: + version "1.6.2" + resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.2.tgz#176cd9f6b4437dad87a839b37f23c2dfee408d9a" + integrity sha512-bqSQ0DYJbXbrJcrZFmMygUZmqQiDfI2ewFVWcrZY12w5XHWtPuW4WppDT/e63Uu311ajwkRRXSoF0uILroBeTA== optionalDependencies: msgpackr-extract "^2.0.2" @@ -9054,7 +9027,7 @@ tsconfig-paths@^4.0.0: minimist "^1.2.6" strip-bom "^3.0.0" -tslib@^1.11.1, tslib@^1.8.1: +tslib@^1.11.1, tslib@^1.14.1, tslib@^1.8.1: version "1.14.1" resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== @@ -9277,7 +9250,7 @@ uuid-parse@^1.1.0: resolved "https://registry.yarnpkg.com/uuid-parse/-/uuid-parse-1.1.0.tgz#7061c5a1384ae0e1f943c538094597e1b5f3a65b" integrity sha512-OdmXxA8rDsQ7YpNVbKSJkNzTw2I+S5WsbMDnCtIWSQaosNAcWtFuI/YK1TjzUI6nbkgiqEyh8gWngfcv8Asd9A== -uuid@^8.3.0, uuid@^8.3.2: +uuid@^8.3.2: version "8.3.2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== -- 2.41.0