diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 15:00:01 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | d102de1b38f2877463529c3b27bd35ffef4fd8bf (patch) | |
tree | 31fa0bdf26ad7a2ee46d600d804a6f03260266c8 /shared/server-commands/runners/runner-jobs-command.ts | |
parent | 2fe978744e5b74eb824e4d79c1bb9b840169f125 (diff) | |
download | PeerTube-d102de1b38f2877463529c3b27bd35ffef4fd8bf.tar.gz PeerTube-d102de1b38f2877463529c3b27bd35ffef4fd8bf.tar.zst PeerTube-d102de1b38f2877463529c3b27bd35ffef4fd8bf.zip |
Add runner server tests
Diffstat (limited to 'shared/server-commands/runners/runner-jobs-command.ts')
-rw-r--r-- | shared/server-commands/runners/runner-jobs-command.ts | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/shared/server-commands/runners/runner-jobs-command.ts b/shared/server-commands/runners/runner-jobs-command.ts new file mode 100644 index 000000000..3b0f84b9d --- /dev/null +++ b/shared/server-commands/runners/runner-jobs-command.ts | |||
@@ -0,0 +1,279 @@ | |||
1 | import { omit, pick, wait } from '@shared/core-utils' | ||
2 | import { | ||
3 | AbortRunnerJobBody, | ||
4 | AcceptRunnerJobBody, | ||
5 | AcceptRunnerJobResult, | ||
6 | ErrorRunnerJobBody, | ||
7 | HttpStatusCode, | ||
8 | isHLSTranscodingPayloadSuccess, | ||
9 | isLiveRTMPHLSTranscodingUpdatePayload, | ||
10 | isWebVideoOrAudioMergeTranscodingPayloadSuccess, | ||
11 | RequestRunnerJobBody, | ||
12 | RequestRunnerJobResult, | ||
13 | ResultList, | ||
14 | RunnerJobAdmin, | ||
15 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
16 | RunnerJobPayload, | ||
17 | RunnerJobState, | ||
18 | RunnerJobSuccessBody, | ||
19 | RunnerJobSuccessPayload, | ||
20 | RunnerJobType, | ||
21 | RunnerJobUpdateBody, | ||
22 | RunnerJobVODPayload | ||
23 | } from '@shared/models' | ||
24 | import { unwrapBody } from '../requests' | ||
25 | import { waitJobs } from '../server' | ||
26 | import { AbstractCommand, OverrideCommandOptions } from '../shared' | ||
27 | |||
28 | export class RunnerJobsCommand extends AbstractCommand { | ||
29 | |||
30 | list (options: OverrideCommandOptions & { | ||
31 | start?: number | ||
32 | count?: number | ||
33 | sort?: string | ||
34 | search?: string | ||
35 | } = {}) { | ||
36 | const path = '/api/v1/runners/jobs' | ||
37 | |||
38 | return this.getRequestBody<ResultList<RunnerJobAdmin>>({ | ||
39 | ...options, | ||
40 | |||
41 | path, | ||
42 | query: pick(options, [ 'start', 'count', 'sort', 'search' ]), | ||
43 | implicitToken: true, | ||
44 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
45 | }) | ||
46 | } | ||
47 | |||
48 | cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) { | ||
49 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel' | ||
50 | |||
51 | return this.postBodyRequest({ | ||
52 | ...options, | ||
53 | |||
54 | path, | ||
55 | implicitToken: true, | ||
56 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
57 | }) | ||
58 | } | ||
59 | |||
60 | // --------------------------------------------------------------------------- | ||
61 | |||
62 | request (options: OverrideCommandOptions & RequestRunnerJobBody) { | ||
63 | const path = '/api/v1/runners/jobs/request' | ||
64 | |||
65 | return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({ | ||
66 | ...options, | ||
67 | |||
68 | path, | ||
69 | fields: pick(options, [ 'runnerToken' ]), | ||
70 | implicitToken: false, | ||
71 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
72 | })) | ||
73 | } | ||
74 | |||
75 | async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) { | ||
76 | const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]) | ||
77 | |||
78 | const { availableJobs } = await this.request(options) | ||
79 | |||
80 | return { | ||
81 | availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) | ||
82 | } as RequestRunnerJobResult<RunnerJobVODPayload> | ||
83 | } | ||
84 | |||
85 | async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) { | ||
86 | const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ]) | ||
87 | |||
88 | const { availableJobs } = await this.request(options) | ||
89 | |||
90 | return { | ||
91 | availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) | ||
92 | } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload> | ||
93 | } | ||
94 | |||
95 | // --------------------------------------------------------------------------- | ||
96 | |||
97 | accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) { | ||
98 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept' | ||
99 | |||
100 | return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({ | ||
101 | ...options, | ||
102 | |||
103 | path, | ||
104 | fields: pick(options, [ 'runnerToken' ]), | ||
105 | implicitToken: false, | ||
106 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
107 | })) | ||
108 | } | ||
109 | |||
110 | abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) { | ||
111 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort' | ||
112 | |||
113 | return this.postBodyRequest({ | ||
114 | ...options, | ||
115 | |||
116 | path, | ||
117 | fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]), | ||
118 | implicitToken: false, | ||
119 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
120 | }) | ||
121 | } | ||
122 | |||
123 | update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) { | ||
124 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update' | ||
125 | |||
126 | const { payload } = options | ||
127 | const attaches: { [id: string]: any } = {} | ||
128 | let payloadWithoutFiles = payload | ||
129 | |||
130 | if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) { | ||
131 | if (payload.masterPlaylistFile) { | ||
132 | attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile | ||
133 | } | ||
134 | |||
135 | attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile | ||
136 | attaches[`payload[videoChunkFile]`] = payload.videoChunkFile | ||
137 | |||
138 | payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ]) | ||
139 | } | ||
140 | |||
141 | return this.postUploadRequest({ | ||
142 | ...options, | ||
143 | |||
144 | path, | ||
145 | fields: { | ||
146 | ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]), | ||
147 | |||
148 | payload: payloadWithoutFiles | ||
149 | }, | ||
150 | attaches, | ||
151 | implicitToken: false, | ||
152 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
153 | }) | ||
154 | } | ||
155 | |||
156 | error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) { | ||
157 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error' | ||
158 | |||
159 | return this.postBodyRequest({ | ||
160 | ...options, | ||
161 | |||
162 | path, | ||
163 | fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]), | ||
164 | implicitToken: false, | ||
165 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
166 | }) | ||
167 | } | ||
168 | |||
169 | success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) { | ||
170 | const { payload } = options | ||
171 | |||
172 | const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success' | ||
173 | const attaches: { [id: string]: any } = {} | ||
174 | let payloadWithoutFiles = payload | ||
175 | |||
176 | if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) { | ||
177 | attaches[`payload[videoFile]`] = payload.videoFile | ||
178 | |||
179 | payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ]) | ||
180 | } | ||
181 | |||
182 | if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) { | ||
183 | attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile | ||
184 | |||
185 | payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ]) | ||
186 | } | ||
187 | |||
188 | return this.postUploadRequest({ | ||
189 | ...options, | ||
190 | |||
191 | path, | ||
192 | attaches, | ||
193 | fields: { | ||
194 | ...pick(options, [ 'jobToken', 'runnerToken' ]), | ||
195 | |||
196 | payload: payloadWithoutFiles | ||
197 | }, | ||
198 | implicitToken: false, | ||
199 | defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 | ||
200 | }) | ||
201 | } | ||
202 | |||
203 | getInputFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) { | ||
204 | const { host, protocol, pathname } = new URL(options.url) | ||
205 | |||
206 | return this.postBodyRequest({ | ||
207 | url: `${protocol}//${host}`, | ||
208 | path: pathname, | ||
209 | |||
210 | fields: pick(options, [ 'jobToken', 'runnerToken' ]), | ||
211 | implicitToken: false, | ||
212 | defaultExpectedStatus: HttpStatusCode.OK_200 | ||
213 | }) | ||
214 | } | ||
215 | |||
216 | // --------------------------------------------------------------------------- | ||
217 | |||
218 | async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) { | ||
219 | const { availableJobs } = await this.request(options) | ||
220 | |||
221 | const job = options.type | ||
222 | ? availableJobs.find(j => j.type === options.type) | ||
223 | : availableJobs[0] | ||
224 | |||
225 | return this.accept({ ...options, jobUUID: job.uuid }) | ||
226 | } | ||
227 | |||
228 | async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) { | ||
229 | let jobUUID = jobUUIDToProcess | ||
230 | |||
231 | if (!jobUUID) { | ||
232 | const { availableJobs } = await this.request({ runnerToken }) | ||
233 | jobUUID = availableJobs[0].uuid | ||
234 | } | ||
235 | |||
236 | const { job } = await this.accept({ runnerToken, jobUUID }) | ||
237 | const jobToken = job.jobToken | ||
238 | |||
239 | const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' } | ||
240 | await this.success({ runnerToken, jobUUID, jobToken, payload }) | ||
241 | |||
242 | await waitJobs([ this.server ]) | ||
243 | |||
244 | return job | ||
245 | } | ||
246 | |||
247 | async cancelAllJobs (options: { state?: RunnerJobState } = {}) { | ||
248 | const { state } = options | ||
249 | |||
250 | const { data } = await this.list({ count: 100 }) | ||
251 | |||
252 | for (const job of data) { | ||
253 | if (state && job.state.id !== state) continue | ||
254 | |||
255 | await this.cancelByAdmin({ jobUUID: job.uuid }) | ||
256 | } | ||
257 | } | ||
258 | |||
259 | async getJob (options: OverrideCommandOptions & { uuid: string }) { | ||
260 | const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' }) | ||
261 | |||
262 | return data.find(j => j.uuid === options.uuid) | ||
263 | } | ||
264 | |||
265 | async requestLiveJob (runnerToken: string) { | ||
266 | let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = [] | ||
267 | |||
268 | while (availableJobs.length === 0) { | ||
269 | const result = await this.requestLive({ runnerToken }) | ||
270 | availableJobs = result.availableJobs | ||
271 | |||
272 | if (availableJobs.length === 1) break | ||
273 | |||
274 | await wait(150) | ||
275 | } | ||
276 | |||
277 | return availableJobs[0] | ||
278 | } | ||
279 | } | ||