]>
Commit | Line | Data |
---|---|---|
0c9668f7 C |
1 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
3 | import { RUNNER_JOBS } from '@server/initializers/constants' | |
4 | import { sequelizeTypescript } from '@server/initializers/database' | |
5 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | |
6 | import { RunnerJobModel } from '@server/models/runner/runner-job' | |
7 | import { setAsUpdated } from '@server/models/shared' | |
8 | import { MRunnerJob } from '@server/types/models/runners' | |
9 | import { pick } from '@shared/core-utils' | |
10 | import { | |
11 | RunnerJobLiveRTMPHLSTranscodingPayload, | |
12 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | |
13 | RunnerJobState, | |
14 | RunnerJobSuccessPayload, | |
15 | RunnerJobType, | |
16 | RunnerJobUpdatePayload, | |
17 | RunnerJobVODAudioMergeTranscodingPayload, | |
18 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | |
19 | RunnerJobVODHLSTranscodingPayload, | |
20 | RunnerJobVODHLSTranscodingPrivatePayload, | |
21 | RunnerJobVODWebVideoTranscodingPayload, | |
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | |
23 | } from '@shared/models' | |
3a0c2a77 | 24 | import { throttle } from 'lodash' |
0c9668f7 C |
25 | |
26 | type CreateRunnerJobArg = | |
27 | { | |
28 | type: Extract<RunnerJobType, 'vod-web-video-transcoding'> | |
29 | payload: RunnerJobVODWebVideoTranscodingPayload | |
30 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | |
31 | } | | |
32 | { | |
33 | type: Extract<RunnerJobType, 'vod-hls-transcoding'> | |
34 | payload: RunnerJobVODHLSTranscodingPayload | |
35 | privatePayload: RunnerJobVODHLSTranscodingPrivatePayload | |
36 | } | | |
37 | { | |
38 | type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'> | |
39 | payload: RunnerJobVODAudioMergeTranscodingPayload | |
40 | privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload | |
41 | } | | |
42 | { | |
43 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | |
44 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | |
45 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | |
46 | } | |
47 | ||
48 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | |
49 | ||
50 | protected readonly lTags = loggerTagsFactory('runner') | |
51 | ||
3a0c2a77 C |
52 | static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000) |
53 | ||
0c9668f7 C |
54 | // --------------------------------------------------------------------------- |
55 | ||
56 | abstract create (options: C): Promise<MRunnerJob> | |
57 | ||
58 | protected async createRunnerJob (options: CreateRunnerJobArg & { | |
59 | jobUUID: string | |
60 | priority: number | |
61 | dependsOnRunnerJob?: MRunnerJob | |
62 | }): Promise<MRunnerJob> { | |
63 | const { priority, dependsOnRunnerJob } = options | |
64 | ||
65 | const runnerJob = new RunnerJobModel({ | |
66 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | |
67 | ||
68 | uuid: options.jobUUID, | |
69 | ||
70 | state: dependsOnRunnerJob | |
71 | ? RunnerJobState.WAITING_FOR_PARENT_JOB | |
72 | : RunnerJobState.PENDING, | |
73 | ||
74 | dependsOnRunnerJobId: dependsOnRunnerJob?.id, | |
75 | ||
76 | priority | |
77 | }) | |
78 | ||
79 | const job = await sequelizeTypescript.transaction(async transaction => { | |
80 | return runnerJob.save({ transaction }) | |
81 | }) | |
82 | ||
83 | if (runnerJob.state === RunnerJobState.PENDING) { | |
84 | PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | |
85 | } | |
86 | ||
87 | return job | |
88 | } | |
89 | ||
90 | // --------------------------------------------------------------------------- | |
91 | ||
92 | protected abstract specificUpdate (options: { | |
93 | runnerJob: MRunnerJob | |
94 | updatePayload?: U | |
95 | }): Promise<void> | void | |
96 | ||
97 | async update (options: { | |
98 | runnerJob: MRunnerJob | |
99 | progress?: number | |
100 | updatePayload?: U | |
101 | }) { | |
102 | const { runnerJob, progress } = options | |
103 | ||
104 | await this.specificUpdate(options) | |
105 | ||
106 | if (progress) runnerJob.progress = progress | |
107 | ||
3a0c2a77 C |
108 | if (!runnerJob.changed()) { |
109 | try { | |
110 | await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id }) | |
111 | } catch (err) { | |
112 | logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | |
113 | } | |
114 | ||
115 | return | |
116 | } | |
117 | ||
0c9668f7 C |
118 | await retryTransactionWrapper(() => { |
119 | return sequelizeTypescript.transaction(async transaction => { | |
3a0c2a77 | 120 | return runnerJob.save({ transaction }) |
0c9668f7 C |
121 | }) |
122 | }) | |
123 | } | |
124 | ||
125 | // --------------------------------------------------------------------------- | |
126 | ||
127 | async complete (options: { | |
128 | runnerJob: MRunnerJob | |
129 | resultPayload: S | |
130 | }) { | |
131 | const { runnerJob } = options | |
132 | ||
133 | try { | |
134 | await this.specificComplete(options) | |
135 | ||
136 | runnerJob.state = RunnerJobState.COMPLETED | |
137 | } catch (err) { | |
138 | logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | |
139 | ||
140 | runnerJob.state = RunnerJobState.ERRORED | |
141 | runnerJob.error = err.message | |
142 | } | |
143 | ||
144 | runnerJob.progress = null | |
145 | runnerJob.finishedAt = new Date() | |
146 | ||
147 | await retryTransactionWrapper(() => { | |
148 | return sequelizeTypescript.transaction(async transaction => { | |
149 | await runnerJob.save({ transaction }) | |
150 | }) | |
151 | }) | |
152 | ||
153 | const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) | |
154 | ||
155 | if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | |
156 | } | |
157 | ||
158 | protected abstract specificComplete (options: { | |
159 | runnerJob: MRunnerJob | |
160 | resultPayload: S | |
161 | }): Promise<void> | void | |
162 | ||
163 | // --------------------------------------------------------------------------- | |
164 | ||
165 | async cancel (options: { | |
166 | runnerJob: MRunnerJob | |
167 | fromParent?: boolean | |
168 | }) { | |
169 | const { runnerJob, fromParent } = options | |
170 | ||
171 | await this.specificCancel(options) | |
172 | ||
173 | const cancelState = fromParent | |
174 | ? RunnerJobState.PARENT_CANCELLED | |
175 | : RunnerJobState.CANCELLED | |
176 | ||
177 | runnerJob.setToErrorOrCancel(cancelState) | |
178 | ||
179 | await retryTransactionWrapper(() => { | |
180 | return sequelizeTypescript.transaction(async transaction => { | |
181 | await runnerJob.save({ transaction }) | |
182 | }) | |
183 | }) | |
184 | ||
185 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | |
186 | for (const child of children) { | |
187 | logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) | |
188 | ||
189 | await this.cancel({ runnerJob: child, fromParent: true }) | |
190 | } | |
191 | } | |
192 | ||
193 | protected abstract specificCancel (options: { | |
194 | runnerJob: MRunnerJob | |
195 | }): Promise<void> | void | |
196 | ||
197 | // --------------------------------------------------------------------------- | |
198 | ||
199 | protected abstract isAbortSupported (): boolean | |
200 | ||
201 | async abort (options: { | |
202 | runnerJob: MRunnerJob | |
203 | }) { | |
204 | const { runnerJob } = options | |
205 | ||
206 | if (this.isAbortSupported() !== true) { | |
207 | return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) | |
208 | } | |
209 | ||
210 | await this.specificAbort(options) | |
211 | ||
212 | runnerJob.resetToPending() | |
213 | ||
214 | await retryTransactionWrapper(() => { | |
215 | return sequelizeTypescript.transaction(async transaction => { | |
216 | await runnerJob.save({ transaction }) | |
217 | }) | |
218 | }) | |
219 | } | |
220 | ||
221 | protected setAbortState (runnerJob: MRunnerJob) { | |
222 | runnerJob.resetToPending() | |
223 | } | |
224 | ||
225 | protected abstract specificAbort (options: { | |
226 | runnerJob: MRunnerJob | |
227 | }): Promise<void> | void | |
228 | ||
229 | // --------------------------------------------------------------------------- | |
230 | ||
231 | async error (options: { | |
232 | runnerJob: MRunnerJob | |
233 | message: string | |
234 | fromParent?: boolean | |
235 | }) { | |
236 | const { runnerJob, message, fromParent } = options | |
237 | ||
238 | const errorState = fromParent | |
239 | ? RunnerJobState.PARENT_ERRORED | |
240 | : RunnerJobState.ERRORED | |
241 | ||
242 | const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES | |
243 | ? RunnerJobState.PENDING | |
244 | : errorState | |
245 | ||
246 | await this.specificError({ ...options, nextState }) | |
247 | ||
248 | if (nextState === errorState) { | |
249 | runnerJob.setToErrorOrCancel(nextState) | |
250 | runnerJob.error = message | |
251 | } else { | |
252 | runnerJob.resetToPending() | |
253 | } | |
254 | ||
255 | await retryTransactionWrapper(() => { | |
256 | return sequelizeTypescript.transaction(async transaction => { | |
257 | await runnerJob.save({ transaction }) | |
258 | }) | |
259 | }) | |
260 | ||
261 | if (runnerJob.state === errorState) { | |
262 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | |
263 | ||
264 | for (const child of children) { | |
265 | logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) | |
266 | ||
267 | await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) | |
268 | } | |
269 | } | |
270 | } | |
271 | ||
272 | protected abstract specificError (options: { | |
273 | runnerJob: MRunnerJob | |
274 | message: string | |
275 | nextState: RunnerJobState | |
276 | }): Promise<void> | void | |
277 | } |