aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-19 09:23:20 +0200
committerChocobozzz <me@florianbigard.com>2023-05-19 09:23:20 +0200
commitfe7019b2323768f7e33890303f95c9a45688ac1d (patch)
tree8ccd4aa71d36626d1bffb1e2a8537241e2002404 /packages/peertube-runner
parentef2e6aabf755feeec96011e70ff2522a491c5cb3 (diff)
downloadPeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.tar.gz
PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.tar.zst
PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.zip
Fix peertube runner concurrency
Diffstat (limited to 'packages/peertube-runner')
-rw-r--r--packages/peertube-runner/server/server.ts28
1 files changed, 24 insertions, 4 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
index 8eff4bd2f..cb1533dc6 100644
--- a/packages/peertube-runner/server/server.ts
+++ b/packages/peertube-runner/server/server.ts
@@ -170,18 +170,29 @@ export class RunnerServer {
170 private async checkAvailableJobs () { 170 private async checkAvailableJobs () {
171 if (this.checkingAvailableJobs) return 171 if (this.checkingAvailableJobs) return
172 172
173 logger.info('Checking available jobs')
174
175 this.checkingAvailableJobs = true 173 this.checkingAvailableJobs = true
176 174
175 let hadAvailableJob = false
176
177 for (const server of this.servers) { 177 for (const server of this.servers) {
178 try { 178 try {
179 logger.info('Checking available jobs on ' + server.url)
180
179 const job = await this.requestJob(server) 181 const job = await this.requestJob(server)
180 if (!job) continue 182 if (!job) continue
181 183
184 hadAvailableJob = true
185
182 await this.tryToExecuteJobAsync(server, job) 186 await this.tryToExecuteJobAsync(server, job)
183 } catch (err) { 187 } catch (err) {
184 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { 188 const code = (err.res?.body as PeerTubeProblemDocument)?.code
189
190 if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
191 logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
192 return
193 }
194
195 if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
185 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) 196 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
186 197
187 await this.unregisterRunner({ url: server.url }) 198 await this.unregisterRunner({ url: server.url })
@@ -193,6 +204,11 @@ export class RunnerServer {
193 } 204 }
194 205
195 this.checkingAvailableJobs = false 206 this.checkingAvailableJobs = false
207
208 if (hadAvailableJob && this.canProcessMoreJobs()) {
209 this.checkAvailableJobs()
210 .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
211 }
196 } 212 }
197 213
198 private async requestJob (server: PeerTubeServer) { 214 private async requestJob (server: PeerTubeServer) {
@@ -211,7 +227,7 @@ export class RunnerServer {
211 } 227 }
212 228
213 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { 229 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
214 if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return 230 if (!this.canProcessMoreJobs()) return
215 231
216 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) 232 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
217 233
@@ -242,6 +258,10 @@ export class RunnerServer {
242 return ConfigManager.Instance.setRegisteredInstances(data) 258 return ConfigManager.Instance.setRegisteredInstances(data)
243 } 259 }
244 260
261 private canProcessMoreJobs () {
262 return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
263 }
264
245 // --------------------------------------------------------------------------- 265 // ---------------------------------------------------------------------------
246 266
247 private async cleanupTMP () { 267 private async cleanupTMP () {