aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-11 11:30:06 +0200
committerChocobozzz <me@florianbigard.com>2022-08-11 11:30:06 +0200
commitab08ab4e284ad145de64ff9e1c9cf8149f701e29 (patch)
tree82735dc06e2e1cb47a92a49c9996b12668f5f2f9 /server/lib/job-queue/job-queue.ts
parentaeb112edbe8524fdd25fcaa339e8cfab63bc16db (diff)
downloadPeerTube-ab08ab4e284ad145de64ff9e1c9cf8149f701e29.tar.gz
PeerTube-ab08ab4e284ad145de64ff9e1c9cf8149f701e29.tar.zst
PeerTube-ab08ab4e284ad145de64ff9e1c9cf8149f701e29.zip
Don't crash on redis connection error
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts24
1 files changed, 17 insertions, 7 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 3970d48b7..0fcaaf466 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -50,6 +50,7 @@ import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetch
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
51import { refreshAPObject } from './handlers/activitypub-refresher' 51import { refreshAPObject } from './handlers/activitypub-refresher'
52import { processActorKeys } from './handlers/actor-keys' 52import { processActorKeys } from './handlers/actor-keys'
53import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
53import { processEmail } from './handlers/email' 54import { processEmail } from './handlers/email'
54import { processFederateVideo } from './handlers/federate-video' 55import { processFederateVideo } from './handlers/federate-video'
55import { processManageVideoTorrent } from './handlers/manage-video-torrent' 56import { processManageVideoTorrent } from './handlers/manage-video-torrent'
@@ -62,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
62import { processVideoStudioEdition } from './handlers/video-studio-edition' 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
63import { processVideoTranscoding } from './handlers/video-transcoding' 64import { processVideoTranscoding } from './handlers/video-transcoding'
64import { processVideosViewsStats } from './handlers/video-views-stats' 65import { processVideosViewsStats } from './handlers/video-views-stats'
65import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
66 66
67export type CreateJobArgument = 67export type CreateJobArgument =
68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -186,6 +186,7 @@ class JobQueue {
186 connection: this.getRedisConnection(), 186 connection: this.getRedisConnection(),
187 prefix: this.jobRedisPrefix 187 prefix: this.jobRedisPrefix
188 }) 188 })
189 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
189 190
190 this.addRepeatableJobs() 191 this.addRepeatableJobs()
191 } 192 }
@@ -228,9 +229,7 @@ class JobQueue {
228 } 229 }
229 }) 230 })
230 231
231 worker.on('error', err => { 232 worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
232 logger.error('Error in job queue %s.', handlerName, { err })
233 })
234 233
235 this.workers[handlerName] = worker 234 this.workers[handlerName] = worker
236 } 235 }
@@ -241,7 +240,10 @@ class JobQueue {
241 prefix: this.jobRedisPrefix 240 prefix: this.jobRedisPrefix
242 } 241 }
243 242
244 this.queues[handlerName] = new Queue(handlerName, queueOptions) 243 const queue = new Queue(handlerName, queueOptions)
244 queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
245
246 this.queues[handlerName] = queue
245 } 247 }
246 248
247 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { 249 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
@@ -251,7 +253,11 @@ class JobQueue {
251 prefix: this.jobRedisPrefix, 253 prefix: this.jobRedisPrefix,
252 maxStalledCount: 10 254 maxStalledCount: 10
253 } 255 }
254 this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) 256
257 const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
258 queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
259
260 this.queueSchedulers[handlerName] = queueScheduler
255 } 261 }
256 262
257 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { 263 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
@@ -260,7 +266,11 @@ class JobQueue {
260 connection: this.getRedisConnection(), 266 connection: this.getRedisConnection(),
261 prefix: this.jobRedisPrefix 267 prefix: this.jobRedisPrefix
262 } 268 }
263 this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) 269
270 const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
271 queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
272
273 this.queueEvents[handlerName] = queueEvents
264 } 274 }
265 275
266 private getRedisConnection () { 276 private getRedisConnection () {