]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/runners/job-handlers/abstract-job-handler.ts
Avoid aborting completing jobs
[github/Chocobozzz/PeerTube.git] / server / lib / runners / job-handlers / abstract-job-handler.ts
index 73fc1457407f9719cd686d10c987bc42ed27453d..ca97d08816f3d4c1b2bbcd7814c2a0532ca38c29 100644 (file)
@@ -1,4 +1,5 @@
-import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { throttle } from 'lodash'
+import { retryTransactionWrapper, saveInTransactionWithRetries } from '@server/helpers/database-utils'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { RUNNER_JOBS } from '@server/initializers/constants'
 import { sequelizeTypescript } from '@server/initializers/database'
@@ -11,9 +12,11 @@ import {
   RunnerJobLiveRTMPHLSTranscodingPayload,
   RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
   RunnerJobState,
+  RunnerJobStudioTranscodingPayload,
   RunnerJobSuccessPayload,
   RunnerJobType,
   RunnerJobUpdatePayload,
+  RunnerJobVideoStudioTranscodingPrivatePayload,
   RunnerJobVODAudioMergeTranscodingPayload,
   RunnerJobVODAudioMergeTranscodingPrivatePayload,
   RunnerJobVODHLSTranscodingPayload,
@@ -42,12 +45,19 @@ type CreateRunnerJobArg =
     type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
     payload: RunnerJobLiveRTMPHLSTranscodingPayload
     privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
+  } |
+  {
+    type: Extract<RunnerJobType, 'video-studio-transcoding'>
+    payload: RunnerJobStudioTranscodingPayload
+    privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
   }
 
 export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
 
   protected readonly lTags = loggerTagsFactory('runner')
 
+  static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)
+
   // ---------------------------------------------------------------------------
 
   abstract create (options: C): Promise<MRunnerJob>
@@ -59,6 +69,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
   }): Promise<MRunnerJob> {
     const { priority, dependsOnRunnerJob } = options
 
+    logger.debug('Creating runner job', { options, ...this.lTags(options.type) })
+
     const runnerJob = new RunnerJobModel({
       ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
 
@@ -102,16 +114,19 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
 
     if (progress) runnerJob.progress = progress
 
+    if (!runnerJob.changed()) {
+      try {
+        await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
+      } catch (err) {
+        logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
+      }
+
+      return
+    }
+
     await retryTransactionWrapper(() => {
       return sequelizeTypescript.transaction(async transaction => {
-        if (runnerJob.changed()) {
-          return runnerJob.save({ transaction })
-        }
-
-        // Don't update the job too often
-        if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
-          await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
-        }
+        return runnerJob.save({ transaction })
       })
     })
   }
@@ -124,6 +139,9 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
   }) {
     const { runnerJob } = options
 
+    runnerJob.state = RunnerJobState.COMPLETING
+    await saveInTransactionWithRetries(runnerJob)
+
     try {
       await this.specificComplete(options)
 
@@ -138,11 +156,7 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
     runnerJob.progress = null
     runnerJob.finishedAt = new Date()
 
-    await retryTransactionWrapper(() => {
-      return sequelizeTypescript.transaction(async transaction => {
-        await runnerJob.save({ transaction })
-      })
-    })
+    await saveInTransactionWithRetries(runnerJob)
 
     const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)