X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=packages%2Fpeertube-runner%2Fserver%2Fserver.ts;fp=packages%2Fpeertube-runner%2Fserver%2Fserver.ts;h=cb1533dc6db6095bab2bdea88acd4ea5afda1ddc;hb=fe7019b2323768f7e33890303f95c9a45688ac1d;hp=8eff4bd2f4f1600a99c7006d44467ba33c1fb635;hpb=ef2e6aabf755feeec96011e70ff2522a491c5cb3;p=github%2FChocobozzz%2FPeerTube.git diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index 8eff4bd2f..cb1533dc6 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts @@ -170,18 +170,29 @@ export class RunnerServer { private async checkAvailableJobs () { if (this.checkingAvailableJobs) return - logger.info('Checking available jobs') - this.checkingAvailableJobs = true + let hadAvailableJob = false + for (const server of this.servers) { try { + logger.info('Checking available jobs on ' + server.url) + const job = await this.requestJob(server) if (!job) continue + hadAvailableJob = true + await this.tryToExecuteJobAsync(server, job) } catch (err) { - if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { + const code = (err.res?.body as PeerTubeProblemDocument)?.code + + if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { + logger.debug({ err }, 'Runner job is not in processing state anymore, retry later') + return + } + + if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) await this.unregisterRunner({ url: server.url }) @@ -193,6 +204,11 @@ export class RunnerServer { } this.checkingAvailableJobs = false + + if (hadAvailableJob && this.canProcessMoreJobs()) { + this.checkAvailableJobs() + .catch(err => logger.error({ err }, 'Cannot check more available jobs')) + } } private async requestJob (server: PeerTubeServer) { @@ -211,7 +227,7 @@ export class RunnerServer { } private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { - if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return + if (!this.canProcessMoreJobs()) return const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) @@ -242,6 +258,10 @@ export class RunnerServer { return ConfigManager.Instance.setRegisteredInstances(data) } + private canProcessMoreJobs () { + return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency + } + // --------------------------------------------------------------------------- private async cleanupTMP () {