diff options
Diffstat (limited to 'apps/peertube-runner/src')
25 files changed, 1744 insertions, 0 deletions
diff --git a/apps/peertube-runner/src/peertube-runner.ts b/apps/peertube-runner/src/peertube-runner.ts new file mode 100644 index 000000000..67ca0e0ac --- /dev/null +++ b/apps/peertube-runner/src/peertube-runner.ts | |||
@@ -0,0 +1,91 @@ | |||
1 | #!/usr/bin/env node | ||
2 | |||
3 | import { Command, InvalidArgumentError } from '@commander-js/extra-typings' | ||
4 | import { listRegistered, registerRunner, unregisterRunner } from './register/index.js' | ||
5 | import { RunnerServer } from './server/index.js' | ||
6 | import { ConfigManager, logger } from './shared/index.js' | ||
7 | |||
8 | const program = new Command() | ||
9 | .version(process.env.PACKAGE_VERSION) | ||
10 | .option( | ||
11 | '--id <id>', | ||
12 | 'Runner server id, so you can run multiple PeerTube server runners with different configurations on the same machine', | ||
13 | 'default' | ||
14 | ) | ||
15 | .option('--verbose', 'Run in verbose mode') | ||
16 | .hook('preAction', thisCommand => { | ||
17 | const options = thisCommand.opts() | ||
18 | |||
19 | ConfigManager.Instance.init(options.id) | ||
20 | |||
21 | if (options.verbose === true) { | ||
22 | logger.level = 'debug' | ||
23 | } | ||
24 | }) | ||
25 | |||
26 | program.command('server') | ||
27 | .description('Run in server mode, to execute remote jobs of registered PeerTube instances') | ||
28 | .action(async () => { | ||
29 | try { | ||
30 | await RunnerServer.Instance.run() | ||
31 | } catch (err) { | ||
32 | logger.error(err, 'Cannot run PeerTube runner as server mode') | ||
33 | process.exit(-1) | ||
34 | } | ||
35 | }) | ||
36 | |||
37 | program.command('register') | ||
38 | .description('Register a new PeerTube instance to process runner jobs') | ||
39 | .requiredOption('--url <url>', 'PeerTube instance URL', parseUrl) | ||
40 | .requiredOption('--registration-token <token>', 'Runner registration token (can be found in PeerTube instance administration') | ||
41 | .requiredOption('--runner-name <name>', 'Runner name') | ||
42 | .option('--runner-description <description>', 'Runner description') | ||
43 | .action(async options => { | ||
44 | try { | ||
45 | await registerRunner(options) | ||
46 | } catch (err) { | ||
47 | console.error('Cannot register this PeerTube runner.') | ||
48 | console.error(err) | ||
49 | process.exit(-1) | ||
50 | } | ||
51 | }) | ||
52 | |||
53 | program.command('unregister') | ||
54 | .description('Unregister the runner from PeerTube instance') | ||
55 | .requiredOption('--url <url>', 'PeerTube instance URL', parseUrl) | ||
56 | .requiredOption('--runner-name <name>', 'Runner name') | ||
57 | .action(async options => { | ||
58 | try { | ||
59 | await unregisterRunner(options) | ||
60 | } catch (err) { | ||
61 | console.error('Cannot unregister this PeerTube runner.') | ||
62 | console.error(err) | ||
63 | process.exit(-1) | ||
64 | } | ||
65 | }) | ||
66 | |||
67 | program.command('list-registered') | ||
68 | .description('List registered PeerTube instances') | ||
69 | .action(async () => { | ||
70 | try { | ||
71 | await listRegistered() | ||
72 | } catch (err) { | ||
73 | console.error('Cannot list registered PeerTube instances.') | ||
74 | console.error(err) | ||
75 | process.exit(-1) | ||
76 | } | ||
77 | }) | ||
78 | |||
79 | program.parse() | ||
80 | |||
81 | // --------------------------------------------------------------------------- | ||
82 | // Private | ||
83 | // --------------------------------------------------------------------------- | ||
84 | |||
85 | function parseUrl (url: string) { | ||
86 | if (url.startsWith('http://') !== true && url.startsWith('https://') !== true) { | ||
87 | throw new InvalidArgumentError('URL should start with a http:// or https://') | ||
88 | } | ||
89 | |||
90 | return url | ||
91 | } | ||
diff --git a/apps/peertube-runner/src/register/index.ts b/apps/peertube-runner/src/register/index.ts new file mode 100644 index 000000000..a7d6cf457 --- /dev/null +++ b/apps/peertube-runner/src/register/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './register.js' | |||
diff --git a/apps/peertube-runner/src/register/register.ts b/apps/peertube-runner/src/register/register.ts new file mode 100644 index 000000000..e8af21661 --- /dev/null +++ b/apps/peertube-runner/src/register/register.ts | |||
@@ -0,0 +1,36 @@ | |||
1 | import { IPCClient } from '../shared/ipc/index.js' | ||
2 | |||
3 | export async function registerRunner (options: { | ||
4 | url: string | ||
5 | registrationToken: string | ||
6 | runnerName: string | ||
7 | runnerDescription?: string | ||
8 | }) { | ||
9 | const client = new IPCClient() | ||
10 | await client.run() | ||
11 | |||
12 | await client.askRegister(options) | ||
13 | |||
14 | client.stop() | ||
15 | } | ||
16 | |||
17 | export async function unregisterRunner (options: { | ||
18 | url: string | ||
19 | runnerName: string | ||
20 | }) { | ||
21 | const client = new IPCClient() | ||
22 | await client.run() | ||
23 | |||
24 | await client.askUnregister(options) | ||
25 | |||
26 | client.stop() | ||
27 | } | ||
28 | |||
29 | export async function listRegistered () { | ||
30 | const client = new IPCClient() | ||
31 | await client.run() | ||
32 | |||
33 | await client.askListRegistered() | ||
34 | |||
35 | client.stop() | ||
36 | } | ||
diff --git a/apps/peertube-runner/src/server/index.ts b/apps/peertube-runner/src/server/index.ts new file mode 100644 index 000000000..e56cda526 --- /dev/null +++ b/apps/peertube-runner/src/server/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './server.js' | |||
diff --git a/apps/peertube-runner/src/server/process/index.ts b/apps/peertube-runner/src/server/process/index.ts new file mode 100644 index 000000000..64a7b00fc --- /dev/null +++ b/apps/peertube-runner/src/server/process/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './shared/index.js' | ||
2 | export * from './process.js' | ||
diff --git a/apps/peertube-runner/src/server/process/process.ts b/apps/peertube-runner/src/server/process/process.ts new file mode 100644 index 000000000..e8a1d7c28 --- /dev/null +++ b/apps/peertube-runner/src/server/process/process.ts | |||
@@ -0,0 +1,34 @@ | |||
1 | import { | ||
2 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
3 | RunnerJobStudioTranscodingPayload, | ||
4 | RunnerJobVODAudioMergeTranscodingPayload, | ||
5 | RunnerJobVODHLSTranscodingPayload, | ||
6 | RunnerJobVODWebVideoTranscodingPayload | ||
7 | } from '@peertube/peertube-models' | ||
8 | import { logger } from '../../shared/index.js' | ||
9 | import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared/index.js' | ||
10 | import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live.js' | ||
11 | import { processStudioTranscoding } from './shared/process-studio.js' | ||
12 | |||
13 | export async function processJob (options: ProcessOptions) { | ||
14 | const { server, job } = options | ||
15 | |||
16 | logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload }) | ||
17 | |||
18 | if (job.type === 'vod-audio-merge-transcoding') { | ||
19 | await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) | ||
20 | } else if (job.type === 'vod-web-video-transcoding') { | ||
21 | await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) | ||
22 | } else if (job.type === 'vod-hls-transcoding') { | ||
23 | await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>) | ||
24 | } else if (job.type === 'live-rtmp-hls-transcoding') { | ||
25 | await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process() | ||
26 | } else if (job.type === 'video-studio-transcoding') { | ||
27 | await processStudioTranscoding(options as ProcessOptions<RunnerJobStudioTranscodingPayload>) | ||
28 | } else { | ||
29 | logger.error(`Unknown job ${job.type} to process`) | ||
30 | return | ||
31 | } | ||
32 | |||
33 | logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`) | ||
34 | } | ||
diff --git a/apps/peertube-runner/src/server/process/shared/common.ts b/apps/peertube-runner/src/server/process/shared/common.ts new file mode 100644 index 000000000..09241d93b --- /dev/null +++ b/apps/peertube-runner/src/server/process/shared/common.ts | |||
@@ -0,0 +1,106 @@ | |||
1 | import { remove } from 'fs-extra/esm' | ||
2 | import { join } from 'path' | ||
3 | import { FFmpegEdition, FFmpegLive, FFmpegVOD, getDefaultAvailableEncoders, getDefaultEncodersToTry } from '@peertube/peertube-ffmpeg' | ||
4 | import { RunnerJob, RunnerJobPayload } from '@peertube/peertube-models' | ||
5 | import { buildUUID } from '@peertube/peertube-node-utils' | ||
6 | import { PeerTubeServer } from '@peertube/peertube-server-commands' | ||
7 | import { ConfigManager, downloadFile, logger } from '../../../shared/index.js' | ||
8 | import { getTranscodingLogger } from './transcoding-logger.js' | ||
9 | |||
10 | export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string } | ||
11 | |||
12 | export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = { | ||
13 | server: PeerTubeServer | ||
14 | job: JobWithToken<T> | ||
15 | runnerToken: string | ||
16 | } | ||
17 | |||
18 | export async function downloadInputFile (options: { | ||
19 | url: string | ||
20 | job: JobWithToken | ||
21 | runnerToken: string | ||
22 | }) { | ||
23 | const { url, job, runnerToken } = options | ||
24 | const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
25 | |||
26 | try { | ||
27 | await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination }) | ||
28 | } catch (err) { | ||
29 | remove(destination) | ||
30 | .catch(err => logger.error({ err }, `Cannot remove ${destination}`)) | ||
31 | |||
32 | throw err | ||
33 | } | ||
34 | |||
35 | return destination | ||
36 | } | ||
37 | |||
38 | export function scheduleTranscodingProgress (options: { | ||
39 | server: PeerTubeServer | ||
40 | runnerToken: string | ||
41 | job: JobWithToken | ||
42 | progressGetter: () => number | ||
43 | }) { | ||
44 | const { job, server, progressGetter, runnerToken } = options | ||
45 | |||
46 | const updateInterval = ConfigManager.Instance.isTestInstance() | ||
47 | ? 500 | ||
48 | : 60000 | ||
49 | |||
50 | const update = () => { | ||
51 | server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() }) | ||
52 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | ||
53 | } | ||
54 | |||
55 | const interval = setInterval(() => { | ||
56 | update() | ||
57 | }, updateInterval) | ||
58 | |||
59 | update() | ||
60 | |||
61 | return interval | ||
62 | } | ||
63 | |||
64 | // --------------------------------------------------------------------------- | ||
65 | |||
66 | export function buildFFmpegVOD (options: { | ||
67 | onJobProgress: (progress: number) => void | ||
68 | }) { | ||
69 | const { onJobProgress } = options | ||
70 | |||
71 | return new FFmpegVOD({ | ||
72 | ...getCommonFFmpegOptions(), | ||
73 | |||
74 | updateJobProgress: arg => { | ||
75 | const progress = arg < 0 || arg > 100 | ||
76 | ? undefined | ||
77 | : arg | ||
78 | |||
79 | onJobProgress(progress) | ||
80 | } | ||
81 | }) | ||
82 | } | ||
83 | |||
84 | export function buildFFmpegLive () { | ||
85 | return new FFmpegLive(getCommonFFmpegOptions()) | ||
86 | } | ||
87 | |||
88 | export function buildFFmpegEdition () { | ||
89 | return new FFmpegEdition(getCommonFFmpegOptions()) | ||
90 | } | ||
91 | |||
92 | function getCommonFFmpegOptions () { | ||
93 | const config = ConfigManager.Instance.getConfig() | ||
94 | |||
95 | return { | ||
96 | niceness: config.ffmpeg.nice, | ||
97 | threads: config.ffmpeg.threads, | ||
98 | tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(), | ||
99 | profile: 'default', | ||
100 | availableEncoders: { | ||
101 | available: getDefaultAvailableEncoders(), | ||
102 | encodersToTry: getDefaultEncodersToTry() | ||
103 | }, | ||
104 | logger: getTranscodingLogger() | ||
105 | } | ||
106 | } | ||
diff --git a/apps/peertube-runner/src/server/process/shared/index.ts b/apps/peertube-runner/src/server/process/shared/index.ts new file mode 100644 index 000000000..638bf127f --- /dev/null +++ b/apps/peertube-runner/src/server/process/shared/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './common.js' | ||
2 | export * from './process-vod.js' | ||
3 | export * from './transcoding-logger.js' | ||
diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts new file mode 100644 index 000000000..0dc4e5b13 --- /dev/null +++ b/apps/peertube-runner/src/server/process/shared/process-live.ts | |||
@@ -0,0 +1,338 @@ | |||
1 | import { FSWatcher, watch } from 'chokidar' | ||
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
3 | import { ensureDir, remove } from 'fs-extra/esm' | ||
4 | import { basename, join } from 'path' | ||
5 | import { wait } from '@peertube/peertube-core-utils' | ||
6 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@peertube/peertube-ffmpeg' | ||
7 | import { | ||
8 | LiveRTMPHLSTranscodingSuccess, | ||
9 | LiveRTMPHLSTranscodingUpdatePayload, | ||
10 | PeerTubeProblemDocument, | ||
11 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
12 | ServerErrorCode | ||
13 | } from '@peertube/peertube-models' | ||
14 | import { buildUUID } from '@peertube/peertube-node-utils' | ||
15 | import { ConfigManager } from '../../../shared/config-manager.js' | ||
16 | import { logger } from '../../../shared/index.js' | ||
17 | import { buildFFmpegLive, ProcessOptions } from './common.js' | ||
18 | |||
19 | export class ProcessLiveRTMPHLSTranscoding { | ||
20 | |||
21 | private readonly outputPath: string | ||
22 | private readonly fsWatchers: FSWatcher[] = [] | ||
23 | |||
24 | // Playlist name -> chunks | ||
25 | private readonly pendingChunksPerPlaylist = new Map<string, string[]>() | ||
26 | |||
27 | private readonly playlistsCreated = new Set<string>() | ||
28 | private allPlaylistsCreated = false | ||
29 | |||
30 | private ffmpegCommand: FfmpegCommand | ||
31 | |||
32 | private ended = false | ||
33 | private errored = false | ||
34 | |||
35 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | ||
36 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
37 | |||
38 | logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`) | ||
39 | } | ||
40 | |||
41 | process () { | ||
42 | const job = this.options.job | ||
43 | const payload = job.payload | ||
44 | |||
45 | return new Promise<void>(async (res, rej) => { | ||
46 | try { | ||
47 | await ensureDir(this.outputPath) | ||
48 | |||
49 | logger.info(`Probing ${payload.input.rtmpUrl}`) | ||
50 | const probe = await ffprobePromise(payload.input.rtmpUrl) | ||
51 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | ||
52 | |||
53 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | ||
54 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | ||
55 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | ||
56 | |||
57 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | ||
58 | this.fsWatchers.push(m3u8Watcher) | ||
59 | |||
60 | const tsWatcher = watch(this.outputPath + '/*.ts') | ||
61 | this.fsWatchers.push(tsWatcher) | ||
62 | |||
63 | m3u8Watcher.on('change', p => { | ||
64 | logger.debug(`${p} m3u8 playlist changed`) | ||
65 | }) | ||
66 | |||
67 | m3u8Watcher.on('add', p => { | ||
68 | this.playlistsCreated.add(p) | ||
69 | |||
70 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | ||
71 | this.allPlaylistsCreated = true | ||
72 | logger.info('All m3u8 playlists are created.') | ||
73 | } | ||
74 | }) | ||
75 | |||
76 | tsWatcher.on('add', async p => { | ||
77 | try { | ||
78 | await this.sendPendingChunks() | ||
79 | } catch (err) { | ||
80 | this.onUpdateError({ err, rej, res }) | ||
81 | } | ||
82 | |||
83 | const playlistName = this.getPlaylistIdFromTS(p) | ||
84 | |||
85 | const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || [] | ||
86 | pendingChunks.push(p) | ||
87 | |||
88 | this.pendingChunksPerPlaylist.set(playlistName, pendingChunks) | ||
89 | }) | ||
90 | |||
91 | tsWatcher.on('unlink', p => { | ||
92 | this.sendDeletedChunkUpdate(p) | ||
93 | .catch(err => this.onUpdateError({ err, rej, res })) | ||
94 | }) | ||
95 | |||
96 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | ||
97 | inputUrl: payload.input.rtmpUrl, | ||
98 | |||
99 | outPath: this.outputPath, | ||
100 | masterPlaylistName: 'master.m3u8', | ||
101 | |||
102 | segmentListSize: payload.output.segmentListSize, | ||
103 | segmentDuration: payload.output.segmentDuration, | ||
104 | |||
105 | toTranscode: payload.output.toTranscode, | ||
106 | |||
107 | bitrate, | ||
108 | ratio, | ||
109 | |||
110 | hasAudio | ||
111 | }) | ||
112 | |||
113 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | ||
114 | |||
115 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
116 | this.onFFmpegError({ err, stdout, stderr }) | ||
117 | |||
118 | res() | ||
119 | }) | ||
120 | |||
121 | this.ffmpegCommand.on('end', () => { | ||
122 | this.onFFmpegEnded() | ||
123 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | ||
124 | |||
125 | res() | ||
126 | }) | ||
127 | |||
128 | this.ffmpegCommand.run() | ||
129 | } catch (err) { | ||
130 | rej(err) | ||
131 | } | ||
132 | }) | ||
133 | } | ||
134 | |||
135 | // --------------------------------------------------------------------------- | ||
136 | |||
137 | private onUpdateError (options: { | ||
138 | err: Error | ||
139 | res: () => void | ||
140 | rej: (reason?: any) => void | ||
141 | }) { | ||
142 | const { err, res, rej } = options | ||
143 | |||
144 | if (this.errored) return | ||
145 | if (this.ended) return | ||
146 | |||
147 | this.errored = true | ||
148 | |||
149 | this.ffmpegCommand.kill('SIGINT') | ||
150 | |||
151 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | ||
152 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
153 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | ||
154 | |||
155 | res() | ||
156 | } else { | ||
157 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | ||
158 | |||
159 | this.sendError(err) | ||
160 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
161 | |||
162 | rej(err) | ||
163 | } | ||
164 | |||
165 | this.cleanup() | ||
166 | } | ||
167 | |||
168 | // --------------------------------------------------------------------------- | ||
169 | |||
170 | private onFFmpegError (options: { | ||
171 | err: any | ||
172 | stdout: string | ||
173 | stderr: string | ||
174 | }) { | ||
175 | const { err, stdout, stderr } = options | ||
176 | |||
177 | // Don't care that we killed the ffmpeg process | ||
178 | if (err?.message?.includes('Exiting normally')) return | ||
179 | if (this.errored) return | ||
180 | if (this.ended) return | ||
181 | |||
182 | this.errored = true | ||
183 | |||
184 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | ||
185 | |||
186 | this.sendError(err) | ||
187 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
188 | |||
189 | this.cleanup() | ||
190 | } | ||
191 | |||
192 | private async sendError (err: Error) { | ||
193 | await this.options.server.runnerJobs.error({ | ||
194 | jobToken: this.options.job.jobToken, | ||
195 | jobUUID: this.options.job.uuid, | ||
196 | runnerToken: this.options.runnerToken, | ||
197 | message: err.message | ||
198 | }) | ||
199 | } | ||
200 | |||
201 | // --------------------------------------------------------------------------- | ||
202 | |||
203 | private async onFFmpegEnded () { | ||
204 | if (this.ended) return | ||
205 | |||
206 | this.ended = true | ||
207 | logger.info('FFmpeg ended, sending success to server') | ||
208 | |||
209 | // Wait last ffmpeg chunks generation | ||
210 | await wait(1500) | ||
211 | |||
212 | this.sendSuccess() | ||
213 | .catch(err => logger.error({ err }, 'Cannot send success')) | ||
214 | |||
215 | this.cleanup() | ||
216 | } | ||
217 | |||
218 | private async sendSuccess () { | ||
219 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | ||
220 | |||
221 | await this.options.server.runnerJobs.success({ | ||
222 | jobToken: this.options.job.jobToken, | ||
223 | jobUUID: this.options.job.uuid, | ||
224 | runnerToken: this.options.runnerToken, | ||
225 | payload: successBody | ||
226 | }) | ||
227 | } | ||
228 | |||
229 | // --------------------------------------------------------------------------- | ||
230 | |||
231 | private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> { | ||
232 | if (this.ended) return Promise.resolve() | ||
233 | |||
234 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | ||
235 | |||
236 | const videoChunkFilename = basename(deletedChunk) | ||
237 | |||
238 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
239 | type: 'remove-chunk', | ||
240 | videoChunkFilename | ||
241 | } | ||
242 | |||
243 | if (this.allPlaylistsCreated) { | ||
244 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
245 | |||
246 | payload = { | ||
247 | ...payload, | ||
248 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
249 | resolutionPlaylistFilename: playlistName, | ||
250 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
251 | } | ||
252 | } | ||
253 | |||
254 | return this.updateWithRetry(payload) | ||
255 | } | ||
256 | |||
257 | private async sendPendingChunks (): Promise<any> { | ||
258 | if (this.ended) return Promise.resolve() | ||
259 | |||
260 | const promises: Promise<any>[] = [] | ||
261 | |||
262 | for (const playlist of this.pendingChunksPerPlaylist.keys()) { | ||
263 | for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { | ||
264 | logger.debug(`Sending added live chunk ${chunk} update`) | ||
265 | |||
266 | const videoChunkFilename = basename(chunk) | ||
267 | |||
268 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
269 | type: 'add-chunk', | ||
270 | videoChunkFilename, | ||
271 | videoChunkFile: chunk | ||
272 | } | ||
273 | |||
274 | if (this.allPlaylistsCreated) { | ||
275 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
276 | |||
277 | payload = { | ||
278 | ...payload, | ||
279 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
280 | resolutionPlaylistFilename: playlistName, | ||
281 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
282 | } | ||
283 | } | ||
284 | |||
285 | promises.push(this.updateWithRetry(payload)) | ||
286 | } | ||
287 | |||
288 | this.pendingChunksPerPlaylist.set(playlist, []) | ||
289 | } | ||
290 | |||
291 | await Promise.all(promises) | ||
292 | } | ||
293 | |||
294 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { | ||
295 | if (this.ended || this.errored) return | ||
296 | |||
297 | try { | ||
298 | await this.options.server.runnerJobs.update({ | ||
299 | jobToken: this.options.job.jobToken, | ||
300 | jobUUID: this.options.job.uuid, | ||
301 | runnerToken: this.options.runnerToken, | ||
302 | payload | ||
303 | }) | ||
304 | } catch (err) { | ||
305 | if (currentTry >= 3) throw err | ||
306 | if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err | ||
307 | |||
308 | logger.warn({ err }, 'Will retry update after error') | ||
309 | await wait(250) | ||
310 | |||
311 | return this.updateWithRetry(payload, currentTry + 1) | ||
312 | } | ||
313 | } | ||
314 | |||
315 | private getPlaylistName (videoChunkFilename: string) { | ||
316 | return `${videoChunkFilename.split('-')[0]}.m3u8` | ||
317 | } | ||
318 | |||
319 | private getPlaylistIdFromTS (segmentPath: string) { | ||
320 | const playlistIdMatcher = /^([\d+])-/ | ||
321 | |||
322 | return basename(segmentPath).match(playlistIdMatcher)[1] | ||
323 | } | ||
324 | |||
325 | // --------------------------------------------------------------------------- | ||
326 | |||
327 | private cleanup () { | ||
328 | logger.debug(`Cleaning up job ${this.options.job.uuid}`) | ||
329 | |||
330 | for (const fsWatcher of this.fsWatchers) { | ||
331 | fsWatcher.close() | ||
332 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | ||
333 | } | ||
334 | |||
335 | remove(this.outputPath) | ||
336 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | ||
337 | } | ||
338 | } | ||
diff --git a/apps/peertube-runner/src/server/process/shared/process-studio.ts b/apps/peertube-runner/src/server/process/shared/process-studio.ts new file mode 100644 index 000000000..11b7b7d9a --- /dev/null +++ b/apps/peertube-runner/src/server/process/shared/process-studio.ts | |||
@@ -0,0 +1,165 @@ | |||
1 | import { remove } from 'fs-extra/esm' | ||
2 | import { join } from 'path' | ||
3 | import { pick } from '@peertube/peertube-core-utils' | ||
4 | import { | ||
5 | RunnerJobStudioTranscodingPayload, | ||
6 | VideoStudioTask, | ||
7 | VideoStudioTaskCutPayload, | ||
8 | VideoStudioTaskIntroPayload, | ||
9 | VideoStudioTaskOutroPayload, | ||
10 | VideoStudioTaskPayload, | ||
11 | VideoStudioTaskWatermarkPayload, | ||
12 | VideoStudioTranscodingSuccess | ||
13 | } from '@peertube/peertube-models' | ||
14 | import { buildUUID } from '@peertube/peertube-node-utils' | ||
15 | import { ConfigManager } from '../../../shared/config-manager.js' | ||
16 | import { logger } from '../../../shared/index.js' | ||
17 | import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common.js' | ||
18 | |||
19 | export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) { | ||
20 | const { server, job, runnerToken } = options | ||
21 | const payload = job.payload | ||
22 | |||
23 | let inputPath: string | ||
24 | let outputPath: string | ||
25 | let tmpInputFilePath: string | ||
26 | |||
27 | let tasksProgress = 0 | ||
28 | |||
29 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
30 | job, | ||
31 | server, | ||
32 | runnerToken, | ||
33 | progressGetter: () => tasksProgress | ||
34 | }) | ||
35 | |||
36 | try { | ||
37 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) | ||
38 | |||
39 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
40 | tmpInputFilePath = inputPath | ||
41 | |||
42 | logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) | ||
43 | |||
44 | for (const task of payload.tasks) { | ||
45 | const outputFilename = 'output-edition-' + buildUUID() + '.mp4' | ||
46 | outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) | ||
47 | |||
48 | await processTask({ | ||
49 | inputPath: tmpInputFilePath, | ||
50 | outputPath, | ||
51 | task, | ||
52 | job, | ||
53 | runnerToken | ||
54 | }) | ||
55 | |||
56 | if (tmpInputFilePath) await remove(tmpInputFilePath) | ||
57 | |||
58 | // For the next iteration | ||
59 | tmpInputFilePath = outputPath | ||
60 | |||
61 | tasksProgress += Math.floor(100 / payload.tasks.length) | ||
62 | } | ||
63 | |||
64 | const successBody: VideoStudioTranscodingSuccess = { | ||
65 | videoFile: outputPath | ||
66 | } | ||
67 | |||
68 | await server.runnerJobs.success({ | ||
69 | jobToken: job.jobToken, | ||
70 | jobUUID: job.uuid, | ||
71 | runnerToken, | ||
72 | payload: successBody | ||
73 | }) | ||
74 | } finally { | ||
75 | if (tmpInputFilePath) await remove(tmpInputFilePath) | ||
76 | if (outputPath) await remove(outputPath) | ||
77 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
78 | } | ||
79 | } | ||
80 | |||
81 | // --------------------------------------------------------------------------- | ||
82 | // Private | ||
83 | // --------------------------------------------------------------------------- | ||
84 | |||
85 | type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = { | ||
86 | inputPath: string | ||
87 | outputPath: string | ||
88 | task: T | ||
89 | runnerToken: string | ||
90 | job: JobWithToken | ||
91 | } | ||
92 | |||
93 | const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { | ||
94 | 'add-intro': processAddIntroOutro, | ||
95 | 'add-outro': processAddIntroOutro, | ||
96 | 'cut': processCut, | ||
97 | 'add-watermark': processAddWatermark | ||
98 | } | ||
99 | |||
100 | async function processTask (options: TaskProcessorOptions) { | ||
101 | const { task } = options | ||
102 | |||
103 | const processor = taskProcessors[options.task.name] | ||
104 | if (!process) throw new Error('Unknown task ' + task.name) | ||
105 | |||
106 | return processor(options) | ||
107 | } | ||
108 | |||
109 | async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { | ||
110 | const { inputPath, task, runnerToken, job } = options | ||
111 | |||
112 | logger.debug('Adding intro/outro to ' + inputPath) | ||
113 | |||
114 | const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) | ||
115 | |||
116 | try { | ||
117 | await buildFFmpegEdition().addIntroOutro({ | ||
118 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
119 | |||
120 | introOutroPath, | ||
121 | type: task.name === 'add-intro' | ||
122 | ? 'intro' | ||
123 | : 'outro' | ||
124 | }) | ||
125 | } finally { | ||
126 | await remove(introOutroPath) | ||
127 | } | ||
128 | } | ||
129 | |||
130 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { | ||
131 | const { inputPath, task } = options | ||
132 | |||
133 | logger.debug(`Cutting ${inputPath}`) | ||
134 | |||
135 | return buildFFmpegEdition().cutVideo({ | ||
136 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
137 | |||
138 | start: task.options.start, | ||
139 | end: task.options.end | ||
140 | }) | ||
141 | } | ||
142 | |||
143 | async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { | ||
144 | const { inputPath, task, runnerToken, job } = options | ||
145 | |||
146 | logger.debug('Adding watermark to ' + inputPath) | ||
147 | |||
148 | const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job }) | ||
149 | |||
150 | try { | ||
151 | await buildFFmpegEdition().addWatermark({ | ||
152 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
153 | |||
154 | watermarkPath, | ||
155 | |||
156 | videoFilters: { | ||
157 | watermarkSizeRatio: task.options.watermarkSizeRatio, | ||
158 | horitonzalMarginRatio: task.options.horitonzalMarginRatio, | ||
159 | verticalMarginRatio: task.options.verticalMarginRatio | ||
160 | } | ||
161 | }) | ||
162 | } finally { | ||
163 | await remove(watermarkPath) | ||
164 | } | ||
165 | } | ||
diff --git a/apps/peertube-runner/src/server/process/shared/process-vod.ts b/apps/peertube-runner/src/server/process/shared/process-vod.ts new file mode 100644 index 000000000..fe1715ca9 --- /dev/null +++ b/apps/peertube-runner/src/server/process/shared/process-vod.ts | |||
@@ -0,0 +1,201 @@ | |||
1 | import { remove } from 'fs-extra/esm' | ||
2 | import { join } from 'path' | ||
3 | import { | ||
4 | RunnerJobVODAudioMergeTranscodingPayload, | ||
5 | RunnerJobVODHLSTranscodingPayload, | ||
6 | RunnerJobVODWebVideoTranscodingPayload, | ||
7 | VODAudioMergeTranscodingSuccess, | ||
8 | VODHLSTranscodingSuccess, | ||
9 | VODWebVideoTranscodingSuccess | ||
10 | } from '@peertube/peertube-models' | ||
11 | import { buildUUID } from '@peertube/peertube-node-utils' | ||
12 | import { ConfigManager } from '../../../shared/config-manager.js' | ||
13 | import { logger } from '../../../shared/index.js' | ||
14 | import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common.js' | ||
15 | |||
16 | export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { | ||
17 | const { server, job, runnerToken } = options | ||
18 | |||
19 | const payload = job.payload | ||
20 | |||
21 | let ffmpegProgress: number | ||
22 | let inputPath: string | ||
23 | |||
24 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
25 | |||
26 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
27 | job, | ||
28 | server, | ||
29 | runnerToken, | ||
30 | progressGetter: () => ffmpegProgress | ||
31 | }) | ||
32 | |||
33 | try { | ||
34 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) | ||
35 | |||
36 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
37 | |||
38 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) | ||
39 | |||
40 | const ffmpegVod = buildFFmpegVOD({ | ||
41 | onJobProgress: progress => { ffmpegProgress = progress } | ||
42 | }) | ||
43 | |||
44 | await ffmpegVod.transcode({ | ||
45 | type: 'video', | ||
46 | |||
47 | inputPath, | ||
48 | |||
49 | outputPath, | ||
50 | |||
51 | inputFileMutexReleaser: () => {}, | ||
52 | |||
53 | resolution: payload.output.resolution, | ||
54 | fps: payload.output.fps | ||
55 | }) | ||
56 | |||
57 | const successBody: VODWebVideoTranscodingSuccess = { | ||
58 | videoFile: outputPath | ||
59 | } | ||
60 | |||
61 | await server.runnerJobs.success({ | ||
62 | jobToken: job.jobToken, | ||
63 | jobUUID: job.uuid, | ||
64 | runnerToken, | ||
65 | payload: successBody | ||
66 | }) | ||
67 | } finally { | ||
68 | if (inputPath) await remove(inputPath) | ||
69 | if (outputPath) await remove(outputPath) | ||
70 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
71 | } | ||
72 | } | ||
73 | |||
74 | export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) { | ||
75 | const { server, job, runnerToken } = options | ||
76 | const payload = job.payload | ||
77 | |||
78 | let ffmpegProgress: number | ||
79 | let inputPath: string | ||
80 | |||
81 | const uuid = buildUUID() | ||
82 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) | ||
83 | const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` | ||
84 | const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) | ||
85 | |||
86 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
87 | job, | ||
88 | server, | ||
89 | runnerToken, | ||
90 | progressGetter: () => ffmpegProgress | ||
91 | }) | ||
92 | |||
93 | try { | ||
94 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) | ||
95 | |||
96 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
97 | |||
98 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`) | ||
99 | |||
100 | const ffmpegVod = buildFFmpegVOD({ | ||
101 | onJobProgress: progress => { ffmpegProgress = progress } | ||
102 | }) | ||
103 | |||
104 | await ffmpegVod.transcode({ | ||
105 | type: 'hls', | ||
106 | copyCodecs: false, | ||
107 | inputPath, | ||
108 | hlsPlaylist: { videoFilename }, | ||
109 | outputPath, | ||
110 | |||
111 | inputFileMutexReleaser: () => {}, | ||
112 | |||
113 | resolution: payload.output.resolution, | ||
114 | fps: payload.output.fps | ||
115 | }) | ||
116 | |||
117 | const successBody: VODHLSTranscodingSuccess = { | ||
118 | resolutionPlaylistFile: outputPath, | ||
119 | videoFile: videoPath | ||
120 | } | ||
121 | |||
122 | await server.runnerJobs.success({ | ||
123 | jobToken: job.jobToken, | ||
124 | jobUUID: job.uuid, | ||
125 | runnerToken, | ||
126 | payload: successBody | ||
127 | }) | ||
128 | } finally { | ||
129 | if (inputPath) await remove(inputPath) | ||
130 | if (outputPath) await remove(outputPath) | ||
131 | if (videoPath) await remove(videoPath) | ||
132 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
133 | } | ||
134 | } | ||
135 | |||
136 | export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) { | ||
137 | const { server, job, runnerToken } = options | ||
138 | const payload = job.payload | ||
139 | |||
140 | let ffmpegProgress: number | ||
141 | let audioPath: string | ||
142 | let inputPath: string | ||
143 | |||
144 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
145 | |||
146 | const updateProgressInterval = scheduleTranscodingProgress({ | ||
147 | job, | ||
148 | server, | ||
149 | runnerToken, | ||
150 | progressGetter: () => ffmpegProgress | ||
151 | }) | ||
152 | |||
153 | try { | ||
154 | logger.info( | ||
155 | `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
156 | `for audio merge transcoding job ${job.jobToken}` | ||
157 | ) | ||
158 | |||
159 | audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) | ||
160 | inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) | ||
161 | |||
162 | logger.info( | ||
163 | `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
164 | `for job ${job.jobToken}. Running audio merge transcoding.` | ||
165 | ) | ||
166 | |||
167 | const ffmpegVod = buildFFmpegVOD({ | ||
168 | onJobProgress: progress => { ffmpegProgress = progress } | ||
169 | }) | ||
170 | |||
171 | await ffmpegVod.transcode({ | ||
172 | type: 'merge-audio', | ||
173 | |||
174 | audioPath, | ||
175 | inputPath, | ||
176 | |||
177 | outputPath, | ||
178 | |||
179 | inputFileMutexReleaser: () => {}, | ||
180 | |||
181 | resolution: payload.output.resolution, | ||
182 | fps: payload.output.fps | ||
183 | }) | ||
184 | |||
185 | const successBody: VODAudioMergeTranscodingSuccess = { | ||
186 | videoFile: outputPath | ||
187 | } | ||
188 | |||
189 | await server.runnerJobs.success({ | ||
190 | jobToken: job.jobToken, | ||
191 | jobUUID: job.uuid, | ||
192 | runnerToken, | ||
193 | payload: successBody | ||
194 | }) | ||
195 | } finally { | ||
196 | if (audioPath) await remove(audioPath) | ||
197 | if (inputPath) await remove(inputPath) | ||
198 | if (outputPath) await remove(outputPath) | ||
199 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
200 | } | ||
201 | } | ||
diff --git a/apps/peertube-runner/src/server/process/shared/transcoding-logger.ts b/apps/peertube-runner/src/server/process/shared/transcoding-logger.ts new file mode 100644 index 000000000..041dd62eb --- /dev/null +++ b/apps/peertube-runner/src/server/process/shared/transcoding-logger.ts | |||
@@ -0,0 +1,10 @@ | |||
1 | import { logger } from '../../../shared/index.js' | ||
2 | |||
3 | export function getTranscodingLogger () { | ||
4 | return { | ||
5 | info: logger.info.bind(logger), | ||
6 | debug: logger.debug.bind(logger), | ||
7 | warn: logger.warn.bind(logger), | ||
8 | error: logger.error.bind(logger) | ||
9 | } | ||
10 | } | ||
diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts new file mode 100644 index 000000000..825e3f297 --- /dev/null +++ b/apps/peertube-runner/src/server/server.ts | |||
@@ -0,0 +1,307 @@ | |||
1 | import { ensureDir, remove } from 'fs-extra/esm' | ||
2 | import { readdir } from 'fs/promises' | ||
3 | import { join } from 'path' | ||
4 | import { io, Socket } from 'socket.io-client' | ||
5 | import { pick, shuffle, wait } from '@peertube/peertube-core-utils' | ||
6 | import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models' | ||
7 | import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands' | ||
8 | import { ConfigManager } from '../shared/index.js' | ||
9 | import { IPCServer } from '../shared/ipc/index.js' | ||
10 | import { logger } from '../shared/logger.js' | ||
11 | import { JobWithToken, processJob } from './process/index.js' | ||
12 | import { isJobSupported } from './shared/index.js' | ||
13 | |||
14 | type PeerTubeServer = PeerTubeServerCommand & { | ||
15 | runnerToken: string | ||
16 | runnerName: string | ||
17 | runnerDescription?: string | ||
18 | } | ||
19 | |||
20 | export class RunnerServer { | ||
21 | private static instance: RunnerServer | ||
22 | |||
23 | private servers: PeerTubeServer[] = [] | ||
24 | private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] | ||
25 | |||
26 | private checkingAvailableJobs = false | ||
27 | |||
28 | private cleaningUp = false | ||
29 | |||
30 | private readonly sockets = new Map<PeerTubeServer, Socket>() | ||
31 | |||
32 | private constructor () {} | ||
33 | |||
34 | async run () { | ||
35 | logger.info('Running PeerTube runner in server mode') | ||
36 | |||
37 | await ConfigManager.Instance.load() | ||
38 | |||
39 | for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { | ||
40 | const serverCommand = new PeerTubeServerCommand({ url: registered.url }) | ||
41 | |||
42 | this.loadServer(Object.assign(serverCommand, registered)) | ||
43 | |||
44 | logger.info(`Loading registered instance ${registered.url}`) | ||
45 | } | ||
46 | |||
47 | // Run IPC | ||
48 | const ipcServer = new IPCServer() | ||
49 | try { | ||
50 | await ipcServer.run(this) | ||
51 | } catch (err) { | ||
52 | logger.error('Cannot start local socket for IPC communication', err) | ||
53 | process.exit(-1) | ||
54 | } | ||
55 | |||
56 | // Cleanup on exit | ||
57 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { | ||
58 | process.on(code, async (err, origin) => { | ||
59 | if (code === 'uncaughtException') { | ||
60 | logger.error({ err, origin }, 'uncaughtException') | ||
61 | } | ||
62 | |||
63 | await this.onExit() | ||
64 | }) | ||
65 | } | ||
66 | |||
67 | // Process jobs | ||
68 | await ensureDir(ConfigManager.Instance.getTranscodingDirectory()) | ||
69 | await this.cleanupTMP() | ||
70 | |||
71 | logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`) | ||
72 | |||
73 | await this.checkAvailableJobs() | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | async registerRunner (options: { | ||
79 | url: string | ||
80 | registrationToken: string | ||
81 | runnerName: string | ||
82 | runnerDescription?: string | ||
83 | }) { | ||
84 | const { url, registrationToken, runnerName, runnerDescription } = options | ||
85 | |||
86 | logger.info(`Registering runner ${runnerName} on ${url}...`) | ||
87 | |||
88 | const serverCommand = new PeerTubeServerCommand({ url }) | ||
89 | const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken }) | ||
90 | |||
91 | const server: PeerTubeServer = Object.assign(serverCommand, { | ||
92 | runnerToken, | ||
93 | runnerName, | ||
94 | runnerDescription | ||
95 | }) | ||
96 | |||
97 | this.loadServer(server) | ||
98 | await this.saveRegisteredInstancesInConf() | ||
99 | |||
100 | logger.info(`Registered runner ${runnerName} on ${url}`) | ||
101 | |||
102 | await this.checkAvailableJobs() | ||
103 | } | ||
104 | |||
105 | private loadServer (server: PeerTubeServer) { | ||
106 | this.servers.push(server) | ||
107 | |||
108 | const url = server.url + '/runners' | ||
109 | const socket = io(url, { | ||
110 | auth: { | ||
111 | runnerToken: server.runnerToken | ||
112 | }, | ||
113 | transports: [ 'websocket' ] | ||
114 | }) | ||
115 | |||
116 | socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`)) | ||
117 | socket.on('connect', () => logger.info(`Connected to ${url} socket`)) | ||
118 | socket.on('available-jobs', () => this.checkAvailableJobs()) | ||
119 | |||
120 | this.sockets.set(server, socket) | ||
121 | } | ||
122 | |||
123 | async unregisterRunner (options: { | ||
124 | url: string | ||
125 | runnerName: string | ||
126 | }) { | ||
127 | const { url, runnerName } = options | ||
128 | |||
129 | const server = this.servers.find(s => s.url === url && s.runnerName === runnerName) | ||
130 | if (!server) { | ||
131 | logger.error(`Unknown server ${url} - ${runnerName} to unregister`) | ||
132 | return | ||
133 | } | ||
134 | |||
135 | logger.info(`Unregistering runner ${runnerName} on ${url}...`) | ||
136 | |||
137 | try { | ||
138 | await server.runners.unregister({ runnerToken: server.runnerToken }) | ||
139 | } catch (err) { | ||
140 | logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`) | ||
141 | } | ||
142 | |||
143 | this.unloadServer(server) | ||
144 | await this.saveRegisteredInstancesInConf() | ||
145 | |||
146 | logger.info(`Unregistered runner ${runnerName} on ${url}`) | ||
147 | } | ||
148 | |||
149 | private unloadServer (server: PeerTubeServer) { | ||
150 | this.servers = this.servers.filter(s => s !== server) | ||
151 | |||
152 | const socket = this.sockets.get(server) | ||
153 | socket.disconnect() | ||
154 | |||
155 | this.sockets.delete(server) | ||
156 | } | ||
157 | |||
158 | listRegistered () { | ||
159 | return { | ||
160 | servers: this.servers.map(s => { | ||
161 | return { | ||
162 | url: s.url, | ||
163 | runnerName: s.runnerName, | ||
164 | runnerDescription: s.runnerDescription | ||
165 | } | ||
166 | }) | ||
167 | } | ||
168 | } | ||
169 | |||
170 | // --------------------------------------------------------------------------- | ||
171 | |||
172 | private async checkAvailableJobs () { | ||
173 | if (this.checkingAvailableJobs) return | ||
174 | |||
175 | this.checkingAvailableJobs = true | ||
176 | |||
177 | let hadAvailableJob = false | ||
178 | |||
179 | for (const server of shuffle([ ...this.servers ])) { | ||
180 | try { | ||
181 | logger.info('Checking available jobs on ' + server.url) | ||
182 | |||
183 | const job = await this.requestJob(server) | ||
184 | if (!job) continue | ||
185 | |||
186 | hadAvailableJob = true | ||
187 | |||
188 | await this.tryToExecuteJobAsync(server, job) | ||
189 | } catch (err) { | ||
190 | const code = (err.res?.body as PeerTubeProblemDocument)?.code | ||
191 | |||
192 | if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
193 | logger.debug({ err }, 'Runner job is not in processing state anymore, retry later') | ||
194 | return | ||
195 | } | ||
196 | |||
197 | if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | ||
198 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) | ||
199 | |||
200 | await this.unregisterRunner({ url: server.url, runnerName: server.runnerName }) | ||
201 | return | ||
202 | } | ||
203 | |||
204 | logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`) | ||
205 | } | ||
206 | } | ||
207 | |||
208 | this.checkingAvailableJobs = false | ||
209 | |||
210 | if (hadAvailableJob && this.canProcessMoreJobs()) { | ||
211 | await wait(2500) | ||
212 | |||
213 | this.checkAvailableJobs() | ||
214 | .catch(err => logger.error({ err }, 'Cannot check more available jobs')) | ||
215 | } | ||
216 | } | ||
217 | |||
218 | private async requestJob (server: PeerTubeServer) { | ||
219 | logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`) | ||
220 | |||
221 | const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) | ||
222 | |||
223 | const filtered = availableJobs.filter(j => isJobSupported(j)) | ||
224 | |||
225 | if (filtered.length === 0) { | ||
226 | logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) | ||
227 | return undefined | ||
228 | } | ||
229 | |||
230 | return filtered[0] | ||
231 | } | ||
232 | |||
233 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | ||
234 | if (!this.canProcessMoreJobs()) return | ||
235 | |||
236 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) | ||
237 | |||
238 | const processingJob = { job, server } | ||
239 | this.processingJobs.push(processingJob) | ||
240 | |||
241 | processJob({ server, job, runnerToken: server.runnerToken }) | ||
242 | .catch(err => { | ||
243 | logger.error({ err }, 'Cannot process job') | ||
244 | |||
245 | server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message }) | ||
246 | .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error')) | ||
247 | }) | ||
248 | .finally(() => { | ||
249 | this.processingJobs = this.processingJobs.filter(p => p !== processingJob) | ||
250 | |||
251 | return this.checkAvailableJobs() | ||
252 | }) | ||
253 | } | ||
254 | |||
255 | // --------------------------------------------------------------------------- | ||
256 | |||
257 | private saveRegisteredInstancesInConf () { | ||
258 | const data = this.servers.map(s => { | ||
259 | return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ]) | ||
260 | }) | ||
261 | |||
262 | return ConfigManager.Instance.setRegisteredInstances(data) | ||
263 | } | ||
264 | |||
265 | private canProcessMoreJobs () { | ||
266 | return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency | ||
267 | } | ||
268 | |||
269 | // --------------------------------------------------------------------------- | ||
270 | |||
271 | private async cleanupTMP () { | ||
272 | const files = await readdir(ConfigManager.Instance.getTranscodingDirectory()) | ||
273 | |||
274 | for (const file of files) { | ||
275 | await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file)) | ||
276 | } | ||
277 | } | ||
278 | |||
279 | private async onExit () { | ||
280 | if (this.cleaningUp) return | ||
281 | this.cleaningUp = true | ||
282 | |||
283 | logger.info('Cleaning up after program exit') | ||
284 | |||
285 | try { | ||
286 | for (const { server, job } of this.processingJobs) { | ||
287 | await server.runnerJobs.abort({ | ||
288 | jobToken: job.jobToken, | ||
289 | jobUUID: job.uuid, | ||
290 | reason: 'Runner stopped', | ||
291 | runnerToken: server.runnerToken | ||
292 | }) | ||
293 | } | ||
294 | |||
295 | await this.cleanupTMP() | ||
296 | } catch (err) { | ||
297 | logger.error(err) | ||
298 | process.exit(-1) | ||
299 | } | ||
300 | |||
301 | process.exit() | ||
302 | } | ||
303 | |||
304 | static get Instance () { | ||
305 | return this.instance || (this.instance = new this()) | ||
306 | } | ||
307 | } | ||
diff --git a/apps/peertube-runner/src/server/shared/index.ts b/apps/peertube-runner/src/server/shared/index.ts new file mode 100644 index 000000000..34d51196b --- /dev/null +++ b/apps/peertube-runner/src/server/shared/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './supported-job.js' | |||
diff --git a/apps/peertube-runner/src/server/shared/supported-job.ts b/apps/peertube-runner/src/server/shared/supported-job.ts new file mode 100644 index 000000000..d905b5de2 --- /dev/null +++ b/apps/peertube-runner/src/server/shared/supported-job.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import { | ||
2 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
3 | RunnerJobPayload, | ||
4 | RunnerJobType, | ||
5 | RunnerJobStudioTranscodingPayload, | ||
6 | RunnerJobVODAudioMergeTranscodingPayload, | ||
7 | RunnerJobVODHLSTranscodingPayload, | ||
8 | RunnerJobVODWebVideoTranscodingPayload, | ||
9 | VideoStudioTaskPayload | ||
10 | } from '@peertube/peertube-models' | ||
11 | |||
12 | const supportedMatrix = { | ||
13 | 'vod-web-video-transcoding': (_payload: RunnerJobVODWebVideoTranscodingPayload) => { | ||
14 | return true | ||
15 | }, | ||
16 | 'vod-hls-transcoding': (_payload: RunnerJobVODHLSTranscodingPayload) => { | ||
17 | return true | ||
18 | }, | ||
19 | 'vod-audio-merge-transcoding': (_payload: RunnerJobVODAudioMergeTranscodingPayload) => { | ||
20 | return true | ||
21 | }, | ||
22 | 'live-rtmp-hls-transcoding': (_payload: RunnerJobLiveRTMPHLSTranscodingPayload) => { | ||
23 | return true | ||
24 | }, | ||
25 | 'video-studio-transcoding': (payload: RunnerJobStudioTranscodingPayload) => { | ||
26 | const tasks = payload?.tasks | ||
27 | const supported = new Set<VideoStudioTaskPayload['name']>([ 'add-intro', 'add-outro', 'add-watermark', 'cut' ]) | ||
28 | |||
29 | if (!Array.isArray(tasks)) return false | ||
30 | |||
31 | return tasks.every(t => t && supported.has(t.name)) | ||
32 | } | ||
33 | } | ||
34 | |||
35 | export function isJobSupported (job: { | ||
36 | type: RunnerJobType | ||
37 | payload: RunnerJobPayload | ||
38 | }) { | ||
39 | const fn = supportedMatrix[job.type] | ||
40 | if (!fn) return false | ||
41 | |||
42 | return fn(job.payload as any) | ||
43 | } | ||
diff --git a/apps/peertube-runner/src/shared/config-manager.ts b/apps/peertube-runner/src/shared/config-manager.ts new file mode 100644 index 000000000..84a326a16 --- /dev/null +++ b/apps/peertube-runner/src/shared/config-manager.ts | |||
@@ -0,0 +1,140 @@ | |||
1 | import { parse, stringify } from '@iarna/toml' | ||
2 | import envPaths from 'env-paths' | ||
3 | import { ensureDir, pathExists, remove } from 'fs-extra/esm' | ||
4 | import { readFile, writeFile } from 'fs/promises' | ||
5 | import merge from 'lodash-es/merge.js' | ||
6 | import { dirname, join } from 'path' | ||
7 | import { logger } from '../shared/index.js' | ||
8 | |||
9 | const paths = envPaths('peertube-runner') | ||
10 | |||
11 | type Config = { | ||
12 | jobs: { | ||
13 | concurrency: number | ||
14 | } | ||
15 | |||
16 | ffmpeg: { | ||
17 | threads: number | ||
18 | nice: number | ||
19 | } | ||
20 | |||
21 | registeredInstances: { | ||
22 | url: string | ||
23 | runnerToken: string | ||
24 | runnerName: string | ||
25 | runnerDescription?: string | ||
26 | }[] | ||
27 | } | ||
28 | |||
29 | export class ConfigManager { | ||
30 | private static instance: ConfigManager | ||
31 | |||
32 | private config: Config = { | ||
33 | jobs: { | ||
34 | concurrency: 2 | ||
35 | }, | ||
36 | ffmpeg: { | ||
37 | threads: 2, | ||
38 | nice: 20 | ||
39 | }, | ||
40 | registeredInstances: [] | ||
41 | } | ||
42 | |||
43 | private id: string | ||
44 | private configFilePath: string | ||
45 | |||
46 | private constructor () {} | ||
47 | |||
48 | init (id: string) { | ||
49 | this.id = id | ||
50 | this.configFilePath = join(this.getConfigDir(), 'config.toml') | ||
51 | } | ||
52 | |||
53 | async load () { | ||
54 | logger.info(`Using ${this.configFilePath} as configuration file`) | ||
55 | |||
56 | if (this.isTestInstance()) { | ||
57 | logger.info('Removing configuration file as we are using the "test" id') | ||
58 | await remove(this.configFilePath) | ||
59 | } | ||
60 | |||
61 | await ensureDir(dirname(this.configFilePath)) | ||
62 | |||
63 | if (!await pathExists(this.configFilePath)) { | ||
64 | await this.save() | ||
65 | } | ||
66 | |||
67 | const file = await readFile(this.configFilePath, 'utf-8') | ||
68 | |||
69 | this.config = merge(this.config, parse(file)) | ||
70 | } | ||
71 | |||
72 | save () { | ||
73 | return writeFile(this.configFilePath, stringify(this.config)) | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | async setRegisteredInstances (registeredInstances: { | ||
79 | url: string | ||
80 | runnerToken: string | ||
81 | runnerName: string | ||
82 | runnerDescription?: string | ||
83 | }[]) { | ||
84 | this.config.registeredInstances = registeredInstances | ||
85 | |||
86 | await this.save() | ||
87 | } | ||
88 | |||
89 | // --------------------------------------------------------------------------- | ||
90 | |||
91 | getConfig () { | ||
92 | return this.deepFreeze(this.config) | ||
93 | } | ||
94 | |||
95 | // --------------------------------------------------------------------------- | ||
96 | |||
97 | getTranscodingDirectory () { | ||
98 | return join(paths.cache, this.id, 'transcoding') | ||
99 | } | ||
100 | |||
101 | getSocketDirectory () { | ||
102 | return join(paths.data, this.id) | ||
103 | } | ||
104 | |||
105 | getSocketPath () { | ||
106 | return join(this.getSocketDirectory(), 'peertube-runner.sock') | ||
107 | } | ||
108 | |||
109 | getConfigDir () { | ||
110 | return join(paths.config, this.id) | ||
111 | } | ||
112 | |||
113 | // --------------------------------------------------------------------------- | ||
114 | |||
115 | isTestInstance () { | ||
116 | return typeof this.id === 'string' && this.id.match(/^test-\d$/) | ||
117 | } | ||
118 | |||
119 | // --------------------------------------------------------------------------- | ||
120 | |||
121 | // Thanks: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze | ||
122 | private deepFreeze <T extends object> (object: T) { | ||
123 | const propNames = Reflect.ownKeys(object) | ||
124 | |||
125 | // Freeze properties before freezing self | ||
126 | for (const name of propNames) { | ||
127 | const value = object[name] | ||
128 | |||
129 | if ((value && typeof value === 'object') || typeof value === 'function') { | ||
130 | this.deepFreeze(value) | ||
131 | } | ||
132 | } | ||
133 | |||
134 | return Object.freeze({ ...object }) | ||
135 | } | ||
136 | |||
137 | static get Instance () { | ||
138 | return this.instance || (this.instance = new this()) | ||
139 | } | ||
140 | } | ||
diff --git a/apps/peertube-runner/src/shared/http.ts b/apps/peertube-runner/src/shared/http.ts new file mode 100644 index 000000000..42886279c --- /dev/null +++ b/apps/peertube-runner/src/shared/http.ts | |||
@@ -0,0 +1,67 @@ | |||
1 | import { createWriteStream } from 'fs' | ||
2 | import { remove } from 'fs-extra/esm' | ||
3 | import { request as requestHTTP } from 'http' | ||
4 | import { request as requestHTTPS, RequestOptions } from 'https' | ||
5 | import { logger } from './logger.js' | ||
6 | |||
7 | export function downloadFile (options: { | ||
8 | url: string | ||
9 | destination: string | ||
10 | runnerToken: string | ||
11 | jobToken: string | ||
12 | }) { | ||
13 | const { url, destination, runnerToken, jobToken } = options | ||
14 | |||
15 | logger.debug(`Downloading file ${url}`) | ||
16 | |||
17 | return new Promise<void>((res, rej) => { | ||
18 | const parsed = new URL(url) | ||
19 | |||
20 | const body = JSON.stringify({ | ||
21 | runnerToken, | ||
22 | jobToken | ||
23 | }) | ||
24 | |||
25 | const getOptions: RequestOptions = { | ||
26 | method: 'POST', | ||
27 | hostname: parsed.hostname, | ||
28 | port: parsed.port, | ||
29 | path: parsed.pathname, | ||
30 | headers: { | ||
31 | 'Content-Type': 'application/json', | ||
32 | 'Content-Length': Buffer.byteLength(body, 'utf-8') | ||
33 | } | ||
34 | } | ||
35 | |||
36 | const request = getRequest(url)(getOptions, response => { | ||
37 | const code = response.statusCode ?? 0 | ||
38 | |||
39 | if (code >= 400) { | ||
40 | return rej(new Error(response.statusMessage)) | ||
41 | } | ||
42 | |||
43 | const file = createWriteStream(destination) | ||
44 | file.on('finish', () => res()) | ||
45 | |||
46 | response.pipe(file) | ||
47 | }) | ||
48 | |||
49 | request.on('error', err => { | ||
50 | remove(destination) | ||
51 | .catch(err => logger.error(err)) | ||
52 | |||
53 | return rej(err) | ||
54 | }) | ||
55 | |||
56 | request.write(body) | ||
57 | request.end() | ||
58 | }) | ||
59 | } | ||
60 | |||
61 | // --------------------------------------------------------------------------- | ||
62 | |||
63 | function getRequest (url: string) { | ||
64 | if (url.startsWith('https://')) return requestHTTPS | ||
65 | |||
66 | return requestHTTP | ||
67 | } | ||
diff --git a/apps/peertube-runner/src/shared/index.ts b/apps/peertube-runner/src/shared/index.ts new file mode 100644 index 000000000..951eef55b --- /dev/null +++ b/apps/peertube-runner/src/shared/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './config-manager.js' | ||
2 | export * from './http.js' | ||
3 | export * from './logger.js' | ||
diff --git a/apps/peertube-runner/src/shared/ipc/index.ts b/apps/peertube-runner/src/shared/ipc/index.ts new file mode 100644 index 000000000..337d4de16 --- /dev/null +++ b/apps/peertube-runner/src/shared/ipc/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './ipc-client.js' | ||
2 | export * from './ipc-server.js' | ||
diff --git a/apps/peertube-runner/src/shared/ipc/ipc-client.ts b/apps/peertube-runner/src/shared/ipc/ipc-client.ts new file mode 100644 index 000000000..aa5740dd1 --- /dev/null +++ b/apps/peertube-runner/src/shared/ipc/ipc-client.ts | |||
@@ -0,0 +1,88 @@ | |||
1 | import CliTable3 from 'cli-table3' | ||
2 | import { ensureDir } from 'fs-extra/esm' | ||
3 | import { Client as NetIPC } from 'net-ipc' | ||
4 | import { ConfigManager } from '../config-manager.js' | ||
5 | import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js' | ||
6 | |||
7 | export class IPCClient { | ||
8 | private netIPC: NetIPC | ||
9 | |||
10 | async run () { | ||
11 | await ensureDir(ConfigManager.Instance.getSocketDirectory()) | ||
12 | |||
13 | const socketPath = ConfigManager.Instance.getSocketPath() | ||
14 | |||
15 | this.netIPC = new NetIPC({ path: socketPath }) | ||
16 | |||
17 | try { | ||
18 | await this.netIPC.connect() | ||
19 | } catch (err) { | ||
20 | if (err.code === 'ECONNREFUSED') { | ||
21 | throw new Error( | ||
22 | 'This runner is not currently running in server mode on this system. ' + | ||
23 | 'Please run it using the `server` command first (in another terminal for example) and then retry your command.' | ||
24 | ) | ||
25 | } | ||
26 | |||
27 | throw err | ||
28 | } | ||
29 | } | ||
30 | |||
31 | async askRegister (options: { | ||
32 | url: string | ||
33 | registrationToken: string | ||
34 | runnerName: string | ||
35 | runnerDescription?: string | ||
36 | }) { | ||
37 | const req: IPCRequest = { | ||
38 | type: 'register', | ||
39 | ...options | ||
40 | } | ||
41 | |||
42 | const { success, error } = await this.netIPC.request(req) as IPCReponse | ||
43 | |||
44 | if (success) console.log('PeerTube instance registered') | ||
45 | else console.error('Could not register PeerTube instance on runner server side', error) | ||
46 | } | ||
47 | |||
48 | async askUnregister (options: { | ||
49 | url: string | ||
50 | runnerName: string | ||
51 | }) { | ||
52 | const req: IPCRequest = { | ||
53 | type: 'unregister', | ||
54 | ...options | ||
55 | } | ||
56 | |||
57 | const { success, error } = await this.netIPC.request(req) as IPCReponse | ||
58 | |||
59 | if (success) console.log('PeerTube instance unregistered') | ||
60 | else console.error('Could not unregister PeerTube instance on runner server side', error) | ||
61 | } | ||
62 | |||
63 | async askListRegistered () { | ||
64 | const req: IPCRequest = { | ||
65 | type: 'list-registered' | ||
66 | } | ||
67 | |||
68 | const { success, error, data } = await this.netIPC.request(req) as IPCReponse<IPCReponseData> | ||
69 | if (!success) { | ||
70 | console.error('Could not list registered PeerTube instances', error) | ||
71 | return | ||
72 | } | ||
73 | |||
74 | const table = new CliTable3({ | ||
75 | head: [ 'instance', 'runner name', 'runner description' ] | ||
76 | }) | ||
77 | |||
78 | for (const server of data.servers) { | ||
79 | table.push([ server.url, server.runnerName, server.runnerDescription ]) | ||
80 | } | ||
81 | |||
82 | console.log(table.toString()) | ||
83 | } | ||
84 | |||
85 | stop () { | ||
86 | this.netIPC.destroy() | ||
87 | } | ||
88 | } | ||
diff --git a/apps/peertube-runner/src/shared/ipc/ipc-server.ts b/apps/peertube-runner/src/shared/ipc/ipc-server.ts new file mode 100644 index 000000000..c68438504 --- /dev/null +++ b/apps/peertube-runner/src/shared/ipc/ipc-server.ts | |||
@@ -0,0 +1,61 @@ | |||
1 | import { ensureDir } from 'fs-extra/esm' | ||
2 | import { Server as NetIPC } from 'net-ipc' | ||
3 | import { pick } from '@peertube/peertube-core-utils' | ||
4 | import { RunnerServer } from '../../server/index.js' | ||
5 | import { ConfigManager } from '../config-manager.js' | ||
6 | import { logger } from '../logger.js' | ||
7 | import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js' | ||
8 | |||
9 | export class IPCServer { | ||
10 | private netIPC: NetIPC | ||
11 | private runnerServer: RunnerServer | ||
12 | |||
13 | async run (runnerServer: RunnerServer) { | ||
14 | this.runnerServer = runnerServer | ||
15 | |||
16 | await ensureDir(ConfigManager.Instance.getSocketDirectory()) | ||
17 | |||
18 | const socketPath = ConfigManager.Instance.getSocketPath() | ||
19 | this.netIPC = new NetIPC({ path: socketPath }) | ||
20 | await this.netIPC.start() | ||
21 | |||
22 | logger.info(`IPC socket created on ${socketPath}`) | ||
23 | |||
24 | this.netIPC.on('request', async (req: IPCRequest, res) => { | ||
25 | try { | ||
26 | const data = await this.process(req) | ||
27 | |||
28 | this.sendReponse(res, { success: true, data }) | ||
29 | } catch (err) { | ||
30 | logger.error('Cannot execute RPC call', err) | ||
31 | this.sendReponse(res, { success: false, error: err.message }) | ||
32 | } | ||
33 | }) | ||
34 | } | ||
35 | |||
36 | private async process (req: IPCRequest) { | ||
37 | switch (req.type) { | ||
38 | case 'register': | ||
39 | await this.runnerServer.registerRunner(pick(req, [ 'url', 'registrationToken', 'runnerName', 'runnerDescription' ])) | ||
40 | return undefined | ||
41 | |||
42 | case 'unregister': | ||
43 | await this.runnerServer.unregisterRunner(pick(req, [ 'url', 'runnerName' ])) | ||
44 | return undefined | ||
45 | |||
46 | case 'list-registered': | ||
47 | return Promise.resolve(this.runnerServer.listRegistered()) | ||
48 | |||
49 | default: | ||
50 | throw new Error('Unknown RPC call ' + (req as any).type) | ||
51 | } | ||
52 | } | ||
53 | |||
54 | private sendReponse <T extends IPCReponseData> ( | ||
55 | response: (data: any) => Promise<void>, | ||
56 | body: IPCReponse<T> | ||
57 | ) { | ||
58 | response(body) | ||
59 | .catch(err => logger.error('Cannot send response after IPC request', err)) | ||
60 | } | ||
61 | } | ||
diff --git a/apps/peertube-runner/src/shared/ipc/shared/index.ts b/apps/peertube-runner/src/shared/ipc/shared/index.ts new file mode 100644 index 000000000..986acafb0 --- /dev/null +++ b/apps/peertube-runner/src/shared/ipc/shared/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './ipc-request.model.js' | ||
2 | export * from './ipc-response.model.js' | ||
diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts new file mode 100644 index 000000000..352808c74 --- /dev/null +++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-request.model.ts | |||
@@ -0,0 +1,15 @@ | |||
1 | export type IPCRequest = | ||
2 | IPCRequestRegister | | ||
3 | IPCRequestUnregister | | ||
4 | IPCRequestListRegistered | ||
5 | |||
6 | export type IPCRequestRegister = { | ||
7 | type: 'register' | ||
8 | url: string | ||
9 | registrationToken: string | ||
10 | runnerName: string | ||
11 | runnerDescription?: string | ||
12 | } | ||
13 | |||
14 | export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string } | ||
15 | export type IPCRequestListRegistered = { type: 'list-registered' } | ||
diff --git a/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts new file mode 100644 index 000000000..689d6e09a --- /dev/null +++ b/apps/peertube-runner/src/shared/ipc/shared/ipc-response.model.ts | |||
@@ -0,0 +1,15 @@ | |||
1 | export type IPCReponse <T extends IPCReponseData = undefined> = { | ||
2 | success: boolean | ||
3 | error?: string | ||
4 | data?: T | ||
5 | } | ||
6 | |||
7 | export type IPCReponseData = | ||
8 | // list registered | ||
9 | { | ||
10 | servers: { | ||
11 | runnerName: string | ||
12 | runnerDescription: string | ||
13 | url: string | ||
14 | }[] | ||
15 | } | ||
diff --git a/apps/peertube-runner/src/shared/logger.ts b/apps/peertube-runner/src/shared/logger.ts new file mode 100644 index 000000000..ef5283892 --- /dev/null +++ b/apps/peertube-runner/src/shared/logger.ts | |||
@@ -0,0 +1,12 @@ | |||
1 | import { pino } from 'pino' | ||
2 | import pretty from 'pino-pretty' | ||
3 | |||
4 | const logger = pino(pretty.default({ | ||
5 | colorize: true | ||
6 | })) | ||
7 | |||
8 | logger.level = 'info' | ||
9 | |||
10 | export { | ||
11 | logger | ||
12 | } | ||