X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=d3776c3bfd6b72d373fd5c97e88e7f87cc0853f9;hb=941d28cc7f9877aa2cfcee018413e0f04f0a9048;hp=4cda12b5754c5ca411384656eaaf2f4edafc192b;hpb=41fb13c330de629df2d23379209e79c7af0f2e9a;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 4cda12b57..d3776c3bf 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -8,20 +8,22 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + DeleteResumableUploadMetaFilePayload, EmailPayload, JobState, JobType, + ManageVideoTorrentPayload, MoveObjectStoragePayload, RefreshPayload, VideoFileImportPayload, VideoImportPayload, VideoLiveEndingPayload, VideoRedundancyPayload, + VideoStudioEditionPayload, VideoTranscodingPayload } from '../../../shared/models' import { logger } from '../../helpers/logger' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' -import { Redis } from '../redis' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' @@ -30,12 +32,14 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' import { processEmail } from './handlers/email' +import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' +import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' -import { processVideosViews } from './handlers/video-views' +import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -48,10 +52,13 @@ type CreateJobArgument = { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | - { type: 'videos-views', payload: {} } | + { type: 'videos-views-stats', payload: {} } | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'actor-keys', payload: ActorKeysPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } | + { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | + { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | + { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } export type CreateJobOptions = { @@ -69,12 +76,14 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-transcoding': processVideoTranscoding, 'email': processEmail, 'video-import': processVideoImport, - 'videos-views': processVideosViews, + 'videos-views-stats': processVideosViewsStats, 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, 'actor-keys': processActorKeys, 'video-redundancy': processVideoRedundancy, - 'move-to-object-storage': processMoveToObjectStorage + 'move-to-object-storage': processMoveToObjectStorage, + 'manage-video-torrent': processManageVideoTorrent, + 'video-studio-edition': processVideoStudioEdition } const jobTypes: JobType[] = [ @@ -87,14 +96,18 @@ const jobTypes: JobType[] = [ 'video-transcoding', 'video-file-import', 'video-import', - 'videos-views', + 'videos-views-stats', 'activitypub-refresher', 'video-redundancy', 'actor-keys', 'video-live-ending', - 'move-to-object-storage' + 'move-to-object-storage', + 'manage-video-torrent', + 'video-studio-edition' ] +const silentFailure = new Set([ 'activitypub-http-unicast' ]) + class JobQueue { private static instance: JobQueue @@ -106,15 +119,22 @@ class JobQueue { private constructor () { } - init () { + init (produceOnly = false) { // Already initialized if (this.initialized === true) return this.initialized = true this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST - const queueOptions = { + + const queueOptions: Bull.QueueOptions = { prefix: this.jobRedisPrefix, - redis: Redis.getRedisClientOptions(), + redis: { + password: CONFIG.REDIS.AUTH, + db: CONFIG.REDIS.DB, + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + path: CONFIG.REDIS.SOCKET + }, settings: { maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts } @@ -122,13 +142,23 @@ class JobQueue { for (const handlerName of (Object.keys(handlers) as JobType[])) { const queue = new Bull(handlerName, queueOptions) + + if (produceOnly) { + queue.pause(true) + .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) + } + const handler = handlers[handlerName] queue.process(this.getJobConcurrency(handlerName), handler) .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) queue.on('failed', (job, err) => { - logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) + const logLevel = silentFailure.has(handlerName) + ? 'debug' + : 'error' + + logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) }) queue.on('error', err => { @@ -148,13 +178,25 @@ class JobQueue { } } + async pause () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].pause(true) + } + } + + async resume () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].resume(true) + } + } + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { this.createJobWithPromise(obj, options) .catch(err => logger.error('Cannot create job.', { err, obj })) } createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { - const queue = this.queues[obj.type] + const queue: Queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) return @@ -239,8 +281,8 @@ class JobQueue { } private addRepeatableJobs () { - this.queues['videos-views'].add({}, { - repeat: REPEAT_JOBS['videos-views'] + this.queues['videos-views-stats'].add({}, { + repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {