X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Fvideo-transcoding.ts;h=8d659daa6f1ce60d754072564c6e2c3e506158d0;hb=7d9ba5c08999c6482f0bc5e0c09c6f55b7724090;hp=46add57d492bce13b55af7e0aacd3c9f57805c6a;hpb=8dc8a34ee8428e7657414115d1c137592efa174d;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 46add57d4..8d659daa6 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,21 +1,51 @@ import * as Bull from 'bull' +import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' +import { getTranscodingJobPriority, publishAndFederateIfNeeded } from '@server/lib/video' +import { getVideoFilePath } from '@server/lib/video-paths' +import { UserModel } from '@server/models/user/user' +import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' import { + HLSTranscodingPayload, MergeAudioTranscodingPayload, NewResolutionTranscodingPayload, OptimizeTranscodingPayload, VideoTranscodingPayload } from '../../../../shared' +import { retryTransactionWrapper } from '../../../helpers/database-utils' +import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils' import { logger } from '../../../helpers/logger' +import { CONFIG } from '../../../initializers/config' +import { sequelizeTypescript } from '../../../initializers/database' import { VideoModel } from '../../../models/video/video' -import { JobQueue } from '../job-queue' import { federateVideoIfNeeded } from '../../activitypub/videos' -import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { sequelizeTypescript } from '../../../initializers' -import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' -import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' import { Notifier } from '../../notifier' -import { CONFIG } from '../../../initializers/config' -import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/typings/models' +import { + generateHlsPlaylistResolution, + mergeAudioVideofile, + optimizeOriginalVideofile, + transcodeNewWebTorrentResolution +} from '../../transcoding/video-transcoding' +import { JobQueue } from '../job-queue' + +type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise + +const handlers: { [ id: string ]: HandlerFunction } = { + // Deprecated, introduced in 3.1 + 'hls': handleHLSJob, + 'new-resolution-to-hls': handleHLSJob, + + // Deprecated, introduced in 3.1 + 'new-resolution': handleNewWebTorrentResolutionJob, + 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob, + + // Deprecated, introduced in 3.1 + 'merge-audio': handleWebTorrentMergeAudioJob, + 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob, + + // Deprecated, introduced in 3.1 + 'optimize': handleWebTorrentOptimizeJob, + 'optimize-to-webtorrent': handleWebTorrentOptimizeJob +} async function processVideoTranscoding (job: Bull.Job) { const payload = job.data as VideoTranscodingPayload @@ -28,54 +58,98 @@ async function processVideoTranscoding (job: Bull.Job) { return undefined } - if (payload.type === 'hls') { - await generateHlsPlaylist(video, payload.resolution, payload.copyCodecs, payload.isPortraitMode || false) + const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) - await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) - } else if (payload.type === 'new-resolution') { - await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false) + const handler = handlers[payload.type] - await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) - } else if (payload.type === 'merge-audio') { - await mergeAudioVideofile(video, payload.resolution) - - await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) - } else { - await optimizeOriginalVideofile(video) - - await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) + if (!handler) { + throw new Error('Cannot find transcoding handler for ' + payload.type) } + await handler(job, payload, video, user) + return video } -async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { +// --------------------------------------------------------------------------- +// Job handlers +// --------------------------------------------------------------------------- + +async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { + const videoFileInput = payload.copyCodecs + ? video.getWebTorrentFile(payload.resolution) + : video.getMaxQualityFile() + + const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() + const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) + + await generateHlsPlaylistResolution({ + video, + videoInputPath, + resolution: payload.resolution, + copyCodecs: payload.copyCodecs, + isPortraitMode: payload.isPortraitMode || false, + job + }) + + await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) +} + +async function handleNewWebTorrentResolutionJob ( + job: Bull.Job, + payload: NewResolutionTranscodingPayload, + video: MVideoFullLight, + user: MUserId +) { + await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) + + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) +} + +async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { + await mergeAudioVideofile(video, payload.resolution, job) + + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) +} + +async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { + const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) + + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) +} + +// --------------------------------------------------------------------------- + +async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { if (video === undefined) return undefined - // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + // Remove webtorrent files if not enabled for (const file of video.VideoFiles) { await video.removeFile(file) + await file.removeTorrent() await file.destroy() } video.VideoFiles = [] + + // Create HLS new resolution jobs + await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') } return publishAndFederateIfNeeded(video) } -async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) { - await publishAndFederateIfNeeded(video) - - await createHlsJobIfEnabled(payload) -} - -async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload) { +async function onVideoFileOptimizer ( + videoArg: MVideoWithFile, + payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, + transcodeType: TranscodeOptionsType, + user: MUserId +) { if (videoArg === undefined) return undefined // Outside the transaction (IO on disk) - const { videoFileResolution } = await videoArg.getMaxQualityResolution() + const { videoFileResolution, isPortraitMode } = await videoArg.getMaxQualityResolution() const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it @@ -83,47 +157,23 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O // Video does not exist anymore if (!videoDatabase) return undefined - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) - let videoPublished = false - const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getMaxQualityFile().resolution }) - await createHlsJobIfEnabled(hlsPayload) - - if (resolutionsEnabled.length !== 0) { - for (const resolution of resolutionsEnabled) { - let dataInput: VideoTranscodingPayload - - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { - dataInput = { - type: 'new-resolution' as 'new-resolution', - videoUUID: videoDatabase.uuid, - resolution - } - } else if (CONFIG.TRANSCODING.HLS.ENABLED) { - dataInput = { - type: 'hls', - videoUUID: videoDatabase.uuid, - resolution, - isPortraitMode: false, - copyCodecs: false - } - } - - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) - } + // Generate HLS version of the original file + const originalFileHLSPayload = Object.assign({}, payload, { + isPortraitMode, + resolution: videoDatabase.getMaxQualityFile().resolution, + // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues + copyCodecs: transcodeType !== 'quick-transcode', + isMaxQuality: true + }) + const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } else { + const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent') + + if (!hasHls && !hasNewResolutions) { // No transcoding to do, it's now published videoPublished = await videoDatabase.publishIfNeededAndSave(t) - - logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) } await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) @@ -135,48 +185,108 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) } +async function onNewWebTorrentFileResolution ( + video: MVideoUUID, + user: MUserId, + payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload +) { + await publishAndFederateIfNeeded(video) + + await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) +} + // --------------------------------------------------------------------------- export { processVideoTranscoding, - publishNewResolutionIfNeeded + onNewWebTorrentFileResolution } // --------------------------------------------------------------------------- -function createHlsJobIfEnabled (payload?: { videoUUID: string, resolution: number, isPortraitMode?: boolean }) { - // Generate HLS playlist? - if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { - const hlsTranscodingPayload = { - type: 'hls' as 'hls', - videoUUID: payload.videoUUID, - resolution: payload.resolution, - isPortraitMode: payload.isPortraitMode, - copyCodecs: true - } +async function createHlsJobIfEnabled (user: MUserId, payload: { + videoUUID: string + resolution: number + isPortraitMode?: boolean + copyCodecs: boolean + isMaxQuality: boolean +}) { + if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false + + const jobOptions = { + priority: await getTranscodingJobPriority(user) + } - return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) + const hlsTranscodingPayload: HLSTranscodingPayload = { + type: 'new-resolution-to-hls', + videoUUID: payload.videoUUID, + resolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + copyCodecs: payload.copyCodecs, + isMaxQuality: payload.isMaxQuality } + + JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) + + return true } -async function publishAndFederateIfNeeded (video: MVideoUUID) { - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined +async function createLowerResolutionsJobs ( + video: MVideoFullLight, + user: MUserId, + videoFileResolution: number, + isPortraitMode: boolean, + type: 'hls' | 'webtorrent' +) { + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') + const resolutionCreated: number[] = [] + + for (const resolution of resolutionsEnabled) { + let dataInput: VideoTranscodingPayload + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { + // WebTorrent will create subsequent HLS job + dataInput = { + type: 'new-resolution-to-webtorrent', + videoUUID: video.uuid, + resolution, + isPortraitMode + } + } - // We transcoded the video file in another format, now we can publish it - const videoPublished = await videoDatabase.publishIfNeededAndSave(t) + if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { + dataInput = { + type: 'new-resolution-to-hls', + videoUUID: video.uuid, + resolution, + isPortraitMode, + copyCodecs: false, + isMaxQuality: false + } + } - // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, videoPublished, t) + if (!dataInput) continue - return { videoDatabase, videoPublished } - }) + resolutionCreated.push(resolution) + + const jobOptions = { + priority: await getTranscodingJobPriority(user) + } + + JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) + } + + if (resolutionCreated.length === 0) { + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) - if (videoPublished) { - Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) - Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) + return false } + + logger.info( + 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, + { resolutionCreated } + ) + + return true }