1 import { ensureDir, readdir, remove } from 'fs-extra'
2 import { join } from 'path'
3 import { io, Socket } from 'socket.io-client'
4 import { pick } from '@shared/core-utils'
5 import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models'
6 import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands'
7 import { ConfigManager } from '../shared'
8 import { IPCServer } from '../shared/ipc'
9 import { logger } from '../shared/logger'
10 import { JobWithToken, processJob } from './process'
11 import { isJobSupported } from './shared'
// Server-command client for one PeerTube instance, intersected with this runner's
// registration data for that instance. Other code reads `runnerToken` and `runnerName`
// from values of this type and persists `url`, `runnerToken`, `runnerName` and
// `runnerDescription` (see saveRegisteredInstancesInConf).
13 type PeerTubeServer = PeerTubeServerCommand & {
// Optional free-text description of the runner on that instance.
16 runnerDescription?: string
// "Server mode" of the PeerTube runner. It tracks every PeerTube instance this runner is
// registered on, listens for job notifications from them, and requests, accepts and
// processes jobs locally. Obtain it through `RunnerServer.Instance`; the constructor is private.
19 export class RunnerServer {
20 private static instance: RunnerServer
// Instances this runner is registered on. Written back to the config after every load/unload.
22 private servers: PeerTubeServer[] = []
// Jobs currently being processed, each with the instance it was accepted from.
// Its length is compared with the `jobs.concurrency` config value before accepting more work.
23 private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []
// Re-entrancy guard for checkAvailableJobs(): a call made while a check is running returns immediately.
25 private checkingAvailableJobs = false
// Set when onExit() starts, so the exit cleanup runs only once.
27 private cleaningUp = false
// One Socket.IO connection per tracked instance; it delivers 'available-jobs' notifications.
29 private readonly sockets = new Map<PeerTubeServer, Socket>()
31 private constructor () {}
// Server-mode startup (the method header is not visible in this view). Steps:
// load the config, reconnect to every registered instance, start the IPC server,
// install process exit/error handlers, prepare an empty transcoding directory,
// then run a first check for available jobs.
34 logger.info('Running PeerTube runner in server mode')
36 await ConfigManager.Instance.load()
38 for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
39 const serverCommand = new PeerTubeServerCommand({ url: registered.url })
// Merge the persisted registration fields onto the command client to form a PeerTubeServer.
41 this.loadServer(Object.assign(serverCommand, registered))
43 logger.info(`Loading registered instance ${registered.url}`)
// Local IPC endpoint that is handed this RunnerServer
// (presumably used by the runner CLI to register/unregister; confirm).
47 const ipcServer = new IPCServer()
49 await ipcServer.run(this)
51 logger.error('Cannot start local socket for IPC communication', err)
// Hook interrupt/user signals and uncaught exceptions. Only uncaught exceptions are logged here.
// The rest of the handler is not visible in this view; presumably it calls onExit() — confirm.
56 for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
57 process.on(code, async (err, origin) => {
58 if (code === 'uncaughtException') {
59 logger.error({ err, origin }, 'uncaughtException')
// Make sure the transcoding directory exists, then remove leftovers from a previous run.
67 await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
68 await this.cleanupTMP()
70 logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)
72 await this.checkAvailableJobs()
75 // ---------------------------------------------------------------------------
// Registers this runner on a PeerTube instance with a registration token. It then starts
// tracking that instance (socket + saved config) and checks for available jobs.
// `options` also carries `url` and `runnerName` (destructured below); those type lines
// are not visible in this view.
77 async registerRunner (options: {
79 registrationToken: string
81 runnerDescription?: string
83 const { url, registrationToken, runnerName, runnerDescription } = options
85 logger.info(`Registering runner ${runnerName} on ${url}...`)
87 const serverCommand = new PeerTubeServerCommand({ url })
// The instance answers with the runner token that all later runner API calls and the socket use.
88 const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })
90 const server: PeerTubeServer = Object.assign(serverCommand, {
96 this.loadServer(server)
97 await this.saveRegisteredInstancesInConf()
99 logger.info(`Registered runner ${runnerName} on ${url}`)
101 await this.checkAvailableJobs()
// Starts tracking an instance. It adds the instance to `servers` and opens a Socket.IO
// connection to `<url>/runners` over the WebSocket transport only. An 'available-jobs'
// event from the instance triggers a job check.
104 private loadServer (server: PeerTubeServer) {
105 this.servers.push(server)
107 const url = server.url + '/runners'
108 const socket = io(url, {
// The runner token is sent in the connection options. The enclosing option key is not
// visible in this view (presumably `auth`; confirm).
110 runnerToken: server.runnerToken
112 transports: [ 'websocket' ]
// Connection errors are only logged here.
115 socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
116 socket.on('connect', () => logger.info(`Connected to ${url} socket`))
117 socket.on('available-jobs', () => this.checkAvailableJobs())
119 this.sockets.set(server, socket)
// Unregisters this runner from the instance at `options.url`, then stops tracking it and
// rewrites the saved registered instances. A URL that is not tracked is logged as an error.
// A failing remote unregister call is logged; the local unload and config save that follow
// appear to run regardless (the surrounding try/catch lines are not visible — confirm).
// Also called by checkAvailableJobs() when an instance rejects the runner token.
122 async unregisterRunner (options: {
125 const { url } = options
127 const server = this.servers.find(s => s.url === url)
129 logger.error(`Unknown server ${url} to unregister`)
133 logger.info(`Unregistering runner ${server.runnerName} on ${url}...`)
136 await server.runners.unregister({ runnerToken: server.runnerToken })
138 logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`)
141 this.unloadServer(server)
142 await this.saveRegisteredInstancesInConf()
144 logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`)
// Stops tracking an instance: removes it from `servers` and drops its socket entry.
// `servers` is replaced with a filtered copy rather than mutated in place. A loop already
// iterating the previous array (checkAvailableJobs) is therefore unaffected.
147 private unloadServer (server: PeerTubeServer) {
148 this.servers = this.servers.filter(s => s !== server)
150 const socket = this.sockets.get(server)
// NOTE(review): the statement that uses `socket` (presumably a disconnect) is not visible in this view.
153 this.sockets.delete(server)
// Fragment of a method whose header is not visible in this view. It maps each tracked
// instance to a plain summary object; runnerName and runnerDescription are shown below,
// and any other fields are on lines not visible here.
158 servers: this.servers.map(s => {
161 runnerName: s.runnerName,
162 runnerDescription: s.runnerDescription
168 // ---------------------------------------------------------------------------
// Asks each tracked instance, in turn, for a supported job and tries to accept and run it.
// Guarded by `checkingAvailableJobs`: a call made while a check is already running returns
// immediately. A notification arriving mid-check therefore does not start a second pass.
170 private async checkAvailableJobs () {
171 if (this.checkingAvailableJobs) return
173 logger.info('Checking available jobs')
175 this.checkingAvailableJobs = true
// Unregistering inside this loop is safe: unloadServer() assigns a new array to `servers`,
// while this loop keeps iterating the array it started with.
177 for (const server of this.servers) {
179 const job = await this.requestJob(server)
182 await this.tryToExecuteJobAsync(server, job)
// The instance no longer recognises this runner's token: stop tracking that instance.
184 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
185 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
187 await this.unregisterRunner({ url: server.url })
191 logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
// NOTE(review): this reset is not inside a `finally`. If an error escapes the loop (for
// example one thrown by the unregisterRunner() call in the error path, which itself awaits
// saveRegisteredInstancesInConf()), the flag stays true and every later check returns early.
// Consider wrapping the loop in try/finally.
195 this.checkingAvailableJobs = false
// Requests the currently available jobs from `server` and keeps only those this runner
// supports (isJobSupported). When none remain, a debug message is logged. The value
// returned in that case, and which supported job is returned otherwise, are on lines
// not visible in this view.
198 private async requestJob (server: PeerTubeServer) {
199 logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)
201 const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
203 const filtered = availableJobs.filter(j => isJobSupported(j))
205 if (filtered.length === 0) {
206 logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
// Accepts `jobToAccept` on `server` and starts processing it without waiting for it to finish.
// Does nothing when the number of jobs in progress has already reached `jobs.concurrency`.
// If processing fails, the error is logged and reported to the instance. Afterwards the job
// is removed from `processingJobs` and a new job check is started. The wrapping handlers are
// not visible in this view (presumably `.catch` / `.finally`; confirm).
213 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
214 if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
// The accepted job carries the `jobToken` needed by later job calls (error reporting, abort).
216 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
218 const processingJob = { job, server }
219 this.processingJobs.push(processingJob)
// Deliberately not awaited: the caller continues while the job runs.
221 processJob({ server, job, runnerToken: server.runnerToken })
223 logger.error({ err }, 'Cannot process job')
// Best-effort report of the failure to the instance. A failure to report is only logged
// (the log text says "abort" although this call reports an error).
225 server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
226 .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
229 this.processingJobs = this.processingJobs.filter(p => p !== processingJob)
231 return this.checkAvailableJobs()
235 // ---------------------------------------------------------------------------
/**
 * Stores the registration data of every tracked instance through
 * `ConfigManager.setRegisteredInstances`. `run()` reads it back from
 * `getConfig().registeredInstances` on the next start.
 *
 * Only `url`, `runnerToken`, `runnerName` and `runnerDescription` are kept per instance.
 *
 * @returns whatever `setRegisteredInstances` returns (callers await it).
 */
private saveRegisteredInstancesInConf () {
  const registeredInstances = this.servers.map(
    server => pick(server, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
  )

  return ConfigManager.Instance.setRegisteredInstances(registeredInstances)
}
245 // ---------------------------------------------------------------------------
/**
 * Empties the transcoding directory. Each entry directly inside it is removed with
 * fs-extra `remove`, one after another; the directory itself is kept.
 */
private async cleanupTMP () {
  const entries = await readdir(ConfigManager.Instance.getTranscodingDirectory())

  for (const entry of entries) {
    const entryPath = join(ConfigManager.Instance.getTranscodingDirectory(), entry)
    await remove(entryPath)
  }
}
// Exit cleanup, run at most once (guarded by `cleaningUp`). It tells each instance that
// every in-progress job was aborted with reason 'Runner stopped', one request after
// another, then empties the transcoding directory.
255 private async onExit () {
256 if (this.cleaningUp) return
257 this.cleaningUp = true
259 logger.info('Cleaning up after program exit')
// NOTE(review): aborts are awaited sequentially. Whether one failed abort skips the remaining
// ones and the cleanup depends on error handling on lines not visible here — confirm.
262 for (const { server, job } of this.processingJobs) {
263 await server.runnerJobs.abort({
264 jobToken: job.jobToken,
266 reason: 'Runner stopped',
267 runnerToken: server.runnerToken
271 await this.cleanupTMP()
/**
 * Lazily created singleton accessor. The constructor is private, so this is the only way
 * to obtain a RunnerServer.
 */
static get Instance () {
  if (!this.instance) {
    this.instance = new this()
  }

  return this.instance
}