aboutsummaryrefslogtreecommitdiffhomepage
path: root/apps/peertube-runner/src
diff options
context:
space:
mode:
Diffstat (limited to 'apps/peertube-runner/src')
-rw-r--r--apps/peertube-runner/src/peertube-runner.ts91
-rw-r--r--apps/peertube-runner/src/register/index.ts1
-rw-r--r--apps/peertube-runner/src/register/register.ts36
-rw-r--r--apps/peertube-runner/src/server/index.ts1
-rw-r--r--apps/peertube-runner/src/server/process/index.ts2
-rw-r--r--apps/peertube-runner/src/server/process/process.ts34
-rw-r--r--apps/peertube-runner/src/server/process/shared/common.ts106
-rw-r--r--apps/peertube-runner/src/server/process/shared/index.ts3
-rw-r--r--apps/peertube-runner/src/server/process/shared/process-live.ts338
-rw-r--r--apps/peertube-runner/src/server/process/shared/process-studio.ts165
-rw-r--r--apps/peertube-runner/src/server/process/shared/process-vod.ts201
-rw-r--r--apps/peertube-runner/src/server/process/shared/transcoding-logger.ts10
-rw-r--r--apps/peertube-runner/src/server/server.ts307
-rw-r--r--apps/peertube-runner/src/server/shared/index.ts1
-rw-r--r--apps/peertube-runner/src/server/shared/supported-job.ts43
-rw-r--r--apps/peertube-runner/src/shared/config-manager.ts140
-rw-r--r--apps/peertube-runner/src/shared/http.ts67
-rw-r--r--apps/peertube-runner/src/shared/index.ts3
-rw-r--r--apps/peertube-runner/src/shared/ipc/index.ts2
-rw-r--r--apps/peertube-runner/src/shared/ipc/ipc-client.ts88
-rw-r--r--apps/peertube-runner/src/shared/ipc/ipc-server.ts61
-rw-r--r--apps/peertube-runner/src/shared/ipc/shared/index.ts2
-rw-r--r--apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts15
-rw-r--r--apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts15
-rw-r--r--apps/peertube-runner/src/shared/logger.ts12
25 files changed, 1744 insertions, 0 deletions
diff --git a/apps/peertube-runner/src/peertube-runner.ts b/apps/peertube-runner/src/peertube-runner.ts
new file mode 100644
index 000000000..67ca0e0ac
--- /dev/null
+++ b/apps/peertube-runner/src/peertube-runner.ts
@@ -0,0 +1,91 @@
1#!/usr/bin/env node
2
3import { Command, InvalidArgumentError } from '@commander-js/extra-typings'
4import { listRegistered, registerRunner, unregisterRunner } from './register/index.js'
5import { RunnerServer } from './server/index.js'
6import { ConfigManager, logger } from './shared/index.js'
7
8const program = new Command()
9 .version(process.env.PACKAGE_VERSION)
10 .option(
11 '--id <id>',
12 'Runner server id, so you can run multiple PeerTube server runners with different configurations on the same machine',
13 'default'
14 )
15 .option('--verbose', 'Run in verbose mode')
16 .hook('preAction', thisCommand => {
17 const options = thisCommand.opts()
18
19 ConfigManager.Instance.init(options.id)
20
21 if (options.verbose === true) {
22 logger.level = 'debug'
23 }
24 })
25
26program.command('server')
27 .description('Run in server mode, to execute remote jobs of registered PeerTube instances')
28 .action(async () => {
29 try {
30 await RunnerServer.Instance.run()
31 } catch (err) {
32 logger.error(err, 'Cannot run PeerTube runner as server mode')
33 process.exit(-1)
34 }
35 })
36
37program.command('register')
38 .description('Register a new PeerTube instance to process runner jobs')
39 .requiredOption('--url <url>', 'PeerTube instance URL', parseUrl)
40 .requiredOption('--registration-token <token>', 'Runner registration token (can be found in PeerTube instance administration')
41 .requiredOption('--runner-name <name>', 'Runner name')
42 .option('--runner-description <description>', 'Runner description')
43 .action(async options => {
44 try {
45 await registerRunner(options)
46 } catch (err) {
47 console.error('Cannot register this PeerTube runner.')
48 console.error(err)
49 process.exit(-1)
50 }
51 })
52
53program.command('unregister')
54 .description('Unregister the runner from PeerTube instance')
55 .requiredOption('--url <url>', 'PeerTube instance URL', parseUrl)
56 .requiredOption('--runner-name <name>', 'Runner name')
57 .action(async options => {
58 try {
59 await unregisterRunner(options)
60 } catch (err) {
61 console.error('Cannot unregister this PeerTube runner.')
62 console.error(err)
63 process.exit(-1)
64 }
65 })
66
67program.command('list-registered')
68 .description('List registered PeerTube instances')
69 .action(async () => {
70 try {
71 await listRegistered()
72 } catch (err) {
73 console.error('Cannot list registered PeerTube instances.')
74 console.error(err)
75 process.exit(-1)
76 }
77 })
78
79program.parse()
80
81// ---------------------------------------------------------------------------
82// Private
83// ---------------------------------------------------------------------------
84
85function parseUrl (url: string) {
86 if (url.startsWith('http://') !== true && url.startsWith('https://') !== true) {
87 throw new InvalidArgumentError('URL should start with a http:// or https://')
88 }
89
90 return url
91}
diff --git a/apps/peertube-runner/src/register/index.ts b/apps/peertube-runner/src/register/index.ts
new file mode 100644
index 000000000..a7d6cf457
--- /dev/null
+++ b/apps/peertube-runner/src/register/index.ts
@@ -0,0 +1 @@
export * from './register.js'
diff --git a/apps/peertube-runner/src/register/register.ts b/apps/peertube-runner/src/register/register.ts
new file mode 100644
index 000000000..e8af21661
--- /dev/null
+++ b/apps/peertube-runner/src/register/register.ts
@@ -0,0 +1,36 @@
1import { IPCClient } from '../shared/ipc/index.js'
2
3export async function registerRunner (options: {
4 url: string
5 registrationToken: string
6 runnerName: string
7 runnerDescription?: string
8}) {
9 const client = new IPCClient()
10 await client.run()
11
12 await client.askRegister(options)
13
14 client.stop()
15}
16
17export async function unregisterRunner (options: {
18 url: string
19 runnerName: string
20}) {
21 const client = new IPCClient()
22 await client.run()
23
24 await client.askUnregister(options)
25
26 client.stop()
27}
28
29export async function listRegistered () {
30 const client = new IPCClient()
31 await client.run()
32
33 await client.askListRegistered()
34
35 client.stop()
36}
diff --git a/apps/peertube-runner/src/server/index.ts b/apps/peertube-runner/src/server/index.ts
new file mode 100644
index 000000000..e56cda526
--- /dev/null
+++ b/apps/peertube-runner/src/server/index.ts
@@ -0,0 +1 @@
export * from './server.js'
diff --git a/apps/peertube-runner/src/server/process/index.ts b/apps/peertube-runner/src/server/process/index.ts
new file mode 100644
index 000000000..64a7b00fc
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/index.ts
@@ -0,0 +1,2 @@
1export * from './shared/index.js'
2export * from './process.js'
diff --git a/apps/peertube-runner/src/server/process/process.ts b/apps/peertube-runner/src/server/process/process.ts
new file mode 100644
index 000000000..e8a1d7c28
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/process.ts
@@ -0,0 +1,34 @@
1import {
2 RunnerJobLiveRTMPHLSTranscodingPayload,
3 RunnerJobStudioTranscodingPayload,
4 RunnerJobVODAudioMergeTranscodingPayload,
5 RunnerJobVODHLSTranscodingPayload,
6 RunnerJobVODWebVideoTranscodingPayload
7} from '@peertube/peertube-models'
8import { logger } from '../../shared/index.js'
9import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared/index.js'
10import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live.js'
11import { processStudioTranscoding } from './shared/process-studio.js'
12
13export async function processJob (options: ProcessOptions) {
14 const { server, job } = options
15
16 logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload })
17
18 if (job.type === 'vod-audio-merge-transcoding') {
19 await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>)
20 } else if (job.type === 'vod-web-video-transcoding') {
21 await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>)
22 } else if (job.type === 'vod-hls-transcoding') {
23 await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>)
24 } else if (job.type === 'live-rtmp-hls-transcoding') {
25 await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process()
26 } else if (job.type === 'video-studio-transcoding') {
27 await processStudioTranscoding(options as ProcessOptions<RunnerJobStudioTranscodingPayload>)
28 } else {
29 logger.error(`Unknown job ${job.type} to process`)
30 return
31 }
32
33 logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`)
34}
diff --git a/apps/peertube-runner/src/server/process/shared/common.ts b/apps/peertube-runner/src/server/process/shared/common.ts
new file mode 100644
index 000000000..09241d93b
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/shared/common.ts
@@ -0,0 +1,106 @@
1import { remove } from 'fs-extra/esm'
2import { join } from 'path'
3import { FFmpegEdition, FFmpegLive, FFmpegVOD, getDefaultAvailableEncoders, getDefaultEncodersToTry } from '@peertube/peertube-ffmpeg'
4import { RunnerJob, RunnerJobPayload } from '@peertube/peertube-models'
5import { buildUUID } from '@peertube/peertube-node-utils'
6import { PeerTubeServer } from '@peertube/peertube-server-commands'
7import { ConfigManager, downloadFile, logger } from '../../../shared/index.js'
8import { getTranscodingLogger } from './transcoding-logger.js'
9
10export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string }
11
12export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = {
13 server: PeerTubeServer
14 job: JobWithToken<T>
15 runnerToken: string
16}
17
18export async function downloadInputFile (options: {
19 url: string
20 job: JobWithToken
21 runnerToken: string
22}) {
23 const { url, job, runnerToken } = options
24 const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
25
26 try {
27 await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination })
28 } catch (err) {
29 remove(destination)
30 .catch(err => logger.error({ err }, `Cannot remove ${destination}`))
31
32 throw err
33 }
34
35 return destination
36}
37
38export function scheduleTranscodingProgress (options: {
39 server: PeerTubeServer
40 runnerToken: string
41 job: JobWithToken
42 progressGetter: () => number
43}) {
44 const { job, server, progressGetter, runnerToken } = options
45
46 const updateInterval = ConfigManager.Instance.isTestInstance()
47 ? 500
48 : 60000
49
50 const update = () => {
51 server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
52 .catch(err => logger.error({ err }, 'Cannot send job progress'))
53 }
54
55 const interval = setInterval(() => {
56 update()
57 }, updateInterval)
58
59 update()
60
61 return interval
62}
63
64// ---------------------------------------------------------------------------
65
66export function buildFFmpegVOD (options: {
67 onJobProgress: (progress: number) => void
68}) {
69 const { onJobProgress } = options
70
71 return new FFmpegVOD({
72 ...getCommonFFmpegOptions(),
73
74 updateJobProgress: arg => {
75 const progress = arg < 0 || arg > 100
76 ? undefined
77 : arg
78
79 onJobProgress(progress)
80 }
81 })
82}
83
84export function buildFFmpegLive () {
85 return new FFmpegLive(getCommonFFmpegOptions())
86}
87
88export function buildFFmpegEdition () {
89 return new FFmpegEdition(getCommonFFmpegOptions())
90}
91
92function getCommonFFmpegOptions () {
93 const config = ConfigManager.Instance.getConfig()
94
95 return {
96 niceness: config.ffmpeg.nice,
97 threads: config.ffmpeg.threads,
98 tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
99 profile: 'default',
100 availableEncoders: {
101 available: getDefaultAvailableEncoders(),
102 encodersToTry: getDefaultEncodersToTry()
103 },
104 logger: getTranscodingLogger()
105 }
106}
diff --git a/apps/peertube-runner/src/server/process/shared/index.ts b/apps/peertube-runner/src/server/process/shared/index.ts
new file mode 100644
index 000000000..638bf127f
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/shared/index.ts
@@ -0,0 +1,3 @@
1export * from './common.js'
2export * from './process-vod.js'
3export * from './transcoding-logger.js'
diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts
new file mode 100644
index 000000000..0dc4e5b13
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/shared/process-live.ts
@@ -0,0 +1,338 @@
1import { FSWatcher, watch } from 'chokidar'
2import { FfmpegCommand } from 'fluent-ffmpeg'
3import { ensureDir, remove } from 'fs-extra/esm'
4import { basename, join } from 'path'
5import { wait } from '@peertube/peertube-core-utils'
6import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@peertube/peertube-ffmpeg'
7import {
8 LiveRTMPHLSTranscodingSuccess,
9 LiveRTMPHLSTranscodingUpdatePayload,
10 PeerTubeProblemDocument,
11 RunnerJobLiveRTMPHLSTranscodingPayload,
12 ServerErrorCode
13} from '@peertube/peertube-models'
14import { buildUUID } from '@peertube/peertube-node-utils'
15import { ConfigManager } from '../../../shared/config-manager.js'
16import { logger } from '../../../shared/index.js'
17import { buildFFmpegLive, ProcessOptions } from './common.js'
18
19export class ProcessLiveRTMPHLSTranscoding {
20
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
23
24 // Playlist name -> chunks
25 private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
26
27 private readonly playlistsCreated = new Set<string>()
28 private allPlaylistsCreated = false
29
30 private ffmpegCommand: FfmpegCommand
31
32 private ended = false
33 private errored = false
34
35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
37
38 logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
39 }
40
41 process () {
42 const job = this.options.job
43 const payload = job.payload
44
45 return new Promise<void>(async (res, rej) => {
46 try {
47 await ensureDir(this.outputPath)
48
49 logger.info(`Probing ${payload.input.rtmpUrl}`)
50 const probe = await ffprobePromise(payload.input.rtmpUrl)
51 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
52
53 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
54 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
55 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
56
57 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
58 this.fsWatchers.push(m3u8Watcher)
59
60 const tsWatcher = watch(this.outputPath + '/*.ts')
61 this.fsWatchers.push(tsWatcher)
62
63 m3u8Watcher.on('change', p => {
64 logger.debug(`${p} m3u8 playlist changed`)
65 })
66
67 m3u8Watcher.on('add', p => {
68 this.playlistsCreated.add(p)
69
70 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
71 this.allPlaylistsCreated = true
72 logger.info('All m3u8 playlists are created.')
73 }
74 })
75
76 tsWatcher.on('add', async p => {
77 try {
78 await this.sendPendingChunks()
79 } catch (err) {
80 this.onUpdateError({ err, rej, res })
81 }
82
83 const playlistName = this.getPlaylistIdFromTS(p)
84
85 const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
86 pendingChunks.push(p)
87
88 this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
89 })
90
91 tsWatcher.on('unlink', p => {
92 this.sendDeletedChunkUpdate(p)
93 .catch(err => this.onUpdateError({ err, rej, res }))
94 })
95
96 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
97 inputUrl: payload.input.rtmpUrl,
98
99 outPath: this.outputPath,
100 masterPlaylistName: 'master.m3u8',
101
102 segmentListSize: payload.output.segmentListSize,
103 segmentDuration: payload.output.segmentDuration,
104
105 toTranscode: payload.output.toTranscode,
106
107 bitrate,
108 ratio,
109
110 hasAudio
111 })
112
113 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
114
115 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
116 this.onFFmpegError({ err, stdout, stderr })
117
118 res()
119 })
120
121 this.ffmpegCommand.on('end', () => {
122 this.onFFmpegEnded()
123 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
124
125 res()
126 })
127
128 this.ffmpegCommand.run()
129 } catch (err) {
130 rej(err)
131 }
132 })
133 }
134
135 // ---------------------------------------------------------------------------
136
137 private onUpdateError (options: {
138 err: Error
139 res: () => void
140 rej: (reason?: any) => void
141 }) {
142 const { err, res, rej } = options
143
144 if (this.errored) return
145 if (this.ended) return
146
147 this.errored = true
148
149 this.ffmpegCommand.kill('SIGINT')
150
151 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
152 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
153 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
154
155 res()
156 } else {
157 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
158
159 this.sendError(err)
160 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
161
162 rej(err)
163 }
164
165 this.cleanup()
166 }
167
168 // ---------------------------------------------------------------------------
169
170 private onFFmpegError (options: {
171 err: any
172 stdout: string
173 stderr: string
174 }) {
175 const { err, stdout, stderr } = options
176
177 // Don't care that we killed the ffmpeg process
178 if (err?.message?.includes('Exiting normally')) return
179 if (this.errored) return
180 if (this.ended) return
181
182 this.errored = true
183
184 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
185
186 this.sendError(err)
187 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
188
189 this.cleanup()
190 }
191
192 private async sendError (err: Error) {
193 await this.options.server.runnerJobs.error({
194 jobToken: this.options.job.jobToken,
195 jobUUID: this.options.job.uuid,
196 runnerToken: this.options.runnerToken,
197 message: err.message
198 })
199 }
200
201 // ---------------------------------------------------------------------------
202
203 private async onFFmpegEnded () {
204 if (this.ended) return
205
206 this.ended = true
207 logger.info('FFmpeg ended, sending success to server')
208
209 // Wait last ffmpeg chunks generation
210 await wait(1500)
211
212 this.sendSuccess()
213 .catch(err => logger.error({ err }, 'Cannot send success'))
214
215 this.cleanup()
216 }
217
218 private async sendSuccess () {
219 const successBody: LiveRTMPHLSTranscodingSuccess = {}
220
221 await this.options.server.runnerJobs.success({
222 jobToken: this.options.job.jobToken,
223 jobUUID: this.options.job.uuid,
224 runnerToken: this.options.runnerToken,
225 payload: successBody
226 })
227 }
228
229 // ---------------------------------------------------------------------------
230
231 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
232 if (this.ended) return Promise.resolve()
233
234 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
235
236 const videoChunkFilename = basename(deletedChunk)
237
238 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
239 type: 'remove-chunk',
240 videoChunkFilename
241 }
242
243 if (this.allPlaylistsCreated) {
244 const playlistName = this.getPlaylistName(videoChunkFilename)
245
246 payload = {
247 ...payload,
248 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
249 resolutionPlaylistFilename: playlistName,
250 resolutionPlaylistFile: join(this.outputPath, playlistName)
251 }
252 }
253
254 return this.updateWithRetry(payload)
255 }
256
257 private async sendPendingChunks (): Promise<any> {
258 if (this.ended) return Promise.resolve()
259
260 const promises: Promise<any>[] = []
261
262 for (const playlist of this.pendingChunksPerPlaylist.keys()) {
263 for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
264 logger.debug(`Sending added live chunk ${chunk} update`)
265
266 const videoChunkFilename = basename(chunk)
267
268 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
269 type: 'add-chunk',
270 videoChunkFilename,
271 videoChunkFile: chunk
272 }
273
274 if (this.allPlaylistsCreated) {
275 const playlistName = this.getPlaylistName(videoChunkFilename)
276
277 payload = {
278 ...payload,
279 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
280 resolutionPlaylistFilename: playlistName,
281 resolutionPlaylistFile: join(this.outputPath, playlistName)
282 }
283 }
284
285 promises.push(this.updateWithRetry(payload))
286 }
287
288 this.pendingChunksPerPlaylist.set(playlist, [])
289 }
290
291 await Promise.all(promises)
292 }
293
294 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
295 if (this.ended || this.errored) return
296
297 try {
298 await this.options.server.runnerJobs.update({
299 jobToken: this.options.job.jobToken,
300 jobUUID: this.options.job.uuid,
301 runnerToken: this.options.runnerToken,
302 payload
303 })
304 } catch (err) {
305 if (currentTry >= 3) throw err
306 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
307
308 logger.warn({ err }, 'Will retry update after error')
309 await wait(250)
310
311 return this.updateWithRetry(payload, currentTry + 1)
312 }
313 }
314
315 private getPlaylistName (videoChunkFilename: string) {
316 return `${videoChunkFilename.split('-')[0]}.m3u8`
317 }
318
319 private getPlaylistIdFromTS (segmentPath: string) {
320 const playlistIdMatcher = /^([\d+])-/
321
322 return basename(segmentPath).match(playlistIdMatcher)[1]
323 }
324
325 // ---------------------------------------------------------------------------
326
327 private cleanup () {
328 logger.debug(`Cleaning up job ${this.options.job.uuid}`)
329
330 for (const fsWatcher of this.fsWatchers) {
331 fsWatcher.close()
332 .catch(err => logger.error({ err }, 'Cannot close watcher'))
333 }
334
335 remove(this.outputPath)
336 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
337 }
338}
diff --git a/apps/peertube-runner/src/server/process/shared/process-studio.ts b/apps/peertube-runner/src/server/process/shared/process-studio.ts
new file mode 100644
index 000000000..11b7b7d9a
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/shared/process-studio.ts
@@ -0,0 +1,165 @@
1import { remove } from 'fs-extra/esm'
2import { join } from 'path'
3import { pick } from '@peertube/peertube-core-utils'
4import {
5 RunnerJobStudioTranscodingPayload,
6 VideoStudioTask,
7 VideoStudioTaskCutPayload,
8 VideoStudioTaskIntroPayload,
9 VideoStudioTaskOutroPayload,
10 VideoStudioTaskPayload,
11 VideoStudioTaskWatermarkPayload,
12 VideoStudioTranscodingSuccess
13} from '@peertube/peertube-models'
14import { buildUUID } from '@peertube/peertube-node-utils'
15import { ConfigManager } from '../../../shared/config-manager.js'
16import { logger } from '../../../shared/index.js'
17import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common.js'
18
19export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) {
20 const { server, job, runnerToken } = options
21 const payload = job.payload
22
23 let inputPath: string
24 let outputPath: string
25 let tmpInputFilePath: string
26
27 let tasksProgress = 0
28
29 const updateProgressInterval = scheduleTranscodingProgress({
30 job,
31 server,
32 runnerToken,
33 progressGetter: () => tasksProgress
34 })
35
36 try {
37 logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`)
38
39 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
40 tmpInputFilePath = inputPath
41
42 logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`)
43
44 for (const task of payload.tasks) {
45 const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
46 outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)
47
48 await processTask({
49 inputPath: tmpInputFilePath,
50 outputPath,
51 task,
52 job,
53 runnerToken
54 })
55
56 if (tmpInputFilePath) await remove(tmpInputFilePath)
57
58 // For the next iteration
59 tmpInputFilePath = outputPath
60
61 tasksProgress += Math.floor(100 / payload.tasks.length)
62 }
63
64 const successBody: VideoStudioTranscodingSuccess = {
65 videoFile: outputPath
66 }
67
68 await server.runnerJobs.success({
69 jobToken: job.jobToken,
70 jobUUID: job.uuid,
71 runnerToken,
72 payload: successBody
73 })
74 } finally {
75 if (tmpInputFilePath) await remove(tmpInputFilePath)
76 if (outputPath) await remove(outputPath)
77 if (updateProgressInterval) clearInterval(updateProgressInterval)
78 }
79}
80
81// ---------------------------------------------------------------------------
82// Private
83// ---------------------------------------------------------------------------
84
85type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = {
86 inputPath: string
87 outputPath: string
88 task: T
89 runnerToken: string
90 job: JobWithToken
91}
92
93const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = {
94 'add-intro': processAddIntroOutro,
95 'add-outro': processAddIntroOutro,
96 'cut': processCut,
97 'add-watermark': processAddWatermark
98}
99
100async function processTask (options: TaskProcessorOptions) {
101 const { task } = options
102
103 const processor = taskProcessors[options.task.name]
104 if (!process) throw new Error('Unknown task ' + task.name)
105
106 return processor(options)
107}
108
109async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
110 const { inputPath, task, runnerToken, job } = options
111
112 logger.debug('Adding intro/outro to ' + inputPath)
113
114 const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
115
116 try {
117 await buildFFmpegEdition().addIntroOutro({
118 ...pick(options, [ 'inputPath', 'outputPath' ]),
119
120 introOutroPath,
121 type: task.name === 'add-intro'
122 ? 'intro'
123 : 'outro'
124 })
125 } finally {
126 await remove(introOutroPath)
127 }
128}
129
130function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
131 const { inputPath, task } = options
132
133 logger.debug(`Cutting ${inputPath}`)
134
135 return buildFFmpegEdition().cutVideo({
136 ...pick(options, [ 'inputPath', 'outputPath' ]),
137
138 start: task.options.start,
139 end: task.options.end
140 })
141}
142
143async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
144 const { inputPath, task, runnerToken, job } = options
145
146 logger.debug('Adding watermark to ' + inputPath)
147
148 const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
149
150 try {
151 await buildFFmpegEdition().addWatermark({
152 ...pick(options, [ 'inputPath', 'outputPath' ]),
153
154 watermarkPath,
155
156 videoFilters: {
157 watermarkSizeRatio: task.options.watermarkSizeRatio,
158 horitonzalMarginRatio: task.options.horitonzalMarginRatio,
159 verticalMarginRatio: task.options.verticalMarginRatio
160 }
161 })
162 } finally {
163 await remove(watermarkPath)
164 }
165}
diff --git a/apps/peertube-runner/src/server/process/shared/process-vod.ts b/apps/peertube-runner/src/server/process/shared/process-vod.ts
new file mode 100644
index 000000000..fe1715ca9
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/shared/process-vod.ts
@@ -0,0 +1,201 @@
1import { remove } from 'fs-extra/esm'
2import { join } from 'path'
3import {
4 RunnerJobVODAudioMergeTranscodingPayload,
5 RunnerJobVODHLSTranscodingPayload,
6 RunnerJobVODWebVideoTranscodingPayload,
7 VODAudioMergeTranscodingSuccess,
8 VODHLSTranscodingSuccess,
9 VODWebVideoTranscodingSuccess
10} from '@peertube/peertube-models'
11import { buildUUID } from '@peertube/peertube-node-utils'
12import { ConfigManager } from '../../../shared/config-manager.js'
13import { logger } from '../../../shared/index.js'
14import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common.js'
15
16export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) {
17 const { server, job, runnerToken } = options
18
19 const payload = job.payload
20
21 let ffmpegProgress: number
22 let inputPath: string
23
24 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
25
26 const updateProgressInterval = scheduleTranscodingProgress({
27 job,
28 server,
29 runnerToken,
30 progressGetter: () => ffmpegProgress
31 })
32
33 try {
34 logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`)
35
36 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
37
38 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`)
39
40 const ffmpegVod = buildFFmpegVOD({
41 onJobProgress: progress => { ffmpegProgress = progress }
42 })
43
44 await ffmpegVod.transcode({
45 type: 'video',
46
47 inputPath,
48
49 outputPath,
50
51 inputFileMutexReleaser: () => {},
52
53 resolution: payload.output.resolution,
54 fps: payload.output.fps
55 })
56
57 const successBody: VODWebVideoTranscodingSuccess = {
58 videoFile: outputPath
59 }
60
61 await server.runnerJobs.success({
62 jobToken: job.jobToken,
63 jobUUID: job.uuid,
64 runnerToken,
65 payload: successBody
66 })
67 } finally {
68 if (inputPath) await remove(inputPath)
69 if (outputPath) await remove(outputPath)
70 if (updateProgressInterval) clearInterval(updateProgressInterval)
71 }
72}
73
74export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) {
75 const { server, job, runnerToken } = options
76 const payload = job.payload
77
78 let ffmpegProgress: number
79 let inputPath: string
80
81 const uuid = buildUUID()
82 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`)
83 const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4`
84 const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename))
85
86 const updateProgressInterval = scheduleTranscodingProgress({
87 job,
88 server,
89 runnerToken,
90 progressGetter: () => ffmpegProgress
91 })
92
93 try {
94 logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`)
95
96 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
97
98 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
99
100 const ffmpegVod = buildFFmpegVOD({
101 onJobProgress: progress => { ffmpegProgress = progress }
102 })
103
104 await ffmpegVod.transcode({
105 type: 'hls',
106 copyCodecs: false,
107 inputPath,
108 hlsPlaylist: { videoFilename },
109 outputPath,
110
111 inputFileMutexReleaser: () => {},
112
113 resolution: payload.output.resolution,
114 fps: payload.output.fps
115 })
116
117 const successBody: VODHLSTranscodingSuccess = {
118 resolutionPlaylistFile: outputPath,
119 videoFile: videoPath
120 }
121
122 await server.runnerJobs.success({
123 jobToken: job.jobToken,
124 jobUUID: job.uuid,
125 runnerToken,
126 payload: successBody
127 })
128 } finally {
129 if (inputPath) await remove(inputPath)
130 if (outputPath) await remove(outputPath)
131 if (videoPath) await remove(videoPath)
132 if (updateProgressInterval) clearInterval(updateProgressInterval)
133 }
134}
135
136export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) {
137 const { server, job, runnerToken } = options
138 const payload = job.payload
139
140 let ffmpegProgress: number
141 let audioPath: string
142 let inputPath: string
143
144 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
145
146 const updateProgressInterval = scheduleTranscodingProgress({
147 job,
148 server,
149 runnerToken,
150 progressGetter: () => ffmpegProgress
151 })
152
153 try {
154 logger.info(
155 `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
156 `for audio merge transcoding job ${job.jobToken}`
157 )
158
159 audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
160 inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
161
162 logger.info(
163 `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
164 `for job ${job.jobToken}. Running audio merge transcoding.`
165 )
166
167 const ffmpegVod = buildFFmpegVOD({
168 onJobProgress: progress => { ffmpegProgress = progress }
169 })
170
171 await ffmpegVod.transcode({
172 type: 'merge-audio',
173
174 audioPath,
175 inputPath,
176
177 outputPath,
178
179 inputFileMutexReleaser: () => {},
180
181 resolution: payload.output.resolution,
182 fps: payload.output.fps
183 })
184
185 const successBody: VODAudioMergeTranscodingSuccess = {
186 videoFile: outputPath
187 }
188
189 await server.runnerJobs.success({
190 jobToken: job.jobToken,
191 jobUUID: job.uuid,
192 runnerToken,
193 payload: successBody
194 })
195 } finally {
196 if (audioPath) await remove(audioPath)
197 if (inputPath) await remove(inputPath)
198 if (outputPath) await remove(outputPath)
199 if (updateProgressInterval) clearInterval(updateProgressInterval)
200 }
201}
diff --git a/apps/peertube-runner/src/server/process/shared/transcoding-logger.ts b/apps/peertube-runner/src/server/process/shared/transcoding-logger.ts
new file mode 100644
index 000000000..041dd62eb
--- /dev/null
+++ b/apps/peertube-runner/src/server/process/shared/transcoding-logger.ts
@@ -0,0 +1,10 @@
1import { logger } from '../../../shared/index.js'
2
3export function getTranscodingLogger () {
4 return {
5 info: logger.info.bind(logger),
6 debug: logger.debug.bind(logger),
7 warn: logger.warn.bind(logger),
8 error: logger.error.bind(logger)
9 }
10}
diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts
new file mode 100644
index 000000000..825e3f297
--- /dev/null
+++ b/apps/peertube-runner/src/server/server.ts
@@ -0,0 +1,307 @@
1import { ensureDir, remove } from 'fs-extra/esm'
2import { readdir } from 'fs/promises'
3import { join } from 'path'
4import { io, Socket } from 'socket.io-client'
5import { pick, shuffle, wait } from '@peertube/peertube-core-utils'
6import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models'
7import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands'
8import { ConfigManager } from '../shared/index.js'
9import { IPCServer } from '../shared/ipc/index.js'
10import { logger } from '../shared/logger.js'
11import { JobWithToken, processJob } from './process/index.js'
12import { isJobSupported } from './shared/index.js'
13
14type PeerTubeServer = PeerTubeServerCommand & {
15 runnerToken: string
16 runnerName: string
17 runnerDescription?: string
18}
19
20export class RunnerServer {
21 private static instance: RunnerServer
22
23 private servers: PeerTubeServer[] = []
24 private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []
25
26 private checkingAvailableJobs = false
27
28 private cleaningUp = false
29
30 private readonly sockets = new Map<PeerTubeServer, Socket>()
31
32 private constructor () {}
33
34 async run () {
35 logger.info('Running PeerTube runner in server mode')
36
37 await ConfigManager.Instance.load()
38
39 for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
40 const serverCommand = new PeerTubeServerCommand({ url: registered.url })
41
42 this.loadServer(Object.assign(serverCommand, registered))
43
44 logger.info(`Loading registered instance ${registered.url}`)
45 }
46
47 // Run IPC
48 const ipcServer = new IPCServer()
49 try {
50 await ipcServer.run(this)
51 } catch (err) {
52 logger.error('Cannot start local socket for IPC communication', err)
53 process.exit(-1)
54 }
55
56 // Cleanup on exit
57 for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
58 process.on(code, async (err, origin) => {
59 if (code === 'uncaughtException') {
60 logger.error({ err, origin }, 'uncaughtException')
61 }
62
63 await this.onExit()
64 })
65 }
66
67 // Process jobs
68 await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
69 await this.cleanupTMP()
70
71 logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)
72
73 await this.checkAvailableJobs()
74 }
75
76 // ---------------------------------------------------------------------------
77
78 async registerRunner (options: {
79 url: string
80 registrationToken: string
81 runnerName: string
82 runnerDescription?: string
83 }) {
84 const { url, registrationToken, runnerName, runnerDescription } = options
85
86 logger.info(`Registering runner ${runnerName} on ${url}...`)
87
88 const serverCommand = new PeerTubeServerCommand({ url })
89 const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })
90
91 const server: PeerTubeServer = Object.assign(serverCommand, {
92 runnerToken,
93 runnerName,
94 runnerDescription
95 })
96
97 this.loadServer(server)
98 await this.saveRegisteredInstancesInConf()
99
100 logger.info(`Registered runner ${runnerName} on ${url}`)
101
102 await this.checkAvailableJobs()
103 }
104
105 private loadServer (server: PeerTubeServer) {
106 this.servers.push(server)
107
108 const url = server.url + '/runners'
109 const socket = io(url, {
110 auth: {
111 runnerToken: server.runnerToken
112 },
113 transports: [ 'websocket' ]
114 })
115
116 socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
117 socket.on('connect', () => logger.info(`Connected to ${url} socket`))
118 socket.on('available-jobs', () => this.checkAvailableJobs())
119
120 this.sockets.set(server, socket)
121 }
122
123 async unregisterRunner (options: {
124 url: string
125 runnerName: string
126 }) {
127 const { url, runnerName } = options
128
129 const server = this.servers.find(s => s.url === url && s.runnerName === runnerName)
130 if (!server) {
131 logger.error(`Unknown server ${url} - ${runnerName} to unregister`)
132 return
133 }
134
135 logger.info(`Unregistering runner ${runnerName} on ${url}...`)
136
137 try {
138 await server.runners.unregister({ runnerToken: server.runnerToken })
139 } catch (err) {
140 logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`)
141 }
142
143 this.unloadServer(server)
144 await this.saveRegisteredInstancesInConf()
145
146 logger.info(`Unregistered runner ${runnerName} on ${url}`)
147 }
148
149 private unloadServer (server: PeerTubeServer) {
150 this.servers = this.servers.filter(s => s !== server)
151
152 const socket = this.sockets.get(server)
153 socket.disconnect()
154
155 this.sockets.delete(server)
156 }
157
158 listRegistered () {
159 return {
160 servers: this.servers.map(s => {
161 return {
162 url: s.url,
163 runnerName: s.runnerName,
164 runnerDescription: s.runnerDescription
165 }
166 })
167 }
168 }
169
170 // ---------------------------------------------------------------------------
171
172 private async checkAvailableJobs () {
173 if (this.checkingAvailableJobs) return
174
175 this.checkingAvailableJobs = true
176
177 let hadAvailableJob = false
178
179 for (const server of shuffle([ ...this.servers ])) {
180 try {
181 logger.info('Checking available jobs on ' + server.url)
182
183 const job = await this.requestJob(server)
184 if (!job) continue
185
186 hadAvailableJob = true
187
188 await this.tryToExecuteJobAsync(server, job)
189 } catch (err) {
190 const code = (err.res?.body as PeerTubeProblemDocument)?.code
191
192 if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
193 logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
194 return
195 }
196
197 if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
198 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
199
200 await this.unregisterRunner({ url: server.url, runnerName: server.runnerName })
201 return
202 }
203
204 logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
205 }
206 }
207
208 this.checkingAvailableJobs = false
209
210 if (hadAvailableJob && this.canProcessMoreJobs()) {
211 await wait(2500)
212
213 this.checkAvailableJobs()
214 .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
215 }
216 }
217
218 private async requestJob (server: PeerTubeServer) {
219 logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)
220
221 const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
222
223 const filtered = availableJobs.filter(j => isJobSupported(j))
224
225 if (filtered.length === 0) {
226 logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
227 return undefined
228 }
229
230 return filtered[0]
231 }
232
233 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
234 if (!this.canProcessMoreJobs()) return
235
236 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
237
238 const processingJob = { job, server }
239 this.processingJobs.push(processingJob)
240
241 processJob({ server, job, runnerToken: server.runnerToken })
242 .catch(err => {
243 logger.error({ err }, 'Cannot process job')
244
245 server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
246 .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
247 })
248 .finally(() => {
249 this.processingJobs = this.processingJobs.filter(p => p !== processingJob)
250
251 return this.checkAvailableJobs()
252 })
253 }
254
255 // ---------------------------------------------------------------------------
256
257 private saveRegisteredInstancesInConf () {
258 const data = this.servers.map(s => {
259 return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
260 })
261
262 return ConfigManager.Instance.setRegisteredInstances(data)
263 }
264
265 private canProcessMoreJobs () {
266 return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
267 }
268
269 // ---------------------------------------------------------------------------
270
271 private async cleanupTMP () {
272 const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())
273
274 for (const file of files) {
275 await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
276 }
277 }
278
279 private async onExit () {
280 if (this.cleaningUp) return
281 this.cleaningUp = true
282
283 logger.info('Cleaning up after program exit')
284
285 try {
286 for (const { server, job } of this.processingJobs) {
287 await server.runnerJobs.abort({
288 jobToken: job.jobToken,
289 jobUUID: job.uuid,
290 reason: 'Runner stopped',
291 runnerToken: server.runnerToken
292 })
293 }
294
295 await this.cleanupTMP()
296 } catch (err) {
297 logger.error(err)
298 process.exit(-1)
299 }
300
301 process.exit()
302 }
303
304 static get Instance () {
305 return this.instance || (this.instance = new this())
306 }
307}
diff --git a/apps/peertube-runner/src/server/shared/index.ts b/apps/peertube-runner/src/server/shared/index.ts
new file mode 100644
index 000000000..34d51196b
--- /dev/null
+++ b/apps/peertube-runner/src/server/shared/index.ts
@@ -0,0 +1 @@
export * from './supported-job.js'
diff --git a/apps/peertube-runner/src/server/shared/supported-job.ts b/apps/peertube-runner/src/server/shared/supported-job.ts
new file mode 100644
index 000000000..d905b5de2
--- /dev/null
+++ b/apps/peertube-runner/src/server/shared/supported-job.ts
@@ -0,0 +1,43 @@
1import {
2 RunnerJobLiveRTMPHLSTranscodingPayload,
3 RunnerJobPayload,
4 RunnerJobType,
5 RunnerJobStudioTranscodingPayload,
6 RunnerJobVODAudioMergeTranscodingPayload,
7 RunnerJobVODHLSTranscodingPayload,
8 RunnerJobVODWebVideoTranscodingPayload,
9 VideoStudioTaskPayload
10} from '@peertube/peertube-models'
11
12const supportedMatrix = {
13 'vod-web-video-transcoding': (_payload: RunnerJobVODWebVideoTranscodingPayload) => {
14 return true
15 },
16 'vod-hls-transcoding': (_payload: RunnerJobVODHLSTranscodingPayload) => {
17 return true
18 },
19 'vod-audio-merge-transcoding': (_payload: RunnerJobVODAudioMergeTranscodingPayload) => {
20 return true
21 },
22 'live-rtmp-hls-transcoding': (_payload: RunnerJobLiveRTMPHLSTranscodingPayload) => {
23 return true
24 },
25 'video-studio-transcoding': (payload: RunnerJobStudioTranscodingPayload) => {
26 const tasks = payload?.tasks
27 const supported = new Set<VideoStudioTaskPayload['name']>([ 'add-intro', 'add-outro', 'add-watermark', 'cut' ])
28
29 if (!Array.isArray(tasks)) return false
30
31 return tasks.every(t => t && supported.has(t.name))
32 }
33}
34
35export function isJobSupported (job: {
36 type: RunnerJobType
37 payload: RunnerJobPayload
38}) {
39 const fn = supportedMatrix[job.type]
40 if (!fn) return false
41
42 return fn(job.payload as any)
43}
diff --git a/apps/peertube-runner/src/shared/config-manager.ts b/apps/peertube-runner/src/shared/config-manager.ts
new file mode 100644
index 000000000..84a326a16
--- /dev/null
+++ b/apps/peertube-runner/src/shared/config-manager.ts
@@ -0,0 +1,140 @@
1import { parse, stringify } from '@iarna/toml'
2import envPaths from 'env-paths'
3import { ensureDir, pathExists, remove } from 'fs-extra/esm'
4import { readFile, writeFile } from 'fs/promises'
5import merge from 'lodash-es/merge.js'
6import { dirname, join } from 'path'
7import { logger } from '../shared/index.js'
8
9const paths = envPaths('peertube-runner')
10
11type Config = {
12 jobs: {
13 concurrency: number
14 }
15
16 ffmpeg: {
17 threads: number
18 nice: number
19 }
20
21 registeredInstances: {
22 url: string
23 runnerToken: string
24 runnerName: string
25 runnerDescription?: string
26 }[]
27}
28
29export class ConfigManager {
30 private static instance: ConfigManager
31
32 private config: Config = {
33 jobs: {
34 concurrency: 2
35 },
36 ffmpeg: {
37 threads: 2,
38 nice: 20
39 },
40 registeredInstances: []
41 }
42
43 private id: string
44 private configFilePath: string
45
46 private constructor () {}
47
48 init (id: string) {
49 this.id = id
50 this.configFilePath = join(this.getConfigDir(), 'config.toml')
51 }
52
53 async load () {
54 logger.info(`Using ${this.configFilePath} as configuration file`)
55
56 if (this.isTestInstance()) {
57 logger.info('Removing configuration file as we are using the "test" id')
58 await remove(this.configFilePath)
59 }
60
61 await ensureDir(dirname(this.configFilePath))
62
63 if (!await pathExists(this.configFilePath)) {
64 await this.save()
65 }
66
67 const file = await readFile(this.configFilePath, 'utf-8')
68
69 this.config = merge(this.config, parse(file))
70 }
71
72 save () {
73 return writeFile(this.configFilePath, stringify(this.config))
74 }
75
76 // ---------------------------------------------------------------------------
77
78 async setRegisteredInstances (registeredInstances: {
79 url: string
80 runnerToken: string
81 runnerName: string
82 runnerDescription?: string
83 }[]) {
84 this.config.registeredInstances = registeredInstances
85
86 await this.save()
87 }
88
89 // ---------------------------------------------------------------------------
90
91 getConfig () {
92 return this.deepFreeze(this.config)
93 }
94
95 // ---------------------------------------------------------------------------
96
97 getTranscodingDirectory () {
98 return join(paths.cache, this.id, 'transcoding')
99 }
100
101 getSocketDirectory () {
102 return join(paths.data, this.id)
103 }
104
105 getSocketPath () {
106 return join(this.getSocketDirectory(), 'peertube-runner.sock')
107 }
108
109 getConfigDir () {
110 return join(paths.config, this.id)
111 }
112
113 // ---------------------------------------------------------------------------
114
115 isTestInstance () {
116 return typeof this.id === 'string' && this.id.match(/^test-\d$/)
117 }
118
119 // ---------------------------------------------------------------------------
120
121 // Thanks: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
122 private deepFreeze <T extends object> (object: T) {
123 const propNames = Reflect.ownKeys(object)
124
125 // Freeze properties before freezing self
126 for (const name of propNames) {
127 const value = object[name]
128
129 if ((value && typeof value === 'object') || typeof value === 'function') {
130 this.deepFreeze(value)
131 }
132 }
133
134 return Object.freeze({ ...object })
135 }
136
137 static get Instance () {
138 return this.instance || (this.instance = new this())
139 }
140}
diff --git a/apps/peertube-runner/src/shared/http.ts b/apps/peertube-runner/src/shared/http.ts
new file mode 100644
index 000000000..42886279c
--- /dev/null
+++ b/apps/peertube-runner/src/shared/http.ts
@@ -0,0 +1,67 @@
1import { createWriteStream } from 'fs'
2import { remove } from 'fs-extra/esm'
3import { request as requestHTTP } from 'http'
4import { request as requestHTTPS, RequestOptions } from 'https'
5import { logger } from './logger.js'
6
7export function downloadFile (options: {
8 url: string
9 destination: string
10 runnerToken: string
11 jobToken: string
12}) {
13 const { url, destination, runnerToken, jobToken } = options
14
15 logger.debug(`Downloading file ${url}`)
16
17 return new Promise<void>((res, rej) => {
18 const parsed = new URL(url)
19
20 const body = JSON.stringify({
21 runnerToken,
22 jobToken
23 })
24
25 const getOptions: RequestOptions = {
26 method: 'POST',
27 hostname: parsed.hostname,
28 port: parsed.port,
29 path: parsed.pathname,
30 headers: {
31 'Content-Type': 'application/json',
32 'Content-Length': Buffer.byteLength(body, 'utf-8')
33 }
34 }
35
36 const request = getRequest(url)(getOptions, response => {
37 const code = response.statusCode ?? 0
38
39 if (code >= 400) {
40 return rej(new Error(response.statusMessage))
41 }
42
43 const file = createWriteStream(destination)
44 file.on('finish', () => res())
45
46 response.pipe(file)
47 })
48
49 request.on('error', err => {
50 remove(destination)
51 .catch(err => logger.error(err))
52
53 return rej(err)
54 })
55
56 request.write(body)
57 request.end()
58 })
59}
60
61// ---------------------------------------------------------------------------
62
63function getRequest (url: string) {
64 if (url.startsWith('https://')) return requestHTTPS
65
66 return requestHTTP
67}
diff --git a/apps/peertube-runner/src/shared/index.ts b/apps/peertube-runner/src/shared/index.ts
new file mode 100644
index 000000000..951eef55b
--- /dev/null
+++ b/apps/peertube-runner/src/shared/index.ts
@@ -0,0 +1,3 @@
1export * from './config-manager.js'
2export * from './http.js'
3export * from './logger.js'
diff --git a/apps/peertube-runner/src/shared/ipc/index.ts b/apps/peertube-runner/src/shared/ipc/index.ts
new file mode 100644
index 000000000..337d4de16
--- /dev/null
+++ b/apps/peertube-runner/src/shared/ipc/index.ts
@@ -0,0 +1,2 @@
1export * from './ipc-client.js'
2export * from './ipc-server.js'
diff --git a/apps/peertube-runner/src/shared/ipc/ipc-client.ts b/apps/peertube-runner/src/shared/ipc/ipc-client.ts
new file mode 100644
index 000000000..aa5740dd1
--- /dev/null
+++ b/apps/peertube-runner/src/shared/ipc/ipc-client.ts
@@ -0,0 +1,88 @@
1import CliTable3 from 'cli-table3'
2import { ensureDir } from 'fs-extra/esm'
3import { Client as NetIPC } from 'net-ipc'
4import { ConfigManager } from '../config-manager.js'
5import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js'
6
7export class IPCClient {
8 private netIPC: NetIPC
9
10 async run () {
11 await ensureDir(ConfigManager.Instance.getSocketDirectory())
12
13 const socketPath = ConfigManager.Instance.getSocketPath()
14
15 this.netIPC = new NetIPC({ path: socketPath })
16
17 try {
18 await this.netIPC.connect()
19 } catch (err) {
20 if (err.code === 'ECONNREFUSED') {
21 throw new Error(
22 'This runner is not currently running in server mode on this system. ' +
23 'Please run it using the `server` command first (in another terminal for example) and then retry your command.'
24 )
25 }
26
27 throw err
28 }
29 }
30
31 async askRegister (options: {
32 url: string
33 registrationToken: string
34 runnerName: string
35 runnerDescription?: string
36 }) {
37 const req: IPCRequest = {
38 type: 'register',
39 ...options
40 }
41
42 const { success, error } = await this.netIPC.request(req) as IPCReponse
43
44 if (success) console.log('PeerTube instance registered')
45 else console.error('Could not register PeerTube instance on runner server side', error)
46 }
47
48 async askUnregister (options: {
49 url: string
50 runnerName: string
51 }) {
52 const req: IPCRequest = {
53 type: 'unregister',
54 ...options
55 }
56
57 const { success, error } = await this.netIPC.request(req) as IPCReponse
58
59 if (success) console.log('PeerTube instance unregistered')
60 else console.error('Could not unregister PeerTube instance on runner server side', error)
61 }
62
63 async askListRegistered () {
64 const req: IPCRequest = {
65 type: 'list-registered'
66 }
67
68 const { success, error, data } = await this.netIPC.request(req) as IPCReponse<IPCReponseData>
69 if (!success) {
70 console.error('Could not list registered PeerTube instances', error)
71 return
72 }
73
74 const table = new CliTable3({
75 head: [ 'instance', 'runner name', 'runner description' ]
76 })
77
78 for (const server of data.servers) {
79 table.push([ server.url, server.runnerName, server.runnerDescription ])
80 }
81
82 console.log(table.toString())
83 }
84
85 stop () {
86 this.netIPC.destroy()
87 }
88}
diff --git a/apps/peertube-runner/src/shared/ipc/ipc-server.ts b/apps/peertube-runner/src/shared/ipc/ipc-server.ts
new file mode 100644
index 000000000..c68438504
--- /dev/null
+++ b/apps/peertube-runner/src/shared/ipc/ipc-server.ts
@@ -0,0 +1,61 @@
1import { ensureDir } from 'fs-extra/esm'
2import { Server as NetIPC } from 'net-ipc'
3import { pick } from '@peertube/peertube-core-utils'
4import { RunnerServer } from '../../server/index.js'
5import { ConfigManager } from '../config-manager.js'
6import { logger } from '../logger.js'
7import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js'
8
9export class IPCServer {
10 private netIPC: NetIPC
11 private runnerServer: RunnerServer
12
13 async run (runnerServer: RunnerServer) {
14 this.runnerServer = runnerServer
15
16 await ensureDir(ConfigManager.Instance.getSocketDirectory())
17
18 const socketPath = ConfigManager.Instance.getSocketPath()
19 this.netIPC = new NetIPC({ path: socketPath })
20 await this.netIPC.start()
21
22 logger.info(`IPC socket created on ${socketPath}`)
23
24 this.netIPC.on('request', async (req: IPCRequest, res) => {
25 try {
26 const data = await this.process(req)
27
28 this.sendReponse(res, { success: true, data })
29 } catch (err) {
30 logger.error('Cannot execute RPC call', err)
31 this.sendReponse(res, { success: false, error: err.message })
32 }
33 })
34 }
35
36 private async process (req: IPCRequest) {
37 switch (req.type) {
38 case 'register':
39 await this.runnerServer.registerRunner(pick(req, [ 'url', 'registrationToken', 'runnerName', 'runnerDescription' ]))
40 return undefined
41
42 case 'unregister':
43 await this.runnerServer.unregisterRunner(pick(req, [ 'url', 'runnerName' ]))
44 return undefined
45
46 case 'list-registered':
47 return Promise.resolve(this.runnerServer.listRegistered())
48
49 default:
50 throw new Error('Unknown RPC call ' + (req as any).type)
51 }
52 }
53
54 private sendReponse <T extends IPCReponseData> (
55 response: (data: any) => Promise<void>,
56 body: IPCReponse<T>
57 ) {
58 response(body)
59 .catch(err => logger.error('Cannot send response after IPC request', err))
60 }
61}
diff --git a/apps/peertube-runner/src/shared/ipc/shared/index.ts b/apps/peertube-runner/src/shared/ipc/shared/index.ts
new file mode 100644
index 000000000..986acafb0
--- /dev/null
+++ b/apps/peertube-runner/src/shared/ipc/shared/index.ts
@@ -0,0 +1,2 @@
1export * from './ipc-request.model.js'
2export * from './ipc-response.model.js'
diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts
new file mode 100644
index 000000000..352808c74
--- /dev/null
+++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts
@@ -0,0 +1,15 @@
1export type IPCRequest =
2 IPCRequestRegister |
3 IPCRequestUnregister |
4 IPCRequestListRegistered
5
6export type IPCRequestRegister = {
7 type: 'register'
8 url: string
9 registrationToken: string
10 runnerName: string
11 runnerDescription?: string
12}
13
14export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string }
15export type IPCRequestListRegistered = { type: 'list-registered' }
diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts
new file mode 100644
index 000000000..689d6e09a
--- /dev/null
+++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts
@@ -0,0 +1,15 @@
1export type IPCReponse <T extends IPCReponseData = undefined> = {
2 success: boolean
3 error?: string
4 data?: T
5}
6
7export type IPCReponseData =
8 // list registered
9 {
10 servers: {
11 runnerName: string
12 runnerDescription: string
13 url: string
14 }[]
15 }
diff --git a/apps/peertube-runner/src/shared/logger.ts b/apps/peertube-runner/src/shared/logger.ts
new file mode 100644
index 000000000..ef5283892
--- /dev/null
+++ b/apps/peertube-runner/src/shared/logger.ts
@@ -0,0 +1,12 @@
1import { pino } from 'pino'
2import pretty from 'pino-pretty'
3
4const logger = pino(pretty.default({
5 colorize: true
6}))
7
8logger.level = 'info'
9
10export {
11 logger
12}