X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=8597eb00018356dc081237b0f48f57cb7f8d6415;hb=1c30b112b9860255bdb458482c8dba9432419c49;hp=14e3a00aaa31e564d18065691871bec1348ef1c7;hpb=e2b2c726b1c1c31794d324c3afd7c24e1f953131;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e3a00aa..8597eb000 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -22,6 +22,7 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + AfterVideoChannelImportPayload, DeleteResumableUploadMetaFilePayload, EmailPayload, FederateVideoPayload, @@ -31,6 +32,7 @@ import { MoveObjectStoragePayload, NotifyPayload, RefreshPayload, + VideoChannelImportPayload, VideoFileImportPayload, VideoImportPayload, VideoLiveEndingPayload, @@ -39,26 +41,37 @@ import { VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { + JOB_ATTEMPTS, + JOB_CONCURRENCY, + JOB_REMOVAL_OPTIONS, + JOB_TTL, + REPEAT_JOBS, + WEBSERVER +} from '../../initializers/constants' import { Hooks } from '../plugins/hooks' +import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' -import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' +import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' import { processEmail } from './handlers/email' import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processNotify } from './handlers/notify' +import { processVideoChannelImport } from './handlers/video-channel-import' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' +import { parseDurationToMs } from '@server/helpers/core-utils' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -79,6 +92,9 @@ export type CreateJobArgument = { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | + { type: 'video-channel-import', payload: VideoChannelImportPayload } | + { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } | { type: 'notify', payload: NotifyPayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | { type: 'federate-video', payload: FederateVideoPayload } @@ -89,8 +105,8 @@ export type CreateJobOptions = { } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, @@ -106,8 +122,10 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-redundancy': processVideoRedundancy, 'move-to-object-storage': processMoveToObjectStorage, 'manage-video-torrent': processManageVideoTorrent, - 'notify': processNotify, 'video-studio-edition': processVideoStudioEdition, + 'video-channel-import': processVideoChannelImport, + 'after-video-channel-import': processAfterVideoChannelImport, + 'notify': processNotify, 'federate-video': processFederateVideo } @@ -134,6 +152,8 @@ const jobTypes: JobType[] = [ 'move-to-object-storage', 'manage-video-torrent', 'video-studio-edition', + 'video-channel-import', + 'after-video-channel-import', 'notify', 'federate-video' ] @@ -157,34 +177,35 @@ class JobQueue { private constructor () { } - init (produceOnly = false) { + init () { // Already initialized if (this.initialized === true) return this.initialized = true this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - for (const handlerName of (Object.keys(handlers) as JobType[])) { - this.buildWorker(handlerName, produceOnly) + for (const handlerName of Object.keys(handlers)) { + this.buildWorker(handlerName) this.buildQueue(handlerName) - this.buildQueueScheduler(handlerName, produceOnly) - this.buildQueueEvent(handlerName, produceOnly) + this.buildQueueScheduler(handlerName) + this.buildQueueEvent(handlerName) } this.flowProducer = new FlowProducer({ - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('FlowProducer'), prefix: this.jobRedisPrefix }) + this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) this.addRepeatableJobs() } - private buildWorker (handlerName: JobType, produceOnly: boolean) { + private buildWorker (handlerName: JobType) { const workerOptions: WorkerOptions = { - autorun: !produceOnly, + autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - connection: this.getRedisConnection() + connection: Redis.getRedisClientOptions('Worker') } const handler = function (job: Job) { @@ -217,49 +238,48 @@ class JobQueue { } }) - worker.on('error', err => { - logger.error('Error in job queue %s.', handlerName, { err }) - }) + worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) }) this.workers[handlerName] = worker } private buildQueue (handlerName: JobType) { const queueOptions: QueueOptions = { - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('Queue'), prefix: this.jobRedisPrefix } - this.queues[handlerName] = new Queue(handlerName, queueOptions) + const queue = new Queue(handlerName, queueOptions) + queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) }) + + this.queues[handlerName] = queue } - private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { + private buildQueueScheduler (handlerName: JobType) { const queueSchedulerOptions: QueueSchedulerOptions = { - autorun: !produceOnly, - connection: this.getRedisConnection(), + autorun: false, + connection: Redis.getRedisClientOptions('QueueScheduler'), prefix: this.jobRedisPrefix, maxStalledCount: 10 } - this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) + + const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) + queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) + + this.queueSchedulers[handlerName] = queueScheduler } - private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { + private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { - autorun: !produceOnly, - connection: this.getRedisConnection(), + autorun: false, + connection: Redis.getRedisClientOptions('QueueEvent'), prefix: this.jobRedisPrefix } - this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) - } - private getRedisConnection () { - return { - password: CONFIG.REDIS.AUTH, - db: CONFIG.REDIS.DB, - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - path: CONFIG.REDIS.SOCKET - } + const queueEvents = new QueueEvents(handlerName, queueEventsOptions) + queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) }) + + this.queueEvents[handlerName] = queueEvents } // --------------------------------------------------------------------------- @@ -283,23 +303,36 @@ class JobQueue { return Promise.all(promises) } + start () { + const promises = Object.keys(this.workers) + .map(handlerName => { + const worker: Worker = this.workers[handlerName] + const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] + const queueEvent: QueueEvents = this.queueEvents[handlerName] + + return Promise.all([ + worker.run(), + queueScheduler.run(), + queueEvent.run() + ]) + }) + + return Promise.all(promises) + } + async pause () { for (const handlerName of Object.keys(this.workers)) { const worker: Worker = this.workers[handlerName] - const queue: Queue = this.queues[handlerName] await worker.pause() - await queue.pause() } } - async resume () { + resume () { for (const handlerName of Object.keys(this.workers)) { const worker: Worker = this.workers[handlerName] - const queue: Queue = this.queues[handlerName] worker.resume() - await queue.resume() } } @@ -310,7 +343,7 @@ class JobQueue { .catch(err => logger.error('Cannot create job.', { err, options })) } - async createJob (options: CreateJobArgument & CreateJobOptions) { + createJob (options: CreateJobArgument & CreateJobOptions) { const queue: Queue = this.queues[options.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', options.type) @@ -322,7 +355,7 @@ class JobQueue { return queue.add('job', options.payload, jobOptions) } - async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { + createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { let lastJob: FlowJob for (const job of jobs) { @@ -340,7 +373,7 @@ class JobQueue { return this.flowProducer.add(lastJob) } - async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { + createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { return this.flowProducer.add({ ...this.buildJobFlowOption(parent), @@ -348,7 +381,7 @@ class JobQueue { }) } - private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { return { name: 'job', data: job.payload, @@ -362,7 +395,9 @@ class JobQueue { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[type], priority: options.priority, - delay: options.delay + delay: options.delay, + + ...this.buildJobRemovalOptions(type) } } @@ -457,18 +492,23 @@ class JobQueue { async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue: Queue = this.queues[key] - await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') + await queue.clean(parseDurationToMs('7 days'), 1000, 'completed') + await queue.clean(parseDurationToMs('7 days'), 1000, 'failed') } } private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { - repeat: REPEAT_JOBS['videos-views-stats'] + repeat: REPEAT_JOBS['videos-views-stats'], + + ...this.buildJobRemovalOptions('videos-views-stats') }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { this.queues['activitypub-cleaner'].add('job', {}, { - repeat: REPEAT_JOBS['activitypub-cleaner'] + repeat: REPEAT_JOBS['activitypub-cleaner'], + + ...this.buildJobRemovalOptions('activitypub-cleaner') }).catch(err => logger.error('Cannot add repeatable job.', { err })) } } @@ -480,6 +520,23 @@ class JobQueue { return JOB_CONCURRENCY[jobType] } + private buildJobRemovalOptions (queueName: string) { + return { + removeOnComplete: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT + }, + removeOnFail: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT / 1000 + } + } + } + static get Instance () { return this.instance || (this.instance = new this()) }