]>
Commit | Line | Data |
---|---|---|
0c9668f7 | 1 | import express, { UploadFiles } from 'express' |
d38541fd | 2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
0c9668f7 C |
3 | import { createReqFiles } from '@server/helpers/express-utils' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
5 | import { generateRunnerJobToken } from '@server/helpers/token-generator' | |
6 | import { MIMETYPES } from '@server/initializers/constants' | |
7 | import { sequelizeTypescript } from '@server/initializers/database' | |
8 | import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners' | |
9 | import { | |
10 | asyncMiddleware, | |
11 | authenticate, | |
12 | ensureUserHasRight, | |
13 | paginationValidator, | |
14 | runnerJobsSortValidator, | |
15 | setDefaultPagination, | |
16 | setDefaultSort | |
17 | } from '@server/middlewares' | |
18 | import { | |
19 | abortRunnerJobValidator, | |
20 | acceptRunnerJobValidator, | |
5e47f6ab | 21 | cancelRunnerJobValidator, |
0c9668f7 C |
22 | errorRunnerJobValidator, |
23 | getRunnerFromTokenValidator, | |
24 | jobOfRunnerGetValidator, | |
25 | runnerJobGetValidator, | |
26 | successRunnerJobValidator, | |
27 | updateRunnerJobValidator | |
28 | } from '@server/middlewares/validators/runners' | |
29 | import { RunnerModel } from '@server/models/runner/runner' | |
30 | import { RunnerJobModel } from '@server/models/runner/runner-job' | |
31 | import { | |
32 | AbortRunnerJobBody, | |
33 | AcceptRunnerJobResult, | |
34 | ErrorRunnerJobBody, | |
35 | HttpStatusCode, | |
36 | ListRunnerJobsQuery, | |
37 | LiveRTMPHLSTranscodingUpdatePayload, | |
38 | RequestRunnerJobResult, | |
39 | RunnerJobState, | |
40 | RunnerJobSuccessBody, | |
41 | RunnerJobSuccessPayload, | |
42 | RunnerJobType, | |
43 | RunnerJobUpdateBody, | |
44 | RunnerJobUpdatePayload, | |
fe7019b2 | 45 | ServerErrorCode, |
0c9668f7 | 46 | UserRight, |
ab14f0e0 | 47 | VideoStudioTranscodingSuccess, |
0c9668f7 C |
48 | VODAudioMergeTranscodingSuccess, |
49 | VODHLSTranscodingSuccess, | |
50 | VODWebVideoTranscodingSuccess | |
51 | } from '@shared/models' | |
52 | ||
// Request-files middleware used by the job "success" route.
// Accepted fields: the resulting video file and its resolution playlist.
// Accepted mimetypes: the video and M3U8 maps from MIMETYPES.
const postRunnerJobSuccessVideoFiles = createReqFiles(
  [
    'payload[videoFile]',
    'payload[resolutionPlaylistFile]'
  ],
  {
    ...MIMETYPES.VIDEO.MIMETYPE_EXT,
    ...MIMETYPES.M3U8.MIMETYPE_EXT
  }
)
57 | ||
// Request-files middleware used by the job "update" route.
// Accepted fields: a video chunk plus the resolution and master playlists.
// Accepted mimetypes: the video and M3U8 maps from MIMETYPES.
const runnerJobUpdateVideoFiles = createReqFiles(
  [
    'payload[videoChunkFile]',
    'payload[resolutionPlaylistFile]',
    'payload[masterPlaylistFile]'
  ],
  {
    ...MIMETYPES.VIDEO.MIMETYPE_EXT,
    ...MIMETYPES.M3U8.MIMETYPE_EXT
  }
)
62 | ||
// Router on which every runner job endpoint of this file is registered
const runnerJobsRouter = express.Router()

// Builds the logger tags ('api', 'runner', plus per-call tags) attached to every log of this file
const lTags = loggerTagsFactory('api', 'runner')
66 | ||
67 | // --------------------------------------------------------------------------- | |
68 | // Controllers for runners | |
69 | // --------------------------------------------------------------------------- | |
70 | ||
// Middlewares are listed in execution order; the last one is the controller.

// A runner asks which jobs are available
runnerJobsRouter.post(
  '/jobs/request',
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(requestRunnerJob)
)

// A runner claims a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/accept',
  asyncMiddleware(runnerJobGetValidator),
  acceptRunnerJobValidator,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(acceptRunnerJob)
)

// A runner gives up on a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/abort',
  asyncMiddleware(jobOfRunnerGetValidator),
  abortRunnerJobValidator,
  asyncMiddleware(abortRunnerJob)
)

// A runner reports progress, optionally uploading files
runnerJobsRouter.post(
  '/jobs/:jobUUID/update',
  runnerJobUpdateVideoFiles,
  asyncMiddleware(jobOfRunnerGetValidator),
  updateRunnerJobValidator,
  asyncMiddleware(updateRunnerJobController)
)

// A runner reports a failure
runnerJobsRouter.post(
  '/jobs/:jobUUID/error',
  asyncMiddleware(jobOfRunnerGetValidator),
  errorRunnerJobValidator,
  asyncMiddleware(errorRunnerJob)
)

// A runner sends the final result of a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/success',
  postRunnerJobSuccessVideoFiles,
  asyncMiddleware(jobOfRunnerGetValidator),
  successRunnerJobValidator,
  asyncMiddleware(postRunnerJobSuccess)
)
108 | ||
109 | // --------------------------------------------------------------------------- | |
110 | // Controllers for admins | |
111 | // --------------------------------------------------------------------------- | |
112 | ||
// Both admin routes require an authenticated user holding the MANAGE_RUNNERS right.

// Cancel a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/cancel',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  asyncMiddleware(runnerJobGetValidator),
  cancelRunnerJobValidator,
  asyncMiddleware(cancelRunnerJob)
)

// List jobs (paginated and sorted)
runnerJobsRouter.get(
  '/jobs',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  paginationValidator,
  runnerJobsSortValidator,
  setDefaultSort,
  setDefaultPagination,
  asyncMiddleware(listRunnerJobs)
)
130 | ||
131 | // --------------------------------------------------------------------------- | |
132 | ||
// Only the router is exported from this module
export { runnerJobsRouter }
136 | ||
137 | // --------------------------------------------------------------------------- | |
138 | ||
139 | // --------------------------------------------------------------------------- | |
140 | // Controllers for runners | |
141 | // --------------------------------------------------------------------------- | |
142 | ||
// Controller for POST /jobs/request.
// Responds with the uuid, type and payload of every job returned by RunnerJobModel.listAvailableJobs().
// Reads the runner from res.locals.runner (presumably set by getRunnerFromTokenValidator, which runs first on this route).
async function requestRunnerJob (req: express.Request, res: express.Response) {
  const { runner } = res.locals
  const jobs = await RunnerJobModel.listAvailableJobs()

  logger.debug('Runner %s requests for a job.', runner.name, { availableJobs: jobs, ...lTags(runner.name) })

  // Only expose the fields a runner needs to decide which job to accept
  const toAvailableJob = (job: typeof jobs[number]) => {
    return {
      uuid: job.uuid,
      type: job.type,
      payload: job.payload
    }
  }

  const result: RequestRunnerJobResult = {
    availableJobs: jobs.map(toAvailableJob)
  }

  updateLastRunnerContact(req, runner)

  return res.json(result)
}
161 | ||
// Controller for POST /jobs/:jobUUID/accept.
// Inside a retried transaction: reloads the job, and if it is still pending, moves it to PROCESSING,
// generates its processing token and assigns it to the requesting runner.
// Responds with the formatted job plus its token, or fails with 409 when the job is no longer pending.
async function acceptRunnerJob (req: express.Request, res: express.Response) {
  const { runner, runnerJob } = res.locals

  const acceptedJob = await retryTransactionWrapper(() => {
    return sequelizeTypescript.transaction(async t => {
      // Refresh the job inside the transaction so the state check uses current data
      await runnerJob.reload({ transaction: t })

      const stillPending = runnerJob.state === RunnerJobState.PENDING

      if (!stillPending) {
        res.fail({
          type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
          message: 'This job is not in pending state anymore',
          status: HttpStatusCode.CONFLICT_409
        })

        return undefined
      }

      runnerJob.state = RunnerJobState.PROCESSING
      runnerJob.processingJobToken = generateRunnerJobToken()
      runnerJob.startedAt = new Date()
      runnerJob.runnerId = runner.id

      return runnerJob.save({ transaction: t })
    })
  })

  // The failure response has already been sent from inside the transaction
  if (!acceptedJob) return

  acceptedJob.Runner = runner as RunnerModel

  const result: AcceptRunnerJobResult = {
    job: {
      ...acceptedJob.toFormattedJSON(),

      jobToken: acceptedJob.processingJobToken
    }
  }

  updateLastRunnerContact(req, runner)

  logger.info(
    'Remote runner %s has accepted job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    lTags(runner.name, runnerJob.uuid, runnerJob.type)
  )

  return res.json(result)
}
209 | ||
// Controller for POST /jobs/:jobUUID/abort.
// Logs the reason sent by the runner, delegates to the abort() method of the job's handler class,
// then responds with 204.
async function abortRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const { reason }: AbortRunnerJobBody = req.body

  logger.info(
    'Remote runner %s is aborting job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    { reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.abort({ runnerJob })

  // Read runnerJob.Runner again here: the handler received the job and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
227 | ||
// Controller for POST /jobs/:jobUUID/error.
// Increments the failure counter of the job, logs the error message sent by the runner,
// delegates to the error() method of the job's handler class, then responds with 204.
async function errorRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const { message }: ErrorRunnerJobBody = req.body

  // Incremented before logging so the log reports the new total
  runnerJob.failures += 1

  logger.error(
    'Remote runner %s had an error with job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    {
      errorMessage: message,
      totalFailures: runnerJob.failures,
      ...lTags(runner.name, runnerJob.uuid, runnerJob.type)
    }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.error({ runnerJob, message })

  // Read runnerJob.Runner again here: the handler received the job and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
247 | ||
248 | // --------------------------------------------------------------------------- | |
249 | ||
// Per job type builders that merge the uploaded file paths into the update payload sent by the runner.
// Job types without an entry have no update payload.
//
// `files` is optional and every file field is optional too, so each lookup is fully guarded:
// a missing `files` object, a missing field or an empty file list yields `undefined` for that
// property instead of throwing.
const jobUpdateBuilders: {
  [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
} = {
  'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
    return {
      ...payload,

      masterPlaylistFile: files?.['payload[masterPlaylistFile]']?.[0]?.path,
      resolutionPlaylistFile: files?.['payload[resolutionPlaylistFile]']?.[0]?.path,
      videoChunkFile: files?.['payload[videoChunkFile]']?.[0]?.path
    }
  }
}
263 | ||
// Controller for POST /jobs/:jobUUID/update.
// Builds the update payload when the job type has a builder in jobUpdateBuilders (undefined otherwise),
// forwards it with the reported progress to the update() method of the job's handler class,
// then responds with 204.
async function updateRunnerJobController (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobUpdateBody = req.body

  const buildUpdatePayload = jobUpdateBuilders[runnerJob.type]

  let updatePayload: RunnerJobUpdatePayload | undefined
  if (buildUpdatePayload) {
    updatePayload = buildUpdatePayload(body.payload, req.files as UploadFiles)
  }

  logger.debug(
    'Remote runner %s is updating job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()

  await handler.update({
    runnerJob,
    progress: body.progress,
    updatePayload
  })

  // Read runnerJob.Runner again here: the handler received the job and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
290 | ||
291 | // --------------------------------------------------------------------------- | |
292 | ||
// Per job type builders that merge the uploaded file paths into the success payload sent by the runner.
// Every job type must have an entry. Builders that read files dereference them directly
// and therefore throw if an expected file field is absent.
const jobSuccessPayloadBuilders: {
  [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
} = {
  'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  // HLS results carry the resolution playlist in addition to the video file
  'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path,
    resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
  }),

  'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  // Live transcoding success carries no result payload
  'live-rtmp-hls-transcoding': () => ({})
}
331 | ||
// Controller for POST /jobs/:jobUUID/success.
// Builds the result payload with the builder registered for the job type, forwards it to the
// complete() method of the job's handler class, then responds with 204.
async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const { payload }: RunnerJobSuccessBody = req.body

  const buildResultPayload = jobSuccessPayloadBuilders[runnerJob.type]
  const resultPayload = buildResultPayload(payload, req.files as UploadFiles)

  logger.info(
    'Remote runner %s is sending success result for job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.complete({ runnerJob, resultPayload })

  // Read runnerJob.Runner again here: the handler received the job and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
351 | ||
352 | // --------------------------------------------------------------------------- | |
353 | // Controllers for admins | |
354 | // --------------------------------------------------------------------------- | |
355 | ||
// Admin controller for POST /jobs/:jobUUID/cancel.
// Delegates to the cancel() method of the job's handler class, then responds with 204.
async function cancelRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const { uuid, type } = runnerJob

  logger.info('Cancelling job %s (%s)', uuid, type, lTags(uuid, type))

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.cancel({ runnerJob })

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
366 | ||
// Admin controller for GET /jobs.
// Responds with the total count and the requested page of jobs, each in its admin JSON format.
async function listRunnerJobs (req: express.Request, res: express.Response) {
  const { start, count, sort, search }: ListRunnerJobsQuery = req.query

  const { total, data: jobs } = await RunnerJobModel.listForApi({ start, count, sort, search })

  const data = jobs.map(job => job.toFormattedAdminJSON())

  return res.json({ total, data })
}