]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - packages/peertube-runner/server/server.ts
Fix peertube runner concurrency
[github/Chocobozzz/PeerTube.git] / packages / peertube-runner / server / server.ts
index 724f359bd36dd7f1dd4ad7c0ddd208d493163cb4..cb1533dc6db6095bab2bdea88acd4ea5afda1ddc 100644 (file)
@@ -8,6 +8,7 @@ import { ConfigManager } from '../shared'
 import { IPCServer } from '../shared/ipc'
 import { logger } from '../shared/logger'
 import { JobWithToken, processJob } from './process'
+import { isJobSupported } from './shared'
 
 type PeerTubeServer = PeerTubeServerCommand & {
   runnerToken: string
@@ -23,6 +24,8 @@ export class RunnerServer {
 
   private checkingAvailableJobs = false
 
+  private cleaningUp = false
+
   private readonly sockets = new Map<PeerTubeServer, Socket>()
 
   private constructor () {}
@@ -45,13 +48,17 @@ export class RunnerServer {
     try {
       await ipcServer.run(this)
     } catch (err) {
-      console.error('Cannot start local socket for IPC communication', err)
+      logger.error('Cannot start local socket for IPC communication', err)
       process.exit(-1)
     }
 
     // Cleanup on exit
     for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
-      process.on(code, async () => {
+      process.on(code, async (err, origin) => {
+        if (code === 'uncaughtException') {
+          logger.error({ err, origin }, 'uncaughtException')
+        }
+
         await this.onExit()
       })
     }
@@ -163,18 +170,29 @@ export class RunnerServer {
   private async checkAvailableJobs () {
     if (this.checkingAvailableJobs) return
 
-    logger.info('Checking available jobs')
-
     this.checkingAvailableJobs = true
 
+    let hadAvailableJob = false
+
     for (const server of this.servers) {
       try {
+        logger.info('Checking available jobs on ' + server.url)
+
         const job = await this.requestJob(server)
         if (!job) continue
 
+        hadAvailableJob = true
+
         await this.tryToExecuteJobAsync(server, job)
       } catch (err) {
-        if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
+        const code = (err.res?.body as PeerTubeProblemDocument)?.code
+
+        if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
+          logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
+          return
+        }
+
+        if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
           logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
 
           await this.unregisterRunner({ url: server.url })
@@ -186,6 +204,11 @@ export class RunnerServer {
     }
 
     this.checkingAvailableJobs = false
+
+    if (hadAvailableJob && this.canProcessMoreJobs()) {
+      this.checkAvailableJobs()
+        .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
+    }
   }
 
   private async requestJob (server: PeerTubeServer) {
@@ -193,16 +216,18 @@ export class RunnerServer {
 
     const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
 
-    if (availableJobs.length === 0) {
+    const filtered = availableJobs.filter(j => isJobSupported(j))
+
+    if (filtered.length === 0) {
       logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
       return undefined
     }
 
-    return availableJobs[0]
+    return filtered[0]
   }
 
   private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
-    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
+    if (!this.canProcessMoreJobs()) return
 
     const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
 
@@ -233,6 +258,10 @@ export class RunnerServer {
     return ConfigManager.Instance.setRegisteredInstances(data)
   }
 
+  private canProcessMoreJobs () {
+    return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
+  }
+
   // ---------------------------------------------------------------------------
 
   private async cleanupTMP () {
@@ -244,6 +273,11 @@ export class RunnerServer {
   }
 
   private async onExit () {
+    if (this.cleaningUp) return
+    this.cleaningUp = true
+
+    logger.info('Cleaning up after program exit')
+
     try {
       for (const { server, job } of this.processingJobs) {
         await server.runnerJobs.abort({
@@ -256,7 +290,7 @@ export class RunnerServer {
 
       await this.cleanupTMP()
     } catch (err) {
-      console.error(err)
+      logger.error(err)
       process.exit(-1)
     }