From 9129b7694d577322327ee79e9b9aa64deee92765 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 8 Feb 2021 10:51:10 +0100 Subject: Allow to specify transcoding and import jobs concurrency --- server/controllers/api/config.ts | 2 + server/initializers/checker-after-init.ts | 10 +++++ server/initializers/checker-before-init.ts | 4 +- server/initializers/config.ts | 3 ++ server/initializers/constants.ts | 4 +- server/lib/job-queue/handlers/video-transcoding.ts | 44 ++++++++++++---------- server/lib/job-queue/job-queue.ts | 12 +++++- server/lib/video-transcoding.ts | 4 +- server/middlewares/validators/config.ts | 2 + server/models/video/video.ts | 4 +- server/tests/api/check-params/config.ts | 2 + server/tests/api/server/config.ts | 6 +++ 12 files changed, 65 insertions(+), 32 deletions(-) (limited to 'server') diff --git a/server/controllers/api/config.ts b/server/controllers/api/config.ts index 7fda06a87..5c242da04 100644 --- a/server/controllers/api/config.ts +++ b/server/controllers/api/config.ts @@ -417,6 +417,7 @@ function customConfig (): CustomConfig { allowAdditionalExtensions: CONFIG.TRANSCODING.ALLOW_ADDITIONAL_EXTENSIONS, allowAudioFiles: CONFIG.TRANSCODING.ALLOW_AUDIO_FILES, threads: CONFIG.TRANSCODING.THREADS, + concurrency: CONFIG.TRANSCODING.CONCURRENCY, profile: CONFIG.TRANSCODING.PROFILE, resolutions: { '0p': CONFIG.TRANSCODING.RESOLUTIONS['0p'], @@ -458,6 +459,7 @@ function customConfig (): CustomConfig { }, import: { videos: { + concurrency: CONFIG.IMPORT.VIDEOS.CONCURRENCY, http: { enabled: CONFIG.IMPORT.VIDEOS.HTTP.ENABLED }, diff --git a/server/initializers/checker-after-init.ts b/server/initializers/checker-after-init.ts index 979c97a8b..2b00e2047 100644 --- a/server/initializers/checker-after-init.ts +++ b/server/initializers/checker-after-init.ts @@ -116,6 +116,16 @@ function checkConfig () { if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false && CONFIG.TRANSCODING.HLS.ENABLED === false) { return 'You need to enable at least WebTorrent transcoding or HLS transcoding.' } + + if (CONFIG.TRANSCODING.CONCURRENCY <= 0) { + return 'Transcoding concurrency should be > 0' + } + } + + if (CONFIG.IMPORT.VIDEOS.HTTP.ENABLED || CONFIG.IMPORT.VIDEOS.TORRENT.ENABLED) { + if (CONFIG.IMPORT.VIDEOS.CONCURRENCY <= 0) { + return 'Video import concurrency should be > 0' + } } // Broadcast message diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts index cac24f0b6..a186afbdd 100644 --- a/server/initializers/checker-before-init.ts +++ b/server/initializers/checker-before-init.ts @@ -22,10 +22,10 @@ function checkMissedConfig () { 'signup.filters.cidr.whitelist', 'signup.filters.cidr.blacklist', 'redundancy.videos.strategies', 'redundancy.videos.check_interval', 'transcoding.enabled', 'transcoding.threads', 'transcoding.allow_additional_extensions', 'transcoding.hls.enabled', - 'transcoding.profile', + 'transcoding.profile', 'transcoding.concurrency', 'transcoding.resolutions.0p', 'transcoding.resolutions.240p', 'transcoding.resolutions.360p', 'transcoding.resolutions.480p', 'transcoding.resolutions.720p', 'transcoding.resolutions.1080p', 'transcoding.resolutions.1440p', 'transcoding.resolutions.2160p', - 'import.videos.http.enabled', 'import.videos.torrent.enabled', 'auto_blacklist.videos.of_users.enabled', + 'import.videos.http.enabled', 'import.videos.torrent.enabled', 'import.videos.concurrency', 'auto_blacklist.videos.of_users.enabled', 'trending.videos.interval_days', 'instance.name', 'instance.short_description', 'instance.description', 'instance.terms', 'instance.default_client_route', 'instance.is_nsfw', 'instance.default_nsfw_policy', 'instance.robots', 'instance.securitytxt', diff --git a/server/initializers/config.ts b/server/initializers/config.ts index 3b44d82ed..930fd784e 100644 --- a/server/initializers/config.ts +++ b/server/initializers/config.ts @@ -188,6 +188,7 @@ const CONFIG = { get ALLOW_ADDITIONAL_EXTENSIONS () { return config.get('transcoding.allow_additional_extensions') }, get ALLOW_AUDIO_FILES () { return config.get('transcoding.allow_audio_files') }, get THREADS () { return config.get('transcoding.threads') }, + get CONCURRENCY () { return config.get('transcoding.concurrency') }, get PROFILE () { return config.get('transcoding.profile') }, RESOLUTIONS: { get '0p' () { return config.get('transcoding.resolutions.0p') }, @@ -237,6 +238,8 @@ const CONFIG = { }, IMPORT: { VIDEOS: { + get CONCURRENCY () { return config.get('import.videos.concurrency') }, + HTTP: { get ENABLED () { return config.get('import.videos.http.enabled') }, get FORCE_IPV4 () { return config.get('import.videos.http.force_ipv4') }, diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 9d9b3966c..7beaca238 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -146,14 +146,12 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { 'video-redundancy': 1, 'video-live-ending': 1 } -const JOB_CONCURRENCY: { [id in JobType]: number } = { +const JOB_CONCURRENCY: { [id in JobType]?: number } = { 'activitypub-http-broadcast': 1, 'activitypub-http-unicast': 5, 'activitypub-http-fetcher': 1, 'activitypub-follow': 1, 'video-file-import': 1, - 'video-transcoding': 1, - 'video-import': 1, 'email': 5, 'videos-views': 1, 'activitypub-refresher': 1, diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4718a7d5c..e248b645e 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -76,7 +76,7 @@ async function processVideoTranscoding (job: Bull.Job) { // Job handlers // --------------------------------------------------------------------------- -async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { +async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { const videoFileInput = payload.copyCodecs ? video.getWebTorrentFile(payload.resolution) : video.getMaxQualityFile() @@ -93,7 +93,7 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide job }) - await retryTransactionWrapper(onHlsPlaylistGeneration, video, payload.resolution) + await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) } async function handleNewWebTorrentResolutionJob ( @@ -110,9 +110,7 @@ async function handleNewWebTorrentResolutionJob ( async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { await mergeAudioVideofile(video, payload.resolution, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) - - await createLowerResolutionsJobs(video, user, payload.resolution, false) + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) } async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { @@ -123,13 +121,11 @@ async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTran // --------------------------------------------------------------------------- -async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: number) { +async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { if (video === undefined) return undefined - const maxQualityFile = video.getMaxQualityFile() - - // We generated the max quality HLS playlist, we don't need the webtorrent files anymore if the admin disabled it - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false && video.hasWebTorrentFiles() && maxQualityFile.resolution === resolution) { + if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + // Remove webtorrent files if not enabled for (const file of video.VideoFiles) { await video.removeFile(file) await video.removeTorrent(file) @@ -137,6 +133,9 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb } video.VideoFiles = [] + + // Create HLS new resolution jobs + await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') } return publishAndFederateIfNeeded(video) @@ -144,7 +143,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb async function onVideoFileOptimizer ( videoArg: MVideoWithFile, - payload: OptimizeTranscodingPayload, + payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, transcodeType: TranscodeOptionsType, user: MUserId ) { @@ -166,11 +165,12 @@ async function onVideoFileOptimizer ( isPortraitMode, resolution: videoDatabase.getMaxQualityFile().resolution, // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues - copyCodecs: transcodeType !== 'quick-transcode' + copyCodecs: transcodeType !== 'quick-transcode', + isMaxQuality: true }) await createHlsJobIfEnabled(user, originalFileHLSPayload) - const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) + const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent') if (!hasNewResolutions) { // No transcoding to do, it's now published @@ -193,7 +193,7 @@ async function onNewWebTorrentFileResolution ( ) { await publishAndFederateIfNeeded(video) - await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) + await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) } // --------------------------------------------------------------------------- @@ -210,6 +210,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { resolution: number isPortraitMode?: boolean copyCodecs: boolean + isMaxQuality: boolean }) { if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return @@ -222,7 +223,8 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { videoUUID: payload.videoUUID, resolution: payload.resolution, isPortraitMode: payload.isPortraitMode, - copyCodecs: payload.copyCodecs + copyCodecs: payload.copyCodecs, + isMaxQuality: payload.isMaxQuality } return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) @@ -232,7 +234,8 @@ async function createLowerResolutionsJobs ( video: MVideoFullLight, user: MUserId, videoFileResolution: number, - isPortraitMode: boolean + isPortraitMode: boolean, + type: 'hls' | 'webtorrent' ) { // Create transcoding jobs if there are enabled resolutions const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') @@ -250,7 +253,7 @@ async function createLowerResolutionsJobs ( for (const resolution of resolutionsEnabled) { let dataInput: VideoTranscodingPayload - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { // WebTorrent will create subsequent HLS job dataInput = { type: 'new-resolution-to-webtorrent', @@ -258,13 +261,16 @@ async function createLowerResolutionsJobs ( resolution, isPortraitMode } - } else if (CONFIG.TRANSCODING.HLS.ENABLED) { + } + + if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { dataInput = { type: 'new-resolution-to-hls', videoUUID: video.uuid, resolution, isPortraitMode, - copyCodecs: false + copyCodecs: false, + isMaxQuality: false } } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 38b1d6f1f..72fed6072 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,5 +1,6 @@ import * as Bull from 'bull' import { jobStates } from '@server/helpers/custom-validators/jobs' +import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' import { ActivitypubFollowPayload, @@ -105,11 +106,11 @@ class JobQueue { } } - for (const handlerName of Object.keys(handlers)) { + for (const handlerName of (Object.keys(handlers) as JobType[])) { const queue = new Bull(handlerName, queueOptions) const handler = handlers[handlerName] - queue.process(JOB_CONCURRENCY[handlerName], handler) + queue.process(this.getJobConcurrency(handlerName), handler) .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) queue.on('failed', (job, err) => { @@ -235,6 +236,13 @@ class JobQueue { return jobTypes.filter(t => t === jobType) } + private getJobConcurrency (jobType: JobType) { + if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY + if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY + + return JOB_CONCURRENCY[jobType] + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/video-transcoding.ts b/server/lib/video-transcoding.ts index 88a6e0673..a58c9dd20 100644 --- a/server/lib/video-transcoding.ts +++ b/server/lib/video-transcoding.ts @@ -272,7 +272,7 @@ async function generateHlsPlaylistCommon (options: { const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC, job } = options const transcodeDirectory = CONFIG.STORAGE.TMP_DIR - const videoTranscodedBasePath = join(transcodeDirectory, type, video.uuid) + const videoTranscodedBasePath = join(transcodeDirectory, type) await ensureDir(videoTranscodedBasePath) const videoFilename = generateVideoStreamingPlaylistName(video.uuid, resolution) @@ -337,8 +337,6 @@ async function generateHlsPlaylistCommon (options: { await move(playlistFileTranscodePath, playlistPath, { overwrite: true }) // Move video file await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true }) - // Cleanup directory - await remove(videoTranscodedBasePath) const stats = await stat(videoFilePath) diff --git a/server/middlewares/validators/config.ts b/server/middlewares/validators/config.ts index 52c28799f..71098ff99 100644 --- a/server/middlewares/validators/config.ts +++ b/server/middlewares/validators/config.ts @@ -39,6 +39,7 @@ const customConfigUpdateValidator = [ body('transcoding.enabled').isBoolean().withMessage('Should have a valid transcoding enabled boolean'), body('transcoding.allowAdditionalExtensions').isBoolean().withMessage('Should have a valid additional extensions boolean'), body('transcoding.threads').isInt().withMessage('Should have a valid transcoding threads number'), + body('transcoding.concurrency').isInt({ min: 1 }).withMessage('Should have a valid transcoding concurrency number'), body('transcoding.resolutions.0p').isBoolean().withMessage('Should have a valid transcoding 0p resolution enabled boolean'), body('transcoding.resolutions.240p').isBoolean().withMessage('Should have a valid transcoding 240p resolution enabled boolean'), body('transcoding.resolutions.360p').isBoolean().withMessage('Should have a valid transcoding 360p resolution enabled boolean'), @@ -51,6 +52,7 @@ const customConfigUpdateValidator = [ body('transcoding.webtorrent.enabled').isBoolean().withMessage('Should have a valid webtorrent transcoding enabled boolean'), body('transcoding.hls.enabled').isBoolean().withMessage('Should have a valid hls transcoding enabled boolean'), + body('import.videos.concurrency').isInt({ min: 0 }).withMessage('Should have a valid import concurrency number'), body('import.videos.http.enabled').isBoolean().withMessage('Should have a valid import video http enabled boolean'), body('import.videos.torrent.enabled').isBoolean().withMessage('Should have a valid import video torrent enabled boolean'), diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 3f6fd8dc0..14e80a3ba 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -345,7 +345,6 @@ export type AvailableForListIDsOptions = { include: [ { model: VideoFileModel, - separate: true, // We may have multiple files, having multiple redundancies so let's separate this join required: false, include: subInclude } @@ -372,7 +371,6 @@ export type AvailableForListIDsOptions = { include: [ { model: VideoStreamingPlaylistModel.unscoped(), - separate: true, // We may have multiple streaming playlists, having multiple redundancies so let's separate this join required: false, include: subInclude } @@ -1689,7 +1687,7 @@ export class VideoModel extends Model { channelModel.Account = accountModel - const videoModel = new VideoModel(pick(row, videoKeys)) + const videoModel = new VideoModel(pick(row, videoKeys), buildOpts) videoModel.VideoChannel = channelModel videoModel.UserVideoHistories = [] diff --git a/server/tests/api/check-params/config.ts b/server/tests/api/check-params/config.ts index d6c20f7af..c7eb3189b 100644 --- a/server/tests/api/check-params/config.ts +++ b/server/tests/api/check-params/config.ts @@ -86,6 +86,7 @@ describe('Test config API validators', function () { enabled: true, allowAdditionalExtensions: true, allowAudioFiles: true, + concurrency: 1, threads: 1, profile: 'vod_profile', resolutions: { @@ -130,6 +131,7 @@ describe('Test config API validators', function () { }, import: { videos: { + concurrency: 1, http: { enabled: false }, diff --git a/server/tests/api/server/config.ts b/server/tests/api/server/config.ts index 26df8373e..b2371614f 100644 --- a/server/tests/api/server/config.ts +++ b/server/tests/api/server/config.ts @@ -70,6 +70,7 @@ function checkInitialConfig (server: ServerInfo, data: CustomConfig) { expect(data.transcoding.allowAdditionalExtensions).to.be.false expect(data.transcoding.allowAudioFiles).to.be.false expect(data.transcoding.threads).to.equal(2) + expect(data.transcoding.concurrency).to.equal(2) expect(data.transcoding.profile).to.equal('default') expect(data.transcoding.resolutions['240p']).to.be.true expect(data.transcoding.resolutions['360p']).to.be.true @@ -97,6 +98,7 @@ function checkInitialConfig (server: ServerInfo, data: CustomConfig) { expect(data.live.transcoding.resolutions['1440p']).to.be.false expect(data.live.transcoding.resolutions['2160p']).to.be.false + expect(data.import.videos.concurrency).to.equal(2) expect(data.import.videos.http.enabled).to.be.true expect(data.import.videos.torrent.enabled).to.be.true expect(data.autoBlacklist.videos.ofUsers.enabled).to.be.false @@ -159,6 +161,7 @@ function checkUpdatedConfig (data: CustomConfig) { expect(data.transcoding.enabled).to.be.true expect(data.transcoding.threads).to.equal(1) + expect(data.transcoding.concurrency).to.equal(3) expect(data.transcoding.allowAdditionalExtensions).to.be.true expect(data.transcoding.allowAudioFiles).to.be.true expect(data.transcoding.profile).to.equal('vod_profile') @@ -186,6 +189,7 @@ function checkUpdatedConfig (data: CustomConfig) { expect(data.live.transcoding.resolutions['1080p']).to.be.true expect(data.live.transcoding.resolutions['2160p']).to.be.true + expect(data.import.videos.concurrency).to.equal(4) expect(data.import.videos.http.enabled).to.be.false expect(data.import.videos.torrent.enabled).to.be.false expect(data.autoBlacklist.videos.ofUsers.enabled).to.be.true @@ -323,6 +327,7 @@ describe('Test config', function () { allowAdditionalExtensions: true, allowAudioFiles: true, threads: 1, + concurrency: 3, profile: 'vod_profile', resolutions: { '0p': false, @@ -364,6 +369,7 @@ describe('Test config', function () { }, import: { videos: { + concurrency: 4, http: { enabled: false }, -- cgit v1.2.3