diff options
Diffstat (limited to 'packages/peertube-runner/server/process')
8 files changed, 697 insertions, 0 deletions
diff --git a/packages/peertube-runner/server/process/index.ts b/packages/peertube-runner/server/process/index.ts new file mode 100644 index 000000000..6caedbdaf --- /dev/null +++ b/packages/peertube-runner/server/process/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './shared' | ||
2 | export * from './process' | ||
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts new file mode 100644 index 000000000..39a929c59 --- /dev/null +++ b/packages/peertube-runner/server/process/process.ts | |||
@@ -0,0 +1,30 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | ||
2 | import { | ||
3 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
4 | RunnerJobVODAudioMergeTranscodingPayload, | ||
5 | RunnerJobVODHLSTranscodingPayload, | ||
6 | RunnerJobVODWebVideoTranscodingPayload | ||
7 | } from '@shared/models' | ||
8 | import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' | ||
9 | import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' | ||
10 | |||
11 | export async function processJob (options: ProcessOptions) { | ||
12 | const { server, job } = options | ||
13 | |||
14 | logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload }) | ||
15 | |||
16 | if (job.type === 'vod-audio-merge-transcoding') { | ||
17 | await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) | ||
18 | } else if (job.type === 'vod-web-video-transcoding') { | ||
19 | await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) | ||
20 | } else if (job.type === 'vod-hls-transcoding') { | ||
21 | await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) | ||
22 | } else if (job.type === 'live-rtmp-hls-transcoding') { | ||
23 | await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() | ||
24 | } else { | ||
25 | logger.error(`Unknown job ${job.type} to process`) | ||
26 | return | ||
27 | } | ||
28 | |||
29 | logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`) | ||
30 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts new file mode 100644 index 000000000..9b2c40728 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/common.ts | |||
@@ -0,0 +1,91 @@ | |||
1 | import { throttle } from 'lodash' | ||
2 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' | ||
3 | import { join } from 'path' | ||
4 | import { buildUUID } from '@shared/extra-utils' | ||
5 | import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' | ||
6 | import { RunnerJob, RunnerJobPayload } from '@shared/models' | ||
7 | import { PeerTubeServer } from '@shared/server-commands' | ||
8 | import { getTranscodingLogger } from './transcoding-logger' | ||
9 | import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' | ||
10 | |||
11 | export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } | ||
12 | |||
13 | export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = { | ||
14 | server: PeerTubeServer | ||
15 | job: JobWithToken<T> | ||
16 | runnerToken: string | ||
17 | } | ||
18 | |||
19 | export async function downloadInputFile (options: { | ||
20 | url: string | ||
21 | job: JobWithToken | ||
22 | runnerToken: string | ||
23 | }) { | ||
24 | const { url, job, runnerToken } = options | ||
25 | const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
26 | |||
27 | await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) | ||
28 | |||
29 | return destination | ||
30 | } | ||
31 | |||
32 | export async function updateTranscodingProgress (options: { | ||
33 | server: PeerTubeServer | ||
34 | runnerToken: string | ||
35 | job: JobWithToken | ||
36 | progress: number | ||
37 | }) { | ||
38 | const { server, job, runnerToken, progress } = options | ||
39 | |||
40 | return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) | ||
41 | } | ||
42 | |||
43 | export function buildFFmpegVOD (options: { | ||
44 | server: PeerTubeServer | ||
45 | runnerToken: string | ||
46 | job: JobWithToken | ||
47 | }) { | ||
48 | const { server, job, runnerToken } = options | ||
49 | |||
50 | const updateInterval = ConfigManager.Instance.isTestInstance() | ||
51 | ? 500 | ||
52 | : 60000 | ||
53 | |||
54 | const updateJobProgress = throttle((progress: number) => { | ||
55 | if (progress < 0 || progress > 100) progress = undefined | ||
56 | |||
57 | updateTranscodingProgress({ server, job, runnerToken, progress }) | ||
58 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | ||
59 | }, updateInterval, { trailing: false }) | ||
60 | |||
61 | const config = ConfigManager.Instance.getConfig() | ||
62 | |||
63 | return new FFmpegVOD({ | ||
64 | niceness: config.ffmpeg.nice, | ||
65 | threads: config.ffmpeg.threads, | ||
66 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
67 | profile: 'default', | ||
68 | availableEncoders: { | ||
69 | available: getAvailableEncoders(), | ||
70 | encodersToTry: getEncodersToTry() | ||
71 | }, | ||
72 | logger: getTranscodingLogger(), | ||
73 | updateJobProgress | ||
74 | }) | ||
75 | } | ||
76 | |||
77 | export function buildFFmpegLive () { | ||
78 | const config = ConfigManager.Instance.getConfig() | ||
79 | |||
80 | return new FFmpegLive({ | ||
81 | niceness: config.ffmpeg.nice, | ||
82 | threads: config.ffmpeg.threads, | ||
83 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
84 | profile: 'default', | ||
85 | availableEncoders: { | ||
86 | available: getAvailableEncoders(), | ||
87 | encodersToTry: getEncodersToTry() | ||
88 | }, | ||
89 | logger: getTranscodingLogger() | ||
90 | }) | ||
91 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/index.ts b/packages/peertube-runner/server/process/shared/index.ts new file mode 100644 index 000000000..8e09a7869 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/index.ts | |||
@@ -0,0 +1,4 @@ | |||
1 | export * from './common' | ||
2 | export * from './process-vod' | ||
3 | export * from './transcoding-logger' | ||
4 | export * from './transcoding-profiles' | ||
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts new file mode 100644 index 000000000..5a3b596a2 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -0,0 +1,295 @@ | |||
1 | import { FSWatcher, watch } from 'chokidar' | ||
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
3 | import { ensureDir, remove } from 'fs-extra' | ||
4 | import { logger } from 'packages/peertube-runner/shared' | ||
5 | import { basename, join } from 'path' | ||
6 | import { wait } from '@shared/core-utils' | ||
7 | import { buildUUID } from '@shared/extra-utils' | ||
8 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' | ||
9 | import { | ||
10 | LiveRTMPHLSTranscodingSuccess, | ||
11 | LiveRTMPHLSTranscodingUpdatePayload, | ||
12 | PeerTubeProblemDocument, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
14 | ServerErrorCode | ||
15 | } from '@shared/models' | ||
16 | import { ConfigManager } from '../../../shared/config-manager' | ||
17 | import { buildFFmpegLive, ProcessOptions } from './common' | ||
18 | |||
19 | export class ProcessLiveRTMPHLSTranscoding { | ||
20 | |||
21 | private readonly outputPath: string | ||
22 | private readonly fsWatchers: FSWatcher[] = [] | ||
23 | |||
24 | private readonly playlistsCreated = new Set<string>() | ||
25 | private allPlaylistsCreated = false | ||
26 | |||
27 | private ffmpegCommand: FfmpegCommand | ||
28 | |||
29 | private ended = false | ||
30 | private errored = false | ||
31 | |||
32 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | ||
33 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
34 | } | ||
35 | |||
36 | process () { | ||
37 | const job = this.options.job | ||
38 | const payload = job.payload | ||
39 | |||
40 | return new Promise<void>(async (res, rej) => { | ||
41 | try { | ||
42 | await ensureDir(this.outputPath) | ||
43 | |||
44 | logger.info(`Probing ${payload.input.rtmpUrl}`) | ||
45 | const probe = await ffprobePromise(payload.input.rtmpUrl) | ||
46 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | ||
47 | |||
48 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | ||
49 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | ||
50 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | ||
51 | |||
52 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | ||
53 | this.fsWatchers.push(m3u8Watcher) | ||
54 | |||
55 | const tsWatcher = watch(this.outputPath + '/*.ts') | ||
56 | this.fsWatchers.push(tsWatcher) | ||
57 | |||
58 | m3u8Watcher.on('change', p => { | ||
59 | logger.debug(`${p} m3u8 playlist changed`) | ||
60 | }) | ||
61 | |||
62 | m3u8Watcher.on('add', p => { | ||
63 | this.playlistsCreated.add(p) | ||
64 | |||
65 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | ||
66 | this.allPlaylistsCreated = true | ||
67 | logger.info('All m3u8 playlists are created.') | ||
68 | } | ||
69 | }) | ||
70 | |||
71 | tsWatcher.on('add', p => { | ||
72 | this.sendAddedChunkUpdate(p) | ||
73 | .catch(err => this.onUpdateError(err, rej)) | ||
74 | }) | ||
75 | |||
76 | tsWatcher.on('unlink', p => { | ||
77 | this.sendDeletedChunkUpdate(p) | ||
78 | .catch(err => this.onUpdateError(err, rej)) | ||
79 | }) | ||
80 | |||
81 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | ||
82 | inputUrl: payload.input.rtmpUrl, | ||
83 | |||
84 | outPath: this.outputPath, | ||
85 | masterPlaylistName: 'master.m3u8', | ||
86 | |||
87 | segmentListSize: payload.output.segmentListSize, | ||
88 | segmentDuration: payload.output.segmentDuration, | ||
89 | |||
90 | toTranscode: payload.output.toTranscode, | ||
91 | |||
92 | bitrate, | ||
93 | ratio, | ||
94 | |||
95 | hasAudio | ||
96 | }) | ||
97 | |||
98 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | ||
99 | |||
100 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
101 | this.onFFmpegError({ err, stdout, stderr }) | ||
102 | |||
103 | res() | ||
104 | }) | ||
105 | |||
106 | this.ffmpegCommand.on('end', () => { | ||
107 | this.onFFmpegEnded() | ||
108 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | ||
109 | |||
110 | res() | ||
111 | }) | ||
112 | |||
113 | this.ffmpegCommand.run() | ||
114 | } catch (err) { | ||
115 | rej(err) | ||
116 | } | ||
117 | }) | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | private onUpdateError (err: Error, reject: (reason?: any) => void) { | ||
123 | if (this.errored) return | ||
124 | if (this.ended) return | ||
125 | |||
126 | this.errored = true | ||
127 | |||
128 | reject(err) | ||
129 | this.ffmpegCommand.kill('SIGINT') | ||
130 | |||
131 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | ||
132 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
133 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | ||
134 | } else { | ||
135 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | ||
136 | |||
137 | this.sendError(err) | ||
138 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
139 | } | ||
140 | |||
141 | this.cleanup() | ||
142 | } | ||
143 | |||
144 | // --------------------------------------------------------------------------- | ||
145 | |||
146 | private onFFmpegError (options: { | ||
147 | err: any | ||
148 | stdout: string | ||
149 | stderr: string | ||
150 | }) { | ||
151 | const { err, stdout, stderr } = options | ||
152 | |||
153 | // Don't care that we killed the ffmpeg process | ||
154 | if (err?.message?.includes('Exiting normally')) return | ||
155 | if (this.errored) return | ||
156 | if (this.ended) return | ||
157 | |||
158 | this.errored = true | ||
159 | |||
160 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | ||
161 | |||
162 | this.sendError(err) | ||
163 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
164 | |||
165 | this.cleanup() | ||
166 | } | ||
167 | |||
168 | private async sendError (err: Error) { | ||
169 | await this.options.server.runnerJobs.error({ | ||
170 | jobToken: this.options.job.jobToken, | ||
171 | jobUUID: this.options.job.uuid, | ||
172 | runnerToken: this.options.runnerToken, | ||
173 | message: err.message | ||
174 | }) | ||
175 | } | ||
176 | |||
177 | // --------------------------------------------------------------------------- | ||
178 | |||
179 | private async onFFmpegEnded () { | ||
180 | if (this.ended) return | ||
181 | |||
182 | this.ended = true | ||
183 | logger.info('FFmpeg ended, sending success to server') | ||
184 | |||
185 | // Wait last ffmpeg chunks generation | ||
186 | await wait(1500) | ||
187 | |||
188 | this.sendSuccess() | ||
189 | .catch(err => logger.error({ err }, 'Cannot send success')) | ||
190 | |||
191 | this.cleanup() | ||
192 | } | ||
193 | |||
194 | private async sendSuccess () { | ||
195 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | ||
196 | |||
197 | await this.options.server.runnerJobs.success({ | ||
198 | jobToken: this.options.job.jobToken, | ||
199 | jobUUID: this.options.job.uuid, | ||
200 | runnerToken: this.options.runnerToken, | ||
201 | payload: successBody | ||
202 | }) | ||
203 | } | ||
204 | |||
205 | // --------------------------------------------------------------------------- | ||
206 | |||
207 | private sendDeletedChunkUpdate (deletedChunk: string) { | ||
208 | if (this.ended) return | ||
209 | |||
210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | ||
211 | |||
212 | const videoChunkFilename = basename(deletedChunk) | ||
213 | |||
214 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
215 | type: 'remove-chunk', | ||
216 | videoChunkFilename | ||
217 | } | ||
218 | |||
219 | if (this.allPlaylistsCreated) { | ||
220 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
221 | |||
222 | payload = { | ||
223 | ...payload, | ||
224 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
225 | resolutionPlaylistFilename: playlistName, | ||
226 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
227 | } | ||
228 | } | ||
229 | |||
230 | return this.updateWithRetry(payload) | ||
231 | } | ||
232 | |||
233 | private sendAddedChunkUpdate (addedChunk: string) { | ||
234 | if (this.ended) return | ||
235 | |||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | ||
237 | |||
238 | const videoChunkFilename = basename(addedChunk) | ||
239 | |||
240 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
241 | type: 'add-chunk', | ||
242 | videoChunkFilename, | ||
243 | videoChunkFile: addedChunk | ||
244 | } | ||
245 | |||
246 | if (this.allPlaylistsCreated) { | ||
247 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
248 | |||
249 | payload = { | ||
250 | ...payload, | ||
251 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
252 | resolutionPlaylistFilename: playlistName, | ||
253 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
254 | } | ||
255 | } | ||
256 | |||
257 | return this.updateWithRetry(payload) | ||
258 | } | ||
259 | |||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { | ||
261 | if (this.ended || this.errored) return | ||
262 | |||
263 | try { | ||
264 | await this.options.server.runnerJobs.update({ | ||
265 | jobToken: this.options.job.jobToken, | ||
266 | jobUUID: this.options.job.uuid, | ||
267 | runnerToken: this.options.runnerToken, | ||
268 | payload | ||
269 | }) | ||
270 | } catch (err) { | ||
271 | if (currentTry >= 3) throw err | ||
272 | |||
273 | logger.warn({ err }, 'Will retry update after error') | ||
274 | await wait(250) | ||
275 | |||
276 | return this.updateWithRetry(payload, currentTry + 1) | ||
277 | } | ||
278 | } | ||
279 | |||
280 | private getPlaylistName (videoChunkFilename: string) { | ||
281 | return `${videoChunkFilename.split('-')[0]}.m3u8` | ||
282 | } | ||
283 | |||
284 | // --------------------------------------------------------------------------- | ||
285 | |||
286 | private cleanup () { | ||
287 | for (const fsWatcher of this.fsWatchers) { | ||
288 | fsWatcher.close() | ||
289 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | ||
290 | } | ||
291 | |||
292 | remove(this.outputPath) | ||
293 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | ||
294 | } | ||
295 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts new file mode 100644 index 000000000..aae61e9c5 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-vod.ts | |||
@@ -0,0 +1,131 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { buildUUID } from '@shared/extra-utils' | ||
4 | import { | ||
5 | RunnerJobVODAudioMergeTranscodingPayload, | ||
6 | RunnerJobVODHLSTranscodingPayload, | ||
7 | RunnerJobVODWebVideoTranscodingPayload, | ||
8 | VODAudioMergeTranscodingSuccess, | ||
9 | VODHLSTranscodingSuccess, | ||
10 | VODWebVideoTranscodingSuccess | ||
11 | } from '@shared/models' | ||
12 | import { ConfigManager } from '../../../shared/config-manager' | ||
13 | import { buildFFmpegVOD, downloadInputFile, ProcessOptions } from './common' | ||
14 | |||
15 | export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { | ||
16 | const { server, job, runnerToken } = options | ||
17 | const payload = job.payload | ||
18 | |||
19 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
20 | |||
21 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | ||
22 | |||
23 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
24 | |||
25 | await ffmpegVod.transcode({ | ||
26 | type: 'video', | ||
27 | |||
28 | inputPath, | ||
29 | |||
30 | outputPath, | ||
31 | |||
32 | inputFileMutexReleaser: () => {}, | ||
33 | |||
34 | resolution: payload.output.resolution, | ||
35 | fps: payload.output.fps | ||
36 | }) | ||
37 | |||
38 | const successBody: VODWebVideoTranscodingSuccess = { | ||
39 | videoFile: outputPath | ||
40 | } | ||
41 | |||
42 | await server.runnerJobs.success({ | ||
43 | jobToken: job.jobToken, | ||
44 | jobUUID: job.uuid, | ||
45 | runnerToken, | ||
46 | payload: successBody | ||
47 | }) | ||
48 | |||
49 | await remove(outputPath) | ||
50 | } | ||
51 | |||
52 | export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) { | ||
53 | const { server, job, runnerToken } = options | ||
54 | const payload = job.payload | ||
55 | |||
56 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
57 | const uuid = buildUUID() | ||
58 | |||
59 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) | ||
60 | const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` | ||
61 | const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) | ||
62 | |||
63 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | ||
64 | |||
65 | await ffmpegVod.transcode({ | ||
66 | type: 'hls', | ||
67 | copyCodecs: false, | ||
68 | inputPath, | ||
69 | hlsPlaylist: { videoFilename }, | ||
70 | outputPath, | ||
71 | |||
72 | inputFileMutexReleaser: () => {}, | ||
73 | |||
74 | resolution: payload.output.resolution, | ||
75 | fps: payload.output.fps | ||
76 | }) | ||
77 | |||
78 | const successBody: VODHLSTranscodingSuccess = { | ||
79 | resolutionPlaylistFile: outputPath, | ||
80 | videoFile: videoPath | ||
81 | } | ||
82 | |||
83 | await server.runnerJobs.success({ | ||
84 | jobToken: job.jobToken, | ||
85 | jobUUID: job.uuid, | ||
86 | runnerToken, | ||
87 | payload: successBody | ||
88 | }) | ||
89 | |||
90 | await remove(outputPath) | ||
91 | await remove(videoPath) | ||
92 | } | ||
93 | |||
94 | export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { | ||
95 | const { server, job, runnerToken } = options | ||
96 | const payload = job.payload | ||
97 | |||
98 | const audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) | ||
99 | const inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) | ||
100 | |||
101 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
102 | |||
103 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | ||
104 | |||
105 | await ffmpegVod.transcode({ | ||
106 | type: 'merge-audio', | ||
107 | |||
108 | audioPath, | ||
109 | inputPath, | ||
110 | |||
111 | outputPath, | ||
112 | |||
113 | inputFileMutexReleaser: () => {}, | ||
114 | |||
115 | resolution: payload.output.resolution, | ||
116 | fps: payload.output.fps | ||
117 | }) | ||
118 | |||
119 | const successBody: VODAudioMergeTranscodingSuccess = { | ||
120 | videoFile: outputPath | ||
121 | } | ||
122 | |||
123 | await server.runnerJobs.success({ | ||
124 | jobToken: job.jobToken, | ||
125 | jobUUID: job.uuid, | ||
126 | runnerToken, | ||
127 | payload: successBody | ||
128 | }) | ||
129 | |||
130 | await remove(outputPath) | ||
131 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/transcoding-logger.ts b/packages/peertube-runner/server/process/shared/transcoding-logger.ts new file mode 100644 index 000000000..d0f928914 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/transcoding-logger.ts | |||
@@ -0,0 +1,10 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | ||
2 | |||
3 | export function getTranscodingLogger () { | ||
4 | return { | ||
5 | info: logger.info.bind(logger), | ||
6 | debug: logger.debug.bind(logger), | ||
7 | warn: logger.warn.bind(logger), | ||
8 | error: logger.error.bind(logger) | ||
9 | } | ||
10 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/transcoding-profiles.ts b/packages/peertube-runner/server/process/shared/transcoding-profiles.ts new file mode 100644 index 000000000..492d17d6a --- /dev/null +++ b/packages/peertube-runner/server/process/shared/transcoding-profiles.ts | |||
@@ -0,0 +1,134 @@ | |||
1 | import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils' | ||
2 | import { buildStreamSuffix, ffprobePromise, getAudioStream, getMaxAudioBitrate } from '@shared/ffmpeg' | ||
3 | import { EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '@shared/models' | ||
4 | |||
5 | const defaultX264VODOptionsBuilder: EncoderOptionsBuilder = (options: EncoderOptionsBuilderParams) => { | ||
6 | const { fps, inputRatio, inputBitrate, resolution } = options | ||
7 | |||
8 | const targetBitrate = getTargetBitrate({ inputBitrate, ratio: inputRatio, fps, resolution }) | ||
9 | |||
10 | return { | ||
11 | outputOptions: [ | ||
12 | ...getCommonOutputOptions(targetBitrate), | ||
13 | |||
14 | `-r ${fps}` | ||
15 | ] | ||
16 | } | ||
17 | } | ||
18 | |||
19 | const defaultX264LiveOptionsBuilder: EncoderOptionsBuilder = (options: EncoderOptionsBuilderParams) => { | ||
20 | const { streamNum, fps, inputBitrate, inputRatio, resolution } = options | ||
21 | |||
22 | const targetBitrate = getTargetBitrate({ inputBitrate, ratio: inputRatio, fps, resolution }) | ||
23 | |||
24 | return { | ||
25 | outputOptions: [ | ||
26 | ...getCommonOutputOptions(targetBitrate, streamNum), | ||
27 | |||
28 | `${buildStreamSuffix('-r:v', streamNum)} ${fps}`, | ||
29 | `${buildStreamSuffix('-b:v', streamNum)} ${targetBitrate}` | ||
30 | ] | ||
31 | } | ||
32 | } | ||
33 | |||
34 | const defaultAACOptionsBuilder: EncoderOptionsBuilder = async ({ input, streamNum, canCopyAudio }) => { | ||
35 | const probe = await ffprobePromise(input) | ||
36 | |||
37 | const parsedAudio = await getAudioStream(input, probe) | ||
38 | |||
39 | // We try to reduce the ceiling bitrate by making rough matches of bitrates | ||
40 | // Of course this is far from perfect, but it might save some space in the end | ||
41 | |||
42 | const audioCodecName = parsedAudio.audioStream['codec_name'] | ||
43 | |||
44 | const bitrate = getMaxAudioBitrate(audioCodecName, parsedAudio.bitrate) | ||
45 | |||
46 | // Force stereo as it causes some issues with HLS playback in Chrome | ||
47 | const base = [ '-channel_layout', 'stereo' ] | ||
48 | |||
49 | if (bitrate !== -1) { | ||
50 | return { outputOptions: base.concat([ buildStreamSuffix('-b:a', streamNum), bitrate + 'k' ]) } | ||
51 | } | ||
52 | |||
53 | return { outputOptions: base } | ||
54 | } | ||
55 | |||
56 | const defaultLibFDKAACVODOptionsBuilder: EncoderOptionsBuilder = ({ streamNum }) => { | ||
57 | return { outputOptions: [ buildStreamSuffix('-q:a', streamNum), '5' ] } | ||
58 | } | ||
59 | |||
60 | export function getAvailableEncoders () { | ||
61 | return { | ||
62 | vod: { | ||
63 | libx264: { | ||
64 | default: defaultX264VODOptionsBuilder | ||
65 | }, | ||
66 | aac: { | ||
67 | default: defaultAACOptionsBuilder | ||
68 | }, | ||
69 | libfdk_aac: { | ||
70 | default: defaultLibFDKAACVODOptionsBuilder | ||
71 | } | ||
72 | }, | ||
73 | live: { | ||
74 | libx264: { | ||
75 | default: defaultX264LiveOptionsBuilder | ||
76 | }, | ||
77 | aac: { | ||
78 | default: defaultAACOptionsBuilder | ||
79 | } | ||
80 | } | ||
81 | } | ||
82 | } | ||
83 | |||
84 | export function getEncodersToTry () { | ||
85 | return { | ||
86 | vod: { | ||
87 | video: [ 'libx264' ], | ||
88 | audio: [ 'libfdk_aac', 'aac' ] | ||
89 | }, | ||
90 | |||
91 | live: { | ||
92 | video: [ 'libx264' ], | ||
93 | audio: [ 'libfdk_aac', 'aac' ] | ||
94 | } | ||
95 | } | ||
96 | } | ||
97 | |||
98 | // --------------------------------------------------------------------------- | ||
99 | |||
100 | function getTargetBitrate (options: { | ||
101 | inputBitrate: number | ||
102 | resolution: VideoResolution | ||
103 | ratio: number | ||
104 | fps: number | ||
105 | }) { | ||
106 | const { inputBitrate, resolution, ratio, fps } = options | ||
107 | |||
108 | const capped = capBitrate(inputBitrate, getAverageBitrate({ resolution, fps, ratio })) | ||
109 | const limit = getMinLimitBitrate({ resolution, fps, ratio }) | ||
110 | |||
111 | return Math.max(limit, capped) | ||
112 | } | ||
113 | |||
114 | function capBitrate (inputBitrate: number, targetBitrate: number) { | ||
115 | if (!inputBitrate) return targetBitrate | ||
116 | |||
117 | // Add 30% margin to input bitrate | ||
118 | const inputBitrateWithMargin = inputBitrate + (inputBitrate * 0.3) | ||
119 | |||
120 | return Math.min(targetBitrate, inputBitrateWithMargin) | ||
121 | } | ||
122 | |||
123 | function getCommonOutputOptions (targetBitrate: number, streamNum?: number) { | ||
124 | return [ | ||
125 | `-preset veryfast`, | ||
126 | `${buildStreamSuffix('-maxrate:v', streamNum)} ${targetBitrate}`, | ||
127 | `${buildStreamSuffix('-bufsize:v', streamNum)} ${targetBitrate * 2}`, | ||
128 | |||
129 | // NOTE: b-strategy 1 - heuristic algorithm, 16 is optimal B-frames for it | ||
130 | `-b_strategy 1`, | ||
131 | // NOTE: Why 16: https://github.com/Chocobozzz/PeerTube/pull/774. b-strategy 2 -> B-frames<16 | ||
132 | `-bf 16` | ||
133 | ] | ||
134 | } | ||