X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=03f6fbea75a382f1f8b656d4e8525cf9dbf0c87d;hb=26818a73ba0d7fd53ca69eba0c8e525f3670b5a8;hp=50d732bebcf5c05c9df0c1eac2b12d3c6585ab24;hpb=bd911b54b555b11df7e9849cf92d358bccfecf6e;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..03f6fbea7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -7,11 +7,10 @@ import { QueueEvents, QueueEventsOptions, QueueOptions, - QueueScheduler, - QueueSchedulerOptions, Worker, WorkerOptions } from 'bullmq' +import { parseDurationToMs } from '@server/helpers/core-utils' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' @@ -22,6 +21,7 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + AfterVideoChannelImportPayload, DeleteResumableUploadMetaFilePayload, EmailPayload, FederateVideoPayload, @@ -31,6 +31,8 @@ import { MoveObjectStoragePayload, NotifyPayload, RefreshPayload, + TranscodingJobBuilderPayload, + VideoChannelImportPayload, VideoFileImportPayload, VideoImportPayload, VideoLiveEndingPayload, @@ -39,20 +41,24 @@ import { VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { Hooks } from '../plugins/hooks' +import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' -import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' +import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' import { processEmail } from './handlers/email' import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processNotify } from './handlers/notify' +import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' +import { processVideoChannelImport } from './handlers/video-channel-import' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' @@ -65,11 +71,12 @@ export type CreateJobArgument = { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | - { type: 'activitypub-http-cleaner', payload: {} } | + { type: 'activitypub-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | + { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views-stats', payload: {} } | @@ -79,6 +86,9 @@ export type CreateJobArgument = { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | + { type: 'video-channel-import', payload: VideoChannelImportPayload } | + { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } | { type: 'notify', payload: NotifyPayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | { type: 'federate-video', payload: FederateVideoPayload } @@ -86,29 +96,33 @@ export type CreateJobArgument = export type CreateJobOptions = { delay?: number priority?: number + failParentOnFailure?: boolean } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, - 'activitypub-http-unicast': processActivityPubHttpUnicast, - 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, - 'video-file-import': processVideoFileImport, - 'video-transcoding': processVideoTranscoding, - 'email': processEmail, - 'video-import': processVideoImport, - 'videos-views-stats': processVideosViewsStats, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-refresher': refreshAPObject, - 'video-live-ending': processVideoLiveEnding, 'actor-keys': processActorKeys, - 'video-redundancy': processVideoRedundancy, - 'move-to-object-storage': processMoveToObjectStorage, + 'after-video-channel-import': processAfterVideoChannelImport, + 'email': processEmail, + 'federate-video': processFederateVideo, + 'transcoding-job-builder': processTranscodingJobBuilder, 'manage-video-torrent': processManageVideoTorrent, + 'move-to-object-storage': processMoveToObjectStorage, 'notify': processNotify, + 'video-channel-import': processVideoChannelImport, + 'video-file-import': processVideoFileImport, + 'video-import': processVideoImport, + 'video-live-ending': processVideoLiveEnding, + 'video-redundancy': processVideoRedundancy, 'video-studio-edition': processVideoStudioEdition, - 'federate-video': processFederateVideo + 'video-transcoding': processVideoTranscoding, + 'videos-views-stats': processVideosViewsStats } const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } = { @@ -116,26 +130,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } } const jobTypes: JobType[] = [ + 'activitypub-cleaner', 'activitypub-follow', - 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel', + 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', - 'activitypub-cleaner', + 'activitypub-refresher', + 'actor-keys', + 'after-video-channel-import', 'email', - 'video-transcoding', + 'federate-video', + 'transcoding-job-builder', + 'manage-video-torrent', + 'move-to-object-storage', + 'notify', + 'video-channel-import', 'video-file-import', 'video-import', - 'videos-views-stats', - 'activitypub-refresher', - 'video-redundancy', - 'actor-keys', 'video-live-ending', - 'move-to-object-storage', - 'manage-video-torrent', + 'video-redundancy', 'video-studio-edition', - 'notify', - 'federate-video' + 'video-transcoding', + 'videos-views-stats' ] const silentFailure = new Set([ 'activitypub-http-unicast' ]) @@ -146,7 +163,6 @@ class JobQueue { private workers: { [id in JobType]?: Worker } = {} private queues: { [id in JobType]?: Queue } = {} - private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} private queueEvents: { [id in JobType]?: QueueEvents } = {} private flowProducer: FlowProducer @@ -157,34 +173,35 @@ class JobQueue { private constructor () { } - init (produceOnly = false) { + init () { // Already initialized if (this.initialized === true) return this.initialized = true this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - for (const handlerName of (Object.keys(handlers) as JobType[])) { - this.buildWorker(handlerName, produceOnly) + for (const handlerName of Object.keys(handlers)) { + this.buildWorker(handlerName) this.buildQueue(handlerName) - this.buildQueueScheduler(handlerName, produceOnly) - this.buildQueueEvent(handlerName, produceOnly) + this.buildQueueEvent(handlerName) } this.flowProducer = new FlowProducer({ - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('FlowProducer'), prefix: this.jobRedisPrefix }) + this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) this.addRepeatableJobs() } - private buildWorker (handlerName: JobType, produceOnly: boolean) { + private buildWorker (handlerName: JobType) { const workerOptions: WorkerOptions = { - autorun: !produceOnly, + autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - connection: this.getRedisConnection() + connection: Redis.getRedisClientOptions('Worker'), + maxStalledCount: 10 } const handler = function (job: Job) { @@ -217,49 +234,34 @@ class JobQueue { } }) - worker.on('error', err => { - logger.error('Error in job queue %s.', handlerName, { err }) - }) + worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) }) this.workers[handlerName] = worker } private buildQueue (handlerName: JobType) { const queueOptions: QueueOptions = { - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('Queue'), prefix: this.jobRedisPrefix } - this.queues[handlerName] = new Queue(handlerName, queueOptions) - } + const queue = new Queue(handlerName, queueOptions) + queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) }) - private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { - const queueSchedulerOptions: QueueSchedulerOptions = { - autorun: !produceOnly, - connection: this.getRedisConnection(), - prefix: this.jobRedisPrefix, - maxStalledCount: 10 - } - this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) + this.queues[handlerName] = queue } - private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { + private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { - autorun: !produceOnly, - connection: this.getRedisConnection(), + autorun: false, + connection: Redis.getRedisClientOptions('QueueEvent'), prefix: this.jobRedisPrefix } - this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) - } - private getRedisConnection () { - return { - password: CONFIG.REDIS.AUTH, - db: CONFIG.REDIS.DB, - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - path: CONFIG.REDIS.SOCKET - } + const queueEvents = new QueueEvents(handlerName, queueEventsOptions) + queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) }) + + this.queueEvents[handlerName] = queueEvents } // --------------------------------------------------------------------------- @@ -269,13 +271,11 @@ class JobQueue { .map(handlerName => { const worker: Worker = this.workers[handlerName] const queue: Queue = this.queues[handlerName] - const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName] return Promise.all([ worker.close(false), queue.close(), - queueScheduler.close(), queueEvent.close() ]) }) @@ -283,17 +283,32 @@ class JobQueue { return Promise.all(promises) } + start () { + const promises = Object.keys(this.workers) + .map(handlerName => { + const worker: Worker = this.workers[handlerName] + const queueEvent: QueueEvents = this.queueEvents[handlerName] + + return Promise.all([ + worker.run(), + queueEvent.run() + ]) + }) + + return Promise.all(promises) + } + async pause () { - for (const handler of Object.keys(this.workers)) { - const worker: Worker = this.workers[handler] + for (const handlerName of Object.keys(this.workers)) { + const worker: Worker = this.workers[handlerName] await worker.pause() } } resume () { - for (const handler of Object.keys(this.workers)) { - const worker: Worker = this.workers[handler] + for (const handlerName of Object.keys(this.workers)) { + const worker: Worker = this.workers[handlerName] worker.resume() } @@ -306,7 +321,7 @@ class JobQueue { .catch(err => logger.error('Cannot create job.', { err, options })) } - async createJob (options: CreateJobArgument & CreateJobOptions) { + createJob (options: CreateJobArgument & CreateJobOptions) { const queue: Queue = this.queues[options.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', options.type) @@ -318,17 +333,15 @@ class JobQueue { return queue.add('job', options.payload, jobOptions) } - async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { + createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { let lastJob: FlowJob for (const job of jobs) { if (!job) continue lastJob = { - name: 'job', - data: job.payload, - queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), + ...this.buildJobFlowOption(job), + children: lastJob ? [ lastJob ] : [] @@ -338,12 +351,35 @@ class JobQueue { return this.flowProducer.add(lastJob) } + createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { + return this.flowProducer.add({ + ...this.buildJobFlowOption(parent), + + children: children.map(c => this.buildJobFlowOption(c)) + }) + } + + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { + return { + name: 'job', + data: job.payload, + queueName: job.type, + opts: { + failParentOnFailure: true, + + ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) + } + } + } + private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { return { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[type], priority: options.priority, - delay: options.delay + delay: options.delay, + + ...this.buildJobRemovalOptions(type) } } @@ -358,10 +394,10 @@ class JobQueue { }): Promise { const { state, start, count, asc, jobType } = options - const states = state ? [ state ] : jobStates - let results: Job[] = [] + const states = this.buildStateFilter(state) + const filteredJobTypes = this.buildTypeFilter(jobType) - const filteredJobTypes = this.filterJobTypes(jobType) + let results: Job[] = [] for (const jobType of filteredJobTypes) { const queue: Queue = this.queues[jobType] @@ -389,9 +425,9 @@ class JobQueue { async count (state: JobState, jobType?: JobType): Promise { const states = state ? [ state ] : jobStates - let total = 0 + const filteredJobTypes = this.buildTypeFilter(jobType) - const filteredJobTypes = this.filterJobTypes(jobType) + let total = 0 for (const type of filteredJobTypes) { const queue = this.queues[type] @@ -410,6 +446,23 @@ class JobQueue { return total } + private buildStateFilter (state?: JobState) { + if (!state) return jobStates + + const states = [ state ] + + // Include parent if filtering on waiting + if (state === 'waiting') states.push('waiting-children') + + return states + } + + private buildTypeFilter (jobType?: JobType) { + if (!jobType) return jobTypes + + return jobTypes.filter(t => t === jobType) + } + async getStats () { const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) @@ -421,32 +474,27 @@ class JobQueue { async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue: Queue = this.queues[key] - await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') + await queue.clean(parseDurationToMs('7 days'), 1000, 'completed') + await queue.clean(parseDurationToMs('7 days'), 1000, 'failed') } } - waitJob (job: Job) { - return job.waitUntilFinished(this.queueEvents[job.queueName]) - } - private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { - repeat: REPEAT_JOBS['videos-views-stats'] + repeat: REPEAT_JOBS['videos-views-stats'], + + ...this.buildJobRemovalOptions('videos-views-stats') }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { this.queues['activitypub-cleaner'].add('job', {}, { - repeat: REPEAT_JOBS['activitypub-cleaner'] + repeat: REPEAT_JOBS['activitypub-cleaner'], + + ...this.buildJobRemovalOptions('activitypub-cleaner') }).catch(err => logger.error('Cannot add repeatable job.', { err })) } } - private filterJobTypes (jobType?: JobType) { - if (!jobType) return jobTypes - - return jobTypes.filter(t => t === jobType) - } - private getJobConcurrency (jobType: JobType) { if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY @@ -454,6 +502,23 @@ class JobQueue { return JOB_CONCURRENCY[jobType] } + private buildJobRemovalOptions (queueName: string) { + return { + removeOnComplete: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT + }, + removeOnFail: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT / 1000 + } + } + } + static get Instance () { return this.instance || (this.instance = new this()) }