aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-04 15:29:34 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit5e47f6ab984a7d00782e4c7030afffa1ba480add (patch)
tree1ce586b591a8d71acbc301eba29b9a5e6490439e /packages/peertube-runner
parent6a4905602636afd6650c9e6f4d0fcc2105d91100 (diff)
downloadPeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.gz
PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.zst
PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.zip
Support studio transcoding in peertube runner
Diffstat (limited to 'packages/peertube-runner')
-rw-r--r--packages/peertube-runner/server/process/process.ts4
-rw-r--r--packages/peertube-runner/server/process/shared/common.ts39
-rw-r--r--packages/peertube-runner/server/process/shared/process-studio.ts138
-rw-r--r--packages/peertube-runner/server/process/shared/process-vod.ts55
-rw-r--r--packages/peertube-runner/server/server.ts7
-rw-r--r--packages/peertube-runner/server/shared/index.ts1
-rw-r--r--packages/peertube-runner/server/shared/supported-job.ts43
7 files changed, 244 insertions, 43 deletions
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts
index 39a929c59..ef231cb38 100644
--- a/packages/peertube-runner/server/process/process.ts
+++ b/packages/peertube-runner/server/process/process.ts
@@ -1,12 +1,14 @@
1import { logger } from 'packages/peertube-runner/shared/logger' 1import { logger } from 'packages/peertube-runner/shared/logger'
2import { 2import {
3 RunnerJobLiveRTMPHLSTranscodingPayload, 3 RunnerJobLiveRTMPHLSTranscodingPayload,
4 RunnerJobVideoEditionTranscodingPayload,
4 RunnerJobVODAudioMergeTranscodingPayload, 5 RunnerJobVODAudioMergeTranscodingPayload,
5 RunnerJobVODHLSTranscodingPayload, 6 RunnerJobVODHLSTranscodingPayload,
6 RunnerJobVODWebVideoTranscodingPayload 7 RunnerJobVODWebVideoTranscodingPayload
7} from '@shared/models' 8} from '@shared/models'
8import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' 9import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared'
9import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' 10import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live'
11import { processStudioTranscoding } from './shared/process-studio'
10 12
11export async function processJob (options: ProcessOptions) { 13export async function processJob (options: ProcessOptions) {
12 const { server, job } = options 14 const { server, job } = options
@@ -21,6 +23,8 @@ export async function processJob (options: ProcessOptions) {
21 await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) 23 await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>)
22 } else if (job.type === 'live-rtmp-hls-transcoding') { 24 } else if (job.type === 'live-rtmp-hls-transcoding') {
23 await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() 25 await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process()
26 } else if (job.type === 'video-edition-transcoding') {
27 await processStudioTranscoding(options as ProcessOptions<RunnerJobVideoEditionTranscodingPayload>)
24 } else { 28 } else {
25 logger.error(`Unknown job ${job.type} to process`) 29 logger.error(`Unknown job ${job.type} to process`)
26 return 30 return
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts
index 9b2c40728..3cac98388 100644
--- a/packages/peertube-runner/server/process/shared/common.ts
+++ b/packages/peertube-runner/server/process/shared/common.ts
@@ -2,11 +2,12 @@ import { throttle } from 'lodash'
2import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' 2import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared'
3import { join } from 'path' 3import { join } from 'path'
4import { buildUUID } from '@shared/extra-utils' 4import { buildUUID } from '@shared/extra-utils'
5import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' 5import { FFmpegEdition, FFmpegLive, FFmpegVOD } from '@shared/ffmpeg'
6import { RunnerJob, RunnerJobPayload } from '@shared/models' 6import { RunnerJob, RunnerJobPayload } from '@shared/models'
7import { PeerTubeServer } from '@shared/server-commands' 7import { PeerTubeServer } from '@shared/server-commands'
8import { getTranscodingLogger } from './transcoding-logger' 8import { getTranscodingLogger } from './transcoding-logger'
9import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' 9import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles'
10import { remove } from 'fs-extra'
10 11
11export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } 12export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string }
12 13
@@ -24,7 +25,14 @@ export async function downloadInputFile (options: {
24 const { url, job, runnerToken } = options 25 const { url, job, runnerToken } = options
25 const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) 26 const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
26 27
27 await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) 28 try {
29 await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination })
30 } catch (err) {
31 remove(destination)
32 .catch(err => logger.error({ err }, `Cannot remove ${destination}`))
33
34 throw err
35 }
28 36
29 return destination 37 return destination
30} 38}
@@ -40,6 +48,8 @@ export async function updateTranscodingProgress (options: {
40 return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) 48 return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress })
41} 49}
42 50
51// ---------------------------------------------------------------------------
52
43export function buildFFmpegVOD (options: { 53export function buildFFmpegVOD (options: {
44 server: PeerTubeServer 54 server: PeerTubeServer
45 runnerToken: string 55 runnerToken: string
@@ -58,26 +68,25 @@ export function buildFFmpegVOD (options: {
58 .catch(err => logger.error({ err }, 'Cannot send job progress')) 68 .catch(err => logger.error({ err }, 'Cannot send job progress'))
59 }, updateInterval, { trailing: false }) 69 }, updateInterval, { trailing: false })
60 70
61 const config = ConfigManager.Instance.getConfig()
62
63 return new FFmpegVOD({ 71 return new FFmpegVOD({
64 niceness: config.ffmpeg.nice, 72 ...getCommonFFmpegOptions(),
65 threads: config.ffmpeg.threads, 73
66 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
67 profile: 'default',
68 availableEncoders: {
69 available: getAvailableEncoders(),
70 encodersToTry: getEncodersToTry()
71 },
72 logger: getTranscodingLogger(),
73 updateJobProgress 74 updateJobProgress
74 }) 75 })
75} 76}
76 77
77export function buildFFmpegLive () { 78export function buildFFmpegLive () {
79 return new FFmpegLive(getCommonFFmpegOptions())
80}
81
82export function buildFFmpegEdition () {
83 return new FFmpegEdition(getCommonFFmpegOptions())
84}
85
86function getCommonFFmpegOptions () {
78 const config = ConfigManager.Instance.getConfig() 87 const config = ConfigManager.Instance.getConfig()
79 88
80 return new FFmpegLive({ 89 return {
81 niceness: config.ffmpeg.nice, 90 niceness: config.ffmpeg.nice,
82 threads: config.ffmpeg.threads, 91 threads: config.ffmpeg.threads,
83 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), 92 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
@@ -87,5 +96,5 @@ export function buildFFmpegLive () {
87 encodersToTry: getEncodersToTry() 96 encodersToTry: getEncodersToTry()
88 }, 97 },
89 logger: getTranscodingLogger() 98 logger: getTranscodingLogger()
90 }) 99 }
91} 100}
diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts
new file mode 100644
index 000000000..f8262096e
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/process-studio.ts
@@ -0,0 +1,138 @@
1import { remove } from 'fs-extra'
2import { pick } from 'lodash'
3import { logger } from 'packages/peertube-runner/shared'
4import { extname, join } from 'path'
5import { buildUUID } from '@shared/extra-utils'
6import {
7 RunnerJobVideoEditionTranscodingPayload,
8 VideoEditionTranscodingSuccess,
9 VideoStudioTask,
10 VideoStudioTaskCutPayload,
11 VideoStudioTaskIntroPayload,
12 VideoStudioTaskOutroPayload,
13 VideoStudioTaskPayload,
14 VideoStudioTaskWatermarkPayload
15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common'
18
19export async function processStudioTranscoding (options: ProcessOptions<RunnerJobVideoEditionTranscodingPayload>) {
20 const { server, job, runnerToken } = options
21 const payload = job.payload
22
23 let outputPath: string
24 const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
25 let tmpInputFilePath = inputPath
26
27 try {
28 for (const task of payload.tasks) {
29 const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
30 outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)
31
32 await processTask({
33 inputPath: tmpInputFilePath,
34 outputPath,
35 task,
36 job,
37 runnerToken
38 })
39
40 if (tmpInputFilePath) await remove(tmpInputFilePath)
41
42 // For the next iteration
43 tmpInputFilePath = outputPath
44 }
45
46 const successBody: VideoEditionTranscodingSuccess = {
47 videoFile: outputPath
48 }
49
50 await server.runnerJobs.success({
51 jobToken: job.jobToken,
52 jobUUID: job.uuid,
53 runnerToken,
54 payload: successBody
55 })
56 } finally {
57 await remove(tmpInputFilePath)
58 await remove(outputPath)
59 }
60}
61
62// ---------------------------------------------------------------------------
63// Private
64// ---------------------------------------------------------------------------
65
66type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = {
67 inputPath: string
68 outputPath: string
69 task: T
70 runnerToken: string
71 job: JobWithToken
72}
73
74const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = {
75 'add-intro': processAddIntroOutro,
76 'add-outro': processAddIntroOutro,
77 'cut': processCut,
78 'add-watermark': processAddWatermark
79}
80
81async function processTask (options: TaskProcessorOptions) {
82 const { task } = options
83
84 const processor = taskProcessors[options.task.name]
85 if (!process) throw new Error('Unknown task ' + task.name)
86
87 return processor(options)
88}
89
90async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
91 const { inputPath, task, runnerToken, job } = options
92
93 logger.debug('Adding intro/outro to ' + inputPath)
94
95 const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
96
97 return buildFFmpegEdition().addIntroOutro({
98 ...pick(options, [ 'inputPath', 'outputPath' ]),
99
100 introOutroPath,
101 type: task.name === 'add-intro'
102 ? 'intro'
103 : 'outro'
104 })
105}
106
107function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
108 const { inputPath, task } = options
109
110 logger.debug(`Cutting ${inputPath}`)
111
112 return buildFFmpegEdition().cutVideo({
113 ...pick(options, [ 'inputPath', 'outputPath' ]),
114
115 start: task.options.start,
116 end: task.options.end
117 })
118}
119
120async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
121 const { inputPath, task, runnerToken, job } = options
122
123 logger.debug('Adding watermark to ' + inputPath)
124
125 const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
126
127 return buildFFmpegEdition().addWatermark({
128 ...pick(options, [ 'inputPath', 'outputPath' ]),
129
130 watermarkPath,
131
132 videoFilters: {
133 watermarkSizeRatio: task.options.watermarkSizeRatio,
134 horitonzalMarginRatio: task.options.horitonzalMarginRatio,
135 verticalMarginRatio: task.options.verticalMarginRatio
136 }
137 })
138}
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts
index aae61e9c5..d84ece3cb 100644
--- a/packages/peertube-runner/server/process/shared/process-vod.ts
+++ b/packages/peertube-runner/server/process/shared/process-vod.ts
@@ -62,33 +62,36 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
62 62
63 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) 63 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken })
64 64
65 await ffmpegVod.transcode({ 65 try {
66 type: 'hls', 66 await ffmpegVod.transcode({
67 copyCodecs: false, 67 type: 'hls',
68 inputPath, 68 copyCodecs: false,
69 hlsPlaylist: { videoFilename }, 69 inputPath,
70 outputPath, 70 hlsPlaylist: { videoFilename },
71 71 outputPath,
72 inputFileMutexReleaser: () => {}, 72
73 73 inputFileMutexReleaser: () => {},
74 resolution: payload.output.resolution, 74
75 fps: payload.output.fps 75 resolution: payload.output.resolution,
76 }) 76 fps: payload.output.fps
77 77 })
78 const successBody: VODHLSTranscodingSuccess = { 78
79 resolutionPlaylistFile: outputPath, 79 const successBody: VODHLSTranscodingSuccess = {
80 videoFile: videoPath 80 resolutionPlaylistFile: outputPath,
81 videoFile: videoPath
82 }
83
84 await server.runnerJobs.success({
85 jobToken: job.jobToken,
86 jobUUID: job.uuid,
87 runnerToken,
88 payload: successBody
89 })
90 } finally {
91 await remove(inputPath)
92 await remove(outputPath)
93 await remove(videoPath)
81 } 94 }
82
83 await server.runnerJobs.success({
84 jobToken: job.jobToken,
85 jobUUID: job.uuid,
86 runnerToken,
87 payload: successBody
88 })
89
90 await remove(outputPath)
91 await remove(videoPath)
92} 95}
93 96
94export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { 97export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) {
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
index e851dfc7c..8eff4bd2f 100644
--- a/packages/peertube-runner/server/server.ts
+++ b/packages/peertube-runner/server/server.ts
@@ -8,6 +8,7 @@ import { ConfigManager } from '../shared'
8import { IPCServer } from '../shared/ipc' 8import { IPCServer } from '../shared/ipc'
9import { logger } from '../shared/logger' 9import { logger } from '../shared/logger'
10import { JobWithToken, processJob } from './process' 10import { JobWithToken, processJob } from './process'
11import { isJobSupported } from './shared'
11 12
12type PeerTubeServer = PeerTubeServerCommand & { 13type PeerTubeServer = PeerTubeServerCommand & {
13 runnerToken: string 14 runnerToken: string
@@ -199,12 +200,14 @@ export class RunnerServer {
199 200
200 const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) 201 const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
201 202
202 if (availableJobs.length === 0) { 203 const filtered = availableJobs.filter(j => isJobSupported(j))
204
205 if (filtered.length === 0) {
203 logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) 206 logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
204 return undefined 207 return undefined
205 } 208 }
206 209
207 return availableJobs[0] 210 return filtered[0]
208 } 211 }
209 212
210 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { 213 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
diff --git a/packages/peertube-runner/server/shared/index.ts b/packages/peertube-runner/server/shared/index.ts
new file mode 100644
index 000000000..5c86bafc0
--- /dev/null
+++ b/packages/peertube-runner/server/shared/index.ts
@@ -0,0 +1 @@
export * from './supported-job'
diff --git a/packages/peertube-runner/server/shared/supported-job.ts b/packages/peertube-runner/server/shared/supported-job.ts
new file mode 100644
index 000000000..87d5a39cc
--- /dev/null
+++ b/packages/peertube-runner/server/shared/supported-job.ts
@@ -0,0 +1,43 @@
1import {
2 RunnerJobLiveRTMPHLSTranscodingPayload,
3 RunnerJobPayload,
4 RunnerJobType,
5 RunnerJobVideoEditionTranscodingPayload,
6 RunnerJobVODAudioMergeTranscodingPayload,
7 RunnerJobVODHLSTranscodingPayload,
8 RunnerJobVODWebVideoTranscodingPayload,
9 VideoStudioTaskPayload
10} from '@shared/models'
11
12const supportedMatrix = {
13 'vod-web-video-transcoding': (_payload: RunnerJobVODWebVideoTranscodingPayload) => {
14 return true
15 },
16 'vod-hls-transcoding': (_payload: RunnerJobVODHLSTranscodingPayload) => {
17 return true
18 },
19 'vod-audio-merge-transcoding': (_payload: RunnerJobVODAudioMergeTranscodingPayload) => {
20 return true
21 },
22 'live-rtmp-hls-transcoding': (_payload: RunnerJobLiveRTMPHLSTranscodingPayload) => {
23 return true
24 },
25 'video-edition-transcoding': (payload: RunnerJobVideoEditionTranscodingPayload) => {
26 const tasks = payload?.tasks
27 const supported = new Set<VideoStudioTaskPayload['name']>([ 'add-intro', 'add-outro', 'add-watermark', 'cut' ])
28
29 if (!Array.isArray(tasks)) return false
30
31 return tasks.every(t => t && supported.has(t.name))
32 }
33}
34
35export function isJobSupported (job: {
36 type: RunnerJobType
37 payload: RunnerJobPayload
38}) {
39 const fn = supportedMatrix[job.type]
40 if (!fn) return false
41
42 return fn(job.payload as any)
43}