]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Don't crash on redis connection error
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index 3970d48b754d99d79377300d14f1da997c80c273..0fcaaf4662c876a1e48d775f8941db3095bc7eca 100644 (file)
@@ -50,6 +50,7 @@ import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetch
 import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
 import { refreshAPObject } from './handlers/activitypub-refresher'
 import { processActorKeys } from './handlers/actor-keys'
+import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
 import { processEmail } from './handlers/email'
 import { processFederateVideo } from './handlers/federate-video'
 import { processManageVideoTorrent } from './handlers/manage-video-torrent'
@@ -62,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
 import { processVideoStudioEdition } from './handlers/video-studio-edition'
 import { processVideoTranscoding } from './handlers/video-transcoding'
 import { processVideosViewsStats } from './handlers/video-views-stats'
-import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
 
 export type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -186,6 +186,7 @@ class JobQueue {
       connection: this.getRedisConnection(),
       prefix: this.jobRedisPrefix
     })
+    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
 
     this.addRepeatableJobs()
   }
@@ -228,9 +229,7 @@ class JobQueue {
       }
     })
 
-    worker.on('error', err => {
-      logger.error('Error in job queue %s.', handlerName, { err })
-    })
+    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
 
     this.workers[handlerName] = worker
   }
@@ -241,7 +240,10 @@ class JobQueue {
       prefix: this.jobRedisPrefix
     }
 
-    this.queues[handlerName] = new Queue(handlerName, queueOptions)
+    const queue = new Queue(handlerName, queueOptions)
+    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
+
+    this.queues[handlerName] = queue
   }
 
   private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
@@ -251,7 +253,11 @@ class JobQueue {
       prefix: this.jobRedisPrefix,
       maxStalledCount: 10
     }
-    this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
+
+    const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
+    queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
+
+    this.queueSchedulers[handlerName] = queueScheduler
   }
 
   private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
@@ -260,7 +266,11 @@ class JobQueue {
       connection: this.getRedisConnection(),
       prefix: this.jobRedisPrefix
     }
-    this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
+
+    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
+    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
+
+    this.queueEvents[handlerName] = queueEvents
   }
 
   private getRedisConnection () {