ActivitypubHttpFetcherPayload,
ActivitypubHttpUnicastPayload,
ActorKeysPayload,
+ AfterVideoChannelImportPayload,
DeleteResumableUploadMetaFilePayload,
EmailPayload,
FederateVideoPayload,
MoveObjectStoragePayload,
NotifyPayload,
RefreshPayload,
+ VideoChannelImportPayload,
VideoFileImportPayload,
VideoImportPayload,
VideoLiveEndingPayload,
VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
-import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
+import {
+ JOB_ATTEMPTS,
+ JOB_CONCURRENCY,
+ JOB_REMOVAL_OPTIONS,
+ JOB_TTL,
+ REPEAT_JOBS,
+ WEBSERVER
+} from '../../initializers/constants'
import { Hooks } from '../plugins/hooks'
+import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
-import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
+import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
+import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
import { processEmail } from './handlers/email'
import { processFederateVideo } from './handlers/federate-video'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { processNotify } from './handlers/notify'
+import { processVideoChannelImport } from './handlers/video-channel-import'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
+import { parseDurationToMs } from '@server/helpers/core-utils'
export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
{ type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
{ type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
+ { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
+ { type: 'video-channel-import', payload: VideoChannelImportPayload } |
+ { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
{ type: 'notify', payload: NotifyPayload } |
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
{ type: 'federate-video', payload: FederateVideoPayload }
}
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
- 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
- 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
+ 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
+ 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
'video-redundancy': processVideoRedundancy,
'move-to-object-storage': processMoveToObjectStorage,
'manage-video-torrent': processManageVideoTorrent,
- 'notify': processNotify,
'video-studio-edition': processVideoStudioEdition,
+ 'video-channel-import': processVideoChannelImport,
+ 'after-video-channel-import': processAfterVideoChannelImport,
+ 'notify': processNotify,
'federate-video': processFederateVideo
}
'move-to-object-storage',
'manage-video-torrent',
'video-studio-edition',
+ 'video-channel-import',
+ 'after-video-channel-import',
'notify',
'federate-video'
]
private constructor () {
}
- init (produceOnly = false) {
+ init () {
// Already initialized
if (this.initialized === true) return
this.initialized = true
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
- for (const handlerName of (Object.keys(handlers) as JobType[])) {
- this.buildWorker(handlerName, produceOnly)
+ for (const handlerName of Object.keys(handlers)) {
+ this.buildWorker(handlerName)
this.buildQueue(handlerName)
- this.buildQueueScheduler(handlerName, produceOnly)
- this.buildQueueEvent(handlerName, produceOnly)
+ this.buildQueueScheduler(handlerName)
+ this.buildQueueEvent(handlerName)
}
this.flowProducer = new FlowProducer({
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('FlowProducer'),
prefix: this.jobRedisPrefix
})
+ this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
this.addRepeatableJobs()
}
- private buildWorker (handlerName: JobType, produceOnly: boolean) {
+ private buildWorker (handlerName: JobType) {
const workerOptions: WorkerOptions = {
- autorun: !produceOnly,
+ autorun: false,
concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix,
- connection: this.getRedisConnection()
+ connection: Redis.getRedisClientOptions('Worker')
}
const handler = function (job: Job) {
}
})
- worker.on('error', err => {
- logger.error('Error in job queue %s.', handlerName, { err })
- })
+ worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
this.workers[handlerName] = worker
}
private buildQueue (handlerName: JobType) {
const queueOptions: QueueOptions = {
- connection: this.getRedisConnection(),
+ connection: Redis.getRedisClientOptions('Queue'),
prefix: this.jobRedisPrefix
}
- this.queues[handlerName] = new Queue(handlerName, queueOptions)
+ const queue = new Queue(handlerName, queueOptions)
+ queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
+
+ this.queues[handlerName] = queue
}
- private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
+ private buildQueueScheduler (handlerName: JobType) {
const queueSchedulerOptions: QueueSchedulerOptions = {
- autorun: !produceOnly,
- connection: this.getRedisConnection(),
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueScheduler'),
prefix: this.jobRedisPrefix,
maxStalledCount: 10
}
- this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
+
+ const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
+ queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
+
+ this.queueSchedulers[handlerName] = queueScheduler
}
- private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
+ private buildQueueEvent (handlerName: JobType) {
const queueEventsOptions: QueueEventsOptions = {
- autorun: !produceOnly,
- connection: this.getRedisConnection(),
+ autorun: false,
+ connection: Redis.getRedisClientOptions('QueueEvent'),
prefix: this.jobRedisPrefix
}
- this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
- }
- private getRedisConnection () {
- return {
- password: CONFIG.REDIS.AUTH,
- db: CONFIG.REDIS.DB,
- host: CONFIG.REDIS.HOSTNAME,
- port: CONFIG.REDIS.PORT,
- path: CONFIG.REDIS.SOCKET
- }
+ const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
+ queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
+
+ this.queueEvents[handlerName] = queueEvents
}
// ---------------------------------------------------------------------------
return Promise.all(promises)
}
+ start () {
+ const promises = Object.keys(this.workers)
+ .map(handlerName => {
+ const worker: Worker = this.workers[handlerName]
+ const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
+ const queueEvent: QueueEvents = this.queueEvents[handlerName]
+
+ return Promise.all([
+ worker.run(),
+ queueScheduler.run(),
+ queueEvent.run()
+ ])
+ })
+
+ return Promise.all(promises)
+ }
+
async pause () {
- for (const handler of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handler]
+ for (const handlerName of Object.keys(this.workers)) {
+ const worker: Worker = this.workers[handlerName]
await worker.pause()
}
}
resume () {
- for (const handler of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handler]
+ for (const handlerName of Object.keys(this.workers)) {
+ const worker: Worker = this.workers[handlerName]
worker.resume()
}
.catch(err => logger.error('Cannot create job.', { err, options }))
}
- async createJob (options: CreateJobArgument & CreateJobOptions) {
+ createJob (options: CreateJobArgument & CreateJobOptions) {
const queue: Queue = this.queues[options.type]
if (queue === undefined) {
logger.error('Unknown queue %s: cannot create job.', options.type)
return queue.add('job', options.payload, jobOptions)
}
- async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
+ createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
let lastJob: FlowJob
for (const job of jobs) {
if (!job) continue
lastJob = {
- name: 'job',
- data: job.payload,
- queueName: job.type,
- opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+ ...this.buildJobFlowOption(job),
+
children: lastJob
? [ lastJob ]
: []
return this.flowProducer.add(lastJob)
}
+ createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
+ return this.flowProducer.add({
+ ...this.buildJobFlowOption(parent),
+
+ children: children.map(c => this.buildJobFlowOption(c))
+ })
+ }
+
+ private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
+ return {
+ name: 'job',
+ data: job.payload,
+ queueName: job.type,
+ opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
+ }
+ }
+
private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
return {
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[type],
priority: options.priority,
- delay: options.delay
+ delay: options.delay,
+
+ ...this.buildJobRemovalOptions(type)
}
}
}): Promise<Job[]> {
const { state, start, count, asc, jobType } = options
- const states = state ? [ state ] : jobStates
- let results: Job[] = []
+ const states = this.buildStateFilter(state)
+ const filteredJobTypes = this.buildTypeFilter(jobType)
- const filteredJobTypes = this.filterJobTypes(jobType)
+ let results: Job[] = []
for (const jobType of filteredJobTypes) {
const queue: Queue = this.queues[jobType]
async count (state: JobState, jobType?: JobType): Promise<number> {
const states = state ? [ state ] : jobStates
- let total = 0
+ const filteredJobTypes = this.buildTypeFilter(jobType)
- const filteredJobTypes = this.filterJobTypes(jobType)
+ let total = 0
for (const type of filteredJobTypes) {
const queue = this.queues[type]
return total
}
+ private buildStateFilter (state?: JobState) {
+ if (!state) return jobStates
+
+ const states = [ state ]
+
+ // Include parent if filtering on waiting
+ if (state === 'waiting') states.push('waiting-children')
+
+ return states
+ }
+
+ private buildTypeFilter (jobType?: JobType) {
+ if (!jobType) return jobTypes
+
+ return jobTypes.filter(t => t === jobType)
+ }
+
async getStats () {
const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]
- await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
}
}
- waitJob (job: Job) {
- return job.waitUntilFinished(this.queueEvents[job.queueName])
- }
-
private addRepeatableJobs () {
this.queues['videos-views-stats'].add('job', {}, {
- repeat: REPEAT_JOBS['videos-views-stats']
+ repeat: REPEAT_JOBS['videos-views-stats'],
+
+ ...this.buildJobRemovalOptions('videos-views-stats')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
this.queues['activitypub-cleaner'].add('job', {}, {
- repeat: REPEAT_JOBS['activitypub-cleaner']
+ repeat: REPEAT_JOBS['activitypub-cleaner'],
+
+ ...this.buildJobRemovalOptions('activitypub-cleaner')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
}
}
- private filterJobTypes (jobType?: JobType) {
- if (!jobType) return jobTypes
-
- return jobTypes.filter(t => t === jobType)
- }
-
private getJobConcurrency (jobType: JobType) {
if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
return JOB_CONCURRENCY[jobType]
}
+ private buildJobRemovalOptions (queueName: string) {
+ return {
+ removeOnComplete: {
+ // Wants seconds
+ age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,
+
+ count: JOB_REMOVAL_OPTIONS.COUNT
+ },
+ removeOnFail: {
+ // Wants seconds
+ age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,
+
+ count: JOB_REMOVAL_OPTIONS.COUNT / 1000
+ }
+ }
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}