X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Fvideo-transcoding.ts;h=5afca65cab154fa62be86e5f2a0eb8a18a111d85;hb=9452d4fd3321148fb80b64a67bd9983fee6c208e;hp=36d9594af998f4e64103ef1093bdb12c5bedf9c6;hpb=171efc48e67498406feb6d7873b3482b41505515;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 36d9594af..5afca65ca 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,59 +1,51 @@ -import * as Bull from 'bull' -import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' -import { getTranscodingJobPriority, publishAndFederateIfNeeded } from '@server/lib/video' -import { getVideoFilePath } from '@server/lib/video-paths' +import { Job } from 'bull' +import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' +import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' import { UserModel } from '@server/models/user/user' -import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' +import { pick } from '@shared/core-utils' import { HLSTranscodingPayload, MergeAudioTranscodingPayload, - NewResolutionTranscodingPayload, + NewWebTorrentResolutionTranscodingPayload, OptimizeTranscodingPayload, + VideoResolution, VideoTranscodingPayload -} from '../../../../shared' +} from '@shared/models' import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils' -import { logger } from '../../../helpers/logger' +import { computeLowerResolutionsToTranscode } from '../../../helpers/ffmpeg' +import { logger, loggerTagsFactory } from '../../../helpers/logger' import { CONFIG } from '../../../initializers/config' import { VideoModel } from '../../../models/video/video' -import { federateVideoIfNeeded } from '../../activitypub/videos' -import { Notifier } from '../../notifier' import { generateHlsPlaylistResolution, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution -} from '../../transcoding/video-transcoding' -import { JobQueue } from '../job-queue' +} from '../../transcoding/transcoding' -type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise +type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise -const handlers: { [ id: string ]: HandlerFunction } = { - // Deprecated, introduced in 3.1 - 'hls': handleHLSJob, +const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { 'new-resolution-to-hls': handleHLSJob, - - // Deprecated, introduced in 3.1 - 'new-resolution': handleNewWebTorrentResolutionJob, 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob, - - // Deprecated, introduced in 3.1 - 'merge-audio': handleWebTorrentMergeAudioJob, 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob, - - // Deprecated, introduced in 3.1 - 'optimize': handleWebTorrentOptimizeJob, 'optimize-to-webtorrent': handleWebTorrentOptimizeJob } -async function processVideoTranscoding (job: Bull.Job) { +const lTags = loggerTagsFactory('transcoding') + +async function processVideoTranscoding (job: Job) { const payload = job.data as VideoTranscodingPayload - logger.info('Processing video file in job %d.', job.id) + logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID)) - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) + const video = await VideoModel.loadFull(payload.videoUUID) // No video, maybe deleted? if (!video) { - logger.info('Do not process job %d, video does not exist.', job.id) + logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID)) return undefined } @@ -62,138 +54,178 @@ async function processVideoTranscoding (job: Bull.Job) { const handler = handlers[payload.type] if (!handler) { + await moveToFailedTranscodingState(video) + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + throw new Error('Cannot find transcoding handler for ' + payload.type) } - await handler(job, payload, video, user) + try { + await handler(job, payload, video, user) + } catch (error) { + await moveToFailedTranscodingState(video) + + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + + throw error + } return video } +// --------------------------------------------------------------------------- + +export { + processVideoTranscoding +} + // --------------------------------------------------------------------------- // Job handlers // --------------------------------------------------------------------------- -async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { +async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { + logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) + const videoFileInput = payload.copyCodecs ? video.getWebTorrentFile(payload.resolution) : video.getMaxQualityFile() const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() - const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) - - await generateHlsPlaylistResolution({ - video, - videoInputPath, - resolution: payload.resolution, - copyCodecs: payload.copyCodecs, - isPortraitMode: payload.isPortraitMode || false, - job + + await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { + return generateHlsPlaylistResolution({ + video, + videoInputPath, + resolution: payload.resolution, + copyCodecs: payload.copyCodecs, + isPortraitMode: payload.isPortraitMode || false, + job + }) }) - await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) + logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) + + await onHlsPlaylistGeneration(video, user, payload) } async function handleNewWebTorrentResolutionJob ( - job: Bull.Job, - payload: NewResolutionTranscodingPayload, + job: Job, + payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight, user: MUserId ) { + logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) + await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) + logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) + + await onNewWebTorrentFileResolution(video, user, payload) } -async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { +async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { + logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) + await mergeAudioVideofile(video, payload.resolution, job) - await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) + logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) + + await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) } -async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { - const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) +async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { + logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) + + const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) + + logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) + await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) } // --------------------------------------------------------------------------- async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { - if (video === undefined) return undefined - - if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { // Remove webtorrent files if not enabled for (const file of video.VideoFiles) { - await video.removeFileAndTorrent(file) + await video.removeWebTorrentFileAndTorrent(file) await file.destroy() } video.VideoFiles = [] // Create HLS new resolution jobs - await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') + await createLowerResolutionsJobs({ + video, + user, + videoFileResolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + hasAudio: payload.hasAudio, + isNewVideo: payload.isNewVideo ?? true, + type: 'hls' + }) } - return publishAndFederateIfNeeded(video) + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) } -async function onVideoFileOptimizer ( +async function onVideoFirstWebTorrentTranscoding ( videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, - transcodeType: TranscodeOptionsType, + transcodeType: TranscodeVODOptionsType, user: MUserId ) { - if (videoArg === undefined) return undefined - - // Outside the transaction (IO on disk) - const { videoFileResolution, isPortraitMode } = await videoArg.getMaxQualityResolution() + const { resolution, isPortraitMode, audioStream } = await videoArg.probeMaxQualityFile() // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid) + const videoDatabase = await VideoModel.loadFull(videoArg.uuid) // Video does not exist anymore if (!videoDatabase) return undefined - let videoPublished = false - // Generate HLS version of the original file - const originalFileHLSPayload = Object.assign({}, payload, { + const originalFileHLSPayload = { + ...payload, + isPortraitMode, + hasAudio: !!audioStream, resolution: videoDatabase.getMaxQualityFile().resolution, // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues copyCodecs: transcodeType !== 'quick-transcode', isMaxQuality: true - }) + } const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) + const hasNewResolutions = await createLowerResolutionsJobs({ + video: videoDatabase, + user, + videoFileResolution: resolution, + hasAudio: !!audioStream, + isPortraitMode, + type: 'webtorrent', + isNewVideo: payload.isNewVideo ?? true + }) - const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent') + await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') + // Move to next state if there are no other resolutions to generate if (!hasHls && !hasNewResolutions) { - // No transcoding to do, it's now published - videoPublished = await videoDatabase.publishIfNeededAndSave(undefined) + await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) } - - await federateVideoIfNeeded(videoDatabase, payload.isNewVideo) - - if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) - if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) } async function onNewWebTorrentFileResolution ( - video: MVideoUUID, + video: MVideo, user: MUserId, - payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload + payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload ) { - await publishAndFederateIfNeeded(video) - - await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) -} + if (payload.createHLSIfNeeded) { + await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) + } -// --------------------------------------------------------------------------- + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') -export { - processVideoTranscoding, - onNewWebTorrentFileResolution + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) } // --------------------------------------------------------------------------- @@ -201,11 +233,13 @@ export { async function createHlsJobIfEnabled (user: MUserId, payload: { videoUUID: string resolution: number + hasAudio: boolean isPortraitMode?: boolean copyCodecs: boolean isMaxQuality: boolean + isNewVideo?: boolean }) { - if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false + if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false const jobOptions = { priority: await getTranscodingJobPriority(user) @@ -213,30 +247,34 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { const hlsTranscodingPayload: HLSTranscodingPayload = { type: 'new-resolution-to-hls', - videoUUID: payload.videoUUID, - resolution: payload.resolution, - isPortraitMode: payload.isPortraitMode, - copyCodecs: payload.copyCodecs, - isMaxQuality: payload.isMaxQuality + autoDeleteWebTorrentIfNeeded: true, + + ...pick(payload, [ 'videoUUID', 'resolution', 'isPortraitMode', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) } - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) + await addTranscodingJob(hlsTranscodingPayload, jobOptions) return true } -async function createLowerResolutionsJobs ( - video: MVideoFullLight, - user: MUserId, - videoFileResolution: number, - isPortraitMode: boolean, +async function createLowerResolutionsJobs (options: { + video: MVideoFullLight + user: MUserId + videoFileResolution: number + isPortraitMode: boolean + hasAudio: boolean + isNewVideo: boolean type: 'hls' | 'webtorrent' -) { +}) { + const { video, user, videoFileResolution, isPortraitMode, isNewVideo, hasAudio, type } = options + // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') - const resolutionCreated: number[] = [] + const resolutionsEnabled = computeLowerResolutionsToTranscode(videoFileResolution, 'vod') + const resolutionCreated: string[] = [] for (const resolution of resolutionsEnabled) { + if (resolution === VideoResolution.H_NOVIDEO && hasAudio === false) continue + let dataInput: VideoTranscodingPayload if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { @@ -245,8 +283,13 @@ async function createLowerResolutionsJobs ( type: 'new-resolution-to-webtorrent', videoUUID: video.uuid, resolution, - isPortraitMode + isPortraitMode, + hasAudio, + createHLSIfNeeded: true, + isNewVideo } + + resolutionCreated.push('webtorrent-' + resolution) } if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { @@ -255,31 +298,34 @@ async function createLowerResolutionsJobs ( videoUUID: video.uuid, resolution, isPortraitMode, + hasAudio, copyCodecs: false, - isMaxQuality: false + isMaxQuality: false, + autoDeleteWebTorrentIfNeeded: true, + isNewVideo } + + resolutionCreated.push('hls-' + resolution) } if (!dataInput) continue - resolutionCreated.push(resolution) - const jobOptions = { priority: await getTranscodingJobPriority(user) } - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) + await addTranscodingJob(dataInput, jobOptions) } if (resolutionCreated.length === 0) { - logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) return false } logger.info( 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, - { resolutionCreated } + { resolutionCreated, ...lTags(video.uuid) } ) return true