]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/runners/job-handlers/abstract-job-handler.ts
Avoid aborting completing jobs
[github/Chocobozzz/PeerTube.git] / server / lib / runners / job-handlers / abstract-job-handler.ts
index 74b455107ea254fe95393421ac95a463dc1761cf..ca97d08816f3d4c1b2bbcd7814c2a0532ca38c29 100644 (file)
@@ -1,4 +1,5 @@
-import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { throttle } from 'lodash'
+import { retryTransactionWrapper, saveInTransactionWithRetries } from '@server/helpers/database-utils'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { RUNNER_JOBS } from '@server/initializers/constants'
 import { sequelizeTypescript } from '@server/initializers/database'
@@ -11,9 +12,11 @@ import {
   RunnerJobLiveRTMPHLSTranscodingPayload,
   RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
   RunnerJobState,
+  RunnerJobStudioTranscodingPayload,
   RunnerJobSuccessPayload,
   RunnerJobType,
   RunnerJobUpdatePayload,
+  RunnerJobVideoStudioTranscodingPrivatePayload,
   RunnerJobVODAudioMergeTranscodingPayload,
   RunnerJobVODAudioMergeTranscodingPrivatePayload,
   RunnerJobVODHLSTranscodingPayload,
@@ -21,7 +24,6 @@ import {
   RunnerJobVODWebVideoTranscodingPayload,
   RunnerJobVODWebVideoTranscodingPrivatePayload
 } from '@shared/models'
-import { throttle } from 'lodash'
 
 type CreateRunnerJobArg =
   {
@@ -43,6 +45,11 @@ type CreateRunnerJobArg =
     type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
     payload: RunnerJobLiveRTMPHLSTranscodingPayload
     privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
+  } |
+  {
+    type: Extract<RunnerJobType, 'video-studio-transcoding'>
+    payload: RunnerJobStudioTranscodingPayload
+    privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
   }
 
 export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
@@ -62,6 +69,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
   }): Promise<MRunnerJob> {
     const { priority, dependsOnRunnerJob } = options
 
+    logger.debug('Creating runner job', { options, ...this.lTags(options.type) })
+
     const runnerJob = new RunnerJobModel({
       ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
 
@@ -130,6 +139,9 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
   }) {
     const { runnerJob } = options
 
+    runnerJob.state = RunnerJobState.COMPLETING
+    await saveInTransactionWithRetries(runnerJob)
+
     try {
       await this.specificComplete(options)
 
@@ -144,11 +156,7 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
     runnerJob.progress = null
     runnerJob.finishedAt = new Date()
 
-    await retryTransactionWrapper(() => {
-      return sequelizeTypescript.transaction(async transaction => {
-        await runnerJob.save({ transaction })
-      })
-    })
+    await saveInTransactionWithRetries(runnerJob)
 
     const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)