]>
Commit | Line | Data |
---|---|---|
1 | import express, { UploadFiles } from 'express' | |
2 | import { createReqFiles } from '@server/helpers/express-utils' | |
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
4 | import { generateRunnerJobToken } from '@server/helpers/token-generator' | |
5 | import { MIMETYPES } from '@server/initializers/constants' | |
6 | import { sequelizeTypescript } from '@server/initializers/database' | |
7 | import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners' | |
8 | import { | |
9 | asyncMiddleware, | |
10 | authenticate, | |
11 | ensureUserHasRight, | |
12 | paginationValidator, | |
13 | runnerJobsSortValidator, | |
14 | setDefaultPagination, | |
15 | setDefaultSort | |
16 | } from '@server/middlewares' | |
17 | import { | |
18 | abortRunnerJobValidator, | |
19 | acceptRunnerJobValidator, | |
20 | cancelRunnerJobValidator, | |
21 | errorRunnerJobValidator, | |
22 | getRunnerFromTokenValidator, | |
23 | jobOfRunnerGetValidator, | |
24 | runnerJobGetValidator, | |
25 | successRunnerJobValidator, | |
26 | updateRunnerJobValidator | |
27 | } from '@server/middlewares/validators/runners' | |
28 | import { RunnerModel } from '@server/models/runner/runner' | |
29 | import { RunnerJobModel } from '@server/models/runner/runner-job' | |
30 | import { | |
31 | AbortRunnerJobBody, | |
32 | AcceptRunnerJobResult, | |
33 | ErrorRunnerJobBody, | |
34 | HttpStatusCode, | |
35 | ListRunnerJobsQuery, | |
36 | LiveRTMPHLSTranscodingUpdatePayload, | |
37 | RequestRunnerJobResult, | |
38 | RunnerJobState, | |
39 | RunnerJobSuccessBody, | |
40 | RunnerJobSuccessPayload, | |
41 | RunnerJobType, | |
42 | RunnerJobUpdateBody, | |
43 | RunnerJobUpdatePayload, | |
44 | UserRight, | |
45 | VideoEditionTranscodingSuccess, | |
46 | VODAudioMergeTranscodingSuccess, | |
47 | VODHLSTranscodingSuccess, | |
48 | VODWebVideoTranscodingSuccess | |
49 | } from '@shared/models' | |
50 | ||
51 | const postRunnerJobSuccessVideoFiles = createReqFiles( | |
52 | [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ], | |
53 | { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } | |
54 | ) | |
55 | ||
56 | const runnerJobUpdateVideoFiles = createReqFiles( | |
57 | [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ], | |
58 | { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } | |
59 | ) | |
60 | ||
61 | const lTags = loggerTagsFactory('api', 'runner') | |
62 | ||
63 | const runnerJobsRouter = express.Router() | |
64 | ||
65 | // --------------------------------------------------------------------------- | |
66 | // Controllers for runners | |
67 | // --------------------------------------------------------------------------- | |
68 | ||
69 | runnerJobsRouter.post('/jobs/request', | |
70 | asyncMiddleware(getRunnerFromTokenValidator), | |
71 | asyncMiddleware(requestRunnerJob) | |
72 | ) | |
73 | ||
74 | runnerJobsRouter.post('/jobs/:jobUUID/accept', | |
75 | asyncMiddleware(runnerJobGetValidator), | |
76 | acceptRunnerJobValidator, | |
77 | asyncMiddleware(getRunnerFromTokenValidator), | |
78 | asyncMiddleware(acceptRunnerJob) | |
79 | ) | |
80 | ||
81 | runnerJobsRouter.post('/jobs/:jobUUID/abort', | |
82 | asyncMiddleware(jobOfRunnerGetValidator), | |
83 | abortRunnerJobValidator, | |
84 | asyncMiddleware(abortRunnerJob) | |
85 | ) | |
86 | ||
87 | runnerJobsRouter.post('/jobs/:jobUUID/update', | |
88 | runnerJobUpdateVideoFiles, | |
89 | asyncMiddleware(jobOfRunnerGetValidator), | |
90 | updateRunnerJobValidator, | |
91 | asyncMiddleware(updateRunnerJobController) | |
92 | ) | |
93 | ||
94 | runnerJobsRouter.post('/jobs/:jobUUID/error', | |
95 | asyncMiddleware(jobOfRunnerGetValidator), | |
96 | errorRunnerJobValidator, | |
97 | asyncMiddleware(errorRunnerJob) | |
98 | ) | |
99 | ||
100 | runnerJobsRouter.post('/jobs/:jobUUID/success', | |
101 | postRunnerJobSuccessVideoFiles, | |
102 | asyncMiddleware(jobOfRunnerGetValidator), | |
103 | successRunnerJobValidator, | |
104 | asyncMiddleware(postRunnerJobSuccess) | |
105 | ) | |
106 | ||
107 | // --------------------------------------------------------------------------- | |
108 | // Controllers for admins | |
109 | // --------------------------------------------------------------------------- | |
110 | ||
111 | runnerJobsRouter.post('/jobs/:jobUUID/cancel', | |
112 | authenticate, | |
113 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | |
114 | asyncMiddleware(runnerJobGetValidator), | |
115 | cancelRunnerJobValidator, | |
116 | asyncMiddleware(cancelRunnerJob) | |
117 | ) | |
118 | ||
119 | runnerJobsRouter.get('/jobs', | |
120 | authenticate, | |
121 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | |
122 | paginationValidator, | |
123 | runnerJobsSortValidator, | |
124 | setDefaultSort, | |
125 | setDefaultPagination, | |
126 | asyncMiddleware(listRunnerJobs) | |
127 | ) | |
128 | ||
129 | // --------------------------------------------------------------------------- | |
130 | ||
131 | export { | |
132 | runnerJobsRouter | |
133 | } | |
134 | ||
135 | // --------------------------------------------------------------------------- | |
136 | ||
137 | // --------------------------------------------------------------------------- | |
138 | // Controllers for runners | |
139 | // --------------------------------------------------------------------------- | |
140 | ||
141 | async function requestRunnerJob (req: express.Request, res: express.Response) { | |
142 | const runner = res.locals.runner | |
143 | const availableJobs = await RunnerJobModel.listAvailableJobs() | |
144 | ||
145 | logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) }) | |
146 | ||
147 | const result: RequestRunnerJobResult = { | |
148 | availableJobs: availableJobs.map(j => ({ | |
149 | uuid: j.uuid, | |
150 | type: j.type, | |
151 | payload: j.payload | |
152 | })) | |
153 | } | |
154 | ||
155 | updateLastRunnerContact(req, runner) | |
156 | ||
157 | return res.json(result) | |
158 | } | |
159 | ||
160 | async function acceptRunnerJob (req: express.Request, res: express.Response) { | |
161 | const runner = res.locals.runner | |
162 | const runnerJob = res.locals.runnerJob | |
163 | ||
164 | runnerJob.state = RunnerJobState.PROCESSING | |
165 | runnerJob.processingJobToken = generateRunnerJobToken() | |
166 | runnerJob.startedAt = new Date() | |
167 | runnerJob.runnerId = runner.id | |
168 | ||
169 | const newRunnerJob = await sequelizeTypescript.transaction(transaction => { | |
170 | return runnerJob.save({ transaction }) | |
171 | }) | |
172 | newRunnerJob.Runner = runner as RunnerModel | |
173 | ||
174 | const result: AcceptRunnerJobResult = { | |
175 | job: { | |
176 | ...newRunnerJob.toFormattedJSON(), | |
177 | ||
178 | jobToken: newRunnerJob.processingJobToken | |
179 | } | |
180 | } | |
181 | ||
182 | updateLastRunnerContact(req, runner) | |
183 | ||
184 | logger.info( | |
185 | 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | |
186 | lTags(runner.name, runnerJob.uuid, runnerJob.type) | |
187 | ) | |
188 | ||
189 | return res.json(result) | |
190 | } | |
191 | ||
192 | async function abortRunnerJob (req: express.Request, res: express.Response) { | |
193 | const runnerJob = res.locals.runnerJob | |
194 | const runner = runnerJob.Runner | |
195 | const body: AbortRunnerJobBody = req.body | |
196 | ||
197 | logger.info( | |
198 | 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | |
199 | { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | |
200 | ) | |
201 | ||
202 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | |
203 | await new RunnerJobHandler().abort({ runnerJob }) | |
204 | ||
205 | updateLastRunnerContact(req, runnerJob.Runner) | |
206 | ||
207 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | |
208 | } | |
209 | ||
210 | async function errorRunnerJob (req: express.Request, res: express.Response) { | |
211 | const runnerJob = res.locals.runnerJob | |
212 | const runner = runnerJob.Runner | |
213 | const body: ErrorRunnerJobBody = req.body | |
214 | ||
215 | runnerJob.failures += 1 | |
216 | ||
217 | logger.error( | |
218 | 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | |
219 | { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | |
220 | ) | |
221 | ||
222 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | |
223 | await new RunnerJobHandler().error({ runnerJob, message: body.message }) | |
224 | ||
225 | updateLastRunnerContact(req, runnerJob.Runner) | |
226 | ||
227 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | |
228 | } | |
229 | ||
230 | // --------------------------------------------------------------------------- | |
231 | ||
232 | const jobUpdateBuilders: { | |
233 | [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload | |
234 | } = { | |
235 | 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => { | |
236 | return { | |
237 | ...payload, | |
238 | ||
239 | masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path, | |
240 | resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path, | |
241 | videoChunkFile: files['payload[videoChunkFile]']?.[0].path | |
242 | } | |
243 | } | |
244 | } | |
245 | ||
246 | async function updateRunnerJobController (req: express.Request, res: express.Response) { | |
247 | const runnerJob = res.locals.runnerJob | |
248 | const runner = runnerJob.Runner | |
249 | const body: RunnerJobUpdateBody = req.body | |
250 | ||
251 | const payloadBuilder = jobUpdateBuilders[runnerJob.type] | |
252 | const updatePayload = payloadBuilder | |
253 | ? payloadBuilder(body.payload, req.files as UploadFiles) | |
254 | : undefined | |
255 | ||
256 | logger.debug( | |
257 | 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, | |
258 | { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | |
259 | ) | |
260 | ||
261 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | |
262 | await new RunnerJobHandler().update({ | |
263 | runnerJob, | |
264 | progress: req.body.progress, | |
265 | updatePayload | |
266 | }) | |
267 | ||
268 | updateLastRunnerContact(req, runnerJob.Runner) | |
269 | ||
270 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | |
271 | } | |
272 | ||
273 | // --------------------------------------------------------------------------- | |
274 | ||
275 | const jobSuccessPayloadBuilders: { | |
276 | [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload | |
277 | } = { | |
278 | 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => { | |
279 | return { | |
280 | ...payload, | |
281 | ||
282 | videoFile: files['payload[videoFile]'][0].path | |
283 | } | |
284 | }, | |
285 | ||
286 | 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => { | |
287 | return { | |
288 | ...payload, | |
289 | ||
290 | videoFile: files['payload[videoFile]'][0].path, | |
291 | resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path | |
292 | } | |
293 | }, | |
294 | ||
295 | 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => { | |
296 | return { | |
297 | ...payload, | |
298 | ||
299 | videoFile: files['payload[videoFile]'][0].path | |
300 | } | |
301 | }, | |
302 | ||
303 | 'video-edition-transcoding': (payload: VideoEditionTranscodingSuccess, files) => { | |
304 | return { | |
305 | ...payload, | |
306 | ||
307 | videoFile: files['payload[videoFile]'][0].path | |
308 | } | |
309 | }, | |
310 | ||
311 | 'live-rtmp-hls-transcoding': () => ({}) | |
312 | } | |
313 | ||
314 | async function postRunnerJobSuccess (req: express.Request, res: express.Response) { | |
315 | const runnerJob = res.locals.runnerJob | |
316 | const runner = runnerJob.Runner | |
317 | const body: RunnerJobSuccessBody = req.body | |
318 | ||
319 | const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles) | |
320 | ||
321 | logger.info( | |
322 | 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, | |
323 | { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | |
324 | ) | |
325 | ||
326 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | |
327 | await new RunnerJobHandler().complete({ runnerJob, resultPayload }) | |
328 | ||
329 | updateLastRunnerContact(req, runnerJob.Runner) | |
330 | ||
331 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | |
332 | } | |
333 | ||
334 | // --------------------------------------------------------------------------- | |
335 | // Controllers for admins | |
336 | // --------------------------------------------------------------------------- | |
337 | ||
338 | async function cancelRunnerJob (req: express.Request, res: express.Response) { | |
339 | const runnerJob = res.locals.runnerJob | |
340 | ||
341 | logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type)) | |
342 | ||
343 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | |
344 | await new RunnerJobHandler().cancel({ runnerJob }) | |
345 | ||
346 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | |
347 | } | |
348 | ||
349 | async function listRunnerJobs (req: express.Request, res: express.Response) { | |
350 | const query: ListRunnerJobsQuery = req.query | |
351 | ||
352 | const resultList = await RunnerJobModel.listForApi({ | |
353 | start: query.start, | |
354 | count: query.count, | |
355 | sort: query.sort, | |
356 | search: query.search | |
357 | }) | |
358 | ||
359 | return res.json({ | |
360 | total: resultList.total, | |
361 | data: resultList.data.map(d => d.toFormattedAdminJSON()) | |
362 | }) | |
363 | } |