diff options
Diffstat (limited to 'server/controllers/api/runners/jobs.ts')
-rw-r--r-- | server/controllers/api/runners/jobs.ts | 416 |
1 files changed, 0 insertions, 416 deletions
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts deleted file mode 100644 index e9e2ddf49..000000000 --- a/server/controllers/api/runners/jobs.ts +++ /dev/null | |||
@@ -1,416 +0,0 @@ | |||
1 | import express, { UploadFiles } from 'express' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { createReqFiles } from '@server/helpers/express-utils' | ||
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
5 | import { generateRunnerJobToken } from '@server/helpers/token-generator' | ||
6 | import { MIMETYPES } from '@server/initializers/constants' | ||
7 | import { sequelizeTypescript } from '@server/initializers/database' | ||
8 | import { getRunnerJobHandlerClass, runnerJobCanBeCancelled, updateLastRunnerContact } from '@server/lib/runners' | ||
9 | import { | ||
10 | apiRateLimiter, | ||
11 | asyncMiddleware, | ||
12 | authenticate, | ||
13 | ensureUserHasRight, | ||
14 | paginationValidator, | ||
15 | runnerJobsSortValidator, | ||
16 | setDefaultPagination, | ||
17 | setDefaultSort | ||
18 | } from '@server/middlewares' | ||
19 | import { | ||
20 | abortRunnerJobValidator, | ||
21 | acceptRunnerJobValidator, | ||
22 | cancelRunnerJobValidator, | ||
23 | errorRunnerJobValidator, | ||
24 | getRunnerFromTokenValidator, | ||
25 | jobOfRunnerGetValidatorFactory, | ||
26 | listRunnerJobsValidator, | ||
27 | runnerJobGetValidator, | ||
28 | successRunnerJobValidator, | ||
29 | updateRunnerJobValidator | ||
30 | } from '@server/middlewares/validators/runners' | ||
31 | import { RunnerModel } from '@server/models/runner/runner' | ||
32 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
33 | import { | ||
34 | AbortRunnerJobBody, | ||
35 | AcceptRunnerJobResult, | ||
36 | ErrorRunnerJobBody, | ||
37 | HttpStatusCode, | ||
38 | ListRunnerJobsQuery, | ||
39 | LiveRTMPHLSTranscodingUpdatePayload, | ||
40 | RequestRunnerJobResult, | ||
41 | RunnerJobState, | ||
42 | RunnerJobSuccessBody, | ||
43 | RunnerJobSuccessPayload, | ||
44 | RunnerJobType, | ||
45 | RunnerJobUpdateBody, | ||
46 | RunnerJobUpdatePayload, | ||
47 | ServerErrorCode, | ||
48 | UserRight, | ||
49 | VideoStudioTranscodingSuccess, | ||
50 | VODAudioMergeTranscodingSuccess, | ||
51 | VODHLSTranscodingSuccess, | ||
52 | VODWebVideoTranscodingSuccess | ||
53 | } from '@shared/models' | ||
54 | |||
55 | const postRunnerJobSuccessVideoFiles = createReqFiles( | ||
56 | [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ], | ||
57 | { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } | ||
58 | ) | ||
59 | |||
60 | const runnerJobUpdateVideoFiles = createReqFiles( | ||
61 | [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ], | ||
62 | { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } | ||
63 | ) | ||
64 | |||
65 | const lTags = loggerTagsFactory('api', 'runner') | ||
66 | |||
67 | const runnerJobsRouter = express.Router() | ||
68 | |||
69 | // --------------------------------------------------------------------------- | ||
70 | // Controllers for runners | ||
71 | // --------------------------------------------------------------------------- | ||
72 | |||
73 | runnerJobsRouter.post('/jobs/request', | ||
74 | apiRateLimiter, | ||
75 | asyncMiddleware(getRunnerFromTokenValidator), | ||
76 | asyncMiddleware(requestRunnerJob) | ||
77 | ) | ||
78 | |||
79 | runnerJobsRouter.post('/jobs/:jobUUID/accept', | ||
80 | apiRateLimiter, | ||
81 | asyncMiddleware(runnerJobGetValidator), | ||
82 | acceptRunnerJobValidator, | ||
83 | asyncMiddleware(getRunnerFromTokenValidator), | ||
84 | asyncMiddleware(acceptRunnerJob) | ||
85 | ) | ||
86 | |||
87 | runnerJobsRouter.post('/jobs/:jobUUID/abort', | ||
88 | apiRateLimiter, | ||
89 | asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING ])), | ||
90 | abortRunnerJobValidator, | ||
91 | asyncMiddleware(abortRunnerJob) | ||
92 | ) | ||
93 | |||
94 | runnerJobsRouter.post('/jobs/:jobUUID/update', | ||
95 | runnerJobUpdateVideoFiles, | ||
96 | apiRateLimiter, // Has to be after multer middleware to parse runner token | ||
97 | asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING, RunnerJobState.COMPLETING, RunnerJobState.COMPLETED ])), | ||
98 | updateRunnerJobValidator, | ||
99 | asyncMiddleware(updateRunnerJobController) | ||
100 | ) | ||
101 | |||
102 | runnerJobsRouter.post('/jobs/:jobUUID/error', | ||
103 | asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING ])), | ||
104 | errorRunnerJobValidator, | ||
105 | asyncMiddleware(errorRunnerJob) | ||
106 | ) | ||
107 | |||
108 | runnerJobsRouter.post('/jobs/:jobUUID/success', | ||
109 | postRunnerJobSuccessVideoFiles, | ||
110 | apiRateLimiter, // Has to be after multer middleware to parse runner token | ||
111 | asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING ])), | ||
112 | successRunnerJobValidator, | ||
113 | asyncMiddleware(postRunnerJobSuccess) | ||
114 | ) | ||
115 | |||
116 | // --------------------------------------------------------------------------- | ||
117 | // Controllers for admins | ||
118 | // --------------------------------------------------------------------------- | ||
119 | |||
120 | runnerJobsRouter.post('/jobs/:jobUUID/cancel', | ||
121 | authenticate, | ||
122 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | ||
123 | asyncMiddleware(runnerJobGetValidator), | ||
124 | cancelRunnerJobValidator, | ||
125 | asyncMiddleware(cancelRunnerJob) | ||
126 | ) | ||
127 | |||
128 | runnerJobsRouter.get('/jobs', | ||
129 | authenticate, | ||
130 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | ||
131 | paginationValidator, | ||
132 | runnerJobsSortValidator, | ||
133 | setDefaultSort, | ||
134 | setDefaultPagination, | ||
135 | listRunnerJobsValidator, | ||
136 | asyncMiddleware(listRunnerJobs) | ||
137 | ) | ||
138 | |||
139 | runnerJobsRouter.delete('/jobs/:jobUUID', | ||
140 | authenticate, | ||
141 | ensureUserHasRight(UserRight.MANAGE_RUNNERS), | ||
142 | asyncMiddleware(runnerJobGetValidator), | ||
143 | asyncMiddleware(deleteRunnerJob) | ||
144 | ) | ||
145 | |||
146 | // --------------------------------------------------------------------------- | ||
147 | |||
148 | export { | ||
149 | runnerJobsRouter | ||
150 | } | ||
151 | |||
152 | // --------------------------------------------------------------------------- | ||
153 | |||
154 | // --------------------------------------------------------------------------- | ||
155 | // Controllers for runners | ||
156 | // --------------------------------------------------------------------------- | ||
157 | |||
158 | async function requestRunnerJob (req: express.Request, res: express.Response) { | ||
159 | const runner = res.locals.runner | ||
160 | const availableJobs = await RunnerJobModel.listAvailableJobs() | ||
161 | |||
162 | logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) }) | ||
163 | |||
164 | const result: RequestRunnerJobResult = { | ||
165 | availableJobs: availableJobs.map(j => ({ | ||
166 | uuid: j.uuid, | ||
167 | type: j.type, | ||
168 | payload: j.payload | ||
169 | })) | ||
170 | } | ||
171 | |||
172 | updateLastRunnerContact(req, runner) | ||
173 | |||
174 | return res.json(result) | ||
175 | } | ||
176 | |||
177 | async function acceptRunnerJob (req: express.Request, res: express.Response) { | ||
178 | const runner = res.locals.runner | ||
179 | const runnerJob = res.locals.runnerJob | ||
180 | |||
181 | const newRunnerJob = await retryTransactionWrapper(() => { | ||
182 | return sequelizeTypescript.transaction(async transaction => { | ||
183 | await runnerJob.reload({ transaction }) | ||
184 | |||
185 | if (runnerJob.state !== RunnerJobState.PENDING) { | ||
186 | res.fail({ | ||
187 | type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE, | ||
188 | message: 'This job is not in pending state anymore', | ||
189 | status: HttpStatusCode.CONFLICT_409 | ||
190 | }) | ||
191 | |||
192 | return undefined | ||
193 | } | ||
194 | |||
195 | runnerJob.state = RunnerJobState.PROCESSING | ||
196 | runnerJob.processingJobToken = generateRunnerJobToken() | ||
197 | runnerJob.startedAt = new Date() | ||
198 | runnerJob.runnerId = runner.id | ||
199 | |||
200 | return runnerJob.save({ transaction }) | ||
201 | }) | ||
202 | }) | ||
203 | if (!newRunnerJob) return | ||
204 | |||
205 | newRunnerJob.Runner = runner as RunnerModel | ||
206 | |||
207 | const result: AcceptRunnerJobResult = { | ||
208 | job: { | ||
209 | ...newRunnerJob.toFormattedJSON(), | ||
210 | |||
211 | jobToken: newRunnerJob.processingJobToken | ||
212 | } | ||
213 | } | ||
214 | |||
215 | updateLastRunnerContact(req, runner) | ||
216 | |||
217 | logger.info( | ||
218 | 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | ||
219 | lTags(runner.name, runnerJob.uuid, runnerJob.type) | ||
220 | ) | ||
221 | |||
222 | return res.json(result) | ||
223 | } | ||
224 | |||
225 | async function abortRunnerJob (req: express.Request, res: express.Response) { | ||
226 | const runnerJob = res.locals.runnerJob | ||
227 | const runner = runnerJob.Runner | ||
228 | const body: AbortRunnerJobBody = req.body | ||
229 | |||
230 | logger.info( | ||
231 | 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | ||
232 | { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
233 | ) | ||
234 | |||
235 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
236 | await new RunnerJobHandler().abort({ runnerJob }) | ||
237 | |||
238 | updateLastRunnerContact(req, runnerJob.Runner) | ||
239 | |||
240 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
241 | } | ||
242 | |||
243 | async function errorRunnerJob (req: express.Request, res: express.Response) { | ||
244 | const runnerJob = res.locals.runnerJob | ||
245 | const runner = runnerJob.Runner | ||
246 | const body: ErrorRunnerJobBody = req.body | ||
247 | |||
248 | runnerJob.failures += 1 | ||
249 | |||
250 | logger.error( | ||
251 | 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, | ||
252 | { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
253 | ) | ||
254 | |||
255 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
256 | await new RunnerJobHandler().error({ runnerJob, message: body.message }) | ||
257 | |||
258 | updateLastRunnerContact(req, runnerJob.Runner) | ||
259 | |||
260 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
261 | } | ||
262 | |||
263 | // --------------------------------------------------------------------------- | ||
264 | |||
265 | const jobUpdateBuilders: { | ||
266 | [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload | ||
267 | } = { | ||
268 | 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => { | ||
269 | return { | ||
270 | ...payload, | ||
271 | |||
272 | masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path, | ||
273 | resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path, | ||
274 | videoChunkFile: files['payload[videoChunkFile]']?.[0].path | ||
275 | } | ||
276 | } | ||
277 | } | ||
278 | |||
279 | async function updateRunnerJobController (req: express.Request, res: express.Response) { | ||
280 | const runnerJob = res.locals.runnerJob | ||
281 | const runner = runnerJob.Runner | ||
282 | const body: RunnerJobUpdateBody = req.body | ||
283 | |||
284 | if (runnerJob.state === RunnerJobState.COMPLETING || runnerJob.state === RunnerJobState.COMPLETED) { | ||
285 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
286 | } | ||
287 | |||
288 | const payloadBuilder = jobUpdateBuilders[runnerJob.type] | ||
289 | const updatePayload = payloadBuilder | ||
290 | ? payloadBuilder(body.payload, req.files as UploadFiles) | ||
291 | : undefined | ||
292 | |||
293 | logger.debug( | ||
294 | 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, | ||
295 | { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
296 | ) | ||
297 | |||
298 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
299 | await new RunnerJobHandler().update({ | ||
300 | runnerJob, | ||
301 | progress: req.body.progress, | ||
302 | updatePayload | ||
303 | }) | ||
304 | |||
305 | updateLastRunnerContact(req, runnerJob.Runner) | ||
306 | |||
307 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
308 | } | ||
309 | |||
310 | // --------------------------------------------------------------------------- | ||
311 | |||
312 | const jobSuccessPayloadBuilders: { | ||
313 | [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload | ||
314 | } = { | ||
315 | 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => { | ||
316 | return { | ||
317 | ...payload, | ||
318 | |||
319 | videoFile: files['payload[videoFile]'][0].path | ||
320 | } | ||
321 | }, | ||
322 | |||
323 | 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => { | ||
324 | return { | ||
325 | ...payload, | ||
326 | |||
327 | videoFile: files['payload[videoFile]'][0].path, | ||
328 | resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path | ||
329 | } | ||
330 | }, | ||
331 | |||
332 | 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => { | ||
333 | return { | ||
334 | ...payload, | ||
335 | |||
336 | videoFile: files['payload[videoFile]'][0].path | ||
337 | } | ||
338 | }, | ||
339 | |||
340 | 'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => { | ||
341 | return { | ||
342 | ...payload, | ||
343 | |||
344 | videoFile: files['payload[videoFile]'][0].path | ||
345 | } | ||
346 | }, | ||
347 | |||
348 | 'live-rtmp-hls-transcoding': () => ({}) | ||
349 | } | ||
350 | |||
351 | async function postRunnerJobSuccess (req: express.Request, res: express.Response) { | ||
352 | const runnerJob = res.locals.runnerJob | ||
353 | const runner = runnerJob.Runner | ||
354 | const body: RunnerJobSuccessBody = req.body | ||
355 | |||
356 | const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles) | ||
357 | |||
358 | logger.info( | ||
359 | 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, | ||
360 | { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } | ||
361 | ) | ||
362 | |||
363 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
364 | await new RunnerJobHandler().complete({ runnerJob, resultPayload }) | ||
365 | |||
366 | updateLastRunnerContact(req, runnerJob.Runner) | ||
367 | |||
368 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
369 | } | ||
370 | |||
371 | // --------------------------------------------------------------------------- | ||
372 | // Controllers for admins | ||
373 | // --------------------------------------------------------------------------- | ||
374 | |||
375 | async function cancelRunnerJob (req: express.Request, res: express.Response) { | ||
376 | const runnerJob = res.locals.runnerJob | ||
377 | |||
378 | logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type)) | ||
379 | |||
380 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
381 | await new RunnerJobHandler().cancel({ runnerJob }) | ||
382 | |||
383 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
384 | } | ||
385 | |||
386 | async function deleteRunnerJob (req: express.Request, res: express.Response) { | ||
387 | const runnerJob = res.locals.runnerJob | ||
388 | |||
389 | logger.info('Deleting job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type)) | ||
390 | |||
391 | if (runnerJobCanBeCancelled(runnerJob)) { | ||
392 | const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) | ||
393 | await new RunnerJobHandler().cancel({ runnerJob }) | ||
394 | } | ||
395 | |||
396 | await runnerJob.destroy() | ||
397 | |||
398 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | ||
399 | } | ||
400 | |||
401 | async function listRunnerJobs (req: express.Request, res: express.Response) { | ||
402 | const query: ListRunnerJobsQuery = req.query | ||
403 | |||
404 | const resultList = await RunnerJobModel.listForApi({ | ||
405 | start: query.start, | ||
406 | count: query.count, | ||
407 | sort: query.sort, | ||
408 | search: query.search, | ||
409 | stateOneOf: query.stateOneOf | ||
410 | }) | ||
411 | |||
412 | return res.json({ | ||
413 | total: resultList.total, | ||
414 | data: resultList.data.map(d => d.toFormattedAdminJSON()) | ||
415 | }) | ||
416 | } | ||