diff options
Diffstat (limited to 'server/controllers/api/runners/jobs.ts')
-rw-r--r-- | server/controllers/api/runners/jobs.ts | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts new file mode 100644 index 000000000..7d488ec11 --- /dev/null +++ b/server/controllers/api/runners/jobs.ts | |||
@@ -0,0 +1,352 @@ | |||
1 | import express, { UploadFiles } from 'express' | ||
2 | import { createReqFiles } from '@server/helpers/express-utils' | ||
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
4 | import { generateRunnerJobToken } from '@server/helpers/token-generator' | ||
5 | import { MIMETYPES } from '@server/initializers/constants' | ||
6 | import { sequelizeTypescript } from '@server/initializers/database' | ||
7 | import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners' | ||
8 | import { | ||
9 | asyncMiddleware, | ||
10 | authenticate, | ||
11 | ensureUserHasRight, | ||
12 | paginationValidator, | ||
13 | runnerJobsSortValidator, | ||
14 | setDefaultPagination, | ||
15 | setDefaultSort | ||
16 | } from '@server/middlewares' | ||
17 | import { | ||
18 | abortRunnerJobValidator, | ||
19 | acceptRunnerJobValidator, | ||
20 | errorRunnerJobValidator, | ||
21 | getRunnerFromTokenValidator, | ||
22 | jobOfRunnerGetValidator, | ||
23 | runnerJobGetValidator, | ||
24 | successRunnerJobValidator, | ||
25 | updateRunnerJobValidator | ||
26 | } from '@server/middlewares/validators/runners' | ||
27 | import { RunnerModel } from '@server/models/runner/runner' | ||
28 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
29 | import { | ||
30 | AbortRunnerJobBody, | ||
31 | AcceptRunnerJobResult, | ||
32 | ErrorRunnerJobBody, | ||
33 | HttpStatusCode, | ||
34 | ListRunnerJobsQuery, | ||
35 | LiveRTMPHLSTranscodingUpdatePayload, | ||
36 | RequestRunnerJobResult, | ||
37 | RunnerJobState, | ||
38 | RunnerJobSuccessBody, | ||
39 | RunnerJobSuccessPayload, | ||
40 | RunnerJobType, | ||
41 | RunnerJobUpdateBody, | ||
42 | RunnerJobUpdatePayload, | ||
43 | UserRight, | ||
44 | VODAudioMergeTranscodingSuccess, | ||
45 | VODHLSTranscodingSuccess, | ||
46 | VODWebVideoTranscodingSuccess | ||
47 | } from '@shared/models' | ||
48 | |||
49 | const postRunnerJobSuccessVideoFiles = createReqFiles( | ||
50 | [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ], | ||
51 | { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } | ||
52 | ) | ||
53 | |||
54 | const runnerJobUpdateVideoFiles = createReqFiles( | ||
55 | [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ], | ||
56 | { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } | ||
57 | ) | ||
58 | |||
59 | const lTags = loggerTagsFactory('api', 'runner') | ||
60 | |||
61 | const runnerJobsRouter = express.Router() | ||
62 | |||
63 | // --------------------------------------------------------------------------- | ||
64 | // Controllers for runners | ||
65 | // --------------------------------------------------------------------------- | ||
66 | |||
67 | runnerJobsRouter.post('/jobs/request', | ||
68 | asyncMiddleware(getRunnerFromTokenValidator), | ||
69 | asyncMiddleware(requestRunnerJob) | ||
70 | ) | ||
71 | |||
72 | runnerJobsRouter.post('/jobs/:jobUUID/accept', | ||
73 | asyncMiddleware(runnerJobGetValidator), | ||
74 | acceptRunnerJobValidator, | ||
75 | asyncMiddleware(getRunnerFromTokenValidator), | ||
76 | asyncMiddleware(acceptRunnerJob) | ||
77 | ) | ||
78 | |||
79 | runnerJobsRouter.post('/jobs/:jobUUID/abort', | ||
80 | asyncMiddleware(jobOfRunnerGetValidator), | ||
81 | abortRunnerJobValidator, | ||
82 | asyncMiddleware(abortRunnerJob) | ||
83 | ) | ||
84 | |||
85 | runnerJobsRouter.post('/jobs/:jobUUID/update', | ||
86 | runnerJobUpdateVideoFiles, | ||
87 | asyncMiddleware(jobOfRunnerGetValidator), | ||
88 | updateRunnerJobValidator, | ||
89 | asyncMiddleware(updateRunnerJobController) | ||
90 | ) | ||
91 | |||
92 | runnerJobsRouter.post('/jobs/:jobUUID/error', | ||
93 | asyncMiddleware(jobOfRunnerGetValidator), | ||
94 | errorRunnerJobValidator, | ||
95 | asyncMiddleware(errorRunnerJob) | ||
96 | ) | ||
97 | |||
98 | runnerJobsRouter.post('/jobs/:jobUUID/success', | ||
99 | postRunnerJobSuccessVideoFiles, | ||
100 | asyncMiddleware(jobOfRunnerGetValidator), | ||
101 | successRunnerJobValidator, | ||
102 | asyncMiddleware(postRunnerJobSuccess) | ||
103 | ) | ||
104 | |||
105 | // --------------------------------------------------------------------------- | ||
106 | // Controllers for admins | ||
107 | // --------------------------------------------------------------------------- | ||
108 | |||
109 | runnerJobsRouter.post('/jobs/:jobUUID/cancel', | ||
110 | authenticate, | ||
111 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | ||
112 | asyncMiddleware(runnerJobGetValidator), | ||
113 | asyncMiddleware(cancelRunnerJob) | ||
114 | ) | ||
115 | |||
116 | runnerJobsRouter.get('/jobs', | ||
117 | authenticate, | ||
118 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | ||
119 | paginationValidator, | ||
120 | runnerJobsSortValidator, | ||
121 | setDefaultSort, | ||
122 | setDefaultPagination, | ||
123 | asyncMiddleware(listRunnerJobs) | ||
124 | ) | ||
125 | |||
126 | // --------------------------------------------------------------------------- | ||
127 | |||
128 | export { | ||
129 | runnerJobsRouter | ||
130 | } | ||
131 | |||
132 | // --------------------------------------------------------------------------- | ||
133 | |||
134 | // --------------------------------------------------------------------------- | ||
135 | // Controllers for runners | ||
136 | // --------------------------------------------------------------------------- | ||
137 | |||
138 | async function requestRunnerJob (req: express.Request, res: express.Response) { | ||
139 | const runner = res.locals.runner | ||
140 | const availableJobs = await RunnerJobModel.listAvailableJobs() | ||
141 | |||
142 | logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) }) | ||
143 | |||
144 | const result: RequestRunnerJobResult = { | ||
145 | availableJobs: availableJobs.map(j => ({ | ||
146 | uuid: j.uuid, | ||
147 | type: j.type, | ||
148 | payload: j.payload | ||
149 | })) | ||
150 | } | ||
151 | |||
152 | updateLastRunnerContact(req, runner) | ||
153 | |||
154 | return res.json(result) | ||
155 | } | ||
156 | |||
157 | async function acceptRunnerJob (req: express.Request, res: express.Response) { | ||
158 | const runner = res.locals.runner | ||
159 | const runnerJob = res.locals.runnerJob | ||
160 | |||
161 | runnerJob.state = RunnerJobState.PROCESSING | ||
162 | runnerJob.processingJobToken = generateRunnerJobToken() | ||
163 | runnerJob.startedAt = new Date() | ||
164 | runnerJob.runnerId = runner.id | ||
165 | |||
166 | const newRunnerJob = await sequelizeTypescript.transaction(transaction => { | ||
167 | return runnerJob.save({ transaction }) | ||
168 | }) | ||
169 | newRunnerJob.Runner = runner as RunnerModel | ||
170 | |||
171 | const result: AcceptRunnerJobResult = { | ||
172 | job: { | ||
173 | ...newRunnerJob.toFormattedJSON(), | ||
174 | |||
175 | jobToken: newRunnerJob.processingJobToken | ||
176 | } | ||
177 | } | ||
178 | |||
179 | updateLastRunnerContact(req, runner) | ||
180 | |||
181 | logger.info( | ||
182 | 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | ||
183 | lTags(runner.name, runnerJob.uuid, runnerJob.type) | ||
184 | ) | ||
185 | |||
186 | return res.json(result) | ||
187 | } | ||
188 | |||
189 | async function abortRunnerJob (req: express.Request, res: express.Response) { | ||
190 | const runnerJob = res.locals.runnerJob | ||
191 | const runner = runnerJob.Runner | ||
192 | const body: AbortRunnerJobBody = req.body | ||
193 | |||
194 | logger.info( | ||
195 | 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | ||
196 | { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
197 | ) | ||
198 | |||
199 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
200 | await new RunnerJobHandler().abort({ runnerJob }) | ||
201 | |||
202 | updateLastRunnerContact(req, runnerJob.Runner) | ||
203 | |||
204 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
205 | } | ||
206 | |||
207 | async function errorRunnerJob (req: express.Request, res: express.Response) { | ||
208 | const runnerJob = res.locals.runnerJob | ||
209 | const runner = runnerJob.Runner | ||
210 | const body: ErrorRunnerJobBody = req.body | ||
211 | |||
212 | runnerJob.failures += 1 | ||
213 | |||
214 | logger.error( | ||
215 | 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | ||
216 | { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
217 | ) | ||
218 | |||
219 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
220 | await new RunnerJobHandler().error({ runnerJob, message: body.message }) | ||
221 | |||
222 | updateLastRunnerContact(req, runnerJob.Runner) | ||
223 | |||
224 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
225 | } | ||
226 | |||
227 | // --------------------------------------------------------------------------- | ||
228 | |||
229 | const jobUpdateBuilders: { | ||
230 | [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload | ||
231 | } = { | ||
232 | 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => { | ||
233 | return { | ||
234 | ...payload, | ||
235 | |||
236 | masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path, | ||
237 | resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path, | ||
238 | videoChunkFile: files['payload[videoChunkFile]']?.[0].path | ||
239 | } | ||
240 | } | ||
241 | } | ||
242 | |||
243 | async function updateRunnerJobController (req: express.Request, res: express.Response) { | ||
244 | const runnerJob = res.locals.runnerJob | ||
245 | const runner = runnerJob.Runner | ||
246 | const body: RunnerJobUpdateBody = req.body | ||
247 | |||
248 | const payloadBuilder = jobUpdateBuilders[runnerJob.type] | ||
249 | const updatePayload = payloadBuilder | ||
250 | ? payloadBuilder(body.payload, req.files as UploadFiles) | ||
251 | : undefined | ||
252 | |||
253 | logger.debug( | ||
254 | 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, | ||
255 | { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
256 | ) | ||
257 | |||
258 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
259 | await new RunnerJobHandler().update({ | ||
260 | runnerJob, | ||
261 | progress: req.body.progress, | ||
262 | updatePayload | ||
263 | }) | ||
264 | |||
265 | updateLastRunnerContact(req, runnerJob.Runner) | ||
266 | |||
267 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
268 | } | ||
269 | |||
270 | // --------------------------------------------------------------------------- | ||
271 | |||
272 | const jobSuccessPayloadBuilders: { | ||
273 | [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload | ||
274 | } = { | ||
275 | 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => { | ||
276 | return { | ||
277 | ...payload, | ||
278 | |||
279 | videoFile: files['payload[videoFile]'][0].path | ||
280 | } | ||
281 | }, | ||
282 | |||
283 | 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => { | ||
284 | return { | ||
285 | ...payload, | ||
286 | |||
287 | videoFile: files['payload[videoFile]'][0].path, | ||
288 | resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path | ||
289 | } | ||
290 | }, | ||
291 | |||
292 | 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => { | ||
293 | return { | ||
294 | ...payload, | ||
295 | |||
296 | videoFile: files['payload[videoFile]'][0].path | ||
297 | } | ||
298 | }, | ||
299 | |||
300 | 'live-rtmp-hls-transcoding': () => ({}) | ||
301 | } | ||
302 | |||
303 | async function postRunnerJobSuccess (req: express.Request, res: express.Response) { | ||
304 | const runnerJob = res.locals.runnerJob | ||
305 | const runner = runnerJob.Runner | ||
306 | const body: RunnerJobSuccessBody = req.body | ||
307 | |||
308 | const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles) | ||
309 | |||
310 | logger.info( | ||
311 | 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, | ||
312 | { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
313 | ) | ||
314 | |||
315 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
316 | await new RunnerJobHandler().complete({ runnerJob, resultPayload }) | ||
317 | |||
318 | updateLastRunnerContact(req, runnerJob.Runner) | ||
319 | |||
320 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
321 | } | ||
322 | |||
323 | // --------------------------------------------------------------------------- | ||
324 | // Controllers for admins | ||
325 | // --------------------------------------------------------------------------- | ||
326 | |||
327 | async function cancelRunnerJob (req: express.Request, res: express.Response) { | ||
328 | const runnerJob = res.locals.runnerJob | ||
329 | |||
330 | logger.info('Cancelling job %s (%s)', runnerJob.type, lTags(runnerJob.uuid, runnerJob.type)) | ||
331 | |||
332 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
333 | await new RunnerJobHandler().cancel({ runnerJob }) | ||
334 | |||
335 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
336 | } | ||
337 | |||
338 | async function listRunnerJobs (req: express.Request, res: express.Response) { | ||
339 | const query: ListRunnerJobsQuery = req.query | ||
340 | |||
341 | const resultList = await RunnerJobModel.listForApi({ | ||
342 | start: query.start, | ||
343 | count: query.count, | ||
344 | sort: query.sort, | ||
345 | search: query.search | ||
346 | }) | ||
347 | |||
348 | return res.json({ | ||
349 | total: resultList.total, | ||
350 | data: resultList.data.map(d => d.toFormattedAdminJSON()) | ||
351 | }) | ||
352 | } | ||