- queue.on('failed', (job, err) => {
- logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
+ private buildQueue (handlerName: JobType) {
+ const queueOptions: QueueOptions = {
+ connection: Redis.getRedisClientOptions('Queue'),
+ prefix: this.jobRedisPrefix
+ }
+
+ const queue = new Queue(handlerName, queueOptions)
+ queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
+
+ this.queues[handlerName] = queue
+ }
+
+ private buildQueueEvent (handlerName: JobType) {
+ const queueEventsOptions: QueueEventsOptions = {
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueEvent'),
+ prefix: this.jobRedisPrefix
+ }
+
+ const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
+ queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
+
+ this.queueEvents[handlerName] = queueEvents
+ }
+
+ // ---------------------------------------------------------------------------
+
+ async terminate () {
+ const promises = Object.keys(this.workers)
+ .map(handlerName => {
+ const worker: Worker = this.workers[handlerName]
+ const queue: Queue = this.queues[handlerName]
+ const queueEvent: QueueEvents = this.queueEvents[handlerName]
+
+ return Promise.all([
+ worker.close(false),
+ queue.close(),
+ queueEvent.close()
+ ])