diff options
Diffstat (limited to 'packages/peertube-runner/server/server.ts')
-rw-r--r-- | packages/peertube-runner/server/server.ts | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts new file mode 100644 index 000000000..724f359bd --- /dev/null +++ b/packages/peertube-runner/server/server.ts | |||
@@ -0,0 +1,269 @@ | |||
1 | import { ensureDir, readdir, remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { io, Socket } from 'socket.io-client' | ||
4 | import { pick } from '@shared/core-utils' | ||
5 | import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' | ||
6 | import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands' | ||
7 | import { ConfigManager } from '../shared' | ||
8 | import { IPCServer } from '../shared/ipc' | ||
9 | import { logger } from '../shared/logger' | ||
10 | import { JobWithToken, processJob } from './process' | ||
11 | |||
/**
 * Runner-specific fields attached to a server command object.
 */
type RunnerRegistration = {
  // Token sent with every runner request and used as the socket `auth` payload
  runnerToken: string

  // Name the runner was registered with on the instance
  runnerName: string

  // Optional description provided at registration time
  runnerDescription?: string
}

/**
 * A PeerTube instance this runner is registered on: the server command
 * helper augmented with the runner registration fields.
 */
type PeerTubeServer = PeerTubeServerCommand & RunnerRegistration
17 | |||
/**
 * PeerTube runner running in "server" mode.
 *
 * Keeps the list of PeerTube instances the runner is registered on, opens one
 * socket per instance to be notified of available jobs, requests/accepts jobs
 * up to the configured concurrency and delegates their execution to `processJob`.
 *
 * Singleton: use `RunnerServer.Instance`.
 */
export class RunnerServer {
  private static instance: RunnerServer

  // Instances the runner is currently registered on
  private servers: PeerTubeServer[] = []

  // Jobs accepted by this runner that are not finished yet
  private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

  // Guard so only one job polling round runs at a time
  private checkingAvailableJobs = false

  private readonly sockets = new Map<PeerTubeServer, Socket>()

  private constructor () {}

  /**
   * Start the runner: load the registered instances from the configuration,
   * start the IPC server, install the exit handlers, reset the transcoding
   * directory and run a first job polling round.
   */
  async run () {
    logger.info('Running PeerTube runner in server mode')

    await ConfigManager.Instance.load()

    for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
      const serverCommand = new PeerTubeServerCommand({ url: registered.url })

      this.loadServer(Object.assign(serverCommand, registered))

      logger.info(`Loading registered instance ${registered.url}`)
    }

    // Run IPC
    const ipcServer = new IPCServer()
    try {
      await ipcServer.run(this)
    } catch (err) {
      console.error('Cannot start local socket for IPC communication', err)
      process.exit(-1)
    }

    // Cleanup on exit
    for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
      process.on(code, async (reason: unknown) => {
        // Do not lose the exception that is stopping the process
        if (code === 'uncaughtException') console.error('Uncaught exception', reason)

        await this.onExit()
      })
    }

    // Process jobs
    await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
    await this.cleanupTMP()

    logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)

    await this.checkAvailableJobs()
  }

  // ---------------------------------------------------------------------------

  /**
   * Register this runner on the instance at `url` using `registrationToken`,
   * persist the registration in the configuration and poll for jobs.
   */
  async registerRunner (options: {
    url: string
    registrationToken: string
    runnerName: string
    runnerDescription?: string
  }) {
    const { url, registrationToken, runnerName, runnerDescription } = options

    logger.info(`Registering runner ${runnerName} on ${url}...`)

    const serverCommand = new PeerTubeServerCommand({ url })
    const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })

    const server: PeerTubeServer = Object.assign(serverCommand, {
      runnerToken,
      runnerName,
      runnerDescription
    })

    this.loadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Registered runner ${runnerName} on ${url}`)

    await this.checkAvailableJobs()
  }

  /**
   * Track `server` and open its "/runners" socket so the instance can notify
   * the runner when jobs become available.
   */
  private loadServer (server: PeerTubeServer) {
    this.servers.push(server)

    const url = server.url + '/runners'
    const socket = io(url, {
      auth: {
        runnerToken: server.runnerToken
      },
      transports: [ 'websocket' ]
    })

    socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
    socket.on('connect', () => logger.info(`Connected to ${url} socket`))
    socket.on('available-jobs', () => {
      // Nobody awaits an event handler: make sure a rejection is always handled
      this.checkAvailableJobs()
        .catch(err => logger.error({ err }, 'Cannot check available jobs'))
    })

    this.sockets.set(server, socket)
  }

  /**
   * Unregister this runner from the instance at `url`.
   * A failure of the remote call is only logged: the instance is removed
   * locally and from the configuration in any case.
   */
  async unregisterRunner (options: {
    url: string
  }) {
    const { url } = options

    const server = this.servers.find(s => s.url === url)
    if (!server) {
      logger.error(`Unknown server ${url} to unregister`)
      return
    }

    logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)

    try {
      await server.runners.unregister({ runnerToken: server.runnerToken })
    } catch (err) {
      logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
    }

    this.unloadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
  }

  /**
   * Stop tracking `server` and close its socket, if any.
   */
  private unloadServer (server: PeerTubeServer) {
    // Build a new array instead of mutating: a polling round may be iterating the current one
    this.servers = this.servers.filter(s => s !== server)

    // The map may have no socket for this server: do not crash on undefined
    const socket = this.sockets.get(server)
    if (socket) socket.disconnect()

    this.sockets.delete(server)
  }

  /**
   * List the instances this runner is registered on (runner tokens are not exposed).
   */
  listRegistered () {
    return {
      servers: this.servers.map(s => {
        return {
          url: s.url,
          runnerName: s.runnerName,
          runnerDescription: s.runnerDescription
        }
      })
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Ask every registered instance for an available job and try to execute it.
   * Only one round runs at a time: calls made while a round is in progress return immediately.
   * An instance that rejects the runner token is unregistered, then the round
   * continues with the remaining instances.
   */
  private async checkAvailableJobs () {
    if (this.checkingAvailableJobs) return

    logger.info('Checking available jobs')

    this.checkingAvailableJobs = true

    try {
      for (const server of this.servers) {
        try {
          const job = await this.requestJob(server)
          if (!job) continue

          await this.tryToExecuteJobAsync(server, job)
        } catch (err) {
          const code = (err as { res?: { body?: PeerTubeProblemDocument } })?.res?.body?.code

          if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
            logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

            await this.unregisterRunner({ url: server.url })

            // An invalid token on one instance must not prevent polling the other ones
            continue
          }

          logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
        }
      }
    } finally {
      // Always release the guard, otherwise every future round would be skipped
      this.checkingAvailableJobs = false
    }
  }

  /**
   * Request the jobs available for this runner on `server`.
   *
   * @returns the first available job, or `undefined` if there is none
   */
  private async requestJob (server: PeerTubeServer) {
    logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

    const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

    if (availableJobs.length === 0) {
      logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
      return undefined
    }

    return availableJobs[0]
  }

  /**
   * Accept `jobToAccept` on `server` and process it in the background, unless
   * the configured job concurrency is already reached.
   * Resolves once the job is accepted, not once it is processed.
   */
  private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return

    const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

    const processingJob = { job, server }
    this.processingJobs.push(processingJob)

    // Deliberately not awaited: the job runs in the background
    processJob({ server, job, runnerToken: server.runnerToken })
      .catch(err => {
        logger.error({ err }, 'Cannot process job')

        server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
          .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
      })
      .finally(() => {
        this.processingJobs = this.processingJobs.filter(p => p !== processingJob)

        // A slot has been freed: look for another job
        return this.checkAvailableJobs()
      })
      // The chain is not awaited by anyone: handle a failure of the check above
      .catch(err => logger.error({ err }, 'Cannot check available jobs'))
  }

  // ---------------------------------------------------------------------------

  /**
   * Save the currently registered instances in the runner configuration.
   */
  private saveRegisteredInstancesInConf () {
    const data = this.servers.map(s => {
      return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
    })

    return ConfigManager.Instance.setRegisteredInstances(data)
  }

  // ---------------------------------------------------------------------------

  /**
   * Remove every entry of the transcoding directory.
   */
  private async cleanupTMP () {
    const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())

    for (const file of files) {
      await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
    }
  }

  /**
   * Abort the jobs being processed, clean the transcoding directory and exit the process.
   * Every step is attempted even if a previous one failed; the exit code is -1
   * if any of them failed.
   */
  private async onExit () {
    let failed = false

    for (const { server, job } of this.processingJobs) {
      try {
        await server.runnerJobs.abort({
          jobToken: job.jobToken,
          jobUUID: job.uuid,
          reason: 'Runner stopped',
          runnerToken: server.runnerToken
        })
      } catch (err) {
        // Keep aborting the other jobs
        console.error(err)
        failed = true
      }
    }

    try {
      await this.cleanupTMP()
    } catch (err) {
      console.error(err)
      failed = true
    }

    if (failed) {
      process.exit(-1)
    } else {
      process.exit()
    }
  }

  static get Instance () {
    if (!RunnerServer.instance) RunnerServer.instance = new RunnerServer()

    return RunnerServer.instance
  }
}