aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 15:05:27 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit1772b383de490cf406fe93ef3aa3a941f6db513c (patch)
tree7cecc404c8d71951c22079e9bf5180095981b7f9 /packages/peertube-runner/server
parent118626c8752bee7b05c4e0b668852e1aba2416f1 (diff)
downloadPeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.tar.gz
PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.tar.zst
PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.zip
Add peertube runner cli
Diffstat (limited to 'packages/peertube-runner/server')
-rw-r--r--packages/peertube-runner/server/index.ts1
-rw-r--r--packages/peertube-runner/server/process/index.ts2
-rw-r--r--packages/peertube-runner/server/process/process.ts30
-rw-r--r--packages/peertube-runner/server/process/shared/common.ts91
-rw-r--r--packages/peertube-runner/server/process/shared/index.ts4
-rw-r--r--packages/peertube-runner/server/process/shared/process-live.ts295
-rw-r--r--packages/peertube-runner/server/process/shared/process-vod.ts131
-rw-r--r--packages/peertube-runner/server/process/shared/transcoding-logger.ts10
-rw-r--r--packages/peertube-runner/server/process/shared/transcoding-profiles.ts134
-rw-r--r--packages/peertube-runner/server/server.ts269
10 files changed, 967 insertions, 0 deletions
diff --git a/packages/peertube-runner/server/index.ts b/packages/peertube-runner/server/index.ts
new file mode 100644
index 000000000..371836515
--- /dev/null
+++ b/packages/peertube-runner/server/index.ts
@@ -0,0 +1 @@
export * from './server'
diff --git a/packages/peertube-runner/server/process/index.ts b/packages/peertube-runner/server/process/index.ts
new file mode 100644
index 000000000..6caedbdaf
--- /dev/null
+++ b/packages/peertube-runner/server/process/index.ts
@@ -0,0 +1,2 @@
1export * from './shared'
2export * from './process'
diff --git a/packages/peertube-runner/server/process/process.ts b/packages/peertube-runner/server/process/process.ts
new file mode 100644
index 000000000..39a929c59
--- /dev/null
+++ b/packages/peertube-runner/server/process/process.ts
@@ -0,0 +1,30 @@
1import { logger } from 'packages/peertube-runner/shared/logger'
2import {
3 RunnerJobLiveRTMPHLSTranscodingPayload,
4 RunnerJobVODAudioMergeTranscodingPayload,
5 RunnerJobVODHLSTranscodingPayload,
6 RunnerJobVODWebVideoTranscodingPayload
7} from '@shared/models'
8import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared'
9import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live'
10
11export async function processJob (options: ProcessOptions) {
12 const { server, job } = options
13
14 logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload })
15
16 if (job.type === 'vod-audio-merge-transcoding') {
17 await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>)
18 } else if (job.type === 'vod-web-video-transcoding') {
19 await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>)
20 } else if (job.type === 'vod-hls-transcoding') {
21 await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>)
22 } else if (job.type === 'live-rtmp-hls-transcoding') {
23 await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process()
24 } else {
25 logger.error(`Unknown job ${job.type} to process`)
26 return
27 }
28
29 logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`)
30}
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts
new file mode 100644
index 000000000..9b2c40728
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/common.ts
@@ -0,0 +1,91 @@
1import { throttle } from 'lodash'
2import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared'
3import { join } from 'path'
4import { buildUUID } from '@shared/extra-utils'
5import { FFmpegLive, FFmpegVOD } from '@shared/ffmpeg'
6import { RunnerJob, RunnerJobPayload } from '@shared/models'
7import { PeerTubeServer } from '@shared/server-commands'
8import { getTranscodingLogger } from './transcoding-logger'
9import { getAvailableEncoders, getEncodersToTry } from './transcoding-profiles'
10
11export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string }
12
13export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = {
14 server: PeerTubeServer
15 job: JobWithToken<T>
16 runnerToken: string
17}
18
19export async function downloadInputFile (options: {
20 url: string
21 job: JobWithToken
22 runnerToken: string
23}) {
24 const { url, job, runnerToken } = options
25 const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
26
27 await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination })
28
29 return destination
30}
31
32export async function updateTranscodingProgress (options: {
33 server: PeerTubeServer
34 runnerToken: string
35 job: JobWithToken
36 progress: number
37}) {
38 const { server, job, runnerToken, progress } = options
39
40 return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress })
41}
42
43export function buildFFmpegVOD (options: {
44 server: PeerTubeServer
45 runnerToken: string
46 job: JobWithToken
47}) {
48 const { server, job, runnerToken } = options
49
50 const updateInterval = ConfigManager.Instance.isTestInstance()
51 ? 500
52 : 60000
53
54 const updateJobProgress = throttle((progress: number) => {
55 if (progress < 0 || progress > 100) progress = undefined
56
57 updateTranscodingProgress({ server, job, runnerToken, progress })
58 .catch(err => logger.error({ err }, 'Cannot send job progress'))
59 }, updateInterval, { trailing: false })
60
61 const config = ConfigManager.Instance.getConfig()
62
63 return new FFmpegVOD({
64 niceness: config.ffmpeg.nice,
65 threads: config.ffmpeg.threads,
66 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
67 profile: 'default',
68 availableEncoders: {
69 available: getAvailableEncoders(),
70 encodersToTry: getEncodersToTry()
71 },
72 logger: getTranscodingLogger(),
73 updateJobProgress
74 })
75}
76
77export function buildFFmpegLive () {
78 const config = ConfigManager.Instance.getConfig()
79
80 return new FFmpegLive({
81 niceness: config.ffmpeg.nice,
82 threads: config.ffmpeg.threads,
83 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
84 profile: 'default',
85 availableEncoders: {
86 available: getAvailableEncoders(),
87 encodersToTry: getEncodersToTry()
88 },
89 logger: getTranscodingLogger()
90 })
91}
diff --git a/packages/peertube-runner/server/process/shared/index.ts b/packages/peertube-runner/server/process/shared/index.ts
new file mode 100644
index 000000000..8e09a7869
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/index.ts
@@ -0,0 +1,4 @@
1export * from './common'
2export * from './process-vod'
3export * from './transcoding-logger'
4export * from './transcoding-profiles'
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts
new file mode 100644
index 000000000..5a3b596a2
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/process-live.ts
@@ -0,0 +1,295 @@
1import { FSWatcher, watch } from 'chokidar'
2import { FfmpegCommand } from 'fluent-ffmpeg'
3import { ensureDir, remove } from 'fs-extra'
4import { logger } from 'packages/peertube-runner/shared'
5import { basename, join } from 'path'
6import { wait } from '@shared/core-utils'
7import { buildUUID } from '@shared/extra-utils'
8import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 PeerTubeProblemDocument,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 ServerErrorCode
15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegLive, ProcessOptions } from './common'
18
19export class ProcessLiveRTMPHLSTranscoding {
20
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
23
24 private readonly playlistsCreated = new Set<string>()
25 private allPlaylistsCreated = false
26
27 private ffmpegCommand: FfmpegCommand
28
29 private ended = false
30 private errored = false
31
32 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
33 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
34 }
35
36 process () {
37 const job = this.options.job
38 const payload = job.payload
39
40 return new Promise<void>(async (res, rej) => {
41 try {
42 await ensureDir(this.outputPath)
43
44 logger.info(`Probing ${payload.input.rtmpUrl}`)
45 const probe = await ffprobePromise(payload.input.rtmpUrl)
46 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
47
48 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
49 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
50 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
51
52 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
53 this.fsWatchers.push(m3u8Watcher)
54
55 const tsWatcher = watch(this.outputPath + '/*.ts')
56 this.fsWatchers.push(tsWatcher)
57
58 m3u8Watcher.on('change', p => {
59 logger.debug(`${p} m3u8 playlist changed`)
60 })
61
62 m3u8Watcher.on('add', p => {
63 this.playlistsCreated.add(p)
64
65 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
66 this.allPlaylistsCreated = true
67 logger.info('All m3u8 playlists are created.')
68 }
69 })
70
71 tsWatcher.on('add', p => {
72 this.sendAddedChunkUpdate(p)
73 .catch(err => this.onUpdateError(err, rej))
74 })
75
76 tsWatcher.on('unlink', p => {
77 this.sendDeletedChunkUpdate(p)
78 .catch(err => this.onUpdateError(err, rej))
79 })
80
81 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
82 inputUrl: payload.input.rtmpUrl,
83
84 outPath: this.outputPath,
85 masterPlaylistName: 'master.m3u8',
86
87 segmentListSize: payload.output.segmentListSize,
88 segmentDuration: payload.output.segmentDuration,
89
90 toTranscode: payload.output.toTranscode,
91
92 bitrate,
93 ratio,
94
95 hasAudio
96 })
97
98 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
99
100 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
101 this.onFFmpegError({ err, stdout, stderr })
102
103 res()
104 })
105
106 this.ffmpegCommand.on('end', () => {
107 this.onFFmpegEnded()
108 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
109
110 res()
111 })
112
113 this.ffmpegCommand.run()
114 } catch (err) {
115 rej(err)
116 }
117 })
118 }
119
120 // ---------------------------------------------------------------------------
121
122 private onUpdateError (err: Error, reject: (reason?: any) => void) {
123 if (this.errored) return
124 if (this.ended) return
125
126 this.errored = true
127
128 reject(err)
129 this.ffmpegCommand.kill('SIGINT')
130
131 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
132 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
133 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
134 } else {
135 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
136
137 this.sendError(err)
138 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
139 }
140
141 this.cleanup()
142 }
143
144 // ---------------------------------------------------------------------------
145
146 private onFFmpegError (options: {
147 err: any
148 stdout: string
149 stderr: string
150 }) {
151 const { err, stdout, stderr } = options
152
153 // Don't care that we killed the ffmpeg process
154 if (err?.message?.includes('Exiting normally')) return
155 if (this.errored) return
156 if (this.ended) return
157
158 this.errored = true
159
160 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
161
162 this.sendError(err)
163 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
164
165 this.cleanup()
166 }
167
168 private async sendError (err: Error) {
169 await this.options.server.runnerJobs.error({
170 jobToken: this.options.job.jobToken,
171 jobUUID: this.options.job.uuid,
172 runnerToken: this.options.runnerToken,
173 message: err.message
174 })
175 }
176
177 // ---------------------------------------------------------------------------
178
179 private async onFFmpegEnded () {
180 if (this.ended) return
181
182 this.ended = true
183 logger.info('FFmpeg ended, sending success to server')
184
185 // Wait last ffmpeg chunks generation
186 await wait(1500)
187
188 this.sendSuccess()
189 .catch(err => logger.error({ err }, 'Cannot send success'))
190
191 this.cleanup()
192 }
193
194 private async sendSuccess () {
195 const successBody: LiveRTMPHLSTranscodingSuccess = {}
196
197 await this.options.server.runnerJobs.success({
198 jobToken: this.options.job.jobToken,
199 jobUUID: this.options.job.uuid,
200 runnerToken: this.options.runnerToken,
201 payload: successBody
202 })
203 }
204
205 // ---------------------------------------------------------------------------
206
207 private sendDeletedChunkUpdate (deletedChunk: string) {
208 if (this.ended) return
209
210 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
211
212 const videoChunkFilename = basename(deletedChunk)
213
214 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
215 type: 'remove-chunk',
216 videoChunkFilename
217 }
218
219 if (this.allPlaylistsCreated) {
220 const playlistName = this.getPlaylistName(videoChunkFilename)
221
222 payload = {
223 ...payload,
224 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
225 resolutionPlaylistFilename: playlistName,
226 resolutionPlaylistFile: join(this.outputPath, playlistName)
227 }
228 }
229
230 return this.updateWithRetry(payload)
231 }
232
233 private sendAddedChunkUpdate (addedChunk: string) {
234 if (this.ended) return
235
236 logger.debug(`Sending added live chunk ${addedChunk} update`)
237
238 const videoChunkFilename = basename(addedChunk)
239
240 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
241 type: 'add-chunk',
242 videoChunkFilename,
243 videoChunkFile: addedChunk
244 }
245
246 if (this.allPlaylistsCreated) {
247 const playlistName = this.getPlaylistName(videoChunkFilename)
248
249 payload = {
250 ...payload,
251 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
252 resolutionPlaylistFilename: playlistName,
253 resolutionPlaylistFile: join(this.outputPath, playlistName)
254 }
255 }
256
257 return this.updateWithRetry(payload)
258 }
259
260 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) {
261 if (this.ended || this.errored) return
262
263 try {
264 await this.options.server.runnerJobs.update({
265 jobToken: this.options.job.jobToken,
266 jobUUID: this.options.job.uuid,
267 runnerToken: this.options.runnerToken,
268 payload
269 })
270 } catch (err) {
271 if (currentTry >= 3) throw err
272
273 logger.warn({ err }, 'Will retry update after error')
274 await wait(250)
275
276 return this.updateWithRetry(payload, currentTry + 1)
277 }
278 }
279
280 private getPlaylistName (videoChunkFilename: string) {
281 return `${videoChunkFilename.split('-')[0]}.m3u8`
282 }
283
284 // ---------------------------------------------------------------------------
285
286 private cleanup () {
287 for (const fsWatcher of this.fsWatchers) {
288 fsWatcher.close()
289 .catch(err => logger.error({ err }, 'Cannot close watcher'))
290 }
291
292 remove(this.outputPath)
293 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
294 }
295}
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts
new file mode 100644
index 000000000..aae61e9c5
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/process-vod.ts
@@ -0,0 +1,131 @@
1import { remove } from 'fs-extra'
2import { join } from 'path'
3import { buildUUID } from '@shared/extra-utils'
4import {
5 RunnerJobVODAudioMergeTranscodingPayload,
6 RunnerJobVODHLSTranscodingPayload,
7 RunnerJobVODWebVideoTranscodingPayload,
8 VODAudioMergeTranscodingSuccess,
9 VODHLSTranscodingSuccess,
10 VODWebVideoTranscodingSuccess
11} from '@shared/models'
12import { ConfigManager } from '../../../shared/config-manager'
13import { buildFFmpegVOD, downloadInputFile, ProcessOptions } from './common'
14
15export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) {
16 const { server, job, runnerToken } = options
17 const payload = job.payload
18
19 const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
20
21 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken })
22
23 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
24
25 await ffmpegVod.transcode({
26 type: 'video',
27
28 inputPath,
29
30 outputPath,
31
32 inputFileMutexReleaser: () => {},
33
34 resolution: payload.output.resolution,
35 fps: payload.output.fps
36 })
37
38 const successBody: VODWebVideoTranscodingSuccess = {
39 videoFile: outputPath
40 }
41
42 await server.runnerJobs.success({
43 jobToken: job.jobToken,
44 jobUUID: job.uuid,
45 runnerToken,
46 payload: successBody
47 })
48
49 await remove(outputPath)
50}
51
52export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) {
53 const { server, job, runnerToken } = options
54 const payload = job.payload
55
56 const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
57 const uuid = buildUUID()
58
59 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`)
60 const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4`
61 const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename))
62
63 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken })
64
65 await ffmpegVod.transcode({
66 type: 'hls',
67 copyCodecs: false,
68 inputPath,
69 hlsPlaylist: { videoFilename },
70 outputPath,
71
72 inputFileMutexReleaser: () => {},
73
74 resolution: payload.output.resolution,
75 fps: payload.output.fps
76 })
77
78 const successBody: VODHLSTranscodingSuccess = {
79 resolutionPlaylistFile: outputPath,
80 videoFile: videoPath
81 }
82
83 await server.runnerJobs.success({
84 jobToken: job.jobToken,
85 jobUUID: job.uuid,
86 runnerToken,
87 payload: successBody
88 })
89
90 await remove(outputPath)
91 await remove(videoPath)
92}
93
94export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) {
95 const { server, job, runnerToken } = options
96 const payload = job.payload
97
98 const audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
99 const inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
100
101 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
102
103 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken })
104
105 await ffmpegVod.transcode({
106 type: 'merge-audio',
107
108 audioPath,
109 inputPath,
110
111 outputPath,
112
113 inputFileMutexReleaser: () => {},
114
115 resolution: payload.output.resolution,
116 fps: payload.output.fps
117 })
118
119 const successBody: VODAudioMergeTranscodingSuccess = {
120 videoFile: outputPath
121 }
122
123 await server.runnerJobs.success({
124 jobToken: job.jobToken,
125 jobUUID: job.uuid,
126 runnerToken,
127 payload: successBody
128 })
129
130 await remove(outputPath)
131}
diff --git a/packages/peertube-runner/server/process/shared/transcoding-logger.ts b/packages/peertube-runner/server/process/shared/transcoding-logger.ts
new file mode 100644
index 000000000..d0f928914
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/transcoding-logger.ts
@@ -0,0 +1,10 @@
1import { logger } from 'packages/peertube-runner/shared/logger'
2
3export function getTranscodingLogger () {
4 return {
5 info: logger.info.bind(logger),
6 debug: logger.debug.bind(logger),
7 warn: logger.warn.bind(logger),
8 error: logger.error.bind(logger)
9 }
10}
diff --git a/packages/peertube-runner/server/process/shared/transcoding-profiles.ts b/packages/peertube-runner/server/process/shared/transcoding-profiles.ts
new file mode 100644
index 000000000..492d17d6a
--- /dev/null
+++ b/packages/peertube-runner/server/process/shared/transcoding-profiles.ts
@@ -0,0 +1,134 @@
1import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils'
2import { buildStreamSuffix, ffprobePromise, getAudioStream, getMaxAudioBitrate } from '@shared/ffmpeg'
3import { EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '@shared/models'
4
5const defaultX264VODOptionsBuilder: EncoderOptionsBuilder = (options: EncoderOptionsBuilderParams) => {
6 const { fps, inputRatio, inputBitrate, resolution } = options
7
8 const targetBitrate = getTargetBitrate({ inputBitrate, ratio: inputRatio, fps, resolution })
9
10 return {
11 outputOptions: [
12 ...getCommonOutputOptions(targetBitrate),
13
14 `-r ${fps}`
15 ]
16 }
17}
18
19const defaultX264LiveOptionsBuilder: EncoderOptionsBuilder = (options: EncoderOptionsBuilderParams) => {
20 const { streamNum, fps, inputBitrate, inputRatio, resolution } = options
21
22 const targetBitrate = getTargetBitrate({ inputBitrate, ratio: inputRatio, fps, resolution })
23
24 return {
25 outputOptions: [
26 ...getCommonOutputOptions(targetBitrate, streamNum),
27
28 `${buildStreamSuffix('-r:v', streamNum)} ${fps}`,
29 `${buildStreamSuffix('-b:v', streamNum)} ${targetBitrate}`
30 ]
31 }
32}
33
34const defaultAACOptionsBuilder: EncoderOptionsBuilder = async ({ input, streamNum, canCopyAudio }) => {
35 const probe = await ffprobePromise(input)
36
37 const parsedAudio = await getAudioStream(input, probe)
38
39 // We try to reduce the ceiling bitrate by making rough matches of bitrates
40 // Of course this is far from perfect, but it might save some space in the end
41
42 const audioCodecName = parsedAudio.audioStream['codec_name']
43
44 const bitrate = getMaxAudioBitrate(audioCodecName, parsedAudio.bitrate)
45
46 // Force stereo as it causes some issues with HLS playback in Chrome
47 const base = [ '-channel_layout', 'stereo' ]
48
49 if (bitrate !== -1) {
50 return { outputOptions: base.concat([ buildStreamSuffix('-b:a', streamNum), bitrate + 'k' ]) }
51 }
52
53 return { outputOptions: base }
54}
55
56const defaultLibFDKAACVODOptionsBuilder: EncoderOptionsBuilder = ({ streamNum }) => {
57 return { outputOptions: [ buildStreamSuffix('-q:a', streamNum), '5' ] }
58}
59
60export function getAvailableEncoders () {
61 return {
62 vod: {
63 libx264: {
64 default: defaultX264VODOptionsBuilder
65 },
66 aac: {
67 default: defaultAACOptionsBuilder
68 },
69 libfdk_aac: {
70 default: defaultLibFDKAACVODOptionsBuilder
71 }
72 },
73 live: {
74 libx264: {
75 default: defaultX264LiveOptionsBuilder
76 },
77 aac: {
78 default: defaultAACOptionsBuilder
79 }
80 }
81 }
82}
83
84export function getEncodersToTry () {
85 return {
86 vod: {
87 video: [ 'libx264' ],
88 audio: [ 'libfdk_aac', 'aac' ]
89 },
90
91 live: {
92 video: [ 'libx264' ],
93 audio: [ 'libfdk_aac', 'aac' ]
94 }
95 }
96}
97
98// ---------------------------------------------------------------------------
99
100function getTargetBitrate (options: {
101 inputBitrate: number
102 resolution: VideoResolution
103 ratio: number
104 fps: number
105}) {
106 const { inputBitrate, resolution, ratio, fps } = options
107
108 const capped = capBitrate(inputBitrate, getAverageBitrate({ resolution, fps, ratio }))
109 const limit = getMinLimitBitrate({ resolution, fps, ratio })
110
111 return Math.max(limit, capped)
112}
113
114function capBitrate (inputBitrate: number, targetBitrate: number) {
115 if (!inputBitrate) return targetBitrate
116
117 // Add 30% margin to input bitrate
118 const inputBitrateWithMargin = inputBitrate + (inputBitrate * 0.3)
119
120 return Math.min(targetBitrate, inputBitrateWithMargin)
121}
122
123function getCommonOutputOptions (targetBitrate: number, streamNum?: number) {
124 return [
125 `-preset veryfast`,
126 `${buildStreamSuffix('-maxrate:v', streamNum)} ${targetBitrate}`,
127 `${buildStreamSuffix('-bufsize:v', streamNum)} ${targetBitrate * 2}`,
128
129 // NOTE: b-strategy 1 - heuristic algorithm, 16 is optimal B-frames for it
130 `-b_strategy 1`,
131 // NOTE: Why 16: https://github.com/Chocobozzz/PeerTube/pull/774. b-strategy 2 -> B-frames<16
132 `-bf 16`
133 ]
134}
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
new file mode 100644
index 000000000..724f359bd
--- /dev/null
+++ b/packages/peertube-runner/server/server.ts
@@ -0,0 +1,269 @@
1import { ensureDir, readdir, remove } from 'fs-extra'
2import { join } from 'path'
3import { io, Socket } from 'socket.io-client'
4import { pick } from '@shared/core-utils'
5import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models'
6import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands'
7import { ConfigManager } from '../shared'
8import { IPCServer } from '../shared/ipc'
9import { logger } from '../shared/logger'
10import { JobWithToken, processJob } from './process'
11
12type PeerTubeServer = PeerTubeServerCommand & {
13 runnerToken: string
14 runnerName: string
15 runnerDescription?: string
16}
17
18export class RunnerServer {
19 private static instance: RunnerServer
20
21 private servers: PeerTubeServer[] = []
22 private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []
23
24 private checkingAvailableJobs = false
25
26 private readonly sockets = new Map<PeerTubeServer, Socket>()
27
28 private constructor () {}
29
30 async run () {
31 logger.info('Running PeerTube runner in server mode')
32
33 await ConfigManager.Instance.load()
34
35 for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
36 const serverCommand = new PeerTubeServerCommand({ url: registered.url })
37
38 this.loadServer(Object.assign(serverCommand, registered))
39
40 logger.info(`Loading registered instance ${registered.url}`)
41 }
42
43 // Run IPC
44 const ipcServer = new IPCServer()
45 try {
46 await ipcServer.run(this)
47 } catch (err) {
48 console.error('Cannot start local socket for IPC communication', err)
49 process.exit(-1)
50 }
51
52 // Cleanup on exit
53 for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
54 process.on(code, async () => {
55 await this.onExit()
56 })
57 }
58
59 // Process jobs
60 await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
61 await this.cleanupTMP()
62
63 logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)
64
65 await this.checkAvailableJobs()
66 }
67
68 // ---------------------------------------------------------------------------
69
70 async registerRunner (options: {
71 url: string
72 registrationToken: string
73 runnerName: string
74 runnerDescription?: string
75 }) {
76 const { url, registrationToken, runnerName, runnerDescription } = options
77
78 logger.info(`Registering runner ${runnerName} on ${url}...`)
79
80 const serverCommand = new PeerTubeServerCommand({ url })
81 const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })
82
83 const server: PeerTubeServer = Object.assign(serverCommand, {
84 runnerToken,
85 runnerName,
86 runnerDescription
87 })
88
89 this.loadServer(server)
90 await this.saveRegisteredInstancesInConf()
91
92 logger.info(`Registered runner ${runnerName} on ${url}`)
93
94 await this.checkAvailableJobs()
95 }
96
97 private loadServer (server: PeerTubeServer) {
98 this.servers.push(server)
99
100 const url = server.url + '/runners'
101 const socket = io(url, {
102 auth: {
103 runnerToken: server.runnerToken
104 },
105 transports: [ 'websocket' ]
106 })
107
108 socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
109 socket.on('connect', () => logger.info(`Connected to ${url} socket`))
110 socket.on('available-jobs', () => this.checkAvailableJobs())
111
112 this.sockets.set(server, socket)
113 }
114
115 async unregisterRunner (options: {
116 url: string
117 }) {
118 const { url } = options
119
120 const server = this.servers.find(s => s.url === url)
121 if (!server) {
122 logger.error(`Unknown server ${url} to unregister`)
123 return
124 }
125
126 logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)
127
128 try {
129 await server.runners.unregister({ runnerToken: server.runnerToken })
130 } catch (err) {
131 logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
132 }
133
134 this.unloadServer(server)
135 await this.saveRegisteredInstancesInConf()
136
137 logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
138 }
139
140 private unloadServer (server: PeerTubeServer) {
141 this.servers = this.servers.filter(s => s !== server)
142
143 const socket = this.sockets.get(server)
144 socket.disconnect()
145
146 this.sockets.delete(server)
147 }
148
149 listRegistered () {
150 return {
151 servers: this.servers.map(s => {
152 return {
153 url: s.url,
154 runnerName: s.runnerName,
155 runnerDescription: s.runnerDescription
156 }
157 })
158 }
159 }
160
161 // ---------------------------------------------------------------------------
162
163 private async checkAvailableJobs () {
164 if (this.checkingAvailableJobs) return
165
166 logger.info('Checking available jobs')
167
168 this.checkingAvailableJobs = true
169
170 for (const server of this.servers) {
171 try {
172 const job = await this.requestJob(server)
173 if (!job) continue
174
175 await this.tryToExecuteJobAsync(server, job)
176 } catch (err) {
177 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
178 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
179
180 await this.unregisterRunner({ url: server.url })
181 return
182 }
183
184 logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
185 }
186 }
187
188 this.checkingAvailableJobs = false
189 }
190
191 private async requestJob (server: PeerTubeServer) {
192 logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)
193
194 const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
195
196 if (availableJobs.length === 0) {
197 logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
198 return undefined
199 }
200
201 return availableJobs[0]
202 }
203
204 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
205 if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
206
207 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
208
209 const processingJob = { job, server }
210 this.processingJobs.push(processingJob)
211
212 processJob({ server, job, runnerToken: server.runnerToken })
213 .catch(err => {
214 logger.error({ err }, 'Cannot process job')
215
216 server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
217 .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
218 })
219 .finally(() => {
220 this.processingJobs = this.processingJobs.filter(p => p !== processingJob)
221
222 return this.checkAvailableJobs()
223 })
224 }
225
226 // ---------------------------------------------------------------------------
227
228 private saveRegisteredInstancesInConf () {
229 const data = this.servers.map(s => {
230 return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
231 })
232
233 return ConfigManager.Instance.setRegisteredInstances(data)
234 }
235
236 // ---------------------------------------------------------------------------
237
238 private async cleanupTMP () {
239 const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())
240
241 for (const file of files) {
242 await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
243 }
244 }
245
246 private async onExit () {
247 try {
248 for (const { server, job } of this.processingJobs) {
249 await server.runnerJobs.abort({
250 jobToken: job.jobToken,
251 jobUUID: job.uuid,
252 reason: 'Runner stopped',
253 runnerToken: server.runnerToken
254 })
255 }
256
257 await this.cleanupTMP()
258 } catch (err) {
259 console.error(err)
260 process.exit(-1)
261 }
262
263 process.exit()
264 }
265
266 static get Instance () {
267 return this.instance || (this.instance = new this())
268 }
269}