From 5e47f6ab984a7d00782e4c7030afffa1ba480add Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 4 May 2023 15:29:34 +0200 Subject: Support studio transcoding in peertube runner --- .../server/process/shared/common.ts | 39 +++--- .../server/process/shared/process-studio.ts | 138 +++++++++++++++++++++ .../server/process/shared/process-vod.ts | 55 ++++---- 3 files changed, 191 insertions(+), 41 deletions(-) create mode 100644 packages/peertube-runner/server/process/shared/process-studio.ts (limited to 'packages/peertube-runner/server/process/shared') diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index 9b2c40728..3cac98388 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts @@ -2,11 +2,12 @@ import { throttle } from 'lodash' import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' import { join } from 'path' import { buildUUID } from '@shared/extra-utils' -import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' +import { FFmpegEdition, FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' import { RunnerJob, RunnerJobPayload } from '@shared/models' import { PeerTubeServer } from '@shared/server-commands' import { getTranscodingLogger } from './transcoding-logger' import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' +import { remove } from 'fs-extra' export type JobWithToken = RunnerJob & { jobToken: string } @@ -24,7 +25,14 @@ export async function downloadInputFile (options: { const { url, job, runnerToken } = options const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) - await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) + try { + await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) + } catch (err) { + remove(destination) + .catch(err => logger.error({ err }, `Cannot remove ${destination}`)) + + throw err + } return destination } @@ -40,6 +48,8 @@ export async function updateTranscodingProgress (options: { return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) } +// --------------------------------------------------------------------------- + export function buildFFmpegVOD (options: { server: PeerTubeServer runnerToken: string @@ -58,26 +68,25 @@ export function buildFFmpegVOD (options: { .catch(err => logger.error({ err }, 'Cannot send job progress')) }, updateInterval, { trailing: false }) - const config = ConfigManager.Instance.getConfig() - return new FFmpegVOD({ - niceness: config.ffmpeg.nice, - threads: config.ffmpeg.threads, - tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), - profile: 'default', - availableEncoders: { - available: getAvailableEncoders(), - encodersToTry: getEncodersToTry() - }, - logger: getTranscodingLogger(), + ...getCommonFFmpegOptions(), + updateJobProgress }) } export function buildFFmpegLive () { + return new FFmpegLive(getCommonFFmpegOptions()) +} + +export function buildFFmpegEdition () { + return new FFmpegEdition(getCommonFFmpegOptions()) +} + +function getCommonFFmpegOptions () { const config = ConfigManager.Instance.getConfig() - return new FFmpegLive({ + return { niceness: config.ffmpeg.nice, threads: config.ffmpeg.threads, tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), @@ -87,5 +96,5 @@ export function buildFFmpegLive () { encodersToTry: getEncodersToTry() }, logger: getTranscodingLogger() - }) + } } diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts new file mode 100644 index 000000000..f8262096e --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-studio.ts @@ -0,0 +1,138 @@ +import { remove } from 'fs-extra' +import { pick } from 'lodash' +import { logger } from 'packages/peertube-runner/shared' +import { extname, join } from 'path' +import { buildUUID } from '@shared/extra-utils' +import { + RunnerJobVideoEditionTranscodingPayload, + VideoEditionTranscodingSuccess, + VideoStudioTask, + VideoStudioTaskCutPayload, + VideoStudioTaskIntroPayload, + VideoStudioTaskOutroPayload, + VideoStudioTaskPayload, + VideoStudioTaskWatermarkPayload +} from '@shared/models' +import { ConfigManager } from '../../../shared/config-manager' +import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' + +export async function processStudioTranscoding (options: ProcessOptions) { + const { server, job, runnerToken } = options + const payload = job.payload + + let outputPath: string + const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) + let tmpInputFilePath = inputPath + + try { + for (const task of payload.tasks) { + const outputFilename = 'output-edition-' + buildUUID() + '.mp4' + outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) + + await processTask({ + inputPath: tmpInputFilePath, + outputPath, + task, + job, + runnerToken + }) + + if (tmpInputFilePath) await remove(tmpInputFilePath) + + // For the next iteration + tmpInputFilePath = outputPath + } + + const successBody: VideoEditionTranscodingSuccess = { + videoFile: outputPath + } + + await server.runnerJobs.success({ + jobToken: job.jobToken, + jobUUID: job.uuid, + runnerToken, + payload: successBody + }) + } finally { + await remove(tmpInputFilePath) + await remove(outputPath) + } +} + +// --------------------------------------------------------------------------- +// Private +// --------------------------------------------------------------------------- + +type TaskProcessorOptions = { + inputPath: string + outputPath: string + task: T + runnerToken: string + job: JobWithToken +} + +const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise } = { + 'add-intro': processAddIntroOutro, + 'add-outro': processAddIntroOutro, + 'cut': processCut, + 'add-watermark': processAddWatermark +} + +async function processTask (options: TaskProcessorOptions) { + const { task } = options + + const processor = taskProcessors[options.task.name] + if (!process) throw new Error('Unknown task ' + task.name) + + return processor(options) +} + +async function processAddIntroOutro (options: TaskProcessorOptions) { + const { inputPath, task, runnerToken, job } = options + + logger.debug('Adding intro/outro to ' + inputPath) + + const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) + + return buildFFmpegEdition().addIntroOutro({ + ...pick(options, [ 'inputPath', 'outputPath' ]), + + introOutroPath, + type: task.name === 'add-intro' + ? 'intro' + : 'outro' + }) +} + +function processCut (options: TaskProcessorOptions) { + const { inputPath, task } = options + + logger.debug(`Cutting ${inputPath}`) + + return buildFFmpegEdition().cutVideo({ + ...pick(options, [ 'inputPath', 'outputPath' ]), + + start: task.options.start, + end: task.options.end + }) +} + +async function processAddWatermark (options: TaskProcessorOptions) { + const { inputPath, task, runnerToken, job } = options + + logger.debug('Adding watermark to ' + inputPath) + + const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) + + return buildFFmpegEdition().addWatermark({ + ...pick(options, [ 'inputPath', 'outputPath' ]), + + watermarkPath, + + videoFilters: { + watermarkSizeRatio: task.options.watermarkSizeRatio, + horitonzalMarginRatio: task.options.horitonzalMarginRatio, + verticalMarginRatio: task.options.verticalMarginRatio + } + }) +} diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts index aae61e9c5..d84ece3cb 100644 --- a/packages/peertube-runner/server/process/shared/process-vod.ts +++ b/packages/peertube-runner/server/process/shared/process-vod.ts @@ -62,33 +62,36 @@ export async function processHLSTranscoding (options: ProcessOptions {}, - - resolution: payload.output.resolution, - fps: payload.output.fps - }) - - const successBody: VODHLSTranscodingSuccess = { - resolutionPlaylistFile: outputPath, - videoFile: videoPath + try { + await ffmpegVod.transcode({ + type: 'hls', + copyCodecs: false, + inputPath, + hlsPlaylist: { videoFilename }, + outputPath, + + inputFileMutexReleaser: () => {}, + + resolution: payload.output.resolution, + fps: payload.output.fps + }) + + const successBody: VODHLSTranscodingSuccess = { + resolutionPlaylistFile: outputPath, + videoFile: videoPath + } + + await server.runnerJobs.success({ + jobToken: job.jobToken, + jobUUID: job.uuid, + runnerToken, + payload: successBody + }) + } finally { + await remove(inputPath) + await remove(outputPath) + await remove(videoPath) } - - await server.runnerJobs.success({ - jobToken: job.jobToken, - jobUUID: job.uuid, - runnerToken, - payload: successBody - }) - - await remove(outputPath) - await remove(videoPath) } export async function processAudioMergeTranscoding (options: ProcessOptions) { -- cgit v1.2.3