diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/runners/job-handlers/abstract-job-handler.ts | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/runners/job-handlers/abstract-job-handler.ts')
-rw-r--r-- | server/lib/runners/job-handlers/abstract-job-handler.ts | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts new file mode 100644 index 000000000..73fc14574 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts | |||
@@ -0,0 +1,271 @@ | |||
1 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
3 | import { RUNNER_JOBS } from '@server/initializers/constants' | ||
4 | import { sequelizeTypescript } from '@server/initializers/database' | ||
5 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | ||
6 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
7 | import { setAsUpdated } from '@server/models/shared' | ||
8 | import { MRunnerJob } from '@server/types/models/runners' | ||
9 | import { pick } from '@shared/core-utils' | ||
10 | import { | ||
11 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
12 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
13 | RunnerJobState, | ||
14 | RunnerJobSuccessPayload, | ||
15 | RunnerJobType, | ||
16 | RunnerJobUpdatePayload, | ||
17 | RunnerJobVODAudioMergeTranscodingPayload, | ||
18 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | ||
19 | RunnerJobVODHLSTranscodingPayload, | ||
20 | RunnerJobVODHLSTranscodingPrivatePayload, | ||
21 | RunnerJobVODWebVideoTranscodingPayload, | ||
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | ||
23 | } from '@shared/models' | ||
24 | |||
25 | type CreateRunnerJobArg = | ||
26 | { | ||
27 | type: Extract<RunnerJobType, 'vod-web-video-transcoding'> | ||
28 | payload: RunnerJobVODWebVideoTranscodingPayload | ||
29 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | ||
30 | } | | ||
31 | { | ||
32 | type: Extract<RunnerJobType, 'vod-hls-transcoding'> | ||
33 | payload: RunnerJobVODHLSTranscodingPayload | ||
34 | privatePayload: RunnerJobVODHLSTranscodingPrivatePayload | ||
35 | } | | ||
36 | { | ||
37 | type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'> | ||
38 | payload: RunnerJobVODAudioMergeTranscodingPayload | ||
39 | privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload | ||
40 | } | | ||
41 | { | ||
42 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | ||
43 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | ||
44 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
45 | } | ||
46 | |||
47 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | ||
48 | |||
49 | protected readonly lTags = loggerTagsFactory('runner') | ||
50 | |||
51 | // --------------------------------------------------------------------------- | ||
52 | |||
53 | abstract create (options: C): Promise<MRunnerJob> | ||
54 | |||
55 | protected async createRunnerJob (options: CreateRunnerJobArg & { | ||
56 | jobUUID: string | ||
57 | priority: number | ||
58 | dependsOnRunnerJob?: MRunnerJob | ||
59 | }): Promise<MRunnerJob> { | ||
60 | const { priority, dependsOnRunnerJob } = options | ||
61 | |||
62 | const runnerJob = new RunnerJobModel({ | ||
63 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | ||
64 | |||
65 | uuid: options.jobUUID, | ||
66 | |||
67 | state: dependsOnRunnerJob | ||
68 | ? RunnerJobState.WAITING_FOR_PARENT_JOB | ||
69 | : RunnerJobState.PENDING, | ||
70 | |||
71 | dependsOnRunnerJobId: dependsOnRunnerJob?.id, | ||
72 | |||
73 | priority | ||
74 | }) | ||
75 | |||
76 | const job = await sequelizeTypescript.transaction(async transaction => { | ||
77 | return runnerJob.save({ transaction }) | ||
78 | }) | ||
79 | |||
80 | if (runnerJob.state === RunnerJobState.PENDING) { | ||
81 | PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
82 | } | ||
83 | |||
84 | return job | ||
85 | } | ||
86 | |||
87 | // --------------------------------------------------------------------------- | ||
88 | |||
89 | protected abstract specificUpdate (options: { | ||
90 | runnerJob: MRunnerJob | ||
91 | updatePayload?: U | ||
92 | }): Promise<void> | void | ||
93 | |||
94 | async update (options: { | ||
95 | runnerJob: MRunnerJob | ||
96 | progress?: number | ||
97 | updatePayload?: U | ||
98 | }) { | ||
99 | const { runnerJob, progress } = options | ||
100 | |||
101 | await this.specificUpdate(options) | ||
102 | |||
103 | if (progress) runnerJob.progress = progress | ||
104 | |||
105 | await retryTransactionWrapper(() => { | ||
106 | return sequelizeTypescript.transaction(async transaction => { | ||
107 | if (runnerJob.changed()) { | ||
108 | return runnerJob.save({ transaction }) | ||
109 | } | ||
110 | |||
111 | // Don't update the job too often | ||
112 | if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { | ||
113 | await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) | ||
114 | } | ||
115 | }) | ||
116 | }) | ||
117 | } | ||
118 | |||
119 | // --------------------------------------------------------------------------- | ||
120 | |||
121 | async complete (options: { | ||
122 | runnerJob: MRunnerJob | ||
123 | resultPayload: S | ||
124 | }) { | ||
125 | const { runnerJob } = options | ||
126 | |||
127 | try { | ||
128 | await this.specificComplete(options) | ||
129 | |||
130 | runnerJob.state = RunnerJobState.COMPLETED | ||
131 | } catch (err) { | ||
132 | logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
133 | |||
134 | runnerJob.state = RunnerJobState.ERRORED | ||
135 | runnerJob.error = err.message | ||
136 | } | ||
137 | |||
138 | runnerJob.progress = null | ||
139 | runnerJob.finishedAt = new Date() | ||
140 | |||
141 | await retryTransactionWrapper(() => { | ||
142 | return sequelizeTypescript.transaction(async transaction => { | ||
143 | await runnerJob.save({ transaction }) | ||
144 | }) | ||
145 | }) | ||
146 | |||
147 | const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) | ||
148 | |||
149 | if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
150 | } | ||
151 | |||
152 | protected abstract specificComplete (options: { | ||
153 | runnerJob: MRunnerJob | ||
154 | resultPayload: S | ||
155 | }): Promise<void> | void | ||
156 | |||
157 | // --------------------------------------------------------------------------- | ||
158 | |||
159 | async cancel (options: { | ||
160 | runnerJob: MRunnerJob | ||
161 | fromParent?: boolean | ||
162 | }) { | ||
163 | const { runnerJob, fromParent } = options | ||
164 | |||
165 | await this.specificCancel(options) | ||
166 | |||
167 | const cancelState = fromParent | ||
168 | ? RunnerJobState.PARENT_CANCELLED | ||
169 | : RunnerJobState.CANCELLED | ||
170 | |||
171 | runnerJob.setToErrorOrCancel(cancelState) | ||
172 | |||
173 | await retryTransactionWrapper(() => { | ||
174 | return sequelizeTypescript.transaction(async transaction => { | ||
175 | await runnerJob.save({ transaction }) | ||
176 | }) | ||
177 | }) | ||
178 | |||
179 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
180 | for (const child of children) { | ||
181 | logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) | ||
182 | |||
183 | await this.cancel({ runnerJob: child, fromParent: true }) | ||
184 | } | ||
185 | } | ||
186 | |||
187 | protected abstract specificCancel (options: { | ||
188 | runnerJob: MRunnerJob | ||
189 | }): Promise<void> | void | ||
190 | |||
191 | // --------------------------------------------------------------------------- | ||
192 | |||
193 | protected abstract isAbortSupported (): boolean | ||
194 | |||
195 | async abort (options: { | ||
196 | runnerJob: MRunnerJob | ||
197 | }) { | ||
198 | const { runnerJob } = options | ||
199 | |||
200 | if (this.isAbortSupported() !== true) { | ||
201 | return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) | ||
202 | } | ||
203 | |||
204 | await this.specificAbort(options) | ||
205 | |||
206 | runnerJob.resetToPending() | ||
207 | |||
208 | await retryTransactionWrapper(() => { | ||
209 | return sequelizeTypescript.transaction(async transaction => { | ||
210 | await runnerJob.save({ transaction }) | ||
211 | }) | ||
212 | }) | ||
213 | } | ||
214 | |||
215 | protected setAbortState (runnerJob: MRunnerJob) { | ||
216 | runnerJob.resetToPending() | ||
217 | } | ||
218 | |||
219 | protected abstract specificAbort (options: { | ||
220 | runnerJob: MRunnerJob | ||
221 | }): Promise<void> | void | ||
222 | |||
223 | // --------------------------------------------------------------------------- | ||
224 | |||
225 | async error (options: { | ||
226 | runnerJob: MRunnerJob | ||
227 | message: string | ||
228 | fromParent?: boolean | ||
229 | }) { | ||
230 | const { runnerJob, message, fromParent } = options | ||
231 | |||
232 | const errorState = fromParent | ||
233 | ? RunnerJobState.PARENT_ERRORED | ||
234 | : RunnerJobState.ERRORED | ||
235 | |||
236 | const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES | ||
237 | ? RunnerJobState.PENDING | ||
238 | : errorState | ||
239 | |||
240 | await this.specificError({ ...options, nextState }) | ||
241 | |||
242 | if (nextState === errorState) { | ||
243 | runnerJob.setToErrorOrCancel(nextState) | ||
244 | runnerJob.error = message | ||
245 | } else { | ||
246 | runnerJob.resetToPending() | ||
247 | } | ||
248 | |||
249 | await retryTransactionWrapper(() => { | ||
250 | return sequelizeTypescript.transaction(async transaction => { | ||
251 | await runnerJob.save({ transaction }) | ||
252 | }) | ||
253 | }) | ||
254 | |||
255 | if (runnerJob.state === errorState) { | ||
256 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
257 | |||
258 | for (const child of children) { | ||
259 | logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) | ||
260 | |||
261 | await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) | ||
262 | } | ||
263 | } | ||
264 | } | ||
265 | |||
266 | protected abstract specificError (options: { | ||
267 | runnerJob: MRunnerJob | ||
268 | message: string | ||
269 | nextState: RunnerJobState | ||
270 | }): Promise<void> | void | ||
271 | } | ||