X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Fvideo-transcoding.ts;h=95ee6b384d9902f50a4ca39738e78c3a6b2dfe1f;hb=e722fb5923ddf11d72e48cec9788abc64327c22f;hp=e9b84ecd66722aec9bcdedb0655ba874ae4bd022;hpb=73b3aa6429dfb2e31628fa09a479dce318289d7d;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index e9b84ecd6..95ee6b384 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,203 +1,332 @@ -import * as Bull from 'bull' -import { VideoResolution, VideoState } from '../../../../shared' -import { logger } from '../../../helpers/logger' -import { VideoModel } from '../../../models/video/video' -import { JobQueue } from '../job-queue' -import { federateVideoIfNeeded } from '../../activitypub' +import { Job } from 'bull' +import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' +import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' +import { UserModel } from '@server/models/user/user' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' +import { pick } from '@shared/core-utils' +import { + HLSTranscodingPayload, + MergeAudioTranscodingPayload, + NewWebTorrentResolutionTranscodingPayload, + OptimizeTranscodingPayload, + VideoResolution, + VideoTranscodingPayload +} from '@shared/models' import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { sequelizeTypescript } from '../../../initializers' -import * as Bluebird from 'bluebird' -import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' -import { generateHlsPlaylist, optimizeVideofile, transcodeOriginalVideofile, mergeAudioVideofile } from '../../video-transcoding' -import { Notifier } from '../../notifier' +import { computeLowerResolutionsToTranscode } from '../../../helpers/ffmpeg' +import { logger, loggerTagsFactory } from '../../../helpers/logger' import { CONFIG } from '../../../initializers/config' - -interface BaseTranscodingPayload { - videoUUID: string - isNewVideo?: boolean -} - -interface HLSTranscodingPayload extends BaseTranscodingPayload { - type: 'hls' - isPortraitMode?: boolean - resolution: VideoResolution -} - -interface NewResolutionTranscodingPayload extends BaseTranscodingPayload { - type: 'new-resolution' - isPortraitMode?: boolean - resolution: VideoResolution -} - -interface MergeAudioTranscodingPayload extends BaseTranscodingPayload { - type: 'merge-audio' - resolution: VideoResolution -} - -interface OptimizeTranscodingPayload extends BaseTranscodingPayload { - type: 'optimize' +import { VideoModel } from '../../../models/video/video' +import { + generateHlsPlaylistResolution, + mergeAudioVideofile, + optimizeOriginalVideofile, + transcodeNewWebTorrentResolution +} from '../../transcoding/transcoding' + +type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise + +const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { + 'new-resolution-to-hls': handleHLSJob, + 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob, + 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob, + 'optimize-to-webtorrent': handleWebTorrentOptimizeJob } -export type VideoTranscodingPayload = HLSTranscodingPayload | NewResolutionTranscodingPayload - | OptimizeTranscodingPayload | MergeAudioTranscodingPayload +const lTags = loggerTagsFactory('transcoding') -async function processVideoTranscoding (job: Bull.Job) { +async function processVideoTranscoding (job: Job) { const payload = job.data as VideoTranscodingPayload - logger.info('Processing video file in job %d.', job.id) + logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID)) const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) // No video, maybe deleted? if (!video) { - logger.info('Do not process job %d, video does not exist.', job.id) + logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID)) return undefined } - if (payload.type === 'hls') { - await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false) + const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) + + const handler = handlers[payload.type] - await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) - } else if (payload.type === 'new-resolution') { - await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) + if (!handler) { + await moveToFailedTranscodingState(video) + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + + throw new Error('Cannot find transcoding handler for ' + payload.type) + } - await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) - } else if (payload.type === 'merge-audio') { - await mergeAudioVideofile(video, payload.resolution) + try { + await handler(job, payload, video, user) + } catch (error) { + await moveToFailedTranscodingState(video) - await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) - } else { - await optimizeVideofile(video) + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') - await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) + throw error } return video } -async function onHlsPlaylistGenerationSuccess (video: VideoModel) { - if (video === undefined) return undefined - - await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined +// --------------------------------------------------------------------------- - // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, false, t) - }) +export { + processVideoTranscoding } -async function publishNewResolutionIfNeeded (video: VideoModel, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) { - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined - - let videoPublished = false +// --------------------------------------------------------------------------- +// Job handlers +// --------------------------------------------------------------------------- - // We transcoded the video file in another format, now we can publish it - if (videoDatabase.state !== VideoState.PUBLISHED) { - videoPublished = true +async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { + logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) - videoDatabase.state = VideoState.PUBLISHED - videoDatabase.publishedAt = new Date() - videoDatabase = await videoDatabase.save({ transaction: t }) - } + const videoFileInput = payload.copyCodecs + ? video.getWebTorrentFile(payload.resolution) + : video.getMaxQualityFile() - // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, videoPublished, t) + const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() - return { videoDatabase, videoPublished } + await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { + return generateHlsPlaylistResolution({ + video, + videoInputPath, + resolution: payload.resolution, + copyCodecs: payload.copyCodecs, + isPortraitMode: payload.isPortraitMode || false, + job + }) }) - if (videoPublished) { - Notifier.Instance.notifyOnNewVideo(videoDatabase) - Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) - } + logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - await createHlsJobIfEnabled(payload) + await onHlsPlaylistGeneration(video, user, payload) } -async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: OptimizeTranscodingPayload) { - if (videoArg === undefined) return undefined +async function handleNewWebTorrentResolutionJob ( + job: Job, + payload: NewWebTorrentResolutionTranscodingPayload, + video: MVideoFullLight, + user: MUserId +) { + logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) - // Outside the transaction (IO on disk) - const { videoFileResolution } = await videoArg.getOriginalFileResolution() + await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined + logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) + await onNewWebTorrentFileResolution(video, user, payload) +} - let videoPublished = false +async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { + logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) - if (resolutionsEnabled.length !== 0) { - const tasks: (Bluebird> | Promise>)[] = [] + await mergeAudioVideofile(video, payload.resolution, job) - for (const resolution of resolutionsEnabled) { - const dataInput = { - type: 'new-resolution' as 'new-resolution', - videoUUID: videoDatabase.uuid, - resolution - } + logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) - tasks.push(p) - } + await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) +} - await Promise.all(tasks) +async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { + logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } else { - videoPublished = true + const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) - // No transcoding to do, it's now published - videoDatabase.state = VideoState.PUBLISHED - videoDatabase = await videoDatabase.save({ transaction: t }) + logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) - logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) + await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) +} + +// --------------------------------------------------------------------------- + +async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { + if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { + // Remove webtorrent files if not enabled + for (const file of video.VideoFiles) { + await video.removeWebTorrentFileAndTorrent(file) + await file.destroy() } - await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) + video.VideoFiles = [] + + // Create HLS new resolution jobs + await createLowerResolutionsJobs({ + video, + user, + videoFileResolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + hasAudio: payload.hasAudio, + isNewVideo: payload.isNewVideo ?? true, + type: 'hls' + }) + } - return { videoDatabase, videoPublished } + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) +} + +async function onVideoFirstWebTorrentTranscoding ( + videoArg: MVideoWithFile, + payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, + transcodeType: TranscodeVODOptionsType, + user: MUserId +) { + const { resolution, isPortraitMode, audioStream } = await videoArg.probeMaxQualityFile() + + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + // Generate HLS version of the original file + const originalFileHLSPayload = { + ...payload, + + isPortraitMode, + hasAudio: !!audioStream, + resolution: videoDatabase.getMaxQualityFile().resolution, + // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues + copyCodecs: transcodeType !== 'quick-transcode', + isMaxQuality: true + } + const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) + const hasNewResolutions = await createLowerResolutionsJobs({ + video: videoDatabase, + user, + videoFileResolution: resolution, + hasAudio: !!audioStream, + isPortraitMode, + type: 'webtorrent', + isNewVideo: payload.isNewVideo ?? true }) - if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) - if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) + await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') - const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }) - await createHlsJobIfEnabled(hlsPayload) + // Move to next state if there are no other resolutions to generate + if (!hasHls && !hasNewResolutions) { + await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) + } } -// --------------------------------------------------------------------------- +async function onNewWebTorrentFileResolution ( + video: MVideo, + user: MUserId, + payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload +) { + if (payload.createHLSIfNeeded) { + await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) + } -export { - processVideoTranscoding, - publishNewResolutionIfNeeded + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) } // --------------------------------------------------------------------------- -function createHlsJobIfEnabled (payload?: { videoUUID: string, resolution: number, isPortraitMode?: boolean }) { - // Generate HLS playlist? - if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { - const hlsTranscodingPayload = { - type: 'hls' as 'hls', - videoUUID: payload.videoUUID, - resolution: payload.resolution, - isPortraitMode: payload.isPortraitMode +async function createHlsJobIfEnabled (user: MUserId, payload: { + videoUUID: string + resolution: number + hasAudio: boolean + isPortraitMode?: boolean + copyCodecs: boolean + isMaxQuality: boolean + isNewVideo?: boolean +}) { + if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false + + const jobOptions = { + priority: await getTranscodingJobPriority(user) + } + + const hlsTranscodingPayload: HLSTranscodingPayload = { + type: 'new-resolution-to-hls', + autoDeleteWebTorrentIfNeeded: true, + + ...pick(payload, [ 'videoUUID', 'resolution', 'isPortraitMode', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) + } + + await addTranscodingJob(hlsTranscodingPayload, jobOptions) + + return true +} + +async function createLowerResolutionsJobs (options: { + video: MVideoFullLight + user: MUserId + videoFileResolution: number + isPortraitMode: boolean + hasAudio: boolean + isNewVideo: boolean + type: 'hls' | 'webtorrent' +}) { + const { video, user, videoFileResolution, isPortraitMode, isNewVideo, hasAudio, type } = options + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeLowerResolutionsToTranscode(videoFileResolution, 'vod') + const resolutionCreated: string[] = [] + + for (const resolution of resolutionsEnabled) { + if (resolution === VideoResolution.H_NOVIDEO && hasAudio === false) continue + + let dataInput: VideoTranscodingPayload + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { + // WebTorrent will create subsequent HLS job + dataInput = { + type: 'new-resolution-to-webtorrent', + videoUUID: video.uuid, + resolution, + isPortraitMode, + hasAudio, + createHLSIfNeeded: true, + isNewVideo + } + + resolutionCreated.push('webtorrent-' + resolution) + } + + if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { + dataInput = { + type: 'new-resolution-to-hls', + videoUUID: video.uuid, + resolution, + isPortraitMode, + hasAudio, + copyCodecs: false, + isMaxQuality: false, + autoDeleteWebTorrentIfNeeded: true, + isNewVideo + } + + resolutionCreated.push('hls-' + resolution) } - return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) + if (!dataInput) continue + + const jobOptions = { + priority: await getTranscodingJobPriority(user) + } + + await addTranscodingJob(dataInput, jobOptions) + } + + if (resolutionCreated.length === 0) { + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) + + return false } + + logger.info( + 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, + { resolutionCreated, ...lTags(video.uuid) } + ) + + return true }