VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
-import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
+import {
+ JOB_ATTEMPTS,
+ JOB_CONCURRENCY,
+ JOB_REMOVAL_OPTIONS,
+ JOB_TTL,
+ REPEAT_JOBS,
+ WEBSERVER
+} from '../../initializers/constants'
import { Hooks } from '../plugins/hooks'
+import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
-import { Redis } from '../redis'
+import { parseDurationToMs } from '@server/helpers/core-utils'
export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
- for (const handlerName of (Object.keys(handlers) as JobType[])) {
+ for (const handlerName of Object.keys(handlers)) {
this.buildWorker(handlerName)
this.buildQueue(handlerName)
this.buildQueueScheduler(handlerName)
})
}
- private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
+ private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
return {
name: 'job',
data: job.payload,
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[type],
priority: options.priority,
- delay: options.delay
+ delay: options.delay,
+
+ ...this.buildJobRemovalOptions(type)
}
}
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]
- await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
}
}
private addRepeatableJobs () {
this.queues['videos-views-stats'].add('job', {}, {
- repeat: REPEAT_JOBS['videos-views-stats']
+ repeat: REPEAT_JOBS['videos-views-stats'],
+
+ ...this.buildJobRemovalOptions('videos-views-stats')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
this.queues['activitypub-cleaner'].add('job', {}, {
- repeat: REPEAT_JOBS['activitypub-cleaner']
+ repeat: REPEAT_JOBS['activitypub-cleaner'],
+
+ ...this.buildJobRemovalOptions('activitypub-cleaner')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
}
}
return JOB_CONCURRENCY[jobType]
}
+ private buildJobRemovalOptions (queueName: string) {
+ return {
+ removeOnComplete: {
+ // Wants seconds
+ age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,
+
+ count: JOB_REMOVAL_OPTIONS.COUNT
+ },
+ removeOnFail: {
+ // Wants seconds
+ age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,
+
+ count: JOB_REMOVAL_OPTIONS.COUNT / 1000
+ }
+ }
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}