]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Fix peertube runner concurrency
authorChocobozzz <me@florianbigard.com>
Fri, 19 May 2023 07:23:20 +0000 (09:23 +0200)
committerChocobozzz <me@florianbigard.com>
Fri, 19 May 2023 07:23:20 +0000 (09:23 +0200)
packages/peertube-runner/server/server.ts
server/controllers/api/runners/jobs.ts
server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
server/tests/api/server/open-telemetry.ts
server/tests/peertube-runner/vod-transcoding.ts
shared/models/server/server-error-code.enum.ts

index 8eff4bd2f4f1600a99c7006d44467ba33c1fb635..cb1533dc6db6095bab2bdea88acd4ea5afda1ddc 100644 (file)
@@ -170,18 +170,29 @@ export class RunnerServer {
   private async checkAvailableJobs () {
     if (this.checkingAvailableJobs) return
 
-    logger.info('Checking available jobs')
-
     this.checkingAvailableJobs = true
 
+    let hadAvailableJob = false
+
     for (const server of this.servers) {
       try {
+        logger.info('Checking available jobs on ' + server.url)
+
         const job = await this.requestJob(server)
         if (!job) continue
 
+        hadAvailableJob = true
+
         await this.tryToExecuteJobAsync(server, job)
       } catch (err) {
-        if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
+        const code = (err.res?.body as PeerTubeProblemDocument)?.code
+
+        if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
+          logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
+          return
+        }
+
+        if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
           logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
 
           await this.unregisterRunner({ url: server.url })
@@ -193,6 +204,11 @@ export class RunnerServer {
     }
 
     this.checkingAvailableJobs = false
+
+    if (hadAvailableJob && this.canProcessMoreJobs()) {
+      this.checkAvailableJobs()
+        .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
+    }
   }
 
   private async requestJob (server: PeerTubeServer) {
@@ -211,7 +227,7 @@ export class RunnerServer {
   }
 
   private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
-    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
+    if (!this.canProcessMoreJobs()) return
 
     const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
 
@@ -242,6 +258,10 @@ export class RunnerServer {
     return ConfigManager.Instance.setRegisteredInstances(data)
   }
 
+  private canProcessMoreJobs () {
+    return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
+  }
+
   // ---------------------------------------------------------------------------
 
   private async cleanupTMP () {
index bdeb0c6cd73358a8778daf310820209bc979debd..140f062bed225d4f0ef112fc3ba7de8d42d025fc 100644 (file)
@@ -42,6 +42,7 @@ import {
   RunnerJobType,
   RunnerJobUpdateBody,
   RunnerJobUpdatePayload,
+  ServerErrorCode,
   UserRight,
   VideoStudioTranscodingSuccess,
   VODAudioMergeTranscodingSuccess,
@@ -168,6 +169,7 @@ async function acceptRunnerJob (req: express.Request, res: express.Response) {
 
       if (runnerJob.state !== RunnerJobState.PENDING) {
         res.fail({
+          type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
           message: 'This job is not in pending state anymore',
           status: HttpStatusCode.CONFLICT_409
         })
index 3c2cf51b7231a51e91aaf56eff92c7d742ca03c0..5bad348606026ab386042dfa648c988cde880c4f 100644 (file)
@@ -68,6 +68,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
   abort () {
     if (this.ended || this.errored || this.aborted) return
 
+    logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags())
+
     this.ffmpegCommand.kill('SIGINT')
 
     this.aborted = true
@@ -95,6 +97,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
   private onFFmpegEnded () {
     if (this.ended || this.errored || this.aborted) return
 
+    logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags())
+
     this.ended = true
     this.emit('end')
   }
index 49f3b520bc5a7be6c7514bacdf64c7991b8e199e..fd85fc5141ec39ba1df0ff787e7dca6b4a542114 100644 (file)
@@ -31,7 +31,7 @@ describe('Open Telemetry', function () {
     it('Should enable open telemetry metrics', async function () {
       this.timeout(120000)
 
-      server = await createSingleServer(1, {
+      await server.run({
         open_telemetry: {
           metrics: {
             enabled: true
@@ -73,7 +73,7 @@ describe('Open Telemetry', function () {
     it('Should disable http request duration metrics', async function () {
       await server.kill()
 
-      server = await createSingleServer(1, {
+      await server.run({
         open_telemetry: {
           metrics: {
             enabled: true,
@@ -114,7 +114,7 @@ describe('Open Telemetry', function () {
     })
 
     it('Should enable open telemetry metrics', async function () {
-      server = await createSingleServer(1, {
+      await server.run({
         open_telemetry: {
           tracing: {
             enabled: true,
index 3c09181029d342cec088195ec24ab181a110e264..d7e2df095a27829c81b787ae57d96d4a1a49730c 100644 (file)
@@ -189,7 +189,7 @@ describe('Test VOD transcoding in peertube-runner program', function () {
     })
 
     it('Should transcode videos on manual run', async function () {
-      this.timeout(120000)
+      this.timeout(240000)
 
       await servers[0].config.disableTranscoding()
 
index 24d3c6d21ae6bb6eb467710255652aacd789e271..2b093380c0a2148ac953427acbc39be8b40cb3ba 100644 (file)
@@ -48,6 +48,7 @@ export const enum ServerErrorCode {
   ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected',
 
   RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state',
+  RUNNER_JOB_NOT_IN_PENDING_STATE = 'runner_job_not_in_pending_state',
   UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token'
 }