]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/runners/job-handlers/abstract-job-handler.ts
Add TMP persistent directory
[github/Chocobozzz/PeerTube.git] / server / lib / runners / job-handlers / abstract-job-handler.ts
1 import { retryTransactionWrapper } from '@server/helpers/database-utils'
2 import { logger, loggerTagsFactory } from '@server/helpers/logger'
3 import { RUNNER_JOBS } from '@server/initializers/constants'
4 import { sequelizeTypescript } from '@server/initializers/database'
5 import { PeerTubeSocket } from '@server/lib/peertube-socket'
6 import { RunnerJobModel } from '@server/models/runner/runner-job'
7 import { setAsUpdated } from '@server/models/shared'
8 import { MRunnerJob } from '@server/types/models/runners'
9 import { pick } from '@shared/core-utils'
10 import {
11 RunnerJobLiveRTMPHLSTranscodingPayload,
12 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
13 RunnerJobState,
14 RunnerJobSuccessPayload,
15 RunnerJobType,
16 RunnerJobUpdatePayload,
17 RunnerJobVODAudioMergeTranscodingPayload,
18 RunnerJobVODAudioMergeTranscodingPrivatePayload,
19 RunnerJobVODHLSTranscodingPayload,
20 RunnerJobVODHLSTranscodingPrivatePayload,
21 RunnerJobVODWebVideoTranscodingPayload,
22 RunnerJobVODWebVideoTranscodingPrivatePayload
23 } from '@shared/models'
24 import { throttle } from 'lodash'
25
26 type CreateRunnerJobArg =
27 {
28 type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
29 payload: RunnerJobVODWebVideoTranscodingPayload
30 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
31 } |
32 {
33 type: Extract<RunnerJobType, 'vod-hls-transcoding'>
34 payload: RunnerJobVODHLSTranscodingPayload
35 privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
36 } |
37 {
38 type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
39 payload: RunnerJobVODAudioMergeTranscodingPayload
40 privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
41 } |
42 {
43 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
44 payload: RunnerJobLiveRTMPHLSTranscodingPayload
45 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
46 }
47
48 export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
49
50 protected readonly lTags = loggerTagsFactory('runner')
51
52 static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)
53
54 // ---------------------------------------------------------------------------
55
56 abstract create (options: C): Promise<MRunnerJob>
57
58 protected async createRunnerJob (options: CreateRunnerJobArg & {
59 jobUUID: string
60 priority: number
61 dependsOnRunnerJob?: MRunnerJob
62 }): Promise<MRunnerJob> {
63 const { priority, dependsOnRunnerJob } = options
64
65 const runnerJob = new RunnerJobModel({
66 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
67
68 uuid: options.jobUUID,
69
70 state: dependsOnRunnerJob
71 ? RunnerJobState.WAITING_FOR_PARENT_JOB
72 : RunnerJobState.PENDING,
73
74 dependsOnRunnerJobId: dependsOnRunnerJob?.id,
75
76 priority
77 })
78
79 const job = await sequelizeTypescript.transaction(async transaction => {
80 return runnerJob.save({ transaction })
81 })
82
83 if (runnerJob.state === RunnerJobState.PENDING) {
84 PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
85 }
86
87 return job
88 }
89
90 // ---------------------------------------------------------------------------
91
92 protected abstract specificUpdate (options: {
93 runnerJob: MRunnerJob
94 updatePayload?: U
95 }): Promise<void> | void
96
97 async update (options: {
98 runnerJob: MRunnerJob
99 progress?: number
100 updatePayload?: U
101 }) {
102 const { runnerJob, progress } = options
103
104 await this.specificUpdate(options)
105
106 if (progress) runnerJob.progress = progress
107
108 if (!runnerJob.changed()) {
109 try {
110 await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
111 } catch (err) {
112 logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
113 }
114
115 return
116 }
117
118 await retryTransactionWrapper(() => {
119 return sequelizeTypescript.transaction(async transaction => {
120 return runnerJob.save({ transaction })
121 })
122 })
123 }
124
125 // ---------------------------------------------------------------------------
126
127 async complete (options: {
128 runnerJob: MRunnerJob
129 resultPayload: S
130 }) {
131 const { runnerJob } = options
132
133 try {
134 await this.specificComplete(options)
135
136 runnerJob.state = RunnerJobState.COMPLETED
137 } catch (err) {
138 logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
139
140 runnerJob.state = RunnerJobState.ERRORED
141 runnerJob.error = err.message
142 }
143
144 runnerJob.progress = null
145 runnerJob.finishedAt = new Date()
146
147 await retryTransactionWrapper(() => {
148 return sequelizeTypescript.transaction(async transaction => {
149 await runnerJob.save({ transaction })
150 })
151 })
152
153 const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)
154
155 if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
156 }
157
158 protected abstract specificComplete (options: {
159 runnerJob: MRunnerJob
160 resultPayload: S
161 }): Promise<void> | void
162
163 // ---------------------------------------------------------------------------
164
165 async cancel (options: {
166 runnerJob: MRunnerJob
167 fromParent?: boolean
168 }) {
169 const { runnerJob, fromParent } = options
170
171 await this.specificCancel(options)
172
173 const cancelState = fromParent
174 ? RunnerJobState.PARENT_CANCELLED
175 : RunnerJobState.CANCELLED
176
177 runnerJob.setToErrorOrCancel(cancelState)
178
179 await retryTransactionWrapper(() => {
180 return sequelizeTypescript.transaction(async transaction => {
181 await runnerJob.save({ transaction })
182 })
183 })
184
185 const children = await RunnerJobModel.listChildrenOf(runnerJob)
186 for (const child of children) {
187 logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))
188
189 await this.cancel({ runnerJob: child, fromParent: true })
190 }
191 }
192
193 protected abstract specificCancel (options: {
194 runnerJob: MRunnerJob
195 }): Promise<void> | void
196
197 // ---------------------------------------------------------------------------
198
199 protected abstract isAbortSupported (): boolean
200
201 async abort (options: {
202 runnerJob: MRunnerJob
203 }) {
204 const { runnerJob } = options
205
206 if (this.isAbortSupported() !== true) {
207 return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
208 }
209
210 await this.specificAbort(options)
211
212 runnerJob.resetToPending()
213
214 await retryTransactionWrapper(() => {
215 return sequelizeTypescript.transaction(async transaction => {
216 await runnerJob.save({ transaction })
217 })
218 })
219 }
220
221 protected setAbortState (runnerJob: MRunnerJob) {
222 runnerJob.resetToPending()
223 }
224
225 protected abstract specificAbort (options: {
226 runnerJob: MRunnerJob
227 }): Promise<void> | void
228
229 // ---------------------------------------------------------------------------
230
231 async error (options: {
232 runnerJob: MRunnerJob
233 message: string
234 fromParent?: boolean
235 }) {
236 const { runnerJob, message, fromParent } = options
237
238 const errorState = fromParent
239 ? RunnerJobState.PARENT_ERRORED
240 : RunnerJobState.ERRORED
241
242 const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
243 ? RunnerJobState.PENDING
244 : errorState
245
246 await this.specificError({ ...options, nextState })
247
248 if (nextState === errorState) {
249 runnerJob.setToErrorOrCancel(nextState)
250 runnerJob.error = message
251 } else {
252 runnerJob.resetToPending()
253 }
254
255 await retryTransactionWrapper(() => {
256 return sequelizeTypescript.transaction(async transaction => {
257 await runnerJob.save({ transaction })
258 })
259 })
260
261 if (runnerJob.state === errorState) {
262 const children = await RunnerJobModel.listChildrenOf(runnerJob)
263
264 for (const child of children) {
265 logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))
266
267 await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
268 }
269 }
270 }
271
272 protected abstract specificError (options: {
273 runnerJob: MRunnerJob
274 message: string
275 nextState: RunnerJobState
276 }): Promise<void> | void
277 }