aboutsummaryrefslogblamecommitdiffhomepage
path: root/server/lib/runners/job-handlers/abstract-job-handler.ts
blob: ca97d08816f3d4c1b2bbcd7814c2a0532ca38c29 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
                                 
                                                                                                      











                                                                   
                                    


                          
                                                



























                                                                   

     


                                                                 





                                                                                                                   

                                                                










                                                                                

                                                                                 










































                                                                                









                                                                                                                                   

                                                                   
                                              











                                                                                


                                                 













                                                                                                      
                                                 





























































































































                                                                                                                                       
import { throttle } from 'lodash'
import { retryTransactionWrapper, saveInTransactionWithRetries } from '@server/helpers/database-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { RUNNER_JOBS } from '@server/initializers/constants'
import { sequelizeTypescript } from '@server/initializers/database'
import { PeerTubeSocket } from '@server/lib/peertube-socket'
import { RunnerJobModel } from '@server/models/runner/runner-job'
import { setAsUpdated } from '@server/models/shared'
import { MRunnerJob } from '@server/types/models/runners'
import { pick } from '@shared/core-utils'
import {
  RunnerJobLiveRTMPHLSTranscodingPayload,
  RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
  RunnerJobState,
  RunnerJobStudioTranscodingPayload,
  RunnerJobSuccessPayload,
  RunnerJobType,
  RunnerJobUpdatePayload,
  RunnerJobVideoStudioTranscodingPrivatePayload,
  RunnerJobVODAudioMergeTranscodingPayload,
  RunnerJobVODAudioMergeTranscodingPrivatePayload,
  RunnerJobVODHLSTranscodingPayload,
  RunnerJobVODHLSTranscodingPrivatePayload,
  RunnerJobVODWebVideoTranscodingPayload,
  RunnerJobVODWebVideoTranscodingPrivatePayload
} from '@shared/models'

type CreateRunnerJobArg =
  {
    type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
    payload: RunnerJobVODWebVideoTranscodingPayload
    privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'vod-hls-transcoding'>
    payload: RunnerJobVODHLSTranscodingPayload
    privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
    payload: RunnerJobVODAudioMergeTranscodingPayload
    privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
    payload: RunnerJobLiveRTMPHLSTranscodingPayload
    privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'video-studio-transcoding'>
    payload: RunnerJobStudioTranscodingPayload
    privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
  }

export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {

  protected readonly lTags = loggerTagsFactory('runner')

  static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)

  // ---------------------------------------------------------------------------

  abstract create (options: C): Promise<MRunnerJob>

  protected async createRunnerJob (options: CreateRunnerJobArg & {
    jobUUID: string
    priority: number
    dependsOnRunnerJob?: MRunnerJob
  }): Promise<MRunnerJob> {
    const { priority, dependsOnRunnerJob } = options

    logger.debug('Creating runner job', { options, ...this.lTags(options.type) })

    const runnerJob = new RunnerJobModel({
      ...pick(options, [ 'type', 'payload', 'privatePayload' ]),

      uuid: options.jobUUID,

      state: dependsOnRunnerJob
        ? RunnerJobState.WAITING_FOR_PARENT_JOB
        : RunnerJobState.PENDING,

      dependsOnRunnerJobId: dependsOnRunnerJob?.id,

      priority
    })

    const job = await sequelizeTypescript.transaction(async transaction => {
      return runnerJob.save({ transaction })
    })

    if (runnerJob.state === RunnerJobState.PENDING) {
      PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
    }

    return job
  }

  // ---------------------------------------------------------------------------

  protected abstract specificUpdate (options: {
    runnerJob: MRunnerJob
    updatePayload?: U
  }): Promise<void> | void

  async update (options: {
    runnerJob: MRunnerJob
    progress?: number
    updatePayload?: U
  }) {
    const { runnerJob, progress } = options

    await this.specificUpdate(options)

    if (progress) runnerJob.progress = progress

    if (!runnerJob.changed()) {
      try {
        await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
      } catch (err) {
        logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
      }

      return
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        return runnerJob.save({ transaction })
      })
    })
  }

  // ---------------------------------------------------------------------------

  async complete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }) {
    const { runnerJob } = options

    runnerJob.state = RunnerJobState.COMPLETING
    await saveInTransactionWithRetries(runnerJob)

    try {
      await this.specificComplete(options)

      runnerJob.state = RunnerJobState.COMPLETED
    } catch (err) {
      logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })

      runnerJob.state = RunnerJobState.ERRORED
      runnerJob.error = err.message
    }

    runnerJob.progress = null
    runnerJob.finishedAt = new Date()

    await saveInTransactionWithRetries(runnerJob)

    const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)

    if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
  }

  protected abstract specificComplete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  async cancel (options: {
    runnerJob: MRunnerJob
    fromParent?: boolean
  }) {
    const { runnerJob, fromParent } = options

    await this.specificCancel(options)

    const cancelState = fromParent
      ? RunnerJobState.PARENT_CANCELLED
      : RunnerJobState.CANCELLED

    runnerJob.setToErrorOrCancel(cancelState)

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    const children = await RunnerJobModel.listChildrenOf(runnerJob)
    for (const child of children) {
      logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))

      await this.cancel({ runnerJob: child, fromParent: true })
    }
  }

  protected abstract specificCancel (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  protected abstract isAbortSupported (): boolean

  async abort (options: {
    runnerJob: MRunnerJob
  }) {
    const { runnerJob } = options

    if (this.isAbortSupported() !== true) {
      return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
    }

    await this.specificAbort(options)

    runnerJob.resetToPending()

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })
  }

  protected setAbortState (runnerJob: MRunnerJob) {
    runnerJob.resetToPending()
  }

  protected abstract specificAbort (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  async error (options: {
    runnerJob: MRunnerJob
    message: string
    fromParent?: boolean
  }) {
    const { runnerJob, message, fromParent } = options

    const errorState = fromParent
      ? RunnerJobState.PARENT_ERRORED
      : RunnerJobState.ERRORED

    const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
      ? RunnerJobState.PENDING
      : errorState

    await this.specificError({ ...options, nextState })

    if (nextState === errorState) {
      runnerJob.setToErrorOrCancel(nextState)
      runnerJob.error = message
    } else {
      runnerJob.resetToPending()
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    if (runnerJob.state === errorState) {
      const children = await RunnerJobModel.listChildrenOf(runnerJob)

      for (const child of children) {
        logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))

        await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
      }
    }
  }

  protected abstract specificError (options: {
    runnerJob: MRunnerJob
    message: string
    nextState: RunnerJobState
  }): Promise<void> | void
}