From c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 23 Nov 2022 17:13:01 +0100 Subject: [PATCH] Delete completed/failed jobs directly from bullmq --- server/initializers/constants.ts | 22 ++++++++- server/lib/job-queue/job-queue.ts | 46 ++++++++++++++++--- .../schedulers/remove-old-jobs-scheduler.ts | 2 + 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 991fe3e85..c8fa8fa2c 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -239,7 +239,23 @@ const REQUEST_TIMEOUTS = { REDUNDANCY: JOB_TTL['video-redundancy'] } -const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days +const JOB_REMOVAL_OPTIONS = { + COUNT: 10000, // Max jobs to store + + SUCCESS: { // Success jobs + 'DEFAULT': parseDurationToMs('2 days'), + + 'activitypub-http-broadcast-parallel': parseDurationToMs('10 minutes'), + 'activitypub-http-unicast': parseDurationToMs('1 hour'), + 'videos-views-stats': parseDurationToMs('3 hours'), + 'activitypub-refresher': parseDurationToMs('10 hours') + }, + + FAILURE: { // Failed job + DEFAULT: parseDurationToMs('7 days') + } +} + const VIDEO_IMPORT_TIMEOUT = Math.floor(JOB_TTL['video-import'] * 0.9) const SCHEDULER_INTERVALS_MS = { @@ -938,6 +954,8 @@ if (process.env.PRODUCTION_CONSTANTS !== 'true') { OVERVIEWS.VIDEOS.SAMPLE_THRESHOLD = 2 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 + + JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000 } if (isTestInstance()) { @@ -1069,7 +1087,7 @@ export { CRAWL_REQUEST_CONCURRENCY, DEFAULT_AUDIO_RESOLUTION, BINARY_CONTENT_TYPES, - JOB_COMPLETED_LIFETIME, + JOB_REMOVAL_OPTIONS, HTTP_SIGNATURE, VIDEO_IMPORT_STATES, VIDEO_CHANNEL_SYNC_STATE, diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 6bc59732f..91366c481 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -41,8 +41,16 @@ import { VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { + JOB_ATTEMPTS, + JOB_CONCURRENCY, + JOB_REMOVAL_OPTIONS, + JOB_TTL, + REPEAT_JOBS, + WEBSERVER +} from '../../initializers/constants' import { Hooks } from '../plugins/hooks' +import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' @@ -63,7 +71,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' -import { Redis } from '../redis' +import { parseDurationToMs } from '@server/helpers/core-utils' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -373,7 +381,7 @@ class JobQueue { }) } - private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { return { name: 'job', data: job.payload, @@ -387,7 +395,9 @@ class JobQueue { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[type], priority: options.priority, - delay: options.delay + delay: options.delay, + + ...this.buildJobRemovalOptions(type) } } @@ -482,18 +492,23 @@ class JobQueue { async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue: Queue = this.queues[key] - await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') + await queue.clean(parseDurationToMs('7 days'), 100, 'completed') + await queue.clean(parseDurationToMs('7 days'), 100, 'failed') } } private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { - repeat: REPEAT_JOBS['videos-views-stats'] + repeat: REPEAT_JOBS['videos-views-stats'], + + ...this.buildJobRemovalOptions('videos-views-stats') }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { this.queues['activitypub-cleaner'].add('job', {}, { - repeat: REPEAT_JOBS['activitypub-cleaner'] + repeat: REPEAT_JOBS['activitypub-cleaner'], + + ...this.buildJobRemovalOptions('activitypub-cleaner') }).catch(err => logger.error('Cannot add repeatable job.', { err })) } } @@ -505,6 +520,23 @@ class JobQueue { return JOB_CONCURRENCY[jobType] } + private buildJobRemovalOptions (queueName: string) { + return { + removeOnComplete: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT + }, + removeOnFail: { + // Wants seconds + age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, + + count: JOB_REMOVAL_OPTIONS.COUNT / 1000 + } + } + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts index 879846999..6b05ea144 100644 --- a/server/lib/schedulers/remove-old-jobs-scheduler.ts +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts @@ -4,6 +4,8 @@ import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { JobQueue } from '../job-queue' import { AbstractScheduler } from './abstract-scheduler' +// FIXME: delete this scheduler in a few versions (introduced in 5.0) +// We introduced job removal directly using bullmq option but we still need to delete old jobs export class RemoveOldJobsScheduler extends AbstractScheduler { private static instance: AbstractScheduler -- 2.41.0