aboutsummaryrefslogtreecommitdiffhomepage
path: root/apps/peertube-runner/src/server/server.ts
diff options
context:
space:
mode:
Diffstat (limited to 'apps/peertube-runner/src/server/server.ts')
-rw-r--r--apps/peertube-runner/src/server/server.ts307
1 files changed, 307 insertions, 0 deletions
diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts
new file mode 100644
index 000000000..825e3f297
--- /dev/null
+++ b/apps/peertube-runner/src/server/server.ts
@@ -0,0 +1,307 @@
1import { ensureDir, remove } from 'fs-extra/esm'
2import { readdir } from 'fs/promises'
3import { join } from 'path'
4import { io, Socket } from 'socket.io-client'
5import { pick, shuffle, wait } from '@peertube/peertube-core-utils'
6import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models'
7import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands'
8import { ConfigManager } from '../shared/index.js'
9import { IPCServer } from '../shared/ipc/index.js'
10import { logger } from '../shared/logger.js'
11import { JobWithToken, processJob } from './process/index.js'
12import { isJobSupported } from './shared/index.js'
13
14type PeerTubeServer = PeerTubeServerCommand & {
15 runnerToken: string
16 runnerName: string
17 runnerDescription?: string
18}
19
20export class RunnerServer {
21 private static instance: RunnerServer
22
23 private servers: PeerTubeServer[] = []
24 private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []
25
26 private checkingAvailableJobs = false
27
28 private cleaningUp = false
29
30 private readonly sockets = new Map<PeerTubeServer, Socket>()
31
32 private constructor () {}
33
34 async run () {
35 logger.info('Running PeerTube runner in server mode')
36
37 await ConfigManager.Instance.load()
38
39 for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
40 const serverCommand = new PeerTubeServerCommand({ url: registered.url })
41
42 this.loadServer(Object.assign(serverCommand, registered))
43
44 logger.info(`Loading registered instance ${registered.url}`)
45 }
46
47 // Run IPC
48 const ipcServer = new IPCServer()
49 try {
50 await ipcServer.run(this)
51 } catch (err) {
52 logger.error('Cannot start local socket for IPC communication', err)
53 process.exit(-1)
54 }
55
56 // Cleanup on exit
57 for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
58 process.on(code, async (err, origin) => {
59 if (code === 'uncaughtException') {
60 logger.error({ err, origin }, 'uncaughtException')
61 }
62
63 await this.onExit()
64 })
65 }
66
67 // Process jobs
68 await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
69 await this.cleanupTMP()
70
71 logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)
72
73 await this.checkAvailableJobs()
74 }
75
76 // ---------------------------------------------------------------------------
77
78 async registerRunner (options: {
79 url: string
80 registrationToken: string
81 runnerName: string
82 runnerDescription?: string
83 }) {
84 const { url, registrationToken, runnerName, runnerDescription } = options
85
86 logger.info(`Registering runner ${runnerName} on ${url}...`)
87
88 const serverCommand = new PeerTubeServerCommand({ url })
89 const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })
90
91 const server: PeerTubeServer = Object.assign(serverCommand, {
92 runnerToken,
93 runnerName,
94 runnerDescription
95 })
96
97 this.loadServer(server)
98 await this.saveRegisteredInstancesInConf()
99
100 logger.info(`Registered runner ${runnerName} on ${url}`)
101
102 await this.checkAvailableJobs()
103 }
104
105 private loadServer (server: PeerTubeServer) {
106 this.servers.push(server)
107
108 const url = server.url + '/runners'
109 const socket = io(url, {
110 auth: {
111 runnerToken: server.runnerToken
112 },
113 transports: [ 'websocket' ]
114 })
115
116 socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
117 socket.on('connect', () => logger.info(`Connected to ${url} socket`))
118 socket.on('available-jobs', () => this.checkAvailableJobs())
119
120 this.sockets.set(server, socket)
121 }
122
123 async unregisterRunner (options: {
124 url: string
125 runnerName: string
126 }) {
127 const { url, runnerName } = options
128
129 const server = this.servers.find(s => s.url === url && s.runnerName === runnerName)
130 if (!server) {
131 logger.error(`Unknown server ${url} - ${runnerName} to unregister`)
132 return
133 }
134
135 logger.info(`Unregistering runner ${runnerName} on ${url}...`)
136
137 try {
138 await server.runners.unregister({ runnerToken: server.runnerToken })
139 } catch (err) {
140 logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`)
141 }
142
143 this.unloadServer(server)
144 await this.saveRegisteredInstancesInConf()
145
146 logger.info(`Unregistered runner ${runnerName} on ${url}`)
147 }
148
149 private unloadServer (server: PeerTubeServer) {
150 this.servers = this.servers.filter(s => s !== server)
151
152 const socket = this.sockets.get(server)
153 socket.disconnect()
154
155 this.sockets.delete(server)
156 }
157
158 listRegistered () {
159 return {
160 servers: this.servers.map(s => {
161 return {
162 url: s.url,
163 runnerName: s.runnerName,
164 runnerDescription: s.runnerDescription
165 }
166 })
167 }
168 }
169
170 // ---------------------------------------------------------------------------
171
172 private async checkAvailableJobs () {
173 if (this.checkingAvailableJobs) return
174
175 this.checkingAvailableJobs = true
176
177 let hadAvailableJob = false
178
179 for (const server of shuffle([ ...this.servers ])) {
180 try {
181 logger.info('Checking available jobs on ' + server.url)
182
183 const job = await this.requestJob(server)
184 if (!job) continue
185
186 hadAvailableJob = true
187
188 await this.tryToExecuteJobAsync(server, job)
189 } catch (err) {
190 const code = (err.res?.body as PeerTubeProblemDocument)?.code
191
192 if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
193 logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
194 return
195 }
196
197 if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
198 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
199
200 await this.unregisterRunner({ url: server.url, runnerName: server.runnerName })
201 return
202 }
203
204 logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
205 }
206 }
207
208 this.checkingAvailableJobs = false
209
210 if (hadAvailableJob && this.canProcessMoreJobs()) {
211 await wait(2500)
212
213 this.checkAvailableJobs()
214 .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
215 }
216 }
217
218 private async requestJob (server: PeerTubeServer) {
219 logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)
220
221 const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
222
223 const filtered = availableJobs.filter(j => isJobSupported(j))
224
225 if (filtered.length === 0) {
226 logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
227 return undefined
228 }
229
230 return filtered[0]
231 }
232
233 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
234 if (!this.canProcessMoreJobs()) return
235
236 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
237
238 const processingJob = { job, server }
239 this.processingJobs.push(processingJob)
240
241 processJob({ server, job, runnerToken: server.runnerToken })
242 .catch(err => {
243 logger.error({ err }, 'Cannot process job')
244
245 server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
246 .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
247 })
248 .finally(() => {
249 this.processingJobs = this.processingJobs.filter(p => p !== processingJob)
250
251 return this.checkAvailableJobs()
252 })
253 }
254
255 // ---------------------------------------------------------------------------
256
257 private saveRegisteredInstancesInConf () {
258 const data = this.servers.map(s => {
259 return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
260 })
261
262 return ConfigManager.Instance.setRegisteredInstances(data)
263 }
264
265 private canProcessMoreJobs () {
266 return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
267 }
268
269 // ---------------------------------------------------------------------------
270
271 private async cleanupTMP () {
272 const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())
273
274 for (const file of files) {
275 await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
276 }
277 }
278
279 private async onExit () {
280 if (this.cleaningUp) return
281 this.cleaningUp = true
282
283 logger.info('Cleaning up after program exit')
284
285 try {
286 for (const { server, job } of this.processingJobs) {
287 await server.runnerJobs.abort({
288 jobToken: job.jobToken,
289 jobUUID: job.uuid,
290 reason: 'Runner stopped',
291 runnerToken: server.runnerToken
292 })
293 }
294
295 await this.cleanupTMP()
296 } catch (err) {
297 logger.error(err)
298 process.exit(-1)
299 }
300
301 process.exit()
302 }
303
304 static get Instance () {
305 return this.instance || (this.instance = new this())
306 }
307}