]>
Commit | Line | Data |
---|---|---|
1 | import { ensureDir, readdir, remove } from 'fs-extra' | |
2 | import { join } from 'path' | |
3 | import { io, Socket } from 'socket.io-client' | |
4 | import { pick } from '@shared/core-utils' | |
5 | import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' | |
6 | import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands' | |
7 | import { ConfigManager } from '../shared' | |
8 | import { IPCServer } from '../shared/ipc' | |
9 | import { logger } from '../shared/logger' | |
10 | import { JobWithToken, processJob } from './process' | |
11 | ||
/** Credentials and metadata of this runner on one PeerTube instance (persisted in the config). */
type RunnerRegistration = {
  runnerName: string
  runnerToken: string
  runnerDescription?: string
}

/** A server command object for one PeerTube instance, augmented with this runner's registration. */
type PeerTubeServer = PeerTubeServerCommand & RunnerRegistration
17 | ||
/**
 * Runner in "server mode": keeps one socket per registered PeerTube instance,
 * requests and accepts jobs from those instances, and processes them up to the
 * configured `jobs.concurrency`.
 *
 * Singleton — obtain it through `RunnerServer.Instance`.
 */
export class RunnerServer {
  private static instance: RunnerServer

  private servers: PeerTubeServer[] = []
  private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

  // Prevents overlapping executions of checkAvailableJobs()
  private checkingAvailableJobs = false

  // Prevents running the exit cleanup more than once
  private cleaningUp = false

  private readonly sockets = new Map<PeerTubeServer, Socket>()

  private constructor () {}

  /**
   * Start the runner:
   * - load registered instances from the config;
   * - start the IPC server;
   * - install exit handlers;
   * - empty the transcoding directory;
   * - do a first job check.
   */
  async run () {
    logger.info('Running PeerTube runner in server mode')

    await ConfigManager.Instance.load()

    for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
      const serverCommand = new PeerTubeServerCommand({ url: registered.url })

      this.loadServer(Object.assign(serverCommand, registered))

      logger.info(`Loading registered instance ${registered.url}`)
    }

    // Run IPC
    const ipcServer = new IPCServer()
    try {
      await ipcServer.run(this)
    } catch (err) {
      logger.error({ err }, 'Cannot start local socket for IPC communication')
      process.exit(-1)
    }

    // Cleanup on exit
    for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
      process.on(code, async (err, origin) => {
        if (code === 'uncaughtException') {
          logger.error({ err, origin }, 'uncaughtException')
        }

        await this.onExit()
      })
    }

    // Process jobs
    await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
    await this.cleanupTMP()

    logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)

    await this.checkAvailableJobs()
  }

  // ---------------------------------------------------------------------------

  /**
   * Register this runner on a PeerTube instance using a registration token,
   * persist the resulting runner token in the config, then look for jobs.
   */
  async registerRunner (options: {
    url: string
    registrationToken: string
    runnerName: string
    runnerDescription?: string
  }) {
    const { url, registrationToken, runnerName, runnerDescription } = options

    logger.info(`Registering runner ${runnerName} on ${url}...`)

    const serverCommand = new PeerTubeServerCommand({ url })
    const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })

    const server: PeerTubeServer = Object.assign(serverCommand, {
      runnerToken,
      runnerName,
      runnerDescription
    })

    this.loadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Registered runner ${runnerName} on ${url}`)

    await this.checkAvailableJobs()
  }

  /** Track a server and open its `/runners` socket, which notifies us of available jobs. */
  private loadServer (server: PeerTubeServer) {
    this.servers.push(server)

    const url = server.url + '/runners'
    const socket = io(url, {
      auth: {
        runnerToken: server.runnerToken
      },
      transports: [ 'websocket' ]
    })

    socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
    socket.on('connect', () => logger.info(`Connected to ${url} socket`))
    socket.on('available-jobs', () => {
      // Fire-and-forget from an event handler: never leave the rejection unhandled
      this.checkAvailableJobs()
        .catch(err => logger.error({ err }, 'Cannot check available jobs'))
    })

    this.sockets.set(server, socket)
  }

  /**
   * Unregister this runner from the instance at `url`. The remote call is best-effort;
   * the server is always removed locally and from the config.
   */
  async unregisterRunner (options: {
    url: string
  }) {
    const { url } = options

    const server = this.servers.find(s => s.url === url)
    if (!server) {
      logger.error(`Unknown server ${url} to unregister`)
      return
    }

    logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)

    try {
      await server.runners.unregister({ runnerToken: server.runnerToken })
    } catch (err) {
      logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
    }

    this.unloadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
  }

  /** Stop tracking a server and close its socket (if any). */
  private unloadServer (server: PeerTubeServer) {
    this.servers = this.servers.filter(s => s !== server)

    this.sockets.get(server)?.disconnect()
    this.sockets.delete(server)
  }

  /** Public summary of registered instances (no runner tokens exposed). */
  listRegistered () {
    return {
      servers: this.servers.map(s => {
        return {
          url: s.url,
          runnerName: s.runnerName,
          runnerDescription: s.runnerDescription
        }
      })
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Ask every registered server for a job and try to run it.
   * Re-entrant calls while a check is in progress are ignored.
   * A server whose runner token is rejected is unregistered.
   */
  private async checkAvailableJobs () {
    if (this.checkingAvailableJobs) return

    logger.info('Checking available jobs')

    this.checkingAvailableJobs = true

    try {
      // Iterate over a snapshot: unregisterRunner() replaces this.servers during the loop
      for (const server of [ ...this.servers ]) {
        try {
          const job = await this.requestJob(server)
          if (!job) continue

          await this.tryToExecuteJobAsync(server, job)
        } catch (err) {
          if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
            logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

            try {
              await this.unregisterRunner({ url: server.url })
            } catch (unregisterErr) {
              logger.error({ err: unregisterErr }, `Cannot unregister ${server.url}`)
            }

            // Keep checking the other servers
            continue
          }

          logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
        }
      }
    } finally {
      // Always release the guard: otherwise every future check would be skipped
      this.checkingAvailableJobs = false
    }
  }

  /** Request jobs from a server and return the first available one, or `undefined`. */
  private async requestJob (server: PeerTubeServer) {
    logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

    const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

    if (availableJobs.length === 0) {
      logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
      return undefined
    }

    return availableJobs[0]
  }

  /**
   * Accept the job if under the concurrency limit and process it in the background.
   * On failure the job is reported as errored; on completion another job check is triggered.
   */
  private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return

    const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

    const processingJob = { job, server }
    this.processingJobs.push(processingJob)

    processJob({ server, job, runnerToken: server.runnerToken })
      .catch(err => {
        logger.error({ err }, 'Cannot process job')

        server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
          .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
      })
      .finally(() => {
        this.processingJobs = this.processingJobs.filter(p => p !== processingJob)

        return this.checkAvailableJobs()
      })
      // The promise returned by finally() rejects if checkAvailableJobs() does
      .catch(err => logger.error({ err }, 'Cannot check available jobs after job completion'))
  }

  // ---------------------------------------------------------------------------

  /** Persist url, token, name and description of every registered server in the config. */
  private saveRegisteredInstancesInConf () {
    const data = this.servers.map(s => {
      return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
    })

    return ConfigManager.Instance.setRegisteredInstances(data)
  }

  // ---------------------------------------------------------------------------

  /** Delete every entry of the transcoding directory. */
  private async cleanupTMP () {
    const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())

    for (const file of files) {
      await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
    }
  }

  /**
   * Abort in-progress jobs, clean the transcoding directory, then exit the process
   * (exit code -1 if any of these steps failed). Runs at most once.
   */
  private async onExit () {
    if (this.cleaningUp) return
    this.cleaningUp = true

    logger.info('Cleaning up after program exit')

    let failed = false

    // Abort each job independently so one failing instance doesn't prevent the others
    // from being notified, nor the TMP cleanup
    for (const { server, job } of this.processingJobs) {
      try {
        await server.runnerJobs.abort({
          jobToken: job.jobToken,
          jobUUID: job.uuid,
          reason: 'Runner stopped',
          runnerToken: server.runnerToken
        })
      } catch (err) {
        logger.error({ err }, `Cannot abort job ${job.uuid} on ${server.url}`)
        failed = true
      }
    }

    try {
      await this.cleanupTMP()
    } catch (err) {
      logger.error(err)
      failed = true
    }

    if (failed) process.exit(-1)

    process.exit()
  }

  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}