]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - packages/peertube-runner/server/server.ts
Specify runner name when unregistering the runner
[github/Chocobozzz/PeerTube.git] / packages / peertube-runner / server / server.ts
CommitLineData
1772b383
C
1import { ensureDir, readdir, remove } from 'fs-extra'
2import { join } from 'path'
3import { io, Socket } from 'socket.io-client'
296d07c6 4import { pick, wait } from '@shared/core-utils'
1772b383
C
5import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models'
6import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands'
7import { ConfigManager } from '../shared'
8import { IPCServer } from '../shared/ipc'
9import { logger } from '../shared/logger'
10import { JobWithToken, processJob } from './process'
5e47f6ab 11import { isJobSupported } from './shared'
1772b383
C
12
/**
 * A PeerTube server command client augmented with the identity of the runner
 * registered on that instance.
 */
type PeerTubeServer = PeerTubeServerCommand & {
  // Token obtained when registering the runner; sent with every runner request and as socket auth
  runnerToken: string
  // Name of the runner on this instance; used together with the URL to identify a registration
  runnerName: string
  // Optional description given at registration time
  runnerDescription?: string
}
18
/**
 * Singleton running the PeerTube runner in "server mode".
 *
 * It keeps a list of registered PeerTube instances, listens on a socket per
 * instance for job availability notifications, requests/accepts jobs while the
 * configured concurrency allows it, and aborts in-flight jobs on exit.
 *
 * Use `RunnerServer.Instance` to get the shared instance.
 */
export class RunnerServer {
  private static instance: RunnerServer

  // Instances on which this runner is registered
  private servers: PeerTubeServer[] = []

  // Jobs that were accepted and whose processing promise has not settled yet
  private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

  // Guard preventing concurrent executions of checkAvailableJobs()
  private checkingAvailableJobs = false

  // Set once onExit() started, so several exit signals do not run the cleanup twice
  private cleaningUp = false

  // One socket per loaded server
  private readonly sockets = new Map<PeerTubeServer, Socket>()

  private constructor () {}

  /**
   * Start the runner: load the configuration and the registered instances,
   * start the IPC server, install exit handlers, empty the transcoding
   * directory and do a first check for available jobs.
   *
   * Exits the process with code -1 if the IPC server cannot be started.
   */
  async run () {
    logger.info('Running PeerTube runner in server mode')

    await ConfigManager.Instance.load()

    for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
      const serverCommand = new PeerTubeServerCommand({ url: registered.url })

      this.loadServer(Object.assign(serverCommand, registered))

      logger.info(`Loading registered instance ${registered.url}`)
    }

    // Run IPC
    const ipcServer = new IPCServer()
    try {
      await ipcServer.run(this)
    } catch (err) {
      // Same "{ err }, message" form as the other log calls of this class so the error is attached to the entry
      logger.error({ err }, 'Cannot start local socket for IPC communication')
      process.exit(-1)
    }

    // Cleanup on exit
    for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
      process.on(code, async (err, origin) => {
        // Only the 'uncaughtException' event is logged with its arguments
        if (code === 'uncaughtException') {
          logger.error({ err, origin }, 'uncaughtException')
        }

        await this.onExit()
      })
    }

    // Process jobs
    await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
    await this.cleanupTMP()

    logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)

    await this.checkAvailableJobs()
  }

  // ---------------------------------------------------------------------------

  /**
   * Register this runner on a PeerTube instance, persist the registration in
   * the configuration and immediately check for available jobs.
   *
   * Rejects if the registration request or the configuration save fails.
   */
  async registerRunner (options: {
    url: string
    registrationToken: string
    runnerName: string
    runnerDescription?: string
  }) {
    const { url, registrationToken, runnerName, runnerDescription } = options

    logger.info(`Registering runner ${runnerName} on ${url}...`)

    const serverCommand = new PeerTubeServerCommand({ url })
    const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })

    const server: PeerTubeServer = Object.assign(serverCommand, {
      runnerToken,
      runnerName,
      runnerDescription
    })

    this.loadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Registered runner ${runnerName} on ${url}`)

    await this.checkAvailableJobs()
  }

  /**
   * Track the server and open its `/runners` socket, authenticated with the
   * runner token. An 'available-jobs' event triggers a job check.
   */
  private loadServer (server: PeerTubeServer) {
    this.servers.push(server)

    const url = server.url + '/runners'
    const socket = io(url, {
      auth: {
        runnerToken: server.runnerToken
      },
      transports: [ 'websocket' ]
    })

    socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
    socket.on('connect', () => logger.info(`Connected to ${url} socket`))
    socket.on('available-jobs', () => this.checkAvailableJobs())

    this.sockets.set(server, socket)
  }

  /**
   * Unregister the runner identified by its instance URL and runner name.
   *
   * A failure of the remote unregister request is only logged: the server is
   * removed locally and the configuration is saved in every case.
   * Does nothing (besides logging an error) if no such registration is loaded.
   */
  async unregisterRunner (options: {
    url: string
    runnerName: string
  }) {
    const { url, runnerName } = options

    const server = this.servers.find(s => s.url === url && s.runnerName === runnerName)
    if (!server) {
      logger.error(`Unknown server ${url} - ${runnerName} to unregister`)
      return
    }

    logger.info(`Unregistering runner ${runnerName} on ${url}...`)

    try {
      await server.runners.unregister({ runnerToken: server.runnerToken })
    } catch (err) {
      logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`)
    }

    this.unloadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Unregistered runner ${runnerName} on ${url}`)
  }

  /**
   * Stop tracking the server and close its socket, if one was opened.
   */
  private unloadServer (server: PeerTubeServer) {
    this.servers = this.servers.filter(s => s !== server)

    // Map.get() may return undefined: do not crash if no socket is associated to this server
    const socket = this.sockets.get(server)
    socket?.disconnect()

    this.sockets.delete(server)
  }

  /**
   * List loaded registrations. The runner token is not part of the result.
   */
  listRegistered () {
    return {
      servers: this.servers.map(s => {
        return {
          url: s.url,
          runnerName: s.runnerName,
          runnerDescription: s.runnerDescription
        }
      })
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Ask every registered server for a job and try to execute the first
   * supported one of each.
   *
   * Only one check runs at a time: calls made while a check is in progress
   * return immediately. If at least one job was available and the concurrency
   * limit is not reached, another check is scheduled after a delay.
   */
  private async checkAvailableJobs () {
    if (this.checkingAvailableJobs) return

    this.checkingAvailableJobs = true

    let hadAvailableJob = false

    // The guard must be released on every exit path (early return or exception),
    // otherwise all subsequent checks would be skipped forever
    try {
      for (const server of this.servers) {
        try {
          logger.info('Checking available jobs on ' + server.url)

          const job = await this.requestJob(server)
          if (!job) continue

          hadAvailableJob = true

          await this.tryToExecuteJobAsync(server, job)
        } catch (err) {
          const code = (err.res?.body as PeerTubeProblemDocument)?.code

          if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
            logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
            return
          }

          if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
            logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

            // unregisterRunner() replaces this.servers by a new array, so the array being iterated is not mutated
            await this.unregisterRunner({ url: server.url, runnerName: server.runnerName })
            return
          }

          logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
        }
      }
    } finally {
      this.checkingAvailableJobs = false
    }

    if (hadAvailableJob && this.canProcessMoreJobs()) {
      // Delay before asking again (2500 is presumably in milliseconds, see wait() helper)
      await wait(2500)

      // Not awaited: errors are logged here instead of being propagated to the caller
      this.checkAvailableJobs()
        .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
    }
  }

  /**
   * Request available jobs on the server and return the first one this runner
   * supports, or undefined if there is none.
   */
  private async requestJob (server: PeerTubeServer) {
    logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

    const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

    const filtered = availableJobs.filter(j => isJobSupported(j))

    if (filtered.length === 0) {
      logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
      return undefined
    }

    return filtered[0]
  }

  /**
   * Accept the job on the server and start processing it in the background,
   * unless the concurrency limit is already reached.
   *
   * The returned promise settles once the job is accepted, not once it is
   * processed. Rejects if the accept request fails.
   */
  private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
    if (!this.canProcessMoreJobs()) return

    const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

    const processingJob = { job, server }
    this.processingJobs.push(processingJob)

    // Not awaited on purpose: the job is processed while other jobs can be requested
    processJob({ server, job, runnerToken: server.runnerToken })
      .catch(err => {
        logger.error({ err }, 'Cannot process job')

        // Report the failure to the server
        server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
          .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
      })
      .finally(() => {
        this.processingJobs = this.processingJobs.filter(p => p !== processingJob)

        // A slot has been freed: look for another job
        return this.checkAvailableJobs()
      })
      // Nothing awaits this chain: log a failing check instead of leaving an unhandled rejection
      .catch(err => logger.error({ err }, 'Cannot check available jobs after processing a job'))
  }

  // ---------------------------------------------------------------------------

  /**
   * Persist url, token, name and description of every loaded server in the configuration.
   */
  private saveRegisteredInstancesInConf () {
    const data = this.servers.map(s => {
      return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
    })

    return ConfigManager.Instance.setRegisteredInstances(data)
  }

  /**
   * True while fewer jobs are being processed than the configured `jobs.concurrency`.
   */
  private canProcessMoreJobs () {
    return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
  }

  // ---------------------------------------------------------------------------

  /**
   * Remove every entry of the transcoding directory, one after the other.
   */
  private async cleanupTMP () {
    const directory = ConfigManager.Instance.getTranscodingDirectory()
    const files = await readdir(directory)

    for (const file of files) {
      await remove(join(directory, file))
    }
  }

  /**
   * Abort jobs being processed, empty the transcoding directory and exit the process.
   *
   * Exits with code -1 if aborting a job or the cleanup fails. Only the first
   * call does something: following calls return immediately.
   */
  private async onExit () {
    if (this.cleaningUp) return
    this.cleaningUp = true

    logger.info('Cleaning up after program exit')

    try {
      for (const { server, job } of this.processingJobs) {
        await server.runnerJobs.abort({
          jobToken: job.jobToken,
          jobUUID: job.uuid,
          reason: 'Runner stopped',
          runnerToken: server.runnerToken
        })
      }

      await this.cleanupTMP()
    } catch (err) {
      logger.error(err)
      process.exit(-1)
    }

    process.exit()
  }

  /**
   * Shared instance, created on first access.
   */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}