From 1772b383de490cf406fe93ef3aa3a941f6db513c Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 15:05:27 +0200 Subject: Add peertube runner cli --- packages/peertube-runner/server/server.ts | 269 ++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 packages/peertube-runner/server/server.ts (limited to 'packages/peertube-runner/server/server.ts') diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts new file mode 100644 index 000000000..724f359bd --- /dev/null +++ b/packages/peertube-runner/server/server.ts @@ -0,0 +1,269 @@ +import { ensureDir, readdir, remove } from 'fs-extra' +import { join } from 'path' +import { io, Socket } from 'socket.io-client' +import { pick } from '@shared/core-utils' +import { PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' +import { PeerTubeServer as PeerTubeServerCommand } from '@shared/server-commands' +import { ConfigManager } from '../shared' +import { IPCServer } from '../shared/ipc' +import { logger } from '../shared/logger' +import { JobWithToken, processJob } from './process' + +type PeerTubeServer = PeerTubeServerCommand & { + runnerToken: string + runnerName: string + runnerDescription?: string +} + +export class RunnerServer { + private static instance: RunnerServer + + private servers: PeerTubeServer[] = [] + private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] + + private checkingAvailableJobs = false + + private readonly sockets = new Map() + + private constructor () {} + + async run () { + logger.info('Running PeerTube runner in server mode') + + await ConfigManager.Instance.load() + + for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { + const serverCommand = new PeerTubeServerCommand({ url: registered.url }) + + this.loadServer(Object.assign(serverCommand, registered)) + + logger.info(`Loading registered instance ${registered.url}`) + } + + // Run IPC + const ipcServer = new IPCServer() + try { + await ipcServer.run(this) + } catch (err) { + console.error('Cannot start local socket for IPC communication', err) + process.exit(-1) + } + + // Cleanup on exit + for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { + process.on(code, async () => { + await this.onExit() + }) + } + + // Process jobs + await ensureDir(ConfigManager.Instance.getTranscodingDirectory()) + await this.cleanupTMP() + + logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`) + + await this.checkAvailableJobs() + } + + // --------------------------------------------------------------------------- + + async registerRunner (options: { + url: string + registrationToken: string + runnerName: string + runnerDescription?: string + }) { + const { url, registrationToken, runnerName, runnerDescription } = options + + logger.info(`Registering runner ${runnerName} on ${url}...`) + + const serverCommand = new PeerTubeServerCommand({ url }) + const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken }) + + const server: PeerTubeServer = Object.assign(serverCommand, { + runnerToken, + runnerName, + runnerDescription + }) + + this.loadServer(server) + await this.saveRegisteredInstancesInConf() + + logger.info(`Registered runner ${runnerName} on ${url}`) + + await this.checkAvailableJobs() + } + + private loadServer (server: PeerTubeServer) { + this.servers.push(server) + + const url = server.url + '/runners' + const socket = io(url, { + auth: { + runnerToken: server.runnerToken + }, + transports: [ 'websocket' ] + }) + + socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`)) + socket.on('connect', () => logger.info(`Connected to ${url} socket`)) + socket.on('available-jobs', () => this.checkAvailableJobs()) + + this.sockets.set(server, socket) + } + + async unregisterRunner (options: { + url: string + }) { + const { url } = options + + const server = this.servers.find(s => s.url === url) + if (!server) { + logger.error(`Unknown server ${url} to unregister`) + return + } + + logger.info(`Unregistering runner ${server.runnerName} on ${url}...`) + + try { + await server.runners.unregister({ runnerToken: server.runnerToken }) + } catch (err) { + logger.error({ err }, `Cannot unregister runner ${server.runnerName} on ${url}`) + } + + this.unloadServer(server) + await this.saveRegisteredInstancesInConf() + + logger.info(`Unregistered runner ${server.runnerName} on ${server.url}`) + } + + private unloadServer (server: PeerTubeServer) { + this.servers = this.servers.filter(s => s !== server) + + const socket = this.sockets.get(server) + socket.disconnect() + + this.sockets.delete(server) + } + + listRegistered () { + return { + servers: this.servers.map(s => { + return { + url: s.url, + runnerName: s.runnerName, + runnerDescription: s.runnerDescription + } + }) + } + } + + // --------------------------------------------------------------------------- + + private async checkAvailableJobs () { + if (this.checkingAvailableJobs) return + + logger.info('Checking available jobs') + + this.checkingAvailableJobs = true + + for (const server of this.servers) { + try { + const job = await this.requestJob(server) + if (!job) continue + + await this.tryToExecuteJobAsync(server, job) + } catch (err) { + if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { + logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) + + await this.unregisterRunner({ url: server.url }) + return + } + + logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`) + } + } + + this.checkingAvailableJobs = false + } + + private async requestJob (server: PeerTubeServer) { + logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`) + + const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) + + if (availableJobs.length === 0) { + logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) + return undefined + } + + return availableJobs[0] + } + + private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { + if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return + + const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) + + const processingJob = { job, server } + this.processingJobs.push(processingJob) + + processJob({ server, job, runnerToken: server.runnerToken }) + .catch(err => { + logger.error({ err }, 'Cannot process job') + + server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message }) + .catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error')) + }) + .finally(() => { + this.processingJobs = this.processingJobs.filter(p => p !== processingJob) + + return this.checkAvailableJobs() + }) + } + + // --------------------------------------------------------------------------- + + private saveRegisteredInstancesInConf () { + const data = this.servers.map(s => { + return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ]) + }) + + return ConfigManager.Instance.setRegisteredInstances(data) + } + + // --------------------------------------------------------------------------- + + private async cleanupTMP () { + const files = await readdir(ConfigManager.Instance.getTranscodingDirectory()) + + for (const file of files) { + await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file)) + } + } + + private async onExit () { + try { + for (const { server, job } of this.processingJobs) { + await server.runnerJobs.abort({ + jobToken: job.jobToken, + jobUUID: job.uuid, + reason: 'Runner stopped', + runnerToken: server.runnerToken + }) + } + + await this.cleanupTMP() + } catch (err) { + console.error(err) + process.exit(-1) + } + + process.exit() + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} -- cgit v1.2.3