From ab08ab4e284ad145de64ff9e1c9cf8149f701e29 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 11 Aug 2022 11:30:06 +0200 Subject: Don't crash on redis connection error --- server/lib/job-queue/job-queue.ts | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) (limited to 'server/lib/job-queue/job-queue.ts') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 3970d48b7..0fcaaf466 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -50,6 +50,7 @@ import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetch import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' +import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' import { processEmail } from './handlers/email' import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' @@ -62,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' -import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -186,6 +186,7 @@ class JobQueue { connection: this.getRedisConnection(), prefix: this.jobRedisPrefix }) + this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) this.addRepeatableJobs() } @@ -228,9 +229,7 @@ class JobQueue { } }) - worker.on('error', err => { - logger.error('Error in job queue %s.', handlerName, { err }) - }) + worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) }) this.workers[handlerName] = worker } @@ -241,7 +240,10 @@ class JobQueue { prefix: this.jobRedisPrefix } - this.queues[handlerName] = new Queue(handlerName, queueOptions) + const queue = new Queue(handlerName, queueOptions) + queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) }) + + this.queues[handlerName] = queue } private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { @@ -251,7 +253,11 @@ class JobQueue { prefix: this.jobRedisPrefix, maxStalledCount: 10 } - this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) + + const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) + queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) + + this.queueSchedulers[handlerName] = queueScheduler } private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { @@ -260,7 +266,11 @@ class JobQueue { connection: this.getRedisConnection(), prefix: this.jobRedisPrefix } - this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) + + const queueEvents = new QueueEvents(handlerName, queueEventsOptions) + queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) }) + + this.queueEvents[handlerName] = queueEvents } private getRedisConnection () { -- cgit v1.2.3