X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=3224abcc3ac04bff9de55df6184e77934f23c6b4;hb=1808a1f8e4b7b102823492a2007a46929aebf189;hp=1dc28755e5d90c10ccdec6416066311c99347175;hpb=d5b7d9110dd637a7f67ce9e430145314812a8df1;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 1dc28755e..3224abcc3 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,161 +1,302 @@ -import * as kue from 'kue' -import { JobState, JobType } from '../../../shared/models' +import Bull, { Job, JobOptions, Queue } from 'bull' +import { jobStates } from '@server/helpers/custom-validators/jobs' +import { CONFIG } from '@server/initializers/config' +import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { + ActivitypubFollowPayload, + ActivitypubHttpBroadcastPayload, + ActivitypubHttpFetcherPayload, + ActivitypubHttpUnicastPayload, + ActorKeysPayload, + DeleteResumableUploadMetaFilePayload, + EmailPayload, + JobState, + JobType, + ManageVideoTorrentPayload, + MoveObjectStoragePayload, + RefreshPayload, + VideoEditionPayload, + VideoFileImportPayload, + VideoImportPayload, + VideoLiveEndingPayload, + VideoRedundancyPayload, + VideoTranscodingPayload +} from '../../../shared/models' import { logger } from '../../helpers/logger' -import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' -import { Redis } from '../redis' -import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' -import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' -import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' -import { EmailPayload, processEmail } from './handlers/email' -import { processVideoFile, VideoFilePayload } from './handlers/video-file' +import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { processActivityPubCleaner } from './handlers/activitypub-cleaner' +import { processActivityPubFollow } from './handlers/activitypub-follow' +import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' +import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' +import { refreshAPObject } from './handlers/activitypub-refresher' +import { processActorKeys } from './handlers/actor-keys' +import { processEmail } from './handlers/email' +import { processManageVideoTorrent } from './handlers/manage-video-torrent' +import { processMoveToObjectStorage } from './handlers/move-to-object-storage' +import { processVideoEdition } from './handlers/video-edition' +import { processVideoFileImport } from './handlers/video-file-import' +import { processVideoImport } from './handlers/video-import' +import { processVideoLiveEnding } from './handlers/video-live-ending' +import { processVideoTranscoding } from './handlers/video-transcoding' +import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | - { type: 'video-file', payload: VideoFilePayload } | - { type: 'email', payload: EmailPayload } + { type: 'activitypub-http-cleaner', payload: {} } | + { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | + { type: 'video-file-import', payload: VideoFileImportPayload } | + { type: 'video-transcoding', payload: VideoTranscodingPayload } | + { type: 'email', payload: EmailPayload } | + { type: 'video-import', payload: VideoImportPayload } | + { type: 'activitypub-refresher', payload: RefreshPayload } | + { type: 'videos-views-stats', payload: {} } | + { type: 'video-live-ending', payload: VideoLiveEndingPayload } | + { type: 'actor-keys', payload: ActorKeysPayload } | + { type: 'video-redundancy', payload: VideoRedundancyPayload } | + { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | + { type: 'video-edition', payload: VideoEditionPayload } | + { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } + +export type CreateJobOptions = { + delay?: number + priority?: number +} -const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { +const handlers: { [id in JobType]: (job: Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, - 'video-file': processVideoFile, - 'email': processEmail + 'activitypub-cleaner': processActivityPubCleaner, + 'activitypub-follow': processActivityPubFollow, + 'video-file-import': processVideoFileImport, + 'video-transcoding': processVideoTranscoding, + 'email': processEmail, + 'video-import': processVideoImport, + 'videos-views-stats': processVideosViewsStats, + 'activitypub-refresher': refreshAPObject, + 'video-live-ending': processVideoLiveEnding, + 'actor-keys': processActorKeys, + 'video-redundancy': processVideoRedundancy, + 'move-to-object-storage': processMoveToObjectStorage, + 'manage-video-torrent': processManageVideoTorrent, + 'video-edition': processVideoEdition } +const jobTypes: JobType[] = [ + 'activitypub-follow', + 'activitypub-http-broadcast', + 'activitypub-http-fetcher', + 'activitypub-http-unicast', + 'activitypub-cleaner', + 'email', + 'video-transcoding', + 'video-file-import', + 'video-import', + 'videos-views-stats', + 'activitypub-refresher', + 'video-redundancy', + 'actor-keys', + 'video-live-ending', + 'move-to-object-storage', + 'manage-video-torrent', + 'video-edition' +] + class JobQueue { private static instance: JobQueue - private jobQueue: kue.Queue + private queues: { [id in JobType]?: Queue } = {} private initialized = false private jobRedisPrefix: string - private constructor () {} + private constructor () { + } - async init () { + init (produceOnly = false) { // Already initialized if (this.initialized === true) return this.initialized = true - this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST + this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - this.jobQueue = kue.createQueue({ + const queueOptions: Bull.QueueOptions = { prefix: this.jobRedisPrefix, redis: { + password: CONFIG.REDIS.AUTH, + db: CONFIG.REDIS.DB, host: CONFIG.REDIS.HOSTNAME, port: CONFIG.REDIS.PORT, - auth: CONFIG.REDIS.AUTH + path: CONFIG.REDIS.SOCKET + }, + settings: { + maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts } - }) + } - this.jobQueue.setMaxListeners(15) + for (const handlerName of (Object.keys(handlers) as JobType[])) { + const queue = new Bull(handlerName, queueOptions) - this.jobQueue.on('error', err => { - logger.error('Error in job queue.', { err }) - process.exit(-1) - }) - this.jobQueue.watchStuckJobs(5000) - - await this.reactiveStuckJobs() - - for (const handlerName of Object.keys(handlers)) { - this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { - try { - const res = await handlers[ handlerName ](job) - return done(null, res) - } catch (err) { - return done(err) - } + if (produceOnly) { + queue.pause(true) + .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) + } + + const handler = handlers[handlerName] + + queue.process(this.getJobConcurrency(handlerName), handler) + .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + + queue.on('failed', (job, err) => { + logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) + }) + + queue.on('error', err => { + logger.error('Error in job queue %s.', handlerName, { err }) }) + + this.queues[handlerName] = queue } + + this.addRepeatableJobs() } - createJob (obj: CreateJobArgument, priority = 'normal') { - return new Promise((res, rej) => { - this.jobQueue - .create(obj.type, obj.payload) - .priority(priority) - .attempts(JOB_ATTEMPTS[obj.type]) - .backoff({ delay: 60 * 1000, type: 'exponential' }) - .save(err => { - if (err) return rej(err) - - return res() - }) - }) + terminate () { + for (const queueName of Object.keys(this.queues)) { + const queue = this.queues[queueName] + queue.close() + } } - async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise { - const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) + async pause () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].pause(true) + } + } - const jobPromises = jobStrings - .map(s => s.split('|')) - .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) + async resume () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].resume(true) + } + } - return Promise.all(jobPromises) + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { + this.createJobWithPromise(obj, options) + .catch(err => logger.error('Cannot create job.', { err, obj })) } - count (state: JobState) { - return new Promise((res, rej) => { - this.jobQueue[state + 'Count']((err, total) => { - if (err) return rej(err) + createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { + const queue: Queue = this.queues[obj.type] + if (queue === undefined) { + logger.error('Unknown queue %s: cannot create job.', obj.type) + return + } - return res(total) - }) - }) + const jobArgs: JobOptions = { + backoff: { delay: 60 * 1000, type: 'exponential' }, + attempts: JOB_ATTEMPTS[obj.type], + timeout: JOB_TTL[obj.type], + priority: options.priority, + delay: options.delay + } + + return queue.add(obj.payload, jobArgs) } - removeOldJobs () { - const now = new Date().getTime() - kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { - if (err) { - logger.error('Cannot get jobs when removing old jobs.', { err }) - return + async listForApi (options: { + state?: JobState + start: number + count: number + asc?: boolean + jobType: JobType + }): Promise { + const { state, start, count, asc, jobType } = options + + const states = state ? [ state ] : jobStates + let results: Job[] = [] + + const filteredJobTypes = this.filterJobTypes(jobType) + + for (const jobType of filteredJobTypes) { + const queue = this.queues[jobType] + if (queue === undefined) { + logger.error('Unknown queue %s to list jobs.', jobType) + continue } - for (const job of jobs) { - if (now - job.created_at > JOB_COMPLETED_LIFETIME) { - job.remove() - } - } + const jobs = await queue.getJobs(states, 0, start + count, asc) + results = results.concat(jobs) + } + + results.sort((j1: any, j2: any) => { + if (j1.timestamp < j2.timestamp) return -1 + else if (j1.timestamp === j2.timestamp) return 0 + + return 1 }) + + if (asc === false) results.reverse() + + return results.slice(start, start + count) } - private reactiveStuckJobs () { - const promises: Promise[] = [] + async count (state: JobState, jobType?: JobType): Promise { + const states = state ? [ state ] : jobStates + let total = 0 - this.jobQueue.active((err, ids) => { - if (err) throw err + const filteredJobTypes = this.filterJobTypes(jobType) - for (const id of ids) { - kue.Job.get(id, (err, job) => { - if (err) throw err + for (const type of filteredJobTypes) { + const queue = this.queues[type] + if (queue === undefined) { + logger.error('Unknown queue %s to count jobs.', type) + continue + } - const p = new Promise((res, rej) => { - job.inactive(err => { - if (err) return rej(err) - return res() - }) - }) + const counts = await queue.getJobCounts() - promises.push(p) - }) + for (const s of states) { + total += counts[s] } - }) + } - return Promise.all(promises) + return total } - private getJob (id: number) { - return new Promise((res, rej) => { - kue.Job.get(id, (err, job) => { - if (err) return rej(err) + async removeOldJobs () { + for (const key of Object.keys(this.queues)) { + const queue = this.queues[key] + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + } + } - return res(job) - }) - }) + private addRepeatableJobs () { + this.queues['videos-views-stats'].add({}, { + repeat: REPEAT_JOBS['videos-views-stats'] + }).catch(err => logger.error('Cannot add repeatable job.', { err })) + + if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { + this.queues['activitypub-cleaner'].add({}, { + repeat: REPEAT_JOBS['activitypub-cleaner'] + }).catch(err => logger.error('Cannot add repeatable job.', { err })) + } + } + + private filterJobTypes (jobType?: JobType) { + if (!jobType) return jobTypes + + return jobTypes.filter(t => t === jobType) + } + + private getJobConcurrency (jobType: JobType) { + if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY + if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY + + return JOB_CONCURRENCY[jobType] } static get Instance () { @@ -166,5 +307,6 @@ class JobQueue { // --------------------------------------------------------------------------- export { + jobTypes, JobQueue }