+ const queue = new Queue(handlerName, queueOptions)
+ queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
+
+ this.queues[handlerName] = queue
+ }
+
+ private buildQueueScheduler (handlerName: JobType) {
+ const queueSchedulerOptions: QueueSchedulerOptions = {
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueScheduler'),
+ prefix: this.jobRedisPrefix,
+ maxStalledCount: 10
+ }
+
+ const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
+ queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
+
+ this.queueSchedulers[handlerName] = queueScheduler
+ }
+
+ private buildQueueEvent (handlerName: JobType) {
+ const queueEventsOptions: QueueEventsOptions = {
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueEvent'),
+ prefix: this.jobRedisPrefix
+ }
+
+ const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
+ queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
+
+ this.queueEvents[handlerName] = queueEvents
+ }
+
+ // ---------------------------------------------------------------------------
+
+ async terminate () {
+ const promises = Object.keys(this.workers)
+ .map(handlerName => {
+ const worker: Worker = this.workers[handlerName]
+ const queue: Queue = this.queues[handlerName]
+ const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
+ const queueEvent: QueueEvents = this.queueEvents[handlerName]
+
+ return Promise.all([
+ worker.close(false),
+ queue.close(),
+ queueScheduler.close(),
+ queueEvent.close()
+ ])
+ })
+
+ return Promise.all(promises)
+ }
+
+ start () {
+ const promises = Object.keys(this.workers)
+ .map(handlerName => {
+ const worker: Worker = this.workers[handlerName]
+ const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
+ const queueEvent: QueueEvents = this.queueEvents[handlerName]
+
+ return Promise.all([
+ worker.run(),
+ queueScheduler.run(),
+ queueEvent.run()
+ ])