diff options
Diffstat (limited to 'packages/peertube-runner/server')
10 files changed, 967 insertions, 0 deletions
diff --git a/packages/peertube-runner/server/index.ts b/packages/peertube-runner/server/index.ts new file mode 100644 index 000000000..371836515 --- /dev/null +++ b/packages/peertube-runner/server/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './server' | |||
diff --git a/packages/peertube-runner/server/process/index.ts b/packages/peertube-runner/server/process/index.ts new file mode 100644 index 000000000..6caedbdaf --- /dev/null +++ b/packages/peertube-runner/server/process/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './shared' | ||
2 | export * from './process' | ||
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts new file mode 100644 index 000000000..39a929c59 --- /dev/null +++ b/packages/peertube-runner/server/process/process.ts | |||
@@ -0,0 +1,30 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | ||
2 | import { | ||
3 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
4 | RunnerJobVODAudioMergeTranscodingPayload, | ||
5 | RunnerJobVODHLSTranscodingPayload, | ||
6 | RunnerJobVODWebVideoTranscodingPayload | ||
7 | } from '@shared/models' | ||
8 | import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared' | ||
9 | import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live' | ||
10 | |||
11 | export async function processJob (options: ProcessOptions) { | ||
12 | const { server, job } = options | ||
13 | |||
14 | logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload }) | ||
15 | |||
16 | if (job.type === 'vod-audio-merge-transcoding') { | ||
17 | await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) | ||
18 | } else if (job.type === 'vod-web-video-transcoding') { | ||
19 | await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) | ||
20 | } else if (job.type === 'vod-hls-transcoding') { | ||
21 | await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) | ||
22 | } else if (job.type === 'live-rtmp-hls-transcoding') { | ||
23 | await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() | ||
24 | } else { | ||
25 | logger.error(`Unknown job ${job.type} to process`) | ||
26 | return | ||
27 | } | ||
28 | |||
29 | logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`) | ||
30 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts new file mode 100644 index 000000000..9b2c40728 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/common.ts | |||
@@ -0,0 +1,91 @@ | |||
1 | import { throttle } from 'lodash' | ||
2 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' | ||
3 | import { join } from 'path' | ||
4 | import { buildUUID } from '@shared/extra-utils' | ||
5 | import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg' | ||
6 | import { RunnerJob, RunnerJobPayload } from '@shared/models' | ||
7 | import { PeerTubeServer } from '@shared/server-commands' | ||
8 | import { getTranscodingLogger } from './transcoding-logger' | ||
9 | import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles' | ||
10 | |||
11 | export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } | ||
12 | |||
13 | export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = { | ||
14 | server: PeerTubeServer | ||
15 | job: JobWithToken<T> | ||
16 | runnerToken: string | ||
17 | } | ||
18 | |||
19 | export async function downloadInputFile (options: { | ||
20 | url: string | ||
21 | job: JobWithToken | ||
22 | runnerToken: string | ||
23 | }) { | ||
24 | const { url, job, runnerToken } = options | ||
25 | const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
26 | |||
27 | await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) | ||
28 | |||
29 | return destination | ||
30 | } | ||
31 | |||
32 | export async function updateTranscodingProgress (options: { | ||
33 | server: PeerTubeServer | ||
34 | runnerToken: string | ||
35 | job: JobWithToken | ||
36 | progress: number | ||
37 | }) { | ||
38 | const { server, job, runnerToken, progress } = options | ||
39 | |||
40 | return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) | ||
41 | } | ||
42 | |||
43 | export function buildFFmpegVOD (options: { | ||
44 | server: PeerTubeServer | ||
45 | runnerToken: string | ||
46 | job: JobWithToken | ||
47 | }) { | ||
48 | const { server, job, runnerToken } = options | ||
49 | |||
50 | const updateInterval = ConfigManager.Instance.isTestInstance() | ||
51 | ? 500 | ||
52 | : 60000 | ||
53 | |||
54 | const updateJobProgress = throttle((progress: number) => { | ||
55 | if (progress < 0 || progress > 100) progress = undefined | ||
56 | |||
57 | updateTranscodingProgress({ server, job, runnerToken, progress }) | ||
58 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | ||
59 | }, updateInterval, { trailing: false }) | ||
60 | |||
61 | const config = ConfigManager.Instance.getConfig() | ||
62 | |||
63 | return new FFmpegVOD({ | ||
64 | niceness: config.ffmpeg.nice, | ||
65 | threads: config.ffmpeg.threads, | ||
66 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
67 | profile: 'default', | ||
68 | availableEncoders: { | ||
69 | available: getAvailableEncoders(), | ||
70 | encodersToTry: getEncodersToTry() | ||
71 | }, | ||
72 | logger: getTranscodingLogger(), | ||
73 | updateJobProgress | ||
74 | }) | ||
75 | } | ||
76 | |||
77 | export function buildFFmpegLive () { | ||
78 | const config = ConfigManager.Instance.getConfig() | ||
79 | |||
80 | return new FFmpegLive({ | ||
81 | niceness: config.ffmpeg.nice, | ||
82 | threads: config.ffmpeg.threads, | ||
83 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
84 | profile: 'default', | ||
85 | availableEncoders: { | ||
86 | available: getAvailableEncoders(), | ||
87 | encodersToTry: getEncodersToTry() | ||
88 | }, | ||
89 | logger: getTranscodingLogger() | ||
90 | }) | ||
91 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/index.ts b/packages/peertube-runner/server/process/shared/index.ts new file mode 100644 index 000000000..8e09a7869 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/index.ts | |||
@@ -0,0 +1,4 @@ | |||
1 | export * from './common' | ||
2 | export * from './process-vod' | ||
3 | export * from './transcoding-logger' | ||
4 | export * from './transcoding-profiles' | ||
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts new file mode 100644 index 000000000..5a3b596a2 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -0,0 +1,295 @@ | |||
1 | import { FSWatcher, watch } from 'chokidar' | ||
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
3 | import { ensureDir, remove } from 'fs-extra' | ||
4 | import { logger } from 'packages/peertube-runner/shared' | ||
5 | import { basename, join } from 'path' | ||
6 | import { wait } from '@shared/core-utils' | ||
7 | import { buildUUID } from '@shared/extra-utils' | ||
8 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' | ||
9 | import { | ||
10 | LiveRTMPHLSTranscodingSuccess, | ||
11 | LiveRTMPHLSTranscodingUpdatePayload, | ||
12 | PeerTubeProblemDocument, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
14 | ServerErrorCode | ||
15 | } from '@shared/models' | ||
16 | import { ConfigManager } from '../../../shared/config-manager' | ||
17 | import { buildFFmpegLive, ProcessOptions } from './common' | ||
18 | |||
19 | export class ProcessLiveRTMPHLSTranscoding { | ||
20 | |||
21 | private readonly outputPath: string | ||
22 | private readonly fsWatchers: FSWatcher[] = [] | ||
23 | |||
24 | private readonly playlistsCreated = new Set<string>() | ||
25 | private allPlaylistsCreated = false | ||
26 | |||
27 | private ffmpegCommand: FfmpegCommand | ||
28 | |||
29 | private ended = false | ||
30 | private errored = false | ||
31 | |||
32 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | ||
33 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
34 | } | ||
35 | |||
36 | process () { | ||
37 | const job = this.options.job | ||
38 | const payload = job.payload | ||
39 | |||
40 | return new Promise<void>(async (res, rej) => { | ||
41 | try { | ||
42 | await ensureDir(this.outputPath) | ||
43 | |||
44 | logger.info(`Probing ${payload.input.rtmpUrl}`) | ||
45 | const probe = await ffprobePromise(payload.input.rtmpUrl) | ||
46 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | ||
47 | |||
48 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | ||
49 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | ||
50 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | ||
51 | |||
52 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | ||
53 | this.fsWatchers.push(m3u8Watcher) | ||
54 | |||
55 | const tsWatcher = watch(this.outputPath + '/*.ts') | ||
56 | this.fsWatchers.push(tsWatcher) | ||
57 | |||
58 | m3u8Watcher.on('change', p => { | ||
59 | logger.debug(`${p} m3u8 playlist changed`) | ||
60 | }) | ||
61 | |||
62 | m3u8Watcher.on('add', p => { | ||
63 | this.playlistsCreated.add(p) | ||
64 | |||
65 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | ||
66 | this.allPlaylistsCreated = true | ||
67 | logger.info('All m3u8 playlists are created.') | ||
68 | } | ||
69 | }) | ||
70 | |||
71 | tsWatcher.on('add', p => { | ||
72 | this.sendAddedChunkUpdate(p) | ||
73 | .catch(err => this.onUpdateError(err, rej)) | ||
74 | }) | ||
75 | |||
76 | tsWatcher.on('unlink', p => { | ||
77 | this.sendDeletedChunkUpdate(p) | ||
78 | .catch(err => this.onUpdateError(err, rej)) | ||
79 | }) | ||
80 | |||
81 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | ||
82 | inputUrl: payload.input.rtmpUrl, | ||
83 | |||
84 | outPath: this.outputPath, | ||
85 | masterPlaylistName: 'master.m3u8', | ||
86 | |||
87 | segmentListSize: payload.output.segmentListSize, | ||
88 | segmentDuration: payload.output.segmentDuration, | ||
89 | |||
90 | toTranscode: payload.output.toTranscode, | ||
91 | |||
92 | bitrate, | ||
93 | ratio, | ||
94 | |||
95 | hasAudio | ||
96 | }) | ||
97 | |||
98 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | ||
99 | |||
100 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
101 | this.onFFmpegError({ err, stdout, stderr }) | ||
102 | |||
103 | res() | ||
104 | }) | ||
105 | |||
106 | this.ffmpegCommand.on('end', () => { | ||
107 | this.onFFmpegEnded() | ||
108 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | ||
109 | |||
110 | res() | ||
111 | }) | ||
112 | |||
113 | this.ffmpegCommand.run() | ||
114 | } catch (err) { | ||
115 | rej(err) | ||
116 | } | ||
117 | }) | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | private onUpdateError (err: Error, reject: (reason?: any) => void) { | ||
123 | if (this.errored) return | ||
124 | if (this.ended) return | ||
125 | |||
126 | this.errored = true | ||
127 | |||
128 | reject(err) | ||
129 | this.ffmpegCommand.kill('SIGINT') | ||
130 | |||
131 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | ||
132 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
133 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | ||
134 | } else { | ||
135 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | ||
136 | |||
137 | this.sendError(err) | ||
138 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
139 | } | ||
140 | |||
141 | this.cleanup() | ||
142 | } | ||
143 | |||
144 | // --------------------------------------------------------------------------- | ||
145 | |||
146 | private onFFmpegError (options: { | ||
147 | err: any | ||
148 | stdout: string | ||
149 | stderr: string | ||
150 | }) { | ||
151 | const { err, stdout, stderr } = options | ||
152 | |||
153 | // Don't care that we killed the ffmpeg process | ||
154 | if (err?.message?.includes('Exiting normally')) return | ||
155 | if (this.errored) return | ||
156 | if (this.ended) return | ||
157 | |||
158 | this.errored = true | ||
159 | |||
160 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | ||
161 | |||
162 | this.sendError(err) | ||
163 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
164 | |||
165 | this.cleanup() | ||
166 | } | ||
167 | |||
168 | private async sendError (err: Error) { | ||
169 | await this.options.server.runnerJobs.error({ | ||
170 | jobToken: this.options.job.jobToken, | ||
171 | jobUUID: this.options.job.uuid, | ||
172 | runnerToken: this.options.runnerToken, | ||
173 | message: err.message | ||
174 | }) | ||
175 | } | ||
176 | |||
177 | // --------------------------------------------------------------------------- | ||
178 | |||
179 | private async onFFmpegEnded () { | ||
180 | if (this.ended) return | ||
181 | |||
182 | this.ended = true | ||
183 | logger.info('FFmpeg ended, sending success to server') | ||
184 | |||
185 | // Wait last ffmpeg chunks generation | ||
186 | await wait(1500) | ||
187 | |||
188 | this.sendSuccess() | ||
189 | .catch(err => logger.error({ err }, 'Cannot send success')) | ||
190 | |||
191 | this.cleanup() | ||
192 | } | ||
193 | |||
194 | private async sendSuccess () { | ||
195 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | ||
196 | |||
197 | await this.options.server.runnerJobs.success({ | ||
198 | jobToken: this.options.job.jobToken, | ||
199 | jobUUID: this.options.job.uuid, | ||
200 | runnerToken: this.options.runnerToken, | ||
201 | payload: successBody | ||
202 | }) | ||
203 | } | ||
204 | |||
205 | // --------------------------------------------------------------------------- | ||
206 | |||
207 | private sendDeletedChunkUpdate (deletedChunk: string) { | ||
208 | if (this.ended) return | ||
209 | |||
210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | ||
211 | |||
212 | const videoChunkFilename = basename(deletedChunk) | ||
213 | |||
214 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
215 | type: 'remove-chunk', | ||
216 | videoChunkFilename | ||
217 | } | ||
218 | |||
219 | if (this.allPlaylistsCreated) { | ||
220 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
221 | |||
222 | payload = { | ||
223 | ...payload, | ||
224 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
225 | resolutionPlaylistFilename: playlistName, | ||
226 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
227 | } | ||
228 | } | ||
229 | |||
230 | return this.updateWithRetry(payload) | ||
231 | } | ||
232 | |||
233 | private sendAddedChunkUpdate (addedChunk: string) { | ||
234 | if (this.ended) return | ||
235 | |||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | ||
237 | |||
238 | const videoChunkFilename = basename(addedChunk) | ||
239 | |||
240 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
241 | type: 'add-chunk', | ||
242 | videoChunkFilename, | ||
243 | videoChunkFile: addedChunk | ||
244 | } | ||
245 | |||
246 | if (this.allPlaylistsCreated) { | ||
247 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
248 | |||
249 | payload = { | ||
250 | ...payload, | ||
251 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
252 | resolutionPlaylistFilename: playlistName, | ||
253 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
254 | } | ||
255 | } | ||
256 | |||
257 | return this.updateWithRetry(payload) | ||
258 | } | ||
259 | |||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { | ||
261 | if (this.ended || this.errored) return | ||
262 | |||
263 | try { | ||
264 | await this.options.server.runnerJobs.update({ | ||
265 | jobToken: this.options.job.jobToken, | ||
266 | jobUUID: this.options.job.uuid, | ||
267 | runnerToken: this.options.runnerToken, | ||
268 | payload | ||
269 | }) | ||
270 | } catch (err) { | ||
271 | if (currentTry >= 3) throw err | ||
272 | |||
273 | logger.warn({ err }, 'Will retry update after error') | ||
274 | await wait(250) | ||
275 | |||
276 | return this.updateWithRetry(payload, currentTry + 1) | ||
277 | } | ||
278 | } | ||
279 | |||
280 | private getPlaylistName (videoChunkFilename: string) { | ||
281 | return `${videoChunkFilename.split('-')[0]}.m3u8` | ||
282 | } | ||
283 | |||
284 | // --------------------------------------------------------------------------- | ||
285 | |||
286 | private cleanup () { | ||
287 | for (const fsWatcher of this.fsWatchers) { | ||
288 | fsWatcher.close() | ||
289 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | ||
290 | } | ||
291 | |||
292 | remove(this.outputPath) | ||
293 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | ||
294 | } | ||
295 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts new file mode 100644 index 000000000..aae61e9c5 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-vod.ts | |||
@@ -0,0 +1,131 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { buildUUID } from '@shared/extra-utils' | ||
4 | import { | ||
5 | RunnerJobVODAudioMergeTranscodingPayload, | ||
6 | RunnerJobVODHLSTranscodingPayload, | ||
7 | RunnerJobVODWebVideoTranscodingPayload, | ||
8 | VODAudioMergeTranscodingSuccess, | ||
9 | VODHLSTranscodingSuccess, | ||
10 | VODWebVideoTranscodingSuccess | ||
11 | } from '@shared/models' | ||
12 | import { ConfigManager } from '../../../shared/config-manager' | ||
13 | import { buildFFmpegVOD, downloadInputFile, ProcessOptions } from './common' | ||
14 | |||
15 | export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { | ||
16 | const { server, job, runnerToken } = options | ||
17 | const payload = job.payload | ||
18 | |||
19 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
20 | |||
21 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | ||
22 | |||
23 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
24 | |||
25 | await ffmpegVod.transcode({ | ||
26 | type: 'video', | ||
27 | |||
28 | inputPath, | ||
29 | |||
30 | outputPath, | ||
31 | |||
32 | inputFileMutexReleaser: () => {}, | ||
33 | |||
34 | resolution: payload.output.resolution, | ||
35 | fps: payload.output.fps | ||
36 | }) | ||
37 | |||
38 | const successBody: VODWebVideoTranscodingSuccess = { | ||
39 | videoFile: outputPath | ||
40 | } | ||
41 | |||
42 | await server.runnerJobs.success({ | ||
43 | jobToken: job.jobToken, | ||
44 | jobUUID: job.uuid, | ||
45 | runnerToken, | ||
46 | payload: successBody | ||
47 | }) | ||
48 | |||
49 | await remove(outputPath) | ||
50 | } | ||
51 | |||
52 | export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) { | ||
53 | const { server, job, runnerToken } = options | ||
54 | const payload = job.payload | ||
55 | |||
56 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
57 | const uuid = buildUUID() | ||
58 | |||
59 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) | ||
60 | const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` | ||
61 | const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) | ||
62 | |||
63 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | ||
64 | |||
65 | await ffmpegVod.transcode({ | ||
66 | type: 'hls', | ||
67 | copyCodecs: false, | ||
68 | inputPath, | ||
69 | hlsPlaylist: { videoFilename }, | ||
70 | outputPath, | ||
71 | |||
72 | inputFileMutexReleaser: () => {}, | ||
73 | |||
74 | resolution: payload.output.resolution, | ||
75 | fps: payload.output.fps | ||
76 | }) | ||
77 | |||
78 | const successBody: VODHLSTranscodingSuccess = { | ||
79 | resolutionPlaylistFile: outputPath, | ||
80 | videoFile: videoPath | ||
81 | } | ||
82 | |||
83 | await server.runnerJobs.success({ | ||
84 | jobToken: job.jobToken, | ||
85 | jobUUID: job.uuid, | ||
86 | runnerToken, | ||
87 | payload: successBody | ||
88 | }) | ||
89 | |||
90 | await remove(outputPath) | ||
91 | await remove(videoPath) | ||
92 | } | ||
93 | |||
94 | export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { | ||
95 | const { server, job, runnerToken } = options | ||
96 | const payload = job.payload | ||
97 | |||
98 | const audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) | ||
99 | const inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) | ||
100 | |||
101 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
102 | |||
103 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | ||
104 | |||
105 | await ffmpegVod.transcode({ | ||
106 | type: 'merge-audio', | ||
107 | |||
108 | audioPath, | ||
109 | inputPath, | ||
110 | |||
111 | outputPath, | ||
112 | |||
113 | inputFileMutexReleaser: () => {}, | ||
114 | |||
115 | resolution: payload.output.resolution, | ||
116 | fps: payload.output.fps | ||
117 | }) | ||
118 | |||
119 | const successBody: VODAudioMergeTranscodingSuccess = { | ||
120 | videoFile: outputPath | ||
121 | } | ||
122 | |||
123 | await server.runnerJobs.success({ | ||
124 | jobToken: job.jobToken, | ||
125 | jobUUID: job.uuid, | ||
126 | runnerToken, | ||
127 | payload: successBody | ||
128 | }) | ||
129 | |||
130 | await remove(outputPath) | ||
131 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/transcoding-logger.ts b/packages/peertube-runner/server/process/shared/transcoding-logger.ts new file mode 100644 index 000000000..d0f928914 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/transcoding-logger.ts | |||
@@ -0,0 +1,10 @@ | |||
1 | import { logger } from 'packages/peertube-runner/shared/logger' | ||
2 | |||
3 | export function getTranscodingLogger () { | ||
4 | return { | ||
5 | info: logger.info.bind(logger), | ||
6 | debug: logger.debug.bind(logger), | ||
7 | warn: logger.warn.bind(logger), | ||
8 | error: logger.error.bind(logger) | ||
9 | } | ||
10 | } | ||
diff --git a/packages/peertube-runner/server/process/shared/transcoding-profiles.ts b/packages/peertube-runner/server/process/shared/transcoding-profiles.ts new file mode 100644 index 000000000..492d17d6a --- /dev/null +++ b/packages/peertube-runner/server/process/shared/transcoding-profiles.ts | |||
@@ -0,0 +1,134 @@ | |||
1 | import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils' | ||
2 | import { buildStreamSuffix, ffprobePromise, getAudioStream, getMaxAudioBitrate } from '@shared/ffmpeg' | ||
3 | import { EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '@shared/models' | ||
4 | |||
5 | const defaultX264VODOptionsBuilder: EncoderOptionsBuilder = (options: EncoderOptionsBuilderParams) => { | ||
6 | const { fps, inputRatio, inputBitrate, resolution } = options | ||
7 | |||
8 | const targetBitrate = getTargetBitrate({ inputBitrate, ratio: inputRatio, fps, resolution }) | ||
9 | |||
10 | return { | ||
11 | outputOptions: [ | ||
12 | ...getCommonOutputOptions(targetBitrate), | ||
13 | |||
14 | `-r ${fps}` | ||
15 | ] | ||
16 | } | ||
17 | } | ||
18 | |||
19 | const defaultX264LiveOptionsBuilder: EncoderOptionsBuilder = (options: EncoderOptionsBuilderParams) => { | ||
20 | const { streamNum, fps, inputBitrate, inputRatio, resolution } = options | ||
21 | |||
22 | const targetBitrate = getTargetBitrate({ inputBitrate, ratio: inputRatio, fps, resolution }) | ||
23 | |||
24 | return { | ||
25 | outputOptions: [ | ||
26 | ...getCommonOutputOptions(targetBitrate, streamNum), | ||
27 | |||
28 | `${buildStreamSuffix('-r:v', streamNum)} ${fps}`, | ||
29 | `${buildStreamSuffix('-b:v', streamNum)} ${targetBitrate}` | ||
30 | ] | ||
31 | } | ||
32 | } | ||
33 | |||
34 | const defaultAACOptionsBuilder: EncoderOptionsBuilder = async ({ input, streamNum, canCopyAudio }) => { | ||
35 | const probe = await ffprobePromise(input) | ||
36 | |||
37 | const parsedAudio = await getAudioStream(input, probe) | ||
38 | |||
39 | // We try to reduce the ceiling bitrate by making rough matches of bitrates | ||
40 | // Of course this is far from perfect, but it might save some space in the end | ||
41 | |||
42 | const audioCodecName = parsedAudio.audioStream['codec_name'] | ||
43 | |||
44 | const bitrate = getMaxAudioBitrate(audioCodecName, parsedAudio.bitrate) | ||
45 | |||
46 | // Force stereo as it causes some issues with HLS playback in Chrome | ||
47 | const base = [ '-channel_layout', 'stereo' ] | ||
48 | |||
49 | if (bitrate !== -1) { | ||
50 | return { outputOptions: base.concat([ buildStreamSuffix('-b:a', streamNum), bitrate + 'k' ]) } | ||
51 | } | ||
52 | |||
53 | return { outputOptions: base } | ||
54 | } | ||
55 | |||
56 | const defaultLibFDKAACVODOptionsBuilder: EncoderOptionsBuilder = ({ streamNum }) => { | ||
57 | return { outputOptions: [ buildStreamSuffix('-q:a', streamNum), '5' ] } | ||
58 | } | ||
59 | |||
60 | export function getAvailableEncoders () { | ||
61 | return { | ||
62 | vod: { | ||
63 | libx264: { | ||
64 | default: defaultX264VODOptionsBuilder | ||
65 | }, | ||
66 | aac: { | ||
67 | default: defaultAACOptionsBuilder | ||
68 | }, | ||
69 | libfdk_aac: { | ||
70 | default: defaultLibFDKAACVODOptionsBuilder | ||
71 | } | ||
72 | }, | ||
73 | live: { | ||
74 | libx264: { | ||
75 | default: defaultX264LiveOptionsBuilder | ||
76 | }, | ||
77 | aac: { | ||
78 | default: defaultAACOptionsBuilder | ||
79 | } | ||
80 | } | ||
81 | } | ||
82 | } | ||
83 | |||
84 | export function getEncodersToTry () { | ||
85 | return { | ||
86 | vod: { | ||
87 | video: [ 'libx264' ], | ||
88 | audio: [ 'libfdk_aac', 'aac' ] | ||
89 | }, | ||
90 | |||
91 | live: { | ||
92 | video: [ 'libx264' ], | ||
93 | audio: [ 'libfdk_aac', 'aac' ] | ||
94 | } | ||
95 | } | ||
96 | } | ||
97 | |||
98 | // --------------------------------------------------------------------------- | ||
99 | |||
100 | function getTargetBitrate (options: { | ||
101 | inputBitrate: number | ||
102 | resolution: VideoResolution | ||
103 | ratio: number | ||
104 | fps: number | ||
105 | }) { | ||
106 | const { inputBitrate, resolution, ratio, fps } = options | ||
107 | |||
108 | const capped = capBitrate(inputBitrate, getAverageBitrate({ resolution, fps, ratio })) | ||
109 | const limit = getMinLimitBitrate({ resolution, fps, ratio }) | ||
110 | |||
111 | return Math.max(limit, capped) | ||
112 | } | ||
113 | |||
114 | function capBitrate (inputBitrate: number, targetBitrate: number) { | ||
115 | if (!inputBitrate) return targetBitrate | ||
116 | |||
117 | // Add 30% margin to input bitrate | ||
118 | const inputBitrateWithMargin = inputBitrate + (inputBitrate * 0.3) | ||
119 | |||
120 | return Math.min(targetBitrate, inputBitrateWithMargin) | ||
121 | } | ||
122 | |||
123 | function getCommonOutputOptions (targetBitrate: number, streamNum?: number) { | ||
124 | return [ | ||
125 | `-preset veryfast`, | ||
126 | `${buildStreamSuffix('-maxrate:v', streamNum)} ${targetBitrate}`, | ||
127 | `${buildStreamSuffix('-bufsize:v', streamNum)} ${targetBitrate * 2}`, | ||
128 | |||
129 | // NOTE: b-strategy 1 - heuristic algorithm, 16 is optimal B-frames for it | ||
130 | `-b_strategy 1`, | ||
131 | // NOTE: Why 16: https://github.com/Chocobozzz/PeerTube/pull/774. b-strategy 2 -> B-frames<16 | ||
132 | `-bf 16` | ||
133 | ] | ||
134 | } | ||
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts new file mode 100644 index 000000000..724f359bd --- /dev/null +++ b/packages/peertube-runner/server/server.ts | |||
@@ -0,0 +1,269 @@ | |||
1 | import { ensureDir, readdir, remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { io, Socket } from 'socket.io-client' | ||
4 | import { pick } from '@shared/core-utils' | ||
5 | import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' | ||
6 | import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands' | ||
7 | import { ConfigManager } from '../shared' | ||
8 | import { IPCServer } from '../shared/ipc' | ||
9 | import { logger } from '../shared/logger' | ||
10 | import { JobWithToken, processJob } from './process' | ||
11 | |||
12 | type PeerTubeServer = PeerTubeServerCommand & { | ||
13 | runnerToken: string | ||
14 | runnerName: string | ||
15 | runnerDescription?: string | ||
16 | } | ||
17 | |||
18 | export class RunnerServer { | ||
19 | private static instance: RunnerServer | ||
20 | |||
21 | private servers: PeerTubeServer[] = [] | ||
22 | private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] | ||
23 | |||
24 | private checkingAvailableJobs = false | ||
25 | |||
26 | private readonly sockets = new Map<PeerTubeServer, Socket>() | ||
27 | |||
28 | private constructor () {} | ||
29 | |||
30 | async run () { | ||
31 | logger.info('Running PeerTube runner in server mode') | ||
32 | |||
33 | await ConfigManager.Instance.load() | ||
34 | |||
35 | for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { | ||
36 | const serverCommand = new PeerTubeServerCommand({ url: registered.url }) | ||
37 | |||
38 | this.loadServer(Object.assign(serverCommand, registered)) | ||
39 | |||
40 | logger.info(`Loading registered instance ${registered.url}`) | ||
41 | } | ||
42 | |||
43 | // Run IPC | ||
44 | const ipcServer = new IPCServer() | ||
45 | try { | ||
46 | await ipcServer.run(this) | ||
47 | } catch (err) { | ||
48 | console.error('Cannot start local socket for IPC communication', err) | ||
49 | process.exit(-1) | ||
50 | } | ||
51 | |||
52 | // Cleanup on exit | ||
53 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { | ||
54 | process.on(code, async () => { | ||
55 | await this.onExit() | ||
56 | }) | ||
57 | } | ||
58 | |||
59 | // Process jobs | ||
60 | await ensureDir(ConfigManager.Instance.getTranscodingDirectory()) | ||
61 | await this.cleanupTMP() | ||
62 | |||
63 | logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`) | ||
64 | |||
65 | await this.checkAvailableJobs() | ||
66 | } | ||
67 | |||
68 | // --------------------------------------------------------------------------- | ||
69 | |||
70 | async registerRunner (options: { | ||
71 | url: string | ||
72 | registrationToken: string | ||
73 | runnerName: string | ||
74 | runnerDescription?: string | ||
75 | }) { | ||
76 | const { url, registrationToken, runnerName, runnerDescription } = options | ||
77 | |||
78 | logger.info(`Registering runner ${runnerName} on ${url}...`) | ||
79 | |||
80 | const serverCommand = new PeerTubeServerCommand({ url }) | ||
81 | const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken }) | ||
82 | |||
83 | const server: PeerTubeServer = Object.assign(serverCommand, { | ||
84 | runnerToken, | ||
85 | runnerName, | ||
86 | runnerDescription | ||
87 | }) | ||
88 | |||
89 | this.loadServer(server) | ||
90 | await this.saveRegisteredInstancesInConf() | ||
91 | |||
92 | logger.info(`Registered runner ${runnerName} on ${url}`) | ||
93 | |||
94 | await this.checkAvailableJobs() | ||
95 | } | ||
96 | |||
97 | private loadServer (server: PeerTubeServer) { | ||
98 | this.servers.push(server) | ||
99 | |||
100 | const url = server.url + '/runners' | ||
101 | const socket = io(url, { | ||
102 | auth: { | ||
103 | runnerToken: server.runnerToken | ||
104 | }, | ||
105 | transports: [ 'websocket' ] | ||
106 | }) | ||
107 | |||
108 | socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`)) | ||
109 | socket.on('connect', () => logger.info(`Connected to ${url} socket`)) | ||
110 | socket.on('available-jobs', () => this.checkAvailableJobs()) | ||
111 | |||
112 | this.sockets.set(server, socket) | ||
113 | } | ||
114 | |||
115 | async unregisterRunner (options: { | ||
116 | url: string | ||
117 | }) { | ||
118 | const { url } = options | ||
119 | |||
120 | const server = this.servers.find(s => s.url === url) | ||
121 | if (!server) { | ||
122 | logger.error(`Unknown server ${url} to unregister`) | ||
123 | return | ||
124 | } | ||
125 | |||
126 | logger.info(`Unregistering runner ${server.runnerName} on ${url}...`) | ||
127 | |||
128 | try { | ||
129 | await server.runners.unregister({ runnerToken: server.runnerToken }) | ||
130 | } catch (err) { | ||
131 | logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`) | ||
132 | } | ||
133 | |||
134 | this.unloadServer(server) | ||
135 | await this.saveRegisteredInstancesInConf() | ||
136 | |||
137 | logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`) | ||
138 | } | ||
139 | |||
140 | private unloadServer (server: PeerTubeServer) { | ||
141 | this.servers = this.servers.filter(s => s !== server) | ||
142 | |||
143 | const socket = this.sockets.get(server) | ||
144 | socket.disconnect() | ||
145 | |||
146 | this.sockets.delete(server) | ||
147 | } | ||
148 | |||
149 | listRegistered () { | ||
150 | return { | ||
151 | servers: this.servers.map(s => { | ||
152 | return { | ||
153 | url: s.url, | ||
154 | runnerName: s.runnerName, | ||
155 | runnerDescription: s.runnerDescription | ||
156 | } | ||
157 | }) | ||
158 | } | ||
159 | } | ||
160 | |||
161 | // --------------------------------------------------------------------------- | ||
162 | |||
163 | private async checkAvailableJobs () { | ||
164 | if (this.checkingAvailableJobs) return | ||
165 | |||
166 | logger.info('Checking available jobs') | ||
167 | |||
168 | this.checkingAvailableJobs = true | ||
169 | |||
170 | for (const server of this.servers) { | ||
171 | try { | ||
172 | const job = await this.requestJob(server) | ||
173 | if (!job) continue | ||
174 | |||
175 | await this.tryToExecuteJobAsync(server, job) | ||
176 | } catch (err) { | ||
177 | if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | ||
178 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) | ||
179 | |||
180 | await this.unregisterRunner({ url: server.url }) | ||
181 | return | ||
182 | } | ||
183 | |||
184 | logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`) | ||
185 | } | ||
186 | } | ||
187 | |||
188 | this.checkingAvailableJobs = false | ||
189 | } | ||
190 | |||
191 | private async requestJob (server: PeerTubeServer) { | ||
192 | logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`) | ||
193 | |||
194 | const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) | ||
195 | |||
196 | if (availableJobs.length === 0) { | ||
197 | logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) | ||
198 | return undefined | ||
199 | } | ||
200 | |||
201 | return availableJobs[0] | ||
202 | } | ||
203 | |||
204 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | ||
205 | if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return | ||
206 | |||
207 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) | ||
208 | |||
209 | const processingJob = { job, server } | ||
210 | this.processingJobs.push(processingJob) | ||
211 | |||
212 | processJob({ server, job, runnerToken: server.runnerToken }) | ||
213 | .catch(err => { | ||
214 | logger.error({ err }, 'Cannot process job') | ||
215 | |||
216 | server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message }) | ||
217 | .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error')) | ||
218 | }) | ||
219 | .finally(() => { | ||
220 | this.processingJobs = this.processingJobs.filter(p => p !== processingJob) | ||
221 | |||
222 | return this.checkAvailableJobs() | ||
223 | }) | ||
224 | } | ||
225 | |||
226 | // --------------------------------------------------------------------------- | ||
227 | |||
228 | private saveRegisteredInstancesInConf () { | ||
229 | const data = this.servers.map(s => { | ||
230 | return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ]) | ||
231 | }) | ||
232 | |||
233 | return ConfigManager.Instance.setRegisteredInstances(data) | ||
234 | } | ||
235 | |||
236 | // --------------------------------------------------------------------------- | ||
237 | |||
238 | private async cleanupTMP () { | ||
239 | const files = await readdir(ConfigManager.Instance.getTranscodingDirectory()) | ||
240 | |||
241 | for (const file of files) { | ||
242 | await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file)) | ||
243 | } | ||
244 | } | ||
245 | |||
246 | private async onExit () { | ||
247 | try { | ||
248 | for (const { server, job } of this.processingJobs) { | ||
249 | await server.runnerJobs.abort({ | ||
250 | jobToken: job.jobToken, | ||
251 | jobUUID: job.uuid, | ||
252 | reason: 'Runner stopped', | ||
253 | runnerToken: server.runnerToken | ||
254 | }) | ||
255 | } | ||
256 | |||
257 | await this.cleanupTMP() | ||
258 | } catch (err) { | ||
259 | console.error(err) | ||
260 | process.exit(-1) | ||
261 | } | ||
262 | |||
263 | process.exit() | ||
264 | } | ||
265 | |||
266 | static get Instance () { | ||
267 | return this.instance || (this.instance = new this()) | ||
268 | } | ||
269 | } | ||