diff options
Diffstat (limited to 'apps/peertube-runner/src/server/server.ts')
-rw-r--r-- | apps/peertube-runner/src/server/server.ts | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts new file mode 100644 index 000000000..825e3f297 --- /dev/null +++ b/apps/peertube-runner/src/server/server.ts | |||
@@ -0,0 +1,307 @@ | |||
1 | import { ensureDir, remove } from 'fs-extra/esm' | ||
2 | import { readdir } from 'fs/promises' | ||
3 | import { join } from 'path' | ||
4 | import { io, Socket } from 'socket.io-client' | ||
5 | import { pick, shuffle, wait } from '@peertube/peertube-core-utils' | ||
6 | import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models' | ||
7 | import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands' | ||
8 | import { ConfigManager } from '../shared/index.js' | ||
9 | import { IPCServer } from '../shared/ipc/index.js' | ||
10 | import { logger } from '../shared/logger.js' | ||
11 | import { JobWithToken, processJob } from './process/index.js' | ||
12 | import { isJobSupported } from './shared/index.js' | ||
13 | |||
14 | type PeerTubeServer = PeerTubeServerCommand & { | ||
15 | runnerToken: string | ||
16 | runnerName: string | ||
17 | runnerDescription?: string | ||
18 | } | ||
19 | |||
20 | export class RunnerServer { | ||
21 | private static instance: RunnerServer | ||
22 | |||
23 | private servers: PeerTubeServer[] = [] | ||
24 | private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] | ||
25 | |||
26 | private checkingAvailableJobs = false | ||
27 | |||
28 | private cleaningUp = false | ||
29 | |||
30 | private readonly sockets = new Map<PeerTubeServer, Socket>() | ||
31 | |||
32 | private constructor () {} | ||
33 | |||
34 | async run () { | ||
35 | logger.info('Running PeerTube runner in server mode') | ||
36 | |||
37 | await ConfigManager.Instance.load() | ||
38 | |||
39 | for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { | ||
40 | const serverCommand = new PeerTubeServerCommand({ url: registered.url }) | ||
41 | |||
42 | this.loadServer(Object.assign(serverCommand, registered)) | ||
43 | |||
44 | logger.info(`Loading registered instance ${registered.url}`) | ||
45 | } | ||
46 | |||
47 | // Run IPC | ||
48 | const ipcServer = new IPCServer() | ||
49 | try { | ||
50 | await ipcServer.run(this) | ||
51 | } catch (err) { | ||
52 | logger.error('Cannot start local socket for IPC communication', err) | ||
53 | process.exit(-1) | ||
54 | } | ||
55 | |||
56 | // Cleanup on exit | ||
57 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { | ||
58 | process.on(code, async (err, origin) => { | ||
59 | if (code === 'uncaughtException') { | ||
60 | logger.error({ err, origin }, 'uncaughtException') | ||
61 | } | ||
62 | |||
63 | await this.onExit() | ||
64 | }) | ||
65 | } | ||
66 | |||
67 | // Process jobs | ||
68 | await ensureDir(ConfigManager.Instance.getTranscodingDirectory()) | ||
69 | await this.cleanupTMP() | ||
70 | |||
71 | logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`) | ||
72 | |||
73 | await this.checkAvailableJobs() | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | async registerRunner (options: { | ||
79 | url: string | ||
80 | registrationToken: string | ||
81 | runnerName: string | ||
82 | runnerDescription?: string | ||
83 | }) { | ||
84 | const { url, registrationToken, runnerName, runnerDescription } = options | ||
85 | |||
86 | logger.info(`Registering runner ${runnerName} on ${url}...`) | ||
87 | |||
88 | const serverCommand = new PeerTubeServerCommand({ url }) | ||
89 | const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken }) | ||
90 | |||
91 | const server: PeerTubeServer = Object.assign(serverCommand, { | ||
92 | runnerToken, | ||
93 | runnerName, | ||
94 | runnerDescription | ||
95 | }) | ||
96 | |||
97 | this.loadServer(server) | ||
98 | await this.saveRegisteredInstancesInConf() | ||
99 | |||
100 | logger.info(`Registered runner ${runnerName} on ${url}`) | ||
101 | |||
102 | await this.checkAvailableJobs() | ||
103 | } | ||
104 | |||
105 | private loadServer (server: PeerTubeServer) { | ||
106 | this.servers.push(server) | ||
107 | |||
108 | const url = server.url + '/runners' | ||
109 | const socket = io(url, { | ||
110 | auth: { | ||
111 | runnerToken: server.runnerToken | ||
112 | }, | ||
113 | transports: [ 'websocket' ] | ||
114 | }) | ||
115 | |||
116 | socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`)) | ||
117 | socket.on('connect', () => logger.info(`Connected to ${url} socket`)) | ||
118 | socket.on('available-jobs', () => this.checkAvailableJobs()) | ||
119 | |||
120 | this.sockets.set(server, socket) | ||
121 | } | ||
122 | |||
123 | async unregisterRunner (options: { | ||
124 | url: string | ||
125 | runnerName: string | ||
126 | }) { | ||
127 | const { url, runnerName } = options | ||
128 | |||
129 | const server = this.servers.find(s => s.url === url && s.runnerName === runnerName) | ||
130 | if (!server) { | ||
131 | logger.error(`Unknown server ${url} - ${runnerName} to unregister`) | ||
132 | return | ||
133 | } | ||
134 | |||
135 | logger.info(`Unregistering runner ${runnerName} on ${url}...`) | ||
136 | |||
137 | try { | ||
138 | await server.runners.unregister({ runnerToken: server.runnerToken }) | ||
139 | } catch (err) { | ||
140 | logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`) | ||
141 | } | ||
142 | |||
143 | this.unloadServer(server) | ||
144 | await this.saveRegisteredInstancesInConf() | ||
145 | |||
146 | logger.info(`Unregistered runner ${runnerName} on ${url}`) | ||
147 | } | ||
148 | |||
149 | private unloadServer (server: PeerTubeServer) { | ||
150 | this.servers = this.servers.filter(s => s !== server) | ||
151 | |||
152 | const socket = this.sockets.get(server) | ||
153 | socket.disconnect() | ||
154 | |||
155 | this.sockets.delete(server) | ||
156 | } | ||
157 | |||
158 | listRegistered () { | ||
159 | return { | ||
160 | servers: this.servers.map(s => { | ||
161 | return { | ||
162 | url: s.url, | ||
163 | runnerName: s.runnerName, | ||
164 | runnerDescription: s.runnerDescription | ||
165 | } | ||
166 | }) | ||
167 | } | ||
168 | } | ||
169 | |||
170 | // --------------------------------------------------------------------------- | ||
171 | |||
172 | private async checkAvailableJobs () { | ||
173 | if (this.checkingAvailableJobs) return | ||
174 | |||
175 | this.checkingAvailableJobs = true | ||
176 | |||
177 | let hadAvailableJob = false | ||
178 | |||
179 | for (const server of shuffle([ ...this.servers ])) { | ||
180 | try { | ||
181 | logger.info('Checking available jobs on ' + server.url) | ||
182 | |||
183 | const job = await this.requestJob(server) | ||
184 | if (!job) continue | ||
185 | |||
186 | hadAvailableJob = true | ||
187 | |||
188 | await this.tryToExecuteJobAsync(server, job) | ||
189 | } catch (err) { | ||
190 | const code = (err.res?.body as PeerTubeProblemDocument)?.code | ||
191 | |||
192 | if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
193 | logger.debug({ err }, 'Runner job is not in processing state anymore, retry later') | ||
194 | return | ||
195 | } | ||
196 | |||
197 | if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | ||
198 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) | ||
199 | |||
200 | await this.unregisterRunner({ url: server.url, runnerName: server.runnerName }) | ||
201 | return | ||
202 | } | ||
203 | |||
204 | logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`) | ||
205 | } | ||
206 | } | ||
207 | |||
208 | this.checkingAvailableJobs = false | ||
209 | |||
210 | if (hadAvailableJob && this.canProcessMoreJobs()) { | ||
211 | await wait(2500) | ||
212 | |||
213 | this.checkAvailableJobs() | ||
214 | .catch(err => logger.error({ err }, 'Cannot check more available jobs')) | ||
215 | } | ||
216 | } | ||
217 | |||
218 | private async requestJob (server: PeerTubeServer) { | ||
219 | logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`) | ||
220 | |||
221 | const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) | ||
222 | |||
223 | const filtered = availableJobs.filter(j => isJobSupported(j)) | ||
224 | |||
225 | if (filtered.length === 0) { | ||
226 | logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) | ||
227 | return undefined | ||
228 | } | ||
229 | |||
230 | return filtered[0] | ||
231 | } | ||
232 | |||
233 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | ||
234 | if (!this.canProcessMoreJobs()) return | ||
235 | |||
236 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) | ||
237 | |||
238 | const processingJob = { job, server } | ||
239 | this.processingJobs.push(processingJob) | ||
240 | |||
241 | processJob({ server, job, runnerToken: server.runnerToken }) | ||
242 | .catch(err => { | ||
243 | logger.error({ err }, 'Cannot process job') | ||
244 | |||
245 | server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message }) | ||
246 | .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error')) | ||
247 | }) | ||
248 | .finally(() => { | ||
249 | this.processingJobs = this.processingJobs.filter(p => p !== processingJob) | ||
250 | |||
251 | return this.checkAvailableJobs() | ||
252 | }) | ||
253 | } | ||
254 | |||
255 | // --------------------------------------------------------------------------- | ||
256 | |||
257 | private saveRegisteredInstancesInConf () { | ||
258 | const data = this.servers.map(s => { | ||
259 | return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ]) | ||
260 | }) | ||
261 | |||
262 | return ConfigManager.Instance.setRegisteredInstances(data) | ||
263 | } | ||
264 | |||
265 | private canProcessMoreJobs () { | ||
266 | return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency | ||
267 | } | ||
268 | |||
269 | // --------------------------------------------------------------------------- | ||
270 | |||
271 | private async cleanupTMP () { | ||
272 | const files = await readdir(ConfigManager.Instance.getTranscodingDirectory()) | ||
273 | |||
274 | for (const file of files) { | ||
275 | await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file)) | ||
276 | } | ||
277 | } | ||
278 | |||
279 | private async onExit () { | ||
280 | if (this.cleaningUp) return | ||
281 | this.cleaningUp = true | ||
282 | |||
283 | logger.info('Cleaning up after program exit') | ||
284 | |||
285 | try { | ||
286 | for (const { server, job } of this.processingJobs) { | ||
287 | await server.runnerJobs.abort({ | ||
288 | jobToken: job.jobToken, | ||
289 | jobUUID: job.uuid, | ||
290 | reason: 'Runner stopped', | ||
291 | runnerToken: server.runnerToken | ||
292 | }) | ||
293 | } | ||
294 | |||
295 | await this.cleanupTMP() | ||
296 | } catch (err) { | ||
297 | logger.error(err) | ||
298 | process.exit(-1) | ||
299 | } | ||
300 | |||
301 | process.exit() | ||
302 | } | ||
303 | |||
304 | static get Instance () { | ||
305 | return this.instance || (this.instance = new this()) | ||
306 | } | ||
307 | } | ||