X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=8597eb00018356dc081237b0f48f57cb7f8d6415;hb=1c30b112b9860255bdb458482c8dba9432419c49;hp=655be6568734f4293bd8086c11fb13b0404d38d7;hpb=4404a7c467a2c6863728127eeff5ca4b59619940;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 655be6568..8597eb000 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -41,8 +41,16 @@ import { VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { + JOB_ATTEMPTS, + JOB_CONCURRENCY, + JOB_REMOVAL_OPTIONS, + JOB_TTL, + REPEAT_JOBS, + WEBSERVER +} from '../../initializers/constants' import { Hooks } from '../plugins/hooks' +import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' @@ -63,6 +71,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' +import { parseDurationToMs } from '@server/helpers/core-utils' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -175,7 +184,7 @@ class JobQueue { this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - for (const handlerName of (Object.keys(handlers) as JobType[])) { + for (const handlerName of Object.keys(handlers)) { this.buildWorker(handlerName) this.buildQueue(handlerName) this.buildQueueScheduler(handlerName) @@ -183,7 +192,7 @@ class JobQueue { } this.flowProducer = new FlowProducer({ - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('FlowProducer'), prefix: this.jobRedisPrefix }) this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) @@ -196,7 +205,7 @@ class JobQueue { autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - connection: this.getRedisConnection() + connection: Redis.getRedisClientOptions('Worker') } const handler = function (job: Job) { @@ -236,7 +245,7 @@ class JobQueue { private buildQueue (handlerName: JobType) { const queueOptions: QueueOptions = { - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('Queue'), prefix: this.jobRedisPrefix } @@ -249,7 +258,7 @@ class JobQueue { private buildQueueScheduler (handlerName: JobType) { const queueSchedulerOptions: QueueSchedulerOptions = { autorun: false, - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('QueueScheduler'), prefix: this.jobRedisPrefix, maxStalledCount: 10 } @@ -263,7 +272,7 @@ class JobQueue { private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { autorun: false, - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('QueueEvent'), prefix: this.jobRedisPrefix } @@ -273,16 +282,6 @@ class JobQueue { this.queueEvents[handlerName] = queueEvents } - private getRedisConnection () { - return { - password: CONFIG.REDIS.AUTH, - db: CONFIG.REDIS.DB, - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - path: CONFIG.REDIS.SOCKET - } - } - // --------------------------------------------------------------------------- async terminate () { @@ -382,7 +381,7 @@ class JobQueue { }) } - private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { return { name: 'job', data: job.payload, @@ -396,7 +395,9 @@ class JobQueue { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[type], priority: options.priority, - delay: options.delay + delay: options.delay, + + ...this.buildJobRemovalOptions(type) } } @@ -491,18 +492,23 @@ class JobQueue { async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue: Queue = this.queues[key] - await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') + await queue.clean(parseDurationToMs('7 days'), 1000, 'completed') + await queue.clean(parseDurationToMs('7 days'), 1000, 'failed') } } private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { - repeat: REPEAT_JOBS['videos-views-stats'] + repeat: REPEAT_JOBS['videos-views-stats'], + + ...this.buildJobRemovalOptions('videos-views-stats') }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { this.queues['activitypub-cleaner'].add('job', {}, { - repeat: REPEAT_JOBS['activitypub-cleaner'] + repeat: REPEAT_JOBS['activitypub-cleaner'], + + ...this.buildJobRemovalOptions('activitypub-cleaner') }).catch(err => logger.error('Cannot add repeatable job.', { err })) } } @@ -514,6 +520,23 @@ class JobQueue { return JOB_CONCURRENCY[jobType] } + private buildJobRemovalOptions (queueName: string) { + return { + removeOnComplete: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT + }, + removeOnFail: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT / 1000 + } + } + } + static get Instance () { return this.instance || (this.instance = new this()) }