diff options
Diffstat (limited to 'packages/peertube-runner/server/server.ts')
-rw-r--r-- | packages/peertube-runner/server/server.ts | 306 |
1 files changed, 0 insertions, 306 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts deleted file mode 100644 index 5fa86fa1a..000000000 --- a/packages/peertube-runner/server/server.ts +++ /dev/null | |||
@@ -1,306 +0,0 @@ | |||
1 | import { ensureDir, readdir, remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { io, Socket } from 'socket.io-client' | ||
4 | import { pick, shuffle, wait } from '@shared/core-utils' | ||
5 | import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' | ||
6 | import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands' | ||
7 | import { ConfigManager } from '../shared' | ||
8 | import { IPCServer } from '../shared/ipc' | ||
9 | import { logger } from '../shared/logger' | ||
10 | import { JobWithToken, processJob } from './process' | ||
11 | import { isJobSupported } from './shared' | ||
12 | |||
13 | type PeerTubeServer = PeerTubeServerCommand & { | ||
14 | runnerToken: string | ||
15 | runnerName: string | ||
16 | runnerDescription?: string | ||
17 | } | ||
18 | |||
19 | export class RunnerServer { | ||
20 | private static instance: RunnerServer | ||
21 | |||
22 | private servers: PeerTubeServer[] = [] | ||
23 | private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] | ||
24 | |||
25 | private checkingAvailableJobs = false | ||
26 | |||
27 | private cleaningUp = false | ||
28 | |||
29 | private readonly sockets = new Map<PeerTubeServer, Socket>() | ||
30 | |||
31 | private constructor () {} | ||
32 | |||
33 | async run () { | ||
34 | logger.info('Running PeerTube runner in server mode') | ||
35 | |||
36 | await ConfigManager.Instance.load() | ||
37 | |||
38 | for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { | ||
39 | const serverCommand = new PeerTubeServerCommand({ url: registered.url }) | ||
40 | |||
41 | this.loadServer(Object.assign(serverCommand, registered)) | ||
42 | |||
43 | logger.info(`Loading registered instance ${registered.url}`) | ||
44 | } | ||
45 | |||
46 | // Run IPC | ||
47 | const ipcServer = new IPCServer() | ||
48 | try { | ||
49 | await ipcServer.run(this) | ||
50 | } catch (err) { | ||
51 | logger.error('Cannot start local socket for IPC communication', err) | ||
52 | process.exit(-1) | ||
53 | } | ||
54 | |||
55 | // Cleanup on exit | ||
56 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { | ||
57 | process.on(code, async (err, origin) => { | ||
58 | if (code === 'uncaughtException') { | ||
59 | logger.error({ err, origin }, 'uncaughtException') | ||
60 | } | ||
61 | |||
62 | await this.onExit() | ||
63 | }) | ||
64 | } | ||
65 | |||
66 | // Process jobs | ||
67 | await ensureDir(ConfigManager.Instance.getTranscodingDirectory()) | ||
68 | await this.cleanupTMP() | ||
69 | |||
70 | logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`) | ||
71 | |||
72 | await this.checkAvailableJobs() | ||
73 | } | ||
74 | |||
75 | // --------------------------------------------------------------------------- | ||
76 | |||
77 | async registerRunner (options: { | ||
78 | url: string | ||
79 | registrationToken: string | ||
80 | runnerName: string | ||
81 | runnerDescription?: string | ||
82 | }) { | ||
83 | const { url, registrationToken, runnerName, runnerDescription } = options | ||
84 | |||
85 | logger.info(`Registering runner ${runnerName} on ${url}...`) | ||
86 | |||
87 | const serverCommand = new PeerTubeServerCommand({ url }) | ||
88 | const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken }) | ||
89 | |||
90 | const server: PeerTubeServer = Object.assign(serverCommand, { | ||
91 | runnerToken, | ||
92 | runnerName, | ||
93 | runnerDescription | ||
94 | }) | ||
95 | |||
96 | this.loadServer(server) | ||
97 | await this.saveRegisteredInstancesInConf() | ||
98 | |||
99 | logger.info(`Registered runner ${runnerName} on ${url}`) | ||
100 | |||
101 | await this.checkAvailableJobs() | ||
102 | } | ||
103 | |||
104 | private loadServer (server: PeerTubeServer) { | ||
105 | this.servers.push(server) | ||
106 | |||
107 | const url = server.url + '/runners' | ||
108 | const socket = io(url, { | ||
109 | auth: { | ||
110 | runnerToken: server.runnerToken | ||
111 | }, | ||
112 | transports: [ 'websocket' ] | ||
113 | }) | ||
114 | |||
115 | socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`)) | ||
116 | socket.on('connect', () => logger.info(`Connected to ${url} socket`)) | ||
117 | socket.on('available-jobs', () => this.checkAvailableJobs()) | ||
118 | |||
119 | this.sockets.set(server, socket) | ||
120 | } | ||
121 | |||
122 | async unregisterRunner (options: { | ||
123 | url: string | ||
124 | runnerName: string | ||
125 | }) { | ||
126 | const { url, runnerName } = options | ||
127 | |||
128 | const server = this.servers.find(s => s.url === url && s.runnerName === runnerName) | ||
129 | if (!server) { | ||
130 | logger.error(`Unknown server ${url} - ${runnerName} to unregister`) | ||
131 | return | ||
132 | } | ||
133 | |||
134 | logger.info(`Unregistering runner ${runnerName} on ${url}...`) | ||
135 | |||
136 | try { | ||
137 | await server.runners.unregister({ runnerToken: server.runnerToken }) | ||
138 | } catch (err) { | ||
139 | logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`) | ||
140 | } | ||
141 | |||
142 | this.unloadServer(server) | ||
143 | await this.saveRegisteredInstancesInConf() | ||
144 | |||
145 | logger.info(`Unregistered runner ${runnerName} on ${url}`) | ||
146 | } | ||
147 | |||
148 | private unloadServer (server: PeerTubeServer) { | ||
149 | this.servers = this.servers.filter(s => s !== server) | ||
150 | |||
151 | const socket = this.sockets.get(server) | ||
152 | socket.disconnect() | ||
153 | |||
154 | this.sockets.delete(server) | ||
155 | } | ||
156 | |||
157 | listRegistered () { | ||
158 | return { | ||
159 | servers: this.servers.map(s => { | ||
160 | return { | ||
161 | url: s.url, | ||
162 | runnerName: s.runnerName, | ||
163 | runnerDescription: s.runnerDescription | ||
164 | } | ||
165 | }) | ||
166 | } | ||
167 | } | ||
168 | |||
169 | // --------------------------------------------------------------------------- | ||
170 | |||
171 | private async checkAvailableJobs () { | ||
172 | if (this.checkingAvailableJobs) return | ||
173 | |||
174 | this.checkingAvailableJobs = true | ||
175 | |||
176 | let hadAvailableJob = false | ||
177 | |||
178 | for (const server of shuffle([ ...this.servers ])) { | ||
179 | try { | ||
180 | logger.info('Checking available jobs on ' + server.url) | ||
181 | |||
182 | const job = await this.requestJob(server) | ||
183 | if (!job) continue | ||
184 | |||
185 | hadAvailableJob = true | ||
186 | |||
187 | await this.tryToExecuteJobAsync(server, job) | ||
188 | } catch (err) { | ||
189 | const code = (err.res?.body as PeerTubeProblemDocument)?.code | ||
190 | |||
191 | if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
192 | logger.debug({ err }, 'Runner job is not in processing state anymore, retry later') | ||
193 | return | ||
194 | } | ||
195 | |||
196 | if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | ||
197 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) | ||
198 | |||
199 | await this.unregisterRunner({ url: server.url, runnerName: server.runnerName }) | ||
200 | return | ||
201 | } | ||
202 | |||
203 | logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`) | ||
204 | } | ||
205 | } | ||
206 | |||
207 | this.checkingAvailableJobs = false | ||
208 | |||
209 | if (hadAvailableJob && this.canProcessMoreJobs()) { | ||
210 | await wait(2500) | ||
211 | |||
212 | this.checkAvailableJobs() | ||
213 | .catch(err => logger.error({ err }, 'Cannot check more available jobs')) | ||
214 | } | ||
215 | } | ||
216 | |||
217 | private async requestJob (server: PeerTubeServer) { | ||
218 | logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`) | ||
219 | |||
220 | const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) | ||
221 | |||
222 | const filtered = availableJobs.filter(j => isJobSupported(j)) | ||
223 | |||
224 | if (filtered.length === 0) { | ||
225 | logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) | ||
226 | return undefined | ||
227 | } | ||
228 | |||
229 | return filtered[0] | ||
230 | } | ||
231 | |||
232 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | ||
233 | if (!this.canProcessMoreJobs()) return | ||
234 | |||
235 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) | ||
236 | |||
237 | const processingJob = { job, server } | ||
238 | this.processingJobs.push(processingJob) | ||
239 | |||
240 | processJob({ server, job, runnerToken: server.runnerToken }) | ||
241 | .catch(err => { | ||
242 | logger.error({ err }, 'Cannot process job') | ||
243 | |||
244 | server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message }) | ||
245 | .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error')) | ||
246 | }) | ||
247 | .finally(() => { | ||
248 | this.processingJobs = this.processingJobs.filter(p => p !== processingJob) | ||
249 | |||
250 | return this.checkAvailableJobs() | ||
251 | }) | ||
252 | } | ||
253 | |||
254 | // --------------------------------------------------------------------------- | ||
255 | |||
256 | private saveRegisteredInstancesInConf () { | ||
257 | const data = this.servers.map(s => { | ||
258 | return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ]) | ||
259 | }) | ||
260 | |||
261 | return ConfigManager.Instance.setRegisteredInstances(data) | ||
262 | } | ||
263 | |||
264 | private canProcessMoreJobs () { | ||
265 | return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency | ||
266 | } | ||
267 | |||
268 | // --------------------------------------------------------------------------- | ||
269 | |||
270 | private async cleanupTMP () { | ||
271 | const files = await readdir(ConfigManager.Instance.getTranscodingDirectory()) | ||
272 | |||
273 | for (const file of files) { | ||
274 | await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file)) | ||
275 | } | ||
276 | } | ||
277 | |||
278 | private async onExit () { | ||
279 | if (this.cleaningUp) return | ||
280 | this.cleaningUp = true | ||
281 | |||
282 | logger.info('Cleaning up after program exit') | ||
283 | |||
284 | try { | ||
285 | for (const { server, job } of this.processingJobs) { | ||
286 | await server.runnerJobs.abort({ | ||
287 | jobToken: job.jobToken, | ||
288 | jobUUID: job.uuid, | ||
289 | reason: 'Runner stopped', | ||
290 | runnerToken: server.runnerToken | ||
291 | }) | ||
292 | } | ||
293 | |||
294 | await this.cleanupTMP() | ||
295 | } catch (err) { | ||
296 | logger.error(err) | ||
297 | process.exit(-1) | ||
298 | } | ||
299 | |||
300 | process.exit() | ||
301 | } | ||
302 | |||
303 | static get Instance () { | ||
304 | return this.instance || (this.instance = new this()) | ||
305 | } | ||
306 | } | ||