about summary refs log tree commit diff homepage
path: root/packages/peertube-runner/server/server.ts
diff options
context:
space:
mode:
author Chocobozzz <me@florianbigard.com> 2023-04-21 15:05:27 +0200
committer Chocobozzz <chocobozzz@cpy.re> 2023-05-09 08:57:34 +0200
commit 1772b383de490cf406fe93ef3aa3a941f6db513c (patch)
tree 7cecc404c8d71951c22079e9bf5180095981b7f9 /packages/peertube-runner/server/server.ts
parent 118626c8752bee7b05c4e0b668852e1aba2416f1 (diff)
download PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.tar.gz
PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.tar.zst
PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.zip
Add peertube runner cli
Diffstat (limited to 'packages/peertube-runner/server/server.ts')
-rw-r--r-- packages/peertube-runner/server/server.ts 269
1 files changed, 269 insertions, 0 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
new file mode 100644
index 000000000..724f359bd
--- /dev/null
+++ b/packages/peertube-runner/server/server.ts
@@ -0,0 +1,269 @@
1import { ensureDir, readdir, remove } from 'fs-extra'
2import { join } from 'path'
3import { io, Socket } from 'socket.io-client'
4import { pick } from '@shared/core-utils'
5import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models'
6import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands'
7import { ConfigManager } from '../shared'
8import { IPCServer } from '../shared/ipc'
9import { logger } from '../shared/logger'
10import { JobWithToken, processJob } from './process'
11
/**
 * A PeerTube server command object carrying, in addition, the data of the runner
 * registered on that instance.
 */
interface PeerTubeServer extends PeerTubeServerCommand {
  // Token sent with every runner request and used as socket auth
  runnerToken: string

  // Name the runner was registered with
  runnerName: string

  // Optional description provided at registration time
  runnerDescription?: string
}
17
/**
 * Singleton running the PeerTube runner in "server" mode.
 *
 * It keeps the list of registered PeerTube instances, opens a socket on each of them
 * to be notified of available jobs, requests/accepts jobs up to the configured
 * concurrency and hands them to `processJob()`.
 *
 * Use `RunnerServer.Instance` to get the shared instance.
 */
export class RunnerServer {
  private static instance: RunnerServer

  // Registered instances this runner requests jobs from
  private servers: PeerTubeServer[] = []

  // Accepted jobs whose processJob() promise has not settled yet
  private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

  // Guard so only one checkAvailableJobs() pass runs at a time
  private checkingAvailableJobs = false

  // Socket opened by loadServer() for each registered instance
  private readonly sockets = new Map<PeerTubeServer, Socket>()

  private constructor () {}

  /**
   * Start the runner: load the configuration and the instances registered in it,
   * start the IPC server, install exit handlers, prepare the transcoding directory
   * and do a first check for available jobs.
   *
   * Exits the process with code -1 if the IPC server cannot be started.
   */
  async run () {
    logger.info('Running PeerTube runner in server mode')

    await ConfigManager.Instance.load()

    for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
      const serverCommand = new PeerTubeServerCommand({ url: registered.url })

      this.loadServer(Object.assign(serverCommand, registered))

      logger.info(`Loading registered instance ${registered.url}`)
    }

    // Run IPC
    const ipcServer = new IPCServer()
    try {
      await ipcServer.run(this)
    } catch (err) {
      console.error('Cannot start local socket for IPC communication', err)
      process.exit(-1)
    }

    // Cleanup on exit
    for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
      process.on(code, async (arg: unknown) => {
        // For 'uncaughtException' the first listener argument is the error itself:
        // log it before exiting, otherwise the cause of the crash is lost
        if (code === 'uncaughtException') {
          logger.error({ err: arg }, 'Uncaught exception, stopping the runner')
        }

        await this.onExit()
      })
    }

    // Process jobs
    await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
    await this.cleanupTMP()

    logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)

    await this.checkAvailableJobs()
  }

  // ---------------------------------------------------------------------------

  /**
   * Register this runner on the PeerTube instance at `options.url`, start listening
   * on its socket, persist the registration in the configuration and check for jobs.
   *
   * Rejects if the remote registration or the configuration save fails.
   */
  async registerRunner (options: {
    url: string
    registrationToken: string
    runnerName: string
    runnerDescription?: string
  }) {
    const { url, registrationToken, runnerName, runnerDescription } = options

    logger.info(`Registering runner ${runnerName} on ${url}...`)

    const serverCommand = new PeerTubeServerCommand({ url })
    const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })

    const server: PeerTubeServer = Object.assign(serverCommand, {
      runnerToken,
      runnerName,
      runnerDescription
    })

    this.loadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Registered runner ${runnerName} on ${url}`)

    await this.checkAvailableJobs()
  }

  /**
   * Track `server` and open a websocket on its `/runners` namespace, authenticated
   * with the runner token. An 'available-jobs' event triggers a job check.
   */
  private loadServer (server: PeerTubeServer) {
    this.servers.push(server)

    const url = server.url + '/runners'
    const socket = io(url, {
      auth: {
        runnerToken: server.runnerToken
      },
      transports: [ 'websocket' ]
    })

    socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
    socket.on('connect', () => logger.info(`Connected to ${url} socket`))
    socket.on('available-jobs', () => this.checkAvailableJobs())

    this.sockets.set(server, socket)
  }

  /**
   * Unregister this runner from the instance at `options.url`.
   *
   * An unknown URL is logged and ignored. A failure of the remote unregister call is
   * logged too, but the server is still removed locally and from the configuration.
   */
  async unregisterRunner (options: {
    url: string
  }) {
    const { url } = options

    const server = this.servers.find(s => s.url === url)
    if (!server) {
      logger.error(`Unknown server ${url} to unregister`)
      return
    }

    logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)

    try {
      await server.runners.unregister({ runnerToken: server.runnerToken })
    } catch (err) {
      logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
    }

    this.unloadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
  }

  /**
   * Stop tracking `server` and close its socket, if it has one.
   */
  private unloadServer (server: PeerTubeServer) {
    // Build a new array instead of mutating: a running checkAvailableJobs() loop
    // may still be iterating over the previous one
    this.servers = this.servers.filter(s => s !== server)

    // The socket may already be gone if two unregistrations of the same server overlap
    this.sockets.get(server)?.disconnect()

    this.sockets.delete(server)
  }

  /**
   * List the registered instances, without exposing their runner token.
   */
  listRegistered () {
    return {
      servers: this.servers.map(s => {
        return {
          url: s.url,
          runnerName: s.runnerName,
          runnerDescription: s.runnerDescription
        }
      })
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Request a job on every registered instance and try to execute it.
   *
   * Does nothing if a check is already in progress. An instance answering that the
   * runner token is unknown gets unregistered; other errors are logged and the next
   * instance is tried.
   */
  private async checkAvailableJobs () {
    if (this.checkingAvailableJobs) return

    logger.info('Checking available jobs')

    this.checkingAvailableJobs = true

    try {
      for (const server of this.servers) {
        try {
          const job = await this.requestJob(server)
          if (!job) continue

          await this.tryToExecuteJobAsync(server, job)
        } catch (err) {
          const code = (err as { res?: { body?: PeerTubeProblemDocument } })?.res?.body?.code

          if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
            logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

            await this.unregisterRunner({ url: server.url })

            // Safe to keep iterating: unloadServer() replaces this.servers instead of mutating it
            continue
          }

          logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
        }
      }
    } finally {
      // Always release the guard, otherwise an early exit would block every future check
      this.checkingAvailableJobs = false
    }
  }

  /**
   * Ask `server` for available jobs.
   *
   * @returns the first available job, or `undefined` if there is none
   */
  private async requestJob (server: PeerTubeServer) {
    logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

    const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

    if (availableJobs.length === 0) {
      logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
      return undefined
    }

    return availableJobs[0]
  }

  /**
   * Accept `jobToAccept` on `server` and start processing it in the background,
   * unless the configured job concurrency is already reached.
   *
   * The returned promise resolves once the job is accepted, not once it is processed.
   */
  private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return

    const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

    const processingJob = { job, server }
    this.processingJobs.push(processingJob)

    // Deliberately not awaited: the job runs in the background
    processJob({ server, job, runnerToken: server.runnerToken })
      .catch(err => {
        logger.error({ err }, 'Cannot process job')

        // Report the failure to the instance
        server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
          .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
      })
      .finally(() => {
        this.processingJobs = this.processingJobs.filter(p => p !== processingJob)

        // A slot has been freed: look for another job
        return this.checkAvailableJobs()
      })
  }

  // ---------------------------------------------------------------------------

  /**
   * Persist the currently registered instances in the configuration.
   */
  private saveRegisteredInstancesInConf () {
    const data = this.servers.map(s => {
      return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
    })

    return ConfigManager.Instance.setRegisteredInstances(data)
  }

  // ---------------------------------------------------------------------------

  /**
   * Remove every entry of the transcoding directory (the directory itself is kept).
   */
  private async cleanupTMP () {
    const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())

    for (const file of files) {
      await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
    }
  }

  /**
   * Exit handler: abort the jobs being processed, clean up the transcoding directory
   * and exit the process. Exits with code -1 if one of these steps fails.
   */
  private async onExit () {
    try {
      for (const { server, job } of this.processingJobs) {
        await server.runnerJobs.abort({
          jobToken: job.jobToken,
          jobUUID: job.uuid,
          reason: 'Runner stopped',
          runnerToken: server.runnerToken
        })
      }

      await this.cleanupTMP()
    } catch (err) {
      console.error(err)
      process.exit(-1)
    }

    process.exit()
  }

  // Lazily created shared instance
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}