X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=3224abcc3ac04bff9de55df6184e77934f23c6b4;hb=1808a1f8e4b7b102823492a2007a46929aebf189;hp=14e181835a381bf8cde10dffcf5091f297bcb471;hpb=610d0be13b3d01f653ef269271dd667a57c85ef2;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e181835..3224abcc3 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,58 +1,89 @@ -import * as Bull from 'bull' +import Bull, { Job, JobOptions, Queue } from 'bull' +import { jobStates } from '@server/helpers/custom-validators/jobs' +import { CONFIG } from '@server/initializers/config' +import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' import { ActivitypubFollowPayload, ActivitypubHttpBroadcastPayload, ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, + ActorKeysPayload, + DeleteResumableUploadMetaFilePayload, EmailPayload, JobState, JobType, + ManageVideoTorrentPayload, + MoveObjectStoragePayload, RefreshPayload, + VideoEditionPayload, VideoFileImportPayload, VideoImportPayload, + VideoLiveEndingPayload, VideoRedundancyPayload, VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { Redis } from '../redis' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' +import { processActivityPubCleaner } from './handlers/activitypub-cleaner' +import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' -import { processEmail } from './handlers/email' -import { processVideoTranscoding } from './handlers/video-transcoding' -import { processActivityPubFollow } from './handlers/activitypub-follow' -import { processVideoImport } from './handlers/video-import' -import { processVideosViews } from './handlers/video-views' import { refreshAPObject } from './handlers/activitypub-refresher' +import { processActorKeys } from './handlers/actor-keys' +import { processEmail } from './handlers/email' +import { processManageVideoTorrent } from './handlers/manage-video-torrent' +import { processMoveToObjectStorage } from './handlers/move-to-object-storage' +import { processVideoEdition } from './handlers/video-edition' import { processVideoFileImport } from './handlers/video-file-import' -import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { processVideoImport } from './handlers/video-import' +import { processVideoLiveEnding } from './handlers/video-live-ending' +import { processVideoTranscoding } from './handlers/video-transcoding' +import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | + { type: 'activitypub-http-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | - { type: 'videos-views', payload: {} } | - { type: 'video-redundancy', payload: VideoRedundancyPayload } + { type: 'videos-views-stats', payload: {} } | + { type: 'video-live-ending', payload: VideoLiveEndingPayload } | + { type: 'actor-keys', payload: ActorKeysPayload } | + { type: 'video-redundancy', payload: VideoRedundancyPayload } | + { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | + { type: 'video-edition', payload: VideoEditionPayload } | + { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } + +export type CreateJobOptions = { + delay?: number + priority?: number +} -const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { +const handlers: { [id in JobType]: (job: Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, 'video-file-import': processVideoFileImport, 'video-transcoding': processVideoTranscoding, 'email': processEmail, 'video-import': processVideoImport, - 'videos-views': processVideosViews, + 'videos-views-stats': processVideosViewsStats, 'activitypub-refresher': refreshAPObject, - 'video-redundancy': processVideoRedundancy + 'video-live-ending': processVideoLiveEnding, + 'actor-keys': processActorKeys, + 'video-redundancy': processVideoRedundancy, + 'move-to-object-storage': processMoveToObjectStorage, + 'manage-video-torrent': processManageVideoTorrent, + 'video-edition': processVideoEdition } const jobTypes: JobType[] = [ @@ -60,45 +91,64 @@ const jobTypes: JobType[] = [ 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', + 'activitypub-cleaner', 'email', 'video-transcoding', 'video-file-import', 'video-import', - 'videos-views', + 'videos-views-stats', 'activitypub-refresher', - 'video-redundancy' + 'video-redundancy', + 'actor-keys', + 'video-live-ending', + 'move-to-object-storage', + 'manage-video-torrent', + 'video-edition' ] class JobQueue { private static instance: JobQueue - private queues: { [id in JobType]?: Bull.Queue } = {} + private queues: { [id in JobType]?: Queue } = {} private initialized = false private jobRedisPrefix: string private constructor () { } - init () { + init (produceOnly = false) { // Already initialized if (this.initialized === true) return this.initialized = true this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - const queueOptions = { + + const queueOptions: Bull.QueueOptions = { prefix: this.jobRedisPrefix, - redis: Redis.getRedisClientOptions(), + redis: { + password: CONFIG.REDIS.AUTH, + db: CONFIG.REDIS.DB, + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + path: CONFIG.REDIS.SOCKET + }, settings: { maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts } } - for (const handlerName of Object.keys(handlers)) { + for (const handlerName of (Object.keys(handlers) as JobType[])) { const queue = new Bull(handlerName, queueOptions) + + if (produceOnly) { + queue.pause(true) + .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) + } + const handler = handlers[handlerName] - queue.process(JOB_CONCURRENCY[handlerName], handler) + queue.process(this.getJobConcurrency(handlerName), handler) .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) queue.on('failed', (job, err) => { @@ -122,36 +172,52 @@ class JobQueue { } } - createJob (obj: CreateJobArgument): void { - this.createJobWithPromise(obj) + async pause () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].pause(true) + } + } + + async resume () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].resume(true) + } + } + + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { + this.createJobWithPromise(obj, options) .catch(err => logger.error('Cannot create job.', { err, obj })) } - createJobWithPromise (obj: CreateJobArgument) { - const queue = this.queues[obj.type] + createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { + const queue: Queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) return } - const jobArgs: Bull.JobOptions = { + const jobArgs: JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], - timeout: JOB_TTL[obj.type] + timeout: JOB_TTL[obj.type], + priority: options.priority, + delay: options.delay } return queue.add(obj.payload, jobArgs) } async listForApi (options: { - state: JobState + state?: JobState start: number count: number asc?: boolean jobType: JobType - }): Promise { + }): Promise { const { state, start, count, asc, jobType } = options - let results: Bull.Job[] = [] + + const states = state ? [ state ] : jobStates + let results: Job[] = [] const filteredJobTypes = this.filterJobTypes(jobType) @@ -162,7 +228,7 @@ class JobQueue { continue } - const jobs = await queue.getJobs([ state ], 0, start + count, asc) + const jobs = await queue.getJobs(states, 0, start + count, asc) results = results.concat(jobs) } @@ -179,6 +245,7 @@ class JobQueue { } async count (state: JobState, jobType?: JobType): Promise { + const states = state ? [ state ] : jobStates let total = 0 const filteredJobTypes = this.filterJobTypes(jobType) @@ -192,7 +259,9 @@ class JobQueue { const counts = await queue.getJobCounts() - total += counts[state] + for (const s of states) { + total += counts[s] + } } return total @@ -206,9 +275,15 @@ class JobQueue { } private addRepeatableJobs () { - this.queues['videos-views'].add({}, { - repeat: REPEAT_JOBS['videos-views'] + this.queues['videos-views-stats'].add({}, { + repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) + + if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { + this.queues['activitypub-cleaner'].add({}, { + repeat: REPEAT_JOBS['activitypub-cleaner'] + }).catch(err => logger.error('Cannot add repeatable job.', { err })) + } } private filterJobTypes (jobType?: JobType) { @@ -217,6 +292,13 @@ class JobQueue { return jobTypes.filter(t => t === jobType) } + private getJobConcurrency (jobType: JobType) { + if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY + if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY + + return JOB_CONCURRENCY[jobType] + } + static get Instance () { return this.instance || (this.instance = new this()) }