-import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { throttle } from 'lodash'
+import { retryTransactionWrapper, saveInTransactionWithRetries } from '@server/helpers/database-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { RUNNER_JOBS } from '@server/initializers/constants'
import { sequelizeTypescript } from '@server/initializers/database'
RunnerJobLiveRTMPHLSTranscodingPayload,
RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
RunnerJobState,
+ RunnerJobStudioTranscodingPayload,
RunnerJobSuccessPayload,
RunnerJobType,
RunnerJobUpdatePayload,
+ RunnerJobVideoStudioTranscodingPrivatePayload,
RunnerJobVODAudioMergeTranscodingPayload,
RunnerJobVODAudioMergeTranscodingPrivatePayload,
RunnerJobVODHLSTranscodingPayload,
type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
payload: RunnerJobLiveRTMPHLSTranscodingPayload
privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
+ } |
+ {
+ type: Extract<RunnerJobType, 'video-studio-transcoding'>
+ payload: RunnerJobStudioTranscodingPayload
+ privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
}
export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
protected readonly lTags = loggerTagsFactory('runner')
+ static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)
+
// ---------------------------------------------------------------------------
abstract create (options: C): Promise<MRunnerJob>
}): Promise<MRunnerJob> {
const { priority, dependsOnRunnerJob } = options
+ logger.debug('Creating runner job', { options, ...this.lTags(options.type) })
+
const runnerJob = new RunnerJobModel({
...pick(options, [ 'type', 'payload', 'privatePayload' ]),
if (progress) runnerJob.progress = progress
+ if (!runnerJob.changed()) {
+ try {
+ await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
+ } catch (err) {
+ logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
+ }
+
+ return
+ }
+
await retryTransactionWrapper(() => {
return sequelizeTypescript.transaction(async transaction => {
- if (runnerJob.changed()) {
- return runnerJob.save({ transaction })
- }
-
- // Don't update the job too often
- if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
- await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
- }
+ return runnerJob.save({ transaction })
})
})
}
}) {
const { runnerJob } = options
+ runnerJob.state = RunnerJobState.COMPLETING
+ await saveInTransactionWithRetries(runnerJob)
+
try {
await this.specificComplete(options)
runnerJob.progress = null
runnerJob.finishedAt = new Date()
- await retryTransactionWrapper(() => {
- return sequelizeTypescript.transaction(async transaction => {
- await runnerJob.save({ transaction })
- })
- })
+ await saveInTransactionWithRetries(runnerJob)
const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)