aboutsummaryrefslogtreecommitdiffhomepage
path: root/shared/server-commands/runners
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 15:00:01 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commitd102de1b38f2877463529c3b27bd35ffef4fd8bf (patch)
tree31fa0bdf26ad7a2ee46d600d804a6f03260266c8 /shared/server-commands/runners
parent2fe978744e5b74eb824e4d79c1bb9b840169f125 (diff)
downloadPeerTube-d102de1b38f2877463529c3b27bd35ffef4fd8bf.tar.gz
PeerTube-d102de1b38f2877463529c3b27bd35ffef4fd8bf.tar.zst
PeerTube-d102de1b38f2877463529c3b27bd35ffef4fd8bf.zip
Add runner server tests
Diffstat (limited to 'shared/server-commands/runners')
-rw-r--r--shared/server-commands/runners/index.ts3
-rw-r--r--shared/server-commands/runners/runner-jobs-command.ts279
-rw-r--r--shared/server-commands/runners/runner-registration-tokens-command.ts55
-rw-r--r--shared/server-commands/runners/runners-command.ts77
4 files changed, 414 insertions, 0 deletions
diff --git a/shared/server-commands/runners/index.ts b/shared/server-commands/runners/index.ts
new file mode 100644
index 000000000..9e8e1baf2
--- /dev/null
+++ b/shared/server-commands/runners/index.ts
@@ -0,0 +1,3 @@
1export * from './runner-jobs-command'
2export * from './runner-registration-tokens-command'
3export * from './runners-command'
diff --git a/shared/server-commands/runners/runner-jobs-command.ts b/shared/server-commands/runners/runner-jobs-command.ts
new file mode 100644
index 000000000..3b0f84b9d
--- /dev/null
+++ b/shared/server-commands/runners/runner-jobs-command.ts
@@ -0,0 +1,279 @@
1import { omit, pick, wait } from '@shared/core-utils'
2import {
3 AbortRunnerJobBody,
4 AcceptRunnerJobBody,
5 AcceptRunnerJobResult,
6 ErrorRunnerJobBody,
7 HttpStatusCode,
8 isHLSTranscodingPayloadSuccess,
9 isLiveRTMPHLSTranscodingUpdatePayload,
10 isWebVideoOrAudioMergeTranscodingPayloadSuccess,
11 RequestRunnerJobBody,
12 RequestRunnerJobResult,
13 ResultList,
14 RunnerJobAdmin,
15 RunnerJobLiveRTMPHLSTranscodingPayload,
16 RunnerJobPayload,
17 RunnerJobState,
18 RunnerJobSuccessBody,
19 RunnerJobSuccessPayload,
20 RunnerJobType,
21 RunnerJobUpdateBody,
22 RunnerJobVODPayload
23} from '@shared/models'
24import { unwrapBody } from '../requests'
25import { waitJobs } from '../server'
26import { AbstractCommand, OverrideCommandOptions } from '../shared'
27
28export class RunnerJobsCommand extends AbstractCommand {
29
30 list (options: OverrideCommandOptions & {
31 start?: number
32 count?: number
33 sort?: string
34 search?: string
35 } = {}) {
36 const path = '/api/v1/runners/jobs'
37
38 return this.getRequestBody<ResultList<RunnerJobAdmin>>({
39 ...options,
40
41 path,
42 query: pick(options, [ 'start', 'count', 'sort', 'search' ]),
43 implicitToken: true,
44 defaultExpectedStatus: HttpStatusCode.OK_200
45 })
46 }
47
48 cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
49 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel'
50
51 return this.postBodyRequest({
52 ...options,
53
54 path,
55 implicitToken: true,
56 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
57 })
58 }
59
60 // ---------------------------------------------------------------------------
61
62 request (options: OverrideCommandOptions & RequestRunnerJobBody) {
63 const path = '/api/v1/runners/jobs/request'
64
65 return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({
66 ...options,
67
68 path,
69 fields: pick(options, [ 'runnerToken' ]),
70 implicitToken: false,
71 defaultExpectedStatus: HttpStatusCode.OK_200
72 }))
73 }
74
75 async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
76 const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ])
77
78 const { availableJobs } = await this.request(options)
79
80 return {
81 availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
82 } as RequestRunnerJobResult<RunnerJobVODPayload>
83 }
84
85 async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
86 const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ])
87
88 const { availableJobs } = await this.request(options)
89
90 return {
91 availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
92 } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
93 }
94
95 // ---------------------------------------------------------------------------
96
97 accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
98 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'
99
100 return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
101 ...options,
102
103 path,
104 fields: pick(options, [ 'runnerToken' ]),
105 implicitToken: false,
106 defaultExpectedStatus: HttpStatusCode.OK_200
107 }))
108 }
109
110 abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) {
111 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort'
112
113 return this.postBodyRequest({
114 ...options,
115
116 path,
117 fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]),
118 implicitToken: false,
119 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
120 })
121 }
122
123 update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
124 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'
125
126 const { payload } = options
127 const attaches: { [id: string]: any } = {}
128 let payloadWithoutFiles = payload
129
130 if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
131 if (payload.masterPlaylistFile) {
132 attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
133 }
134
135 attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
136 attaches[`payload[videoChunkFile]`] = payload.videoChunkFile
137
138 payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
139 }
140
141 return this.postUploadRequest({
142 ...options,
143
144 path,
145 fields: {
146 ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),
147
148 payload: payloadWithoutFiles
149 },
150 attaches,
151 implicitToken: false,
152 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
153 })
154 }
155
156 error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
157 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'
158
159 return this.postBodyRequest({
160 ...options,
161
162 path,
163 fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]),
164 implicitToken: false,
165 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
166 })
167 }
168
169 success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
170 const { payload } = options
171
172 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
173 const attaches: { [id: string]: any } = {}
174 let payloadWithoutFiles = payload
175
176 if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
177 attaches[`payload[videoFile]`] = payload.videoFile
178
179 payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ])
180 }
181
182 if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
183 attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
184
185 payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ])
186 }
187
188 return this.postUploadRequest({
189 ...options,
190
191 path,
192 attaches,
193 fields: {
194 ...pick(options, [ 'jobToken', 'runnerToken' ]),
195
196 payload: payloadWithoutFiles
197 },
198 implicitToken: false,
199 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
200 })
201 }
202
203 getInputFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
204 const { host, protocol, pathname } = new URL(options.url)
205
206 return this.postBodyRequest({
207 url: `${protocol}//${host}`,
208 path: pathname,
209
210 fields: pick(options, [ 'jobToken', 'runnerToken' ]),
211 implicitToken: false,
212 defaultExpectedStatus: HttpStatusCode.OK_200
213 })
214 }
215
216 // ---------------------------------------------------------------------------
217
218 async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) {
219 const { availableJobs } = await this.request(options)
220
221 const job = options.type
222 ? availableJobs.find(j => j.type === options.type)
223 : availableJobs[0]
224
225 return this.accept({ ...options, jobUUID: job.uuid })
226 }
227
228 async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) {
229 let jobUUID = jobUUIDToProcess
230
231 if (!jobUUID) {
232 const { availableJobs } = await this.request({ runnerToken })
233 jobUUID = availableJobs[0].uuid
234 }
235
236 const { job } = await this.accept({ runnerToken, jobUUID })
237 const jobToken = job.jobToken
238
239 const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
240 await this.success({ runnerToken, jobUUID, jobToken, payload })
241
242 await waitJobs([ this.server ])
243
244 return job
245 }
246
247 async cancelAllJobs (options: { state?: RunnerJobState } = {}) {
248 const { state } = options
249
250 const { data } = await this.list({ count: 100 })
251
252 for (const job of data) {
253 if (state && job.state.id !== state) continue
254
255 await this.cancelByAdmin({ jobUUID: job.uuid })
256 }
257 }
258
259 async getJob (options: OverrideCommandOptions & { uuid: string }) {
260 const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' })
261
262 return data.find(j => j.uuid === options.uuid)
263 }
264
265 async requestLiveJob (runnerToken: string) {
266 let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = []
267
268 while (availableJobs.length === 0) {
269 const result = await this.requestLive({ runnerToken })
270 availableJobs = result.availableJobs
271
272 if (availableJobs.length === 1) break
273
274 await wait(150)
275 }
276
277 return availableJobs[0]
278 }
279}
diff --git a/shared/server-commands/runners/runner-registration-tokens-command.ts b/shared/server-commands/runners/runner-registration-tokens-command.ts
new file mode 100644
index 000000000..e4f2e3d95
--- /dev/null
+++ b/shared/server-commands/runners/runner-registration-tokens-command.ts
@@ -0,0 +1,55 @@
1import { pick } from '@shared/core-utils'
2import { HttpStatusCode, ResultList, RunnerRegistrationToken } from '@shared/models'
3import { AbstractCommand, OverrideCommandOptions } from '../shared'
4
5export class RunnerRegistrationTokensCommand extends AbstractCommand {
6
7 list (options: OverrideCommandOptions & {
8 start?: number
9 count?: number
10 sort?: string
11 } = {}) {
12 const path = '/api/v1/runners/registration-tokens'
13
14 return this.getRequestBody<ResultList<RunnerRegistrationToken>>({
15 ...options,
16
17 path,
18 query: pick(options, [ 'start', 'count', 'sort' ]),
19 implicitToken: true,
20 defaultExpectedStatus: HttpStatusCode.OK_200
21 })
22 }
23
24 generate (options: OverrideCommandOptions = {}) {
25 const path = '/api/v1/runners/registration-tokens/generate'
26
27 return this.postBodyRequest({
28 ...options,
29
30 path,
31 implicitToken: true,
32 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
33 })
34 }
35
36 delete (options: OverrideCommandOptions & {
37 id: number
38 }) {
39 const path = '/api/v1/runners/registration-tokens/' + options.id
40
41 return this.deleteRequest({
42 ...options,
43
44 path,
45 implicitToken: true,
46 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
47 })
48 }
49
50 async getFirstRegistrationToken (options: OverrideCommandOptions = {}) {
51 const { data } = await this.list(options)
52
53 return data[0].registrationToken
54 }
55}
diff --git a/shared/server-commands/runners/runners-command.ts b/shared/server-commands/runners/runners-command.ts
new file mode 100644
index 000000000..ca9a1d7a3
--- /dev/null
+++ b/shared/server-commands/runners/runners-command.ts
@@ -0,0 +1,77 @@
1import { pick } from '@shared/core-utils'
2import { HttpStatusCode, RegisterRunnerBody, RegisterRunnerResult, ResultList, Runner, UnregisterRunnerBody } from '@shared/models'
3import { unwrapBody } from '../requests'
4import { AbstractCommand, OverrideCommandOptions } from '../shared'
5
6export class RunnersCommand extends AbstractCommand {
7
8 list (options: OverrideCommandOptions & {
9 start?: number
10 count?: number
11 sort?: string
12 } = {}) {
13 const path = '/api/v1/runners'
14
15 return this.getRequestBody<ResultList<Runner>>({
16 ...options,
17
18 path,
19 query: pick(options, [ 'start', 'count', 'sort' ]),
20 implicitToken: true,
21 defaultExpectedStatus: HttpStatusCode.OK_200
22 })
23 }
24
25 register (options: OverrideCommandOptions & RegisterRunnerBody) {
26 const path = '/api/v1/runners/register'
27
28 return unwrapBody<RegisterRunnerResult>(this.postBodyRequest({
29 ...options,
30
31 path,
32 fields: pick(options, [ 'name', 'registrationToken', 'description' ]),
33 implicitToken: true,
34 defaultExpectedStatus: HttpStatusCode.OK_200
35 }))
36 }
37
38 unregister (options: OverrideCommandOptions & UnregisterRunnerBody) {
39 const path = '/api/v1/runners/unregister'
40
41 return this.postBodyRequest({
42 ...options,
43
44 path,
45 fields: pick(options, [ 'runnerToken' ]),
46 implicitToken: false,
47 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
48 })
49 }
50
51 delete (options: OverrideCommandOptions & {
52 id: number
53 }) {
54 const path = '/api/v1/runners/' + options.id
55
56 return this.deleteRequest({
57 ...options,
58
59 path,
60 implicitToken: true,
61 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
62 })
63 }
64
65 // ---------------------------------------------------------------------------
66
67 async autoRegisterRunner () {
68 const { data } = await this.server.runnerRegistrationTokens.list({ sort: 'createdAt' })
69
70 const { runnerToken } = await this.register({
71 name: 'runner',
72 registrationToken: data[0].registrationToken
73 })
74
75 return runnerToken
76 }
77}