]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - shared/server-commands/runners/runner-jobs-command.ts
Support studio transcoding in peertube runner
[github/Chocobozzz/PeerTube.git] / shared / server-commands / runners / runner-jobs-command.ts
CommitLineData
d102de1b
C
1import { omit, pick, wait } from '@shared/core-utils'
2import {
3 AbortRunnerJobBody,
4 AcceptRunnerJobBody,
5 AcceptRunnerJobResult,
6 ErrorRunnerJobBody,
7 HttpStatusCode,
8 isHLSTranscodingPayloadSuccess,
9 isLiveRTMPHLSTranscodingUpdatePayload,
10 isWebVideoOrAudioMergeTranscodingPayloadSuccess,
11 RequestRunnerJobBody,
12 RequestRunnerJobResult,
13 ResultList,
14 RunnerJobAdmin,
15 RunnerJobLiveRTMPHLSTranscodingPayload,
16 RunnerJobPayload,
17 RunnerJobState,
18 RunnerJobSuccessBody,
19 RunnerJobSuccessPayload,
20 RunnerJobType,
21 RunnerJobUpdateBody,
22 RunnerJobVODPayload
23} from '@shared/models'
24import { unwrapBody } from '../requests'
25import { waitJobs } from '../server'
26import { AbstractCommand, OverrideCommandOptions } from '../shared'
27
/**
 * Test helper wrapping the `/api/v1/runners/jobs` REST endpoints.
 *
 * Methods that act on behalf of a runner send `runnerToken` (and `jobToken` when relevant)
 * as body fields and disable the implicit token. Admin methods (`list`, `cancelByAdmin`)
 * rely on the implicit token instead.
 */
export class RunnerJobsCommand extends AbstractCommand {

  /**
   * List runner jobs (GET `/api/v1/runners/jobs`).
   * `start`, `count`, `sort` and `search` are forwarded as query parameters.
   */
  list (options: OverrideCommandOptions & {
    start?: number
    count?: number
    sort?: string
    search?: string
  } = {}) {
    const path = '/api/v1/runners/jobs'

    return this.getRequestBody<ResultList<RunnerJobAdmin>>({
      ...options,

      path,
      query: pick(options, [ 'start', 'count', 'sort', 'search' ]),
      implicitToken: true,
      defaultExpectedStatus: HttpStatusCode.OK_200
    })
  }

  /**
   * Cancel the job identified by `jobUUID` (POST `.../jobs/:jobUUID/cancel`).
   * Expects a 204 response by default.
   */
  cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel'

    return this.postBodyRequest({
      ...options,

      path,
      implicitToken: true,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Ask the server which jobs are available for the runner owning `runnerToken`
   * (POST `.../jobs/request`) and return the unwrapped response body.
   */
  request (options: OverrideCommandOptions & RequestRunnerJobBody) {
    const path = '/api/v1/runners/jobs/request'

    return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200
    }))
  }

  /**
   * Same as `request()`, but only keeps the available jobs whose type is one of the VOD transcoding types.
   */
  async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
    const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ])

    const { availableJobs } = await this.request(options)

    return {
      availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
    } as RequestRunnerJobResult<RunnerJobVODPayload>
  }

  /**
   * Same as `request()`, but only keeps the available jobs of type `live-rtmp-hls-transcoding`.
   */
  async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
    const liveTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ])

    const { availableJobs } = await this.request(options)

    return {
      availableJobs: availableJobs.filter(j => liveTypes.has(j.type))
    } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
  }

  // ---------------------------------------------------------------------------

  /**
   * Accept the job identified by `jobUUID` (POST `.../jobs/:jobUUID/accept`)
   * and return the unwrapped response body.
   */
  accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'

    return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200
    }))
  }

  /**
   * Abort the job identified by `jobUUID` (POST `.../jobs/:jobUUID/abort`),
   * sending `reason`, `jobToken` and `runnerToken`. Expects a 204 response by default.
   */
  abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort'

    return this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  /**
   * Send a job update (POST `.../jobs/:jobUUID/update`) as an upload request.
   *
   * For a live RTMP HLS update payload, the file properties are moved out of the payload
   * fields and sent as attachments under `payload[...]` keys.
   */
  update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'

    const { payload } = options
    const attaches: { [id: string]: any } = {}
    let payloadWithoutFiles = payload

    if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
      // The master playlist is only attached when the payload provides one
      if (payload.masterPlaylistFile) {
        attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
      }

      attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
      attaches[`payload[videoChunkFile]`] = payload.videoChunkFile

      // Files travel as attachments, so strip them from the regular fields
      payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
    }

    return this.postUploadRequest({
      ...options,

      path,
      fields: {
        ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),

        payload: payloadWithoutFiles
      },
      attaches,
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  /**
   * Report a job failure (POST `.../jobs/:jobUUID/error`),
   * sending `message`, `jobToken` and `runnerToken`. Expects a 204 response by default.
   */
  error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'

    return this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  /**
   * Report a job success (POST `.../jobs/:jobUUID/success`) as an upload request.
   *
   * `videoFile` (web video / audio merge / HLS payloads) and `resolutionPlaylistFile` (HLS payloads)
   * are sent as attachments under `payload[...]` keys and removed from the payload fields.
   */
  success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
    const { payload } = options

    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
    const attaches: { [id: string]: any } = {}
    let payloadWithoutFiles = payload

    if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
      attaches[`payload[videoFile]`] = payload.videoFile

      payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ])
    }

    if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
      attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile

      payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ])
    }

    return this.postUploadRequest({
      ...options,

      path,
      attaches,
      fields: {
        ...pick(options, [ 'jobToken', 'runnerToken' ]),

        payload: payloadWithoutFiles
      },
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200 === undefined ? undefined : HttpStatusCode.NO_CONTENT_204
    })
  }

  /**
   * Fetch a job input file by POSTing `jobToken` and `runnerToken` to `options.url`.
   *
   * The URL is split into its origin (`protocol//host`) and `pathname`; any query string is not forwarded.
   * Caller overrides from `options` are forwarded like in the other methods of this class;
   * the explicit `url` and `path` below take precedence over the spread values.
   */
  getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
    const { host, protocol, pathname } = new URL(options.url)

    return this.postBodyRequest({
      ...options,

      url: `${protocol}//${host}`,
      path: pathname,

      fields: pick(options, [ 'jobToken', 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Request the available jobs, then accept the first one matching `options.type`
   * (or simply the first one when no type is given).
   *
   * @throws Error when no available job matches
   */
  async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) {
    const { availableJobs } = await this.request(options)

    const job = options.type
      ? availableJobs.find(j => j.type === options.type)
      : availableJobs[0]

    // Fail with a clear message instead of a TypeError on `job.uuid`
    if (!job) {
      throw new Error(
        options.type
          ? `No available runner job of type ${options.type} to accept`
          : 'No available runner job to accept'
      )
    }

    return this.accept({ ...options, jobUUID: job.uuid })
  }

  /**
   * Accept a job and immediately mark it as successful with the `video_short.mp4` fixture,
   * then wait for the server jobs to finish.
   *
   * When `jobUUIDToProcess` is not provided, the first available job is used.
   *
   * @returns the accepted job
   * @throws Error when no job UUID is provided and no job is available
   */
  async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) {
    let jobUUID = jobUUIDToProcess

    if (!jobUUID) {
      const { availableJobs } = await this.request({ runnerToken })

      // Fail with a clear message instead of a TypeError on `availableJobs[0].uuid`
      if (availableJobs.length === 0) throw new Error('No available runner job to process')

      jobUUID = availableJobs[0].uuid
    }

    const { job } = await this.accept({ runnerToken, jobUUID })
    const jobToken = job.jobToken

    const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
    await this.success({ runnerToken, jobUUID, jobToken, payload })

    await waitJobs([ this.server ])

    return job
  }

  /**
   * Cancel listed jobs that are pending, processing or waiting for their parent job.
   * When `options.state` is provided, only jobs in that exact state are considered.
   *
   * Only the jobs returned by a single `list({ count: 100 })` call are examined.
   */
  async cancelAllJobs (options: { state?: RunnerJobState } = {}) {
    const { state } = options

    const { data } = await this.list({ count: 100 })

    const allowedStates = new Set<RunnerJobState>([
      RunnerJobState.PENDING,
      RunnerJobState.PROCESSING,
      RunnerJobState.WAITING_FOR_PARENT_JOB
    ])

    for (const job of data) {
      if (state && job.state.id !== state) continue
      if (!allowedStates.has(job.state.id)) continue

      await this.cancelByAdmin({ jobUUID: job.uuid })
    }
  }

  /**
   * Find a job by UUID among the 100 most recently updated jobs.
   *
   * @returns the matching job, or `undefined` when it is not in that page
   */
  async getJob (options: OverrideCommandOptions & { uuid: string }) {
    const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' })

    return data.find(j => j.uuid === options.uuid)
  }

  /**
   * Poll `requestLive()` every 150ms until at least one live job is available, then return the first one.
   *
   * There is no timeout here: the loop runs until a job shows up.
   */
  async requestLiveJob (runnerToken: string) {
    let { availableJobs } = await this.requestLive({ runnerToken })

    while (availableJobs.length === 0) {
      await wait(150)

      availableJobs = (await this.requestLive({ runnerToken })).availableJobs
    }

    return availableJobs[0]
  }
}