X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=8ff0c169e092276543b2eaa5dfbaf6aa12906787;hb=4a9e71c2b1ef57de01cd04984348b3957ebbc21d;hp=acc69ef2459c24165628001251a45013e8be7fb0;hpb=71e3dfda4e2bcc228415c0d66b09a84bb73dddd1;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index acc69ef24..8ff0c169e 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,13 +1,13 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { JobState, JobType } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' import { Redis } from '../redis' +import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { EmailPayload, processEmail } from './handlers/email' -import { processVideoFile, VideoFilePayload } from './handlers/video-file' +import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' type CreateJobArgument = @@ -15,30 +15,42 @@ type CreateJobArgument = { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | + { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-file', payload: VideoFilePayload } | { type: 'email', payload: EmailPayload } -const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { +const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-follow': processActivityPubFollow, + 'video-file-import': processVideoFileImport, 'video-file': processVideoFile, 'email': processEmail } -const jobsWithTLL: JobType[] = [ +const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { + 'activitypub-http-broadcast': true, + 'activitypub-http-unicast': true, + 'activitypub-http-fetcher': true, + 'activitypub-follow': true +} + +const jobTypes: JobType[] = [ + 'activitypub-follow', 'activitypub-http-broadcast', - 'activitypub-http-unicast', 'activitypub-http-fetcher', - 'activitypub-follow' + 'activitypub-http-unicast', + 'email', + 'video-file', + 'video-file-import' ] class JobQueue { private static instance: JobQueue - private jobQueue: kue.Queue + private queues: { [ id in JobType ]?: Bull.Queue } = {} private initialized = false private jobRedisPrefix: string @@ -49,128 +61,108 @@ class JobQueue { if (this.initialized === true) return this.initialized = true - this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST - - this.jobQueue = kue.createQueue({ + this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST + const queueOptions = { prefix: this.jobRedisPrefix, - redis: { - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - auth: CONFIG.REDIS.AUTH + redis: Redis.getRedisClient(), + settings: { + maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts } - }) - - this.jobQueue.setMaxListeners(20) + } - this.jobQueue.on('error', err => { - logger.error('Error in job queue.', { err }) - process.exit(-1) - }) - this.jobQueue.watchStuckJobs(5000) + for (const handlerName of Object.keys(handlers)) { + const queue = new Bull(handlerName, queueOptions) + const handler = handlers[handlerName] - await this.reactiveStuckJobs() + queue.process(JOB_CONCURRENCY[handlerName], handler) + .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) - for (const handlerName of Object.keys(handlers)) { - this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { - try { - const res = await handlers[ handlerName ](job) - return done(null, res) - } catch (err) { - return done(err) - } + queue.on('error', err => { + logger.error('Error in job queue %s.', handlerName, { err }) + process.exit(-1) }) + + this.queues[handlerName] = queue } } - createJob (obj: CreateJobArgument, priority = 'normal') { - return new Promise((res, rej) => { - let job = this.jobQueue - .create(obj.type, obj.payload) - .priority(priority) - .attempts(JOB_ATTEMPTS[obj.type]) - .backoff({ delay: 60 * 1000, type: 'exponential' }) + terminate () { + for (const queueName of Object.keys(this.queues)) { + const queue = this.queues[queueName] + queue.close() + } + } - if (jobsWithTLL.indexOf(obj.type) !== -1) { - job = job.ttl(JOB_REQUEST_TTL) - } + createJob (obj: CreateJobArgument) { + const queue = this.queues[obj.type] + if (queue === undefined) { + logger.error('Unknown queue %s: cannot create job.', obj.type) + throw Error('Unknown queue, cannot create job') + } - return job.save(err => { - if (err) return rej(err) + const jobArgs: Bull.JobOptions = { + backoff: { delay: 60 * 1000, type: 'exponential' }, + attempts: JOB_ATTEMPTS[obj.type] + } - return res() - }) - }) + if (jobsWithRequestTimeout[obj.type] === true) { + jobArgs.timeout = JOB_REQUEST_TTL + } + + return queue.add(obj.payload, jobArgs) } - async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise { - const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) + async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise { + let results: Bull.Job[] = [] - const jobPromises = jobStrings - .map(s => s.split('|')) - .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) + // TODO: optimize + for (const jobType of jobTypes) { + const queue = this.queues[ jobType ] + if (queue === undefined) { + logger.error('Unknown queue %s to list jobs.', jobType) + continue + } - return Promise.all(jobPromises) - } + // FIXME: Bull queue typings does not have getJobs method + const jobs = await (queue as any).getJobs(state, 0, start + count, asc) + results = results.concat(jobs) + } - count (state: JobState) { - return new Promise((res, rej) => { - this.jobQueue[state + 'Count']((err, total) => { - if (err) return rej(err) + results.sort((j1: any, j2: any) => { + if (j1.timestamp < j2.timestamp) return -1 + else if (j1.timestamp === j2.timestamp) return 0 - return res(total) - }) + return 1 }) - } - removeOldJobs () { - const now = new Date().getTime() - kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { - if (err) { - logger.error('Cannot get jobs when removing old jobs.', { err }) - return - } + if (asc === false) results.reverse() - for (const job of jobs) { - if (now - job.created_at > JOB_COMPLETED_LIFETIME) { - job.remove() - } - } - }) + return results.slice(start, start + count) } - private reactiveStuckJobs () { - const promises: Promise[] = [] + async count (state: JobState): Promise { + let total = 0 - this.jobQueue.active((err, ids) => { - if (err) throw err - - for (const id of ids) { - kue.Job.get(id, (err, job) => { - if (err) throw err + for (const type of jobTypes) { + const queue = this.queues[ type ] + if (queue === undefined) { + logger.error('Unknown queue %s to count jobs.', type) + continue + } - const p = new Promise((res, rej) => { - job.inactive(err => { - if (err) return rej(err) - return res() - }) - }) + const counts = await queue.getJobCounts() - promises.push(p) - }) - } - }) + total += counts[ state ] + } - return Promise.all(promises) + return total } - private getJob (id: number) { - return new Promise((res, rej) => { - kue.Job.get(id, (err, job) => { - if (err) return rej(err) - - return res(job) - }) - }) + removeOldJobs () { + for (const key of Object.keys(this.queues)) { + const queue = this.queues[key] + queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + } } static get Instance () {