From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- .../job-queue/handlers/transcoding-job-builder.ts | 47 ++++ server/lib/job-queue/handlers/video-file-import.ts | 2 +- server/lib/job-queue/handlers/video-import.ts | 12 +- server/lib/job-queue/handlers/video-live-ending.ts | 10 +- .../lib/job-queue/handlers/video-studio-edition.ts | 68 ++--- server/lib/job-queue/handlers/video-transcoding.ts | 282 +++------------------ server/lib/job-queue/job-queue.ts | 63 ++--- 7 files changed, 169 insertions(+), 315 deletions(-) create mode 100644 server/lib/job-queue/handlers/transcoding-job-builder.ts (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts new file mode 100644 index 000000000..8b4a877d7 --- /dev/null +++ b/server/lib/job-queue/handlers/transcoding-job-builder.ts @@ -0,0 +1,47 @@ +import { Job } from 'bullmq' +import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' +import { UserModel } from '@server/models/user/user' +import { VideoModel } from '@server/models/video/video' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { pick } from '@shared/core-utils' +import { TranscodingJobBuilderPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' +import { JobQueue } from '../job-queue' + +async function processTranscodingJobBuilder (job: Job) { + const payload = job.data as TranscodingJobBuilderPayload + + logger.info('Processing transcoding job builder in job %s.', job.id) + + if (payload.optimizeJob) { + const video = await VideoModel.loadFull(payload.videoUUID) + const user = await UserModel.loadByVideoId(video.id) + const videoFile = video.getMaxQualityFile() + + await createOptimizeOrMergeAudioJobs({ + ...pick(payload.optimizeJob, [ 'isNewVideo' ]), + + video, + videoFile, + user + }) + } + + for (const job of (payload.jobs || [])) { + await JobQueue.Instance.createJob(job) + + await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') + } + + for (const sequentialJobs of (payload.sequentialJobs || [])) { + await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs) + + await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.length) + } +} + +// --------------------------------------------------------------------------- + +export { + processTranscodingJobBuilder +} diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index d950f6407..9a4550e4d 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -10,8 +10,8 @@ import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { MVideoFullLight } from '@server/types/models' import { getLowercaseExtension } from '@shared/core-utils' +import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' import { VideoFileImportPayload, VideoStorage } from '@shared/models' -import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' import { logger } from '../../../helpers/logger' import { JobQueue } from '../job-queue' diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4d361c7b9..2a063282c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -7,15 +7,16 @@ import { isPostImportVideoAccepted } from '@server/lib/moderation' import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { Hooks } from '@server/lib/plugins/hooks' import { ServerConfigManager } from '@server/lib/server-config-manager' +import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' import { isAbleToUploadVideo } from '@server/lib/user' -import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' +import { buildMoveToObjectStorageJob } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { ThumbnailModel } from '@server/models/video/thumbnail' import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' import { getLowercaseExtension } from '@shared/core-utils' -import { isAudioFile } from '@shared/extra-utils' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' import { ThumbnailType, VideoImportPayload, @@ -28,7 +29,6 @@ import { VideoResolution, VideoState } from '@shared/models' -import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '../../../helpers/ffmpeg' import { logger } from '../../../helpers/logger' import { getSecureTorrentName } from '../../../helpers/utils' import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' @@ -137,7 +137,7 @@ async function processFile (downloader: () => Promise, videoImport: MVid const { resolution } = await isAudioFile(tempVideoPath, probe) ? { resolution: VideoResolution.H_NOVIDEO } - : await getVideoStreamDimensionsInfo(tempVideoPath) + : await getVideoStreamDimensionsInfo(tempVideoPath, probe) const fps = await getVideoStreamFPS(tempVideoPath, probe) const duration = await getVideoStreamDuration(tempVideoPath, probe) @@ -313,9 +313,7 @@ async function afterImportSuccess (options: { } if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? - await JobQueue.Instance.createJob( - await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) - ) + await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user }) } } diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 2f3a971bd..1bf43f592 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,25 +1,25 @@ import { Job } from 'bullmq' import { readdir, remove } from 'fs-extra' import { join } from 'path' -import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' import { generateVideoMiniature } from '@server/lib/thumbnail' -import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' +import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding' +import { VideoPathManager } from '@server/lib/video-path-manager' import { moveToNextState } from '@server/lib/video-state' import { VideoModel } from '@server/models/video/video' import { VideoBlacklistModel } from '@server/models/video/video-blacklist' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' +import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' -import { VideoPathManager } from '@server/lib/video-path-manager' -import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' const lTags = loggerTagsFactory('live', 'job') @@ -224,6 +224,7 @@ async function assignReplayFilesToVideo (options: { const probe = await ffprobePromise(concatenatedTsFilePath) const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) + const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe) try { await generateHlsPlaylistResolutionFromTS({ @@ -231,6 +232,7 @@ async function assignReplayFilesToVideo (options: { inputFileMutexReleaser, concatenatedTsFilePath, resolution, + fps, isAAC: audioStream?.codec_name === 'aac' }) } catch (err) { diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 3e208d83d..991d11ef1 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts @@ -1,15 +1,16 @@ import { Job } from 'bullmq' import { move, remove } from 'fs-extra' import { join } from 'path' -import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' +import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' import { CONFIG } from '@server/initializers/config' +import { VIDEO_FILTERS } from '@server/initializers/constants' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { generateWebTorrentVideoFilename } from '@server/lib/paths' +import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '@server/lib/user' -import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' -import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' +import { buildFileMetadata, removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' import { VideoPathManager } from '@server/lib/video-path-manager' import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' import { UserModel } from '@server/models/user/user' @@ -17,15 +18,8 @@ import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' import { getLowercaseExtension, pick } from '@shared/core-utils' -import { - buildFileMetadata, - buildUUID, - ffprobePromise, - getFileSize, - getVideoStreamDimensionsInfo, - getVideoStreamDuration, - getVideoStreamFPS -} from '@shared/extra-utils' +import { buildUUID, getFileSize } from '@shared/extra-utils' +import { FFmpegEdition, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg' import { VideoStudioEditionPayload, VideoStudioTask, @@ -36,7 +30,6 @@ import { VideoStudioTaskWatermarkPayload } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' -import { JobQueue } from '../job-queue' const lTagsBase = loggerTagsFactory('video-edition') @@ -102,9 +95,7 @@ async function processVideoStudioEdition (job: Job) { const user = await UserModel.loadByVideoId(video.id) - await JobQueue.Instance.createJob( - await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) - ) + await createOptimizeOrMergeAudioJobs({ video, videoFile: newFile, isNewVideo: false, user }) } // --------------------------------------------------------------------------- @@ -131,9 +122,9 @@ const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessor } async function processTask (options: TaskProcessorOptions) { - const { video, task } = options + const { video, task, lTags } = options - logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) + logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags }) const processor = taskProcessors[options.task.name] if (!process) throw new Error('Unknown task ' + task.name) @@ -142,48 +133,53 @@ async function processTask (options: TaskProcessorOptions) { } function processAddIntroOutro (options: TaskProcessorOptions) { - const { task } = options + const { task, lTags } = options + + logger.debug('Will add intro/outro to the video.', { options, ...lTags }) - return addIntroOutro({ + return buildFFmpegEdition().addIntroOutro({ ...pick(options, [ 'inputPath', 'outputPath' ]), introOutroPath: task.options.file, type: task.name === 'add-intro' ? 'intro' - : 'outro', - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE + : 'outro' }) } function processCut (options: TaskProcessorOptions) { - const { task } = options + const { task, lTags } = options - return cutVideo({ + logger.debug('Will cut the video.', { options, ...lTags }) + + return buildFFmpegEdition().cutVideo({ ...pick(options, [ 'inputPath', 'outputPath' ]), start: task.options.start, - end: task.options.end, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE + end: task.options.end }) } function processAddWatermark (options: TaskProcessorOptions) { - const { task } = options + const { task, lTags } = options + + logger.debug('Will add watermark to the video.', { options, ...lTags }) - return addWatermark({ + return buildFFmpegEdition().addWatermark({ ...pick(options, [ 'inputPath', 'outputPath' ]), watermarkPath: task.options.file, - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE + videoFilters: { + watermarkSizeRatio: VIDEO_FILTERS.WATERMARK.SIZE_RATIO, + horitonzalMarginRatio: VIDEO_FILTERS.WATERMARK.HORIZONTAL_MARGIN_RATIO, + verticalMarginRatio: VIDEO_FILTERS.WATERMARK.VERTICAL_MARGIN_RATIO + } }) } +// --------------------------------------------------------------------------- + async function buildNewFile (video: MVideoId, path: string) { const videoFile = new VideoFileModel({ extname: getLowercaseExtension(path), @@ -223,3 +219,7 @@ async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStud throw new Error('Quota exceeded for this user to edit the video') } } + +function buildFFmpegEdition () { + return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) +} diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 3e6d23363..17b717275 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,13 +1,13 @@ import { Job } from 'bullmq' -import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' -import { Hooks } from '@server/lib/plugins/hooks' -import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' +import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding' +import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding' +import { removeAllWebTorrentFiles } from '@server/lib/video-file' import { VideoPathManager } from '@server/lib/video-path-manager' -import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' +import { moveToFailedTranscodingState } from '@server/lib/video-state' import { UserModel } from '@server/models/user/user' import { VideoJobInfoModel } from '@server/models/video/video-job-info' -import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' -import { pick } from '@shared/core-utils' +import { MUser, MUserId, MVideoFullLight } from '@server/types/models' import { HLSTranscodingPayload, MergeAudioTranscodingPayload, @@ -15,18 +15,8 @@ import { OptimizeTranscodingPayload, VideoTranscodingPayload } from '@shared/models' -import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg' import { logger, loggerTagsFactory } from '../../../helpers/logger' -import { CONFIG } from '../../../initializers/config' import { VideoModel } from '../../../models/video/video' -import { - generateHlsPlaylistResolution, - mergeAudioVideofile, - optimizeOriginalVideofile, - transcodeNewWebTorrentResolution -} from '../../transcoding/transcoding' -import { JobQueue } from '../job-queue' type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise @@ -84,260 +74,72 @@ export { // Job handlers // --------------------------------------------------------------------------- -async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { - logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) - - const videoFileInput = payload.copyCodecs - ? video.getWebTorrentFile(payload.resolution) - : video.getMaxQualityFile() - - const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() - - const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - await videoFileInput.getVideo().reload() - - await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { - return generateHlsPlaylistResolution({ - video, - videoInputPath, - inputFileMutexReleaser, - resolution: payload.resolution, - copyCodecs: payload.copyCodecs, - job - }) - }) - } finally { - inputFileMutexReleaser() - } - - logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - - await onHlsPlaylistGeneration(video, user, payload) -} - -async function handleNewWebTorrentResolutionJob ( - job: Job, - payload: NewWebTorrentResolutionTranscodingPayload, - video: MVideoFullLight, - user: MUserId -) { - logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) - - await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job }) - - logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - - await onNewWebTorrentFileResolution(video, user, payload) -} - async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) - await mergeAudioVideofile({ video, resolution: payload.resolution, job }) + await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job }) logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) - const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) + await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job }) logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } -// --------------------------------------------------------------------------- - -async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { - if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { - // Remove webtorrent files if not enabled - for (const file of video.VideoFiles) { - await video.removeWebTorrentFile(file) - await file.destroy() - } - - video.VideoFiles = [] - - // Create HLS new resolution jobs - await createLowerResolutionsJobs({ - video, - user, - videoFileResolution: payload.resolution, - hasAudio: payload.hasAudio, - isNewVideo: payload.isNewVideo ?? true, - type: 'hls' - }) - } - - await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') - await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) -} +async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) { + logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) -async function onVideoFirstWebTorrentTranscoding ( - videoArg: MVideoWithFile, - payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, - transcodeType: TranscodeVODOptionsType, - user: MUserId -) { - const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) + await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job }) - try { - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadFull(videoArg.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile() - - // Generate HLS version of the original file - const originalFileHLSPayload = { - ...payload, - - hasAudio: !!audioStream, - resolution: videoDatabase.getMaxQualityFile().resolution, - // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues - copyCodecs: transcodeType !== 'quick-transcode', - isMaxQuality: true - } - const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) - const hasNewResolutions = await createLowerResolutionsJobs({ - video: videoDatabase, - user, - videoFileResolution: resolution, - hasAudio: !!audioStream, - type: 'webtorrent', - isNewVideo: payload.isNewVideo ?? true - }) - - await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') + logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - // Move to next state if there are no other resolutions to generate - if (!hasHls && !hasNewResolutions) { - await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) - } - } finally { - mutexReleaser() - } + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } -async function onNewWebTorrentFileResolution ( - video: MVideo, - user: MUserId, - payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload -) { - if (payload.createHLSIfNeeded) { - await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) - } - - await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') +async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { + logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) - await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) -} + const videoFileInput = payload.copyCodecs + ? video.getWebTorrentFile(payload.resolution) + : video.getMaxQualityFile() -// --------------------------------------------------------------------------- + const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() -async function createHlsJobIfEnabled (user: MUserId, payload: { - videoUUID: string - resolution: number - hasAudio: boolean - copyCodecs: boolean - isMaxQuality: boolean - isNewVideo?: boolean -}) { - if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false - - const jobOptions = { - priority: await getTranscodingJobPriority(user) - } + const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - const hlsTranscodingPayload: HLSTranscodingPayload = { - type: 'new-resolution-to-hls', - autoDeleteWebTorrentIfNeeded: true, + try { + await videoFileInput.getVideo().reload() - ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) + await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { + return generateHlsPlaylistResolution({ + video, + videoInputPath, + inputFileMutexReleaser, + resolution: payload.resolution, + fps: payload.fps, + copyCodecs: payload.copyCodecs, + job + }) + }) + } finally { + inputFileMutexReleaser() } - await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) - - return true -} - -async function createLowerResolutionsJobs (options: { - video: MVideoFullLight - user: MUserId - videoFileResolution: number - hasAudio: boolean - isNewVideo: boolean - type: 'hls' | 'webtorrent' -}) { - const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options - - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = await Hooks.wrapObject( - computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), - 'filter:transcoding.auto.resolutions-to-transcode.result', - options - ) - - const resolutionCreated: string[] = [] - - for (const resolution of resolutionsEnabled) { - let dataInput: VideoTranscodingPayload - - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { - // WebTorrent will create subsequent HLS job - dataInput = { - type: 'new-resolution-to-webtorrent', - videoUUID: video.uuid, - resolution, - hasAudio, - createHLSIfNeeded: true, - isNewVideo - } - - resolutionCreated.push('webtorrent-' + resolution) - } - - if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { - dataInput = { - type: 'new-resolution-to-hls', - videoUUID: video.uuid, - resolution, - hasAudio, - copyCodecs: false, - isMaxQuality: false, - autoDeleteWebTorrentIfNeeded: true, - isNewVideo - } - - resolutionCreated.push('hls-' + resolution) - } - - if (!dataInput) continue - - const jobOptions = { - priority: await getTranscodingJobPriority(user) - } - - await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) - } + logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - if (resolutionCreated.length === 0) { - logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) + if (payload.deleteWebTorrentFiles === true) { + logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid)) - return false + await removeAllWebTorrentFiles(video) } - logger.info( - 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, - { resolutionCreated, ...lTags(video.uuid) } - ) - - return true + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index cc6be0bd8..21bf0f226 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -31,6 +31,7 @@ import { MoveObjectStoragePayload, NotifyPayload, RefreshPayload, + TranscodingJobBuilderPayload, VideoChannelImportPayload, VideoFileImportPayload, VideoImportPayload, @@ -56,6 +57,7 @@ import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processNotify } from './handlers/notify' +import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' import { processVideoChannelImport } from './handlers/video-channel-import' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' @@ -69,11 +71,12 @@ export type CreateJobArgument = { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | - { type: 'activitypub-http-cleaner', payload: {} } | + { type: 'activitypub-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | + { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views-stats', payload: {} } | @@ -96,28 +99,29 @@ export type CreateJobOptions = { } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, - 'activitypub-http-unicast': processActivityPubHttpUnicast, - 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, - 'video-file-import': processVideoFileImport, - 'video-transcoding': processVideoTranscoding, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-http-unicast': processActivityPubHttpUnicast, + 'activitypub-refresher': refreshAPObject, + 'actor-keys': processActorKeys, + 'after-video-channel-import': processAfterVideoChannelImport, 'email': processEmail, + 'federate-video': processFederateVideo, + 'transcoding-job-builder': processTranscodingJobBuilder, + 'manage-video-torrent': processManageVideoTorrent, + 'move-to-object-storage': processMoveToObjectStorage, + 'notify': processNotify, + 'video-channel-import': processVideoChannelImport, + 'video-file-import': processVideoFileImport, 'video-import': processVideoImport, - 'videos-views-stats': processVideosViewsStats, - 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, - 'actor-keys': processActorKeys, 'video-redundancy': processVideoRedundancy, - 'move-to-object-storage': processMoveToObjectStorage, - 'manage-video-torrent': processManageVideoTorrent, 'video-studio-edition': processVideoStudioEdition, - 'video-channel-import': processVideoChannelImport, - 'after-video-channel-import': processAfterVideoChannelImport, - 'notify': processNotify, - 'federate-video': processFederateVideo + 'video-transcoding': processVideoTranscoding, + 'videos-views-stats': processVideosViewsStats } const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } = { @@ -125,28 +129,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } } const jobTypes: JobType[] = [ + 'activitypub-cleaner', 'activitypub-follow', - 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel', + 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', - 'activitypub-cleaner', + 'activitypub-refresher', + 'actor-keys', + 'after-video-channel-import', 'email', - 'video-transcoding', + 'federate-video', + 'transcoding-job-builder', + 'manage-video-torrent', + 'move-to-object-storage', + 'notify', + 'video-channel-import', 'video-file-import', 'video-import', - 'videos-views-stats', - 'activitypub-refresher', - 'video-redundancy', - 'actor-keys', 'video-live-ending', - 'move-to-object-storage', - 'manage-video-torrent', + 'video-redundancy', 'video-studio-edition', - 'video-channel-import', - 'after-video-channel-import', - 'notify', - 'federate-video' + 'video-transcoding', + 'videos-views-stats' ] const silentFailure = new Set([ 'activitypub-http-unicast' ]) -- cgit v1.2.3