From 5e47f6ab984a7d00782e4c7030afffa1ba480add Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 4 May 2023 15:29:34 +0200 Subject: Support studio transcoding in peertube runner --- packages/peertube-runner/server/process/process.ts | 4 + .../server/process/shared/common.ts | 39 +++--- .../server/process/shared/process-studio.ts | 138 +++++++++++++++++++++ .../server/process/shared/process-vod.ts | 55 ++++---- packages/peertube-runner/server/server.ts | 7 +- packages/peertube-runner/server/shared/index.ts | 1 + .../peertube-runner/server/shared/supported-job.ts | 43 +++++++ 7 files changed, 244 insertions(+), 43 deletions(-) create mode 100644 packages/peertube-runner/server/process/shared/process-studio.ts create mode 100644 packages/peertube-runner/server/shared/index.ts create mode 100644 packages/peertube-runner/server/shared/supported-job.ts (limited to 'packages') diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts index 39a929c59..ef231cb38 100644 --- a/packages/peertube-runner/server/process/process.ts +++ b/packages/peertube-runner/server/process/process.ts @@ -1,12 +1,14 @@ import { logger } from 'packages/peertube-runner/shared/logger' import { RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobVideoEditionTranscodingPayload, RunnerJobVODAudioMergeTranscodingPayload, RunnerJobVODHLSTranscodingPayload, RunnerJobVODWebVideoTranscodingPayload } from '@shared/models' import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' +import { processStudioTranscoding } from './shared/process-studio' export async function processJob (options: ProcessOptions) { const { server, job } = options @@ -21,6 +23,8 @@ export async function processJob (options: ProcessOptions) { await processHLSTranscoding(options as ProcessOptions) } else if (job.type === 'live-rtmp-hls-transcoding') { await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions).process() + } else if (job.type === 'video-edition-transcoding') { + await processStudioTranscoding(options as ProcessOptions) } else { logger.error(`Unknown job ${job.type} to process`) return diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index 9b2c40728..3cac98388 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts @@ -2,11 +2,12 @@ import { throttle } from 'lodash' import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' import { join } from 'path' import { buildUUID } from '@shared/extra-utils' -import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' +import { FFmpegEdition, FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' import { RunnerJob, RunnerJobPayload } from '@shared/models' import { PeerTubeServer } from '@shared/server-commands' import { getTranscodingLogger } from './transcoding-logger' import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' +import { remove } from 'fs-extra' export type JobWithToken = RunnerJob & { jobToken: string } @@ -24,7 +25,14 @@ export async function downloadInputFile (options: { const { url, job, runnerToken } = options const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) - await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) + try { + await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) + } catch (err) { + remove(destination) + .catch(err => logger.error({ err }, `Cannot remove ${destination}`)) + + throw err + } return destination } @@ -40,6 +48,8 @@ export async function updateTranscodingProgress (options: { return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) } +// --------------------------------------------------------------------------- + export function buildFFmpegVOD (options: { server: PeerTubeServer runnerToken: string @@ -58,26 +68,25 @@ export function buildFFmpegVOD (options: { .catch(err => logger.error({ err }, 'Cannot send job progress')) }, updateInterval, { trailing: false }) - const config = ConfigManager.Instance.getConfig() - return new FFmpegVOD({ - niceness: config.ffmpeg.nice, - threads: config.ffmpeg.threads, - tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), - profile: 'default', - availableEncoders: { - available: getAvailableEncoders(), - encodersToTry: getEncodersToTry() - }, - logger: getTranscodingLogger(), + ...getCommonFFmpegOptions(), + updateJobProgress }) } export function buildFFmpegLive () { + return new FFmpegLive(getCommonFFmpegOptions()) +} + +export function buildFFmpegEdition () { + return new FFmpegEdition(getCommonFFmpegOptions()) +} + +function getCommonFFmpegOptions () { const config = ConfigManager.Instance.getConfig() - return new FFmpegLive({ + return { niceness: config.ffmpeg.nice, threads: config.ffmpeg.threads, tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), @@ -87,5 +96,5 @@ export function buildFFmpegLive () { encodersToTry: getEncodersToTry() }, logger: getTranscodingLogger() - }) + } } diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts new file mode 100644 index 000000000..f8262096e --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-studio.ts @@ -0,0 +1,138 @@ +import { remove } from 'fs-extra' +import { pick } from 'lodash' +import { logger } from 'packages/peertube-runner/shared' +import { extname, join } from 'path' +import { buildUUID } from '@shared/extra-utils' +import { + RunnerJobVideoEditionTranscodingPayload, + VideoEditionTranscodingSuccess, + VideoStudioTask, + VideoStudioTaskCutPayload, + VideoStudioTaskIntroPayload, + VideoStudioTaskOutroPayload, + VideoStudioTaskPayload, + VideoStudioTaskWatermarkPayload +} from '@shared/models' +import { ConfigManager } from '../../../shared/config-manager' +import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' + +export async function processStudioTranscoding (options: ProcessOptions) { + const { server, job, runnerToken } = options + const payload = job.payload + + let outputPath: string + const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) + let tmpInputFilePath = inputPath + + try { + for (const task of payload.tasks) { + const outputFilename = 'output-edition-' + buildUUID() + '.mp4' + outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) + + await processTask({ + inputPath: tmpInputFilePath, + outputPath, + task, + job, + runnerToken + }) + + if (tmpInputFilePath) await remove(tmpInputFilePath) + + // For the next iteration + tmpInputFilePath = outputPath + } + + const successBody: VideoEditionTranscodingSuccess = { + videoFile: outputPath + } + + await server.runnerJobs.success({ + jobToken: job.jobToken, + jobUUID: job.uuid, + runnerToken, + payload: successBody + }) + } finally { + await remove(tmpInputFilePath) + await remove(outputPath) + } +} + +// --------------------------------------------------------------------------- +// Private +// --------------------------------------------------------------------------- + +type TaskProcessorOptions = { + inputPath: string + outputPath: string + task: T + runnerToken: string + job: JobWithToken +} + +const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise } = { + 'add-intro': processAddIntroOutro, + 'add-outro': processAddIntroOutro, + 'cut': processCut, + 'add-watermark': processAddWatermark +} + +async function processTask (options: TaskProcessorOptions) { + const { task } = options + + const processor = taskProcessors[options.task.name] + if (!process) throw new Error('Unknown task ' + task.name) + + return processor(options) +} + +async function processAddIntroOutro (options: TaskProcessorOptions) { + const { inputPath, task, runnerToken, job } = options + + logger.debug('Adding intro/outro to ' + inputPath) + + const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) + + return buildFFmpegEdition().addIntroOutro({ + ...pick(options, [ 'inputPath', 'outputPath' ]), + + introOutroPath, + type: task.name === 'add-intro' + ? 'intro' + : 'outro' + }) +} + +function processCut (options: TaskProcessorOptions) { + const { inputPath, task } = options + + logger.debug(`Cutting ${inputPath}`) + + return buildFFmpegEdition().cutVideo({ + ...pick(options, [ 'inputPath', 'outputPath' ]), + + start: task.options.start, + end: task.options.end + }) +} + +async function processAddWatermark (options: TaskProcessorOptions) { + const { inputPath, task, runnerToken, job } = options + + logger.debug('Adding watermark to ' + inputPath) + + const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) + + return buildFFmpegEdition().addWatermark({ + ...pick(options, [ 'inputPath', 'outputPath' ]), + + watermarkPath, + + videoFilters: { + watermarkSizeRatio: task.options.watermarkSizeRatio, + horitonzalMarginRatio: task.options.horitonzalMarginRatio, + verticalMarginRatio: task.options.verticalMarginRatio + } + }) +} diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts index aae61e9c5..d84ece3cb 100644 --- a/packages/peertube-runner/server/process/shared/process-vod.ts +++ b/packages/peertube-runner/server/process/shared/process-vod.ts @@ -62,33 +62,36 @@ export async function processHLSTranscoding (options: ProcessOptions {}, - - resolution: payload.output.resolution, - fps: payload.output.fps - }) - - const successBody: VODHLSTranscodingSuccess = { - resolutionPlaylistFile: outputPath, - videoFile: videoPath + try { + await ffmpegVod.transcode({ + type: 'hls', + copyCodecs: false, + inputPath, + hlsPlaylist: { videoFilename }, + outputPath, + + inputFileMutexReleaser: () => {}, + + resolution: payload.output.resolution, + fps: payload.output.fps + }) + + const successBody: VODHLSTranscodingSuccess = { + resolutionPlaylistFile: outputPath, + videoFile: videoPath + } + + await server.runnerJobs.success({ + jobToken: job.jobToken, + jobUUID: job.uuid, + runnerToken, + payload: successBody + }) + } finally { + await remove(inputPath) + await remove(outputPath) + await remove(videoPath) } - - await server.runnerJobs.success({ - jobToken: job.jobToken, - jobUUID: job.uuid, - runnerToken, - payload: successBody - }) - - await remove(outputPath) - await remove(videoPath) } export async function processAudioMergeTranscoding (options: ProcessOptions) { diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index e851dfc7c..8eff4bd2f 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts @@ -8,6 +8,7 @@ import { ConfigManager } from '../shared' import { IPCServer } from '../shared/ipc' import { logger } from '../shared/logger' import { JobWithToken, processJob } from './process' +import { isJobSupported } from './shared' type PeerTubeServer = PeerTubeServerCommand & { runnerToken: string @@ -199,12 +200,14 @@ export class RunnerServer { const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) - if (availableJobs.length === 0) { + const filtered = availableJobs.filter(j => isJobSupported(j)) + + if (filtered.length === 0) { logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) return undefined } - return availableJobs[0] + return filtered[0] } private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { diff --git a/packages/peertube-runner/server/shared/index.ts b/packages/peertube-runner/server/shared/index.ts new file mode 100644 index 000000000..5c86bafc0 --- /dev/null +++ b/packages/peertube-runner/server/shared/index.ts @@ -0,0 +1 @@ +export * from './supported-job' diff --git a/packages/peertube-runner/server/shared/supported-job.ts b/packages/peertube-runner/server/shared/supported-job.ts new file mode 100644 index 000000000..87d5a39cc --- /dev/null +++ b/packages/peertube-runner/server/shared/supported-job.ts @@ -0,0 +1,43 @@ +import { + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobPayload, + RunnerJobType, + RunnerJobVideoEditionTranscodingPayload, + RunnerJobVODAudioMergeTranscodingPayload, + RunnerJobVODHLSTranscodingPayload, + RunnerJobVODWebVideoTranscodingPayload, + VideoStudioTaskPayload +} from '@shared/models' + +const supportedMatrix = { + 'vod-web-video-transcoding': (_payload: RunnerJobVODWebVideoTranscodingPayload) => { + return true + }, + 'vod-hls-transcoding': (_payload: RunnerJobVODHLSTranscodingPayload) => { + return true + }, + 'vod-audio-merge-transcoding': (_payload: RunnerJobVODAudioMergeTranscodingPayload) => { + return true + }, + 'live-rtmp-hls-transcoding': (_payload: RunnerJobLiveRTMPHLSTranscodingPayload) => { + return true + }, + 'video-edition-transcoding': (payload: RunnerJobVideoEditionTranscodingPayload) => { + const tasks = payload?.tasks + const supported = new Set([ 'add-intro', 'add-outro', 'add-watermark', 'cut' ]) + + if (!Array.isArray(tasks)) return false + + return tasks.every(t => t && supported.has(t.name)) + } +} + +export function isJobSupported (job: { + type: RunnerJobType + payload: RunnerJobPayload +}) { + const fn = supportedMatrix[job.type] + if (!fn) return false + + return fn(job.payload as any) +} -- cgit v1.2.3