From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/lib/hls.ts | 18 +- .../job-queue/handlers/transcoding-job-builder.ts | 47 +++ server/lib/job-queue/handlers/video-file-import.ts | 2 +- server/lib/job-queue/handlers/video-import.ts | 12 +- server/lib/job-queue/handlers/video-live-ending.ts | 10 +- .../lib/job-queue/handlers/video-studio-edition.ts | 68 +-- server/lib/job-queue/handlers/video-transcoding.ts | 282 ++----------- server/lib/job-queue/job-queue.ts | 63 +-- server/lib/live/live-manager.ts | 94 +++-- server/lib/live/live-segment-sha-store.ts | 5 +- server/lib/live/live-utils.ts | 12 +- server/lib/live/shared/muxing-session.ts | 191 ++++----- .../abstract-transcoding-wrapper.ts | 101 +++++ .../ffmpeg-transcoding-wrapper.ts | 95 +++++ .../lib/live/shared/transcoding-wrapper/index.ts | 3 + .../remote-transcoding-wrapper.ts | 20 + server/lib/object-storage/index.ts | 1 + server/lib/object-storage/proxy.ts | 97 +++++ server/lib/peertube-socket.ts | 32 +- server/lib/plugins/plugin-helpers-builder.ts | 2 +- server/lib/runners/index.ts | 3 + .../runners/job-handlers/abstract-job-handler.ts | 271 ++++++++++++ .../abstract-vod-transcoding-job-handler.ts | 71 ++++ server/lib/runners/job-handlers/index.ts | 6 + .../live-rtmp-hls-transcoding-job-handler.ts | 170 ++++++++ .../runners/job-handlers/runner-job-handlers.ts | 18 + server/lib/runners/job-handlers/shared/index.ts | 1 + .../lib/runners/job-handlers/shared/vod-helpers.ts | 44 ++ .../vod-audio-merge-transcoding-job-handler.ts | 97 +++++ .../vod-hls-transcoding-job-handler.ts | 114 +++++ .../vod-web-video-transcoding-job-handler.ts | 84 ++++ server/lib/runners/runner-urls.ts | 9 + server/lib/runners/runner.ts | 36 ++ .../schedulers/runner-job-watch-dog-scheduler.ts | 42 ++ server/lib/server-config-manager.ts | 10 +- server/lib/transcoding/create-transcoding-job.ts | 36 ++ .../transcoding/default-transcoding-profiles.ts | 16 +- server/lib/transcoding/ended-transcoding.ts | 18 + server/lib/transcoding/hls-transcoding.ts | 181 ++++++++ server/lib/transcoding/shared/ffmpeg-builder.ts | 18 + server/lib/transcoding/shared/index.ts | 2 + .../shared/job-builders/abstract-job-builder.ts | 38 ++ .../lib/transcoding/shared/job-builders/index.ts | 2 + .../job-builders/transcoding-job-queue-builder.ts | 308 ++++++++++++++ .../job-builders/transcoding-runner-job-builder.ts | 189 +++++++++ .../lib/transcoding/transcoding-quick-transcode.ts | 61 +++ server/lib/transcoding/transcoding-resolutions.ts | 52 +++ server/lib/transcoding/transcoding.ts | 465 --------------------- server/lib/transcoding/web-transcoding.ts | 273 ++++++++++++ server/lib/uploadx.ts | 5 +- server/lib/video-blacklist.ts | 2 +- server/lib/video-file.ts | 54 ++- server/lib/video-studio.ts | 2 +- server/lib/video.ts | 63 +-- 54 files changed, 2902 insertions(+), 1014 deletions(-) create mode 100644 server/lib/job-queue/handlers/transcoding-job-builder.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/index.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts create mode 100644 server/lib/object-storage/proxy.ts create mode 100644 server/lib/runners/index.ts create mode 100644 server/lib/runners/job-handlers/abstract-job-handler.ts create mode 100644 server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/index.ts create mode 100644 server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/runner-job-handlers.ts create mode 100644 server/lib/runners/job-handlers/shared/index.ts create mode 100644 server/lib/runners/job-handlers/shared/vod-helpers.ts create mode 100644 server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts create mode 100644 server/lib/runners/runner-urls.ts create mode 100644 server/lib/runners/runner.ts create mode 100644 server/lib/schedulers/runner-job-watch-dog-scheduler.ts create mode 100644 server/lib/transcoding/create-transcoding-job.ts create mode 100644 server/lib/transcoding/ended-transcoding.ts create mode 100644 server/lib/transcoding/hls-transcoding.ts create mode 100644 server/lib/transcoding/shared/ffmpeg-builder.ts create mode 100644 server/lib/transcoding/shared/index.ts create mode 100644 server/lib/transcoding/shared/job-builders/abstract-job-builder.ts create mode 100644 server/lib/transcoding/shared/job-builders/index.ts create mode 100644 server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts create mode 100644 server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts create mode 100644 server/lib/transcoding/transcoding-quick-transcode.ts create mode 100644 server/lib/transcoding/transcoding-resolutions.ts delete mode 100644 server/lib/transcoding/transcoding.ts create mode 100644 server/lib/transcoding/web-transcoding.ts (limited to 'server/lib') diff --git a/server/lib/hls.ts b/server/lib/hls.ts index 053b5d326..fc1d7e1b0 100644 --- a/server/lib/hls.ts +++ b/server/lib/hls.ts @@ -3,10 +3,11 @@ import { flatten } from 'lodash' import PQueue from 'p-queue' import { basename, dirname, join } from 'path' import { MStreamingPlaylist, MStreamingPlaylistFilesVideo, MVideo } from '@server/types/models' -import { uniqify } from '@shared/core-utils' +import { uniqify, uuidRegex } from '@shared/core-utils' import { sha256 } from '@shared/extra-utils' +import { getVideoStreamDimensionsInfo } from '@shared/ffmpeg' import { VideoStorage } from '@shared/models' -import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamDimensionsInfo } from '../helpers/ffmpeg' +import { getAudioStreamCodec, getVideoStreamCodec } from '../helpers/ffmpeg' import { logger } from '../helpers/logger' import { doRequest, doRequestAndSaveToFile } from '../helpers/requests' import { generateRandomString } from '../helpers/utils' @@ -234,6 +235,16 @@ function downloadPlaylistSegments (playlistUrl: string, destinationDir: string, // --------------------------------------------------------------------------- +async function renameVideoFileInPlaylist (playlistPath: string, newVideoFilename: string) { + const content = await readFile(playlistPath, 'utf8') + + const newContent = content.replace(new RegExp(`${uuidRegex}-\\d+-fragmented.mp4`, 'g'), newVideoFilename) + + await writeFile(playlistPath, newContent, 'utf8') +} + +// --------------------------------------------------------------------------- + function injectQueryToPlaylistUrls (content: string, queryString: string) { return content.replace(/\.(m3u8|ts|mp4)/gm, '.$1?' + queryString) } @@ -247,7 +258,8 @@ export { downloadPlaylistSegments, updateStreamingPlaylistsInfohashesIfNeeded, updatePlaylistAfterFileChange, - injectQueryToPlaylistUrls + injectQueryToPlaylistUrls, + renameVideoFileInPlaylist } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts new file mode 100644 index 000000000..8b4a877d7 --- /dev/null +++ b/server/lib/job-queue/handlers/transcoding-job-builder.ts @@ -0,0 +1,47 @@ +import { Job } from 'bullmq' +import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' +import { UserModel } from '@server/models/user/user' +import { VideoModel } from '@server/models/video/video' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { pick } from '@shared/core-utils' +import { TranscodingJobBuilderPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' +import { JobQueue } from '../job-queue' + +async function processTranscodingJobBuilder (job: Job) { + const payload = job.data as TranscodingJobBuilderPayload + + logger.info('Processing transcoding job builder in job %s.', job.id) + + if (payload.optimizeJob) { + const video = await VideoModel.loadFull(payload.videoUUID) + const user = await UserModel.loadByVideoId(video.id) + const videoFile = video.getMaxQualityFile() + + await createOptimizeOrMergeAudioJobs({ + ...pick(payload.optimizeJob, [ 'isNewVideo' ]), + + video, + videoFile, + user + }) + } + + for (const job of (payload.jobs || [])) { + await JobQueue.Instance.createJob(job) + + await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') + } + + for (const sequentialJobs of (payload.sequentialJobs || [])) { + await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs) + + await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.length) + } +} + +// --------------------------------------------------------------------------- + +export { + processTranscodingJobBuilder +} diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index d950f6407..9a4550e4d 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -10,8 +10,8 @@ import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { MVideoFullLight } from '@server/types/models' import { getLowercaseExtension } from '@shared/core-utils' +import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' import { VideoFileImportPayload, VideoStorage } from '@shared/models' -import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' import { logger } from '../../../helpers/logger' import { JobQueue } from '../job-queue' diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4d361c7b9..2a063282c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -7,15 +7,16 @@ import { isPostImportVideoAccepted } from '@server/lib/moderation' import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { Hooks } from '@server/lib/plugins/hooks' import { ServerConfigManager } from '@server/lib/server-config-manager' +import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' import { isAbleToUploadVideo } from '@server/lib/user' -import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' +import { buildMoveToObjectStorageJob } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { ThumbnailModel } from '@server/models/video/thumbnail' import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' import { getLowercaseExtension } from '@shared/core-utils' -import { isAudioFile } from '@shared/extra-utils' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' import { ThumbnailType, VideoImportPayload, @@ -28,7 +29,6 @@ import { VideoResolution, VideoState } from '@shared/models' -import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '../../../helpers/ffmpeg' import { logger } from '../../../helpers/logger' import { getSecureTorrentName } from '../../../helpers/utils' import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' @@ -137,7 +137,7 @@ async function processFile (downloader: () => Promise, videoImport: MVid const { resolution } = await isAudioFile(tempVideoPath, probe) ? { resolution: VideoResolution.H_NOVIDEO } - : await getVideoStreamDimensionsInfo(tempVideoPath) + : await getVideoStreamDimensionsInfo(tempVideoPath, probe) const fps = await getVideoStreamFPS(tempVideoPath, probe) const duration = await getVideoStreamDuration(tempVideoPath, probe) @@ -313,9 +313,7 @@ async function afterImportSuccess (options: { } if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? - await JobQueue.Instance.createJob( - await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) - ) + await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user }) } } diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 2f3a971bd..1bf43f592 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,25 +1,25 @@ import { Job } from 'bullmq' import { readdir, remove } from 'fs-extra' import { join } from 'path' -import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' import { generateVideoMiniature } from '@server/lib/thumbnail' -import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' +import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding' +import { VideoPathManager } from '@server/lib/video-path-manager' import { moveToNextState } from '@server/lib/video-state' import { VideoModel } from '@server/models/video/video' import { VideoBlacklistModel } from '@server/models/video/video-blacklist' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' +import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' -import { VideoPathManager } from '@server/lib/video-path-manager' -import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' const lTags = loggerTagsFactory('live', 'job') @@ -224,6 +224,7 @@ async function assignReplayFilesToVideo (options: { const probe = await ffprobePromise(concatenatedTsFilePath) const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) + const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe) try { await generateHlsPlaylistResolutionFromTS({ @@ -231,6 +232,7 @@ async function assignReplayFilesToVideo (options: { inputFileMutexReleaser, concatenatedTsFilePath, resolution, + fps, isAAC: audioStream?.codec_name === 'aac' }) } catch (err) { diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 3e208d83d..991d11ef1 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts @@ -1,15 +1,16 @@ import { Job } from 'bullmq' import { move, remove } from 'fs-extra' import { join } from 'path' -import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' +import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' import { CONFIG } from '@server/initializers/config' +import { VIDEO_FILTERS } from '@server/initializers/constants' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { generateWebTorrentVideoFilename } from '@server/lib/paths' +import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '@server/lib/user' -import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' -import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' +import { buildFileMetadata, removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' import { VideoPathManager } from '@server/lib/video-path-manager' import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' import { UserModel } from '@server/models/user/user' @@ -17,15 +18,8 @@ import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' import { getLowercaseExtension, pick } from '@shared/core-utils' -import { - buildFileMetadata, - buildUUID, - ffprobePromise, - getFileSize, - getVideoStreamDimensionsInfo, - getVideoStreamDuration, - getVideoStreamFPS -} from '@shared/extra-utils' +import { buildUUID, getFileSize } from '@shared/extra-utils' +import { FFmpegEdition, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg' import { VideoStudioEditionPayload, VideoStudioTask, @@ -36,7 +30,6 @@ import { VideoStudioTaskWatermarkPayload } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' -import { JobQueue } from '../job-queue' const lTagsBase = loggerTagsFactory('video-edition') @@ -102,9 +95,7 @@ async function processVideoStudioEdition (job: Job) { const user = await UserModel.loadByVideoId(video.id) - await JobQueue.Instance.createJob( - await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) - ) + await createOptimizeOrMergeAudioJobs({ video, videoFile: newFile, isNewVideo: false, user }) } // --------------------------------------------------------------------------- @@ -131,9 +122,9 @@ const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessor } async function processTask (options: TaskProcessorOptions) { - const { video, task } = options + const { video, task, lTags } = options - logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) + logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags }) const processor = taskProcessors[options.task.name] if (!process) throw new Error('Unknown task ' + task.name) @@ -142,48 +133,53 @@ async function processTask (options: TaskProcessorOptions) { } function processAddIntroOutro (options: TaskProcessorOptions) { - const { task } = options + const { task, lTags } = options + + logger.debug('Will add intro/outro to the video.', { options, ...lTags }) - return addIntroOutro({ + return buildFFmpegEdition().addIntroOutro({ ...pick(options, [ 'inputPath', 'outputPath' ]), introOutroPath: task.options.file, type: task.name === 'add-intro' ? 'intro' - : 'outro', - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE + : 'outro' }) } function processCut (options: TaskProcessorOptions) { - const { task } = options + const { task, lTags } = options - return cutVideo({ + logger.debug('Will cut the video.', { options, ...lTags }) + + return buildFFmpegEdition().cutVideo({ ...pick(options, [ 'inputPath', 'outputPath' ]), start: task.options.start, - end: task.options.end, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE + end: task.options.end }) } function processAddWatermark (options: TaskProcessorOptions) { - const { task } = options + const { task, lTags } = options + + logger.debug('Will add watermark to the video.', { options, ...lTags }) - return addWatermark({ + return buildFFmpegEdition().addWatermark({ ...pick(options, [ 'inputPath', 'outputPath' ]), watermarkPath: task.options.file, - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE + videoFilters: { + watermarkSizeRatio: VIDEO_FILTERS.WATERMARK.SIZE_RATIO, + horitonzalMarginRatio: VIDEO_FILTERS.WATERMARK.HORIZONTAL_MARGIN_RATIO, + verticalMarginRatio: VIDEO_FILTERS.WATERMARK.VERTICAL_MARGIN_RATIO + } }) } +// --------------------------------------------------------------------------- + async function buildNewFile (video: MVideoId, path: string) { const videoFile = new VideoFileModel({ extname: getLowercaseExtension(path), @@ -223,3 +219,7 @@ async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStud throw new Error('Quota exceeded for this user to edit the video') } } + +function buildFFmpegEdition () { + return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) +} diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 3e6d23363..17b717275 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,13 +1,13 @@ import { Job } from 'bullmq' -import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' -import { Hooks } from '@server/lib/plugins/hooks' -import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' +import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding' +import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding' +import { removeAllWebTorrentFiles } from '@server/lib/video-file' import { VideoPathManager } from '@server/lib/video-path-manager' -import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' +import { moveToFailedTranscodingState } from '@server/lib/video-state' import { UserModel } from '@server/models/user/user' import { VideoJobInfoModel } from '@server/models/video/video-job-info' -import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' -import { pick } from '@shared/core-utils' +import { MUser, MUserId, MVideoFullLight } from '@server/types/models' import { HLSTranscodingPayload, MergeAudioTranscodingPayload, @@ -15,18 +15,8 @@ import { OptimizeTranscodingPayload, VideoTranscodingPayload } from '@shared/models' -import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg' import { logger, loggerTagsFactory } from '../../../helpers/logger' -import { CONFIG } from '../../../initializers/config' import { VideoModel } from '../../../models/video/video' -import { - generateHlsPlaylistResolution, - mergeAudioVideofile, - optimizeOriginalVideofile, - transcodeNewWebTorrentResolution -} from '../../transcoding/transcoding' -import { JobQueue } from '../job-queue' type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise @@ -84,260 +74,72 @@ export { // Job handlers // --------------------------------------------------------------------------- -async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { - logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) - - const videoFileInput = payload.copyCodecs - ? video.getWebTorrentFile(payload.resolution) - : video.getMaxQualityFile() - - const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() - - const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - await videoFileInput.getVideo().reload() - - await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { - return generateHlsPlaylistResolution({ - video, - videoInputPath, - inputFileMutexReleaser, - resolution: payload.resolution, - copyCodecs: payload.copyCodecs, - job - }) - }) - } finally { - inputFileMutexReleaser() - } - - logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - - await onHlsPlaylistGeneration(video, user, payload) -} - -async function handleNewWebTorrentResolutionJob ( - job: Job, - payload: NewWebTorrentResolutionTranscodingPayload, - video: MVideoFullLight, - user: MUserId -) { - logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) - - await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job }) - - logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - - await onNewWebTorrentFileResolution(video, user, payload) -} - async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) - await mergeAudioVideofile({ video, resolution: payload.resolution, job }) + await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job }) logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) - const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) + await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job }) logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } -// --------------------------------------------------------------------------- - -async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { - if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { - // Remove webtorrent files if not enabled - for (const file of video.VideoFiles) { - await video.removeWebTorrentFile(file) - await file.destroy() - } - - video.VideoFiles = [] - - // Create HLS new resolution jobs - await createLowerResolutionsJobs({ - video, - user, - videoFileResolution: payload.resolution, - hasAudio: payload.hasAudio, - isNewVideo: payload.isNewVideo ?? true, - type: 'hls' - }) - } - - await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') - await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) -} +async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) { + logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) -async function onVideoFirstWebTorrentTranscoding ( - videoArg: MVideoWithFile, - payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, - transcodeType: TranscodeVODOptionsType, - user: MUserId -) { - const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) + await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job }) - try { - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadFull(videoArg.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile() - - // Generate HLS version of the original file - const originalFileHLSPayload = { - ...payload, - - hasAudio: !!audioStream, - resolution: videoDatabase.getMaxQualityFile().resolution, - // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues - copyCodecs: transcodeType !== 'quick-transcode', - isMaxQuality: true - } - const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) - const hasNewResolutions = await createLowerResolutionsJobs({ - video: videoDatabase, - user, - videoFileResolution: resolution, - hasAudio: !!audioStream, - type: 'webtorrent', - isNewVideo: payload.isNewVideo ?? true - }) - - await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') + logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - // Move to next state if there are no other resolutions to generate - if (!hasHls && !hasNewResolutions) { - await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) - } - } finally { - mutexReleaser() - } + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } -async function onNewWebTorrentFileResolution ( - video: MVideo, - user: MUserId, - payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload -) { - if (payload.createHLSIfNeeded) { - await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) - } - - await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') +async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { + logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) - await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) -} + const videoFileInput = payload.copyCodecs + ? video.getWebTorrentFile(payload.resolution) + : video.getMaxQualityFile() -// --------------------------------------------------------------------------- + const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() -async function createHlsJobIfEnabled (user: MUserId, payload: { - videoUUID: string - resolution: number - hasAudio: boolean - copyCodecs: boolean - isMaxQuality: boolean - isNewVideo?: boolean -}) { - if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false - - const jobOptions = { - priority: await getTranscodingJobPriority(user) - } + const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - const hlsTranscodingPayload: HLSTranscodingPayload = { - type: 'new-resolution-to-hls', - autoDeleteWebTorrentIfNeeded: true, + try { + await videoFileInput.getVideo().reload() - ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) + await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { + return generateHlsPlaylistResolution({ + video, + videoInputPath, + inputFileMutexReleaser, + resolution: payload.resolution, + fps: payload.fps, + copyCodecs: payload.copyCodecs, + job + }) + }) + } finally { + inputFileMutexReleaser() } - await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) - - return true -} - -async function createLowerResolutionsJobs (options: { - video: MVideoFullLight - user: MUserId - videoFileResolution: number - hasAudio: boolean - isNewVideo: boolean - type: 'hls' | 'webtorrent' -}) { - const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options - - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = await Hooks.wrapObject( - computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), - 'filter:transcoding.auto.resolutions-to-transcode.result', - options - ) - - const resolutionCreated: string[] = [] - - for (const resolution of resolutionsEnabled) { - let dataInput: VideoTranscodingPayload - - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { - // WebTorrent will create subsequent HLS job - dataInput = { - type: 'new-resolution-to-webtorrent', - videoUUID: video.uuid, - resolution, - hasAudio, - createHLSIfNeeded: true, - isNewVideo - } - - resolutionCreated.push('webtorrent-' + resolution) - } - - if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { - dataInput = { - type: 'new-resolution-to-hls', - videoUUID: video.uuid, - resolution, - hasAudio, - copyCodecs: false, - isMaxQuality: false, - autoDeleteWebTorrentIfNeeded: true, - isNewVideo - } - - resolutionCreated.push('hls-' + resolution) - } - - if (!dataInput) continue - - const jobOptions = { - priority: await getTranscodingJobPriority(user) - } - - await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) - } + logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - if (resolutionCreated.length === 0) { - logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) + if (payload.deleteWebTorrentFiles === true) { + logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid)) - return false + await removeAllWebTorrentFiles(video) } - logger.info( - 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, - { resolutionCreated, ...lTags(video.uuid) } - ) - - return true + await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index cc6be0bd8..21bf0f226 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -31,6 +31,7 @@ import { MoveObjectStoragePayload, NotifyPayload, RefreshPayload, + TranscodingJobBuilderPayload, VideoChannelImportPayload, VideoFileImportPayload, VideoImportPayload, @@ -56,6 +57,7 @@ import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processNotify } from './handlers/notify' +import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' import { processVideoChannelImport } from './handlers/video-channel-import' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' @@ -69,11 +71,12 @@ export type CreateJobArgument = { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | - { type: 'activitypub-http-cleaner', payload: {} } | + { type: 'activitypub-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | + { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views-stats', payload: {} } | @@ -96,28 +99,29 @@ export type CreateJobOptions = { } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, - 'activitypub-http-unicast': processActivityPubHttpUnicast, - 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, - 'video-file-import': processVideoFileImport, - 'video-transcoding': processVideoTranscoding, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-http-unicast': processActivityPubHttpUnicast, + 'activitypub-refresher': refreshAPObject, + 'actor-keys': processActorKeys, + 'after-video-channel-import': processAfterVideoChannelImport, 'email': processEmail, + 'federate-video': processFederateVideo, + 'transcoding-job-builder': processTranscodingJobBuilder, + 'manage-video-torrent': processManageVideoTorrent, + 'move-to-object-storage': processMoveToObjectStorage, + 'notify': processNotify, + 'video-channel-import': processVideoChannelImport, + 'video-file-import': processVideoFileImport, 'video-import': processVideoImport, - 'videos-views-stats': processVideosViewsStats, - 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, - 'actor-keys': processActorKeys, 'video-redundancy': processVideoRedundancy, - 'move-to-object-storage': processMoveToObjectStorage, - 'manage-video-torrent': processManageVideoTorrent, 'video-studio-edition': processVideoStudioEdition, - 'video-channel-import': processVideoChannelImport, - 'after-video-channel-import': processAfterVideoChannelImport, - 'notify': processNotify, - 'federate-video': processFederateVideo + 'video-transcoding': processVideoTranscoding, + 'videos-views-stats': processVideosViewsStats } const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } = { @@ -125,28 +129,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } } const jobTypes: JobType[] = [ + 'activitypub-cleaner', 'activitypub-follow', - 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel', + 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', - 'activitypub-cleaner', + 'activitypub-refresher', + 'actor-keys', + 'after-video-channel-import', 'email', - 'video-transcoding', + 'federate-video', + 'transcoding-job-builder', + 'manage-video-torrent', + 'move-to-object-storage', + 'notify', + 'video-channel-import', 'video-file-import', 'video-import', - 'videos-views-stats', - 'activitypub-refresher', - 'video-redundancy', - 'actor-keys', 'video-live-ending', - 'move-to-object-storage', - 'manage-video-torrent', + 'video-redundancy', 'video-studio-edition', - 'video-channel-import', - 'after-video-channel-import', - 'notify', - 'federate-video' + 'video-transcoding', + 'videos-views-stats' ] const silentFailure = new Set([ 'activitypub-http-unicast' ]) diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 05274955d..aa32a9d52 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' import { join } from 'path' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' -import { - computeResolutionsToTranscode, - ffprobePromise, - getLiveSegmentTime, - getVideoStreamBitrate, - getVideoStreamDimensionsInfo, - getVideoStreamFPS, - hasAudioStream -} from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' import { VIDEO_LIVE } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' import { pick, wait } from '@shared/core-utils' +import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg' import { LiveVideoError, VideoState } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' import { getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { Hooks } from '../plugins/hooks' +import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions' import { LiveQuotaStore } from './live-quota-store' -import { cleanupAndDestroyPermanentLive } from './live-utils' +import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils' import { MuxingSession } from './shared' -import { sequelizeTypescript } from '@server/initializers/database' -import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' +import { RunnerJobModel } from '@server/models/runner/runner-job' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') const context = require('node-media-server/src/node_core_ctx') @@ -57,7 +51,7 @@ class LiveManager { private static instance: LiveManager private readonly muxingSessions = new Map() - private readonly videoSessions = new Map() + private readonly videoSessions = new Map() private rtmpServer: Server private rtmpsServer: ServerTLS @@ -177,14 +171,19 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number, error: LiveVideoError | null) { - const sessionId = this.videoSessions.get(videoId) - if (!sessionId) return + stopSessionOf (videoUUID: string, error: LiveVideoError | null) { + const sessionId = this.videoSessions.get(videoUUID) + if (!sessionId) { + logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID)) + return + } - this.saveEndingSession(videoId, error) - .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) + logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) - this.videoSessions.delete(videoId) + this.saveEndingSession(videoUUID, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) })) + + this.videoSessions.delete(videoUUID) this.abortSession(sessionId) } @@ -221,6 +220,11 @@ class LiveManager { return this.abortSession(sessionId) } + if (this.videoSessions.has(video.uuid)) { + logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid)) + return this.abortSession(sessionId) + } + // Cleanup old potential live (could happen with a permanent live) const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) if (oldStreamingPlaylist) { @@ -229,7 +233,7 @@ class LiveManager { await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) } - this.videoSessions.set(video.id, sessionId) + this.videoSessions.set(video.uuid, sessionId) const now = Date.now() const probe = await ffprobePromise(inputUrl) @@ -253,7 +257,7 @@ class LiveManager { ) logger.info( - 'Will mux/transcode live video of original resolution %d.', resolution, + 'Handling live video of original resolution %d.', resolution, { allResolutions, ...lTags(sessionId, video.uuid) } ) @@ -301,44 +305,44 @@ class LiveManager { muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) - muxingSession.on('bad-socket-health', ({ videoId }) => { + muxingSession.on('bad-socket-health', ({ videoUUID }) => { logger.error( 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + ' Stopping session of video %s.', videoUUID, localLTags ) - this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) + this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH) }) - muxingSession.on('duration-exceeded', ({ videoId }) => { + muxingSession.on('duration-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) + this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED) }) - muxingSession.on('quota-exceeded', ({ videoId }) => { + muxingSession.on('quota-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) + this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED) }) - muxingSession.on('ffmpeg-error', ({ videoId }) => { - this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) + muxingSession.on('transcoding-error', ({ videoUUID }) => { + this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId, sessionId) + muxingSession.on('transcoding-end', ({ videoUUID }) => { + this.onMuxingFFmpegEnd(videoUUID, sessionId) }) - muxingSession.on('after-cleanup', ({ videoId }) => { + muxingSession.on('after-cleanup', ({ videoUUID }) => { this.muxingSessions.delete(sessionId) LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) muxingSession.destroy() - return this.onAfterMuxingCleanup({ videoId, liveSession }) + return this.onAfterMuxingCleanup({ videoUUID, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -379,22 +383,24 @@ class LiveManager { } } - private onMuxingFFmpegEnd (videoId: number, sessionId: string) { - this.videoSessions.delete(videoId) + private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) { + this.videoSessions.delete(videoUUID) - this.saveEndingSession(videoId, null) + this.saveEndingSession(videoUUID, null) .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } private async onAfterMuxingCleanup (options: { - videoId: number | string + videoUUID: string liveSession?: MVideoLiveSession cleanupNow?: boolean // Default false }) { - const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options + const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options + + logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID)) try { - const fullVideo = await VideoModel.loadFull(videoId) + const fullVideo = await VideoModel.loadFull(videoUUID) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) @@ -437,15 +443,17 @@ class LiveManager { await federateVideoIfNeeded(fullVideo, false) } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) + logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) }) } } private async handleBrokenLives () { + await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' }) + const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() for (const uuid of videoUUIDs) { - await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) + await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true }) } } @@ -494,8 +502,8 @@ class LiveManager { }) } - private async saveEndingSession (videoId: number, error: LiveVideoError | null) { - const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) + private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID) if (!liveSession) return liveSession.endDate = new Date() diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts index 4d03754a9..251301141 100644 --- a/server/lib/live/live-segment-sha-store.ts +++ b/server/lib/live/live-segment-sha-store.ts @@ -52,7 +52,10 @@ class LiveSegmentShaStore { logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) if (!this.segmentsSha256.has(segmentName)) { - logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID)) + logger.warn( + 'Unknown segment in live segment hash store for video %s and segment %s.', + this.videoUUID, segmentPath, lTags(this.videoUUID) + ) return } diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index c0dec9829..3fb3ce1ce 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts @@ -1,8 +1,9 @@ import { pathExists, readdir, remove } from 'fs-extra' import { basename, join } from 'path' import { logger } from '@server/helpers/logger' +import { VIDEO_LIVE } from '@server/initializers/constants' import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' -import { VideoStorage } from '@shared/models' +import { LiveVideoLatencyMode, VideoStorage } from '@shared/models' import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage' import { getLiveDirectory } from '../paths' @@ -37,10 +38,19 @@ async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreaming await cleanupTMPLiveFilesFromFilesystem(video) } +function getLiveSegmentTime (latencyMode: LiveVideoLatencyMode) { + if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) { + return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY + } + + return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY +} + export { cleanupAndDestroyPermanentLive, cleanupUnsavedNormalLive, cleanupTMPLiveFiles, + getLiveSegmentTime, buildConcatenatedName } diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 2727fc4a7..f3f8fc886 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -1,11 +1,10 @@ import { mapSeries } from 'bluebird' import { FSWatcher, watch } from 'chokidar' -import { FfmpegCommand } from 'fluent-ffmpeg' +import { EventEmitter } from 'events' import { appendFile, ensureDir, readFile, stat } from 'fs-extra' import PQueue from 'p-queue' import { basename, join } from 'path' -import { EventEmitter } from 'stream' -import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' +import { computeOutputFPS } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' @@ -20,24 +19,24 @@ import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' -import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' import { LiveQuotaStore } from '../live-quota-store' import { LiveSegmentShaStore } from '../live-segment-sha-store' -import { buildConcatenatedName } from '../live-utils' +import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils' +import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper' import memoizee = require('memoizee') interface MuxingSessionEvents { - 'live-ready': (options: { videoId: number }) => void + 'live-ready': (options: { videoUUID: string }) => void - 'bad-socket-health': (options: { videoId: number }) => void - 'duration-exceeded': (options: { videoId: number }) => void - 'quota-exceeded': (options: { videoId: number }) => void + 'bad-socket-health': (options: { videoUUID: string }) => void + 'duration-exceeded': (options: { videoUUID: string }) => void + 'quota-exceeded': (options: { videoUUID: string }) => void - 'ffmpeg-end': (options: { videoId: number }) => void - 'ffmpeg-error': (options: { videoId: number }) => void + 'transcoding-end': (options: { videoUUID: string }) => void + 'transcoding-error': (options: { videoUUID: string }) => void - 'after-cleanup': (options: { videoId: number }) => void + 'after-cleanup': (options: { videoUUID: string }) => void } declare interface MuxingSession { @@ -52,7 +51,7 @@ declare interface MuxingSession { class MuxingSession extends EventEmitter { - private ffmpegCommand: FfmpegCommand + private transcodingWrapper: AbstractTranscodingWrapper private readonly context: any private readonly user: MUserId @@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter { private readonly hasAudio: boolean - private readonly videoId: number private readonly videoUUID: string private readonly saveReplay: boolean @@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter { this.allResolutions = options.allResolutions - this.videoId = this.videoLive.Video.id this.videoUUID = this.videoLive.Video.uuid this.saveReplay = this.videoLive.saveReplay @@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter { await this.prepareDirectories() - this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED - ? await getLiveTranscodingCommand({ - inputUrl: this.inputUrl, + this.transcodingWrapper = this.buildTranscodingWrapper() - outPath: this.outDirectory, - masterPlaylistName: this.streamingPlaylist.playlistFilename, + this.transcodingWrapper.on('end', () => this.onTranscodedEnded()) + this.transcodingWrapper.on('error', () => this.onTranscodingError()) - latencyMode: this.videoLive.latencyMode, - - resolutions: this.allResolutions, - fps: this.fps, - bitrate: this.bitrate, - ratio: this.ratio, - - hasAudio: this.hasAudio, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.LIVE.TRANSCODING.PROFILE - }) - : getLiveMuxingCommand({ - inputUrl: this.inputUrl, - outPath: this.outDirectory, - masterPlaylistName: this.streamingPlaylist.playlistFilename, - latencyMode: this.videoLive.latencyMode - }) - - logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) + await this.transcodingWrapper.run() this.watchMasterFile() this.watchTSFiles() this.watchM3U8File() - - let ffmpegShellCommand: string - this.ffmpegCommand.on('start', cmdline => { - ffmpegShellCommand = cmdline - - logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) - }) - - this.ffmpegCommand.on('error', (err, stdout, stderr) => { - this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) - }) - - this.ffmpegCommand.on('end', () => { - this.emit('ffmpeg-end', ({ videoId: this.videoId })) - - this.onFFmpegEnded() - }) - - this.ffmpegCommand.run() } abort () { - if (!this.ffmpegCommand) return + if (!this.transcodingWrapper) return this.aborted = true - this.ffmpegCommand.kill('SIGINT') + this.transcodingWrapper.abort() } destroy () { @@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter { this.hasClientSocketInBadHealthWithCache.clear() } - private onFFmpegError (options: { - err: any - stdout: string - stderr: string - ffmpegShellCommand: string - }) { - const { err, stdout, stderr, ffmpegShellCommand } = options - - this.onFFmpegEnded() - - // Don't care that we killed the ffmpeg process - if (err?.message?.includes('Exiting normally')) return - - logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) - - this.emit('ffmpeg-error', ({ videoId: this.videoId })) - } - - private onFFmpegEnded () { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) - - setTimeout(() => { - // Wait latest segments generation, and close watchers - - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) - .then(() => { - // Process remaining segments hash - for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { - this.processSegments(this.segmentsToProcessPerPlaylist[key]) - } - }) - .catch(err => { - logger.error( - 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, - { err, ...this.lTags() } - ) - }) - - this.emit('after-cleanup', { videoId: this.videoId }) - }, 1000) - } - private watchMasterFile () { this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) @@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter { this.masterPlaylistCreated = true + logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) + this.masterWatcher.close() .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) }) @@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter { this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { - this.emit('bad-socket-health', { videoId: this.videoId }) + this.emit('bad-socket-health', { videoUUID: this.videoUUID }) return } // Duration constraint check if (this.isDurationConstraintValid(startStreamDateTime) !== true) { - this.emit('duration-exceeded', { videoId: this.videoId }) + this.emit('duration-exceeded', { videoUUID: this.videoUUID }) return } // Check user quota if the user enabled replay saving if (await this.isQuotaExceeded(segmentPath) === true) { - this.emit('quota-exceeded', { videoId: this.videoId }) + this.emit('quota-exceeded', { videoUUID: this.videoUUID }) } } @@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter { if (this.masterPlaylistCreated && !this.liveReady) { this.liveReady = true - this.emit('live-ready', { videoId: this.videoId }) + this.emit('live-ready', { videoUUID: this.videoUUID }) } } + private onTranscodingError () { + this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) + } + + private onTranscodedEnded () { + this.emit('transcoding-end', ({ videoUUID: this.videoUUID })) + + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) + + setTimeout(() => { + // Wait latest segments generation, and close watchers + + Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) + .then(() => { + // Process remaining segments hash + for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { + this.processSegments(this.segmentsToProcessPerPlaylist[key]) + } + }) + .catch(err => { + logger.error( + 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, + { err, ...this.lTags() } + ) + }) + + this.emit('after-cleanup', { videoUUID: this.videoUUID }) + }, 1000) + } + private hasClientSocketInBadHealth (sessionId: string) { const rtmpSession = this.context.sessions.get(sessionId) @@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter { sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED }) } + + private buildTranscodingWrapper () { + const options = { + streamingPlaylist: this.streamingPlaylist, + videoLive: this.videoLive, + + lTags: this.lTags, + + inputUrl: this.inputUrl, + + toTranscode: this.allResolutions.map(resolution => ({ + resolution, + fps: computeOutputFPS({ inputFPS: this.fps, resolution }) + })), + + fps: this.fps, + bitrate: this.bitrate, + ratio: this.ratio, + hasAudio: this.hasAudio, + + segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, + segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode), + + outDirectory: this.outDirectory + } + + return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED + ? new RemoteTranscodingWrapper(options) + : new FFmpegTranscodingWrapper(options) + } } // --------------------------------------------------------------------------- diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts new file mode 100644 index 000000000..226ba4573 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts @@ -0,0 +1,101 @@ +import EventEmitter from 'events' +import { LoggerTagsFn } from '@server/helpers/logger' +import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models' +import { LiveVideoError } from '@shared/models' + +interface TranscodingWrapperEvents { + 'end': () => void + + 'error': (options: { err: Error }) => void +} + +declare interface AbstractTranscodingWrapper { + on( + event: U, listener: TranscodingWrapperEvents[U] + ): this + + emit( + event: U, ...args: Parameters + ): boolean +} + +interface AbstractTranscodingWrapperOptions { + streamingPlaylist: MStreamingPlaylistVideo + videoLive: MVideoLiveVideo + + lTags: LoggerTagsFn + + inputUrl: string + fps: number + toTranscode: { + resolution: number + fps: number + }[] + + bitrate: number + ratio: number + hasAudio: boolean + + segmentListSize: number + segmentDuration: number + + outDirectory: string +} + +abstract class AbstractTranscodingWrapper extends EventEmitter { + protected readonly videoLive: MVideoLiveVideo + + protected readonly toTranscode: { + resolution: number + fps: number + }[] + + protected readonly inputUrl: string + protected readonly fps: number + protected readonly bitrate: number + protected readonly ratio: number + protected readonly hasAudio: boolean + + protected readonly segmentListSize: number + protected readonly segmentDuration: number + + protected readonly videoUUID: string + + protected readonly outDirectory: string + + protected readonly lTags: LoggerTagsFn + + protected readonly streamingPlaylist: MStreamingPlaylistVideo + + constructor (options: AbstractTranscodingWrapperOptions) { + super() + + this.lTags = options.lTags + + this.videoLive = options.videoLive + this.videoUUID = options.videoLive.Video.uuid + this.streamingPlaylist = options.streamingPlaylist + + this.inputUrl = options.inputUrl + this.fps = options.fps + this.toTranscode = options.toTranscode + + this.bitrate = options.bitrate + this.ratio = options.ratio + this.hasAudio = options.hasAudio + + this.segmentListSize = options.segmentListSize + this.segmentDuration = options.segmentDuration + + this.outDirectory = options.outDirectory + } + + abstract run (): Promise + + abstract abort (error?: LiveVideoError): void +} + +export { + AbstractTranscodingWrapper, + AbstractTranscodingWrapperOptions +} diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts new file mode 100644 index 000000000..1f4c12bd4 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts @@ -0,0 +1,95 @@ +import { FfmpegCommand } from 'fluent-ffmpeg' +import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { VIDEO_LIVE } from '@server/initializers/constants' +import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' +import { FFmpegLive } from '@shared/ffmpeg' +import { getLiveSegmentTime } from '../../live-utils' +import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' + +export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { + private ffmpegCommand: FfmpegCommand + private ended = false + + async run () { + this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED + ? await this.buildFFmpegLive().getLiveTranscodingCommand({ + inputUrl: this.inputUrl, + + outPath: this.outDirectory, + masterPlaylistName: this.streamingPlaylist.playlistFilename, + + segmentListSize: this.segmentListSize, + segmentDuration: this.segmentDuration, + + toTranscode: this.toTranscode, + + bitrate: this.bitrate, + ratio: this.ratio, + + hasAudio: this.hasAudio + }) + : this.buildFFmpegLive().getLiveMuxingCommand({ + inputUrl: this.inputUrl, + outPath: this.outDirectory, + + masterPlaylistName: this.streamingPlaylist.playlistFilename, + + segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, + segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode) + }) + + logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags()) + + this.ffmpegCommand.run() + + let ffmpegShellCommand: string + this.ffmpegCommand.on('start', cmdline => { + ffmpegShellCommand = cmdline + + logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) + }) + + this.ffmpegCommand.on('error', (err, stdout, stderr) => { + this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) + }) + + this.ffmpegCommand.on('end', () => { + this.onFFmpegEnded() + }) + + this.ffmpegCommand.run() + } + + abort () { + // Nothing to do, ffmpeg will automatically exit + } + + private onFFmpegError (options: { + err: any + stdout: string + stderr: string + ffmpegShellCommand: string + }) { + const { err, stdout, stderr, ffmpegShellCommand } = options + + // Don't care that we killed the ffmpeg process + if (err?.message?.includes('Exiting normally')) return + + logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) + + this.emit('error', { err }) + } + + private onFFmpegEnded () { + if (this.ended) return + + this.ended = true + this.emit('end') + } + + private buildFFmpegLive () { + return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) + } +} diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts new file mode 100644 index 000000000..ae28fa1ca --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/index.ts @@ -0,0 +1,3 @@ +export * from './abstract-transcoding-wrapper' +export * from './ffmpeg-transcoding-wrapper' +export * from './remote-transcoding-wrapper' diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts new file mode 100644 index 000000000..345eaf442 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts @@ -0,0 +1,20 @@ +import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners' +import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' + +export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper { + async run () { + await new LiveRTMPHLSTranscodingJobHandler().create({ + rtmpUrl: this.inputUrl, + toTranscode: this.toTranscode, + video: this.videoLive.Video, + outputDirectory: this.outDirectory, + playlist: this.streamingPlaylist, + segmentListSize: this.segmentListSize, + segmentDuration: this.segmentDuration + }) + } + + abort () { + this.emit('end') + } +} diff --git a/server/lib/object-storage/index.ts b/server/lib/object-storage/index.ts index 8b413a40e..6525f8dfb 100644 --- a/server/lib/object-storage/index.ts +++ b/server/lib/object-storage/index.ts @@ -1,3 +1,4 @@ export * from './keys' +export * from './proxy' export * from './urls' export * from './videos' diff --git a/server/lib/object-storage/proxy.ts b/server/lib/object-storage/proxy.ts new file mode 100644 index 000000000..c782a8a25 --- /dev/null +++ b/server/lib/object-storage/proxy.ts @@ -0,0 +1,97 @@ +import express from 'express' +import { PassThrough, pipeline } from 'stream' +import { GetObjectCommandOutput } from '@aws-sdk/client-s3' +import { buildReinjectVideoFileTokenQuery } from '@server/controllers/shared/m3u8-playlist' +import { logger } from '@server/helpers/logger' +import { StreamReplacer } from '@server/helpers/stream-replacer' +import { MStreamingPlaylist, MVideo } from '@server/types/models' +import { HttpStatusCode } from '@shared/models' +import { injectQueryToPlaylistUrls } from '../hls' +import { getHLSFileReadStream, getWebTorrentFileReadStream } from './videos' + +export async function proxifyWebTorrentFile (options: { + req: express.Request + res: express.Response + filename: string +}) { + const { req, res, filename } = options + + logger.debug('Proxifying WebTorrent file %s from object storage.', filename) + + try { + const { response: s3Response, stream } = await getWebTorrentFileReadStream({ + filename, + rangeHeader: req.header('range') + }) + + setS3Headers(res, s3Response) + + return stream.pipe(res) + } catch (err) { + return handleObjectStorageFailure(res, err) + } +} + +export async function proxifyHLS (options: { + req: express.Request + res: express.Response + playlist: MStreamingPlaylist + video: MVideo + filename: string + reinjectVideoFileToken: boolean +}) { + const { req, res, playlist, video, filename, reinjectVideoFileToken } = options + + logger.debug('Proxifying HLS file %s from object storage.', filename) + + try { + const { response: s3Response, stream } = await getHLSFileReadStream({ + playlist: playlist.withVideo(video), + filename, + rangeHeader: req.header('range') + }) + + setS3Headers(res, s3Response) + + const streamReplacer = reinjectVideoFileToken + ? new StreamReplacer(line => injectQueryToPlaylistUrls(line, buildReinjectVideoFileTokenQuery(req, filename.endsWith('master.m3u8')))) + : new PassThrough() + + return pipeline( + stream, + streamReplacer, + res, + err => { + if (!err) return + + handleObjectStorageFailure(res, err) + } + ) + } catch (err) { + return handleObjectStorageFailure(res, err) + } +} + +// --------------------------------------------------------------------------- +// Private +// --------------------------------------------------------------------------- + +function handleObjectStorageFailure (res: express.Response, err: Error) { + if (err.name === 'NoSuchKey') { + logger.debug('Could not find key in object storage to proxify private HLS video file.', { err }) + return res.sendStatus(HttpStatusCode.NOT_FOUND_404) + } + + return res.fail({ + status: HttpStatusCode.INTERNAL_SERVER_ERROR_500, + message: err.message, + type: err.name + }) +} + +function setS3Headers (res: express.Response, s3Response: GetObjectCommandOutput) { + if (s3Response.$metadata.httpStatusCode === HttpStatusCode.PARTIAL_CONTENT_206) { + res.setHeader('Content-Range', s3Response.ContentRange) + res.status(HttpStatusCode.PARTIAL_CONTENT_206) + } +} diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 0398ca61d..ded7e9743 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http' import { Namespace, Server as SocketServer, Socket } from 'socket.io' import { isIdValid } from '@server/helpers/custom-validators/misc' import { MVideo, MVideoImmutable } from '@server/types/models' +import { MRunner } from '@server/types/models/runners' import { UserNotificationModelForApi } from '@server/types/models/user' import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { logger } from '../helpers/logger' -import { authenticateSocket } from '../middlewares' +import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' +import { Debounce } from '@server/helpers/debounce' class PeerTubeSocket { @@ -13,6 +15,7 @@ class PeerTubeSocket { private userNotificationSockets: { [ userId: number ]: Socket[] } = {} private liveVideosNamespace: Namespace + private readonly runnerSockets = new Set() private constructor () {} @@ -24,7 +27,7 @@ class PeerTubeSocket { .on('connection', socket => { const userId = socket.handshake.auth.user.id - logger.debug('User %d connected on the notification system.', userId) + logger.debug('User %d connected to the notification system.', userId) if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] @@ -53,6 +56,22 @@ class PeerTubeSocket { socket.leave(videoId) }) }) + + io.of('/runners') + .use(authenticateRunnerSocket) + .on('connection', socket => { + const runner: MRunner = socket.handshake.auth.runner + + logger.debug(`New runner "${runner.name}" connected to the notification system.`) + + this.runnerSockets.add(socket) + + socket.on('disconnect', () => { + logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) + + this.runnerSockets.delete(socket) + }) + }) } sendNotification (userId: number, notification: UserNotificationModelForApi) { @@ -89,6 +108,15 @@ class PeerTubeSocket { .emit(type, data) } + @Debounce({ timeoutMS: 1000 }) + sendAvailableJobsPingToRunners () { + logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) + + for (const runners of this.runnerSockets) { + runners.emit('available-jobs') + } + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts index 66383af46..92ef87cca 100644 --- a/server/lib/plugins/plugin-helpers-builder.ts +++ b/server/lib/plugins/plugin-helpers-builder.ts @@ -1,7 +1,6 @@ import express from 'express' import { Server } from 'http' import { join } from 'path' -import { ffprobePromise } from '@server/helpers/ffmpeg/ffprobe-utils' import { buildLogger } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { WEBSERVER } from '@server/initializers/constants' @@ -16,6 +15,7 @@ import { VideoModel } from '@server/models/video/video' import { VideoBlacklistModel } from '@server/models/video/video-blacklist' import { MPlugin, MVideo, UserNotificationModelForApi } from '@server/types/models' import { PeerTubeHelpers } from '@server/types/plugins' +import { ffprobePromise } from '@shared/ffmpeg' import { VideoBlacklistCreate, VideoStorage } from '@shared/models' import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist' import { PeerTubeSocket } from '../peertube-socket' diff --git a/server/lib/runners/index.ts b/server/lib/runners/index.ts new file mode 100644 index 000000000..a737c7b59 --- /dev/null +++ b/server/lib/runners/index.ts @@ -0,0 +1,3 @@ +export * from './job-handlers' +export * from './runner' +export * from './runner-urls' diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts new file mode 100644 index 000000000..73fc14574 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts @@ -0,0 +1,271 @@ +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { RUNNER_JOBS } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { PeerTubeSocket } from '@server/lib/peertube-socket' +import { RunnerJobModel } from '@server/models/runner/runner-job' +import { setAsUpdated } from '@server/models/shared' +import { MRunnerJob } from '@server/types/models/runners' +import { pick } from '@shared/core-utils' +import { + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobLiveRTMPHLSTranscodingPrivatePayload, + RunnerJobState, + RunnerJobSuccessPayload, + RunnerJobType, + RunnerJobUpdatePayload, + RunnerJobVODAudioMergeTranscodingPayload, + RunnerJobVODAudioMergeTranscodingPrivatePayload, + RunnerJobVODHLSTranscodingPayload, + RunnerJobVODHLSTranscodingPrivatePayload, + RunnerJobVODWebVideoTranscodingPayload, + RunnerJobVODWebVideoTranscodingPrivatePayload +} from '@shared/models' + +type CreateRunnerJobArg = + { + type: Extract + payload: RunnerJobVODWebVideoTranscodingPayload + privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload + } | + { + type: Extract + payload: RunnerJobVODHLSTranscodingPayload + privatePayload: RunnerJobVODHLSTranscodingPrivatePayload + } | + { + type: Extract + payload: RunnerJobVODAudioMergeTranscodingPayload + privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload + } | + { + type: Extract + payload: RunnerJobLiveRTMPHLSTranscodingPayload + privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload + } + +export abstract class AbstractJobHandler { + + protected readonly lTags = loggerTagsFactory('runner') + + // --------------------------------------------------------------------------- + + abstract create (options: C): Promise + + protected async createRunnerJob (options: CreateRunnerJobArg & { + jobUUID: string + priority: number + dependsOnRunnerJob?: MRunnerJob + }): Promise { + const { priority, dependsOnRunnerJob } = options + + const runnerJob = new RunnerJobModel({ + ...pick(options, [ 'type', 'payload', 'privatePayload' ]), + + uuid: options.jobUUID, + + state: dependsOnRunnerJob + ? RunnerJobState.WAITING_FOR_PARENT_JOB + : RunnerJobState.PENDING, + + dependsOnRunnerJobId: dependsOnRunnerJob?.id, + + priority + }) + + const job = await sequelizeTypescript.transaction(async transaction => { + return runnerJob.save({ transaction }) + }) + + if (runnerJob.state === RunnerJobState.PENDING) { + PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() + } + + return job + } + + // --------------------------------------------------------------------------- + + protected abstract specificUpdate (options: { + runnerJob: MRunnerJob + updatePayload?: U + }): Promise | void + + async update (options: { + runnerJob: MRunnerJob + progress?: number + updatePayload?: U + }) { + const { runnerJob, progress } = options + + await this.specificUpdate(options) + + if (progress) runnerJob.progress = progress + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + if (runnerJob.changed()) { + return runnerJob.save({ transaction }) + } + + // Don't update the job too often + if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { + await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) + } + }) + }) + } + + // --------------------------------------------------------------------------- + + async complete (options: { + runnerJob: MRunnerJob + resultPayload: S + }) { + const { runnerJob } = options + + try { + await this.specificComplete(options) + + runnerJob.state = RunnerJobState.COMPLETED + } catch (err) { + logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) + + runnerJob.state = RunnerJobState.ERRORED + runnerJob.error = err.message + } + + runnerJob.progress = null + runnerJob.finishedAt = new Date() + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + + const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) + + if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() + } + + protected abstract specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: S + }): Promise | void + + // --------------------------------------------------------------------------- + + async cancel (options: { + runnerJob: MRunnerJob + fromParent?: boolean + }) { + const { runnerJob, fromParent } = options + + await this.specificCancel(options) + + const cancelState = fromParent + ? RunnerJobState.PARENT_CANCELLED + : RunnerJobState.CANCELLED + + runnerJob.setToErrorOrCancel(cancelState) + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + + const children = await RunnerJobModel.listChildrenOf(runnerJob) + for (const child of children) { + logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) + + await this.cancel({ runnerJob: child, fromParent: true }) + } + } + + protected abstract specificCancel (options: { + runnerJob: MRunnerJob + }): Promise | void + + // --------------------------------------------------------------------------- + + protected abstract isAbortSupported (): boolean + + async abort (options: { + runnerJob: MRunnerJob + }) { + const { runnerJob } = options + + if (this.isAbortSupported() !== true) { + return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) + } + + await this.specificAbort(options) + + runnerJob.resetToPending() + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + } + + protected setAbortState (runnerJob: MRunnerJob) { + runnerJob.resetToPending() + } + + protected abstract specificAbort (options: { + runnerJob: MRunnerJob + }): Promise | void + + // --------------------------------------------------------------------------- + + async error (options: { + runnerJob: MRunnerJob + message: string + fromParent?: boolean + }) { + const { runnerJob, message, fromParent } = options + + const errorState = fromParent + ? RunnerJobState.PARENT_ERRORED + : RunnerJobState.ERRORED + + const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES + ? RunnerJobState.PENDING + : errorState + + await this.specificError({ ...options, nextState }) + + if (nextState === errorState) { + runnerJob.setToErrorOrCancel(nextState) + runnerJob.error = message + } else { + runnerJob.resetToPending() + } + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + + if (runnerJob.state === errorState) { + const children = await RunnerJobModel.listChildrenOf(runnerJob) + + for (const child of children) { + logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) + + await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) + } + } + } + + protected abstract specificError (options: { + runnerJob: MRunnerJob + message: string + nextState: RunnerJobState + }): Promise | void +} diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts new file mode 100644 index 000000000..517645848 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts @@ -0,0 +1,71 @@ + +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { logger } from '@server/helpers/logger' +import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MRunnerJob } from '@server/types/models/runners' +import { + LiveRTMPHLSTranscodingUpdatePayload, + RunnerJobSuccessPayload, + RunnerJobUpdatePayload, + RunnerJobVODPrivatePayload +} from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' +import { loadTranscodingRunnerVideo } from './shared' + +// eslint-disable-next-line max-len +export abstract class AbstractVODTranscodingJobHandler extends AbstractJobHandler { + + // --------------------------------------------------------------------------- + + protected isAbortSupported () { + return true + } + + protected specificUpdate (_options: { + runnerJob: MRunnerJob + updatePayload?: LiveRTMPHLSTranscodingUpdatePayload + }) { + // empty + } + + protected specificAbort (_options: { + runnerJob: MRunnerJob + }) { + // empty + } + + protected async specificError (options: { + runnerJob: MRunnerJob + }) { + const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) + if (!video) return + + await moveToFailedTranscodingState(video) + + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + } + + protected async specificCancel (options: { + runnerJob: MRunnerJob + }) { + const { runnerJob } = options + + const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) + if (!video) return + + const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + + logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid)) + + if (pending === 0) { + logger.info( + `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`, + this.lTags(video.uuid) + ) + + const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo }) + } + } +} diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts new file mode 100644 index 000000000..0fca72b9a --- /dev/null +++ b/server/lib/runners/job-handlers/index.ts @@ -0,0 +1,6 @@ +export * from './abstract-job-handler' +export * from './live-rtmp-hls-transcoding-job-handler' +export * from './vod-audio-merge-transcoding-job-handler' +export * from './vod-hls-transcoding-job-handler' +export * from './vod-web-video-transcoding-job-handler' +export * from './runner-job-handlers' diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..c3d0e427d --- /dev/null +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts @@ -0,0 +1,170 @@ +import { move, remove } from 'fs-extra' +import { join } from 'path' +import { logger } from '@server/helpers/logger' +import { JOB_PRIORITY } from '@server/initializers/constants' +import { LiveManager } from '@server/lib/live' +import { MStreamingPlaylist, MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { + LiveRTMPHLSTranscodingSuccess, + LiveRTMPHLSTranscodingUpdatePayload, + LiveVideoError, + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobLiveRTMPHLSTranscodingPrivatePayload, + RunnerJobState +} from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' + +type CreateOptions = { + video: MVideo + playlist: MStreamingPlaylist + + rtmpUrl: string + + toTranscode: { + resolution: number + fps: number + }[] + + segmentListSize: number + segmentDuration: number + + outputDirectory: string +} + +// eslint-disable-next-line max-len +export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler { + + async create (options: CreateOptions) { + const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options + + const jobUUID = buildUUID() + const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { + input: { + rtmpUrl + }, + output: { + toTranscode, + segmentListSize, + segmentDuration + } + } + + const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { + videoUUID: video.uuid, + masterPlaylistName: playlist.playlistFilename, + outputDirectory + } + + const job = await this.createRunnerJob({ + type: 'live-rtmp-hls-transcoding', + jobUUID, + payload, + privatePayload, + priority: JOB_PRIORITY.TRANSCODING + }) + + return job + } + + // --------------------------------------------------------------------------- + + async specificUpdate (options: { + runnerJob: MRunnerJob + updatePayload: LiveRTMPHLSTranscodingUpdatePayload + }) { + const { runnerJob, updatePayload } = options + + const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload + const outputDirectory = privatePayload.outputDirectory + const videoUUID = privatePayload.videoUUID + + if (updatePayload.type === 'add-chunk') { + await move( + updatePayload.videoChunkFile as string, + join(outputDirectory, updatePayload.videoChunkFilename), + { overwrite: true } + ) + } else if (updatePayload.type === 'remove-chunk') { + await remove(join(outputDirectory, updatePayload.videoChunkFilename)) + } + + if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) { + await move( + updatePayload.resolutionPlaylistFile as string, + join(outputDirectory, updatePayload.resolutionPlaylistFilename), + { overwrite: true } + ) + } + + if (updatePayload.masterPlaylistFile) { + await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true }) + } + + logger.info( + 'Runner live RTMP to HLS job %s for %s updated.', + runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) } + ) + } + + // --------------------------------------------------------------------------- + + protected specificComplete (options: { + runnerJob: MRunnerJob + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'ended' + }) + } + + // --------------------------------------------------------------------------- + + protected isAbortSupported () { + return false + } + + protected specificAbort () { + throw new Error('Not implemented') + } + + protected specificError (options: { + runnerJob: MRunnerJob + nextState: RunnerJobState + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'errored' + }) + } + + protected specificCancel (options: { + runnerJob: MRunnerJob + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'cancelled' + }) + } + + private stopLive (options: { + runnerJob: MRunnerJob + type: 'ended' | 'errored' | 'cancelled' + }) { + const { runnerJob, type } = options + + const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload + const videoUUID = privatePayload.videoUUID + + const errorType = { + ended: null, + errored: LiveVideoError.RUNNER_JOB_ERROR, + cancelled: LiveVideoError.RUNNER_JOB_CANCEL + } + + LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type]) + + logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID)) + } +} diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts new file mode 100644 index 000000000..7bad1bc77 --- /dev/null +++ b/server/lib/runners/job-handlers/runner-job-handlers.ts @@ -0,0 +1,18 @@ +import { MRunnerJob } from '@server/types/models/runners' +import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' +import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' +import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' +import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' +import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' + +const processors: Record AbstractJobHandler> = { + 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, + 'vod-hls-transcoding': VODHLSTranscodingJobHandler, + 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, + 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler +} + +export function getRunnerJobHandlerClass (job: MRunnerJob) { + return processors[job.type] +} diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts new file mode 100644 index 000000000..348273ae2 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/index.ts @@ -0,0 +1 @@ +export * from './vod-helpers' diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts new file mode 100644 index 000000000..93ae89ff8 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/vod-helpers.ts @@ -0,0 +1,44 @@ +import { move } from 'fs-extra' +import { dirname, join } from 'path' +import { logger, LoggerTagsFn } from '@server/helpers/logger' +import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' +import { onWebTorrentVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding' +import { buildNewFile } from '@server/lib/video-file' +import { VideoModel } from '@server/models/video/video' +import { MVideoFullLight } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models' + +export async function onVODWebVideoOrAudioMergeTranscodingJob (options: { + video: MVideoFullLight + videoFilePath: string + privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload +}) { + const { video, videoFilePath, privatePayload } = options + + const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' }) + videoFile.videoId = video.id + + const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) + await move(videoFilePath, newVideoFilePath) + + await onWebTorrentVideoFileTranscoding({ + video, + videoFile, + videoOutputPath: newVideoFilePath + }) + + await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) +} + +export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) { + const videoUUID = runnerJob.privatePayload.videoUUID + + const video = await VideoModel.loadFull(videoUUID) + if (!video) { + logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID)) + return undefined + } + + return video +} diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts new file mode 100644 index 000000000..a7b33f87e --- /dev/null +++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts @@ -0,0 +1,97 @@ +import { pick } from 'lodash' +import { logger } from '@server/helpers/logger' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { getVideoStreamDuration } from '@shared/ffmpeg' +import { + RunnerJobUpdatePayload, + RunnerJobVODAudioMergeTranscodingPayload, + RunnerJobVODWebVideoTranscodingPrivatePayload, + VODAudioMergeTranscodingSuccess +} from '@shared/models' +import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls' +import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' +import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' + +type CreateOptions = { + video: MVideo + isNewVideo: boolean + resolution: number + fps: number + priority: number + dependsOnRunnerJob?: MRunnerJob +} + +// eslint-disable-next-line max-len +export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler { + + async create (options: CreateOptions) { + const { video, resolution, fps, priority, dependsOnRunnerJob } = options + + const jobUUID = buildUUID() + const payload: RunnerJobVODAudioMergeTranscodingPayload = { + input: { + audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid), + previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid) + }, + output: { + resolution, + fps + } + } + + const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { + ...pick(options, [ 'isNewVideo' ]), + + videoUUID: video.uuid + } + + const job = await this.createRunnerJob({ + type: 'vod-audio-merge-transcoding', + jobUUID, + payload, + privatePayload, + priority, + dependsOnRunnerJob + }) + + await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') + + return job + } + + // --------------------------------------------------------------------------- + + async specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: VODAudioMergeTranscodingSuccess + }) { + const { runnerJob, resultPayload } = options + const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload + + const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) + if (!video) return + + const videoFilePath = resultPayload.videoFile as string + + // ffmpeg generated a new video file, so update the video duration + // See https://trac.ffmpeg.org/ticket/5456 + video.duration = await getVideoStreamDuration(videoFilePath) + await video.save() + + // We can remove the old audio file + const oldAudioFile = video.VideoFiles[0] + await video.removeWebTorrentFile(oldAudioFile) + await oldAudioFile.destroy() + video.VideoFiles = [] + + await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) + + logger.info( + 'Runner VOD audio merge transcoding job %s for %s ended.', + runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) + ) + } +} diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..02566b9d5 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts @@ -0,0 +1,114 @@ +import { move } from 'fs-extra' +import { dirname, join } from 'path' +import { logger } from '@server/helpers/logger' +import { renameVideoFileInPlaylist } from '@server/lib/hls' +import { getHlsResolutionPlaylistFilename } from '@server/lib/paths' +import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' +import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding' +import { buildNewFile, removeAllWebTorrentFiles } from '@server/lib/video-file' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { pick } from '@shared/core-utils' +import { buildUUID } from '@shared/extra-utils' +import { + RunnerJobUpdatePayload, + RunnerJobVODHLSTranscodingPayload, + RunnerJobVODHLSTranscodingPrivatePayload, + VODHLSTranscodingSuccess +} from '@shared/models' +import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' +import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' +import { loadTranscodingRunnerVideo } from './shared' + +type CreateOptions = { + video: MVideo + isNewVideo: boolean + deleteWebVideoFiles: boolean + resolution: number + fps: number + priority: number + dependsOnRunnerJob?: MRunnerJob +} + +// eslint-disable-next-line max-len +export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler { + + async create (options: CreateOptions) { + const { video, resolution, fps, dependsOnRunnerJob, priority } = options + + const jobUUID = buildUUID() + + const payload: RunnerJobVODHLSTranscodingPayload = { + input: { + videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) + }, + output: { + resolution, + fps + } + } + + const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = { + ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]), + + videoUUID: video.uuid + } + + const job = await this.createRunnerJob({ + type: 'vod-hls-transcoding', + jobUUID, + payload, + privatePayload, + priority, + dependsOnRunnerJob + }) + + await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') + + return job + } + + // --------------------------------------------------------------------------- + + async specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: VODHLSTranscodingSuccess + }) { + const { runnerJob, resultPayload } = options + const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload + + const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) + if (!video) return + + const videoFilePath = resultPayload.videoFile as string + const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string + + const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' }) + const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) + await move(videoFilePath, newVideoFilePath) + + const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename) + const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename) + await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath) + + await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename) + + await onHLSVideoFileTranscoding({ + video, + videoFile, + m3u8OutputPath: newResolutionPlaylistFilePath, + videoOutputPath: newVideoFilePath + }) + + await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) + + if (privatePayload.deleteWebVideoFiles === true) { + logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid)) + + await removeAllWebTorrentFiles(video) + } + + logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid)) + } +} diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts new file mode 100644 index 000000000..57761a7a1 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts @@ -0,0 +1,84 @@ +import { pick } from 'lodash' +import { logger } from '@server/helpers/logger' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { + RunnerJobUpdatePayload, + RunnerJobVODWebVideoTranscodingPayload, + RunnerJobVODWebVideoTranscodingPrivatePayload, + VODWebVideoTranscodingSuccess +} from '@shared/models' +import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' +import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' +import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' + +type CreateOptions = { + video: MVideo + isNewVideo: boolean + resolution: number + fps: number + priority: number + dependsOnRunnerJob?: MRunnerJob +} + +// eslint-disable-next-line max-len +export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler { + + async create (options: CreateOptions) { + const { video, resolution, fps, priority, dependsOnRunnerJob } = options + + const jobUUID = buildUUID() + const payload: RunnerJobVODWebVideoTranscodingPayload = { + input: { + videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) + }, + output: { + resolution, + fps + } + } + + const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { + ...pick(options, [ 'isNewVideo' ]), + + videoUUID: video.uuid + } + + const job = await this.createRunnerJob({ + type: 'vod-web-video-transcoding', + jobUUID, + payload, + privatePayload, + dependsOnRunnerJob, + priority + }) + + await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') + + return job + } + + // --------------------------------------------------------------------------- + + async specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: VODWebVideoTranscodingSuccess + }) { + const { runnerJob, resultPayload } = options + const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload + + const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) + if (!video) return + + const videoFilePath = resultPayload.videoFile as string + + await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) + + logger.info( + 'Runner VOD web video transcoding job %s for %s ended.', + runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) + ) + } +} diff --git a/server/lib/runners/runner-urls.ts b/server/lib/runners/runner-urls.ts new file mode 100644 index 000000000..329fb1170 --- /dev/null +++ b/server/lib/runners/runner-urls.ts @@ -0,0 +1,9 @@ +import { WEBSERVER } from '@server/initializers/constants' + +export function generateRunnerTranscodingVideoInputFileUrl (jobUUID: string, videoUUID: string) { + return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/max-quality' +} + +export function generateRunnerTranscodingVideoPreviewFileUrl (jobUUID: string, videoUUID: string) { + return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/previews/max-quality' +} diff --git a/server/lib/runners/runner.ts b/server/lib/runners/runner.ts new file mode 100644 index 000000000..74c814ba1 --- /dev/null +++ b/server/lib/runners/runner.ts @@ -0,0 +1,36 @@ +import express from 'express' +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { sequelizeTypescript } from '@server/initializers/database' +import { MRunner } from '@server/types/models/runners' + +const lTags = loggerTagsFactory('runner') + +const updatingRunner = new Set() + +function updateLastRunnerContact (req: express.Request, runner: MRunner) { + const now = new Date() + + // Don't update last runner contact too often + if (now.getTime() - runner.lastContact.getTime() < 2000) return + if (updatingRunner.has(runner.id)) return + + updatingRunner.add(runner.id) + + runner.lastContact = now + runner.ip = req.ip + + logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name)) + + retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + return runner.save({ transaction }) + }) + }) + .catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) })) + .finally(() => updatingRunner.delete(runner.id)) +} + +export { + updateLastRunnerContact +} diff --git a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts new file mode 100644 index 000000000..f7a26d2bc --- /dev/null +++ b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts @@ -0,0 +1,42 @@ +import { CONFIG } from '@server/initializers/config' +import { RunnerJobModel } from '@server/models/runner/runner-job' +import { logger, loggerTagsFactory } from '../../helpers/logger' +import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { getRunnerJobHandlerClass } from '../runners' +import { AbstractScheduler } from './abstract-scheduler' + +const lTags = loggerTagsFactory('runner') + +export class RunnerJobWatchDogScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.RUNNER_JOB_WATCH_DOG + + private constructor () { + super() + } + + protected async internalExecute () { + const vodStalledJobs = await RunnerJobModel.listStalledJobs({ + staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.VOD, + types: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ] + }) + + const liveStalledJobs = await RunnerJobModel.listStalledJobs({ + staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.LIVE, + types: [ 'live-rtmp-hls-transcoding' ] + }) + + for (const stalled of [ ...vodStalledJobs, ...liveStalledJobs ]) { + logger.info('Abort stalled runner job %s (%s)', stalled.uuid, stalled.type, lTags(stalled.uuid, stalled.type)) + + const Handler = getRunnerJobHandlerClass(stalled) + await new Handler().abort({ runnerJob: stalled }) + } + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/server-config-manager.ts b/server/lib/server-config-manager.ts index e87e2854f..ba7916363 100644 --- a/server/lib/server-config-manager.ts +++ b/server/lib/server-config-manager.ts @@ -126,11 +126,14 @@ class ServerConfigManager { serverVersion: PEERTUBE_VERSION, serverCommit: this.serverCommit, transcoding: { + remoteRunners: { + enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED + }, hls: { - enabled: CONFIG.TRANSCODING.HLS.ENABLED + enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.HLS.ENABLED }, webtorrent: { - enabled: CONFIG.TRANSCODING.WEBTORRENT.ENABLED + enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.WEBTORRENT.ENABLED }, enabledResolutions: this.getEnabledResolutions('vod'), profile: CONFIG.TRANSCODING.PROFILE, @@ -150,6 +153,9 @@ class ServerConfigManager { transcoding: { enabled: CONFIG.LIVE.TRANSCODING.ENABLED, + remoteRunners: { + enabled: CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED + }, enabledResolutions: this.getEnabledResolutions('live'), profile: CONFIG.LIVE.TRANSCODING.PROFILE, availableProfiles: VideoTranscodingProfilesManager.Instance.getAvailableProfiles('live') diff --git a/server/lib/transcoding/create-transcoding-job.ts b/server/lib/transcoding/create-transcoding-job.ts new file mode 100644 index 000000000..46831a912 --- /dev/null +++ b/server/lib/transcoding/create-transcoding-job.ts @@ -0,0 +1,36 @@ +import { CONFIG } from '@server/initializers/config' +import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' +import { TranscodingJobQueueBuilder, TranscodingRunnerJobBuilder } from './shared' + +export function createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId +}) { + return getJobBuilder().createOptimizeOrMergeAudioJobs(options) +} + +// --------------------------------------------------------------------------- + +export function createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId +}) { + return getJobBuilder().createTranscodingJobs(options) +} + +// --------------------------------------------------------------------------- +// Private +// --------------------------------------------------------------------------- + +function getJobBuilder () { + if (CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED === true) { + return new TranscodingRunnerJobBuilder() + } + + return new TranscodingJobQueueBuilder() +} diff --git a/server/lib/transcoding/default-transcoding-profiles.ts b/server/lib/transcoding/default-transcoding-profiles.ts index f47718819..5251784ac 100644 --- a/server/lib/transcoding/default-transcoding-profiles.ts +++ b/server/lib/transcoding/default-transcoding-profiles.ts @@ -1,15 +1,9 @@ import { logger } from '@server/helpers/logger' import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils' -import { AvailableEncoders, EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '../../../shared/models/videos' -import { - buildStreamSuffix, - canDoQuickAudioTranscode, - ffprobePromise, - getAudioStream, - getMaxAudioBitrate, - resetSupportedEncoders -} from '../../helpers/ffmpeg' +import { buildStreamSuffix, FFmpegCommandWrapper, ffprobePromise, getAudioStream, getMaxAudioBitrate } from '@shared/ffmpeg' +import { AvailableEncoders, EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '@shared/models' +import { canDoQuickAudioTranscode } from './transcoding-quick-transcode' /** * @@ -184,14 +178,14 @@ class VideoTranscodingProfilesManager { addEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { this.encodersPriorities[type][streamType].push({ name: encoder, priority }) - resetSupportedEncoders() + FFmpegCommandWrapper.resetSupportedEncoders() } removeEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { this.encodersPriorities[type][streamType] = this.encodersPriorities[type][streamType] .filter(o => o.name !== encoder && o.priority !== priority) - resetSupportedEncoders() + FFmpegCommandWrapper.resetSupportedEncoders() } private getEncodersByPriority (type: 'vod' | 'live', streamType: 'audio' | 'video') { diff --git a/server/lib/transcoding/ended-transcoding.ts b/server/lib/transcoding/ended-transcoding.ts new file mode 100644 index 000000000..d31674ede --- /dev/null +++ b/server/lib/transcoding/ended-transcoding.ts @@ -0,0 +1,18 @@ +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { moveToNextState } from '../video-state' + +export async function onTranscodingEnded (options: { + video: MVideo + isNewVideo: boolean + moveVideoToNextState: boolean +}) { + const { video, isNewVideo, moveVideoToNextState } = options + + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + + if (moveVideoToNextState) { + await retryTransactionWrapper(moveToNextState, { video, isNewVideo }) + } +} diff --git a/server/lib/transcoding/hls-transcoding.ts b/server/lib/transcoding/hls-transcoding.ts new file mode 100644 index 000000000..cffa859c7 --- /dev/null +++ b/server/lib/transcoding/hls-transcoding.ts @@ -0,0 +1,181 @@ +import { MutexInterface } from 'async-mutex' +import { Job } from 'bullmq' +import { ensureDir, move, stat } from 'fs-extra' +import { basename, extname as extnameUtil, join } from 'path' +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' +import { sequelizeTypescript } from '@server/initializers/database' +import { MVideo, MVideoFile } from '@server/types/models' +import { pick } from '@shared/core-utils' +import { getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg' +import { VideoResolution } from '@shared/models' +import { CONFIG } from '../../initializers/config' +import { VideoFileModel } from '../../models/video/video-file' +import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist' +import { updatePlaylistAfterFileChange } from '../hls' +import { generateHLSVideoFilename, getHlsResolutionPlaylistFilename } from '../paths' +import { buildFileMetadata } from '../video-file' +import { VideoPathManager } from '../video-path-manager' +import { buildFFmpegVOD } from './shared' + +// Concat TS segments from a live video to a fragmented mp4 HLS playlist +export async function generateHlsPlaylistResolutionFromTS (options: { + video: MVideo + concatenatedTsFilePath: string + resolution: VideoResolution + fps: number + isAAC: boolean + inputFileMutexReleaser: MutexInterface.Releaser +}) { + return generateHlsPlaylistCommon({ + type: 'hls-from-ts' as 'hls-from-ts', + inputPath: options.concatenatedTsFilePath, + + ...pick(options, [ 'video', 'resolution', 'fps', 'inputFileMutexReleaser', 'isAAC' ]) + }) +} + +// Generate an HLS playlist from an input file, and update the master playlist +export function generateHlsPlaylistResolution (options: { + video: MVideo + videoInputPath: string + resolution: VideoResolution + fps: number + copyCodecs: boolean + inputFileMutexReleaser: MutexInterface.Releaser + job?: Job +}) { + return generateHlsPlaylistCommon({ + type: 'hls' as 'hls', + inputPath: options.videoInputPath, + + ...pick(options, [ 'video', 'resolution', 'fps', 'copyCodecs', 'inputFileMutexReleaser', 'job' ]) + }) +} + +export async function onHLSVideoFileTranscoding (options: { + video: MVideo + videoFile: MVideoFile + videoOutputPath: string + m3u8OutputPath: string +}) { + const { video, videoFile, videoOutputPath, m3u8OutputPath } = options + + // Create or update the playlist + const playlist = await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + return VideoStreamingPlaylistModel.loadOrGenerate(video, transaction) + }) + }) + videoFile.videoStreamingPlaylistId = playlist.id + + const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + // VOD transcoding is a long task, refresh video attributes + await video.reload() + + const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, videoFile) + await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video)) + + // Move playlist file + const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, basename(m3u8OutputPath)) + await move(m3u8OutputPath, resolutionPlaylistPath, { overwrite: true }) + // Move video file + await move(videoOutputPath, videoFilePath, { overwrite: true }) + + // Update video duration if it was not set (in case of a live for example) + if (!video.duration) { + video.duration = await getVideoStreamDuration(videoFilePath) + await video.save() + } + + const stats = await stat(videoFilePath) + + videoFile.size = stats.size + videoFile.fps = await getVideoStreamFPS(videoFilePath) + videoFile.metadata = await buildFileMetadata(videoFilePath) + + await createTorrentAndSetInfoHash(playlist, videoFile) + + const oldFile = await VideoFileModel.loadHLSFile({ + playlistId: playlist.id, + fps: videoFile.fps, + resolution: videoFile.resolution + }) + + if (oldFile) { + await video.removeStreamingPlaylistVideoFile(playlist, oldFile) + await oldFile.destroy() + } + + const savedVideoFile = await VideoFileModel.customUpsert(videoFile, 'streaming-playlist', undefined) + + await updatePlaylistAfterFileChange(video, playlist) + + return { resolutionPlaylistPath, videoFile: savedVideoFile } + } finally { + mutexReleaser() + } +} + +// --------------------------------------------------------------------------- + +async function generateHlsPlaylistCommon (options: { + type: 'hls' | 'hls-from-ts' + video: MVideo + inputPath: string + + resolution: VideoResolution + fps: number + + inputFileMutexReleaser: MutexInterface.Releaser + + copyCodecs?: boolean + isAAC?: boolean + + job?: Job +}) { + const { type, video, inputPath, resolution, fps, copyCodecs, isAAC, job, inputFileMutexReleaser } = options + const transcodeDirectory = CONFIG.STORAGE.TMP_DIR + + const videoTranscodedBasePath = join(transcodeDirectory, type) + await ensureDir(videoTranscodedBasePath) + + const videoFilename = generateHLSVideoFilename(resolution) + const videoOutputPath = join(videoTranscodedBasePath, videoFilename) + + const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFilename) + const m3u8OutputPath = join(videoTranscodedBasePath, resolutionPlaylistFilename) + + const transcodeOptions = { + type, + + inputPath, + outputPath: m3u8OutputPath, + + resolution, + fps, + copyCodecs, + + isAAC, + + inputFileMutexReleaser, + + hlsPlaylist: { + videoFilename + } + } + + await buildFFmpegVOD(job).transcode(transcodeOptions) + + const newVideoFile = new VideoFileModel({ + resolution, + extname: extnameUtil(videoFilename), + size: 0, + filename: videoFilename, + fps: -1 + }) + + await onHLSVideoFileTranscoding({ video, videoFile: newVideoFile, videoOutputPath, m3u8OutputPath }) +} diff --git a/server/lib/transcoding/shared/ffmpeg-builder.ts b/server/lib/transcoding/shared/ffmpeg-builder.ts new file mode 100644 index 000000000..441445ec4 --- /dev/null +++ b/server/lib/transcoding/shared/ffmpeg-builder.ts @@ -0,0 +1,18 @@ +import { Job } from 'bullmq' +import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' +import { logger } from '@server/helpers/logger' +import { FFmpegVOD } from '@shared/ffmpeg' +import { VideoTranscodingProfilesManager } from '../default-transcoding-profiles' + +export function buildFFmpegVOD (job?: Job) { + return new FFmpegVOD({ + ...getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()), + + updateJobProgress: progress => { + if (!job) return + + job.updateProgress(progress) + .catch(err => logger.error('Cannot update ffmpeg job progress', { err })) + } + }) +} diff --git a/server/lib/transcoding/shared/index.ts b/server/lib/transcoding/shared/index.ts new file mode 100644 index 000000000..f0b45bcbb --- /dev/null +++ b/server/lib/transcoding/shared/index.ts @@ -0,0 +1,2 @@ +export * from './job-builders' +export * from './ffmpeg-builder' diff --git a/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts new file mode 100644 index 000000000..f1e9efdcf --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts @@ -0,0 +1,38 @@ + +import { JOB_PRIORITY } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' +import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' + +export abstract class AbstractJobBuilder { + + abstract createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId + }): Promise + + abstract createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId | null + }): Promise + + protected async getTranscodingJobPriority (options: { + user: MUserId + fallback: number + }) { + const { user, fallback } = options + + if (!user) return fallback + + const now = new Date() + const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) + + const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) + + return JOB_PRIORITY.TRANSCODING + videoUploadedByUser + } +} diff --git a/server/lib/transcoding/shared/job-builders/index.ts b/server/lib/transcoding/shared/job-builders/index.ts new file mode 100644 index 000000000..9b1c82adf --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/index.ts @@ -0,0 +1,2 @@ +export * from './transcoding-job-queue-builder' +export * from './transcoding-runner-job-builder' diff --git a/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts new file mode 100644 index 000000000..7c892718b --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts @@ -0,0 +1,308 @@ +import Bluebird from 'bluebird' +import { computeOutputFPS } from '@server/helpers/ffmpeg' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' +import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' +import { Hooks } from '@server/lib/plugins/hooks' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' +import { + HLSTranscodingPayload, + MergeAudioTranscodingPayload, + NewWebTorrentResolutionTranscodingPayload, + OptimizeTranscodingPayload, + VideoTranscodingPayload +} from '@shared/models' +import { canDoQuickTranscode } from '../../transcoding-quick-transcode' +import { computeResolutionsToTranscode } from '../../transcoding-resolutions' +import { AbstractJobBuilder } from './abstract-job-builder' + +export class TranscodingJobQueueBuilder extends AbstractJobBuilder { + + async createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId + }) { + const { video, videoFile, isNewVideo, user } = options + + let mergeOrOptimizePayload: MergeAudioTranscodingPayload | OptimizeTranscodingPayload + let nextTranscodingSequentialJobPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] + + const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { + const probe = await ffprobePromise(videoFilePath) + + const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) + const hasAudio = await hasAudioStream(videoFilePath, probe) + const quickTranscode = await canDoQuickTranscode(videoFilePath, probe) + const inputFPS = videoFile.isAudio() + ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value + : await getVideoStreamFPS(videoFilePath, probe) + + const maxResolution = await isAudioFile(videoFilePath, probe) + ? DEFAULT_AUDIO_RESOLUTION + : resolution + + if (CONFIG.TRANSCODING.HLS.ENABLED === true) { + nextTranscodingSequentialJobPayloads.push([ + this.buildHLSJobPayload({ + deleteWebTorrentFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, + + // We had some issues with a web video quick transcoded while producing a HLS version of it + copyCodecs: !quickTranscode, + + resolution: maxResolution, + fps: computeOutputFPS({ inputFPS, resolution: maxResolution }), + videoUUID: video.uuid, + isNewVideo + }) + ]) + } + + const lowerResolutionJobPayloads = await this.buildLowerResolutionJobPayloads({ + video, + inputVideoResolution: maxResolution, + inputVideoFPS: inputFPS, + hasAudio, + isNewVideo + }) + + nextTranscodingSequentialJobPayloads = [ ...nextTranscodingSequentialJobPayloads, ...lowerResolutionJobPayloads ] + + mergeOrOptimizePayload = videoFile.isAudio() + ? this.buildMergeAudioPayload({ videoUUID: video.uuid, isNewVideo }) + : this.buildOptimizePayload({ videoUUID: video.uuid, isNewVideo, quickTranscode }) + }) + } finally { + mutexReleaser() + } + + const nextTranscodingSequentialJobs = await Bluebird.mapSeries(nextTranscodingSequentialJobPayloads, payloads => { + return Bluebird.mapSeries(payloads, payload => { + return this.buildTranscodingJob({ payload, user }) + }) + }) + + const transcodingJobBuilderJob: CreateJobArgument = { + type: 'transcoding-job-builder', + payload: { + videoUUID: video.uuid, + sequentialJobs: nextTranscodingSequentialJobs + } + } + + const mergeOrOptimizeJob = await this.buildTranscodingJob({ payload: mergeOrOptimizePayload, user }) + + return JobQueue.Instance.createSequentialJobFlow(...[ mergeOrOptimizeJob, transcodingJobBuilderJob ]) + } + + // --------------------------------------------------------------------------- + + async createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId | null + }) { + const { video, transcodingType, resolutions, isNewVideo } = options + + const maxResolution = Math.max(...resolutions) + const childrenResolutions = resolutions.filter(r => r !== maxResolution) + + logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) + + const { fps: inputFPS } = await video.probeMaxQualityFile() + + const children = childrenResolutions.map(resolution => { + const fps = computeOutputFPS({ inputFPS, resolution }) + + if (transcodingType === 'hls') { + return this.buildHLSJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) + } + + if (transcodingType === 'webtorrent') { + return this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) + } + + throw new Error('Unknown transcoding type') + }) + + const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) + + const parent = transcodingType === 'hls' + ? this.buildHLSJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) + : this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) + + // Process the last resolution after the other ones to prevent concurrency issue + // Because low resolutions use the biggest one as ffmpeg input + await this.createTranscodingJobsWithChildren({ videoUUID: video.uuid, parent, children, user: null }) + } + + // --------------------------------------------------------------------------- + + private async createTranscodingJobsWithChildren (options: { + videoUUID: string + parent: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload) + children: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload)[] + user: MUserId | null + }) { + const { videoUUID, parent, children, user } = options + + const parentJob = await this.buildTranscodingJob({ payload: parent, user }) + const childrenJobs = await Bluebird.mapSeries(children, c => this.buildTranscodingJob({ payload: c, user })) + + await JobQueue.Instance.createJobWithChildren(parentJob, childrenJobs) + + await VideoJobInfoModel.increaseOrCreate(videoUUID, 'pendingTranscode', 1 + children.length) + } + + private async buildTranscodingJob (options: { + payload: VideoTranscodingPayload + user: MUserId | null // null means we don't want priority + }) { + const { user, payload } = options + + return { + type: 'video-transcoding' as 'video-transcoding', + priority: await this.getTranscodingJobPriority({ user, fallback: undefined }), + payload + } + } + + private async buildLowerResolutionJobPayloads (options: { + video: MVideoWithFileThumbnail + inputVideoResolution: number + inputVideoFPS: number + hasAudio: boolean + isNewVideo: boolean + }) { + const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio } = options + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = await Hooks.wrapObject( + computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), + 'filter:transcoding.auto.resolutions-to-transcode.result', + options + ) + + const sequentialPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] + + for (const resolution of resolutionsEnabled) { + const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + const payloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[] = [ + this.buildWebTorrentJobPayload({ + videoUUID: video.uuid, + resolution, + fps, + isNewVideo + }) + ] + + // Create a subsequent job to create HLS resolution that will just copy web video codecs + if (CONFIG.TRANSCODING.HLS.ENABLED) { + payloads.push( + this.buildHLSJobPayload({ + videoUUID: video.uuid, + resolution, + fps, + isNewVideo, + copyCodecs: true + }) + ) + } + + sequentialPayloads.push(payloads) + } else if (CONFIG.TRANSCODING.HLS.ENABLED) { + sequentialPayloads.push([ + this.buildHLSJobPayload({ + videoUUID: video.uuid, + resolution, + fps, + copyCodecs: false, + isNewVideo + }) + ]) + } + } + + return sequentialPayloads + } + + private buildHLSJobPayload (options: { + videoUUID: string + resolution: number + fps: number + isNewVideo: boolean + deleteWebTorrentFiles?: boolean // default false + copyCodecs?: boolean // default false + }): HLSTranscodingPayload { + const { videoUUID, resolution, fps, isNewVideo, deleteWebTorrentFiles = false, copyCodecs = false } = options + + return { + type: 'new-resolution-to-hls', + videoUUID, + resolution, + fps, + copyCodecs, + isNewVideo, + deleteWebTorrentFiles + } + } + + private buildWebTorrentJobPayload (options: { + videoUUID: string + resolution: number + fps: number + isNewVideo: boolean + }): NewWebTorrentResolutionTranscodingPayload { + const { videoUUID, resolution, fps, isNewVideo } = options + + return { + type: 'new-resolution-to-webtorrent', + videoUUID, + isNewVideo, + resolution, + fps + } + } + + private buildMergeAudioPayload (options: { + videoUUID: string + isNewVideo: boolean + }): MergeAudioTranscodingPayload { + const { videoUUID, isNewVideo } = options + + return { + type: 'merge-audio-to-webtorrent', + resolution: DEFAULT_AUDIO_RESOLUTION, + fps: VIDEO_TRANSCODING_FPS.AUDIO_MERGE, + videoUUID, + isNewVideo + } + } + + private buildOptimizePayload (options: { + videoUUID: string + quickTranscode: boolean + isNewVideo: boolean + }): OptimizeTranscodingPayload { + const { videoUUID, quickTranscode, isNewVideo } = options + + return { + type: 'optimize-to-webtorrent', + videoUUID, + isNewVideo, + quickTranscode + } + } +} diff --git a/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts new file mode 100644 index 000000000..c7a63d2e2 --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts @@ -0,0 +1,189 @@ +import { computeOutputFPS } from '@server/helpers/ffmpeg' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' +import { Hooks } from '@server/lib/plugins/hooks' +import { VODAudioMergeTranscodingJobHandler, VODHLSTranscodingJobHandler, VODWebVideoTranscodingJobHandler } from '@server/lib/runners' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' +import { computeResolutionsToTranscode } from '../../transcoding-resolutions' +import { AbstractJobBuilder } from './abstract-job-builder' + +/** + * + * Class to build transcoding job in the local job queue + * + */ + +const lTags = loggerTagsFactory('transcoding') + +export class TranscodingRunnerJobBuilder extends AbstractJobBuilder { + + async createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId + }) { + const { video, videoFile, isNewVideo, user } = options + + const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { + const probe = await ffprobePromise(videoFilePath) + + const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) + const hasAudio = await hasAudioStream(videoFilePath, probe) + const inputFPS = videoFile.isAudio() + ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value + : await getVideoStreamFPS(videoFilePath, probe) + + const maxResolution = await isAudioFile(videoFilePath, probe) + ? DEFAULT_AUDIO_RESOLUTION + : resolution + + const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) + const priority = await this.getTranscodingJobPriority({ user, fallback: 0 }) + + const mainRunnerJob = videoFile.isAudio() + ? await new VODAudioMergeTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority }) + : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority }) + + if (CONFIG.TRANSCODING.HLS.ENABLED === true) { + await new VODHLSTranscodingJobHandler().create({ + video, + deleteWebVideoFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, + resolution: maxResolution, + fps, + isNewVideo, + dependsOnRunnerJob: mainRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + } + + await this.buildLowerResolutionJobPayloads({ + video, + inputVideoResolution: maxResolution, + inputVideoFPS: inputFPS, + hasAudio, + isNewVideo, + mainRunnerJob, + user + }) + }) + } finally { + mutexReleaser() + } + } + + // --------------------------------------------------------------------------- + + async createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId | null + }) { + const { video, transcodingType, resolutions, isNewVideo, user } = options + + const maxResolution = Math.max(...resolutions) + const { fps: inputFPS } = await video.probeMaxQualityFile() + const maxFPS = computeOutputFPS({ inputFPS, resolution: maxResolution }) + const priority = await this.getTranscodingJobPriority({ user, fallback: 0 }) + + const childrenResolutions = resolutions.filter(r => r !== maxResolution) + + logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) + + // Process the last resolution before the other ones to prevent concurrency issue + // Because low resolutions use the biggest one as ffmpeg input + const mainJob = transcodingType === 'hls' + // eslint-disable-next-line max-len + ? await new VODHLSTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, deleteWebVideoFiles: false, priority }) + : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, priority }) + + for (const resolution of childrenResolutions) { + const dependsOnRunnerJob = mainJob + const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) + + if (transcodingType === 'hls') { + await new VODHLSTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + deleteWebVideoFiles: false, + dependsOnRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + continue + } + + if (transcodingType === 'webtorrent') { + await new VODWebVideoTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + dependsOnRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + continue + } + + throw new Error('Unknown transcoding type') + } + } + + private async buildLowerResolutionJobPayloads (options: { + mainRunnerJob: MRunnerJob + video: MVideoWithFileThumbnail + inputVideoResolution: number + inputVideoFPS: number + hasAudio: boolean + isNewVideo: boolean + user: MUserId + }) { + const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio, mainRunnerJob, user } = options + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = await Hooks.wrapObject( + computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), + 'filter:transcoding.auto.resolutions-to-transcode.result', + options + ) + + logger.debug('Lower resolutions build for %s.', video.uuid, { resolutionsEnabled, ...lTags(video.uuid) }) + + for (const resolution of resolutionsEnabled) { + const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + await new VODWebVideoTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + dependsOnRunnerJob: mainRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + } + + if (CONFIG.TRANSCODING.HLS.ENABLED) { + await new VODHLSTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + deleteWebVideoFiles: false, + dependsOnRunnerJob: mainRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + } + } + } +} diff --git a/server/lib/transcoding/transcoding-quick-transcode.ts b/server/lib/transcoding/transcoding-quick-transcode.ts new file mode 100644 index 000000000..b7f921890 --- /dev/null +++ b/server/lib/transcoding/transcoding-quick-transcode.ts @@ -0,0 +1,61 @@ +import { FfprobeData } from 'fluent-ffmpeg' +import { CONFIG } from '@server/initializers/config' +import { VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' +import { getMaxBitrate } from '@shared/core-utils' +import { + ffprobePromise, + getAudioStream, + getMaxAudioBitrate, + getVideoStream, + getVideoStreamBitrate, + getVideoStreamDimensionsInfo, + getVideoStreamFPS +} from '@shared/ffmpeg' + +export async function canDoQuickTranscode (path: string, existingProbe?: FfprobeData): Promise { + if (CONFIG.TRANSCODING.PROFILE !== 'default') return false + + const probe = existingProbe || await ffprobePromise(path) + + return await canDoQuickVideoTranscode(path, probe) && + await canDoQuickAudioTranscode(path, probe) +} + +export async function canDoQuickAudioTranscode (path: string, probe?: FfprobeData): Promise { + const parsedAudio = await getAudioStream(path, probe) + + if (!parsedAudio.audioStream) return true + + if (parsedAudio.audioStream['codec_name'] !== 'aac') return false + + const audioBitrate = parsedAudio.bitrate + if (!audioBitrate) return false + + const maxAudioBitrate = getMaxAudioBitrate('aac', audioBitrate) + if (maxAudioBitrate !== -1 && audioBitrate > maxAudioBitrate) return false + + const channelLayout = parsedAudio.audioStream['channel_layout'] + // Causes playback issues with Chrome + if (!channelLayout || channelLayout === 'unknown' || channelLayout === 'quad') return false + + return true +} + +export async function canDoQuickVideoTranscode (path: string, probe?: FfprobeData): Promise { + const videoStream = await getVideoStream(path, probe) + const fps = await getVideoStreamFPS(path, probe) + const bitRate = await getVideoStreamBitrate(path, probe) + const resolutionData = await getVideoStreamDimensionsInfo(path, probe) + + // If ffprobe did not manage to guess the bitrate + if (!bitRate) return false + + // check video params + if (!videoStream) return false + if (videoStream['codec_name'] !== 'h264') return false + if (videoStream['pix_fmt'] !== 'yuv420p') return false + if (fps < VIDEO_TRANSCODING_FPS.MIN || fps > VIDEO_TRANSCODING_FPS.MAX) return false + if (bitRate > getMaxBitrate({ ...resolutionData, fps })) return false + + return true +} diff --git a/server/lib/transcoding/transcoding-resolutions.ts b/server/lib/transcoding/transcoding-resolutions.ts new file mode 100644 index 000000000..91f4d18d8 --- /dev/null +++ b/server/lib/transcoding/transcoding-resolutions.ts @@ -0,0 +1,52 @@ +import { CONFIG } from '@server/initializers/config' +import { toEven } from '@shared/core-utils' +import { VideoResolution } from '@shared/models' + +export function computeResolutionsToTranscode (options: { + input: number + type: 'vod' | 'live' + includeInput: boolean + strictLower: boolean + hasAudio: boolean +}) { + const { input, type, includeInput, strictLower, hasAudio } = options + + const configResolutions = type === 'vod' + ? CONFIG.TRANSCODING.RESOLUTIONS + : CONFIG.LIVE.TRANSCODING.RESOLUTIONS + + const resolutionsEnabled = new Set() + + // Put in the order we want to proceed jobs + const availableResolutions: VideoResolution[] = [ + VideoResolution.H_NOVIDEO, + VideoResolution.H_480P, + VideoResolution.H_360P, + VideoResolution.H_720P, + VideoResolution.H_240P, + VideoResolution.H_144P, + VideoResolution.H_1080P, + VideoResolution.H_1440P, + VideoResolution.H_4K + ] + + for (const resolution of availableResolutions) { + // Resolution not enabled + if (configResolutions[resolution + 'p'] !== true) continue + // Too big resolution for input file + if (input < resolution) continue + // We only want lower resolutions than input file + if (strictLower && input === resolution) continue + // Audio resolutio but no audio in the video + if (resolution === VideoResolution.H_NOVIDEO && !hasAudio) continue + + resolutionsEnabled.add(resolution) + } + + if (includeInput) { + // Always use an even resolution to avoid issues with ffmpeg + resolutionsEnabled.add(toEven(input)) + } + + return Array.from(resolutionsEnabled) +} diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts deleted file mode 100644 index c7b61e9ba..000000000 --- a/server/lib/transcoding/transcoding.ts +++ /dev/null @@ -1,465 +0,0 @@ -import { MutexInterface } from 'async-mutex' -import { Job } from 'bullmq' -import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' -import { basename, extname as extnameUtil, join } from 'path' -import { toEven } from '@server/helpers/core-utils' -import { retryTransactionWrapper } from '@server/helpers/database-utils' -import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' -import { sequelizeTypescript } from '@server/initializers/database' -import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' -import { pick } from '@shared/core-utils' -import { VideoResolution, VideoStorage } from '../../../shared/models/videos' -import { - buildFileMetadata, - canDoQuickTranscode, - computeResolutionsToTranscode, - ffprobePromise, - getVideoStreamDuration, - getVideoStreamFPS, - transcodeVOD, - TranscodeVODOptions, - TranscodeVODOptionsType -} from '../../helpers/ffmpeg' -import { CONFIG } from '../../initializers/config' -import { VideoFileModel } from '../../models/video/video-file' -import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist' -import { updatePlaylistAfterFileChange } from '../hls' -import { generateHLSVideoFilename, generateWebTorrentVideoFilename, getHlsResolutionPlaylistFilename } from '../paths' -import { VideoPathManager } from '../video-path-manager' -import { VideoTranscodingProfilesManager } from './default-transcoding-profiles' - -/** - * - * Functions that run transcoding functions, update the database, cleanup files, create torrent files... - * Mainly called by the job queue - * - */ - -// Optimize the original video file and replace it. The resolution is not changed. -async function optimizeOriginalVideofile (options: { - video: MVideoFullLight - inputVideoFile: MVideoFile - job: Job -}) { - const { video, inputVideoFile, job } = options - - const transcodeDirectory = CONFIG.STORAGE.TMP_DIR - const newExtname = '.mp4' - - // Will be released by our transcodeVOD function once ffmpeg is ran - const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - await video.reload() - - const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) - - const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => { - const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) - - const transcodeType: TranscodeVODOptionsType = await canDoQuickTranscode(videoInputPath) - ? 'quick-transcode' - : 'video' - - const resolution = buildOriginalFileResolution(inputVideoFile.resolution) - - const transcodeOptions: TranscodeVODOptions = { - type: transcodeType, - - inputPath: videoInputPath, - outputPath: videoTranscodedPath, - - inputFileMutexReleaser, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE, - - resolution, - - job - } - - // Could be very long! - await transcodeVOD(transcodeOptions) - - // Important to do this before getVideoFilename() to take in account the new filename - inputVideoFile.resolution = resolution - inputVideoFile.extname = newExtname - inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname) - inputVideoFile.storage = VideoStorage.FILE_SYSTEM - - const { videoFile } = await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile) - await remove(videoInputPath) - - return { transcodeType, videoFile } - }) - - return result - } finally { - inputFileMutexReleaser() - } -} - -// Transcode the original video file to a lower resolution compatible with WebTorrent -async function transcodeNewWebTorrentResolution (options: { - video: MVideoFullLight - resolution: VideoResolution - job: Job -}) { - const { video, resolution, job } = options - - const transcodeDirectory = CONFIG.STORAGE.TMP_DIR - const newExtname = '.mp4' - - const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - await video.reload() - - const file = video.getMaxQualityFile().withVideoOrPlaylist(video) - - const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => { - const newVideoFile = new VideoFileModel({ - resolution, - extname: newExtname, - filename: generateWebTorrentVideoFilename(resolution, newExtname), - size: 0, - videoId: video.id - }) - - const videoTranscodedPath = join(transcodeDirectory, newVideoFile.filename) - - const transcodeOptions = resolution === VideoResolution.H_NOVIDEO - ? { - type: 'only-audio' as 'only-audio', - - inputPath: videoInputPath, - outputPath: videoTranscodedPath, - - inputFileMutexReleaser, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE, - - resolution, - - job - } - : { - type: 'video' as 'video', - inputPath: videoInputPath, - outputPath: videoTranscodedPath, - - inputFileMutexReleaser, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE, - - resolution, - - job - } - - await transcodeVOD(transcodeOptions) - - return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, newVideoFile) - }) - - return result - } finally { - inputFileMutexReleaser() - } -} - -// Merge an image with an audio file to create a video -async function mergeAudioVideofile (options: { - video: MVideoFullLight - resolution: VideoResolution - job: Job -}) { - const { video, resolution, job } = options - - const transcodeDirectory = CONFIG.STORAGE.TMP_DIR - const newExtname = '.mp4' - - const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - await video.reload() - - const inputVideoFile = video.getMinQualityFile() - - const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) - - const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => { - const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) - - // If the user updates the video preview during transcoding - const previewPath = video.getPreview().getPath() - const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath)) - await copyFile(previewPath, tmpPreviewPath) - - const transcodeOptions = { - type: 'merge-audio' as 'merge-audio', - - inputPath: tmpPreviewPath, - outputPath: videoTranscodedPath, - - inputFileMutexReleaser, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE, - - audioPath: audioInputPath, - resolution, - - job - } - - try { - await transcodeVOD(transcodeOptions) - - await remove(audioInputPath) - await remove(tmpPreviewPath) - } catch (err) { - await remove(tmpPreviewPath) - throw err - } - - // Important to do this before getVideoFilename() to take in account the new file extension - inputVideoFile.extname = newExtname - inputVideoFile.resolution = resolution - inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname) - - // ffmpeg generated a new video file, so update the video duration - // See https://trac.ffmpeg.org/ticket/5456 - video.duration = await getVideoStreamDuration(videoTranscodedPath) - await video.save() - - return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile) - }) - - return result - } finally { - inputFileMutexReleaser() - } -} - -// Concat TS segments from a live video to a fragmented mp4 HLS playlist -async function generateHlsPlaylistResolutionFromTS (options: { - video: MVideo - concatenatedTsFilePath: string - resolution: VideoResolution - isAAC: boolean - inputFileMutexReleaser: MutexInterface.Releaser -}) { - return generateHlsPlaylistCommon({ - type: 'hls-from-ts' as 'hls-from-ts', - inputPath: options.concatenatedTsFilePath, - - ...pick(options, [ 'video', 'resolution', 'inputFileMutexReleaser', 'isAAC' ]) - }) -} - -// Generate an HLS playlist from an input file, and update the master playlist -function generateHlsPlaylistResolution (options: { - video: MVideo - videoInputPath: string - resolution: VideoResolution - copyCodecs: boolean - inputFileMutexReleaser: MutexInterface.Releaser - job?: Job -}) { - return generateHlsPlaylistCommon({ - type: 'hls' as 'hls', - inputPath: options.videoInputPath, - - ...pick(options, [ 'video', 'resolution', 'copyCodecs', 'inputFileMutexReleaser', 'job' ]) - }) -} - -// --------------------------------------------------------------------------- - -export { - generateHlsPlaylistResolution, - generateHlsPlaylistResolutionFromTS, - optimizeOriginalVideofile, - transcodeNewWebTorrentResolution, - mergeAudioVideofile -} - -// --------------------------------------------------------------------------- - -async function onWebTorrentVideoFileTranscoding ( - video: MVideoFullLight, - videoFile: MVideoFile, - transcodingPath: string, - newVideoFile: MVideoFile -) { - const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - await video.reload() - - const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile) - - const stats = await stat(transcodingPath) - - const probe = await ffprobePromise(transcodingPath) - const fps = await getVideoStreamFPS(transcodingPath, probe) - const metadata = await buildFileMetadata(transcodingPath, probe) - - await move(transcodingPath, outputPath, { overwrite: true }) - - videoFile.size = stats.size - videoFile.fps = fps - videoFile.metadata = metadata - - await createTorrentAndSetInfoHash(video, videoFile) - - const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution }) - if (oldFile) await video.removeWebTorrentFile(oldFile) - - await VideoFileModel.customUpsert(videoFile, 'video', undefined) - video.VideoFiles = await video.$get('VideoFiles') - - return { video, videoFile } - } finally { - mutexReleaser() - } -} - -async function generateHlsPlaylistCommon (options: { - type: 'hls' | 'hls-from-ts' - video: MVideo - inputPath: string - resolution: VideoResolution - - inputFileMutexReleaser: MutexInterface.Releaser - - copyCodecs?: boolean - isAAC?: boolean - - job?: Job -}) { - const { type, video, inputPath, resolution, copyCodecs, isAAC, job, inputFileMutexReleaser } = options - const transcodeDirectory = CONFIG.STORAGE.TMP_DIR - - const videoTranscodedBasePath = join(transcodeDirectory, type) - await ensureDir(videoTranscodedBasePath) - - const videoFilename = generateHLSVideoFilename(resolution) - const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFilename) - const resolutionPlaylistFileTranscodePath = join(videoTranscodedBasePath, resolutionPlaylistFilename) - - const transcodeOptions = { - type, - - inputPath, - outputPath: resolutionPlaylistFileTranscodePath, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.TRANSCODING.PROFILE, - - resolution, - copyCodecs, - - isAAC, - - inputFileMutexReleaser, - - hlsPlaylist: { - videoFilename - }, - - job - } - - await transcodeVOD(transcodeOptions) - - // Create or update the playlist - const playlist = await retryTransactionWrapper(() => { - return sequelizeTypescript.transaction(async transaction => { - return VideoStreamingPlaylistModel.loadOrGenerate(video, transaction) - }) - }) - - const newVideoFile = new VideoFileModel({ - resolution, - extname: extnameUtil(videoFilename), - size: 0, - filename: videoFilename, - fps: -1, - videoStreamingPlaylistId: playlist.id - }) - - const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - - try { - // VOD transcoding is a long task, refresh video attributes - await video.reload() - - const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, newVideoFile) - await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video)) - - // Move playlist file - const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, resolutionPlaylistFilename) - await move(resolutionPlaylistFileTranscodePath, resolutionPlaylistPath, { overwrite: true }) - // Move video file - await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true }) - - // Update video duration if it was not set (in case of a live for example) - if (!video.duration) { - video.duration = await getVideoStreamDuration(videoFilePath) - await video.save() - } - - const stats = await stat(videoFilePath) - - newVideoFile.size = stats.size - newVideoFile.fps = await getVideoStreamFPS(videoFilePath) - newVideoFile.metadata = await buildFileMetadata(videoFilePath) - - await createTorrentAndSetInfoHash(playlist, newVideoFile) - - const oldFile = await VideoFileModel.loadHLSFile({ - playlistId: playlist.id, - fps: newVideoFile.fps, - resolution: newVideoFile.resolution - }) - - if (oldFile) { - await video.removeStreamingPlaylistVideoFile(playlist, oldFile) - await oldFile.destroy() - } - - const savedVideoFile = await VideoFileModel.customUpsert(newVideoFile, 'streaming-playlist', undefined) - - await updatePlaylistAfterFileChange(video, playlist) - - return { resolutionPlaylistPath, videoFile: savedVideoFile } - } finally { - mutexReleaser() - } -} - -function buildOriginalFileResolution (inputResolution: number) { - if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) { - return toEven(inputResolution) - } - - const resolutions = computeResolutionsToTranscode({ - input: inputResolution, - type: 'vod', - includeInput: false, - strictLower: false, - // We don't really care about the audio resolution in this context - hasAudio: true - }) - - if (resolutions.length === 0) { - return toEven(inputResolution) - } - - return Math.max(...resolutions) -} diff --git a/server/lib/transcoding/web-transcoding.ts b/server/lib/transcoding/web-transcoding.ts new file mode 100644 index 000000000..d43d03b2a --- /dev/null +++ b/server/lib/transcoding/web-transcoding.ts @@ -0,0 +1,273 @@ +import { Job } from 'bullmq' +import { copyFile, move, remove, stat } from 'fs-extra' +import { basename, join } from 'path' +import { computeOutputFPS } from '@server/helpers/ffmpeg' +import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' +import { MVideoFile, MVideoFullLight } from '@server/types/models' +import { toEven } from '@shared/core-utils' +import { ffprobePromise, getVideoStreamDuration, getVideoStreamFPS, TranscodeVODOptionsType } from '@shared/ffmpeg' +import { VideoResolution, VideoStorage } from '@shared/models' +import { CONFIG } from '../../initializers/config' +import { VideoFileModel } from '../../models/video/video-file' +import { generateWebTorrentVideoFilename } from '../paths' +import { buildFileMetadata } from '../video-file' +import { VideoPathManager } from '../video-path-manager' +import { buildFFmpegVOD } from './shared' +import { computeResolutionsToTranscode } from './transcoding-resolutions' + +// Optimize the original video file and replace it. The resolution is not changed. +export async function optimizeOriginalVideofile (options: { + video: MVideoFullLight + inputVideoFile: MVideoFile + quickTranscode: boolean + job: Job +}) { + const { video, inputVideoFile, quickTranscode, job } = options + + const transcodeDirectory = CONFIG.STORAGE.TMP_DIR + const newExtname = '.mp4' + + // Will be released by our transcodeVOD function once ffmpeg is ran + const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await video.reload() + + const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) + + const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => { + const videoOutputPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) + + const transcodeType: TranscodeVODOptionsType = quickTranscode + ? 'quick-transcode' + : 'video' + + const resolution = buildOriginalFileResolution(inputVideoFile.resolution) + const fps = computeOutputFPS({ inputFPS: inputVideoFile.fps, resolution }) + + // Could be very long! + await buildFFmpegVOD(job).transcode({ + type: transcodeType, + + inputPath: videoInputPath, + outputPath: videoOutputPath, + + inputFileMutexReleaser, + + resolution, + fps + }) + + // Important to do this before getVideoFilename() to take in account the new filename + inputVideoFile.resolution = resolution + inputVideoFile.extname = newExtname + inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname) + inputVideoFile.storage = VideoStorage.FILE_SYSTEM + + const { videoFile } = await onWebTorrentVideoFileTranscoding({ + video, + videoFile: inputVideoFile, + videoOutputPath + }) + + await remove(videoInputPath) + + return { transcodeType, videoFile } + }) + + return result + } finally { + inputFileMutexReleaser() + } +} + +// Transcode the original video file to a lower resolution compatible with WebTorrent +export async function transcodeNewWebTorrentResolution (options: { + video: MVideoFullLight + resolution: VideoResolution + fps: number + job: Job +}) { + const { video, resolution, fps, job } = options + + const transcodeDirectory = CONFIG.STORAGE.TMP_DIR + const newExtname = '.mp4' + + const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await video.reload() + + const file = video.getMaxQualityFile().withVideoOrPlaylist(video) + + const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => { + const newVideoFile = new VideoFileModel({ + resolution, + extname: newExtname, + filename: generateWebTorrentVideoFilename(resolution, newExtname), + size: 0, + videoId: video.id + }) + + const videoOutputPath = join(transcodeDirectory, newVideoFile.filename) + + const transcodeOptions = { + type: 'video' as 'video', + + inputPath: videoInputPath, + outputPath: videoOutputPath, + + inputFileMutexReleaser, + + resolution, + fps + } + + await buildFFmpegVOD(job).transcode(transcodeOptions) + + return onWebTorrentVideoFileTranscoding({ video, videoFile: newVideoFile, videoOutputPath }) + }) + + return result + } finally { + inputFileMutexReleaser() + } +} + +// Merge an image with an audio file to create a video +export async function mergeAudioVideofile (options: { + video: MVideoFullLight + resolution: VideoResolution + fps: number + job: Job +}) { + const { video, resolution, fps, job } = options + + const transcodeDirectory = CONFIG.STORAGE.TMP_DIR + const newExtname = '.mp4' + + const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await video.reload() + + const inputVideoFile = video.getMinQualityFile() + + const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) + + const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => { + const videoOutputPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) + + // If the user updates the video preview during transcoding + const previewPath = video.getPreview().getPath() + const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath)) + await copyFile(previewPath, tmpPreviewPath) + + const transcodeOptions = { + type: 'merge-audio' as 'merge-audio', + + inputPath: tmpPreviewPath, + outputPath: videoOutputPath, + + inputFileMutexReleaser, + + audioPath: audioInputPath, + resolution, + fps + } + + try { + await buildFFmpegVOD(job).transcode(transcodeOptions) + + await remove(audioInputPath) + await remove(tmpPreviewPath) + } catch (err) { + await remove(tmpPreviewPath) + throw err + } + + // Important to do this before getVideoFilename() to take in account the new file extension + inputVideoFile.extname = newExtname + inputVideoFile.resolution = resolution + inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname) + + // ffmpeg generated a new video file, so update the video duration + // See https://trac.ffmpeg.org/ticket/5456 + video.duration = await getVideoStreamDuration(videoOutputPath) + await video.save() + + return onWebTorrentVideoFileTranscoding({ + video, + videoFile: inputVideoFile, + videoOutputPath + }) + }) + + return result + } finally { + inputFileMutexReleaser() + } +} + +export async function onWebTorrentVideoFileTranscoding (options: { + video: MVideoFullLight + videoFile: MVideoFile + videoOutputPath: string +}) { + const { video, videoFile, videoOutputPath } = options + + const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await video.reload() + + const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, videoFile) + + const stats = await stat(videoOutputPath) + + const probe = await ffprobePromise(videoOutputPath) + const fps = await getVideoStreamFPS(videoOutputPath, probe) + const metadata = await buildFileMetadata(videoOutputPath, probe) + + await move(videoOutputPath, outputPath, { overwrite: true }) + + videoFile.size = stats.size + videoFile.fps = fps + videoFile.metadata = metadata + + await createTorrentAndSetInfoHash(video, videoFile) + + const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution }) + if (oldFile) await video.removeWebTorrentFile(oldFile) + + await VideoFileModel.customUpsert(videoFile, 'video', undefined) + video.VideoFiles = await video.$get('VideoFiles') + + return { video, videoFile } + } finally { + mutexReleaser() + } +} + +// --------------------------------------------------------------------------- + +function buildOriginalFileResolution (inputResolution: number) { + if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) { + return toEven(inputResolution) + } + + const resolutions = computeResolutionsToTranscode({ + input: inputResolution, + type: 'vod', + includeInput: false, + strictLower: false, + // We don't really care about the audio resolution in this context + hasAudio: true + }) + + if (resolutions.length === 0) { + return toEven(inputResolution) + } + + return Math.max(...resolutions) +} diff --git a/server/lib/uploadx.ts b/server/lib/uploadx.ts index 58040cb6d..c7e0eb414 100644 --- a/server/lib/uploadx.ts +++ b/server/lib/uploadx.ts @@ -3,6 +3,7 @@ import { buildLogger } from '@server/helpers/logger' import { getResumableUploadPath } from '@server/helpers/upload' import { CONFIG } from '@server/initializers/config' import { LogLevel, Uploadx } from '@uploadx/core' +import { extname } from 'path' const logger = buildLogger('uploadx') @@ -26,7 +27,9 @@ const uploadx = new Uploadx({ if (!res.locals.oauth) return undefined return res.locals.oauth.token.user.id + '' - } + }, + + filename: file => `${file.userId}-${file.id}${extname(file.metadata.filename)}` }) export { diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index fd5837a3a..cb1ea834c 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts @@ -81,7 +81,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video } if (videoInstance.isLive) { - LiveManager.Instance.stopSessionOf(videoInstance.id, LiveVideoError.BLACKLISTED) + LiveManager.Instance.stopSessionOf(videoInstance.uuid, LiveVideoError.BLACKLISTED) } Notifier.Instance.notifyOnVideoBlacklist(blacklist) diff --git a/server/lib/video-file.ts b/server/lib/video-file.ts index 2ab7190f1..8fcc3c253 100644 --- a/server/lib/video-file.ts +++ b/server/lib/video-file.ts @@ -1,6 +1,44 @@ +import { FfprobeData } from 'fluent-ffmpeg' import { logger } from '@server/helpers/logger' +import { VideoFileModel } from '@server/models/video/video-file' import { MVideoWithAllFiles } from '@server/types/models' +import { getLowercaseExtension } from '@shared/core-utils' +import { getFileSize } from '@shared/extra-utils' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' +import { VideoFileMetadata, VideoResolution } from '@shared/models' import { lTags } from './object-storage/shared' +import { generateHLSVideoFilename, generateWebTorrentVideoFilename } from './paths' + +async function buildNewFile (options: { + path: string + mode: 'web-video' | 'hls' +}) { + const { path, mode } = options + + const probe = await ffprobePromise(path) + const size = await getFileSize(path) + + const videoFile = new VideoFileModel({ + extname: getLowercaseExtension(path), + size, + metadata: await buildFileMetadata(path, probe) + }) + + if (await isAudioFile(path, probe)) { + videoFile.resolution = VideoResolution.H_NOVIDEO + } else { + videoFile.fps = await getVideoStreamFPS(path, probe) + videoFile.resolution = (await getVideoStreamDimensionsInfo(path, probe)).resolution + } + + videoFile.filename = mode === 'web-video' + ? generateWebTorrentVideoFilename(videoFile.resolution, videoFile.extname) + : generateHLSVideoFilename(videoFile.resolution) + + return videoFile +} + +// --------------------------------------------------------------------------- async function removeHLSPlaylist (video: MVideoWithAllFiles) { const hls = video.getHLSPlaylist() @@ -61,9 +99,23 @@ async function removeWebTorrentFile (video: MVideoWithAllFiles, fileToDeleteId: return video } +// --------------------------------------------------------------------------- + +async function buildFileMetadata (path: string, existingProbe?: FfprobeData) { + const metadata = existingProbe || await ffprobePromise(path) + + return new VideoFileMetadata(metadata) +} + +// --------------------------------------------------------------------------- + export { + buildNewFile, + removeHLSPlaylist, removeHLSFile, removeAllWebTorrentFiles, - removeWebTorrentFile + removeWebTorrentFile, + + buildFileMetadata } diff --git a/server/lib/video-studio.ts b/server/lib/video-studio.ts index cdacd35f2..b392bdb00 100644 --- a/server/lib/video-studio.ts +++ b/server/lib/video-studio.ts @@ -1,5 +1,5 @@ import { MVideoFullLight } from '@server/types/models' -import { getVideoStreamDuration } from '@shared/extra-utils' +import { getVideoStreamDuration } from '@shared/ffmpeg' import { VideoStudioTask } from '@shared/models' function buildTaskFileFieldname (indice: number, fieldName = 'file') { diff --git a/server/lib/video.ts b/server/lib/video.ts index aacc41a7a..588dc553f 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -2,14 +2,14 @@ import { UploadFiles } from 'express' import memoizee from 'memoizee' import { Transaction } from 'sequelize/types' import { CONFIG } from '@server/initializers/config' -import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' +import { MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' import { TagModel } from '@server/models/video/tag' import { VideoModel } from '@server/models/video/video' import { VideoJobInfoModel } from '@server/models/video/video-job-info' import { FilteredModelAttributes } from '@server/types' -import { MThumbnail, MUserId, MVideoFile, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' -import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' -import { CreateJobArgument, CreateJobOptions, JobQueue } from './job-queue/job-queue' +import { MThumbnail, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' +import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState } from '@shared/models' +import { CreateJobArgument, JobQueue } from './job-queue/job-queue' import { updateVideoMiniatureFromExisting } from './thumbnail' import { moveFilesIfPrivacyChanged } from './video-privacy' @@ -87,58 +87,6 @@ async function setVideoTags (options: { // --------------------------------------------------------------------------- -async function buildOptimizeOrMergeAudioJob (options: { - video: MVideoUUID - videoFile: MVideoFile - user: MUserId - isNewVideo?: boolean // Default true -}) { - const { video, videoFile, user, isNewVideo } = options - - let payload: VideoTranscodingPayload - - if (videoFile.isAudio()) { - payload = { - type: 'merge-audio-to-webtorrent', - resolution: DEFAULT_AUDIO_RESOLUTION, - videoUUID: video.uuid, - createHLSIfNeeded: true, - isNewVideo - } - } else { - payload = { - type: 'optimize-to-webtorrent', - videoUUID: video.uuid, - isNewVideo - } - } - - await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') - - return { - type: 'video-transcoding' as 'video-transcoding', - priority: await getTranscodingJobPriority(user), - payload - } -} - -async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { - await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') - - return { type: 'video-transcoding' as 'video-transcoding', payload, ...options } -} - -async function getTranscodingJobPriority (user: MUserId) { - const now = new Date() - const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) - - const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) - - return JOB_PRIORITY.TRANSCODING + videoUploadedByUser -} - -// --------------------------------------------------------------------------- - async function buildMoveToObjectStorageJob (options: { video: MVideoUUID previousVideoState: VideoState @@ -235,10 +183,7 @@ export { buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags, - buildOptimizeOrMergeAudioJob, - buildTranscodingJob, buildMoveToObjectStorageJob, - getTranscodingJobPriority, addVideoJobsAfterUpdate, getCachedVideoDuration } -- cgit v1.2.3