]>
Commit | Line | Data |
---|---|---|
5e47f6ab | 1 | import { throttle } from 'lodash' |
0c9668f7 C |
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
4 | import { RUNNER_JOBS } from '@server/initializers/constants' | |
5 | import { sequelizeTypescript } from '@server/initializers/database' | |
6 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | |
7 | import { RunnerJobModel } from '@server/models/runner/runner-job' | |
8 | import { setAsUpdated } from '@server/models/shared' | |
9 | import { MRunnerJob } from '@server/types/models/runners' | |
10 | import { pick } from '@shared/core-utils' | |
11 | import { | |
12 | RunnerJobLiveRTMPHLSTranscodingPayload, | |
13 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | |
14 | RunnerJobState, | |
15 | RunnerJobSuccessPayload, | |
16 | RunnerJobType, | |
17 | RunnerJobUpdatePayload, | |
ab14f0e0 C |
18 | RunnerJobStudioTranscodingPayload, |
19 | RunnerJobVideoStudioTranscodingPrivatePayload, | |
0c9668f7 C |
20 | RunnerJobVODAudioMergeTranscodingPayload, |
21 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | |
22 | RunnerJobVODHLSTranscodingPayload, | |
23 | RunnerJobVODHLSTranscodingPrivatePayload, | |
24 | RunnerJobVODWebVideoTranscodingPayload, | |
25 | RunnerJobVODWebVideoTranscodingPrivatePayload | |
26 | } from '@shared/models' | |
27 | ||
// Shape shared by every runner job creation argument: the job `type` discriminant
// (restricted to members of RunnerJobType) with its public and private payloads
type RunnerJobArgOf <T extends string, P, PP> = {
  type: Extract<RunnerJobType, T>
  payload: P
  privatePayload: PP
}

// Arguments accepted by AbstractJobHandler.createRunnerJob(), discriminated by `type`
// so that each job type is tied to its own payload/private payload pair
type CreateRunnerJobArg =
  | RunnerJobArgOf<
    'vod-web-video-transcoding',
    RunnerJobVODWebVideoTranscodingPayload,
    RunnerJobVODWebVideoTranscodingPrivatePayload
  >
  | RunnerJobArgOf<
    'vod-hls-transcoding',
    RunnerJobVODHLSTranscodingPayload,
    RunnerJobVODHLSTranscodingPrivatePayload
  >
  | RunnerJobArgOf<
    'vod-audio-merge-transcoding',
    RunnerJobVODAudioMergeTranscodingPayload,
    RunnerJobVODAudioMergeTranscodingPrivatePayload
  >
  | RunnerJobArgOf<
    'live-rtmp-hls-transcoding',
    RunnerJobLiveRTMPHLSTranscodingPayload,
    RunnerJobLiveRTMPHLSTranscodingPrivatePayload
  >
  | RunnerJobArgOf<
    'video-studio-transcoding',
    RunnerJobStudioTranscodingPayload,
    RunnerJobVideoStudioTranscodingPrivatePayload
  >
54 | ||
/**
 * Base class of runner job handlers.
 *
 * It implements the generic part of a runner job lifecycle (create, update, complete, cancel, abort, error):
 * it mutates the job model, persists it in a transaction and propagates cancel/error to child jobs.
 * Job type specific behaviour is delegated to the `specific*` hooks implemented by subclasses.
 *
 * Type parameters:
 *  - C: options accepted by `create()`
 *  - U: update payload sent while the job is processed
 *  - S: result payload sent when the job succeeded
 */
export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {

  protected readonly lTags = loggerTagsFactory('runner')

  // Throttled (2000ms) version of setAsUpdated(), used by update() when the job model has no pending change
  // NOTE(review): the throttle is static, so it is shared by all handlers and all jobs;
  // presumably calls for different jobs made in the same window can be coalesced by lodash - confirm this is acceptable
  static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)

  // ---------------------------------------------------------------------------

  /**
   * Create the runner job(s) for this handler type
   */
  abstract create (options: C): Promise<MRunnerJob>

  /**
   * Build and save a new runner job, then ping runners if the job is immediately available.
   *
   * The job is created in WAITING_FOR_PARENT_JOB state when `dependsOnRunnerJob` is provided, in PENDING state otherwise.
   *
   * @returns the saved runner job
   */
  protected async createRunnerJob (options: CreateRunnerJobArg & {
    jobUUID: string
    priority: number
    dependsOnRunnerJob?: MRunnerJob
  }): Promise<MRunnerJob> {
    const { priority, dependsOnRunnerJob } = options

    logger.debug('Creating runner job', { options, ...this.lTags(options.type) })

    const runnerJob = new RunnerJobModel({
      ...pick(options, [ 'type', 'payload', 'privatePayload' ]),

      uuid: options.jobUUID,

      state: dependsOnRunnerJob
        ? RunnerJobState.WAITING_FOR_PARENT_JOB
        : RunnerJobState.PENDING,

      dependsOnRunnerJobId: dependsOnRunnerJob?.id,

      priority
    })

    const job = await sequelizeTypescript.transaction(async transaction => {
      return runnerJob.save({ transaction })
    })

    // Only pending jobs can be picked by a runner
    if (runnerJob.state === RunnerJobState.PENDING) {
      PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
    }

    return job
  }

  // ---------------------------------------------------------------------------

  /**
   * Job type specific logic run at the beginning of update()
   */
  protected abstract specificUpdate (options: {
    runnerJob: MRunnerJob
    updatePayload?: U
  }): Promise<void> | void

  /**
   * Handle a job update.
   *
   * If the model has pending changes after the specific update and the progress assignment, it is saved.
   * Otherwise the job is only marked as updated using the throttled helper (failures are logged, not thrown).
   */
  async update (options: {
    runnerJob: MRunnerJob
    progress?: number
    updatePayload?: U
  }) {
    const { runnerJob, progress } = options

    await this.specificUpdate(options)

    // Explicit nullish check instead of a truthiness check so a reported progress of 0 is not dropped
    if (progress !== undefined && progress !== null) runnerJob.progress = progress

    if (!runnerJob.changed()) {
      try {
        await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
      } catch (err) {
        logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
      }

      return
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        return runnerJob.save({ transaction })
      })
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Handle a job success sent by the runner.
   *
   * The job becomes COMPLETED if specificComplete() succeeds. If it throws, the error is logged and
   * the job becomes ERRORED with the error message. In both cases the job is saved, dependant jobs are updated
   * and runners are pinged if at least one dependant job was affected.
   */
  async complete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }) {
    const { runnerJob } = options

    try {
      await this.specificComplete(options)

      runnerJob.state = RunnerJobState.COMPLETED
    } catch (err) {
      logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })

      runnerJob.state = RunnerJobState.ERRORED
      // Anything can be thrown: do not crash in the catch block (leaving the job unsaved) on a nullish or non Error value
      runnerJob.error = err?.message ?? String(err)
    }

    runnerJob.progress = null
    runnerJob.finishedAt = new Date()

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)

    // Some dependant jobs changed: notify runners that jobs may be available
    if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
  }

  /**
   * Job type specific logic run by complete() before the job is marked as completed
   */
  protected abstract specificComplete (options: {
    runnerJob: MRunnerJob
    resultPayload: S
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Cancel a job, then recursively cancel its children.
   *
   * The job is set to PARENT_CANCELLED when `fromParent` is truthy, to CANCELLED otherwise.
   * Children are cancelled through this same handler instance with `fromParent: true`.
   */
  async cancel (options: {
    runnerJob: MRunnerJob
    fromParent?: boolean
  }) {
    const { runnerJob, fromParent } = options

    await this.specificCancel(options)

    const cancelState = fromParent
      ? RunnerJobState.PARENT_CANCELLED
      : RunnerJobState.CANCELLED

    runnerJob.setToErrorOrCancel(cancelState)

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    const children = await RunnerJobModel.listChildrenOf(runnerJob)
    for (const child of children) {
      logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))

      await this.cancel({ runnerJob: child, fromParent: true })
    }
  }

  /**
   * Job type specific logic run at the beginning of cancel()
   */
  protected abstract specificCancel (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Whether this job type can be aborted (and so retried after an error, see error())
   */
  protected abstract isAbortSupported (): boolean

  /**
   * Handle a job abort.
   *
   * If abort is not supported by this job type, the job is errored instead.
   * Otherwise the specific abort logic is run, the abort state is applied and the job is saved.
   */
  async abort (options: {
    runnerJob: MRunnerJob
  }) {
    const { runnerJob } = options

    if (this.isAbortSupported() !== true) {
      return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
    }

    await this.specificAbort(options)

    // Go through the dedicated hook instead of duplicating its logic
    this.setAbortState(runnerJob)

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })
  }

  /**
   * Apply the state of an aborted job on the model (not saved here): by default the job is reset to pending
   */
  protected setAbortState (runnerJob: MRunnerJob) {
    runnerJob.resetToPending()
  }

  /**
   * Job type specific logic run by abort() before the abort state is applied
   */
  protected abstract specificAbort (options: {
    runnerJob: MRunnerJob
  }): Promise<void> | void

  // ---------------------------------------------------------------------------

  /**
   * Handle a job error, then recursively error its children if the job ends in an error state.
   *
   * The error state is PARENT_ERRORED when `fromParent` is truthy, ERRORED otherwise.
   * A job that would be ERRORED is reset to pending instead when abort is supported by this job type
   * and the job has fewer failures than RUNNER_JOBS.MAX_FAILURES.
   */
  async error (options: {
    runnerJob: MRunnerJob
    message: string
    fromParent?: boolean
  }) {
    const { runnerJob, message, fromParent } = options

    const errorState = fromParent
      ? RunnerJobState.PARENT_ERRORED
      : RunnerJobState.ERRORED

    // Give the job another chance if it is its own error, the job type supports it and the failures limit is not reached
    const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
      ? RunnerJobState.PENDING
      : errorState

    await this.specificError({ ...options, nextState })

    if (nextState === errorState) {
      runnerJob.setToErrorOrCancel(nextState)
      runnerJob.error = message
    } else {
      runnerJob.resetToPending()
    }

    await retryTransactionWrapper(() => {
      return sequelizeTypescript.transaction(async transaction => {
        await runnerJob.save({ transaction })
      })
    })

    // Children are only errored when the job really ends in the error state
    if (runnerJob.state === errorState) {
      const children = await RunnerJobModel.listChildrenOf(runnerJob)

      for (const child of children) {
        logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))

        await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
      }
    }
  }

  /**
   * Job type specific logic run by error() before the model is updated.
   * `nextState` is the state the job is about to take (PENDING if it will be retried).
   */
  protected abstract specificError (options: {
    runnerJob: MRunnerJob
    message: string
    nextState: RunnerJobState
  }): Promise<void> | void
}