]>
Commit | Line | Data |
---|---|---|
0c9668f7 | 1 | import express, { UploadFiles } from 'express' |
d38541fd | 2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
0c9668f7 C |
3 | import { createReqFiles } from '@server/helpers/express-utils' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
5 | import { generateRunnerJobToken } from '@server/helpers/token-generator' | |
6 | import { MIMETYPES } from '@server/initializers/constants' | |
7 | import { sequelizeTypescript } from '@server/initializers/database' | |
8 | import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners' | |
9 | import { | |
e915cde3 | 10 | apiRateLimiter, |
0c9668f7 C |
11 | asyncMiddleware, |
12 | authenticate, | |
13 | ensureUserHasRight, | |
14 | paginationValidator, | |
15 | runnerJobsSortValidator, | |
16 | setDefaultPagination, | |
17 | setDefaultSort | |
18 | } from '@server/middlewares' | |
19 | import { | |
20 | abortRunnerJobValidator, | |
21 | acceptRunnerJobValidator, | |
5e47f6ab | 22 | cancelRunnerJobValidator, |
0c9668f7 C |
23 | errorRunnerJobValidator, |
24 | getRunnerFromTokenValidator, | |
25 | jobOfRunnerGetValidator, | |
26 | runnerJobGetValidator, | |
27 | successRunnerJobValidator, | |
28 | updateRunnerJobValidator | |
29 | } from '@server/middlewares/validators/runners' | |
30 | import { RunnerModel } from '@server/models/runner/runner' | |
31 | import { RunnerJobModel } from '@server/models/runner/runner-job' | |
32 | import { | |
33 | AbortRunnerJobBody, | |
34 | AcceptRunnerJobResult, | |
35 | ErrorRunnerJobBody, | |
36 | HttpStatusCode, | |
37 | ListRunnerJobsQuery, | |
38 | LiveRTMPHLSTranscodingUpdatePayload, | |
39 | RequestRunnerJobResult, | |
40 | RunnerJobState, | |
41 | RunnerJobSuccessBody, | |
42 | RunnerJobSuccessPayload, | |
43 | RunnerJobType, | |
44 | RunnerJobUpdateBody, | |
45 | RunnerJobUpdatePayload, | |
fe7019b2 | 46 | ServerErrorCode, |
0c9668f7 | 47 | UserRight, |
ab14f0e0 | 48 | VideoStudioTranscodingSuccess, |
0c9668f7 C |
49 | VODAudioMergeTranscodingSuccess, |
50 | VODHLSTranscodingSuccess, | |
51 | VODWebVideoTranscodingSuccess | |
52 | } from '@shared/models' | |
53 | ||
// Multipart file fields accepted by the "success" endpoint
const postRunnerJobSuccessVideoFiles = createReqFiles(
  [
    'payload[videoFile]',
    'payload[resolutionPlaylistFile]'
  ],
  { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
)

// Multipart file fields accepted by the "update" endpoint
const runnerJobUpdateVideoFiles = createReqFiles(
  [
    'payload[videoChunkFile]',
    'payload[resolutionPlaylistFile]',
    'payload[masterPlaylistFile]'
  ],
  { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
)

// Logger tags shared by every handler of this controller
const lTags = loggerTagsFactory('api', 'runner')

const runnerJobsRouter = express.Router()
67 | ||
68 | // --------------------------------------------------------------------------- | |
69 | // Controllers for runners | |
70 | // --------------------------------------------------------------------------- | |
71 | ||
// Routes called by remote runners.
// Every route goes through apiRateLimiter. On routes that accept multipart uploads,
// the limiter is registered after the upload middleware (see inline notes).

// A runner asks which jobs are available
runnerJobsRouter.post('/jobs/request',
  apiRateLimiter,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(requestRunnerJob)
)

// A runner claims a job
runnerJobsRouter.post('/jobs/:jobUUID/accept',
  apiRateLimiter,
  asyncMiddleware(runnerJobGetValidator),
  acceptRunnerJobValidator,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(acceptRunnerJob)
)

// A runner gives up on a job it previously accepted
runnerJobsRouter.post('/jobs/:jobUUID/abort',
  apiRateLimiter,
  asyncMiddleware(jobOfRunnerGetValidator),
  abortRunnerJobValidator,
  asyncMiddleware(abortRunnerJob)
)

// A runner sends progress and/or intermediate files for a job
runnerJobsRouter.post('/jobs/:jobUUID/update',
  runnerJobUpdateVideoFiles,
  apiRateLimiter, // Has to be after multer middleware to parse runner token
  asyncMiddleware(jobOfRunnerGetValidator),
  updateRunnerJobValidator,
  asyncMiddleware(updateRunnerJobController)
)

// A runner reports a failure on a job
runnerJobsRouter.post('/jobs/:jobUUID/error',
  apiRateLimiter, // Rate limited like the other runner routes
  asyncMiddleware(jobOfRunnerGetValidator),
  errorRunnerJobValidator,
  asyncMiddleware(errorRunnerJob)
)

// A runner uploads the final result of a job
runnerJobsRouter.post('/jobs/:jobUUID/success',
  postRunnerJobSuccessVideoFiles,
  apiRateLimiter, // Has to be after multer middleware to parse runner token
  asyncMiddleware(jobOfRunnerGetValidator),
  successRunnerJobValidator,
  asyncMiddleware(postRunnerJobSuccess)
)
114 | ||
115 | // --------------------------------------------------------------------------- | |
116 | // Controllers for admins | |
117 | // --------------------------------------------------------------------------- | |
118 | ||
// Both admin routes require an authenticated user holding the MANAGE_RUNNERS right

// Cancel a single job
runnerJobsRouter.post(
  '/jobs/:jobUUID/cancel',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  asyncMiddleware(runnerJobGetValidator),
  cancelRunnerJobValidator,
  asyncMiddleware(cancelRunnerJob)
)

// Paginated, sortable list of jobs
runnerJobsRouter.get(
  '/jobs',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  paginationValidator,
  runnerJobsSortValidator,
  setDefaultSort,
  setDefaultPagination,
  asyncMiddleware(listRunnerJobs)
)
136 | ||
137 | // --------------------------------------------------------------------------- | |
138 | ||
// Only the router is exposed by this module
export { runnerJobsRouter }
142 | ||
143 | // --------------------------------------------------------------------------- | |
144 | ||
145 | // --------------------------------------------------------------------------- | |
146 | // Controllers for runners | |
147 | // --------------------------------------------------------------------------- | |
148 | ||
/**
 * Replies with the jobs returned by `RunnerJobModel.listAvailableJobs()`,
 * exposing only `uuid`, `type` and `payload` for each of them.
 *
 * The requesting runner is read from `res.locals.runner`.
 */
async function requestRunnerJob (req: express.Request, res: express.Response) {
  const { runner } = res.locals
  const availableJobs = await RunnerJobModel.listAvailableJobs()

  logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })

  const result: RequestRunnerJobResult = {
    availableJobs: availableJobs.map(({ uuid, type, payload }) => ({ uuid, type, payload }))
  }

  updateLastRunnerContact(req, runner)

  return res.json(result)
}
167 | ||
/**
 * Assigns the job found in `res.locals.runnerJob` to the runner found in `res.locals.runner`.
 *
 * The job is reloaded inside a transaction; if it is still PENDING it is switched to
 * PROCESSING, gets a freshly generated job token, a start date and the runner id, and is saved.
 * The formatted job plus its token is then sent back as JSON.
 *
 * If the job is not PENDING anymore, the request fails with a 409 and nothing is saved.
 */
async function acceptRunnerJob (req: express.Request, res: express.Response) {
  const runner = res.locals.runner
  const runnerJob = res.locals.runnerJob

  const newRunnerJob = await retryTransactionWrapper(() => {
    return sequelizeTypescript.transaction(async transaction => {
      await runnerJob.reload({ transaction })

      // Do not write to the HTTP response from here: this callback is handed to
      // retryTransactionWrapper, so it should stay safe to run more than once.
      // The conflict is reported to the client once the wrapper has resolved.
      if (runnerJob.state !== RunnerJobState.PENDING) return undefined

      runnerJob.state = RunnerJobState.PROCESSING
      runnerJob.processingJobToken = generateRunnerJobToken()
      runnerJob.startedAt = new Date()
      runnerJob.runnerId = runner.id

      return runnerJob.save({ transaction })
    })
  })

  if (!newRunnerJob) {
    res.fail({
      type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
      message: 'This job is not in pending state anymore',
      status: HttpStatusCode.CONFLICT_409
    })

    return
  }

  newRunnerJob.Runner = runner as RunnerModel

  const result: AcceptRunnerJobResult = {
    job: {
      ...newRunnerJob.toFormattedJSON(),

      jobToken: newRunnerJob.processingJobToken
    }
  }

  updateLastRunnerContact(req, runner)

  logger.info(
    'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    lTags(runner.name, runnerJob.uuid, runnerJob.type)
  )

  return res.json(result)
}
215 | ||
/**
 * Logs the abort reason sent by the runner, delegates to the `abort` method of the
 * handler class matching the job, then replies with 204.
 */
async function abortRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: AbortRunnerJobBody = req.body

  const logMeta = { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  logger.info('Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, logMeta)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.abort({ runnerJob })

  // Read the runner from the job again, after the handler has run
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
233 | ||
/**
 * Increments the in-memory failure counter of the job, logs the error message sent by
 * the runner, delegates to the `error` method of the handler class matching the job,
 * then replies with 204.
 */
async function errorRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: ErrorRunnerJobBody = req.body

  runnerJob.failures = runnerJob.failures + 1

  const logMeta = {
    errorMessage: body.message,
    totalFailures: runnerJob.failures,
    ...lTags(runner.name, runnerJob.uuid, runnerJob.type)
  }
  logger.error('Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, logMeta)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.error({ runnerJob, message: body.message })

  // Read the runner from the job again, after the handler has run
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
253 | ||
254 | // --------------------------------------------------------------------------- | |
255 | ||
/**
 * Per job type builders turning the body payload and the uploaded files of an
 * "update" request into the payload given to the job handler.
 *
 * Job types without an entry have no update payload.
 *
 * `files` is optional and any of the upload fields may be absent: the matching
 * property is then `undefined` instead of the builder throwing.
 */
const jobUpdateBuilders: {
  [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
} = {
  'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
    return {
      ...payload,

      masterPlaylistFile: files?.['payload[masterPlaylistFile]']?.[0]?.path,
      resolutionPlaylistFile: files?.['payload[resolutionPlaylistFile]']?.[0]?.path,
      videoChunkFile: files?.['payload[videoChunkFile]']?.[0]?.path
    }
  }
}
269 | ||
/**
 * Builds the update payload for the job type (when a builder exists for it), logs it,
 * delegates to the `update` method of the handler class matching the job, then replies with 204.
 */
async function updateRunnerJobController (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobUpdateBody = req.body

  // Job types without a dedicated builder have no update payload
  const buildPayload = jobUpdateBuilders[runnerJob.type]
  let updatePayload: RunnerJobUpdatePayload | undefined
  if (buildPayload) {
    updatePayload = buildPayload(body.payload, req.files as UploadFiles)
  }

  const logMeta = { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  logger.debug('Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, logMeta)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.update({ runnerJob, progress: req.body.progress, updatePayload })

  // Read the runner from the job again, after the handler has run
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
296 | ||
297 | // --------------------------------------------------------------------------- | |
298 | ||
/**
 * Per job type builders turning the body payload and the uploaded files of a
 * "success" request into the result payload given to the job handler.
 *
 * Every job type has an entry. Builders that read an upload field access its first
 * file directly, so they throw if that field is missing from `files`.
 */
const jobSuccessPayloadBuilders: {
  [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
} = {
  'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => ({
    ...payload,
    videoFile: files['payload[videoFile]'][0].path
  }),

  'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => ({
    ...payload,
    videoFile: files['payload[videoFile]'][0].path,
    resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
  }),

  'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => ({
    ...payload,
    videoFile: files['payload[videoFile]'][0].path
  }),

  'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => ({
    ...payload,
    videoFile: files['payload[videoFile]'][0].path
  }),

  // Nothing to build for this job type
  'live-rtmp-hls-transcoding': () => ({})
}
337 | ||
/**
 * Builds the result payload for the job type from the request body and uploaded files,
 * logs it, delegates to the `complete` method of the handler class matching the job,
 * then replies with 204.
 */
async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobSuccessBody = req.body

  const buildPayload = jobSuccessPayloadBuilders[runnerJob.type]
  const resultPayload = buildPayload(body.payload, req.files as UploadFiles)

  const logMeta = { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  logger.info(
    'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
    logMeta
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.complete({ runnerJob, resultPayload })

  // Read the runner from the job again, after the handler has run
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
357 | ||
358 | // --------------------------------------------------------------------------- | |
359 | // Controllers for admins | |
360 | // --------------------------------------------------------------------------- | |
361 | ||
/**
 * Logs the cancellation, delegates to the `cancel` method of the handler class
 * matching the job found in `res.locals.runnerJob`, then replies with 204.
 */
async function cancelRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals

  const tags = lTags(runnerJob.uuid, runnerJob.type)
  logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, tags)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.cancel({ runnerJob })

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
372 | ||
/**
 * Replies with the total and the admin-formatted jobs returned by `RunnerJobModel.listForApi()`
 * for the `start`, `count`, `sort` and `search` query parameters.
 */
async function listRunnerJobs (req: express.Request, res: express.Response) {
  const query: ListRunnerJobsQuery = req.query
  const { start, count, sort, search } = query

  const resultList = await RunnerJobModel.listForApi({ start, count, sort, search })
  const data = resultList.data.map(job => job.toFormattedAdminJSON())

  return res.json({ total: resultList.total, data })
}
387 | } |