diff options
Diffstat (limited to 'packages/peertube-runner/server/process')
8 files changed, 0 insertions, 859 deletions
diff --git a/packages/peertube-runner/server/process/index.ts b/packages/peertube-runner/server/process/index.ts deleted file mode 100644 index 6caedbdaf..000000000 --- a/packages/peertube-runner/server/process/index.ts +++ /dev/null | |||
@@ -1,2 +0,0 @@ | |||
1 | export * from './shared' | ||
2 | export * from './process' | ||
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts deleted file mode 100644 index 1caafda8c..000000000 --- a/packages/peertube-runner/server/process/process.ts +++ /dev/null | |||
@@ -1,34 +0,0 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | ||
2 | import { | ||
3 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
4 | RunnerJobStudioTranscodingPayload, | ||
5 | RunnerJobVODAudioMergeTranscodingPayload, | ||
6 | RunnerJobVODHLSTranscodingPayload, | ||
7 | RunnerJobVODWebVideoTranscodingPayload | ||
8 | } from '@shared/models' | ||
9 | import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' | ||
10 | import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' | ||
11 | import { processStudioTranscoding } from './shared/process-studio' | ||
12 | |||
13 | export async function processJob (options: ProcessOptions) { | ||
14 | const { server, job } = options | ||
15 | |||
16 | logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload }) | ||
17 | |||
18 | if (job.type === 'vod-audio-merge-transcoding') { | ||
19 | await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) | ||
20 | } else if (job.type === 'vod-web-video-transcoding') { | ||
21 | await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) | ||
22 | } else if (job.type === 'vod-hls-transcoding') { | ||
23 | await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) | ||
24 | } else if (job.type === 'live-rtmp-hls-transcoding') { | ||
25 | await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() | ||
26 | } else if (job.type === 'video-studio-transcoding') { | ||
27 | await processStudioTranscoding(options as ProcessOptions<RunnerJobStudioTranscodingPayload>) | ||
28 | } else { | ||
29 | logger.error(`Unknown job ${job.type} to process`) | ||
30 | return | ||
31 | } | ||
32 | |||
33 | logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`) | ||
34 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts deleted file mode 100644 index a9b37bbc4..000000000 --- a/packages/peertube-runner/server/process/shared/common.ts +++ /dev/null | |||
@@ -1,106 +0,0 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' | ||
3 | import { join } from 'path' | ||
4 | import { buildUUID } from '@shared/extra-utils' | ||
5 | import { FFmpegEdition, FFmpegLive, FFmpegVOD, getDefaultAvailableEncoders, getDefaultEncodersToTry } from '@shared/ffmpeg' | ||
6 | import { RunnerJob, RunnerJobPayload } from '@shared/models' | ||
7 | import { PeerTubeServer } from '@shared/server-commands' | ||
8 | import { getTranscodingLogger } from './transcoding-logger' | ||
9 | |||
10 | export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } | ||
11 | |||
12 | export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = { | ||
13 | server: PeerTubeServer | ||
14 | job: JobWithToken<T> | ||
15 | runnerToken: string | ||
16 | } | ||
17 | |||
18 | export async function downloadInputFile (options: { | ||
19 | url: string | ||
20 | job: JobWithToken | ||
21 | runnerToken: string | ||
22 | }) { | ||
23 | const { url, job, runnerToken } = options | ||
24 | const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
25 | |||
26 | try { | ||
27 | await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) | ||
28 | } catch (err) { | ||
29 | remove(destination) | ||
30 | .catch(err => logger.error({ err }, `Cannot remove ${destination}`)) | ||
31 | |||
32 | throw err | ||
33 | } | ||
34 | |||
35 | return destination | ||
36 | } | ||
37 | |||
38 | export function scheduleTranscodingProgress (options: { | ||
39 | server: PeerTubeServer | ||
40 | runnerToken: string | ||
41 | job: JobWithToken | ||
42 | progressGetter: () => number | ||
43 | }) { | ||
44 | const { job, server, progressGetter, runnerToken } = options | ||
45 | |||
46 | const updateInterval = ConfigManager.Instance.isTestInstance() | ||
47 | ? 500 | ||
48 | : 60000 | ||
49 | |||
50 | const update = () => { | ||
51 | server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() }) | ||
52 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | ||
53 | } | ||
54 | |||
55 | const interval = setInterval(() => { | ||
56 | update() | ||
57 | }, updateInterval) | ||
58 | |||
59 | update() | ||
60 | |||
61 | return interval | ||
62 | } | ||
63 | |||
64 | // --------------------------------------------------------------------------- | ||
65 | |||
66 | export function buildFFmpegVOD (options: { | ||
67 | onJobProgress: (progress: number) => void | ||
68 | }) { | ||
69 | const { onJobProgress } = options | ||
70 | |||
71 | return new FFmpegVOD({ | ||
72 | ...getCommonFFmpegOptions(), | ||
73 | |||
74 | updateJobProgress: arg => { | ||
75 | const progress = arg < 0 || arg > 100 | ||
76 | ? undefined | ||
77 | : arg | ||
78 | |||
79 | onJobProgress(progress) | ||
80 | } | ||
81 | }) | ||
82 | } | ||
83 | |||
84 | export function buildFFmpegLive () { | ||
85 | return new FFmpegLive(getCommonFFmpegOptions()) | ||
86 | } | ||
87 | |||
88 | export function buildFFmpegEdition () { | ||
89 | return new FFmpegEdition(getCommonFFmpegOptions()) | ||
90 | } | ||
91 | |||
92 | function getCommonFFmpegOptions () { | ||
93 | const config = ConfigManager.Instance.getConfig() | ||
94 | |||
95 | return { | ||
96 | niceness: config.ffmpeg.nice, | ||
97 | threads: config.ffmpeg.threads, | ||
98 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
99 | profile: 'default', | ||
100 | availableEncoders: { | ||
101 | available: getDefaultAvailableEncoders(), | ||
102 | encodersToTry: getDefaultEncodersToTry() | ||
103 | }, | ||
104 | logger: getTranscodingLogger() | ||
105 | } | ||
106 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/index.ts b/packages/peertube-runner/server/process/shared/index.ts deleted file mode 100644 index 556c51365..000000000 --- a/packages/peertube-runner/server/process/shared/index.ts +++ /dev/null | |||
@@ -1,3 +0,0 @@ | |||
1 | export * from './common' | ||
2 | export * from './process-vod' | ||
3 | export * from './transcoding-logger' | ||
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts deleted file mode 100644 index e1fc0e34e..000000000 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ /dev/null | |||
@@ -1,338 +0,0 @@ | |||
1 | import { FSWatcher, watch } from 'chokidar' | ||
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
3 | import { ensureDir, remove } from 'fs-extra' | ||
4 | import { logger } from 'packages/peertube-runner/shared' | ||
5 | import { basename, join } from 'path' | ||
6 | import { wait } from '@shared/core-utils' | ||
7 | import { buildUUID } from '@shared/extra-utils' | ||
8 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' | ||
9 | import { | ||
10 | LiveRTMPHLSTranscodingSuccess, | ||
11 | LiveRTMPHLSTranscodingUpdatePayload, | ||
12 | PeerTubeProblemDocument, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
14 | ServerErrorCode | ||
15 | } from '@shared/models' | ||
16 | import { ConfigManager } from '../../../shared/config-manager' | ||
17 | import { buildFFmpegLive, ProcessOptions } from './common' | ||
18 | |||
19 | export class ProcessLiveRTMPHLSTranscoding { | ||
20 | |||
21 | private readonly outputPath: string | ||
22 | private readonly fsWatchers: FSWatcher[] = [] | ||
23 | |||
24 | // Playlist name -> chunks | ||
25 | private readonly pendingChunksPerPlaylist = new Map<string, string[]>() | ||
26 | |||
27 | private readonly playlistsCreated = new Set<string>() | ||
28 | private allPlaylistsCreated = false | ||
29 | |||
30 | private ffmpegCommand: FfmpegCommand | ||
31 | |||
32 | private ended = false | ||
33 | private errored = false | ||
34 | |||
35 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | ||
36 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
37 | |||
38 | logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`) | ||
39 | } | ||
40 | |||
41 | process () { | ||
42 | const job = this.options.job | ||
43 | const payload = job.payload | ||
44 | |||
45 | return new Promise<void>(async (res, rej) => { | ||
46 | try { | ||
47 | await ensureDir(this.outputPath) | ||
48 | |||
49 | logger.info(`Probing ${payload.input.rtmpUrl}`) | ||
50 | const probe = await ffprobePromise(payload.input.rtmpUrl) | ||
51 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | ||
52 | |||
53 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | ||
54 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | ||
55 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | ||
56 | |||
57 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | ||
58 | this.fsWatchers.push(m3u8Watcher) | ||
59 | |||
60 | const tsWatcher = watch(this.outputPath + '/*.ts') | ||
61 | this.fsWatchers.push(tsWatcher) | ||
62 | |||
63 | m3u8Watcher.on('change', p => { | ||
64 | logger.debug(`${p} m3u8 playlist changed`) | ||
65 | }) | ||
66 | |||
67 | m3u8Watcher.on('add', p => { | ||
68 | this.playlistsCreated.add(p) | ||
69 | |||
70 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | ||
71 | this.allPlaylistsCreated = true | ||
72 | logger.info('All m3u8 playlists are created.') | ||
73 | } | ||
74 | }) | ||
75 | |||
76 | tsWatcher.on('add', async p => { | ||
77 | try { | ||
78 | await this.sendPendingChunks() | ||
79 | } catch (err) { | ||
80 | this.onUpdateError({ err, rej, res }) | ||
81 | } | ||
82 | |||
83 | const playlistName = this.getPlaylistIdFromTS(p) | ||
84 | |||
85 | const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || [] | ||
86 | pendingChunks.push(p) | ||
87 | |||
88 | this.pendingChunksPerPlaylist.set(playlistName, pendingChunks) | ||
89 | }) | ||
90 | |||
91 | tsWatcher.on('unlink', p => { | ||
92 | this.sendDeletedChunkUpdate(p) | ||
93 | .catch(err => this.onUpdateError({ err, rej, res })) | ||
94 | }) | ||
95 | |||
96 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | ||
97 | inputUrl: payload.input.rtmpUrl, | ||
98 | |||
99 | outPath: this.outputPath, | ||
100 | masterPlaylistName: 'master.m3u8', | ||
101 | |||
102 | segmentListSize: payload.output.segmentListSize, | ||
103 | segmentDuration: payload.output.segmentDuration, | ||
104 | |||
105 | toTranscode: payload.output.toTranscode, | ||
106 | |||
107 | bitrate, | ||
108 | ratio, | ||
109 | |||
110 | hasAudio | ||
111 | }) | ||
112 | |||
113 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | ||
114 | |||
115 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
116 | this.onFFmpegError({ err, stdout, stderr }) | ||
117 | |||
118 | res() | ||
119 | }) | ||
120 | |||
121 | this.ffmpegCommand.on('end', () => { | ||
122 | this.onFFmpegEnded() | ||
123 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | ||
124 | |||
125 | res() | ||
126 | }) | ||
127 | |||
128 | this.ffmpegCommand.run() | ||
129 | } catch (err) { | ||
130 | rej(err) | ||
131 | } | ||
132 | }) | ||
133 | } | ||
134 | |||
135 | // --------------------------------------------------------------------------- | ||
136 | |||
137 | private onUpdateError (options: { | ||
138 | err: Error | ||
139 | res: () => void | ||
140 | rej: (reason?: any) => void | ||
141 | }) { | ||
142 | const { err, res, rej } = options | ||
143 | |||
144 | if (this.errored) return | ||
145 | if (this.ended) return | ||
146 | |||
147 | this.errored = true | ||
148 | |||
149 | this.ffmpegCommand.kill('SIGINT') | ||
150 | |||
151 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | ||
152 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
153 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | ||
154 | |||
155 | res() | ||
156 | } else { | ||
157 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | ||
158 | |||
159 | this.sendError(err) | ||
160 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
161 | |||
162 | rej(err) | ||
163 | } | ||
164 | |||
165 | this.cleanup() | ||
166 | } | ||
167 | |||
168 | // --------------------------------------------------------------------------- | ||
169 | |||
170 | private onFFmpegError (options: { | ||
171 | err: any | ||
172 | stdout: string | ||
173 | stderr: string | ||
174 | }) { | ||
175 | const { err, stdout, stderr } = options | ||
176 | |||
177 | // Don't care that we killed the ffmpeg process | ||
178 | if (err?.message?.includes('Exiting normally')) return | ||
179 | if (this.errored) return | ||
180 | if (this.ended) return | ||
181 | |||
182 | this.errored = true | ||
183 | |||
184 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | ||
185 | |||
186 | this.sendError(err) | ||
187 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
188 | |||
189 | this.cleanup() | ||
190 | } | ||
191 | |||
192 | private async sendError (err: Error) { | ||
193 | await this.options.server.runnerJobs.error({ | ||
194 | jobToken: this.options.job.jobToken, | ||
195 | jobUUID: this.options.job.uuid, | ||
196 | runnerToken: this.options.runnerToken, | ||
197 | message: err.message | ||
198 | }) | ||
199 | } | ||
200 | |||
201 | // --------------------------------------------------------------------------- | ||
202 | |||
203 | private async onFFmpegEnded () { | ||
204 | if (this.ended) return | ||
205 | |||
206 | this.ended = true | ||
207 | logger.info('FFmpeg ended, sending success to server') | ||
208 | |||
209 | // Wait last ffmpeg chunks generation | ||
210 | await wait(1500) | ||
211 | |||
212 | this.sendSuccess() | ||
213 | .catch(err => logger.error({ err }, 'Cannot send success')) | ||
214 | |||
215 | this.cleanup() | ||
216 | } | ||
217 | |||
218 | private async sendSuccess () { | ||
219 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | ||
220 | |||
221 | await this.options.server.runnerJobs.success({ | ||
222 | jobToken: this.options.job.jobToken, | ||
223 | jobUUID: this.options.job.uuid, | ||
224 | runnerToken: this.options.runnerToken, | ||
225 | payload: successBody | ||
226 | }) | ||
227 | } | ||
228 | |||
229 | // --------------------------------------------------------------------------- | ||
230 | |||
231 | private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> { | ||
232 | if (this.ended) return Promise.resolve() | ||
233 | |||
234 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | ||
235 | |||
236 | const videoChunkFilename = basename(deletedChunk) | ||
237 | |||
238 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
239 | type: 'remove-chunk', | ||
240 | videoChunkFilename | ||
241 | } | ||
242 | |||
243 | if (this.allPlaylistsCreated) { | ||
244 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
245 | |||
246 | payload = { | ||
247 | ...payload, | ||
248 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
249 | resolutionPlaylistFilename: playlistName, | ||
250 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
251 | } | ||
252 | } | ||
253 | |||
254 | return this.updateWithRetry(payload) | ||
255 | } | ||
256 | |||
257 | private async sendPendingChunks (): Promise<any> { | ||
258 | if (this.ended) return Promise.resolve() | ||
259 | |||
260 | const promises: Promise<any>[] = [] | ||
261 | |||
262 | for (const playlist of this.pendingChunksPerPlaylist.keys()) { | ||
263 | for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { | ||
264 | logger.debug(`Sending added live chunk ${chunk} update`) | ||
265 | |||
266 | const videoChunkFilename = basename(chunk) | ||
267 | |||
268 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
269 | type: 'add-chunk', | ||
270 | videoChunkFilename, | ||
271 | videoChunkFile: chunk | ||
272 | } | ||
273 | |||
274 | if (this.allPlaylistsCreated) { | ||
275 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
276 | |||
277 | payload = { | ||
278 | ...payload, | ||
279 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
280 | resolutionPlaylistFilename: playlistName, | ||
281 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
282 | } | ||
283 | } | ||
284 | |||
285 | promises.push(this.updateWithRetry(payload)) | ||
286 | } | ||
287 | |||
288 | this.pendingChunksPerPlaylist.set(playlist, []) | ||
289 | } | ||
290 | |||
291 | await Promise.all(promises) | ||
292 | } | ||
293 | |||
294 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { | ||
295 | if (this.ended || this.errored) return | ||
296 | |||
297 | try { | ||
298 | await this.options.server.runnerJobs.update({ | ||
299 | jobToken: this.options.job.jobToken, | ||
300 | jobUUID: this.options.job.uuid, | ||
301 | runnerToken: this.options.runnerToken, | ||
302 | payload | ||
303 | }) | ||
304 | } catch (err) { | ||
305 | if (currentTry >= 3) throw err | ||
306 | if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err | ||
307 | |||
308 | logger.warn({ err }, 'Will retry update after error') | ||
309 | await wait(250) | ||
310 | |||
311 | return this.updateWithRetry(payload, currentTry + 1) | ||
312 | } | ||
313 | } | ||
314 | |||
315 | private getPlaylistName (videoChunkFilename: string) { | ||
316 | return `${videoChunkFilename.split('-')[0]}.m3u8` | ||
317 | } | ||
318 | |||
319 | private getPlaylistIdFromTS (segmentPath: string) { | ||
320 | const playlistIdMatcher = /^([\d+])-/ | ||
321 | |||
322 | return basename(segmentPath).match(playlistIdMatcher)[1] | ||
323 | } | ||
324 | |||
325 | // --------------------------------------------------------------------------- | ||
326 | |||
327 | private cleanup () { | ||
328 | logger.debug(`Cleaning up job ${this.options.job.uuid}`) | ||
329 | |||
330 | for (const fsWatcher of this.fsWatchers) { | ||
331 | fsWatcher.close() | ||
332 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | ||
333 | } | ||
334 | |||
335 | remove(this.outputPath) | ||
336 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | ||
337 | } | ||
338 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts deleted file mode 100644 index 7bb209e80..000000000 --- a/packages/peertube-runner/server/process/shared/process-studio.ts +++ /dev/null | |||
@@ -1,165 +0,0 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { logger } from 'packages/peertube-runner/shared' | ||
3 | import { join } from 'path' | ||
4 | import { pick } from '@shared/core-utils' | ||
5 | import { buildUUID } from '@shared/extra-utils' | ||
6 | import { | ||
7 | RunnerJobStudioTranscodingPayload, | ||
8 | VideoStudioTask, | ||
9 | VideoStudioTaskCutPayload, | ||
10 | VideoStudioTaskIntroPayload, | ||
11 | VideoStudioTaskOutroPayload, | ||
12 | VideoStudioTaskPayload, | ||
13 | VideoStudioTaskWatermarkPayload, | ||
14 | VideoStudioTranscodingSuccess | ||
15 | } from '@shared/models' | ||
16 | import { ConfigManager } from '../../../shared/config-manager' | ||
17 | import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common' | ||
18 | |||
19 | export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) { | ||
20 | const { server, job, runnerToken } = options | ||
21 | const payload = job.payload | ||
22 | |||
23 | let inputPath: string | ||
24 | let outputPath: string | ||
25 | let tmpInputFilePath: string | ||
26 | |||
27 | let tasksProgress = 0 | ||
28 | |||
29 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
30 | job, | ||
31 | server, | ||
32 | runnerToken, | ||
33 | progressGetter: () => tasksProgress | ||
34 | }) | ||
35 | |||
36 | try { | ||
37 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) | ||
38 | |||
39 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
40 | tmpInputFilePath = inputPath | ||
41 | |||
42 | logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) | ||
43 | |||
44 | for (const task of payload.tasks) { | ||
45 | const outputFilename = 'output-edition-' + buildUUID() + '.mp4' | ||
46 | outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) | ||
47 | |||
48 | await processTask({ | ||
49 | inputPath: tmpInputFilePath, | ||
50 | outputPath, | ||
51 | task, | ||
52 | job, | ||
53 | runnerToken | ||
54 | }) | ||
55 | |||
56 | if (tmpInputFilePath) await remove(tmpInputFilePath) | ||
57 | |||
58 | // For the next iteration | ||
59 | tmpInputFilePath = outputPath | ||
60 | |||
61 | tasksProgress += Math.floor(100 / payload.tasks.length) | ||
62 | } | ||
63 | |||
64 | const successBody: VideoStudioTranscodingSuccess = { | ||
65 | videoFile: outputPath | ||
66 | } | ||
67 | |||
68 | await server.runnerJobs.success({ | ||
69 | jobToken: job.jobToken, | ||
70 | jobUUID: job.uuid, | ||
71 | runnerToken, | ||
72 | payload: successBody | ||
73 | }) | ||
74 | } finally { | ||
75 | if (tmpInputFilePath) await remove(tmpInputFilePath) | ||
76 | if (outputPath) await remove(outputPath) | ||
77 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
78 | } | ||
79 | } | ||
80 | |||
81 | // --------------------------------------------------------------------------- | ||
82 | // Private | ||
83 | // --------------------------------------------------------------------------- | ||
84 | |||
85 | type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = { | ||
86 | inputPath: string | ||
87 | outputPath: string | ||
88 | task: T | ||
89 | runnerToken: string | ||
90 | job: JobWithToken | ||
91 | } | ||
92 | |||
93 | const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { | ||
94 | 'add-intro': processAddIntroOutro, | ||
95 | 'add-outro': processAddIntroOutro, | ||
96 | 'cut': processCut, | ||
97 | 'add-watermark': processAddWatermark | ||
98 | } | ||
99 | |||
100 | async function processTask (options: TaskProcessorOptions) { | ||
101 | const { task } = options | ||
102 | |||
103 | const processor = taskProcessors[options.task.name] | ||
104 | if (!process) throw new Error('Unknown task ' + task.name) | ||
105 | |||
106 | return processor(options) | ||
107 | } | ||
108 | |||
109 | async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { | ||
110 | const { inputPath, task, runnerToken, job } = options | ||
111 | |||
112 | logger.debug('Adding intro/outro to ' + inputPath) | ||
113 | |||
114 | const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) | ||
115 | |||
116 | try { | ||
117 | await buildFFmpegEdition().addIntroOutro({ | ||
118 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
119 | |||
120 | introOutroPath, | ||
121 | type: task.name === 'add-intro' | ||
122 | ? 'intro' | ||
123 | : 'outro' | ||
124 | }) | ||
125 | } finally { | ||
126 | await remove(introOutroPath) | ||
127 | } | ||
128 | } | ||
129 | |||
130 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { | ||
131 | const { inputPath, task } = options | ||
132 | |||
133 | logger.debug(`Cutting ${inputPath}`) | ||
134 | |||
135 | return buildFFmpegEdition().cutVideo({ | ||
136 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
137 | |||
138 | start: task.options.start, | ||
139 | end: task.options.end | ||
140 | }) | ||
141 | } | ||
142 | |||
143 | async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { | ||
144 | const { inputPath, task, runnerToken, job } = options | ||
145 | |||
146 | logger.debug('Adding watermark to ' + inputPath) | ||
147 | |||
148 | const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) | ||
149 | |||
150 | try { | ||
151 | await buildFFmpegEdition().addWatermark({ | ||
152 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
153 | |||
154 | watermarkPath, | ||
155 | |||
156 | videoFilters: { | ||
157 | watermarkSizeRatio: task.options.watermarkSizeRatio, | ||
158 | horitonzalMarginRatio: task.options.horitonzalMarginRatio, | ||
159 | verticalMarginRatio: task.options.verticalMarginRatio | ||
160 | } | ||
161 | }) | ||
162 | } finally { | ||
163 | await remove(watermarkPath) | ||
164 | } | ||
165 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts deleted file mode 100644 index f7c076b27..000000000 --- a/packages/peertube-runner/server/process/shared/process-vod.ts +++ /dev/null | |||
@@ -1,201 +0,0 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { logger } from 'packages/peertube-runner/shared' | ||
3 | import { join } from 'path' | ||
4 | import { buildUUID } from '@shared/extra-utils' | ||
5 | import { | ||
6 | RunnerJobVODAudioMergeTranscodingPayload, | ||
7 | RunnerJobVODHLSTranscodingPayload, | ||
8 | RunnerJobVODWebVideoTranscodingPayload, | ||
9 | VODAudioMergeTranscodingSuccess, | ||
10 | VODHLSTranscodingSuccess, | ||
11 | VODWebVideoTranscodingSuccess | ||
12 | } from '@shared/models' | ||
13 | import { ConfigManager } from '../../../shared/config-manager' | ||
14 | import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common' | ||
15 | |||
16 | export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { | ||
17 | const { server, job, runnerToken } = options | ||
18 | |||
19 | const payload = job.payload | ||
20 | |||
21 | let ffmpegProgress: number | ||
22 | let inputPath: string | ||
23 | |||
24 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
25 | |||
26 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
27 | job, | ||
28 | server, | ||
29 | runnerToken, | ||
30 | progressGetter: () => ffmpegProgress | ||
31 | }) | ||
32 | |||
33 | try { | ||
34 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) | ||
35 | |||
36 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
37 | |||
38 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) | ||
39 | |||
40 | const ffmpegVod = buildFFmpegVOD({ | ||
41 | onJobProgress: progress => { ffmpegProgress = progress } | ||
42 | }) | ||
43 | |||
44 | await ffmpegVod.transcode({ | ||
45 | type: 'video', | ||
46 | |||
47 | inputPath, | ||
48 | |||
49 | outputPath, | ||
50 | |||
51 | inputFileMutexReleaser: () => {}, | ||
52 | |||
53 | resolution: payload.output.resolution, | ||
54 | fps: payload.output.fps | ||
55 | }) | ||
56 | |||
57 | const successBody: VODWebVideoTranscodingSuccess = { | ||
58 | videoFile: outputPath | ||
59 | } | ||
60 | |||
61 | await server.runnerJobs.success({ | ||
62 | jobToken: job.jobToken, | ||
63 | jobUUID: job.uuid, | ||
64 | runnerToken, | ||
65 | payload: successBody | ||
66 | }) | ||
67 | } finally { | ||
68 | if (inputPath) await remove(inputPath) | ||
69 | if (outputPath) await remove(outputPath) | ||
70 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
71 | } | ||
72 | } | ||
73 | |||
74 | export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) { | ||
75 | const { server, job, runnerToken } = options | ||
76 | const payload = job.payload | ||
77 | |||
78 | let ffmpegProgress: number | ||
79 | let inputPath: string | ||
80 | |||
81 | const uuid = buildUUID() | ||
82 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) | ||
83 | const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` | ||
84 | const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) | ||
85 | |||
86 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
87 | job, | ||
88 | server, | ||
89 | runnerToken, | ||
90 | progressGetter: () => ffmpegProgress | ||
91 | }) | ||
92 | |||
93 | try { | ||
94 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) | ||
95 | |||
96 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
97 | |||
98 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`) | ||
99 | |||
100 | const ffmpegVod = buildFFmpegVOD({ | ||
101 | onJobProgress: progress => { ffmpegProgress = progress } | ||
102 | }) | ||
103 | |||
104 | await ffmpegVod.transcode({ | ||
105 | type: 'hls', | ||
106 | copyCodecs: false, | ||
107 | inputPath, | ||
108 | hlsPlaylist: { videoFilename }, | ||
109 | outputPath, | ||
110 | |||
111 | inputFileMutexReleaser: () => {}, | ||
112 | |||
113 | resolution: payload.output.resolution, | ||
114 | fps: payload.output.fps | ||
115 | }) | ||
116 | |||
117 | const successBody: VODHLSTranscodingSuccess = { | ||
118 | resolutionPlaylistFile: outputPath, | ||
119 | videoFile: videoPath | ||
120 | } | ||
121 | |||
122 | await server.runnerJobs.success({ | ||
123 | jobToken: job.jobToken, | ||
124 | jobUUID: job.uuid, | ||
125 | runnerToken, | ||
126 | payload: successBody | ||
127 | }) | ||
128 | } finally { | ||
129 | if (inputPath) await remove(inputPath) | ||
130 | if (outputPath) await remove(outputPath) | ||
131 | if (videoPath) await remove(videoPath) | ||
132 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
133 | } | ||
134 | } | ||
135 | |||
136 | export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { | ||
137 | const { server, job, runnerToken } = options | ||
138 | const payload = job.payload | ||
139 | |||
140 | let ffmpegProgress: number | ||
141 | let audioPath: string | ||
142 | let inputPath: string | ||
143 | |||
144 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
145 | |||
146 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
147 | job, | ||
148 | server, | ||
149 | runnerToken, | ||
150 | progressGetter: () => ffmpegProgress | ||
151 | }) | ||
152 | |||
153 | try { | ||
154 | logger.info( | ||
155 | `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
156 | `for audio merge transcoding job ${job.jobToken}` | ||
157 | ) | ||
158 | |||
159 | audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) | ||
160 | inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) | ||
161 | |||
162 | logger.info( | ||
163 | `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
164 | `for job ${job.jobToken}. Running audio merge transcoding.` | ||
165 | ) | ||
166 | |||
167 | const ffmpegVod = buildFFmpegVOD({ | ||
168 | onJobProgress: progress => { ffmpegProgress = progress } | ||
169 | }) | ||
170 | |||
171 | await ffmpegVod.transcode({ | ||
172 | type: 'merge-audio', | ||
173 | |||
174 | audioPath, | ||
175 | inputPath, | ||
176 | |||
177 | outputPath, | ||
178 | |||
179 | inputFileMutexReleaser: () => {}, | ||
180 | |||
181 | resolution: payload.output.resolution, | ||
182 | fps: payload.output.fps | ||
183 | }) | ||
184 | |||
185 | const successBody: VODAudioMergeTranscodingSuccess = { | ||
186 | videoFile: outputPath | ||
187 | } | ||
188 | |||
189 | await server.runnerJobs.success({ | ||
190 | jobToken: job.jobToken, | ||
191 | jobUUID: job.uuid, | ||
192 | runnerToken, | ||
193 | payload: successBody | ||
194 | }) | ||
195 | } finally { | ||
196 | if (audioPath) await remove(audioPath) | ||
197 | if (inputPath) await remove(inputPath) | ||
198 | if (outputPath) await remove(outputPath) | ||
199 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
200 | } | ||
201 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/transcoding-logger.ts b/packages/peertube-runner/server/process/shared/transcoding-logger.ts deleted file mode 100644 index d0f928914..000000000 --- a/packages/peertube-runner/server/process/shared/transcoding-logger.ts +++ /dev/null | |||
@@ -1,10 +0,0 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | ||
2 | |||
3 | export function getTranscodingLogger () { | ||
4 | return { | ||
5 | info: logger.info.bind(logger), | ||
6 | debug: logger.debug.bind(logger), | ||
7 | warn: logger.warn.bind(logger), | ||
8 | error: logger.error.bind(logger) | ||
9 | } | ||
10 | } | ||