]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/runners/job-handlers/abstract-job-handler.ts
73fc1457407f9719cd686d10c987bc42ed27453d
[github/Chocobozzz/PeerTube.git] / server / lib / runners / job-handlers / abstract-job-handler.ts
1 import { retryTransactionWrapper } from '@server/helpers/database-utils'
2 import { logger, loggerTagsFactory } from '@server/helpers/logger'
3 import { RUNNER_JOBS } from '@server/initializers/constants'
4 import { sequelizeTypescript } from '@server/initializers/database'
5 import { PeerTubeSocket } from '@server/lib/peertube-socket'
6 import { RunnerJobModel } from '@server/models/runner/runner-job'
7 import { setAsUpdated } from '@server/models/shared'
8 import { MRunnerJob } from '@server/types/models/runners'
9 import { pick } from '@shared/core-utils'
10 import {
11 RunnerJobLiveRTMPHLSTranscodingPayload,
12 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
13 RunnerJobState,
14 RunnerJobSuccessPayload,
15 RunnerJobType,
16 RunnerJobUpdatePayload,
17 RunnerJobVODAudioMergeTranscodingPayload,
18 RunnerJobVODAudioMergeTranscodingPrivatePayload,
19 RunnerJobVODHLSTranscodingPayload,
20 RunnerJobVODHLSTranscodingPrivatePayload,
21 RunnerJobVODWebVideoTranscodingPayload,
22 RunnerJobVODWebVideoTranscodingPrivatePayload
23 } from '@shared/models'
24
25 type CreateRunnerJobArg =
26 {
27 type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
28 payload: RunnerJobVODWebVideoTranscodingPayload
29 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
30 } |
31 {
32 type: Extract<RunnerJobType, 'vod-hls-transcoding'>
33 payload: RunnerJobVODHLSTranscodingPayload
34 privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
35 } |
36 {
37 type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
38 payload: RunnerJobVODAudioMergeTranscodingPayload
39 privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
40 } |
41 {
42 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
43 payload: RunnerJobLiveRTMPHLSTranscodingPayload
44 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
45 }
46
47 export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
48
49 protected readonly lTags = loggerTagsFactory('runner')
50
51 // ---------------------------------------------------------------------------
52
53 abstract create (options: C): Promise<MRunnerJob>
54
55 protected async createRunnerJob (options: CreateRunnerJobArg & {
56 jobUUID: string
57 priority: number
58 dependsOnRunnerJob?: MRunnerJob
59 }): Promise<MRunnerJob> {
60 const { priority, dependsOnRunnerJob } = options
61
62 const runnerJob = new RunnerJobModel({
63 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
64
65 uuid: options.jobUUID,
66
67 state: dependsOnRunnerJob
68 ? RunnerJobState.WAITING_FOR_PARENT_JOB
69 : RunnerJobState.PENDING,
70
71 dependsOnRunnerJobId: dependsOnRunnerJob?.id,
72
73 priority
74 })
75
76 const job = await sequelizeTypescript.transaction(async transaction => {
77 return runnerJob.save({ transaction })
78 })
79
80 if (runnerJob.state === RunnerJobState.PENDING) {
81 PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
82 }
83
84 return job
85 }
86
87 // ---------------------------------------------------------------------------
88
89 protected abstract specificUpdate (options: {
90 runnerJob: MRunnerJob
91 updatePayload?: U
92 }): Promise<void> | void
93
94 async update (options: {
95 runnerJob: MRunnerJob
96 progress?: number
97 updatePayload?: U
98 }) {
99 const { runnerJob, progress } = options
100
101 await this.specificUpdate(options)
102
103 if (progress) runnerJob.progress = progress
104
105 await retryTransactionWrapper(() => {
106 return sequelizeTypescript.transaction(async transaction => {
107 if (runnerJob.changed()) {
108 return runnerJob.save({ transaction })
109 }
110
111 // Don't update the job too often
112 if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
113 await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
114 }
115 })
116 })
117 }
118
119 // ---------------------------------------------------------------------------
120
121 async complete (options: {
122 runnerJob: MRunnerJob
123 resultPayload: S
124 }) {
125 const { runnerJob } = options
126
127 try {
128 await this.specificComplete(options)
129
130 runnerJob.state = RunnerJobState.COMPLETED
131 } catch (err) {
132 logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
133
134 runnerJob.state = RunnerJobState.ERRORED
135 runnerJob.error = err.message
136 }
137
138 runnerJob.progress = null
139 runnerJob.finishedAt = new Date()
140
141 await retryTransactionWrapper(() => {
142 return sequelizeTypescript.transaction(async transaction => {
143 await runnerJob.save({ transaction })
144 })
145 })
146
147 const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)
148
149 if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
150 }
151
152 protected abstract specificComplete (options: {
153 runnerJob: MRunnerJob
154 resultPayload: S
155 }): Promise<void> | void
156
157 // ---------------------------------------------------------------------------
158
159 async cancel (options: {
160 runnerJob: MRunnerJob
161 fromParent?: boolean
162 }) {
163 const { runnerJob, fromParent } = options
164
165 await this.specificCancel(options)
166
167 const cancelState = fromParent
168 ? RunnerJobState.PARENT_CANCELLED
169 : RunnerJobState.CANCELLED
170
171 runnerJob.setToErrorOrCancel(cancelState)
172
173 await retryTransactionWrapper(() => {
174 return sequelizeTypescript.transaction(async transaction => {
175 await runnerJob.save({ transaction })
176 })
177 })
178
179 const children = await RunnerJobModel.listChildrenOf(runnerJob)
180 for (const child of children) {
181 logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))
182
183 await this.cancel({ runnerJob: child, fromParent: true })
184 }
185 }
186
187 protected abstract specificCancel (options: {
188 runnerJob: MRunnerJob
189 }): Promise<void> | void
190
191 // ---------------------------------------------------------------------------
192
193 protected abstract isAbortSupported (): boolean
194
195 async abort (options: {
196 runnerJob: MRunnerJob
197 }) {
198 const { runnerJob } = options
199
200 if (this.isAbortSupported() !== true) {
201 return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
202 }
203
204 await this.specificAbort(options)
205
206 runnerJob.resetToPending()
207
208 await retryTransactionWrapper(() => {
209 return sequelizeTypescript.transaction(async transaction => {
210 await runnerJob.save({ transaction })
211 })
212 })
213 }
214
215 protected setAbortState (runnerJob: MRunnerJob) {
216 runnerJob.resetToPending()
217 }
218
219 protected abstract specificAbort (options: {
220 runnerJob: MRunnerJob
221 }): Promise<void> | void
222
223 // ---------------------------------------------------------------------------
224
225 async error (options: {
226 runnerJob: MRunnerJob
227 message: string
228 fromParent?: boolean
229 }) {
230 const { runnerJob, message, fromParent } = options
231
232 const errorState = fromParent
233 ? RunnerJobState.PARENT_ERRORED
234 : RunnerJobState.ERRORED
235
236 const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
237 ? RunnerJobState.PENDING
238 : errorState
239
240 await this.specificError({ ...options, nextState })
241
242 if (nextState === errorState) {
243 runnerJob.setToErrorOrCancel(nextState)
244 runnerJob.error = message
245 } else {
246 runnerJob.resetToPending()
247 }
248
249 await retryTransactionWrapper(() => {
250 return sequelizeTypescript.transaction(async transaction => {
251 await runnerJob.save({ transaction })
252 })
253 })
254
255 if (runnerJob.state === errorState) {
256 const children = await RunnerJobModel.listChildrenOf(runnerJob)
257
258 for (const child of children) {
259 logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))
260
261 await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
262 }
263 }
264 }
265
266 protected abstract specificError (options: {
267 runnerJob: MRunnerJob
268 message: string
269 nextState: RunnerJobState
270 }): Promise<void> | void
271 }