diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 3970d48b7..0fcaaf466 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -50,6 +50,7 @@ import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetch | |||
50 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 50 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
51 | import { refreshAPObject } from './handlers/activitypub-refresher' | 51 | import { refreshAPObject } from './handlers/activitypub-refresher' |
52 | import { processActorKeys } from './handlers/actor-keys' | 52 | import { processActorKeys } from './handlers/actor-keys' |
53 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' | ||
53 | import { processEmail } from './handlers/email' | 54 | import { processEmail } from './handlers/email' |
54 | import { processFederateVideo } from './handlers/federate-video' | 55 | import { processFederateVideo } from './handlers/federate-video' |
55 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 56 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
@@ -62,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
62 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
63 | import { processVideoTranscoding } from './handlers/video-transcoding' | 64 | import { processVideoTranscoding } from './handlers/video-transcoding' |
64 | import { processVideosViewsStats } from './handlers/video-views-stats' | 65 | import { processVideosViewsStats } from './handlers/video-views-stats' |
65 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' | ||
66 | 66 | ||
67 | export type CreateJobArgument = | 67 | export type CreateJobArgument = |
68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -186,6 +186,7 @@ class JobQueue { | |||
186 | connection: this.getRedisConnection(), | 186 | connection: this.getRedisConnection(), |
187 | prefix: this.jobRedisPrefix | 187 | prefix: this.jobRedisPrefix |
188 | }) | 188 | }) |
189 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) | ||
189 | 190 | ||
190 | this.addRepeatableJobs() | 191 | this.addRepeatableJobs() |
191 | } | 192 | } |
@@ -228,9 +229,7 @@ class JobQueue { | |||
228 | } | 229 | } |
229 | }) | 230 | }) |
230 | 231 | ||
231 | worker.on('error', err => { | 232 | worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) }) |
232 | logger.error('Error in job queue %s.', handlerName, { err }) | ||
233 | }) | ||
234 | 233 | ||
235 | this.workers[handlerName] = worker | 234 | this.workers[handlerName] = worker |
236 | } | 235 | } |
@@ -241,7 +240,10 @@ class JobQueue { | |||
241 | prefix: this.jobRedisPrefix | 240 | prefix: this.jobRedisPrefix |
242 | } | 241 | } |
243 | 242 | ||
244 | this.queues[handlerName] = new Queue(handlerName, queueOptions) | 243 | const queue = new Queue(handlerName, queueOptions) |
244 | queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) }) | ||
245 | |||
246 | this.queues[handlerName] = queue | ||
245 | } | 247 | } |
246 | 248 | ||
247 | private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { | 249 | private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { |
@@ -251,7 +253,11 @@ class JobQueue { | |||
251 | prefix: this.jobRedisPrefix, | 253 | prefix: this.jobRedisPrefix, |
252 | maxStalledCount: 10 | 254 | maxStalledCount: 10 |
253 | } | 255 | } |
254 | this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) | 256 | |
257 | const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) | ||
258 | queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) | ||
259 | |||
260 | this.queueSchedulers[handlerName] = queueScheduler | ||
255 | } | 261 | } |
256 | 262 | ||
257 | private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { | 263 | private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { |
@@ -260,7 +266,11 @@ class JobQueue { | |||
260 | connection: this.getRedisConnection(), | 266 | connection: this.getRedisConnection(), |
261 | prefix: this.jobRedisPrefix | 267 | prefix: this.jobRedisPrefix |
262 | } | 268 | } |
263 | this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) | 269 | |
270 | const queueEvents = new QueueEvents(handlerName, queueEventsOptions) | ||
271 | queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) }) | ||
272 | |||
273 | this.queueEvents[handlerName] = queueEvents | ||
264 | } | 274 | } |
265 | 275 | ||
266 | private getRedisConnection () { | 276 | private getRedisConnection () { |