diff options
author | Chocobozzz <me@florianbigard.com> | 2023-08-17 08:59:21 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-08-17 08:59:21 +0200 |
commit | c380e3928517eb5311b38cf257816642617d7a33 (patch) | |
tree | 2ea9b70ebca16b5d109bcce98fe7f944dad89319 /packages/server-commands/src/runners/runner-jobs-command.ts | |
parent | a8ca6190fb462bf6eb5685cfc1d8ae444164a487 (diff) | |
parent | 3a4992633ee62d5edfbb484d9c6bcb3cf158489d (diff) | |
download | PeerTube-c380e3928517eb5311b38cf257816642617d7a33.tar.gz PeerTube-c380e3928517eb5311b38cf257816642617d7a33.tar.zst PeerTube-c380e3928517eb5311b38cf257816642617d7a33.zip |
Merge branch 'feature/esm-and-nx' into develop
Diffstat (limited to 'packages/server-commands/src/runners/runner-jobs-command.ts')
-rw-r--r-- | packages/server-commands/src/runners/runner-jobs-command.ts | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/packages/server-commands/src/runners/runner-jobs-command.ts b/packages/server-commands/src/runners/runner-jobs-command.ts new file mode 100644 index 000000000..4e702199f --- /dev/null +++ b/packages/server-commands/src/runners/runner-jobs-command.ts | |||
@@ -0,0 +1,297 @@ | |||
1 | import { omit, pick, wait } from '@peertube/peertube-core-utils' | ||
2 | import { | ||
3 | AbortRunnerJobBody, | ||
4 | AcceptRunnerJobBody, | ||
5 | AcceptRunnerJobResult, | ||
6 | ErrorRunnerJobBody, | ||
7 | HttpStatusCode, | ||
8 | isHLSTranscodingPayloadSuccess, | ||
9 | isLiveRTMPHLSTranscodingUpdatePayload, | ||
10 | isWebVideoOrAudioMergeTranscodingPayloadSuccess, | ||
11 | ListRunnerJobsQuery, | ||
12 | RequestRunnerJobBody, | ||
13 | RequestRunnerJobResult, | ||
14 | ResultList, | ||
15 | RunnerJobAdmin, | ||
16 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
17 | RunnerJobPayload, | ||
18 | RunnerJobState, | ||
19 | RunnerJobStateType, | ||
20 | RunnerJobSuccessBody, | ||
21 | RunnerJobSuccessPayload, | ||
22 | RunnerJobType, | ||
23 | RunnerJobUpdateBody, | ||
24 | RunnerJobVODPayload, | ||
25 | VODHLSTranscodingSuccess, | ||
26 | VODWebVideoTranscodingSuccess | ||
27 | } from '@peertube/peertube-models' | ||
28 | import { unwrapBody } from '../requests/index.js' | ||
29 | import { waitJobs } from '../server/jobs.js' | ||
30 | import { AbstractCommand, OverrideCommandOptions } from '../shared/index.js' | ||
31 | |||
32 | export class RunnerJobsCommand extends AbstractCommand { | ||
33 | |||
34 | list (options: OverrideCommandOptions & ListRunnerJobsQuery = {}) { | ||
35 | const path = '/api/v1/runners/jobs' | ||
36 | |||
37 | return this.getRequestBody<ResultList<RunnerJobAdmin>>({ | ||
38 | ...options, | ||
39 | |||
40 | path, | ||
41 | query: pick(options, [ 'start', 'count', 'sort', 'search', 'stateOneOf' ]), | ||
42 | implicitToken: true, | ||
43 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
44 | }) | ||
45 | } | ||
46 | |||
47 | cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) { | ||
48 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel' | ||
49 | |||
50 | return this.postBodyRequest({ | ||
51 | ...options, | ||
52 | |||
53 | path, | ||
54 | implicitToken: true, | ||
55 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
56 | }) | ||
57 | } | ||
58 | |||
59 | deleteByAdmin (options: OverrideCommandOptions & { jobUUID: string }) { | ||
60 | const path = '/api/v1/runners/jobs/' + options.jobUUID | ||
61 | |||
62 | return this.deleteRequest({ | ||
63 | ...options, | ||
64 | |||
65 | path, | ||
66 | implicitToken: true, | ||
67 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
68 | }) | ||
69 | } | ||
70 | |||
71 | // --------------------------------------------------------------------------- | ||
72 | |||
73 | request (options: OverrideCommandOptions & RequestRunnerJobBody) { | ||
74 | const path = '/api/v1/runners/jobs/request' | ||
75 | |||
76 | return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({ | ||
77 | ...options, | ||
78 | |||
79 | path, | ||
80 | fields: pick(options, [ 'runnerToken' ]), | ||
81 | implicitToken: false, | ||
82 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
83 | })) | ||
84 | } | ||
85 | |||
86 | async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) { | ||
87 | const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]) | ||
88 | |||
89 | const { availableJobs } = await this.request(options) | ||
90 | |||
91 | return { | ||
92 | availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) | ||
93 | } as RequestRunnerJobResult<RunnerJobVODPayload> | ||
94 | } | ||
95 | |||
96 | async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) { | ||
97 | const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ]) | ||
98 | |||
99 | const { availableJobs } = await this.request(options) | ||
100 | |||
101 | return { | ||
102 | availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) | ||
103 | } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload> | ||
104 | } | ||
105 | |||
106 | // --------------------------------------------------------------------------- | ||
107 | |||
108 | accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) { | ||
109 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept' | ||
110 | |||
111 | return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({ | ||
112 | ...options, | ||
113 | |||
114 | path, | ||
115 | fields: pick(options, [ 'runnerToken' ]), | ||
116 | implicitToken: false, | ||
117 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
118 | })) | ||
119 | } | ||
120 | |||
121 | abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) { | ||
122 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort' | ||
123 | |||
124 | return this.postBodyRequest({ | ||
125 | ...options, | ||
126 | |||
127 | path, | ||
128 | fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]), | ||
129 | implicitToken: false, | ||
130 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
131 | }) | ||
132 | } | ||
133 | |||
134 | update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) { | ||
135 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update' | ||
136 | |||
137 | const { payload } = options | ||
138 | const attaches: { [id: string]: any } = {} | ||
139 | let payloadWithoutFiles = payload | ||
140 | |||
141 | if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) { | ||
142 | if (payload.masterPlaylistFile) { | ||
143 | attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile | ||
144 | } | ||
145 | |||
146 | attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile | ||
147 | attaches[`payload[videoChunkFile]`] = payload.videoChunkFile | ||
148 | |||
149 | payloadWithoutFiles = omit(payloadWithoutFiles, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ]) | ||
150 | } | ||
151 | |||
152 | return this.postUploadRequest({ | ||
153 | ...options, | ||
154 | |||
155 | path, | ||
156 | fields: { | ||
157 | ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]), | ||
158 | |||
159 | payload: payloadWithoutFiles | ||
160 | }, | ||
161 | attaches, | ||
162 | implicitToken: false, | ||
163 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
164 | }) | ||
165 | } | ||
166 | |||
167 | error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) { | ||
168 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error' | ||
169 | |||
170 | return this.postBodyRequest({ | ||
171 | ...options, | ||
172 | |||
173 | path, | ||
174 | fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]), | ||
175 | implicitToken: false, | ||
176 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
177 | }) | ||
178 | } | ||
179 | |||
180 | success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) { | ||
181 | const { payload } = options | ||
182 | |||
183 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success' | ||
184 | const attaches: { [id: string]: any } = {} | ||
185 | let payloadWithoutFiles = payload | ||
186 | |||
187 | if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) { | ||
188 | attaches[`payload[videoFile]`] = payload.videoFile | ||
189 | |||
190 | payloadWithoutFiles = omit(payloadWithoutFiles as VODWebVideoTranscodingSuccess, [ 'videoFile' ]) | ||
191 | } | ||
192 | |||
193 | if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) { | ||
194 | attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile | ||
195 | |||
196 | payloadWithoutFiles = omit(payloadWithoutFiles as VODHLSTranscodingSuccess, [ 'resolutionPlaylistFile' ]) | ||
197 | } | ||
198 | |||
199 | return this.postUploadRequest({ | ||
200 | ...options, | ||
201 | |||
202 | path, | ||
203 | attaches, | ||
204 | fields: { | ||
205 | ...pick(options, [ 'jobToken', 'runnerToken' ]), | ||
206 | |||
207 | payload: payloadWithoutFiles | ||
208 | }, | ||
209 | implicitToken: false, | ||
210 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
211 | }) | ||
212 | } | ||
213 | |||
214 | getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) { | ||
215 | const { host, protocol, pathname } = new URL(options.url) | ||
216 | |||
217 | return this.postBodyRequest({ | ||
218 | url: `${protocol}//${host}`, | ||
219 | path: pathname, | ||
220 | |||
221 | fields: pick(options, [ 'jobToken', 'runnerToken' ]), | ||
222 | implicitToken: false, | ||
223 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
224 | }) | ||
225 | } | ||
226 | |||
227 | // --------------------------------------------------------------------------- | ||
228 | |||
229 | async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) { | ||
230 | const { availableJobs } = await this.request(options) | ||
231 | |||
232 | const job = options.type | ||
233 | ? availableJobs.find(j => j.type === options.type) | ||
234 | : availableJobs[0] | ||
235 | |||
236 | return this.accept({ ...options, jobUUID: job.uuid }) | ||
237 | } | ||
238 | |||
239 | async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) { | ||
240 | let jobUUID = jobUUIDToProcess | ||
241 | |||
242 | if (!jobUUID) { | ||
243 | const { availableJobs } = await this.request({ runnerToken }) | ||
244 | jobUUID = availableJobs[0].uuid | ||
245 | } | ||
246 | |||
247 | const { job } = await this.accept({ runnerToken, jobUUID }) | ||
248 | const jobToken = job.jobToken | ||
249 | |||
250 | const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' } | ||
251 | await this.success({ runnerToken, jobUUID, jobToken, payload }) | ||
252 | |||
253 | await waitJobs([ this.server ]) | ||
254 | |||
255 | return job | ||
256 | } | ||
257 | |||
258 | async cancelAllJobs (options: { state?: RunnerJobStateType } = {}) { | ||
259 | const { state } = options | ||
260 | |||
261 | const { data } = await this.list({ count: 100 }) | ||
262 | |||
263 | const allowedStates = new Set<RunnerJobStateType>([ | ||
264 | RunnerJobState.PENDING, | ||
265 | RunnerJobState.PROCESSING, | ||
266 | RunnerJobState.WAITING_FOR_PARENT_JOB | ||
267 | ]) | ||
268 | |||
269 | for (const job of data) { | ||
270 | if (state && job.state.id !== state) continue | ||
271 | else if (allowedStates.has(job.state.id) !== true) continue | ||
272 | |||
273 | await this.cancelByAdmin({ jobUUID: job.uuid }) | ||
274 | } | ||
275 | } | ||
276 | |||
277 | async getJob (options: OverrideCommandOptions & { uuid: string }) { | ||
278 | const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' }) | ||
279 | |||
280 | return data.find(j => j.uuid === options.uuid) | ||
281 | } | ||
282 | |||
283 | async requestLiveJob (runnerToken: string) { | ||
284 | let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = [] | ||
285 | |||
286 | while (availableJobs.length === 0) { | ||
287 | const result = await this.requestLive({ runnerToken }) | ||
288 | availableJobs = result.availableJobs | ||
289 | |||
290 | if (availableJobs.length === 1) break | ||
291 | |||
292 | await wait(150) | ||
293 | } | ||
294 | |||
295 | return availableJobs[0] | ||
296 | } | ||
297 | } | ||