diff options
Diffstat (limited to 'server/lib/runners/job-handlers/abstract-job-handler.ts')
-rw-r--r-- | server/lib/runners/job-handlers/abstract-job-handler.ts | 269 |
1 files changed, 0 insertions, 269 deletions
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts deleted file mode 100644 index 329977de1..000000000 --- a/server/lib/runners/job-handlers/abstract-job-handler.ts +++ /dev/null | |||
@@ -1,269 +0,0 @@ | |||
1 | import { throttle } from 'lodash' | ||
2 | import { saveInTransactionWithRetries } from '@server/helpers/database-utils' | ||
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
4 | import { RUNNER_JOBS } from '@server/initializers/constants' | ||
5 | import { sequelizeTypescript } from '@server/initializers/database' | ||
6 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | ||
7 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
8 | import { setAsUpdated } from '@server/models/shared' | ||
9 | import { MRunnerJob } from '@server/types/models/runners' | ||
10 | import { pick } from '@shared/core-utils' | ||
11 | import { | ||
12 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
14 | RunnerJobState, | ||
15 | RunnerJobStudioTranscodingPayload, | ||
16 | RunnerJobSuccessPayload, | ||
17 | RunnerJobType, | ||
18 | RunnerJobUpdatePayload, | ||
19 | RunnerJobVideoStudioTranscodingPrivatePayload, | ||
20 | RunnerJobVODAudioMergeTranscodingPayload, | ||
21 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | ||
22 | RunnerJobVODHLSTranscodingPayload, | ||
23 | RunnerJobVODHLSTranscodingPrivatePayload, | ||
24 | RunnerJobVODWebVideoTranscodingPayload, | ||
25 | RunnerJobVODWebVideoTranscodingPrivatePayload | ||
26 | } from '@shared/models' | ||
27 | |||
28 | type CreateRunnerJobArg = | ||
29 | { | ||
30 | type: Extract<RunnerJobType, 'vod-web-video-transcoding'> | ||
31 | payload: RunnerJobVODWebVideoTranscodingPayload | ||
32 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | ||
33 | } | | ||
34 | { | ||
35 | type: Extract<RunnerJobType, 'vod-hls-transcoding'> | ||
36 | payload: RunnerJobVODHLSTranscodingPayload | ||
37 | privatePayload: RunnerJobVODHLSTranscodingPrivatePayload | ||
38 | } | | ||
39 | { | ||
40 | type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'> | ||
41 | payload: RunnerJobVODAudioMergeTranscodingPayload | ||
42 | privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload | ||
43 | } | | ||
44 | { | ||
45 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | ||
46 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | ||
47 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
48 | } | | ||
49 | { | ||
50 | type: Extract<RunnerJobType, 'video-studio-transcoding'> | ||
51 | payload: RunnerJobStudioTranscodingPayload | ||
52 | privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload | ||
53 | } | ||
54 | |||
55 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | ||
56 | |||
57 | protected readonly lTags = loggerTagsFactory('runner') | ||
58 | |||
59 | static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000) | ||
60 | |||
61 | // --------------------------------------------------------------------------- | ||
62 | |||
63 | abstract create (options: C): Promise<MRunnerJob> | ||
64 | |||
65 | protected async createRunnerJob (options: CreateRunnerJobArg & { | ||
66 | jobUUID: string | ||
67 | priority: number | ||
68 | dependsOnRunnerJob?: MRunnerJob | ||
69 | }): Promise<MRunnerJob> { | ||
70 | const { priority, dependsOnRunnerJob } = options | ||
71 | |||
72 | logger.debug('Creating runner job', { options, ...this.lTags(options.type) }) | ||
73 | |||
74 | const runnerJob = new RunnerJobModel({ | ||
75 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | ||
76 | |||
77 | uuid: options.jobUUID, | ||
78 | |||
79 | state: dependsOnRunnerJob | ||
80 | ? RunnerJobState.WAITING_FOR_PARENT_JOB | ||
81 | : RunnerJobState.PENDING, | ||
82 | |||
83 | dependsOnRunnerJobId: dependsOnRunnerJob?.id, | ||
84 | |||
85 | priority | ||
86 | }) | ||
87 | |||
88 | const job = await sequelizeTypescript.transaction(async transaction => { | ||
89 | return runnerJob.save({ transaction }) | ||
90 | }) | ||
91 | |||
92 | if (runnerJob.state === RunnerJobState.PENDING) { | ||
93 | PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
94 | } | ||
95 | |||
96 | return job | ||
97 | } | ||
98 | |||
99 | // --------------------------------------------------------------------------- | ||
100 | |||
101 | protected abstract specificUpdate (options: { | ||
102 | runnerJob: MRunnerJob | ||
103 | updatePayload?: U | ||
104 | }): Promise<void> | void | ||
105 | |||
106 | async update (options: { | ||
107 | runnerJob: MRunnerJob | ||
108 | progress?: number | ||
109 | updatePayload?: U | ||
110 | }) { | ||
111 | const { runnerJob, progress } = options | ||
112 | |||
113 | await this.specificUpdate(options) | ||
114 | |||
115 | if (progress) runnerJob.progress = progress | ||
116 | |||
117 | if (!runnerJob.changed()) { | ||
118 | try { | ||
119 | await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id }) | ||
120 | } catch (err) { | ||
121 | logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
122 | } | ||
123 | |||
124 | return | ||
125 | } | ||
126 | |||
127 | await saveInTransactionWithRetries(runnerJob) | ||
128 | } | ||
129 | |||
130 | // --------------------------------------------------------------------------- | ||
131 | |||
132 | async complete (options: { | ||
133 | runnerJob: MRunnerJob | ||
134 | resultPayload: S | ||
135 | }) { | ||
136 | const { runnerJob } = options | ||
137 | |||
138 | runnerJob.state = RunnerJobState.COMPLETING | ||
139 | await saveInTransactionWithRetries(runnerJob) | ||
140 | |||
141 | try { | ||
142 | await this.specificComplete(options) | ||
143 | |||
144 | runnerJob.state = RunnerJobState.COMPLETED | ||
145 | } catch (err) { | ||
146 | logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
147 | |||
148 | runnerJob.state = RunnerJobState.ERRORED | ||
149 | runnerJob.error = err.message | ||
150 | } | ||
151 | |||
152 | runnerJob.progress = null | ||
153 | runnerJob.finishedAt = new Date() | ||
154 | |||
155 | await saveInTransactionWithRetries(runnerJob) | ||
156 | |||
157 | const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) | ||
158 | |||
159 | if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
160 | } | ||
161 | |||
162 | protected abstract specificComplete (options: { | ||
163 | runnerJob: MRunnerJob | ||
164 | resultPayload: S | ||
165 | }): Promise<void> | void | ||
166 | |||
167 | // --------------------------------------------------------------------------- | ||
168 | |||
169 | async cancel (options: { | ||
170 | runnerJob: MRunnerJob | ||
171 | fromParent?: boolean | ||
172 | }) { | ||
173 | const { runnerJob, fromParent } = options | ||
174 | |||
175 | await this.specificCancel(options) | ||
176 | |||
177 | const cancelState = fromParent | ||
178 | ? RunnerJobState.PARENT_CANCELLED | ||
179 | : RunnerJobState.CANCELLED | ||
180 | |||
181 | runnerJob.setToErrorOrCancel(cancelState) | ||
182 | |||
183 | await saveInTransactionWithRetries(runnerJob) | ||
184 | |||
185 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
186 | for (const child of children) { | ||
187 | logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) | ||
188 | |||
189 | await this.cancel({ runnerJob: child, fromParent: true }) | ||
190 | } | ||
191 | } | ||
192 | |||
193 | protected abstract specificCancel (options: { | ||
194 | runnerJob: MRunnerJob | ||
195 | }): Promise<void> | void | ||
196 | |||
197 | // --------------------------------------------------------------------------- | ||
198 | |||
199 | protected abstract isAbortSupported (): boolean | ||
200 | |||
201 | async abort (options: { | ||
202 | runnerJob: MRunnerJob | ||
203 | }) { | ||
204 | const { runnerJob } = options | ||
205 | |||
206 | if (this.isAbortSupported() !== true) { | ||
207 | return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) | ||
208 | } | ||
209 | |||
210 | await this.specificAbort(options) | ||
211 | |||
212 | runnerJob.resetToPending() | ||
213 | |||
214 | await saveInTransactionWithRetries(runnerJob) | ||
215 | } | ||
216 | |||
217 | protected setAbortState (runnerJob: MRunnerJob) { | ||
218 | runnerJob.resetToPending() | ||
219 | } | ||
220 | |||
221 | protected abstract specificAbort (options: { | ||
222 | runnerJob: MRunnerJob | ||
223 | }): Promise<void> | void | ||
224 | |||
225 | // --------------------------------------------------------------------------- | ||
226 | |||
227 | async error (options: { | ||
228 | runnerJob: MRunnerJob | ||
229 | message: string | ||
230 | fromParent?: boolean | ||
231 | }) { | ||
232 | const { runnerJob, message, fromParent } = options | ||
233 | |||
234 | const errorState = fromParent | ||
235 | ? RunnerJobState.PARENT_ERRORED | ||
236 | : RunnerJobState.ERRORED | ||
237 | |||
238 | const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES | ||
239 | ? RunnerJobState.PENDING | ||
240 | : errorState | ||
241 | |||
242 | await this.specificError({ ...options, nextState }) | ||
243 | |||
244 | if (nextState === errorState) { | ||
245 | runnerJob.setToErrorOrCancel(nextState) | ||
246 | runnerJob.error = message | ||
247 | } else { | ||
248 | runnerJob.resetToPending() | ||
249 | } | ||
250 | |||
251 | await saveInTransactionWithRetries(runnerJob) | ||
252 | |||
253 | if (runnerJob.state === errorState) { | ||
254 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
255 | |||
256 | for (const child of children) { | ||
257 | logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) | ||
258 | |||
259 | await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) | ||
260 | } | ||
261 | } | ||
262 | } | ||
263 | |||
264 | protected abstract specificError (options: { | ||
265 | runnerJob: MRunnerJob | ||
266 | message: string | ||
267 | nextState: RunnerJobState | ||
268 | }): Promise<void> | void | ||
269 | } | ||