From 182082f52d8cd94dc068ce39276af8243a00fb7c Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 16 Feb 2023 14:10:11 +0100 Subject: [PATCH] Update bullmq Requires redis >= 6.2 --- package.json | 2 +- server/initializers/constants.ts | 4 ++-- server/lib/job-queue/job-queue.ts | 36 ++++--------------------------- yarn.lock | 18 ++++++---------- 4 files changed, 13 insertions(+), 47 deletions(-) diff --git a/package.json b/package.json index 0c2b6e5e5..831f4d143 100644 --- a/package.json +++ b/package.json @@ -107,7 +107,7 @@ "bencode": "^2.0.2", "bittorrent-tracker": "^9.0.0", "bluebird": "^3.5.0", - "bullmq": "^1.87.0", + "bullmq": "^3.6.6", "bytes": "^3.0.0", "chokidar": "^3.4.2", "commander": "^10.0.0", diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 992c86ed2..4703e20f2 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -212,10 +212,10 @@ const JOB_TTL: { [id in JobType]: number } = { } const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { 'videos-views-stats': { - cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour + pattern: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour }, 'activitypub-cleaner': { - cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM + pattern: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM } } const JOB_PRIORITY = { diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 8597eb000..cc6be0bd8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -7,11 +7,10 @@ import { QueueEvents, QueueEventsOptions, QueueOptions, - QueueScheduler, - QueueSchedulerOptions, Worker, WorkerOptions } from 'bullmq' +import { parseDurationToMs } from '@server/helpers/core-utils' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' @@ -41,14 +40,7 @@ import { VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { - JOB_ATTEMPTS, - JOB_CONCURRENCY, - JOB_REMOVAL_OPTIONS, - JOB_TTL, - REPEAT_JOBS, - WEBSERVER -} from '../../initializers/constants' +import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { Hooks } from '../plugins/hooks' import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' @@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' -import { parseDurationToMs } from '@server/helpers/core-utils' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -166,7 +157,6 @@ class JobQueue { private workers: { [id in JobType]?: Worker } = {} private queues: { [id in JobType]?: Queue } = {} - private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} private queueEvents: { [id in JobType]?: QueueEvents } = {} private flowProducer: FlowProducer @@ -187,7 +177,6 @@ class JobQueue { for (const handlerName of Object.keys(handlers)) { this.buildWorker(handlerName) this.buildQueue(handlerName) - this.buildQueueScheduler(handlerName) this.buildQueueEvent(handlerName) } @@ -205,7 +194,8 @@ class JobQueue { autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - connection: Redis.getRedisClientOptions('Worker') + connection: Redis.getRedisClientOptions('Worker'), + maxStalledCount: 10 } const handler = function (job: Job) { @@ -255,20 +245,6 @@ class JobQueue { this.queues[handlerName] = queue } - private buildQueueScheduler (handlerName: JobType) { - const queueSchedulerOptions: QueueSchedulerOptions = { - autorun: false, - connection: Redis.getRedisClientOptions('QueueScheduler'), - prefix: this.jobRedisPrefix, - maxStalledCount: 10 - } - - const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) - queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) - - this.queueSchedulers[handlerName] = queueScheduler - } - private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { autorun: false, @@ -289,13 +265,11 @@ class JobQueue { .map(handlerName => { const worker: Worker = this.workers[handlerName] const queue: Queue = this.queues[handlerName] - const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName] return Promise.all([ worker.close(false), queue.close(), - queueScheduler.close(), queueEvent.close() ]) }) @@ -307,12 +281,10 @@ class JobQueue { const promises = Object.keys(this.workers) .map(handlerName => { const worker: Worker = this.workers[handlerName] - const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName] return Promise.all([ worker.run(), - queueScheduler.run(), queueEvent.run() ]) }) diff --git a/yarn.lock b/yarn.lock index a36627524..f8d9557ef 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3205,15 +3205,14 @@ builtins@^5.0.1: dependencies: semver "^7.0.0" -bullmq@^1.87.0: - version "1.91.1" - resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.91.1.tgz#ed17cfd4e314afa398fd099a32d365046b1ed4bc" - integrity sha512-u7dat9I8ZwouZ651AMZkBSvB6NVUPpnAjd4iokd9DM41whqIBnDjuL11h7+kEjcpiDKj6E+wxZiER00FqirZQg== +bullmq@^3.6.6: + version "3.6.6" + resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.6.6.tgz#de3c407021eff2eb283fb2aca66336ebeee9d5c5" + integrity sha512-W71jXrcTdcT3Y5tzMyTx22Cd8O3dTML7vl6KG3YdGVGrO3+UmKRLYfGLn1QwIhIoTQJVvIrSB4qfGs1hgqYRVw== dependencies: cron-parser "^4.6.0" - get-port "6.1.2" glob "^8.0.3" - ioredis "^5.2.2" + ioredis "^5.3.0" lodash "^4.17.21" msgpackr "^1.6.2" semver "^7.3.7" @@ -5181,11 +5180,6 @@ get-port@5.1.1: resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== -get-port@6.1.2: - version "6.1.2" - resolved "https://registry.yarnpkg.com/get-port/-/get-port-6.1.2.tgz#c1228abb67ba0e17fb346da33b15187833b9c08a" - integrity sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw== - get-stdin@^8.0.0: version "8.0.0" resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53" @@ -5704,7 +5698,7 @@ invariant@2.2.4: dependencies: loose-envify "^1.0.0" -ioredis@^5.2.2, ioredis@^5.2.3: +ioredis@^5.2.3, ioredis@^5.3.0: version "5.3.1" resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f" integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg== -- 2.41.0