QueueEvents,
QueueEventsOptions,
QueueOptions,
- QueueScheduler,
- QueueSchedulerOptions,
Worker,
WorkerOptions
} from 'bullmq'
+import { parseDurationToMs } from '@server/helpers/core-utils'
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
MoveObjectStoragePayload,
NotifyPayload,
RefreshPayload,
+ TranscodingJobBuilderPayload,
VideoChannelImportPayload,
VideoFileImportPayload,
VideoImportPayload,
VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
-import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
+import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import { Hooks } from '../plugins/hooks'
+import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { processNotify } from './handlers/notify'
+import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder'
import { processVideoChannelImport } from './handlers/video-channel-import'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
{ type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
- { type: 'activitypub-http-cleaner', payload: {} } |
+ { type: 'activitypub-cleaner', payload: {} } |
{ type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
{ type: 'video-file-import', payload: VideoFileImportPayload } |
{ type: 'video-transcoding', payload: VideoTranscodingPayload } |
{ type: 'email', payload: EmailPayload } |
+ { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } |
{ type: 'video-import', payload: VideoImportPayload } |
{ type: 'activitypub-refresher', payload: RefreshPayload } |
{ type: 'videos-views-stats', payload: {} } |
export type CreateJobOptions = {
delay?: number
priority?: number
+ failParentOnFailure?: boolean
}
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
- 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
- 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
- 'activitypub-http-unicast': processActivityPubHttpUnicast,
- 'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
'activitypub-follow': processActivityPubFollow,
- 'video-file-import': processVideoFileImport,
- 'video-transcoding': processVideoTranscoding,
+ 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
+ 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
+ 'activitypub-http-fetcher': processActivityPubHttpFetcher,
+ 'activitypub-http-unicast': processActivityPubHttpUnicast,
+ 'activitypub-refresher': refreshAPObject,
+ 'actor-keys': processActorKeys,
+ 'after-video-channel-import': processAfterVideoChannelImport,
'email': processEmail,
+ 'federate-video': processFederateVideo,
+ 'transcoding-job-builder': processTranscodingJobBuilder,
+ 'manage-video-torrent': processManageVideoTorrent,
+ 'move-to-object-storage': processMoveToObjectStorage,
+ 'notify': processNotify,
+ 'video-channel-import': processVideoChannelImport,
+ 'video-file-import': processVideoFileImport,
'video-import': processVideoImport,
- 'videos-views-stats': processVideosViewsStats,
- 'activitypub-refresher': refreshAPObject,
'video-live-ending': processVideoLiveEnding,
- 'actor-keys': processActorKeys,
'video-redundancy': processVideoRedundancy,
- 'move-to-object-storage': processMoveToObjectStorage,
- 'manage-video-torrent': processManageVideoTorrent,
'video-studio-edition': processVideoStudioEdition,
- 'video-channel-import': processVideoChannelImport,
- 'after-video-channel-import': processAfterVideoChannelImport,
- 'notify': processNotify,
- 'federate-video': processFederateVideo
+ 'video-transcoding': processVideoTranscoding,
+ 'videos-views-stats': processVideosViewsStats
}
const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
}
const jobTypes: JobType[] = [
+ 'activitypub-cleaner',
'activitypub-follow',
- 'activitypub-http-broadcast',
'activitypub-http-broadcast-parallel',
+ 'activitypub-http-broadcast',
'activitypub-http-fetcher',
'activitypub-http-unicast',
- 'activitypub-cleaner',
+ 'activitypub-refresher',
+ 'actor-keys',
+ 'after-video-channel-import',
'email',
- 'video-transcoding',
+ 'federate-video',
+ 'transcoding-job-builder',
+ 'manage-video-torrent',
+ 'move-to-object-storage',
+ 'notify',
+ 'video-channel-import',
'video-file-import',
'video-import',
- 'videos-views-stats',
- 'activitypub-refresher',
- 'video-redundancy',
- 'actor-keys',
'video-live-ending',
- 'move-to-object-storage',
- 'manage-video-torrent',
+ 'video-redundancy',
'video-studio-edition',
- 'video-channel-import',
- 'after-video-channel-import',
- 'notify',
- 'federate-video'
+ 'video-transcoding',
+ 'videos-views-stats'
]
const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
private workers: { [id in JobType]?: Worker } = {}
private queues: { [id in JobType]?: Queue } = {}
- private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
private queueEvents: { [id in JobType]?: QueueEvents } = {}
private flowProducer: FlowProducer
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
- for (const handlerName of (Object.keys(handlers) as JobType[])) {
+ for (const handlerName of Object.keys(handlers)) {
this.buildWorker(handlerName)
this.buildQueue(handlerName)
- this.buildQueueScheduler(handlerName)
this.buildQueueEvent(handlerName)
}
this.flowProducer = new FlowProducer({
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('FlowProducer'),
prefix: this.jobRedisPrefix
})
this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
autorun: false,
concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix,
- connection: this.getRedisConnection()
+ connection: Redis.getRedisClientOptions('Worker'),
+ maxStalledCount: 10
}
const handler = function (job: Job) {
private buildQueue (handlerName: JobType) {
const queueOptions: QueueOptions = {
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('Queue'),
prefix: this.jobRedisPrefix
}
this.queues[handlerName] = queue
}
- private buildQueueScheduler (handlerName: JobType) {
- const queueSchedulerOptions: QueueSchedulerOptions = {
- autorun: false,
- connection: this.getRedisConnection(),
- prefix: this.jobRedisPrefix,
- maxStalledCount: 10
- }
-
- const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
- queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
-
- this.queueSchedulers[handlerName] = queueScheduler
- }
-
private buildQueueEvent (handlerName: JobType) {
const queueEventsOptions: QueueEventsOptions = {
autorun: false,
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('QueueEvent'),
prefix: this.jobRedisPrefix
}
this.queueEvents[handlerName] = queueEvents
}
- private getRedisConnection () {
- return {
- password: CONFIG.REDIS.AUTH,
- db: CONFIG.REDIS.DB,
- host: CONFIG.REDIS.HOSTNAME,
- port: CONFIG.REDIS.PORT,
- path: CONFIG.REDIS.SOCKET
- }
- }
-
// ---------------------------------------------------------------------------
async terminate () {
.map(handlerName => {
const worker: Worker = this.workers[handlerName]
const queue: Queue = this.queues[handlerName]
- const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([
worker.close(false),
queue.close(),
- queueScheduler.close(),
queueEvent.close()
])
})
const promises = Object.keys(this.workers)
.map(handlerName => {
const worker: Worker = this.workers[handlerName]
- const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([
worker.run(),
- queueScheduler.run(),
queueEvent.run()
])
})
})
}
- private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
+ private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
return {
name: 'job',
data: job.payload,
queueName: job.type,
- opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
+ opts: {
+ failParentOnFailure: true,
+
+ ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ]))
+ }
}
}
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[type],
priority: options.priority,
- delay: options.delay
+ delay: options.delay,
+
+ ...this.buildJobRemovalOptions(type)
}
}
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]
- await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
}
}
private addRepeatableJobs () {
this.queues['videos-views-stats'].add('job', {}, {
- repeat: REPEAT_JOBS['videos-views-stats']
+ repeat: REPEAT_JOBS['videos-views-stats'],
+
+ ...this.buildJobRemovalOptions('videos-views-stats')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
this.queues['activitypub-cleaner'].add('job', {}, {
- repeat: REPEAT_JOBS['activitypub-cleaner']
+ repeat: REPEAT_JOBS['activitypub-cleaner'],
+
+ ...this.buildJobRemovalOptions('activitypub-cleaner')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
}
}
return JOB_CONCURRENCY[jobType]
}
+ private buildJobRemovalOptions (queueName: string) {
+ return {
+ removeOnComplete: {
+ // Wants seconds
+ age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,
+
+ count: JOB_REMOVAL_OPTIONS.COUNT
+ },
+ removeOnFail: {
+ // Wants seconds
+ age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,
+
+ count: JOB_REMOVAL_OPTIONS.COUNT / 1000
+ }
+ }
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}