diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-04 15:29:34 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 5e47f6ab984a7d00782e4c7030afffa1ba480add (patch) | |
tree | 1ce586b591a8d71acbc301eba29b9a5e6490439e /packages/peertube-runner | |
parent | 6a4905602636afd6650c9e6f4d0fcc2105d91100 (diff) | |
download | PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.gz PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.zst PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.zip |
Support studio transcoding in peertube runner
Diffstat (limited to 'packages/peertube-runner')
7 files changed, 244 insertions, 43 deletions
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts index 39a929c59..ef231cb38 100644 --- a/packages/peertube-runner/server/process/process.ts +++ b/packages/peertube-runner/server/process/process.ts | |||
@@ -1,12 +1,14 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | 1 | import { logger } from 'packages/peertube-runner/shared/logger' |
2 | import { | 2 | import { |
3 | RunnerJobLiveRTMPHLSTranscodingPayload, | 3 | RunnerJobLiveRTMPHLSTranscodingPayload, |
4 | RunnerJobVideoEditionTranscodingPayload, | ||
4 | RunnerJobVODAudioMergeTranscodingPayload, | 5 | RunnerJobVODAudioMergeTranscodingPayload, |
5 | RunnerJobVODHLSTranscodingPayload, | 6 | RunnerJobVODHLSTranscodingPayload, |
6 | RunnerJobVODWebVideoTranscodingPayload | 7 | RunnerJobVODWebVideoTranscodingPayload |
7 | } from '@shared/models' | 8 | } from '@shared/models' |
8 | import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' | 9 | import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' |
9 | import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' | 10 | import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' |
11 | import { processStudioTranscoding } from './shared/process-studio' | ||
10 | 12 | ||
11 | export async function processJob (options: ProcessOptions) { | 13 | export async function processJob (options: ProcessOptions) { |
12 | const { server, job } = options | 14 | const { server, job } = options |
@@ -21,6 +23,8 @@ export async function processJob (options: ProcessOptions) { | |||
21 | await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) | 23 | await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) |
22 | } else if (job.type === 'live-rtmp-hls-transcoding') { | 24 | } else if (job.type === 'live-rtmp-hls-transcoding') { |
23 | await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() | 25 | await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() |
26 | } else if (job.type === 'video-edition-transcoding') { | ||
27 | await processStudioTranscoding(options as ProcessOptions<RunnerJobVideoEditionTranscodingPayload>) | ||
24 | } else { | 28 | } else { |
25 | logger.error(`Unknown job ${job.type} to process`) | 29 | logger.error(`Unknown job ${job.type} to process`) |
26 | return | 30 | return |
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index 9b2c40728..3cac98388 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts | |||
@@ -2,11 +2,12 @@ import { throttle } from 'lodash' | |||
2 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' | 2 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { buildUUID } from '@shared/extra-utils' | 4 | import { buildUUID } from '@shared/extra-utils' |
5 | import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' | 5 | import { FFmpegEdition, FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' |
6 | import { RunnerJob, RunnerJobPayload } from '@shared/models' | 6 | import { RunnerJob, RunnerJobPayload } from '@shared/models' |
7 | import { PeerTubeServer } from '@shared/server-commands' | 7 | import { PeerTubeServer } from '@shared/server-commands' |
8 | import { getTranscodingLogger } from './transcoding-logger' | 8 | import { getTranscodingLogger } from './transcoding-logger' |
9 | import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' | 9 | import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' |
10 | import { remove } from 'fs-extra' | ||
10 | 11 | ||
11 | export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } | 12 | export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } |
12 | 13 | ||
@@ -24,7 +25,14 @@ export async function downloadInputFile (options: { | |||
24 | const { url, job, runnerToken } = options | 25 | const { url, job, runnerToken } = options |
25 | const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | 26 | const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) |
26 | 27 | ||
27 | await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) | 28 | try { |
29 | await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) | ||
30 | } catch (err) { | ||
31 | remove(destination) | ||
32 | .catch(err => logger.error({ err }, `Cannot remove ${destination}`)) | ||
33 | |||
34 | throw err | ||
35 | } | ||
28 | 36 | ||
29 | return destination | 37 | return destination |
30 | } | 38 | } |
@@ -40,6 +48,8 @@ export async function updateTranscodingProgress (options: { | |||
40 | return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) | 48 | return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) |
41 | } | 49 | } |
42 | 50 | ||
51 | // --------------------------------------------------------------------------- | ||
52 | |||
43 | export function buildFFmpegVOD (options: { | 53 | export function buildFFmpegVOD (options: { |
44 | server: PeerTubeServer | 54 | server: PeerTubeServer |
45 | runnerToken: string | 55 | runnerToken: string |
@@ -58,26 +68,25 @@ export function buildFFmpegVOD (options: { | |||
58 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | 68 | .catch(err => logger.error({ err }, 'Cannot send job progress')) |
59 | }, updateInterval, { trailing: false }) | 69 | }, updateInterval, { trailing: false }) |
60 | 70 | ||
61 | const config = ConfigManager.Instance.getConfig() | ||
62 | |||
63 | return new FFmpegVOD({ | 71 | return new FFmpegVOD({ |
64 | niceness: config.ffmpeg.nice, | 72 | ...getCommonFFmpegOptions(), |
65 | threads: config.ffmpeg.threads, | 73 | |
66 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
67 | profile: 'default', | ||
68 | availableEncoders: { | ||
69 | available: getAvailableEncoders(), | ||
70 | encodersToTry: getEncodersToTry() | ||
71 | }, | ||
72 | logger: getTranscodingLogger(), | ||
73 | updateJobProgress | 74 | updateJobProgress |
74 | }) | 75 | }) |
75 | } | 76 | } |
76 | 77 | ||
77 | export function buildFFmpegLive () { | 78 | export function buildFFmpegLive () { |
79 | return new FFmpegLive(getCommonFFmpegOptions()) | ||
80 | } | ||
81 | |||
82 | export function buildFFmpegEdition () { | ||
83 | return new FFmpegEdition(getCommonFFmpegOptions()) | ||
84 | } | ||
85 | |||
86 | function getCommonFFmpegOptions () { | ||
78 | const config = ConfigManager.Instance.getConfig() | 87 | const config = ConfigManager.Instance.getConfig() |
79 | 88 | ||
80 | return new FFmpegLive({ | 89 | return { |
81 | niceness: config.ffmpeg.nice, | 90 | niceness: config.ffmpeg.nice, |
82 | threads: config.ffmpeg.threads, | 91 | threads: config.ffmpeg.threads, |
83 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | 92 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), |
@@ -87,5 +96,5 @@ export function buildFFmpegLive () { | |||
87 | encodersToTry: getEncodersToTry() | 96 | encodersToTry: getEncodersToTry() |
88 | }, | 97 | }, |
89 | logger: getTranscodingLogger() | 98 | logger: getTranscodingLogger() |
90 | }) | 99 | } |
91 | } | 100 | } |
diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts new file mode 100644 index 000000000..f8262096e --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-studio.ts | |||
@@ -0,0 +1,138 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { pick } from 'lodash' | ||
3 | import { logger } from 'packages/peertube-runner/shared' | ||
4 | import { extname, join } from 'path' | ||
5 | import { buildUUID } from '@shared/extra-utils' | ||
6 | import { | ||
7 | RunnerJobVideoEditionTranscodingPayload, | ||
8 | VideoEditionTranscodingSuccess, | ||
9 | VideoStudioTask, | ||
10 | VideoStudioTaskCutPayload, | ||
11 | VideoStudioTaskIntroPayload, | ||
12 | VideoStudioTaskOutroPayload, | ||
13 | VideoStudioTaskPayload, | ||
14 | VideoStudioTaskWatermarkPayload | ||
15 | } from '@shared/models' | ||
16 | import { ConfigManager } from '../../../shared/config-manager' | ||
17 | import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' | ||
18 | |||
19 | export async function processStudioTranscoding (options: ProcessOptions<RunnerJobVideoEditionTranscodingPayload>) { | ||
20 | const { server, job, runnerToken } = options | ||
21 | const payload = job.payload | ||
22 | |||
23 | let outputPath: string | ||
24 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
25 | let tmpInputFilePath = inputPath | ||
26 | |||
27 | try { | ||
28 | for (const task of payload.tasks) { | ||
29 | const outputFilename = 'output-edition-' + buildUUID() + '.mp4' | ||
30 | outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) | ||
31 | |||
32 | await processTask({ | ||
33 | inputPath: tmpInputFilePath, | ||
34 | outputPath, | ||
35 | task, | ||
36 | job, | ||
37 | runnerToken | ||
38 | }) | ||
39 | |||
40 | if (tmpInputFilePath) await remove(tmpInputFilePath) | ||
41 | |||
42 | // For the next iteration | ||
43 | tmpInputFilePath = outputPath | ||
44 | } | ||
45 | |||
46 | const successBody: VideoEditionTranscodingSuccess = { | ||
47 | videoFile: outputPath | ||
48 | } | ||
49 | |||
50 | await server.runnerJobs.success({ | ||
51 | jobToken: job.jobToken, | ||
52 | jobUUID: job.uuid, | ||
53 | runnerToken, | ||
54 | payload: successBody | ||
55 | }) | ||
56 | } finally { | ||
57 | await remove(tmpInputFilePath) | ||
58 | await remove(outputPath) | ||
59 | } | ||
60 | } | ||
61 | |||
62 | // --------------------------------------------------------------------------- | ||
63 | // Private | ||
64 | // --------------------------------------------------------------------------- | ||
65 | |||
66 | type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = { | ||
67 | inputPath: string | ||
68 | outputPath: string | ||
69 | task: T | ||
70 | runnerToken: string | ||
71 | job: JobWithToken | ||
72 | } | ||
73 | |||
74 | const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { | ||
75 | 'add-intro': processAddIntroOutro, | ||
76 | 'add-outro': processAddIntroOutro, | ||
77 | 'cut': processCut, | ||
78 | 'add-watermark': processAddWatermark | ||
79 | } | ||
80 | |||
81 | async function processTask (options: TaskProcessorOptions) { | ||
82 | const { task } = options | ||
83 | |||
84 | const processor = taskProcessors[options.task.name] | ||
85 | if (!process) throw new Error('Unknown task ' + task.name) | ||
86 | |||
87 | return processor(options) | ||
88 | } | ||
89 | |||
90 | async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { | ||
91 | const { inputPath, task, runnerToken, job } = options | ||
92 | |||
93 | logger.debug('Adding intro/outro to ' + inputPath) | ||
94 | |||
95 | const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) | ||
96 | |||
97 | return buildFFmpegEdition().addIntroOutro({ | ||
98 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
99 | |||
100 | introOutroPath, | ||
101 | type: task.name === 'add-intro' | ||
102 | ? 'intro' | ||
103 | : 'outro' | ||
104 | }) | ||
105 | } | ||
106 | |||
107 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { | ||
108 | const { inputPath, task } = options | ||
109 | |||
110 | logger.debug(`Cutting ${inputPath}`) | ||
111 | |||
112 | return buildFFmpegEdition().cutVideo({ | ||
113 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
114 | |||
115 | start: task.options.start, | ||
116 | end: task.options.end | ||
117 | }) | ||
118 | } | ||
119 | |||
120 | async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { | ||
121 | const { inputPath, task, runnerToken, job } = options | ||
122 | |||
123 | logger.debug('Adding watermark to ' + inputPath) | ||
124 | |||
125 | const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) | ||
126 | |||
127 | return buildFFmpegEdition().addWatermark({ | ||
128 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
129 | |||
130 | watermarkPath, | ||
131 | |||
132 | videoFilters: { | ||
133 | watermarkSizeRatio: task.options.watermarkSizeRatio, | ||
134 | horitonzalMarginRatio: task.options.horitonzalMarginRatio, | ||
135 | verticalMarginRatio: task.options.verticalMarginRatio | ||
136 | } | ||
137 | }) | ||
138 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts index aae61e9c5..d84ece3cb 100644 --- a/packages/peertube-runner/server/process/shared/process-vod.ts +++ b/packages/peertube-runner/server/process/shared/process-vod.ts | |||
@@ -62,33 +62,36 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO | |||
62 | 62 | ||
63 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | 63 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) |
64 | 64 | ||
65 | await ffmpegVod.transcode({ | 65 | try { |
66 | type: 'hls', | 66 | await ffmpegVod.transcode({ |
67 | copyCodecs: false, | 67 | type: 'hls', |
68 | inputPath, | 68 | copyCodecs: false, |
69 | hlsPlaylist: { videoFilename }, | 69 | inputPath, |
70 | outputPath, | 70 | hlsPlaylist: { videoFilename }, |
71 | 71 | outputPath, | |
72 | inputFileMutexReleaser: () => {}, | 72 | |
73 | 73 | inputFileMutexReleaser: () => {}, | |
74 | resolution: payload.output.resolution, | 74 | |
75 | fps: payload.output.fps | 75 | resolution: payload.output.resolution, |
76 | }) | 76 | fps: payload.output.fps |
77 | 77 | }) | |
78 | const successBody: VODHLSTranscodingSuccess = { | 78 | |
79 | resolutionPlaylistFile: outputPath, | 79 | const successBody: VODHLSTranscodingSuccess = { |
80 | videoFile: videoPath | 80 | resolutionPlaylistFile: outputPath, |
81 | videoFile: videoPath | ||
82 | } | ||
83 | |||
84 | await server.runnerJobs.success({ | ||
85 | jobToken: job.jobToken, | ||
86 | jobUUID: job.uuid, | ||
87 | runnerToken, | ||
88 | payload: successBody | ||
89 | }) | ||
90 | } finally { | ||
91 | await remove(inputPath) | ||
92 | await remove(outputPath) | ||
93 | await remove(videoPath) | ||
81 | } | 94 | } |
82 | |||
83 | await server.runnerJobs.success({ | ||
84 | jobToken: job.jobToken, | ||
85 | jobUUID: job.uuid, | ||
86 | runnerToken, | ||
87 | payload: successBody | ||
88 | }) | ||
89 | |||
90 | await remove(outputPath) | ||
91 | await remove(videoPath) | ||
92 | } | 95 | } |
93 | 96 | ||
94 | export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { | 97 | export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { |
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index e851dfc7c..8eff4bd2f 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts | |||
@@ -8,6 +8,7 @@ import { ConfigManager } from '../shared' | |||
8 | import { IPCServer } from '../shared/ipc' | 8 | import { IPCServer } from '../shared/ipc' |
9 | import { logger } from '../shared/logger' | 9 | import { logger } from '../shared/logger' |
10 | import { JobWithToken, processJob } from './process' | 10 | import { JobWithToken, processJob } from './process' |
11 | import { isJobSupported } from './shared' | ||
11 | 12 | ||
12 | type PeerTubeServer = PeerTubeServerCommand & { | 13 | type PeerTubeServer = PeerTubeServerCommand & { |
13 | runnerToken: string | 14 | runnerToken: string |
@@ -199,12 +200,14 @@ export class RunnerServer { | |||
199 | 200 | ||
200 | const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) | 201 | const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) |
201 | 202 | ||
202 | if (availableJobs.length === 0) { | 203 | const filtered = availableJobs.filter(j => isJobSupported(j)) |
204 | |||
205 | if (filtered.length === 0) { | ||
203 | logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) | 206 | logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) |
204 | return undefined | 207 | return undefined |
205 | } | 208 | } |
206 | 209 | ||
207 | return availableJobs[0] | 210 | return filtered[0] |
208 | } | 211 | } |
209 | 212 | ||
210 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | 213 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { |
diff --git a/packages/peertube-runner/server/shared/index.ts b/packages/peertube-runner/server/shared/index.ts new file mode 100644 index 000000000..5c86bafc0 --- /dev/null +++ b/packages/peertube-runner/server/shared/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './supported-job' | |||
diff --git a/packages/peertube-runner/server/shared/supported-job.ts b/packages/peertube-runner/server/shared/supported-job.ts new file mode 100644 index 000000000..87d5a39cc --- /dev/null +++ b/packages/peertube-runner/server/shared/supported-job.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import { | ||
2 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
3 | RunnerJobPayload, | ||
4 | RunnerJobType, | ||
5 | RunnerJobVideoEditionTranscodingPayload, | ||
6 | RunnerJobVODAudioMergeTranscodingPayload, | ||
7 | RunnerJobVODHLSTranscodingPayload, | ||
8 | RunnerJobVODWebVideoTranscodingPayload, | ||
9 | VideoStudioTaskPayload | ||
10 | } from '@shared/models' | ||
11 | |||
12 | const supportedMatrix = { | ||
13 | 'vod-web-video-transcoding': (_payload: RunnerJobVODWebVideoTranscodingPayload) => { | ||
14 | return true | ||
15 | }, | ||
16 | 'vod-hls-transcoding': (_payload: RunnerJobVODHLSTranscodingPayload) => { | ||
17 | return true | ||
18 | }, | ||
19 | 'vod-audio-merge-transcoding': (_payload: RunnerJobVODAudioMergeTranscodingPayload) => { | ||
20 | return true | ||
21 | }, | ||
22 | 'live-rtmp-hls-transcoding': (_payload: RunnerJobLiveRTMPHLSTranscodingPayload) => { | ||
23 | return true | ||
24 | }, | ||
25 | 'video-edition-transcoding': (payload: RunnerJobVideoEditionTranscodingPayload) => { | ||
26 | const tasks = payload?.tasks | ||
27 | const supported = new Set<VideoStudioTaskPayload['name']>([ 'add-intro', 'add-outro', 'add-watermark', 'cut' ]) | ||
28 | |||
29 | if (!Array.isArray(tasks)) return false | ||
30 | |||
31 | return tasks.every(t => t && supported.has(t.name)) | ||
32 | } | ||
33 | } | ||
34 | |||
35 | export function isJobSupported (job: { | ||
36 | type: RunnerJobType | ||
37 | payload: RunnerJobPayload | ||
38 | }) { | ||
39 | const fn = supportedMatrix[job.type] | ||
40 | if (!fn) return false | ||
41 | |||
42 | return fn(job.payload as any) | ||
43 | } | ||