diff options
Diffstat (limited to 'packages/peertube-runner')
-rw-r--r-- | packages/peertube-runner/server/server.ts | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index 8eff4bd2f..cb1533dc6 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts | |||
@@ -170,18 +170,29 @@ export class RunnerServer { | |||
170 | private async checkAvailableJobs () { | 170 | private async checkAvailableJobs () { |
171 | if (this.checkingAvailableJobs) return | 171 | if (this.checkingAvailableJobs) return |
172 | 172 | ||
173 | logger.info('Checking available jobs') | ||
174 | |||
175 | this.checkingAvailableJobs = true | 173 | this.checkingAvailableJobs = true |
176 | 174 | ||
175 | let hadAvailableJob = false | ||
176 | |||
177 | for (const server of this.servers) { | 177 | for (const server of this.servers) { |
178 | try { | 178 | try { |
179 | logger.info('Checking available jobs on ' + server.url) | ||
180 | |||
179 | const job = await this.requestJob(server) | 181 | const job = await this.requestJob(server) |
180 | if (!job) continue | 182 | if (!job) continue |
181 | 183 | ||
184 | hadAvailableJob = true | ||
185 | |||
182 | await this.tryToExecuteJobAsync(server, job) | 186 | await this.tryToExecuteJobAsync(server, job) |
183 | } catch (err) { | 187 | } catch (err) { |
184 | if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | 188 | const code = (err.res?.body as PeerTubeProblemDocument)?.code |
189 | |||
190 | if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
191 | logger.debug({ err }, 'Runner job is not in processing state anymore, retry later') | ||
192 | return | ||
193 | } | ||
194 | |||
195 | if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | ||
185 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) | 196 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) |
186 | 197 | ||
187 | await this.unregisterRunner({ url: server.url }) | 198 | await this.unregisterRunner({ url: server.url }) |
@@ -193,6 +204,11 @@ export class RunnerServer { | |||
193 | } | 204 | } |
194 | 205 | ||
195 | this.checkingAvailableJobs = false | 206 | this.checkingAvailableJobs = false |
207 | |||
208 | if (hadAvailableJob && this.canProcessMoreJobs()) { | ||
209 | this.checkAvailableJobs() | ||
210 | .catch(err => logger.error({ err }, 'Cannot check more available jobs')) | ||
211 | } | ||
196 | } | 212 | } |
197 | 213 | ||
198 | private async requestJob (server: PeerTubeServer) { | 214 | private async requestJob (server: PeerTubeServer) { |
@@ -211,7 +227,7 @@ export class RunnerServer { | |||
211 | } | 227 | } |
212 | 228 | ||
213 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | 229 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { |
214 | if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return | 230 | if (!this.canProcessMoreJobs()) return |
215 | 231 | ||
216 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) | 232 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) |
217 | 233 | ||
@@ -242,6 +258,10 @@ export class RunnerServer { | |||
242 | return ConfigManager.Instance.setRegisteredInstances(data) | 258 | return ConfigManager.Instance.setRegisteredInstances(data) |
243 | } | 259 | } |
244 | 260 | ||
261 | private canProcessMoreJobs () { | ||
262 | return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency | ||
263 | } | ||
264 | |||
245 | // --------------------------------------------------------------------------- | 265 | // --------------------------------------------------------------------------- |
246 | 266 | ||
247 | private async cleanupTMP () { | 267 | private async cleanupTMP () { |