From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- .../shared/job-builders/abstract-job-builder.ts | 38 +++ .../lib/transcoding/shared/job-builders/index.ts | 2 + .../job-builders/transcoding-job-queue-builder.ts | 308 +++++++++++++++++++++ .../job-builders/transcoding-runner-job-builder.ts | 189 +++++++++++++ 4 files changed, 537 insertions(+) create mode 100644 server/lib/transcoding/shared/job-builders/abstract-job-builder.ts create mode 100644 server/lib/transcoding/shared/job-builders/index.ts create mode 100644 server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts create mode 100644 server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts (limited to 'server/lib/transcoding/shared/job-builders') diff --git a/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts new file mode 100644 index 000000000..f1e9efdcf --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts @@ -0,0 +1,38 @@ + +import { JOB_PRIORITY } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' +import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' + +export abstract class AbstractJobBuilder { + + abstract createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId + }): Promise + + abstract createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId | null + }): Promise + + protected async getTranscodingJobPriority (options: { + user: MUserId + fallback: number + }) { + const { user, fallback } = options + + if (!user) return fallback + + const now = new Date() + const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) + + const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) + + return JOB_PRIORITY.TRANSCODING + videoUploadedByUser + } +} diff --git a/server/lib/transcoding/shared/job-builders/index.ts b/server/lib/transcoding/shared/job-builders/index.ts new file mode 100644 index 000000000..9b1c82adf --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/index.ts @@ -0,0 +1,2 @@ +export * from './transcoding-job-queue-builder' +export * from './transcoding-runner-job-builder' diff --git a/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts new file mode 100644 index 000000000..7c892718b --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts @@ -0,0 +1,308 @@ +import Bluebird from 'bluebird' +import { computeOutputFPS } from '@server/helpers/ffmpeg' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' +import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' +import { Hooks } from '@server/lib/plugins/hooks' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' +import { + HLSTranscodingPayload, + MergeAudioTranscodingPayload, + NewWebTorrentResolutionTranscodingPayload, + OptimizeTranscodingPayload, + VideoTranscodingPayload +} from '@shared/models' +import { canDoQuickTranscode } from '../../transcoding-quick-transcode' +import { computeResolutionsToTranscode } from '../../transcoding-resolutions' +import { AbstractJobBuilder } from './abstract-job-builder' + +export class TranscodingJobQueueBuilder extends AbstractJobBuilder { + + async createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId + }) { + const { video, videoFile, isNewVideo, user } = options + + let mergeOrOptimizePayload: MergeAudioTranscodingPayload | OptimizeTranscodingPayload + let nextTranscodingSequentialJobPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] + + const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { + const probe = await ffprobePromise(videoFilePath) + + const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) + const hasAudio = await hasAudioStream(videoFilePath, probe) + const quickTranscode = await canDoQuickTranscode(videoFilePath, probe) + const inputFPS = videoFile.isAudio() + ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value + : await getVideoStreamFPS(videoFilePath, probe) + + const maxResolution = await isAudioFile(videoFilePath, probe) + ? DEFAULT_AUDIO_RESOLUTION + : resolution + + if (CONFIG.TRANSCODING.HLS.ENABLED === true) { + nextTranscodingSequentialJobPayloads.push([ + this.buildHLSJobPayload({ + deleteWebTorrentFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, + + // We had some issues with a web video quick transcoded while producing a HLS version of it + copyCodecs: !quickTranscode, + + resolution: maxResolution, + fps: computeOutputFPS({ inputFPS, resolution: maxResolution }), + videoUUID: video.uuid, + isNewVideo + }) + ]) + } + + const lowerResolutionJobPayloads = await this.buildLowerResolutionJobPayloads({ + video, + inputVideoResolution: maxResolution, + inputVideoFPS: inputFPS, + hasAudio, + isNewVideo + }) + + nextTranscodingSequentialJobPayloads = [ ...nextTranscodingSequentialJobPayloads, ...lowerResolutionJobPayloads ] + + mergeOrOptimizePayload = videoFile.isAudio() + ? this.buildMergeAudioPayload({ videoUUID: video.uuid, isNewVideo }) + : this.buildOptimizePayload({ videoUUID: video.uuid, isNewVideo, quickTranscode }) + }) + } finally { + mutexReleaser() + } + + const nextTranscodingSequentialJobs = await Bluebird.mapSeries(nextTranscodingSequentialJobPayloads, payloads => { + return Bluebird.mapSeries(payloads, payload => { + return this.buildTranscodingJob({ payload, user }) + }) + }) + + const transcodingJobBuilderJob: CreateJobArgument = { + type: 'transcoding-job-builder', + payload: { + videoUUID: video.uuid, + sequentialJobs: nextTranscodingSequentialJobs + } + } + + const mergeOrOptimizeJob = await this.buildTranscodingJob({ payload: mergeOrOptimizePayload, user }) + + return JobQueue.Instance.createSequentialJobFlow(...[ mergeOrOptimizeJob, transcodingJobBuilderJob ]) + } + + // --------------------------------------------------------------------------- + + async createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId | null + }) { + const { video, transcodingType, resolutions, isNewVideo } = options + + const maxResolution = Math.max(...resolutions) + const childrenResolutions = resolutions.filter(r => r !== maxResolution) + + logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) + + const { fps: inputFPS } = await video.probeMaxQualityFile() + + const children = childrenResolutions.map(resolution => { + const fps = computeOutputFPS({ inputFPS, resolution }) + + if (transcodingType === 'hls') { + return this.buildHLSJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) + } + + if (transcodingType === 'webtorrent') { + return this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) + } + + throw new Error('Unknown transcoding type') + }) + + const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) + + const parent = transcodingType === 'hls' + ? this.buildHLSJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) + : this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) + + // Process the last resolution after the other ones to prevent concurrency issue + // Because low resolutions use the biggest one as ffmpeg input + await this.createTranscodingJobsWithChildren({ videoUUID: video.uuid, parent, children, user: null }) + } + + // --------------------------------------------------------------------------- + + private async createTranscodingJobsWithChildren (options: { + videoUUID: string + parent: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload) + children: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload)[] + user: MUserId | null + }) { + const { videoUUID, parent, children, user } = options + + const parentJob = await this.buildTranscodingJob({ payload: parent, user }) + const childrenJobs = await Bluebird.mapSeries(children, c => this.buildTranscodingJob({ payload: c, user })) + + await JobQueue.Instance.createJobWithChildren(parentJob, childrenJobs) + + await VideoJobInfoModel.increaseOrCreate(videoUUID, 'pendingTranscode', 1 + children.length) + } + + private async buildTranscodingJob (options: { + payload: VideoTranscodingPayload + user: MUserId | null // null means we don't want priority + }) { + const { user, payload } = options + + return { + type: 'video-transcoding' as 'video-transcoding', + priority: await this.getTranscodingJobPriority({ user, fallback: undefined }), + payload + } + } + + private async buildLowerResolutionJobPayloads (options: { + video: MVideoWithFileThumbnail + inputVideoResolution: number + inputVideoFPS: number + hasAudio: boolean + isNewVideo: boolean + }) { + const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio } = options + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = await Hooks.wrapObject( + computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), + 'filter:transcoding.auto.resolutions-to-transcode.result', + options + ) + + const sequentialPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] + + for (const resolution of resolutionsEnabled) { + const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + const payloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[] = [ + this.buildWebTorrentJobPayload({ + videoUUID: video.uuid, + resolution, + fps, + isNewVideo + }) + ] + + // Create a subsequent job to create HLS resolution that will just copy web video codecs + if (CONFIG.TRANSCODING.HLS.ENABLED) { + payloads.push( + this.buildHLSJobPayload({ + videoUUID: video.uuid, + resolution, + fps, + isNewVideo, + copyCodecs: true + }) + ) + } + + sequentialPayloads.push(payloads) + } else if (CONFIG.TRANSCODING.HLS.ENABLED) { + sequentialPayloads.push([ + this.buildHLSJobPayload({ + videoUUID: video.uuid, + resolution, + fps, + copyCodecs: false, + isNewVideo + }) + ]) + } + } + + return sequentialPayloads + } + + private buildHLSJobPayload (options: { + videoUUID: string + resolution: number + fps: number + isNewVideo: boolean + deleteWebTorrentFiles?: boolean // default false + copyCodecs?: boolean // default false + }): HLSTranscodingPayload { + const { videoUUID, resolution, fps, isNewVideo, deleteWebTorrentFiles = false, copyCodecs = false } = options + + return { + type: 'new-resolution-to-hls', + videoUUID, + resolution, + fps, + copyCodecs, + isNewVideo, + deleteWebTorrentFiles + } + } + + private buildWebTorrentJobPayload (options: { + videoUUID: string + resolution: number + fps: number + isNewVideo: boolean + }): NewWebTorrentResolutionTranscodingPayload { + const { videoUUID, resolution, fps, isNewVideo } = options + + return { + type: 'new-resolution-to-webtorrent', + videoUUID, + isNewVideo, + resolution, + fps + } + } + + private buildMergeAudioPayload (options: { + videoUUID: string + isNewVideo: boolean + }): MergeAudioTranscodingPayload { + const { videoUUID, isNewVideo } = options + + return { + type: 'merge-audio-to-webtorrent', + resolution: DEFAULT_AUDIO_RESOLUTION, + fps: VIDEO_TRANSCODING_FPS.AUDIO_MERGE, + videoUUID, + isNewVideo + } + } + + private buildOptimizePayload (options: { + videoUUID: string + quickTranscode: boolean + isNewVideo: boolean + }): OptimizeTranscodingPayload { + const { videoUUID, quickTranscode, isNewVideo } = options + + return { + type: 'optimize-to-webtorrent', + videoUUID, + isNewVideo, + quickTranscode + } + } +} diff --git a/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts new file mode 100644 index 000000000..c7a63d2e2 --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts @@ -0,0 +1,189 @@ +import { computeOutputFPS } from '@server/helpers/ffmpeg' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' +import { Hooks } from '@server/lib/plugins/hooks' +import { VODAudioMergeTranscodingJobHandler, VODHLSTranscodingJobHandler, VODWebVideoTranscodingJobHandler } from '@server/lib/runners' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' +import { computeResolutionsToTranscode } from '../../transcoding-resolutions' +import { AbstractJobBuilder } from './abstract-job-builder' + +/** + * + * Class to build transcoding job in the local job queue + * + */ + +const lTags = loggerTagsFactory('transcoding') + +export class TranscodingRunnerJobBuilder extends AbstractJobBuilder { + + async createOptimizeOrMergeAudioJobs (options: { + video: MVideoFullLight + videoFile: MVideoFile + isNewVideo: boolean + user: MUserId + }) { + const { video, videoFile, isNewVideo, user } = options + + const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + + try { + await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { + const probe = await ffprobePromise(videoFilePath) + + const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) + const hasAudio = await hasAudioStream(videoFilePath, probe) + const inputFPS = videoFile.isAudio() + ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value + : await getVideoStreamFPS(videoFilePath, probe) + + const maxResolution = await isAudioFile(videoFilePath, probe) + ? DEFAULT_AUDIO_RESOLUTION + : resolution + + const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) + const priority = await this.getTranscodingJobPriority({ user, fallback: 0 }) + + const mainRunnerJob = videoFile.isAudio() + ? await new VODAudioMergeTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority }) + : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority }) + + if (CONFIG.TRANSCODING.HLS.ENABLED === true) { + await new VODHLSTranscodingJobHandler().create({ + video, + deleteWebVideoFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, + resolution: maxResolution, + fps, + isNewVideo, + dependsOnRunnerJob: mainRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + } + + await this.buildLowerResolutionJobPayloads({ + video, + inputVideoResolution: maxResolution, + inputVideoFPS: inputFPS, + hasAudio, + isNewVideo, + mainRunnerJob, + user + }) + }) + } finally { + mutexReleaser() + } + } + + // --------------------------------------------------------------------------- + + async createTranscodingJobs (options: { + transcodingType: 'hls' | 'webtorrent' + video: MVideoFullLight + resolutions: number[] + isNewVideo: boolean + user: MUserId | null + }) { + const { video, transcodingType, resolutions, isNewVideo, user } = options + + const maxResolution = Math.max(...resolutions) + const { fps: inputFPS } = await video.probeMaxQualityFile() + const maxFPS = computeOutputFPS({ inputFPS, resolution: maxResolution }) + const priority = await this.getTranscodingJobPriority({ user, fallback: 0 }) + + const childrenResolutions = resolutions.filter(r => r !== maxResolution) + + logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) + + // Process the last resolution before the other ones to prevent concurrency issue + // Because low resolutions use the biggest one as ffmpeg input + const mainJob = transcodingType === 'hls' + // eslint-disable-next-line max-len + ? await new VODHLSTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, deleteWebVideoFiles: false, priority }) + : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, priority }) + + for (const resolution of childrenResolutions) { + const dependsOnRunnerJob = mainJob + const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) + + if (transcodingType === 'hls') { + await new VODHLSTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + deleteWebVideoFiles: false, + dependsOnRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + continue + } + + if (transcodingType === 'webtorrent') { + await new VODWebVideoTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + dependsOnRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + continue + } + + throw new Error('Unknown transcoding type') + } + } + + private async buildLowerResolutionJobPayloads (options: { + mainRunnerJob: MRunnerJob + video: MVideoWithFileThumbnail + inputVideoResolution: number + inputVideoFPS: number + hasAudio: boolean + isNewVideo: boolean + user: MUserId + }) { + const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio, mainRunnerJob, user } = options + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = await Hooks.wrapObject( + computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), + 'filter:transcoding.auto.resolutions-to-transcode.result', + options + ) + + logger.debug('Lower resolutions build for %s.', video.uuid, { resolutionsEnabled, ...lTags(video.uuid) }) + + for (const resolution of resolutionsEnabled) { + const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + await new VODWebVideoTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + dependsOnRunnerJob: mainRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + } + + if (CONFIG.TRANSCODING.HLS.ENABLED) { + await new VODHLSTranscodingJobHandler().create({ + video, + resolution, + fps, + isNewVideo, + deleteWebVideoFiles: false, + dependsOnRunnerJob: mainRunnerJob, + priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) + }) + } + } + } +} -- cgit v1.2.3