X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Fvideo-transcoding.ts;h=904ef2e3cf714ee796638071a60a975928143dc2;hb=b46cf4b920984492df598c1b61179acfc7f6f22e;hp=4718a7d5c456fb8945eab9205b9415bc99647429;hpb=69eddafb17c4cf8e5a6dbd19e6d216c58140d153;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4718a7d5c..904ef2e3c 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,9 +1,11 @@ -import * as Bull from 'bull' +import { Job } from 'bull' import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' -import { JOB_PRIORITY } from '@server/initializers/constants' -import { getJobTranscodingPriorityMalus, publishAndFederateIfNeeded } from '@server/lib/video' -import { getVideoFilePath } from '@server/lib/video-paths' -import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' +import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' +import { UserModel } from '@server/models/user/user' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' import { HLSTranscodingPayload, MergeAudioTranscodingPayload, @@ -13,49 +15,35 @@ import { } from '../../../../shared' import { retryTransactionWrapper } from '../../../helpers/database-utils' import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils' -import { logger } from '../../../helpers/logger' +import { logger, loggerTagsFactory } from '../../../helpers/logger' import { CONFIG } from '../../../initializers/config' -import { sequelizeTypescript } from '../../../initializers/database' import { VideoModel } from '../../../models/video/video' -import { federateVideoIfNeeded } from '../../activitypub/videos' -import { Notifier } from '../../notifier' import { generateHlsPlaylistResolution, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution -} from '../../video-transcoding' -import { JobQueue } from '../job-queue' -import { UserModel } from '@server/models/account/user' +} from '../../transcoding/video-transcoding' -type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise +type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise -const handlers: { [ id: string ]: HandlerFunction } = { - // Deprecated, introduced in 3.1 - 'hls': handleHLSJob, +const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { 'new-resolution-to-hls': handleHLSJob, - - // Deprecated, introduced in 3.1 - 'new-resolution': handleNewWebTorrentResolutionJob, 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob, - - // Deprecated, introduced in 3.1 - 'merge-audio': handleWebTorrentMergeAudioJob, 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob, - - // Deprecated, introduced in 3.1 - 'optimize': handleWebTorrentOptimizeJob, 'optimize-to-webtorrent': handleWebTorrentOptimizeJob } -async function processVideoTranscoding (job: Bull.Job) { +const lTags = loggerTagsFactory('transcoding') + +async function processVideoTranscoding (job: Job) { const payload = job.data as VideoTranscodingPayload - logger.info('Processing video file in job %d.', job.id) + logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID)) const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) // No video, maybe deleted? if (!video) { - logger.info('Do not process job %d, video does not exist.', job.id) + logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID)) return undefined } @@ -64,10 +52,18 @@ async function processVideoTranscoding (job: Bull.Job) { const handler = handlers[payload.type] if (!handler) { + await moveToFailedTranscodingState(video) + throw new Error('Cannot find transcoding handler for ' + payload.type) } - await handler(job, payload, video, user) + try { + await handler(job, payload, video, user) + } catch (error) { + await moveToFailedTranscodingState(video) + + throw error + } return video } @@ -76,145 +72,157 @@ async function processVideoTranscoding (job: Bull.Job) { // Job handlers // --------------------------------------------------------------------------- -async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { +async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { + logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) + const videoFileInput = payload.copyCodecs ? video.getWebTorrentFile(payload.resolution) : video.getMaxQualityFile() const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() - const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) - await generateHlsPlaylistResolution({ - video, - videoInputPath, - resolution: payload.resolution, - copyCodecs: payload.copyCodecs, - isPortraitMode: payload.isPortraitMode || false, - job + await VideoPathManager.Instance.makeAvailableVideoFile(videoOrStreamingPlaylist, videoFileInput, videoInputPath => { + return generateHlsPlaylistResolution({ + video, + videoInputPath, + resolution: payload.resolution, + copyCodecs: payload.copyCodecs, + isPortraitMode: payload.isPortraitMode || false, + job + }) }) - await retryTransactionWrapper(onHlsPlaylistGeneration, video, payload.resolution) + logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) + + await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) } async function handleNewWebTorrentResolutionJob ( - job: Bull.Job, + job: Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight, user: MUserId ) { + logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) + await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) + logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) } -async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { +async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { + logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) + await mergeAudioVideofile(video, payload.resolution, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) + logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await createLowerResolutionsJobs(video, user, payload.resolution, false) + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) } -async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { - const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) +async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { + logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) + + const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) + + logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) } // --------------------------------------------------------------------------- -async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: number) { - if (video === undefined) return undefined - - const maxQualityFile = video.getMaxQualityFile() - - // We generated the max quality HLS playlist, we don't need the webtorrent files anymore if the admin disabled it - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false && video.hasWebTorrentFiles() && maxQualityFile.resolution === resolution) { +async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { + if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + // Remove webtorrent files if not enabled for (const file of video.VideoFiles) { - await video.removeFile(file) - await video.removeTorrent(file) + await video.removeWebTorrentFileAndTorrent(file) await file.destroy() } video.VideoFiles = [] + + // Create HLS new resolution jobs + await createLowerResolutionsJobs({ + video, + user, + videoFileResolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + isNewVideo: payload.isNewVideo ?? true, + type: 'hls' + }) } - return publishAndFederateIfNeeded(video) + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + await moveToNextState(video, payload.isNewVideo) } async function onVideoFileOptimizer ( videoArg: MVideoWithFile, - payload: OptimizeTranscodingPayload, + payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, transcodeType: TranscodeOptionsType, user: MUserId ) { - if (videoArg === undefined) return undefined - - // Outside the transaction (IO on disk) - const { videoFileResolution, isPortraitMode } = await videoArg.getMaxQualityResolution() - - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined - - let videoPublished = false - - // Generate HLS version of the original file - const originalFileHLSPayload = Object.assign({}, payload, { - isPortraitMode, - resolution: videoDatabase.getMaxQualityFile().resolution, - // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues - copyCodecs: transcodeType !== 'quick-transcode' - }) - await createHlsJobIfEnabled(user, originalFileHLSPayload) - - const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) - - if (!hasNewResolutions) { - // No transcoding to do, it's now published - videoPublished = await videoDatabase.publishIfNeededAndSave(t) - } - - await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) - - return { videoDatabase, videoPublished } + const { resolution, isPortraitMode } = await videoArg.getMaxQualityResolution() + + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + // Generate HLS version of the original file + const originalFileHLSPayload = { + ...payload, + + isPortraitMode, + resolution: videoDatabase.getMaxQualityFile().resolution, + // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues + copyCodecs: transcodeType !== 'quick-transcode', + isMaxQuality: true + } + const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) + const hasNewResolutions = await createLowerResolutionsJobs({ + video: videoDatabase, + user, + videoFileResolution: resolution, + isPortraitMode, + type: 'webtorrent', + isNewVideo: payload.isNewVideo ?? true }) - if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) - if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) + await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') + + // Move to next state if there are no other resolutions to generate + if (!hasHls && !hasNewResolutions) { + await moveToNextState(videoDatabase, payload.isNewVideo) + } } async function onNewWebTorrentFileResolution ( - video: MVideoUUID, + video: MVideo, user: MUserId, payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload ) { - await publishAndFederateIfNeeded(video) + await createHlsJobIfEnabled(user, { ...payload, copyCodecs: true, isMaxQuality: false }) + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') - await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) -} - -// --------------------------------------------------------------------------- - -export { - processVideoTranscoding, - onNewWebTorrentFileResolution + await moveToNextState(video, payload.isNewVideo) } -// --------------------------------------------------------------------------- - async function createHlsJobIfEnabled (user: MUserId, payload: { videoUUID: string resolution: number isPortraitMode?: boolean copyCodecs: boolean + isMaxQuality: boolean + isNewVideo?: boolean }) { - if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return + if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false const jobOptions = { - priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) + priority: await getTranscodingJobPriority(user) } const hlsTranscodingPayload: HLSTranscodingPayload = { @@ -222,60 +230,89 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { videoUUID: payload.videoUUID, resolution: payload.resolution, isPortraitMode: payload.isPortraitMode, - copyCodecs: payload.copyCodecs + copyCodecs: payload.copyCodecs, + isMaxQuality: payload.isMaxQuality, + isNewVideo: payload.isNewVideo } - return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) + await addTranscodingJob(hlsTranscodingPayload, jobOptions) + + return true } -async function createLowerResolutionsJobs ( - video: MVideoFullLight, - user: MUserId, - videoFileResolution: number, +// --------------------------------------------------------------------------- + +export { + processVideoTranscoding, + createHlsJobIfEnabled, + onNewWebTorrentFileResolution +} + +// --------------------------------------------------------------------------- + +async function createLowerResolutionsJobs (options: { + video: MVideoFullLight + user: MUserId + videoFileResolution: number isPortraitMode: boolean -) { + isNewVideo: boolean + type: 'hls' | 'webtorrent' +}) { + const { video, user, videoFileResolution, isPortraitMode, isNewVideo, type } = options + // Create transcoding jobs if there are enabled resolutions const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') - logger.info( - 'Resolutions computed for video %s and origin file resolution of %d.', video.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) - - if (resolutionsEnabled.length === 0) { - logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) - - return false - } + const resolutionCreated: string[] = [] for (const resolution of resolutionsEnabled) { let dataInput: VideoTranscodingPayload - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { // WebTorrent will create subsequent HLS job dataInput = { type: 'new-resolution-to-webtorrent', videoUUID: video.uuid, resolution, - isPortraitMode + isPortraitMode, + isNewVideo } - } else if (CONFIG.TRANSCODING.HLS.ENABLED) { + + resolutionCreated.push('webtorrent-' + resolution) + } + + if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { dataInput = { type: 'new-resolution-to-hls', videoUUID: video.uuid, resolution, isPortraitMode, - copyCodecs: false + copyCodecs: false, + isMaxQuality: false, + isNewVideo } + + resolutionCreated.push('hls-' + resolution) } + if (!dataInput) continue + const jobOptions = { - priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) + priority: await getTranscodingJobPriority(user) } - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) + await addTranscodingJob(dataInput, jobOptions) + } + + if (resolutionCreated.length === 0) { + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) + + return false } - logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) + logger.info( + 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, + { resolutionCreated, ...lTags(video.uuid) } + ) return true }