From 9129b7694d577322327ee79e9b9aa64deee92765 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 8 Feb 2021 10:51:10 +0100 Subject: Allow to specify transcoding and import jobs concurrency --- server/lib/job-queue/handlers/video-transcoding.ts | 44 ++++++++++++---------- server/lib/job-queue/job-queue.ts | 12 +++++- 2 files changed, 35 insertions(+), 21 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4718a7d5c..e248b645e 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -76,7 +76,7 @@ async function processVideoTranscoding (job: Bull.Job) { // Job handlers // --------------------------------------------------------------------------- -async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { +async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { const videoFileInput = payload.copyCodecs ? video.getWebTorrentFile(payload.resolution) : video.getMaxQualityFile() @@ -93,7 +93,7 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide job }) - await retryTransactionWrapper(onHlsPlaylistGeneration, video, payload.resolution) + await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) } async function handleNewWebTorrentResolutionJob ( @@ -110,9 +110,7 @@ async function handleNewWebTorrentResolutionJob ( async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { await mergeAudioVideofile(video, payload.resolution, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) - - await createLowerResolutionsJobs(video, user, payload.resolution, false) + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) } async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { @@ -123,13 +121,11 @@ async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTran // --------------------------------------------------------------------------- -async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: number) { +async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { if (video === undefined) return undefined - const maxQualityFile = video.getMaxQualityFile() - - // We generated the max quality HLS playlist, we don't need the webtorrent files anymore if the admin disabled it - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false && video.hasWebTorrentFiles() && maxQualityFile.resolution === resolution) { + if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + // Remove webtorrent files if not enabled for (const file of video.VideoFiles) { await video.removeFile(file) await video.removeTorrent(file) @@ -137,6 +133,9 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb } video.VideoFiles = [] + + // Create HLS new resolution jobs + await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') } return publishAndFederateIfNeeded(video) @@ -144,7 +143,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb async function onVideoFileOptimizer ( videoArg: MVideoWithFile, - payload: OptimizeTranscodingPayload, + payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, transcodeType: TranscodeOptionsType, user: MUserId ) { @@ -166,11 +165,12 @@ async function onVideoFileOptimizer ( isPortraitMode, resolution: videoDatabase.getMaxQualityFile().resolution, // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues - copyCodecs: transcodeType !== 'quick-transcode' + copyCodecs: transcodeType !== 'quick-transcode', + isMaxQuality: true }) await createHlsJobIfEnabled(user, originalFileHLSPayload) - const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) + const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent') if (!hasNewResolutions) { // No transcoding to do, it's now published @@ -193,7 +193,7 @@ async function onNewWebTorrentFileResolution ( ) { await publishAndFederateIfNeeded(video) - await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) + await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) } // --------------------------------------------------------------------------- @@ -210,6 +210,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { resolution: number isPortraitMode?: boolean copyCodecs: boolean + isMaxQuality: boolean }) { if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return @@ -222,7 +223,8 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { videoUUID: payload.videoUUID, resolution: payload.resolution, isPortraitMode: payload.isPortraitMode, - copyCodecs: payload.copyCodecs + copyCodecs: payload.copyCodecs, + isMaxQuality: payload.isMaxQuality } return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) @@ -232,7 +234,8 @@ async function createLowerResolutionsJobs ( video: MVideoFullLight, user: MUserId, videoFileResolution: number, - isPortraitMode: boolean + isPortraitMode: boolean, + type: 'hls' | 'webtorrent' ) { // Create transcoding jobs if there are enabled resolutions const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') @@ -250,7 +253,7 @@ async function createLowerResolutionsJobs ( for (const resolution of resolutionsEnabled) { let dataInput: VideoTranscodingPayload - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { // WebTorrent will create subsequent HLS job dataInput = { type: 'new-resolution-to-webtorrent', @@ -258,13 +261,16 @@ async function createLowerResolutionsJobs ( resolution, isPortraitMode } - } else if (CONFIG.TRANSCODING.HLS.ENABLED) { + } + + if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { dataInput = { type: 'new-resolution-to-hls', videoUUID: video.uuid, resolution, isPortraitMode, - copyCodecs: false + copyCodecs: false, + isMaxQuality: false } } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 38b1d6f1f..72fed6072 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,5 +1,6 @@ import * as Bull from 'bull' import { jobStates } from '@server/helpers/custom-validators/jobs' +import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' import { ActivitypubFollowPayload, @@ -105,11 +106,11 @@ class JobQueue { } } - for (const handlerName of Object.keys(handlers)) { + for (const handlerName of (Object.keys(handlers) as JobType[])) { const queue = new Bull(handlerName, queueOptions) const handler = handlers[handlerName] - queue.process(JOB_CONCURRENCY[handlerName], handler) + queue.process(this.getJobConcurrency(handlerName), handler) .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) queue.on('failed', (job, err) => { @@ -235,6 +236,13 @@ class JobQueue { return jobTypes.filter(t => t === jobType) } + private getJobConcurrency (jobType: JobType) { + if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY + if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY + + return JOB_CONCURRENCY[jobType] + } + static get Instance () { return this.instance || (this.instance = new this()) } -- cgit v1.2.3