diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e54d12acd..655be6568 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -168,7 +168,7 @@ class JobQueue { | |||
168 | private constructor () { | 168 | private constructor () { |
169 | } | 169 | } |
170 | 170 | ||
171 | init (produceOnly = false) { | 171 | init () { |
172 | // Already initialized | 172 | // Already initialized |
173 | if (this.initialized === true) return | 173 | if (this.initialized === true) return |
174 | this.initialized = true | 174 | this.initialized = true |
@@ -176,10 +176,10 @@ class JobQueue { | |||
176 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST | 176 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST |
177 | 177 | ||
178 | for (const handlerName of (Object.keys(handlers) as JobType[])) { | 178 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
179 | this.buildWorker(handlerName, produceOnly) | 179 | this.buildWorker(handlerName) |
180 | this.buildQueue(handlerName) | 180 | this.buildQueue(handlerName) |
181 | this.buildQueueScheduler(handlerName, produceOnly) | 181 | this.buildQueueScheduler(handlerName) |
182 | this.buildQueueEvent(handlerName, produceOnly) | 182 | this.buildQueueEvent(handlerName) |
183 | } | 183 | } |
184 | 184 | ||
185 | this.flowProducer = new FlowProducer({ | 185 | this.flowProducer = new FlowProducer({ |
@@ -191,9 +191,9 @@ class JobQueue { | |||
191 | this.addRepeatableJobs() | 191 | this.addRepeatableJobs() |
192 | } | 192 | } |
193 | 193 | ||
194 | private buildWorker (handlerName: JobType, produceOnly: boolean) { | 194 | private buildWorker (handlerName: JobType) { |
195 | const workerOptions: WorkerOptions = { | 195 | const workerOptions: WorkerOptions = { |
196 | autorun: !produceOnly, | 196 | autorun: false, |
197 | concurrency: this.getJobConcurrency(handlerName), | 197 | concurrency: this.getJobConcurrency(handlerName), |
198 | prefix: this.jobRedisPrefix, | 198 | prefix: this.jobRedisPrefix, |
199 | connection: this.getRedisConnection() | 199 | connection: this.getRedisConnection() |
@@ -246,9 +246,9 @@ class JobQueue { | |||
246 | this.queues[handlerName] = queue | 246 | this.queues[handlerName] = queue |
247 | } | 247 | } |
248 | 248 | ||
249 | private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { | 249 | private buildQueueScheduler (handlerName: JobType) { |
250 | const queueSchedulerOptions: QueueSchedulerOptions = { | 250 | const queueSchedulerOptions: QueueSchedulerOptions = { |
251 | autorun: !produceOnly, | 251 | autorun: false, |
252 | connection: this.getRedisConnection(), | 252 | connection: this.getRedisConnection(), |
253 | prefix: this.jobRedisPrefix, | 253 | prefix: this.jobRedisPrefix, |
254 | maxStalledCount: 10 | 254 | maxStalledCount: 10 |
@@ -260,9 +260,9 @@ class JobQueue { | |||
260 | this.queueSchedulers[handlerName] = queueScheduler | 260 | this.queueSchedulers[handlerName] = queueScheduler |
261 | } | 261 | } |
262 | 262 | ||
263 | private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { | 263 | private buildQueueEvent (handlerName: JobType) { |
264 | const queueEventsOptions: QueueEventsOptions = { | 264 | const queueEventsOptions: QueueEventsOptions = { |
265 | autorun: !produceOnly, | 265 | autorun: false, |
266 | connection: this.getRedisConnection(), | 266 | connection: this.getRedisConnection(), |
267 | prefix: this.jobRedisPrefix | 267 | prefix: this.jobRedisPrefix |
268 | } | 268 | } |
@@ -304,6 +304,23 @@ class JobQueue { | |||
304 | return Promise.all(promises) | 304 | return Promise.all(promises) |
305 | } | 305 | } |
306 | 306 | ||
307 | start () { | ||
308 | const promises = Object.keys(this.workers) | ||
309 | .map(handlerName => { | ||
310 | const worker: Worker = this.workers[handlerName] | ||
311 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
312 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | ||
313 | |||
314 | return Promise.all([ | ||
315 | worker.run(), | ||
316 | queueScheduler.run(), | ||
317 | queueEvent.run() | ||
318 | ]) | ||
319 | }) | ||
320 | |||
321 | return Promise.all(promises) | ||
322 | } | ||
323 | |||
307 | async pause () { | 324 | async pause () { |
308 | for (const handlerName of Object.keys(this.workers)) { | 325 | for (const handlerName of Object.keys(this.workers)) { |
309 | const worker: Worker = this.workers[handlerName] | 326 | const worker: Worker = this.workers[handlerName] |