1 import { retryTransactionWrapper } from '@server/helpers/database-utils'
2 import { logger, loggerTagsFactory } from '@server/helpers/logger'
3 import { RUNNER_JOBS } from '@server/initializers/constants'
4 import { sequelizeTypescript } from '@server/initializers/database'
5 import { PeerTubeSocket } from '@server/lib/peertube-socket'
6 import { RunnerJobModel } from '@server/models/runner/runner-job'
7 import { setAsUpdated } from '@server/models/shared'
8 import { MRunnerJob } from '@server/types/models/runners'
9 import { pick } from '@shared/core-utils'
11 RunnerJobLiveRTMPHLSTranscodingPayload,
12 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
14 RunnerJobSuccessPayload,
16 RunnerJobUpdatePayload,
17 RunnerJobVODAudioMergeTranscodingPayload,
18 RunnerJobVODAudioMergeTranscodingPrivatePayload,
19 RunnerJobVODHLSTranscodingPayload,
20 RunnerJobVODHLSTranscodingPrivatePayload,
21 RunnerJobVODWebVideoTranscodingPayload,
22 RunnerJobVODWebVideoTranscodingPrivatePayload
23 } from '@shared/models'
25 type CreateRunnerJobArg =
27 type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
28 payload: RunnerJobVODWebVideoTranscodingPayload
29 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
32 type: Extract<RunnerJobType, 'vod-hls-transcoding'>
33 payload: RunnerJobVODHLSTranscodingPayload
34 privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
37 type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
38 payload: RunnerJobVODAudioMergeTranscodingPayload
39 privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
42 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
43 payload: RunnerJobLiveRTMPHLSTranscodingPayload
44 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
47 export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
49 protected readonly lTags = loggerTagsFactory('runner')
51 // ---------------------------------------------------------------------------
53 abstract create (options: C): Promise<MRunnerJob>
55 protected async createRunnerJob (options: CreateRunnerJobArg & {
58 dependsOnRunnerJob?: MRunnerJob
59 }): Promise<MRunnerJob> {
60 const { priority, dependsOnRunnerJob } = options
62 const runnerJob = new RunnerJobModel({
63 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
65 uuid: options.jobUUID,
67 state: dependsOnRunnerJob
68 ? RunnerJobState.WAITING_FOR_PARENT_JOB
69 : RunnerJobState.PENDING,
71 dependsOnRunnerJobId: dependsOnRunnerJob?.id,
76 const job = await sequelizeTypescript.transaction(async transaction => {
77 return runnerJob.save({ transaction })
80 if (runnerJob.state === RunnerJobState.PENDING) {
81 PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
87 // ---------------------------------------------------------------------------
89 protected abstract specificUpdate (options: {
92 }): Promise<void> | void
94 async update (options: {
99 const { runnerJob, progress } = options
101 await this.specificUpdate(options)
103 if (progress) runnerJob.progress = progress
105 await retryTransactionWrapper(() => {
106 return sequelizeTypescript.transaction(async transaction => {
107 if (runnerJob.changed()) {
108 return runnerJob.save({ transaction })
111 // Don't update the job too often
112 if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
113 await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
119 // ---------------------------------------------------------------------------
121 async complete (options: {
122 runnerJob: MRunnerJob
125 const { runnerJob } = options
128 await this.specificComplete(options)
130 runnerJob.state = RunnerJobState.COMPLETED
132 logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
134 runnerJob.state = RunnerJobState.ERRORED
135 runnerJob.error = err.message
138 runnerJob.progress = null
139 runnerJob.finishedAt = new Date()
141 await retryTransactionWrapper(() => {
142 return sequelizeTypescript.transaction(async transaction => {
143 await runnerJob.save({ transaction })
147 const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)
149 if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
152 protected abstract specificComplete (options: {
153 runnerJob: MRunnerJob
155 }): Promise<void> | void
157 // ---------------------------------------------------------------------------
159 async cancel (options: {
160 runnerJob: MRunnerJob
163 const { runnerJob, fromParent } = options
165 await this.specificCancel(options)
167 const cancelState = fromParent
168 ? RunnerJobState.PARENT_CANCELLED
169 : RunnerJobState.CANCELLED
171 runnerJob.setToErrorOrCancel(cancelState)
173 await retryTransactionWrapper(() => {
174 return sequelizeTypescript.transaction(async transaction => {
175 await runnerJob.save({ transaction })
179 const children = await RunnerJobModel.listChildrenOf(runnerJob)
180 for (const child of children) {
181 logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))
183 await this.cancel({ runnerJob: child, fromParent: true })
187 protected abstract specificCancel (options: {
188 runnerJob: MRunnerJob
189 }): Promise<void> | void
191 // ---------------------------------------------------------------------------
193 protected abstract isAbortSupported (): boolean
195 async abort (options: {
196 runnerJob: MRunnerJob
198 const { runnerJob } = options
200 if (this.isAbortSupported() !== true) {
201 return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
204 await this.specificAbort(options)
206 runnerJob.resetToPending()
208 await retryTransactionWrapper(() => {
209 return sequelizeTypescript.transaction(async transaction => {
210 await runnerJob.save({ transaction })
215 protected setAbortState (runnerJob: MRunnerJob) {
216 runnerJob.resetToPending()
219 protected abstract specificAbort (options: {
220 runnerJob: MRunnerJob
221 }): Promise<void> | void
223 // ---------------------------------------------------------------------------
225 async error (options: {
226 runnerJob: MRunnerJob
230 const { runnerJob, message, fromParent } = options
232 const errorState = fromParent
233 ? RunnerJobState.PARENT_ERRORED
234 : RunnerJobState.ERRORED
236 const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
237 ? RunnerJobState.PENDING
240 await this.specificError({ ...options, nextState })
242 if (nextState === errorState) {
243 runnerJob.setToErrorOrCancel(nextState)
244 runnerJob.error = message
246 runnerJob.resetToPending()
249 await retryTransactionWrapper(() => {
250 return sequelizeTypescript.transaction(async transaction => {
251 await runnerJob.save({ transaction })
255 if (runnerJob.state === errorState) {
256 const children = await RunnerJobModel.listChildrenOf(runnerJob)
258 for (const child of children) {
259 logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))
261 await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
266 protected abstract specificError (options: {
267 runnerJob: MRunnerJob
269 nextState: RunnerJobState
270 }): Promise<void> | void