import { Hooks } from '../plugins/hooks'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
-import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
+import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
+import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
import { processEmail } from './handlers/email'
import { processFederateVideo } from './handlers/federate-video'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
-import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
+import { Redis } from '../redis'
export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
}
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
- 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
- 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
+ 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
+ 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
private constructor () {
}
- init (produceOnly = false) {
+ init () {
// Already initialized
if (this.initialized === true) return
this.initialized = true
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
for (const handlerName of (Object.keys(handlers) as JobType[])) {
- this.buildWorker(handlerName, produceOnly)
+ this.buildWorker(handlerName)
this.buildQueue(handlerName)
- this.buildQueueScheduler(handlerName, produceOnly)
- this.buildQueueEvent(handlerName, produceOnly)
+ this.buildQueueScheduler(handlerName)
+ this.buildQueueEvent(handlerName)
}
this.flowProducer = new FlowProducer({
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('FlowProducer'),
prefix: this.jobRedisPrefix
})
+ this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
this.addRepeatableJobs()
}
- private buildWorker (handlerName: JobType, produceOnly: boolean) {
+ private buildWorker (handlerName: JobType) {
const workerOptions: WorkerOptions = {
- autorun: !produceOnly,
+ autorun: false,
concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix,
- connection: this.getRedisConnection()
+ connection: Redis.getRedisClientOptions('Worker')
}
const handler = function (job: Job) {
}
})
- worker.on('error', err => {
- logger.error('Error in job queue %s.', handlerName, { err })
- })
+ worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
this.workers[handlerName] = worker
}
private buildQueue (handlerName: JobType) {
const queueOptions: QueueOptions = {
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('Queue'),
prefix: this.jobRedisPrefix
}
- this.queues[handlerName] = new Queue(handlerName, queueOptions)
+ const queue = new Queue(handlerName, queueOptions)
+ queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
+
+ this.queues[handlerName] = queue
}
- private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
+ private buildQueueScheduler (handlerName: JobType) {
const queueSchedulerOptions: QueueSchedulerOptions = {
- autorun: !produceOnly,
- connection: this.getRedisConnection(),
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueScheduler'),
prefix: this.jobRedisPrefix,
maxStalledCount: 10
}
- this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
+
+ const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
+ queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
+
+ this.queueSchedulers[handlerName] = queueScheduler
}
- private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
+ private buildQueueEvent (handlerName: JobType) {
const queueEventsOptions: QueueEventsOptions = {
- autorun: !produceOnly,
- connection: this.getRedisConnection(),
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueEvent'),
prefix: this.jobRedisPrefix
}
- this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
- }
- private getRedisConnection () {
- return {
- password: CONFIG.REDIS.AUTH,
- db: CONFIG.REDIS.DB,
- host: CONFIG.REDIS.HOSTNAME,
- port: CONFIG.REDIS.PORT,
- path: CONFIG.REDIS.SOCKET
- }
+ const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
+ queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
+
+ this.queueEvents[handlerName] = queueEvents
}
// ---------------------------------------------------------------------------
return Promise.all(promises)
}
+ start () {
+ const promises = Object.keys(this.workers)
+ .map(handlerName => {
+ const worker: Worker = this.workers[handlerName]
+ const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
+ const queueEvent: QueueEvents = this.queueEvents[handlerName]
+
+ return Promise.all([
+ worker.run(),
+ queueScheduler.run(),
+ queueEvent.run()
+ ])
+ })
+
+ return Promise.all(promises)
+ }
+
async pause () {
for (const handlerName of Object.keys(this.workers)) {
const worker: Worker = this.workers[handlerName]