]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - packages/peertube-runner/server/server.ts
Add TMP persistent directory
[github/Chocobozzz/PeerTube.git] / packages / peertube-runner / server / server.ts
CommitLineData
1772b383
C
1import { ensureDir, readdir, remove } from 'fs-extra'
2import { join } from 'path'
3import { io, Socket } from 'socket.io-client'
4import { pick } from '@shared/core-utils'
5import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models'
6import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands'
7import { ConfigManager } from '../shared'
8import { IPCServer } from '../shared/ipc'
9import { logger } from '../shared/logger'
10import { JobWithToken, processJob } from './process'
11
/**
 * Server command client of a PeerTube instance, augmented with the identity
 * of the runner registered on that instance.
 */
type PeerTubeServer = PeerTubeServerCommand & {
  // Token sent to the instance to authenticate this runner (socket auth and runner job requests)
  runnerToken: string

  // Name given to the runner when it was registered on the instance
  runnerName: string

  // Optional description given at registration time
  runnerDescription?: string
}
17
/**
 * Singleton running the PeerTube runner in server mode.
 *
 * It keeps the list of registered PeerTube instances, opens a `/runners`
 * socket on each of them and requests, accepts and processes the jobs they
 * make available, up to the concurrency set in the configuration.
 */
export class RunnerServer {
  private static instance: RunnerServer

  // Registered PeerTube instances this runner requests jobs from
  private servers: PeerTubeServer[] = []

  // Jobs accepted by this runner whose processing has not finished yet
  private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

  // Guard so only one checkAvailableJobs() loop runs at a time
  private checkingAvailableJobs = false

  // Guard so onExit() runs only once even if several exit events are received
  private cleaningUp = false

  // Socket opened by loadServer() for each registered instance
  private readonly sockets = new Map<PeerTubeServer, Socket>()

  private constructor () {}

  /**
   * Start the runner: load the configuration and the registered instances,
   * start the IPC server, install the exit handlers, empty the transcoding
   * directory and do a first check for available jobs.
   *
   * The process exits with code -1 if the IPC server cannot be started.
   */
  async run () {
    logger.info('Running PeerTube runner in server mode')

    await ConfigManager.Instance.load()

    for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
      const serverCommand = new PeerTubeServerCommand({ url: registered.url })

      this.loadServer(Object.assign(serverCommand, registered))

      logger.info(`Loading registered instance ${registered.url}`)
    }

    // Run IPC
    const ipcServer = new IPCServer()
    try {
      await ipcServer.run(this)
    } catch (err) {
      // Same `{ err }, message` form as every other log call of this class so the error is not dropped
      logger.error({ err }, 'Cannot start local socket for IPC communication')
      process.exit(-1)
    }

    // Cleanup on exit
    for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
      process.on(code, async (err, origin) => {
        // Only the 'uncaughtException' event carries an error and its origin
        if (code === 'uncaughtException') {
          logger.error({ err, origin }, 'uncaughtException')
        }

        await this.onExit()
      })
    }

    // Process jobs
    await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
    await this.cleanupTMP()

    logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)

    await this.checkAvailableJobs()
  }

  // ---------------------------------------------------------------------------

  /**
   * Register this runner on a PeerTube instance, persist the registration in
   * the configuration and check whether the instance has available jobs.
   *
   * @param options.url URL of the PeerTube instance
   * @param options.registrationToken registration token sent to the instance
   * @param options.runnerName name of the runner on this instance
   * @param options.runnerDescription optional description of the runner
   */
  async registerRunner (options: {
    url: string
    registrationToken: string
    runnerName: string
    runnerDescription?: string
  }) {
    const { url, registrationToken, runnerName, runnerDescription } = options

    logger.info(`Registering runner ${runnerName} on ${url}...`)

    const serverCommand = new PeerTubeServerCommand({ url })
    const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })

    const server: PeerTubeServer = Object.assign(serverCommand, {
      runnerToken,
      runnerName,
      runnerDescription
    })

    this.loadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Registered runner ${runnerName} on ${url}`)

    await this.checkAvailableJobs()
  }

  /**
   * Track the server and open its `/runners` socket, authenticated with the
   * runner token. An `available-jobs` event triggers a job check.
   */
  private loadServer (server: PeerTubeServer) {
    this.servers.push(server)

    const url = server.url + '/runners'
    const socket = io(url, {
      auth: {
        runnerToken: server.runnerToken
      },
      transports: [ 'websocket' ]
    })

    socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
    socket.on('connect', () => logger.info(`Connected to ${url} socket`))
    socket.on('available-jobs', () => {
      // Nobody awaits this call: log a failure instead of leaving an unhandled rejection
      this.checkAvailableJobs()
        .catch(err => logger.error({ err }, `Cannot check available jobs after ${url} socket event`))
    })

    this.sockets.set(server, socket)
  }

  /**
   * Unregister this runner from a PeerTube instance and remove the instance
   * from the configuration.
   *
   * A failure of the remote unregister request is only logged: the instance
   * is removed locally anyway.
   *
   * @param options.url URL of a previously registered PeerTube instance
   */
  async unregisterRunner (options: {
    url: string
  }) {
    const { url } = options

    const server = this.servers.find(s => s.url === url)
    if (!server) {
      logger.error(`Unknown server ${url} to unregister`)
      return
    }

    logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)

    try {
      await server.runners.unregister({ runnerToken: server.runnerToken })
    } catch (err) {
      logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
    }

    this.unloadServer(server)
    await this.saveRegisteredInstancesInConf()

    logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
  }

  /**
   * Stop tracking the server and disconnect its socket, if any.
   */
  private unloadServer (server: PeerTubeServer) {
    // Reassign instead of mutating so loops over the previous array are not affected
    this.servers = this.servers.filter(s => s !== server)

    const socket = this.sockets.get(server)
    if (socket) socket.disconnect()

    this.sockets.delete(server)
  }

  /**
   * @returns the URL, runner name and runner description of every registered instance
   */
  listRegistered () {
    return {
      servers: this.servers.map(s => {
        return {
          url: s.url,
          runnerName: s.runnerName,
          runnerDescription: s.runnerDescription
        }
      })
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Request a job on each registered instance and try to execute it.
   *
   * Does nothing if a check is already in progress. If an instance answers
   * that the runner token is unknown, the instance is unregistered and the
   * current check stops.
   */
  private async checkAvailableJobs () {
    if (this.checkingAvailableJobs) return

    logger.info('Checking available jobs')

    this.checkingAvailableJobs = true

    try {
      for (const server of this.servers) {
        try {
          const job = await this.requestJob(server)
          if (!job) continue

          await this.tryToExecuteJobAsync(server, job)
        } catch (err) {
          if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
            logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

            await this.unregisterRunner({ url: server.url })
            return
          }

          logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
        }
      }
    } finally {
      // Always release the guard, including on the early return above: otherwise every future check would be skipped
      this.checkingAvailableJobs = false
    }
  }

  /**
   * Ask the instance for available jobs.
   *
   * @returns the first available job, or undefined if there is none
   */
  private async requestJob (server: PeerTubeServer) {
    logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

    const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

    if (availableJobs.length === 0) {
      logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
      return undefined
    }

    return availableJobs[0]
  }

  /**
   * Accept the job and start processing it without waiting for its completion.
   *
   * Does nothing if the configured job concurrency is already reached.
   * A processing failure is reported to the instance as a job error. Once the
   * processing ends, available jobs are checked again.
   */
  private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return

    const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

    const processingJob = { job, server }
    this.processingJobs.push(processingJob)

    // Not awaited on purpose: the caller must be able to go on with other servers
    processJob({ server, job, runnerToken: server.runnerToken })
      .catch(err => {
        logger.error({ err }, 'Cannot process job')

        server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
          .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
      })
      .finally(() => {
        this.processingJobs = this.processingJobs.filter(p => p !== processingJob)

        return this.checkAvailableJobs()
      })
      // Nobody awaits this chain: log a failed check instead of leaving an unhandled rejection
      .catch(err => logger.error({ err }, 'Cannot check available jobs after job processing'))
  }

  // ---------------------------------------------------------------------------

  /**
   * Persist the URL, runner token, name and description of every registered
   * instance in the configuration.
   */
  private saveRegisteredInstancesInConf () {
    const data = this.servers.map(s => {
      return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
    })

    return ConfigManager.Instance.setRegisteredInstances(data)
  }

  // ---------------------------------------------------------------------------

  /**
   * Remove every entry of the transcoding directory (the directory itself is kept).
   */
  private async cleanupTMP () {
    const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())

    for (const file of files) {
      await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
    }
  }

  /**
   * Abort the jobs being processed, empty the transcoding directory and exit
   * the process. Exits with code -1 if any of these steps failed.
   */
  private async onExit () {
    if (this.cleaningUp) return
    this.cleaningUp = true

    logger.info('Cleaning up after program exit')

    let hasError = false

    // Abort each job independently so one failing request does not prevent the other aborts nor the cleanup
    for (const { server, job } of this.processingJobs) {
      try {
        await server.runnerJobs.abort({
          jobToken: job.jobToken,
          jobUUID: job.uuid,
          reason: 'Runner stopped',
          runnerToken: server.runnerToken
        })
      } catch (err) {
        hasError = true
        logger.error({ err }, `Cannot abort job ${job.uuid} on ${server.url}`)
      }
    }

    try {
      await this.cleanupTMP()
    } catch (err) {
      hasError = true
      logger.error({ err }, 'Cannot cleanup transcoding directory')
    }

    if (hasError) process.exit(-1)

    process.exit()
  }

  /**
   * Lazily created unique instance of the runner server.
   */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}