// Source: git.immae.eu Git - github/Chocobozzz/PeerTube.git
// File: server/lib/runners/job-handlers/abstract-job-handler.ts (blob ca97d08816f3d4c1b2bbcd7814c2a0532ca38c29)
1 import { throttle } from 'lodash'
2 import { retryTransactionWrapper, saveInTransactionWithRetries } from '@server/helpers/database-utils'
3 import { logger, loggerTagsFactory } from '@server/helpers/logger'
4 import { RUNNER_JOBS } from '@server/initializers/constants'
5 import { sequelizeTypescript } from '@server/initializers/database'
6 import { PeerTubeSocket } from '@server/lib/peertube-socket'
7 import { RunnerJobModel } from '@server/models/runner/runner-job'
8 import { setAsUpdated } from '@server/models/shared'
9 import { MRunnerJob } from '@server/types/models/runners'
10 import { pick } from '@shared/core-utils'
11 import {
12 RunnerJobLiveRTMPHLSTranscodingPayload,
13 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
14 RunnerJobState,
15 RunnerJobStudioTranscodingPayload,
16 RunnerJobSuccessPayload,
17 RunnerJobType,
18 RunnerJobUpdatePayload,
19 RunnerJobVideoStudioTranscodingPrivatePayload,
20 RunnerJobVODAudioMergeTranscodingPayload,
21 RunnerJobVODAudioMergeTranscodingPrivatePayload,
22 RunnerJobVODHLSTranscodingPayload,
23 RunnerJobVODHLSTranscodingPrivatePayload,
24 RunnerJobVODWebVideoTranscodingPayload,
25 RunnerJobVODWebVideoTranscodingPrivatePayload
26 } from '@shared/models'
27
/**
 * Arguments accepted when creating a runner job.
 *
 * Discriminated on `type`: each variant ties a runner job type to the public payload
 * and the private payload declared for that type.
 */
type CreateRunnerJobArg =
  // VOD: web video transcoding
  | {
    type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
    payload: RunnerJobVODWebVideoTranscodingPayload
    privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
  }
  // VOD: HLS transcoding
  | {
    type: Extract<RunnerJobType, 'vod-hls-transcoding'>
    payload: RunnerJobVODHLSTranscodingPayload
    privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
  }
  // VOD: audio merge transcoding
  | {
    type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
    payload: RunnerJobVODAudioMergeTranscodingPayload
    privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
  }
  // Live: RTMP to HLS transcoding
  | {
    type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
    payload: RunnerJobLiveRTMPHLSTranscodingPayload
    privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
  }
  // Video studio transcoding
  | {
    type: Extract<RunnerJobType, 'video-studio-transcoding'>
    payload: RunnerJobStudioTranscodingPayload
    privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
  }
54
/**
 * Base class shared by runner job handlers.
 *
 * It implements the generic lifecycle transitions of a runner job (create, update, complete,
 * cancel, abort, error) and delegates the type-specific work to the `specific*` hooks that
 * concrete handlers must implement.
 *
 * Type parameters:
 *  - C: options accepted by `create()`
 *  - U: update payload forwarded to `specificUpdate()`
 *  - S: success payload forwarded to `specificComplete()`
 */
export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {

  protected readonly lTags = loggerTagsFactory('runner')

  // Throttled (2000 ms) wrapper around setAsUpdated
  // NOTE(review): being static, the throttle is shared by every handler and every job id,
  // so calls for different jobs within the same window look like they are coalesced - confirm this is intended
  static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)

  // ---------------------------------------------------------------------------

  /** Create the runner job(s) described by the handler specific options */
  abstract create (options: C): Promise<MRunnerJob>

  /**
   * Build and persist a new runner job.
   *
   * The job starts in WAITING_FOR_PARENT_JOB state when `dependsOnRunnerJob` is provided, in PENDING state otherwise.
   * Runners are pinged through the socket only when the job is immediately PENDING.
   *
   * @returns the value resolved by the model `save()` call
   */
  protected async createRunnerJob (options: CreateRunnerJobArg & {
    jobUUID: string
    priority: number
    dependsOnRunnerJob?: MRunnerJob
  }): Promise<MRunnerJob> {
    const { priority, dependsOnRunnerJob } = options

    logger.debug('Creating runner job', { options, ...this.lTags(options.type) })

    const runnerJob = new RunnerJobModel({
      ...pick(options, [ 'type', 'payload', 'privatePayload' ]),

      uuid: options.jobUUID,

      state: dependsOnRunnerJob
        ? RunnerJobState.WAITING_FOR_PARENT_JOB
        : RunnerJobState.PENDING,

      dependsOnRunnerJobId: dependsOnRunnerJob?.id,

      priority
    })

    const job = await sequelizeTypescript.transaction(async transaction => {
      return runnerJob.save({ transaction })
    })

    if (runnerJob.state === RunnerJobState.PENDING) {
      PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
    }

    return job
  }

  // ---------------------------------------------------------------------------

  /** Type-specific part of `update()`, called before the generic progress handling */
  protected abstract specificUpdate (options: {
    runnerJob: MRunnerJob
    updatePayload?: U
  }): Promise<void> | void

  /**
   * Handle an update sent for a job.
   *
   * Runs `specificUpdate()`, stores the reported progress, then:
   *  - if the model has no pending change, only calls the throttled "set as updated" helper
   *    (a failure there is logged as a warning and not rethrown)
   *  - otherwise saves the model in a retried transaction
   */
  async update (options: {
    runnerJob: MRunnerJob
    progress?: number
    updatePayload?: U
  }) {
    const { runnerJob, progress } = options

    await this.specificUpdate(options)

    // Explicit number check instead of a truthiness test, so a reported progress of 0 is not dropped
    if (typeof progress === 'number' && !Number.isNaN(progress)) runnerJob.progress = progress

    if (!runnerJob.changed()) {
      try {
        await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
      } catch (err) {
        logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
      }

      return
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        return runnerJob.save({ transaction })
      })
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Complete a job.
   *
   * The job is first saved in COMPLETING state, then `specificComplete()` runs:
   *  - on success the job becomes COMPLETED
   *  - on failure the error is logged and the job becomes ERRORED with the error message stored on it
   *
   * In both cases progress is cleared, `finishedAt` is set, the job is saved and dependant jobs are updated.
   * Runners are pinged when at least one dependant job has been affected.
   */
  async complete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }) {
    const { runnerJob } = options

    runnerJob.state = RunnerJobState.COMPLETING
    await saveInTransactionWithRetries(runnerJob)

    try {
      await this.specificComplete(options)

      runnerJob.state = RunnerJobState.COMPLETED
    } catch (err) {
      logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })

      // Do not assume an Error instance has been thrown:
      // reading `.message` on null/undefined would throw here and leave the job in COMPLETING state
      const errMessage = err?.message

      runnerJob.state = RunnerJobState.ERRORED
      runnerJob.error = typeof errMessage === 'string'
        ? errMessage
        : String(err)
    }

    runnerJob.progress = null
    runnerJob.finishedAt = new Date()

    await saveInTransactionWithRetries(runnerJob)

    // NOTE(review): dependant jobs are also updated when the job ended in ERRORED state
    // Confirm against RunnerJobModel.updateDependantJobsOf() that this is expected
    const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)

    if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
  }

  /** Type-specific part of `complete()`: a thrown error marks the job as ERRORED */
  protected abstract specificComplete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Cancel a job, then recursively cancel its children.
   *
   * The state applied is PARENT_CANCELLED when `fromParent` is set, CANCELLED otherwise.
   * Children are cancelled sequentially by this same handler instance, with `fromParent: true`.
   */
  async cancel (options: {
    runnerJob: MRunnerJob
    fromParent?: boolean
  }) {
    const { runnerJob, fromParent } = options

    await this.specificCancel(options)

    const cancelState = fromParent
      ? RunnerJobState.PARENT_CANCELLED
      : RunnerJobState.CANCELLED

    runnerJob.setToErrorOrCancel(cancelState)

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    const children = await RunnerJobModel.listChildrenOf(runnerJob)
    for (const child of children) {
      logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))

      await this.cancel({ runnerJob: child, fromParent: true })
    }
  }

  /** Type-specific part of `cancel()`, called before the job state is changed */
  protected abstract specificCancel (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /** Whether this job type can be aborted and put back in the queue */
  protected abstract isAbortSupported (): boolean

  /**
   * Abort a job.
   *
   * When abort is not supported by the job type, the job is errored instead.
   * Otherwise `specificAbort()` runs, the abort state is applied through `setAbortState()` and the job is saved.
   */
  async abort (options: {
    runnerJob: MRunnerJob
  }) {
    const { runnerJob } = options

    if (this.isAbortSupported() !== true) {
      return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
    }

    await this.specificAbort(options)

    // Go through the overridable hook instead of duplicating its default implementation
    this.setAbortState(runnerJob)

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })
  }

  /** State applied to an aborted job: by default the job is reset to pending */
  protected setAbortState (runnerJob: MRunnerJob) {
    runnerJob.resetToPending()
  }

  /** Type-specific part of `abort()`, only called when abort is supported */
  protected abstract specificAbort (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Handle a job error.
   *
   * The error state is PARENT_ERRORED when `fromParent` is set, ERRORED otherwise.
   * A job with a direct error (not coming from its parent) is reset to pending instead of being errored
   * when abort is supported and its failures count is below RUNNER_JOBS.MAX_FAILURES.
   *
   * When the saved job is in the error state, its children are errored sequentially
   * by this same handler instance, with the 'Parent error' message.
   */
  async error (options: {
    runnerJob: MRunnerJob
    message: string
    fromParent?: boolean
  }) {
    const { runnerJob, message, fromParent } = options

    const errorState = fromParent
      ? RunnerJobState.PARENT_ERRORED
      : RunnerJobState.ERRORED

    const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
      ? RunnerJobState.PENDING
      : errorState

    await this.specificError({ ...options, nextState })

    if (nextState === errorState) {
      runnerJob.setToErrorOrCancel(nextState)
      runnerJob.error = message
    } else {
      runnerJob.resetToPending()
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    // Children are only errored when the job has not been put back in the queue
    if (runnerJob.state === errorState) {
      const children = await RunnerJobModel.listChildrenOf(runnerJob)

      for (const child of children) {
        logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))

        await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
      }
    }
  }

  /** Type-specific part of `error()`: `nextState` is the state the job is about to be moved to */
  protected abstract specificError (options: {
    runnerJob: MRunnerJob
    message: string
    nextState: RunnerJobState
  }): Promise<void> | void
}