import { throttle } from 'lodash'
import { retryTransactionWrapper } from '@server/helpers/database-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { RUNNER_JOBS } from '@server/initializers/constants'
import { sequelizeTypescript } from '@server/initializers/database'
import { PeerTubeSocket } from '@server/lib/peertube-socket'
import { RunnerJobModel } from '@server/models/runner/runner-job'
import { setAsUpdated } from '@server/models/shared'
import { MRunnerJob } from '@server/types/models/runners'
import { pick } from '@shared/core-utils'
import {
  RunnerJobLiveRTMPHLSTranscodingPayload,
  RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
  RunnerJobState,
  RunnerJobSuccessPayload,
  RunnerJobType,
  RunnerJobUpdatePayload,
  RunnerJobStudioTranscodingPayload,
  RunnerJobVideoStudioTranscodingPrivatePayload,
  RunnerJobVODAudioMergeTranscodingPayload,
  RunnerJobVODAudioMergeTranscodingPrivatePayload,
  RunnerJobVODHLSTranscodingPayload,
  RunnerJobVODHLSTranscodingPrivatePayload,
  RunnerJobVODWebVideoTranscodingPayload,
  RunnerJobVODWebVideoTranscodingPrivatePayload
} from '@shared/models'
/**
 * Type-specific part of the options accepted by
 * `AbstractJobHandler.createRunnerJob()`.
 *
 * Discriminated union on `type`: each supported runner job type is paired with
 * its own `payload` and `privatePayload` shapes, so a payload cannot be
 * combined with a job type it does not belong to.
 */
type CreateRunnerJobArg =
  {
    type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
    payload: RunnerJobVODWebVideoTranscodingPayload
    privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'vod-hls-transcoding'>
    payload: RunnerJobVODHLSTranscodingPayload
    privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
    payload: RunnerJobVODAudioMergeTranscodingPayload
    privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
    payload: RunnerJobLiveRTMPHLSTranscodingPayload
    privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
  } |
  {
    type: Extract<RunnerJobType, 'video-studio-transcoding'>
    payload: RunnerJobStudioTranscodingPayload
    privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
  }

/**
 * Base class of the runner job handlers.
 *
 * It implements the persistence side of a runner job life cycle (create,
 * update, complete, cancel, abort, error) and delegates the job-type specific
 * work to the `specific*` hooks that subclasses must implement.
 *
 * @typeParam C - options accepted by `create()`
 * @typeParam U - update payload type forwarded to `specificUpdate()`
 * @typeParam S - success payload type forwarded to `specificComplete()`
 */
export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {

  /** Logger tags builder created with the 'runner' tag. */
  protected readonly lTags = loggerTagsFactory('runner')

  /**
   * `setAsUpdated` wrapped in a lodash throttle with a 2000 ms window.
   *
   * The throttle is static, so the window is shared by every handler instance
   * and every job: a call made inside the window does not run `setAsUpdated`
   * again for its own arguments.
   */
  static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)

  // ---------------------------------------------------------------------------

  /** Create the runner job(s) described by `options` (job-type specific). */
  abstract create (options: C): Promise<MRunnerJob>

  /**
   * Build a `RunnerJobModel` from `options`, persist it in a transaction and
   * return the saved job.
   *
   * The job starts in `WAITING_FOR_PARENT_JOB` when `dependsOnRunnerJob` is
   * provided, in `PENDING` otherwise. Runners are only pinged about available
   * jobs in the `PENDING` case.
   */
  protected async createRunnerJob (options: CreateRunnerJobArg & {
    jobUUID: string
    priority: number
    dependsOnRunnerJob?: MRunnerJob
  }): Promise<MRunnerJob> {
    const { priority, dependsOnRunnerJob } = options

    logger.debug('Creating runner job', { options, ...this.lTags(options.type) })

    const runnerJob = new RunnerJobModel({
      ...pick(options, [ 'type', 'payload', 'privatePayload' ]),

      uuid: options.jobUUID,

      state: dependsOnRunnerJob
        ? RunnerJobState.WAITING_FOR_PARENT_JOB
        : RunnerJobState.PENDING,

      dependsOnRunnerJobId: dependsOnRunnerJob?.id,

      priority
    })

    const job = await sequelizeTypescript.transaction(async transaction => {
      return runnerJob.save({ transaction })
    })

    if (runnerJob.state === RunnerJobState.PENDING) {
      PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
    }

    return job
  }

  // ---------------------------------------------------------------------------

  /**
   * Job-type specific part of `update()`, called before the job is saved.
   *
   * NOTE(review): the `updatePayload` option was restored from a truncated
   * declaration (it is the only use of the `U` type parameter) - confirm the
   * name against the subclasses.
   */
  protected abstract specificUpdate (options: {
    runnerJob: MRunnerJob
    updatePayload?: U
  }): Promise<void> | void

  /**
   * Apply an update sent for `runnerJob`.
   *
   * Runs `specificUpdate()`, then copies `progress` on the job when it is
   * truthy (so `undefined` and `0` both leave the stored progress untouched).
   *
   * If the model has no pending change, the job row is only marked as updated
   * through the throttled helper; a failure there is logged as a warning and
   * not rethrown. Otherwise the job is saved in a retried transaction.
   */
  async update (options: {
    runnerJob: MRunnerJob
    progress?: number
    updatePayload?: U
  }) {
    const { runnerJob, progress } = options

    await this.specificUpdate(options)

    if (progress) runnerJob.progress = progress

    if (!runnerJob.changed()) {
      try {
        await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
      } catch (err) {
        logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
      }

      return
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        return runnerJob.save({ transaction })
      })
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Finish `runnerJob`.
   *
   * The job becomes `COMPLETED` when `specificComplete()` succeeds. If it
   * throws, the error is logged and the job becomes `ERRORED` with the error
   * message stored on it; the exception is not propagated.
   *
   * In both cases progress is cleared, `finishedAt` is set, the job is saved in
   * a retried transaction and the dependant jobs are updated. Runners are
   * pinged when at least one dependant job was affected.
   *
   * NOTE(review): the `resultPayload` option was restored from a truncated
   * declaration (it is the only use of the `S` type parameter) - confirm the
   * name against the subclasses.
   */
  async complete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }) {
    const { runnerJob } = options

    try {
      await this.specificComplete(options)

      runnerJob.state = RunnerJobState.COMPLETED
    } catch (err) {
      logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })

      runnerJob.state = RunnerJobState.ERRORED
      // Anything can be thrown: never dereference the caught value blindly, or
      // a thrown null/undefined would prevent the job from being saved at all
      runnerJob.error = getRunnerJobErrorMessage(err)
    }

    runnerJob.progress = null
    runnerJob.finishedAt = new Date()

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)

    if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
  }

  /** Job-type specific part of `complete()`; a throw marks the job as errored. */
  protected abstract specificComplete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Cancel `runnerJob`, then recursively cancel its children.
   *
   * The job is set to `PARENT_CANCELLED` when `fromParent` is truthy and to
   * `CANCELLED` otherwise. Children are cancelled one after the other through
   * this same handler, with `fromParent: true`.
   */
  async cancel (options: {
    runnerJob: MRunnerJob
    fromParent?: boolean
  }) {
    const { runnerJob, fromParent } = options

    await this.specificCancel(options)

    const cancelState = fromParent
      ? RunnerJobState.PARENT_CANCELLED
      : RunnerJobState.CANCELLED

    runnerJob.setToErrorOrCancel(cancelState)

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    const children = await RunnerJobModel.listChildrenOf(runnerJob)
    for (const child of children) {
      logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))

      await this.cancel({ runnerJob: child, fromParent: true })
    }
  }

  /** Job-type specific part of `cancel()`, called before the state change. */
  protected abstract specificCancel (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /** Whether this job type can be aborted and put back in the pending queue. */
  protected abstract isAbortSupported (): boolean

  /**
   * Abort `runnerJob`.
   *
   * When the job type does not support abort, the job is errored instead (see
   * `error()`). Otherwise `specificAbort()` runs, the job is reset to pending
   * and saved in a retried transaction.
   */
  async abort (options: {
    runnerJob: MRunnerJob
  }) {
    const { runnerJob } = options

    if (this.isAbortSupported() !== true) {
      return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
    }

    await this.specificAbort(options)

    runnerJob.resetToPending()

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })
  }

  /** Reset `runnerJob` to pending, without saving it. */
  protected setAbortState (runnerJob: MRunnerJob) {
    runnerJob.resetToPending()
  }

  /** Job-type specific part of `abort()`, called before the job is reset. */
  protected abstract specificAbort (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Report an error on `runnerJob`.
   *
   * The error state is `PARENT_ERRORED` when `fromParent` is truthy and
   * `ERRORED` otherwise. A job erroring on its own is retried (reset to
   * pending) instead of failing when the job type supports abort and the job
   * has fewer failures than `RUNNER_JOBS.MAX_FAILURES`.
   *
   * `specificError()` receives the chosen `nextState` before the job is
   * changed. When the saved job ends up in the error state, its children are
   * recursively errored with the 'Parent error' message.
   */
  async error (options: {
    runnerJob: MRunnerJob
    message: string
    fromParent?: boolean
  }) {
    const { runnerJob, message, fromParent } = options

    const errorState = fromParent
      ? RunnerJobState.PARENT_ERRORED
      : RunnerJobState.ERRORED

    const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
      ? RunnerJobState.PENDING
      : errorState

    await this.specificError({ ...options, nextState })

    if (nextState === errorState) {
      runnerJob.setToErrorOrCancel(nextState)
      runnerJob.error = message
    } else {
      runnerJob.resetToPending()
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    if (runnerJob.state === errorState) {
      const children = await RunnerJobModel.listChildrenOf(runnerJob)

      for (const child of children) {
        logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))

        await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
      }
    }
  }

  /**
   * Job-type specific part of `error()`.
   *
   * `nextState` is the state the job is about to take: `PENDING` when it will
   * be retried, the error state otherwise.
   */
  protected abstract specificError (options: {
    runnerJob: MRunnerJob
    message: string
    nextState: RunnerJobState
  }): Promise<void> | void
}

/**
 * Extract a storable message from a value caught in a `catch` clause.
 *
 * Returns `message` for `Error` instances and for any object exposing a string
 * `message`; every other value is converted with `String()`.
 */
function getRunnerJobErrorMessage (err: unknown): string {
  if (err instanceof Error) return err.message

  if (typeof err === 'object' && err !== null) {
    const { message } = err as { message?: unknown }

    if (typeof message === 'string') return message
  }

  return String(err)
}