]>
Commit | Line | Data |
---|---|---|
1772b383 C |
1 | import { ensureDir, readdir, remove } from 'fs-extra' |
2 | import { join } from 'path' | |
3 | import { io, Socket } from 'socket.io-client' | |
296d07c6 | 4 | import { pick, wait } from '@shared/core-utils' |
1772b383 C |
5 | import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' |
6 | import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands' | |
7 | import { ConfigManager } from '../shared' | |
8 | import { IPCServer } from '../shared/ipc' | |
9 | import { logger } from '../shared/logger' | |
10 | import { JobWithToken, processJob } from './process' | |
5e47f6ab | 11 | import { isJobSupported } from './shared' |
1772b383 C |
12 | |
/**
 * A PeerTube server command client carrying the runner registration details
 * (token, name and optional description) that this file attaches to it.
 */
type PeerTubeServer = PeerTubeServerCommand & {
  runnerToken: string
  runnerName: string
  runnerDescription?: string
}
18 | ||
/**
 * Runs the PeerTube runner in server mode.
 *
 * The class keeps the list of registered PeerTube instances, opens one
 * socket per instance to be told when jobs are available, requests/accepts
 * jobs while the configured concurrency allows it, and aborts in-flight jobs
 * on exit. It is only reachable through `RunnerServer.Instance`.
 */
export class RunnerServer {
  private static instance: RunnerServer

  private servers: PeerTubeServer[] = []
  private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

  // Guard so that only one checkAvailableJobs() pass runs at a time
  private checkingAvailableJobs = false

  // Guard so that onExit() only runs once even if several signals are received
  private cleaningUp = false

  private readonly sockets = new Map<PeerTubeServer, Socket>()

  private constructor () {}

  /**
   * Loads the configuration and the registered instances, starts the IPC
   * server, installs the exit handlers, prepares the transcoding directory
   * and triggers a first check for available jobs.
   */
  async run () {
    logger.info('Running PeerTube runner in server mode')

    await ConfigManager.Instance.load()

    for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
      const serverCommand = new PeerTubeServerCommand({ url: registered.url })

      this.loadServer(Object.assign(serverCommand, registered))

      logger.info(`Loading registered instance ${registered.url}`)
    }

    // Run IPC
    const ipcServer = new IPCServer()
    try {
      await ipcServer.run(this)
    } catch (err) {
      // Same `{ err }, message` argument form as the other log calls of this class
      logger.error({ err }, 'Cannot start local socket for IPC communication')
      process.exit(-1)
    }

    // Cleanup on exit
    for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
      process.on(code, async (err, origin) => {
        if (code === 'uncaughtException') {
          logger.error({ err, origin }, 'uncaughtException')
        }

        await this.onExit()
      })
    }

    // Process jobs
    await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
    await this.cleanupTMP()

    logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)

    await this.checkAvailableJobs()
  }

  // ---------------------------------------------------------------------------

  /**
   * Registers this runner on the instance at `options.url` using the given
   * registration token, persists the registration in the configuration and
   * then checks for available jobs.
   */
  async registerRunner (options: {
    url: string
    registrationToken: string
    runnerName: string
    runnerDescription?: string
  }) {
    const { url, registrationToken, runnerName, runnerDescription } = options

    logger.info(`Registering runner ${runnerName} on ${url}...`)

    const serverCommand = new PeerTubeServerCommand({ url })
    const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })

    const server: PeerTubeServer = Object.assign(serverCommand, {
      runnerToken,
      runnerName,
      runnerDescription
    })

    this.loadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Registered runner ${runnerName} on ${url}`)

    await this.checkAvailableJobs()
  }

  /**
   * Adds the server to the in-memory list and opens its `/runners` socket,
   * authenticated with the runner token.
   */
  private loadServer (server: PeerTubeServer) {
    this.servers.push(server)

    const url = server.url + '/runners'
    const socket = io(url, {
      auth: {
        runnerToken: server.runnerToken
      },
      transports: [ 'websocket' ]
    })

    socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
    socket.on('connect', () => logger.info(`Connected to ${url} socket`))
    socket.on('available-jobs', () => {
      // Fire-and-forget from an event handler: do not leave a rejection unhandled
      this.checkAvailableJobs()
        .catch(err => logger.error({ err }, 'Cannot check available jobs'))
    })

    this.sockets.set(server, socket)
  }

  /**
   * Unregisters this runner from the instance at `options.url`.
   *
   * A failure of the remote unregister call is only logged: the server is
   * removed locally and from the saved configuration in any case.
   */
  async unregisterRunner (options: {
    url: string
  }) {
    const { url } = options

    const server = this.servers.find(s => s.url === url)
    if (!server) {
      logger.error(`Unknown server ${url} to unregister`)
      return
    }

    logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)

    try {
      await server.runners.unregister({ runnerToken: server.runnerToken })
    } catch (err) {
      logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
    }

    this.unloadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
  }

  /**
   * Removes the server from the in-memory list and closes its socket.
   */
  private unloadServer (server: PeerTubeServer) {
    this.servers = this.servers.filter(s => s !== server)

    // Map.get() may return undefined: only disconnect a socket that exists
    const socket = this.sockets.get(server)
    if (socket) socket.disconnect()

    this.sockets.delete(server)
  }

  /**
   * Returns the url, runner name and runner description of every registered
   * server (the runner token is not included).
   */
  listRegistered () {
    return {
      servers: this.servers.map(s => {
        return {
          url: s.url,
          runnerName: s.runnerName,
          runnerDescription: s.runnerDescription
        }
      })
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Asks every registered server for a job and tries to execute the first
   * supported one of each. Concurrent calls are ignored while a pass is
   * already running.
   *
   * If at least one job was available and there is still capacity, another
   * pass is scheduled after a short delay.
   */
  private async checkAvailableJobs () {
    if (this.checkingAvailableJobs) return

    this.checkingAvailableJobs = true

    let hadAvailableJob = false

    try {
      for (const server of this.servers) {
        try {
          logger.info('Checking available jobs on ' + server.url)

          const job = await this.requestJob(server)
          if (!job) continue

          hadAvailableJob = true

          await this.tryToExecuteJobAsync(server, job)
        } catch (err) {
          const code = (err.res?.body as PeerTubeProblemDocument)?.code

          if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
            logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
            continue
          }

          if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
            logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

            // unregisterRunner() replaces this.servers with a new array, so the
            // array being iterated here is not modified
            await this.unregisterRunner({ url: server.url })
            continue
          }

          logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
        }
      }
    } finally {
      // Always release the guard: leaving it set would make every later call
      // return immediately and the runner would never request jobs again
      this.checkingAvailableJobs = false
    }

    if (hadAvailableJob && this.canProcessMoreJobs()) {
      await wait(2500)

      this.checkAvailableJobs()
        .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
    }
  }

  /**
   * Requests available jobs on the server and returns the first one for which
   * `isJobSupported` is true, or `undefined` if there is none.
   */
  private async requestJob (server: PeerTubeServer) {
    logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

    const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

    const filtered = availableJobs.filter(j => isJobSupported(j))

    if (filtered.length === 0) {
      logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
      return undefined
    }

    return filtered[0]
  }

  /**
   * Accepts the job on the server (if there is capacity left) and starts
   * processing it without waiting for the processing to finish.
   *
   * Only the accept call is awaited, so only its failure is thrown to the
   * caller. Processing errors are reported to the server and logged.
   */
  private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
    if (!this.canProcessMoreJobs()) return

    const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

    const processingJob = { job, server }
    this.processingJobs.push(processingJob)

    processJob({ server, job, runnerToken: server.runnerToken })
      .catch(err => {
        logger.error({ err }, 'Cannot process job')

        server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
          .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
      })
      .finally(() => {
        this.processingJobs = this.processingJobs.filter(p => p !== processingJob)

        return this.checkAvailableJobs()
      })
      // The chain is not awaited: catch a failure of the follow-up check here
      .catch(err => logger.error({ err }, 'Cannot check available jobs after job processing'))
  }

  // ---------------------------------------------------------------------------

  /**
   * Persists url, runner token, name and description of every registered
   * server through the config manager.
   */
  private saveRegisteredInstancesInConf () {
    const data = this.servers.map(s => {
      return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
    })

    return ConfigManager.Instance.setRegisteredInstances(data)
  }

  /**
   * True while fewer jobs are being processed than the configured concurrency.
   */
  private canProcessMoreJobs () {
    return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
  }

  // ---------------------------------------------------------------------------

  /**
   * Removes every entry of the transcoding directory.
   */
  private async cleanupTMP () {
    const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())

    for (const file of files) {
      await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
    }
  }

  /**
   * Aborts every job currently being processed, empties the transcoding
   * directory and exits the process.
   *
   * Each step is best-effort: a failure is logged and the remaining steps
   * still run. The process exits with -1 if any step failed.
   */
  private async onExit () {
    if (this.cleaningUp) return
    this.cleaningUp = true

    logger.info('Cleaning up after program exit')

    let failed = false

    for (const { server, job } of this.processingJobs) {
      try {
        await server.runnerJobs.abort({
          jobToken: job.jobToken,
          jobUUID: job.uuid,
          reason: 'Runner stopped',
          runnerToken: server.runnerToken
        })
      } catch (err) {
        // One failing abort must not prevent aborting the other jobs
        failed = true
        logger.error({ err }, `Cannot abort job ${job.uuid} on ${server.url}`)
      }
    }

    try {
      await this.cleanupTMP()
    } catch (err) {
      failed = true
      logger.error({ err }, 'Cannot cleanup transcoding directory')
    }

    if (failed) process.exit(-1)

    process.exit()
  }

  /**
   * Lazily created single instance of the runner server.
   */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}