]>
Commit | Line | Data |
---|---|---|
5e47f6ab | 1 | import { throttle } from 'lodash' |
c0b5a5eb | 2 | import { saveInTransactionWithRetries } from '@server/helpers/database-utils' |
0c9668f7 C |
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
4 | import { RUNNER_JOBS } from '@server/initializers/constants' | |
5 | import { sequelizeTypescript } from '@server/initializers/database' | |
6 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | |
7 | import { RunnerJobModel } from '@server/models/runner/runner-job' | |
8 | import { setAsUpdated } from '@server/models/shared' | |
9 | import { MRunnerJob } from '@server/types/models/runners' | |
10 | import { pick } from '@shared/core-utils' | |
11 | import { | |
12 | RunnerJobLiveRTMPHLSTranscodingPayload, | |
13 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | |
14 | RunnerJobState, | |
472170b4 | 15 | RunnerJobStudioTranscodingPayload, |
0c9668f7 C |
16 | RunnerJobSuccessPayload, |
17 | RunnerJobType, | |
18 | RunnerJobUpdatePayload, | |
ab14f0e0 | 19 | RunnerJobVideoStudioTranscodingPrivatePayload, |
0c9668f7 C |
20 | RunnerJobVODAudioMergeTranscodingPayload, |
21 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | |
22 | RunnerJobVODHLSTranscodingPayload, | |
23 | RunnerJobVODHLSTranscodingPrivatePayload, | |
24 | RunnerJobVODWebVideoTranscodingPayload, | |
25 | RunnerJobVODWebVideoTranscodingPrivatePayload | |
26 | } from '@shared/models' | |
27 | ||
28 | type CreateRunnerJobArg = | |
29 | { | |
30 | type: Extract<RunnerJobType, 'vod-web-video-transcoding'> | |
31 | payload: RunnerJobVODWebVideoTranscodingPayload | |
32 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | |
33 | } | | |
34 | { | |
35 | type: Extract<RunnerJobType, 'vod-hls-transcoding'> | |
36 | payload: RunnerJobVODHLSTranscodingPayload | |
37 | privatePayload: RunnerJobVODHLSTranscodingPrivatePayload | |
38 | } | | |
39 | { | |
40 | type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'> | |
41 | payload: RunnerJobVODAudioMergeTranscodingPayload | |
42 | privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload | |
43 | } | | |
44 | { | |
45 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | |
46 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | |
47 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | |
5e47f6ab C |
48 | } | |
49 | { | |
ab14f0e0 C |
50 | type: Extract<RunnerJobType, 'video-studio-transcoding'> |
51 | payload: RunnerJobStudioTranscodingPayload | |
52 | privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload | |
0c9668f7 C |
53 | } |
54 | ||
55 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | |
56 | ||
57 | protected readonly lTags = loggerTagsFactory('runner') | |
58 | ||
3a0c2a77 C |
59 | static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000) |
60 | ||
0c9668f7 C |
61 | // --------------------------------------------------------------------------- |
62 | ||
63 | abstract create (options: C): Promise<MRunnerJob> | |
64 | ||
65 | protected async createRunnerJob (options: CreateRunnerJobArg & { | |
66 | jobUUID: string | |
67 | priority: number | |
68 | dependsOnRunnerJob?: MRunnerJob | |
69 | }): Promise<MRunnerJob> { | |
70 | const { priority, dependsOnRunnerJob } = options | |
71 | ||
5e47f6ab C |
72 | logger.debug('Creating runner job', { options, ...this.lTags(options.type) }) |
73 | ||
0c9668f7 C |
74 | const runnerJob = new RunnerJobModel({ |
75 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | |
76 | ||
77 | uuid: options.jobUUID, | |
78 | ||
79 | state: dependsOnRunnerJob | |
80 | ? RunnerJobState.WAITING_FOR_PARENT_JOB | |
81 | : RunnerJobState.PENDING, | |
82 | ||
83 | dependsOnRunnerJobId: dependsOnRunnerJob?.id, | |
84 | ||
85 | priority | |
86 | }) | |
87 | ||
88 | const job = await sequelizeTypescript.transaction(async transaction => { | |
89 | return runnerJob.save({ transaction }) | |
90 | }) | |
91 | ||
92 | if (runnerJob.state === RunnerJobState.PENDING) { | |
93 | PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | |
94 | } | |
95 | ||
96 | return job | |
97 | } | |
98 | ||
99 | // --------------------------------------------------------------------------- | |
100 | ||
101 | protected abstract specificUpdate (options: { | |
102 | runnerJob: MRunnerJob | |
103 | updatePayload?: U | |
104 | }): Promise<void> | void | |
105 | ||
106 | async update (options: { | |
107 | runnerJob: MRunnerJob | |
108 | progress?: number | |
109 | updatePayload?: U | |
110 | }) { | |
111 | const { runnerJob, progress } = options | |
112 | ||
113 | await this.specificUpdate(options) | |
114 | ||
115 | if (progress) runnerJob.progress = progress | |
116 | ||
3a0c2a77 C |
117 | if (!runnerJob.changed()) { |
118 | try { | |
119 | await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id }) | |
120 | } catch (err) { | |
121 | logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | |
122 | } | |
123 | ||
124 | return | |
125 | } | |
126 | ||
c0b5a5eb | 127 | await saveInTransactionWithRetries(runnerJob) |
0c9668f7 C |
128 | } |
129 | ||
130 | // --------------------------------------------------------------------------- | |
131 | ||
132 | async complete (options: { | |
133 | runnerJob: MRunnerJob | |
134 | resultPayload: S | |
135 | }) { | |
136 | const { runnerJob } = options | |
137 | ||
472170b4 C |
138 | runnerJob.state = RunnerJobState.COMPLETING |
139 | await saveInTransactionWithRetries(runnerJob) | |
140 | ||
0c9668f7 C |
141 | try { |
142 | await this.specificComplete(options) | |
143 | ||
144 | runnerJob.state = RunnerJobState.COMPLETED | |
145 | } catch (err) { | |
146 | logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | |
147 | ||
148 | runnerJob.state = RunnerJobState.ERRORED | |
149 | runnerJob.error = err.message | |
150 | } | |
151 | ||
152 | runnerJob.progress = null | |
153 | runnerJob.finishedAt = new Date() | |
154 | ||
472170b4 | 155 | await saveInTransactionWithRetries(runnerJob) |
0c9668f7 C |
156 | |
157 | const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) | |
158 | ||
159 | if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | |
160 | } | |
161 | ||
162 | protected abstract specificComplete (options: { | |
163 | runnerJob: MRunnerJob | |
164 | resultPayload: S | |
165 | }): Promise<void> | void | |
166 | ||
167 | // --------------------------------------------------------------------------- | |
168 | ||
169 | async cancel (options: { | |
170 | runnerJob: MRunnerJob | |
171 | fromParent?: boolean | |
172 | }) { | |
173 | const { runnerJob, fromParent } = options | |
174 | ||
175 | await this.specificCancel(options) | |
176 | ||
177 | const cancelState = fromParent | |
178 | ? RunnerJobState.PARENT_CANCELLED | |
179 | : RunnerJobState.CANCELLED | |
180 | ||
181 | runnerJob.setToErrorOrCancel(cancelState) | |
182 | ||
c0b5a5eb | 183 | await saveInTransactionWithRetries(runnerJob) |
0c9668f7 C |
184 | |
185 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | |
186 | for (const child of children) { | |
187 | logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) | |
188 | ||
189 | await this.cancel({ runnerJob: child, fromParent: true }) | |
190 | } | |
191 | } | |
192 | ||
193 | protected abstract specificCancel (options: { | |
194 | runnerJob: MRunnerJob | |
195 | }): Promise<void> | void | |
196 | ||
197 | // --------------------------------------------------------------------------- | |
198 | ||
199 | protected abstract isAbortSupported (): boolean | |
200 | ||
201 | async abort (options: { | |
202 | runnerJob: MRunnerJob | |
203 | }) { | |
204 | const { runnerJob } = options | |
205 | ||
206 | if (this.isAbortSupported() !== true) { | |
207 | return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) | |
208 | } | |
209 | ||
210 | await this.specificAbort(options) | |
211 | ||
212 | runnerJob.resetToPending() | |
213 | ||
c0b5a5eb | 214 | await saveInTransactionWithRetries(runnerJob) |
0c9668f7 C |
215 | } |
216 | ||
217 | protected setAbortState (runnerJob: MRunnerJob) { | |
218 | runnerJob.resetToPending() | |
219 | } | |
220 | ||
221 | protected abstract specificAbort (options: { | |
222 | runnerJob: MRunnerJob | |
223 | }): Promise<void> | void | |
224 | ||
225 | // --------------------------------------------------------------------------- | |
226 | ||
227 | async error (options: { | |
228 | runnerJob: MRunnerJob | |
229 | message: string | |
230 | fromParent?: boolean | |
231 | }) { | |
232 | const { runnerJob, message, fromParent } = options | |
233 | ||
234 | const errorState = fromParent | |
235 | ? RunnerJobState.PARENT_ERRORED | |
236 | : RunnerJobState.ERRORED | |
237 | ||
238 | const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES | |
239 | ? RunnerJobState.PENDING | |
240 | : errorState | |
241 | ||
242 | await this.specificError({ ...options, nextState }) | |
243 | ||
244 | if (nextState === errorState) { | |
245 | runnerJob.setToErrorOrCancel(nextState) | |
246 | runnerJob.error = message | |
247 | } else { | |
248 | runnerJob.resetToPending() | |
249 | } | |
250 | ||
c0b5a5eb | 251 | await saveInTransactionWithRetries(runnerJob) |
0c9668f7 C |
252 | |
253 | if (runnerJob.state === errorState) { | |
254 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | |
255 | ||
256 | for (const child of children) { | |
257 | logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) | |
258 | ||
259 | await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) | |
260 | } | |
261 | } | |
262 | } | |
263 | ||
264 | protected abstract specificError (options: { | |
265 | runnerJob: MRunnerJob | |
266 | message: string | |
267 | nextState: RunnerJobState | |
268 | }): Promise<void> | void | |
269 | } |