From 4404a7c467a2c6863728127eeff5ca4b59619940 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 14 Sep 2022 11:35:58 +0200 Subject: Prevent job queue to be started before plugins --- server/lib/job-queue/job-queue.ts | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e54d12acd..655be6568 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -168,7 +168,7 @@ class JobQueue { private constructor () { } - init (produceOnly = false) { + init () { // Already initialized if (this.initialized === true) return this.initialized = true @@ -176,10 +176,10 @@ class JobQueue { this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST for (const handlerName of (Object.keys(handlers) as JobType[])) { - this.buildWorker(handlerName, produceOnly) + this.buildWorker(handlerName) this.buildQueue(handlerName) - this.buildQueueScheduler(handlerName, produceOnly) - this.buildQueueEvent(handlerName, produceOnly) + this.buildQueueScheduler(handlerName) + this.buildQueueEvent(handlerName) } this.flowProducer = new FlowProducer({ @@ -191,9 +191,9 @@ class JobQueue { this.addRepeatableJobs() } - private buildWorker (handlerName: JobType, produceOnly: boolean) { + private buildWorker (handlerName: JobType) { const workerOptions: WorkerOptions = { - autorun: !produceOnly, + autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, connection: this.getRedisConnection() @@ -246,9 +246,9 @@ class JobQueue { this.queues[handlerName] = queue } - private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { + private buildQueueScheduler (handlerName: JobType) { const queueSchedulerOptions: QueueSchedulerOptions = { - autorun: !produceOnly, + autorun: false, connection: this.getRedisConnection(), prefix: this.jobRedisPrefix, maxStalledCount: 10 @@ -260,9 +260,9 @@ class JobQueue { this.queueSchedulers[handlerName] = queueScheduler } - private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { + private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { - autorun: !produceOnly, + autorun: false, connection: this.getRedisConnection(), prefix: this.jobRedisPrefix } @@ -304,6 +304,23 @@ class JobQueue { return Promise.all(promises) } + start () { + const promises = Object.keys(this.workers) + .map(handlerName => { + const worker: Worker = this.workers[handlerName] + const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] + const queueEvent: QueueEvents = this.queueEvents[handlerName] + + return Promise.all([ + worker.run(), + queueScheduler.run(), + queueEvent.run() + ]) + }) + + return Promise.all(promises) + } + async pause () { for (const handlerName of Object.keys(this.workers)) { const worker: Worker = this.workers[handlerName] -- cgit v1.2.3