aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner/server/process
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-08-17 08:59:21 +0200
committerChocobozzz <me@florianbigard.com>2023-08-17 08:59:21 +0200
commitc380e3928517eb5311b38cf257816642617d7a33 (patch)
tree2ea9b70ebca16b5d109bcce98fe7f944dad89319 /packages/peertube-runner/server/process
parenta8ca6190fb462bf6eb5685cfc1d8ae444164a487 (diff)
parent3a4992633ee62d5edfbb484d9c6bcb3cf158489d (diff)
downloadPeerTube-c380e3928517eb5311b38cf257816642617d7a33.tar.gz
PeerTube-c380e3928517eb5311b38cf257816642617d7a33.tar.zst
PeerTube-c380e3928517eb5311b38cf257816642617d7a33.zip
Merge branch 'feature/esm-and-nx' into develop
Diffstat (limited to 'packages/peertube-runner/server/process')
-rw-r--r--packages/peertube-runner/server/process/index.ts2
-rw-r--r--packages/peertube-runner/server/process/process.ts34
-rw-r--r--packages/peertube-runner/server/process/shared/common.ts106
-rw-r--r--packages/peertube-runner/server/process/shared/index.ts3
-rw-r--r--packages/peertube-runner/server/process/shared/process-live.ts338
-rw-r--r--packages/peertube-runner/server/process/shared/process-studio.ts165
-rw-r--r--packages/peertube-runner/server/process/shared/process-vod.ts201
-rw-r--r--packages/peertube-runner/server/process/shared/transcoding-logger.ts10
8 files changed, 0 insertions, 859 deletions
diff --git a/packages/peertube-runner/server/process/index.ts b/packages/peertube-runner/server/process/index.ts
deleted file mode 100644
index 6caedbdaf..000000000
--- a/packages/peertube-runner/server/process/index.ts
+++ /dev/null
@@ -1,2 +0,0 @@
1export * from './shared'
2export * from './process'
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts
deleted file mode 100644
index 1caafda8c..000000000
--- a/packages/peertube-runner/server/process/process.ts
+++ /dev/null
@@ -1,34 +0,0 @@
1import { logger } from 'packages/peertube-runner/shared/logger'
2import {
3 RunnerJobLiveRTMPHLSTranscodingPayload,
4 RunnerJobStudioTranscodingPayload,
5 RunnerJobVODAudioMergeTranscodingPayload,
6 RunnerJobVODHLSTranscodingPayload,
7 RunnerJobVODWebVideoTranscodingPayload
8} from '@shared/models'
9import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared'
10import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live'
11import { processStudioTranscoding } from './shared/process-studio'
12
13export async function processJob (options: ProcessOptions) {
14 const { server, job } = options
15
16 logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload })
17
18 if (job.type === 'vod-audio-merge-transcoding') {
19 await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>)
20 } else if (job.type === 'vod-web-video-transcoding') {
21 await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>)
22 } else if (job.type === 'vod-hls-transcoding') {
23 await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>)
24 } else if (job.type === 'live-rtmp-hls-transcoding') {
25 await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process()
26 } else if (job.type === 'video-studio-transcoding') {
27 await processStudioTranscoding(options as ProcessOptions<RunnerJobStudioTranscodingPayload>)
28 } else {
29 logger.error(`Unknown job ${job.type} to process`)
30 return
31 }
32
33 logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`)
34}
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts
deleted file mode 100644
index a9b37bbc4..000000000
--- a/packages/peertube-runner/server/process/shared/common.ts
+++ /dev/null
@@ -1,106 +0,0 @@
1import { remove } from 'fs-extra'
2import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared'
3import { join } from 'path'
4import { buildUUID } from '@shared/extra-utils'
5import { FFmpegEdition, FFmpegLive, FFmpegVOD, getDefaultAvailableEncoders, getDefaultEncodersToTry } from '@shared/ffmpeg'
6import { RunnerJob, RunnerJobPayload } from '@shared/models'
7import { PeerTubeServer } from '@shared/server-commands'
8import { getTranscodingLogger } from './transcoding-logger'
9
10export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string }
11
12export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = {
13 server: PeerTubeServer
14 job: JobWithToken<T>
15 runnerToken: string
16}
17
18export async function downloadInputFile (options: {
19 url: string
20 job: JobWithToken
21 runnerToken: string
22}) {
23 const { url, job, runnerToken } = options
24 const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
25
26 try {
27 await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination })
28 } catch (err) {
29 remove(destination)
30 .catch(err => logger.error({ err }, `Cannot remove ${destination}`))
31
32 throw err
33 }
34
35 return destination
36}
37
38export function scheduleTranscodingProgress (options: {
39 server: PeerTubeServer
40 runnerToken: string
41 job: JobWithToken
42 progressGetter: () => number
43}) {
44 const { job, server, progressGetter, runnerToken } = options
45
46 const updateInterval = ConfigManager.Instance.isTestInstance()
47 ? 500
48 : 60000
49
50 const update = () => {
51 server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
52 .catch(err => logger.error({ err }, 'Cannot send job progress'))
53 }
54
55 const interval = setInterval(() => {
56 update()
57 }, updateInterval)
58
59 update()
60
61 return interval
62}
63
64// ---------------------------------------------------------------------------
65
66export function buildFFmpegVOD (options: {
67 onJobProgress: (progress: number) => void
68}) {
69 const { onJobProgress } = options
70
71 return new FFmpegVOD({
72 ...getCommonFFmpegOptions(),
73
74 updateJobProgress: arg => {
75 const progress = arg < 0 || arg > 100
76 ? undefined
77 : arg
78
79 onJobProgress(progress)
80 }
81 })
82}
83
84export function buildFFmpegLive () {
85 return new FFmpegLive(getCommonFFmpegOptions())
86}
87
88export function buildFFmpegEdition () {
89 return new FFmpegEdition(getCommonFFmpegOptions())
90}
91
92function getCommonFFmpegOptions () {
93 const config = ConfigManager.Instance.getConfig()
94
95 return {
96 niceness: config.ffmpeg.nice,
97 threads: config.ffmpeg.threads,
98 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
99 profile: 'default',
100 availableEncoders: {
101 available: getDefaultAvailableEncoders(),
102 encodersToTry: getDefaultEncodersToTry()
103 },
104 logger: getTranscodingLogger()
105 }
106}
diff --git a/packages/peertube-runner/server/process/shared/index.ts b/packages/peertube-runner/server/process/shared/index.ts
deleted file mode 100644
index 556c51365..000000000
--- a/packages/peertube-runner/server/process/shared/index.ts
+++ /dev/null
@@ -1,3 +0,0 @@
1export * from './common'
2export * from './process-vod'
3export * from './transcoding-logger'
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts
deleted file mode 100644
index e1fc0e34e..000000000
--- a/packages/peertube-runner/server/process/shared/process-live.ts
+++ /dev/null
@@ -1,338 +0,0 @@
1import { FSWatcher, watch } from 'chokidar'
2import { FfmpegCommand } from 'fluent-ffmpeg'
3import { ensureDir, remove } from 'fs-extra'
4import { logger } from 'packages/peertube-runner/shared'
5import { basename, join } from 'path'
6import { wait } from '@shared/core-utils'
7import { buildUUID } from '@shared/extra-utils'
8import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 PeerTubeProblemDocument,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 ServerErrorCode
15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegLive, ProcessOptions } from './common'
18
19export class ProcessLiveRTMPHLSTranscoding {
20
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
23
24 // Playlist name -> chunks
25 private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
26
27 private readonly playlistsCreated = new Set<string>()
28 private allPlaylistsCreated = false
29
30 private ffmpegCommand: FfmpegCommand
31
32 private ended = false
33 private errored = false
34
35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
37
38 logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
39 }
40
41 process () {
42 const job = this.options.job
43 const payload = job.payload
44
45 return new Promise<void>(async (res, rej) => {
46 try {
47 await ensureDir(this.outputPath)
48
49 logger.info(`Probing ${payload.input.rtmpUrl}`)
50 const probe = await ffprobePromise(payload.input.rtmpUrl)
51 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
52
53 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
54 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
55 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
56
57 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
58 this.fsWatchers.push(m3u8Watcher)
59
60 const tsWatcher = watch(this.outputPath + '/*.ts')
61 this.fsWatchers.push(tsWatcher)
62
63 m3u8Watcher.on('change', p => {
64 logger.debug(`${p} m3u8 playlist changed`)
65 })
66
67 m3u8Watcher.on('add', p => {
68 this.playlistsCreated.add(p)
69
70 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
71 this.allPlaylistsCreated = true
72 logger.info('All m3u8 playlists are created.')
73 }
74 })
75
76 tsWatcher.on('add', async p => {
77 try {
78 await this.sendPendingChunks()
79 } catch (err) {
80 this.onUpdateError({ err, rej, res })
81 }
82
83 const playlistName = this.getPlaylistIdFromTS(p)
84
85 const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
86 pendingChunks.push(p)
87
88 this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
89 })
90
91 tsWatcher.on('unlink', p => {
92 this.sendDeletedChunkUpdate(p)
93 .catch(err => this.onUpdateError({ err, rej, res }))
94 })
95
96 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
97 inputUrl: payload.input.rtmpUrl,
98
99 outPath: this.outputPath,
100 masterPlaylistName: 'master.m3u8',
101
102 segmentListSize: payload.output.segmentListSize,
103 segmentDuration: payload.output.segmentDuration,
104
105 toTranscode: payload.output.toTranscode,
106
107 bitrate,
108 ratio,
109
110 hasAudio
111 })
112
113 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
114
115 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
116 this.onFFmpegError({ err, stdout, stderr })
117
118 res()
119 })
120
121 this.ffmpegCommand.on('end', () => {
122 this.onFFmpegEnded()
123 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
124
125 res()
126 })
127
128 this.ffmpegCommand.run()
129 } catch (err) {
130 rej(err)
131 }
132 })
133 }
134
135 // ---------------------------------------------------------------------------
136
137 private onUpdateError (options: {
138 err: Error
139 res: () => void
140 rej: (reason?: any) => void
141 }) {
142 const { err, res, rej } = options
143
144 if (this.errored) return
145 if (this.ended) return
146
147 this.errored = true
148
149 this.ffmpegCommand.kill('SIGINT')
150
151 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
152 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
153 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
154
155 res()
156 } else {
157 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
158
159 this.sendError(err)
160 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
161
162 rej(err)
163 }
164
165 this.cleanup()
166 }
167
168 // ---------------------------------------------------------------------------
169
170 private onFFmpegError (options: {
171 err: any
172 stdout: string
173 stderr: string
174 }) {
175 const { err, stdout, stderr } = options
176
177 // Don't care that we killed the ffmpeg process
178 if (err?.message?.includes('Exiting normally')) return
179 if (this.errored) return
180 if (this.ended) return
181
182 this.errored = true
183
184 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
185
186 this.sendError(err)
187 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
188
189 this.cleanup()
190 }
191
192 private async sendError (err: Error) {
193 await this.options.server.runnerJobs.error({
194 jobToken: this.options.job.jobToken,
195 jobUUID: this.options.job.uuid,
196 runnerToken: this.options.runnerToken,
197 message: err.message
198 })
199 }
200
201 // ---------------------------------------------------------------------------
202
203 private async onFFmpegEnded () {
204 if (this.ended) return
205
206 this.ended = true
207 logger.info('FFmpeg ended, sending success to server')
208
209 // Wait last ffmpeg chunks generation
210 await wait(1500)
211
212 this.sendSuccess()
213 .catch(err => logger.error({ err }, 'Cannot send success'))
214
215 this.cleanup()
216 }
217
218 private async sendSuccess () {
219 const successBody: LiveRTMPHLSTranscodingSuccess = {}
220
221 await this.options.server.runnerJobs.success({
222 jobToken: this.options.job.jobToken,
223 jobUUID: this.options.job.uuid,
224 runnerToken: this.options.runnerToken,
225 payload: successBody
226 })
227 }
228
229 // ---------------------------------------------------------------------------
230
231 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
232 if (this.ended) return Promise.resolve()
233
234 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
235
236 const videoChunkFilename = basename(deletedChunk)
237
238 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
239 type: 'remove-chunk',
240 videoChunkFilename
241 }
242
243 if (this.allPlaylistsCreated) {
244 const playlistName = this.getPlaylistName(videoChunkFilename)
245
246 payload = {
247 ...payload,
248 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
249 resolutionPlaylistFilename: playlistName,
250 resolutionPlaylistFile: join(this.outputPath, playlistName)
251 }
252 }
253
254 return this.updateWithRetry(payload)
255 }
256
257 private async sendPendingChunks (): Promise<any> {
258 if (this.ended) return Promise.resolve()
259
260 const promises: Promise<any>[] = []
261
262 for (const playlist of this.pendingChunksPerPlaylist.keys()) {
263 for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
264 logger.debug(`Sending added live chunk ${chunk} update`)
265
266 const videoChunkFilename = basename(chunk)
267
268 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
269 type: 'add-chunk',
270 videoChunkFilename,
271 videoChunkFile: chunk
272 }
273
274 if (this.allPlaylistsCreated) {
275 const playlistName = this.getPlaylistName(videoChunkFilename)
276
277 payload = {
278 ...payload,
279 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
280 resolutionPlaylistFilename: playlistName,
281 resolutionPlaylistFile: join(this.outputPath, playlistName)
282 }
283 }
284
285 promises.push(this.updateWithRetry(payload))
286 }
287
288 this.pendingChunksPerPlaylist.set(playlist, [])
289 }
290
291 await Promise.all(promises)
292 }
293
294 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
295 if (this.ended || this.errored) return
296
297 try {
298 await this.options.server.runnerJobs.update({
299 jobToken: this.options.job.jobToken,
300 jobUUID: this.options.job.uuid,
301 runnerToken: this.options.runnerToken,
302 payload
303 })
304 } catch (err) {
305 if (currentTry >= 3) throw err
306 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
307
308 logger.warn({ err }, 'Will retry update after error')
309 await wait(250)
310
311 return this.updateWithRetry(payload, currentTry + 1)
312 }
313 }
314
315 private getPlaylistName (videoChunkFilename: string) {
316 return `${videoChunkFilename.split('-')[0]}.m3u8`
317 }
318
319 private getPlaylistIdFromTS (segmentPath: string) {
320 const playlistIdMatcher = /^([\d+])-/
321
322 return basename(segmentPath).match(playlistIdMatcher)[1]
323 }
324
325 // ---------------------------------------------------------------------------
326
327 private cleanup () {
328 logger.debug(`Cleaning up job ${this.options.job.uuid}`)
329
330 for (const fsWatcher of this.fsWatchers) {
331 fsWatcher.close()
332 .catch(err => logger.error({ err }, 'Cannot close watcher'))
333 }
334
335 remove(this.outputPath)
336 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
337 }
338}
diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts
deleted file mode 100644
index 7bb209e80..000000000
--- a/packages/peertube-runner/server/process/shared/process-studio.ts
+++ /dev/null
@@ -1,165 +0,0 @@
1import { remove } from 'fs-extra'
2import { logger } from 'packages/peertube-runner/shared'
3import { join } from 'path'
4import { pick } from '@shared/core-utils'
5import { buildUUID } from '@shared/extra-utils'
6import {
7 RunnerJobStudioTranscodingPayload,
8 VideoStudioTask,
9 VideoStudioTaskCutPayload,
10 VideoStudioTaskIntroPayload,
11 VideoStudioTaskOutroPayload,
12 VideoStudioTaskPayload,
13 VideoStudioTaskWatermarkPayload,
14 VideoStudioTranscodingSuccess
15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common'
18
19export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) {
20 const { server, job, runnerToken } = options
21 const payload = job.payload
22
23 let inputPath: string
24 let outputPath: string
25 let tmpInputFilePath: string
26
27 let tasksProgress = 0
28
29 const updateProgressInterval = scheduleTranscodingProgress({
30 job,
31 server,
32 runnerToken,
33 progressGetter: () => tasksProgress
34 })
35
36 try {
37 logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`)
38
39 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
40 tmpInputFilePath = inputPath
41
42 logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`)
43
44 for (const task of payload.tasks) {
45 const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
46 outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)
47
48 await processTask({
49 inputPath: tmpInputFilePath,
50 outputPath,
51 task,
52 job,
53 runnerToken
54 })
55
56 if (tmpInputFilePath) await remove(tmpInputFilePath)
57
58 // For the next iteration
59 tmpInputFilePath = outputPath
60
61 tasksProgress += Math.floor(100 / payload.tasks.length)
62 }
63
64 const successBody: VideoStudioTranscodingSuccess = {
65 videoFile: outputPath
66 }
67
68 await server.runnerJobs.success({
69 jobToken: job.jobToken,
70 jobUUID: job.uuid,
71 runnerToken,
72 payload: successBody
73 })
74 } finally {
75 if (tmpInputFilePath) await remove(tmpInputFilePath)
76 if (outputPath) await remove(outputPath)
77 if (updateProgressInterval) clearInterval(updateProgressInterval)
78 }
79}
80
81// ---------------------------------------------------------------------------
82// Private
83// ---------------------------------------------------------------------------
84
85type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = {
86 inputPath: string
87 outputPath: string
88 task: T
89 runnerToken: string
90 job: JobWithToken
91}
92
93const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = {
94 'add-intro': processAddIntroOutro,
95 'add-outro': processAddIntroOutro,
96 'cut': processCut,
97 'add-watermark': processAddWatermark
98}
99
100async function processTask (options: TaskProcessorOptions) {
101 const { task } = options
102
103 const processor = taskProcessors[options.task.name]
104 if (!process) throw new Error('Unknown task ' + task.name)
105
106 return processor(options)
107}
108
109async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
110 const { inputPath, task, runnerToken, job } = options
111
112 logger.debug('Adding intro/outro to ' + inputPath)
113
114 const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
115
116 try {
117 await buildFFmpegEdition().addIntroOutro({
118 ...pick(options, [ 'inputPath', 'outputPath' ]),
119
120 introOutroPath,
121 type: task.name === 'add-intro'
122 ? 'intro'
123 : 'outro'
124 })
125 } finally {
126 await remove(introOutroPath)
127 }
128}
129
130function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
131 const { inputPath, task } = options
132
133 logger.debug(`Cutting ${inputPath}`)
134
135 return buildFFmpegEdition().cutVideo({
136 ...pick(options, [ 'inputPath', 'outputPath' ]),
137
138 start: task.options.start,
139 end: task.options.end
140 })
141}
142
143async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
144 const { inputPath, task, runnerToken, job } = options
145
146 logger.debug('Adding watermark to ' + inputPath)
147
148 const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
149
150 try {
151 await buildFFmpegEdition().addWatermark({
152 ...pick(options, [ 'inputPath', 'outputPath' ]),
153
154 watermarkPath,
155
156 videoFilters: {
157 watermarkSizeRatio: task.options.watermarkSizeRatio,
158 horitonzalMarginRatio: task.options.horitonzalMarginRatio,
159 verticalMarginRatio: task.options.verticalMarginRatio
160 }
161 })
162 } finally {
163 await remove(watermarkPath)
164 }
165}
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts
deleted file mode 100644
index f7c076b27..000000000
--- a/packages/peertube-runner/server/process/shared/process-vod.ts
+++ /dev/null
@@ -1,201 +0,0 @@
1import { remove } from 'fs-extra'
2import { logger } from 'packages/peertube-runner/shared'
3import { join } from 'path'
4import { buildUUID } from '@shared/extra-utils'
5import {
6 RunnerJobVODAudioMergeTranscodingPayload,
7 RunnerJobVODHLSTranscodingPayload,
8 RunnerJobVODWebVideoTranscodingPayload,
9 VODAudioMergeTranscodingSuccess,
10 VODHLSTranscodingSuccess,
11 VODWebVideoTranscodingSuccess
12} from '@shared/models'
13import { ConfigManager } from '../../../shared/config-manager'
14import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common'
15
16export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) {
17 const { server, job, runnerToken } = options
18
19 const payload = job.payload
20
21 let ffmpegProgress: number
22 let inputPath: string
23
24 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
25
26 const updateProgressInterval = scheduleTranscodingProgress({
27 job,
28 server,
29 runnerToken,
30 progressGetter: () => ffmpegProgress
31 })
32
33 try {
34 logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`)
35
36 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
37
38 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`)
39
40 const ffmpegVod = buildFFmpegVOD({
41 onJobProgress: progress => { ffmpegProgress = progress }
42 })
43
44 await ffmpegVod.transcode({
45 type: 'video',
46
47 inputPath,
48
49 outputPath,
50
51 inputFileMutexReleaser: () => {},
52
53 resolution: payload.output.resolution,
54 fps: payload.output.fps
55 })
56
57 const successBody: VODWebVideoTranscodingSuccess = {
58 videoFile: outputPath
59 }
60
61 await server.runnerJobs.success({
62 jobToken: job.jobToken,
63 jobUUID: job.uuid,
64 runnerToken,
65 payload: successBody
66 })
67 } finally {
68 if (inputPath) await remove(inputPath)
69 if (outputPath) await remove(outputPath)
70 if (updateProgressInterval) clearInterval(updateProgressInterval)
71 }
72}
73
74export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) {
75 const { server, job, runnerToken } = options
76 const payload = job.payload
77
78 let ffmpegProgress: number
79 let inputPath: string
80
81 const uuid = buildUUID()
82 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`)
83 const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4`
84 const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename))
85
86 const updateProgressInterval = scheduleTranscodingProgress({
87 job,
88 server,
89 runnerToken,
90 progressGetter: () => ffmpegProgress
91 })
92
93 try {
94 logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`)
95
96 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
97
98 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
99
100 const ffmpegVod = buildFFmpegVOD({
101 onJobProgress: progress => { ffmpegProgress = progress }
102 })
103
104 await ffmpegVod.transcode({
105 type: 'hls',
106 copyCodecs: false,
107 inputPath,
108 hlsPlaylist: { videoFilename },
109 outputPath,
110
111 inputFileMutexReleaser: () => {},
112
113 resolution: payload.output.resolution,
114 fps: payload.output.fps
115 })
116
117 const successBody: VODHLSTranscodingSuccess = {
118 resolutionPlaylistFile: outputPath,
119 videoFile: videoPath
120 }
121
122 await server.runnerJobs.success({
123 jobToken: job.jobToken,
124 jobUUID: job.uuid,
125 runnerToken,
126 payload: successBody
127 })
128 } finally {
129 if (inputPath) await remove(inputPath)
130 if (outputPath) await remove(outputPath)
131 if (videoPath) await remove(videoPath)
132 if (updateProgressInterval) clearInterval(updateProgressInterval)
133 }
134}
135
136export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) {
137 const { server, job, runnerToken } = options
138 const payload = job.payload
139
140 let ffmpegProgress: number
141 let audioPath: string
142 let inputPath: string
143
144 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
145
146 const updateProgressInterval = scheduleTranscodingProgress({
147 job,
148 server,
149 runnerToken,
150 progressGetter: () => ffmpegProgress
151 })
152
153 try {
154 logger.info(
155 `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
156 `for audio merge transcoding job ${job.jobToken}`
157 )
158
159 audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
160 inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
161
162 logger.info(
163 `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
164 `for job ${job.jobToken}. Running audio merge transcoding.`
165 )
166
167 const ffmpegVod = buildFFmpegVOD({
168 onJobProgress: progress => { ffmpegProgress = progress }
169 })
170
171 await ffmpegVod.transcode({
172 type: 'merge-audio',
173
174 audioPath,
175 inputPath,
176
177 outputPath,
178
179 inputFileMutexReleaser: () => {},
180
181 resolution: payload.output.resolution,
182 fps: payload.output.fps
183 })
184
185 const successBody: VODAudioMergeTranscodingSuccess = {
186 videoFile: outputPath
187 }
188
189 await server.runnerJobs.success({
190 jobToken: job.jobToken,
191 jobUUID: job.uuid,
192 runnerToken,
193 payload: successBody
194 })
195 } finally {
196 if (audioPath) await remove(audioPath)
197 if (inputPath) await remove(inputPath)
198 if (outputPath) await remove(outputPath)
199 if (updateProgressInterval) clearInterval(updateProgressInterval)
200 }
201}
diff --git a/packages/peertube-runner/server/process/shared/transcoding-logger.ts b/packages/peertube-runner/server/process/shared/transcoding-logger.ts
deleted file mode 100644
index d0f928914..000000000
--- a/packages/peertube-runner/server/process/shared/transcoding-logger.ts
+++ /dev/null
@@ -1,10 +0,0 @@
1import { logger } from 'packages/peertube-runner/shared/logger'
2
3export function getTranscodingLogger () {
4 return {
5 info: logger.info.bind(logger),
6 debug: logger.debug.bind(logger),
7 warn: logger.warn.bind(logger),
8 error: logger.error.bind(logger)
9 }
10}