X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=50d732bebcf5c05c9df0c1eac2b12d3c6585ab24;hb=bd911b54b555b11df7e9849cf92d358bccfecf6e;hp=0cf5d53ce015d3ece080dad3c44ccd910b1ee84f;hpb=5a921e7b74910414626bfc9672b857e987e3ebed;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0cf5d53ce..50d732beb 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,4 +1,6 @@ import { + FlowJob, + FlowProducer, Job, JobsOptions, Queue, @@ -13,7 +15,7 @@ import { import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' -import { timeoutPromise } from '@shared/core-utils' +import { pick, timeoutPromise } from '@shared/core-utils' import { ActivitypubFollowPayload, ActivitypubHttpBroadcastPayload, @@ -22,10 +24,12 @@ import { ActorKeysPayload, DeleteResumableUploadMetaFilePayload, EmailPayload, + FederateVideoPayload, JobState, JobType, ManageVideoTorrentPayload, MoveObjectStoragePayload, + NotifyPayload, RefreshPayload, VideoFileImportPayload, VideoImportPayload, @@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' import { processEmail } from './handlers/email' +import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' +import { processNotify } from './handlers/notify' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' @@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' -type CreateJobArgument = +export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | @@ -73,7 +79,9 @@ type CreateJobArgument = { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | - { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } + { type: 'notify', payload: NotifyPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | + { type: 'federate-video', payload: FederateVideoPayload } export type CreateJobOptions = { delay?: number @@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-redundancy': processVideoRedundancy, 'move-to-object-storage': processMoveToObjectStorage, 'manage-video-torrent': processManageVideoTorrent, - 'video-studio-edition': processVideoStudioEdition + 'notify': processNotify, + 'video-studio-edition': processVideoStudioEdition, + 'federate-video': processFederateVideo } const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } = { @@ -123,7 +133,9 @@ const jobTypes: JobType[] = [ 'video-live-ending', 'move-to-object-storage', 'manage-video-torrent', - 'video-studio-edition' + 'video-studio-edition', + 'notify', + 'federate-video' ] const silentFailure = new Set([ 'activitypub-http-unicast' ]) @@ -137,6 +149,8 @@ class JobQueue { private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} private queueEvents: { [id in JobType]?: QueueEvents } = {} + private flowProducer: FlowProducer + private initialized = false private jobRedisPrefix: string @@ -157,6 +171,11 @@ class JobQueue { this.buildQueueEvent(handlerName, produceOnly) } + this.flowProducer = new FlowProducer({ + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix + }) + this.addRepeatableJobs() } @@ -243,6 +262,8 @@ class JobQueue { } } + // --------------------------------------------------------------------------- + async terminate () { const promises = Object.keys(this.workers) .map(handlerName => { @@ -278,28 +299,56 @@ class JobQueue { } } - createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { - this.createJobWithPromise(obj, options) - .catch(err => logger.error('Cannot create job.', { err, obj })) + // --------------------------------------------------------------------------- + + createJobAsync (options: CreateJobArgument & CreateJobOptions): void { + this.createJob(options) + .catch(err => logger.error('Cannot create job.', { err, options })) } - async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { - const queue: Queue = this.queues[obj.type] + async createJob (options: CreateJobArgument & CreateJobOptions) { + const queue: Queue = this.queues[options.type] if (queue === undefined) { - logger.error('Unknown queue %s: cannot create job.', obj.type) + logger.error('Unknown queue %s: cannot create job.', options.type) return } - const jobArgs: JobsOptions = { + const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) + + return queue.add('job', options.payload, jobOptions) + } + + async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { + let lastJob: FlowJob + + for (const job of jobs) { + if (!job) continue + + lastJob = { + name: 'job', + data: job.payload, + queueName: job.type, + opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), + children: lastJob + ? [ lastJob ] + : [] + } + } + + return this.flowProducer.add(lastJob) + } + + private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { + return { backoff: { delay: 60 * 1000, type: 'exponential' }, - attempts: JOB_ATTEMPTS[obj.type], + attempts: JOB_ATTEMPTS[type], priority: options.priority, delay: options.delay } - - return queue.add('job', obj.payload, jobArgs) } + // --------------------------------------------------------------------------- + async listForApi (options: { state?: JobState start: number @@ -367,6 +416,8 @@ class JobQueue { return Promise.all(promises) } + // --------------------------------------------------------------------------- + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue: Queue = this.queues[key]