X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=53d6b6a9cfb8a34c46ee2f9b660f0aa388c6aea5;hb=276250f0a36e00373166d91d539e5220d6f158c7;hp=7a3a1bf8226becb155d99ae2e2df26ab29a45cd5;hpb=0305db28c98fd6cf43a3c50ba92c76215e99d512;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 7a3a1bf82..53d6b6a9c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,4 +1,4 @@ -import * as Bull from 'bull' +import Bull, { Job, JobOptions, Queue } from 'bull' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' @@ -8,6 +8,7 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + DeleteResumableUploadMetaFilePayload, EmailPayload, JobState, JobType, @@ -30,12 +31,12 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' import { processEmail } from './handlers/email' +import { processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViews } from './handlers/video-views' -import { processMoveToObjectStorage } from './handlers/move-to-object-storage' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -52,6 +53,7 @@ type CreateJobArgument = { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'actor-keys', payload: ActorKeysPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } | + { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } export type CreateJobOptions = { @@ -59,7 +61,7 @@ export type CreateJobOptions = { priority?: number } -const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { +const handlers: { [id in JobType]: (job: Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, @@ -99,7 +101,7 @@ class JobQueue { private static instance: JobQueue - private queues: { [id in JobType]?: Bull.Queue } = {} + private queues: { [id in JobType]?: Queue } = {} private initialized = false private jobRedisPrefix: string @@ -160,7 +162,7 @@ class JobQueue { return } - const jobArgs: Bull.JobOptions = { + const jobArgs: JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], timeout: JOB_TTL[obj.type], @@ -177,11 +179,11 @@ class JobQueue { count: number asc?: boolean jobType: JobType - }): Promise { + }): Promise { const { state, start, count, asc, jobType } = options const states = state ? [ state ] : jobStates - let results: Bull.Job[] = [] + let results: Job[] = [] const filteredJobTypes = this.filterJobTypes(jobType)