QueueEvents,
QueueEventsOptions,
QueueOptions,
- QueueScheduler,
- QueueSchedulerOptions,
Worker,
WorkerOptions
} from 'bullmq'
+import { parseDurationToMs } from '@server/helpers/core-utils'
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
-import {
- JOB_ATTEMPTS,
- JOB_CONCURRENCY,
- JOB_REMOVAL_OPTIONS,
- JOB_TTL,
- REPEAT_JOBS,
- WEBSERVER
-} from '../../initializers/constants'
+import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import { Hooks } from '../plugins/hooks'
import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
-import { parseDurationToMs } from '@server/helpers/core-utils'
export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
private workers: { [id in JobType]?: Worker } = {}
private queues: { [id in JobType]?: Queue } = {}
- private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
private queueEvents: { [id in JobType]?: QueueEvents } = {}
private flowProducer: FlowProducer
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
- for (const handlerName of (Object.keys(handlers) as JobType[])) {
+ for (const handlerName of Object.keys(handlers)) {
this.buildWorker(handlerName)
this.buildQueue(handlerName)
- this.buildQueueScheduler(handlerName)
this.buildQueueEvent(handlerName)
}
autorun: false,
concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix,
- connection: Redis.getRedisClientOptions('Worker')
+ connection: Redis.getRedisClientOptions('Worker'),
+ maxStalledCount: 10
}
const handler = function (job: Job) {
this.queues[handlerName] = queue
}
- private buildQueueScheduler (handlerName: JobType) {
- const queueSchedulerOptions: QueueSchedulerOptions = {
- autorun: false,
- connection: Redis.getRedisClientOptions('QueueScheduler'),
- prefix: this.jobRedisPrefix,
- maxStalledCount: 10
- }
-
- const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
- queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
-
- this.queueSchedulers[handlerName] = queueScheduler
- }
-
private buildQueueEvent (handlerName: JobType) {
const queueEventsOptions: QueueEventsOptions = {
autorun: false,
.map(handlerName => {
const worker: Worker = this.workers[handlerName]
const queue: Queue = this.queues[handlerName]
- const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([
worker.close(false),
queue.close(),
- queueScheduler.close(),
queueEvent.close()
])
})
const promises = Object.keys(this.workers)
.map(handlerName => {
const worker: Worker = this.workers[handlerName]
- const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([
worker.run(),
- queueScheduler.run(),
queueEvent.run()
])
})
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]
- await queue.clean(parseDurationToMs('7 days'), 100, 'completed')
- await queue.clean(parseDurationToMs('7 days'), 100, 'failed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
+ await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
}
}