X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=03f6fbea75a382f1f8b656d4e8525cf9dbf0c87d;hb=26818a73ba0d7fd53ca69eba0c8e525f3670b5a8;hp=8597eb00018356dc081237b0f48f57cb7f8d6415;hpb=a2be43f5700460d3afdc194abc788690b79e66cd;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 8597eb000..03f6fbea7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -7,11 +7,10 @@ import { QueueEvents, QueueEventsOptions, QueueOptions, - QueueScheduler, - QueueSchedulerOptions, Worker, WorkerOptions } from 'bullmq' +import { parseDurationToMs } from '@server/helpers/core-utils' import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' @@ -32,6 +31,7 @@ import { MoveObjectStoragePayload, NotifyPayload, RefreshPayload, + TranscodingJobBuilderPayload, VideoChannelImportPayload, VideoFileImportPayload, VideoImportPayload, @@ -41,14 +41,7 @@ import { VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' -import { - JOB_ATTEMPTS, - JOB_CONCURRENCY, - JOB_REMOVAL_OPTIONS, - JOB_TTL, - REPEAT_JOBS, - WEBSERVER -} from '../../initializers/constants' +import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { Hooks } from '../plugins/hooks' import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' @@ -64,6 +57,7 @@ import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processNotify } from './handlers/notify' +import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' import { processVideoChannelImport } from './handlers/video-channel-import' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' @@ -71,18 +65,18 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' -import { parseDurationToMs } from '@server/helpers/core-utils' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | - { type: 'activitypub-http-cleaner', payload: {} } | + { type: 'activitypub-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | + { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views-stats', payload: {} } | @@ -102,31 +96,33 @@ export type CreateJobArgument = export type CreateJobOptions = { delay?: number priority?: number + failParentOnFailure?: boolean } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, - 'activitypub-http-unicast': processActivityPubHttpUnicast, - 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, - 'video-file-import': processVideoFileImport, - 'video-transcoding': processVideoTranscoding, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-http-unicast': processActivityPubHttpUnicast, + 'activitypub-refresher': refreshAPObject, + 'actor-keys': processActorKeys, + 'after-video-channel-import': processAfterVideoChannelImport, 'email': processEmail, + 'federate-video': processFederateVideo, + 'transcoding-job-builder': processTranscodingJobBuilder, + 'manage-video-torrent': processManageVideoTorrent, + 'move-to-object-storage': processMoveToObjectStorage, + 'notify': processNotify, + 'video-channel-import': processVideoChannelImport, + 'video-file-import': processVideoFileImport, 'video-import': processVideoImport, - 'videos-views-stats': processVideosViewsStats, - 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, - 'actor-keys': processActorKeys, 'video-redundancy': processVideoRedundancy, - 'move-to-object-storage': processMoveToObjectStorage, - 'manage-video-torrent': processManageVideoTorrent, 'video-studio-edition': processVideoStudioEdition, - 'video-channel-import': processVideoChannelImport, - 'after-video-channel-import': processAfterVideoChannelImport, - 'notify': processNotify, - 'federate-video': processFederateVideo + 'video-transcoding': processVideoTranscoding, + 'videos-views-stats': processVideosViewsStats } const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } = { @@ -134,28 +130,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } } const jobTypes: JobType[] = [ + 'activitypub-cleaner', 'activitypub-follow', - 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel', + 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', - 'activitypub-cleaner', + 'activitypub-refresher', + 'actor-keys', + 'after-video-channel-import', 'email', - 'video-transcoding', + 'federate-video', + 'transcoding-job-builder', + 'manage-video-torrent', + 'move-to-object-storage', + 'notify', + 'video-channel-import', 'video-file-import', 'video-import', - 'videos-views-stats', - 'activitypub-refresher', - 'video-redundancy', - 'actor-keys', 'video-live-ending', - 'move-to-object-storage', - 'manage-video-torrent', + 'video-redundancy', 'video-studio-edition', - 'video-channel-import', - 'after-video-channel-import', - 'notify', - 'federate-video' + 'video-transcoding', + 'videos-views-stats' ] const silentFailure = new Set([ 'activitypub-http-unicast' ]) @@ -166,7 +163,6 @@ class JobQueue { private workers: { [id in JobType]?: Worker } = {} private queues: { [id in JobType]?: Queue } = {} - private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} private queueEvents: { [id in JobType]?: QueueEvents } = {} private flowProducer: FlowProducer @@ -187,7 +183,6 @@ class JobQueue { for (const handlerName of Object.keys(handlers)) { this.buildWorker(handlerName) this.buildQueue(handlerName) - this.buildQueueScheduler(handlerName) this.buildQueueEvent(handlerName) } @@ -205,7 +200,8 @@ class JobQueue { autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - connection: Redis.getRedisClientOptions('Worker') + connection: Redis.getRedisClientOptions('Worker'), + maxStalledCount: 10 } const handler = function (job: Job) { @@ -255,20 +251,6 @@ class JobQueue { this.queues[handlerName] = queue } - private buildQueueScheduler (handlerName: JobType) { - const queueSchedulerOptions: QueueSchedulerOptions = { - autorun: false, - connection: Redis.getRedisClientOptions('QueueScheduler'), - prefix: this.jobRedisPrefix, - maxStalledCount: 10 - } - - const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) - queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) - - this.queueSchedulers[handlerName] = queueScheduler - } - private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { autorun: false, @@ -289,13 +271,11 @@ class JobQueue { .map(handlerName => { const worker: Worker = this.workers[handlerName] const queue: Queue = this.queues[handlerName] - const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName] return Promise.all([ worker.close(false), queue.close(), - queueScheduler.close(), queueEvent.close() ]) }) @@ -307,12 +287,10 @@ class JobQueue { const promises = Object.keys(this.workers) .map(handlerName => { const worker: Worker = this.workers[handlerName] - const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName] return Promise.all([ worker.run(), - queueScheduler.run(), queueEvent.run() ]) }) @@ -386,7 +364,11 @@ class JobQueue { name: 'job', data: job.payload, queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) + opts: { + failParentOnFailure: true, + + ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) + } } }