X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=4c1597b334a7c877500d1d52f4ee52b384aa7a6f;hb=ad5db1044c8599eaaaa2a578b350777ae996b068;hp=42e8347b1f663650c5aed7042c5e02cbd1b9ffcc;hpb=74d249bc1346c7cfaac7ee49bebbebcf2a01f82a;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 42e8347b1..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,4 +1,4 @@ -import * as Bull from 'bull' +import Bull, { Job, JobOptions, Queue } from 'bull' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' @@ -8,9 +8,11 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + DeleteResumableUploadMetaFilePayload, EmailPayload, JobState, JobType, + MoveObjectStoragePayload, RefreshPayload, VideoFileImportPayload, VideoImportPayload, @@ -29,11 +31,12 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' import { processEmail } from './handlers/email' +import { processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoTranscoding } from './handlers/video-transcoding' -import { processVideosViews } from './handlers/video-views' +import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -46,17 +49,19 @@ type CreateJobArgument = { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | - { type: 'videos-views', payload: {} } | + { type: 'videos-views-stats', payload: {} } | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'actor-keys', payload: ActorKeysPayload } | - { type: 'video-redundancy', payload: VideoRedundancyPayload } + { type: 'video-redundancy', payload: VideoRedundancyPayload } | + { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } -type CreateJobOptions = { +export type CreateJobOptions = { delay?: number priority?: number } -const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { +const handlers: { [id in JobType]: (job: Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, @@ -66,11 +71,12 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'video-transcoding': processVideoTranscoding, 'email': processEmail, 'video-import': processVideoImport, - 'videos-views': processVideosViews, + 'videos-views-stats': processVideosViewsStats, 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, 'actor-keys': processActorKeys, - 'video-redundancy': processVideoRedundancy + 'video-redundancy': processVideoRedundancy, + 'move-to-object-storage': processMoveToObjectStorage } const jobTypes: JobType[] = [ @@ -83,25 +89,26 @@ const jobTypes: JobType[] = [ 'video-transcoding', 'video-file-import', 'video-import', - 'videos-views', + 'videos-views-stats', 'activitypub-refresher', 'video-redundancy', 'actor-keys', - 'video-live-ending' + 'video-live-ending', + 'move-to-object-storage' ] class JobQueue { private static instance: JobQueue - private queues: { [id in JobType]?: Bull.Queue } = {} + private queues: { [id in JobType]?: Queue } = {} private initialized = false private jobRedisPrefix: string private constructor () { } - init () { + init (produceOnly = false) { // Already initialized if (this.initialized === true) return this.initialized = true @@ -117,6 +124,12 @@ class JobQueue { for (const handlerName of (Object.keys(handlers) as JobType[])) { const queue = new Bull(handlerName, queueOptions) + + if (produceOnly) { + queue.pause(true) + .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) + } + const handler = handlers[handlerName] queue.process(this.getJobConcurrency(handlerName), handler) @@ -155,7 +168,7 @@ class JobQueue { return } - const jobArgs: Bull.JobOptions = { + const jobArgs: JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], timeout: JOB_TTL[obj.type], @@ -172,11 +185,11 @@ class JobQueue { count: number asc?: boolean jobType: JobType - }): Promise { + }): Promise { const { state, start, count, asc, jobType } = options const states = state ? [ state ] : jobStates - let results: Bull.Job[] = [] + let results: Job[] = [] const filteredJobTypes = this.filterJobTypes(jobType) @@ -234,8 +247,8 @@ class JobQueue { } private addRepeatableJobs () { - this.queues['videos-views'].add({}, { - repeat: REPEAT_JOBS['videos-views'] + this.queues['videos-views-stats'].add({}, { + repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {