From fe7019b2323768f7e33890303f95c9a45688ac1d Mon Sep 17 00:00:00 2001
From: Chocobozzz
Date: Fri, 19 May 2023 09:23:20 +0200
Subject: Fix peertube runner concurrency

---
 packages/peertube-runner/server/server.ts | 28 ++++++++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

(limited to 'packages')

diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
index 8eff4bd2f..cb1533dc6 100644
--- a/packages/peertube-runner/server/server.ts
+++ b/packages/peertube-runner/server/server.ts
@@ -170,18 +170,29 @@ export class RunnerServer {
   private async checkAvailableJobs () {
     if (this.checkingAvailableJobs) return
 
-    logger.info('Checking available jobs')
-
     this.checkingAvailableJobs = true
 
+    let hadAvailableJob = false
+
     for (const server of this.servers) {
       try {
+        logger.info('Checking available jobs on ' + server.url)
+
         const job = await this.requestJob(server)
         if (!job) continue
 
+        hadAvailableJob = true
+
         await this.tryToExecuteJobAsync(server, job)
       } catch (err) {
-        if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
+        const code = (err.res?.body as PeerTubeProblemDocument)?.code
+
+        if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
+          logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
+          return
+        }
+
+        if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
           logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
 
           await this.unregisterRunner({ url: server.url })
@@ -193,6 +204,11 @@ export class RunnerServer {
     }
 
     this.checkingAvailableJobs = false
+
+    if (hadAvailableJob && this.canProcessMoreJobs()) {
+      this.checkAvailableJobs()
+        .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
+    }
   }
 
   private async requestJob (server: PeerTubeServer) {
@@ -211,7 +227,7 @@ export class RunnerServer {
   }
 
   private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
-    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
+    if (!this.canProcessMoreJobs()) return
 
     const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
 
@@ -242,6 +258,10 @@ export class RunnerServer {
     return ConfigManager.Instance.setRegisteredInstances(data)
   }
 
+  private canProcessMoreJobs () {
+    return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
+  }
+
   // ---------------------------------------------------------------------------
 
   private async cleanupTMP () {
-- 
cgit v1.2.3