]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/controllers/api/runners/jobs.ts
Fix peertube runner concurrency
[github/Chocobozzz/PeerTube.git] / server / controllers / api / runners / jobs.ts
index 7d488ec111403b2c2d6f9f16eec966f0b6bfba53..140f062bed225d4f0ef112fc3ba7de8d42d025fc 100644 (file)
@@ -1,4 +1,5 @@
 import express, { UploadFiles } from 'express'
+import { retryTransactionWrapper } from '@server/helpers/database-utils'
 import { createReqFiles } from '@server/helpers/express-utils'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { generateRunnerJobToken } from '@server/helpers/token-generator'
@@ -17,6 +18,7 @@ import {
 import {
   abortRunnerJobValidator,
   acceptRunnerJobValidator,
+  cancelRunnerJobValidator,
   errorRunnerJobValidator,
   getRunnerFromTokenValidator,
   jobOfRunnerGetValidator,
@@ -40,7 +42,9 @@ import {
   RunnerJobType,
   RunnerJobUpdateBody,
   RunnerJobUpdatePayload,
+  ServerErrorCode,
   UserRight,
+  VideoStudioTranscodingSuccess,
   VODAudioMergeTranscodingSuccess,
   VODHLSTranscodingSuccess,
   VODWebVideoTranscodingSuccess
@@ -110,6 +114,7 @@ runnerJobsRouter.post('/jobs/:jobUUID/cancel',
   authenticate,
   ensureUserHasRight(UserRight.MANAGE_RUNNERS),
   asyncMiddleware(runnerJobGetValidator),
+  cancelRunnerJobValidator,
   asyncMiddleware(cancelRunnerJob)
 )
 
@@ -158,14 +163,30 @@ async function acceptRunnerJob (req: express.Request, res: express.Response) {
   const runner = res.locals.runner
   const runnerJob = res.locals.runnerJob
 
-  runnerJob.state = RunnerJobState.PROCESSING
-  runnerJob.processingJobToken = generateRunnerJobToken()
-  runnerJob.startedAt = new Date()
-  runnerJob.runnerId = runner.id
+  const newRunnerJob = await retryTransactionWrapper(() => {
+    return sequelizeTypescript.transaction(async transaction => {
+      await runnerJob.reload({ transaction })
 
-  const newRunnerJob = await sequelizeTypescript.transaction(transaction => {
-    return runnerJob.save({ transaction })
+      if (runnerJob.state !== RunnerJobState.PENDING) {
+        res.fail({
+          type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
+          message: 'This job is not in pending state anymore',
+          status: HttpStatusCode.CONFLICT_409
+        })
+
+        return undefined
+      }
+
+      runnerJob.state = RunnerJobState.PROCESSING
+      runnerJob.processingJobToken = generateRunnerJobToken()
+      runnerJob.startedAt = new Date()
+      runnerJob.runnerId = runner.id
+
+      return runnerJob.save({ transaction })
+    })
   })
+  if (!newRunnerJob) return
+
   newRunnerJob.Runner = runner as RunnerModel
 
   const result: AcceptRunnerJobResult = {
@@ -297,6 +318,14 @@ const jobSuccessPayloadBuilders: {
     }
   },
 
+  'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => {
+    return {
+      ...payload,
+
+      videoFile: files['payload[videoFile]'][0].path
+    }
+  },
+
   'live-rtmp-hls-transcoding': () => ({})
 }
 
@@ -327,7 +356,7 @@ async function postRunnerJobSuccess (req: express.Request, res: express.Response
 async function cancelRunnerJob (req: express.Request, res: express.Response) {
   const runnerJob = res.locals.runnerJob
 
-  logger.info('Cancelling job %s (%s)', runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
+  logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
 
   const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
   await new RunnerJobHandler().cancel({ runnerJob })