aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/controllers/api/runners/jobs.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/controllers/api/runners/jobs.ts')
-rw-r--r--server/controllers/api/runners/jobs.ts416
1 files changed, 0 insertions, 416 deletions
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts
deleted file mode 100644
index e9e2ddf49..000000000
--- a/server/controllers/api/runners/jobs.ts
+++ /dev/null
@@ -1,416 +0,0 @@
1import express, { UploadFiles } from 'express'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { createReqFiles } from '@server/helpers/express-utils'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { generateRunnerJobToken } from '@server/helpers/token-generator'
6import { MIMETYPES } from '@server/initializers/constants'
7import { sequelizeTypescript } from '@server/initializers/database'
8import { getRunnerJobHandlerClass, runnerJobCanBeCancelled, updateLastRunnerContact } from '@server/lib/runners'
9import {
10 apiRateLimiter,
11 asyncMiddleware,
12 authenticate,
13 ensureUserHasRight,
14 paginationValidator,
15 runnerJobsSortValidator,
16 setDefaultPagination,
17 setDefaultSort
18} from '@server/middlewares'
19import {
20 abortRunnerJobValidator,
21 acceptRunnerJobValidator,
22 cancelRunnerJobValidator,
23 errorRunnerJobValidator,
24 getRunnerFromTokenValidator,
25 jobOfRunnerGetValidatorFactory,
26 listRunnerJobsValidator,
27 runnerJobGetValidator,
28 successRunnerJobValidator,
29 updateRunnerJobValidator
30} from '@server/middlewares/validators/runners'
31import { RunnerModel } from '@server/models/runner/runner'
32import { RunnerJobModel } from '@server/models/runner/runner-job'
33import {
34 AbortRunnerJobBody,
35 AcceptRunnerJobResult,
36 ErrorRunnerJobBody,
37 HttpStatusCode,
38 ListRunnerJobsQuery,
39 LiveRTMPHLSTranscodingUpdatePayload,
40 RequestRunnerJobResult,
41 RunnerJobState,
42 RunnerJobSuccessBody,
43 RunnerJobSuccessPayload,
44 RunnerJobType,
45 RunnerJobUpdateBody,
46 RunnerJobUpdatePayload,
47 ServerErrorCode,
48 UserRight,
49 VideoStudioTranscodingSuccess,
50 VODAudioMergeTranscodingSuccess,
51 VODHLSTranscodingSuccess,
52 VODWebVideoTranscodingSuccess
53} from '@shared/models'
54
55const postRunnerJobSuccessVideoFiles = createReqFiles(
56 [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ],
57 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
58)
59
60const runnerJobUpdateVideoFiles = createReqFiles(
61 [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ],
62 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
63)
64
65const lTags = loggerTagsFactory('api', 'runner')
66
67const runnerJobsRouter = express.Router()
68
69// ---------------------------------------------------------------------------
70// Controllers for runners
71// ---------------------------------------------------------------------------
72
73runnerJobsRouter.post('/jobs/request',
74 apiRateLimiter,
75 asyncMiddleware(getRunnerFromTokenValidator),
76 asyncMiddleware(requestRunnerJob)
77)
78
79runnerJobsRouter.post('/jobs/:jobUUID/accept',
80 apiRateLimiter,
81 asyncMiddleware(runnerJobGetValidator),
82 acceptRunnerJobValidator,
83 asyncMiddleware(getRunnerFromTokenValidator),
84 asyncMiddleware(acceptRunnerJob)
85)
86
87runnerJobsRouter.post('/jobs/:jobUUID/abort',
88 apiRateLimiter,
89 asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING ])),
90 abortRunnerJobValidator,
91 asyncMiddleware(abortRunnerJob)
92)
93
94runnerJobsRouter.post('/jobs/:jobUUID/update',
95 runnerJobUpdateVideoFiles,
96 apiRateLimiter, // Has to be after multer middleware to parse runner token
97 asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING, RunnerJobState.COMPLETING, RunnerJobState.COMPLETED ])),
98 updateRunnerJobValidator,
99 asyncMiddleware(updateRunnerJobController)
100)
101
102runnerJobsRouter.post('/jobs/:jobUUID/error',
103 asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING ])),
104 errorRunnerJobValidator,
105 asyncMiddleware(errorRunnerJob)
106)
107
108runnerJobsRouter.post('/jobs/:jobUUID/success',
109 postRunnerJobSuccessVideoFiles,
110 apiRateLimiter, // Has to be after multer middleware to parse runner token
111 asyncMiddleware(jobOfRunnerGetValidatorFactory([ RunnerJobState.PROCESSING ])),
112 successRunnerJobValidator,
113 asyncMiddleware(postRunnerJobSuccess)
114)
115
116// ---------------------------------------------------------------------------
117// Controllers for admins
118// ---------------------------------------------------------------------------
119
120runnerJobsRouter.post('/jobs/:jobUUID/cancel',
121 authenticate,
122 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
123 asyncMiddleware(runnerJobGetValidator),
124 cancelRunnerJobValidator,
125 asyncMiddleware(cancelRunnerJob)
126)
127
128runnerJobsRouter.get('/jobs',
129 authenticate,
130 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
131 paginationValidator,
132 runnerJobsSortValidator,
133 setDefaultSort,
134 setDefaultPagination,
135 listRunnerJobsValidator,
136 asyncMiddleware(listRunnerJobs)
137)
138
139runnerJobsRouter.delete('/jobs/:jobUUID',
140 authenticate,
141 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
142 asyncMiddleware(runnerJobGetValidator),
143 asyncMiddleware(deleteRunnerJob)
144)
145
146// ---------------------------------------------------------------------------
147
148export {
149 runnerJobsRouter
150}
151
152// ---------------------------------------------------------------------------
153
154// ---------------------------------------------------------------------------
155// Controllers for runners
156// ---------------------------------------------------------------------------
157
158async function requestRunnerJob (req: express.Request, res: express.Response) {
159 const runner = res.locals.runner
160 const availableJobs = await RunnerJobModel.listAvailableJobs()
161
162 logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })
163
164 const result: RequestRunnerJobResult = {
165 availableJobs: availableJobs.map(j => ({
166 uuid: j.uuid,
167 type: j.type,
168 payload: j.payload
169 }))
170 }
171
172 updateLastRunnerContact(req, runner)
173
174 return res.json(result)
175}
176
177async function acceptRunnerJob (req: express.Request, res: express.Response) {
178 const runner = res.locals.runner
179 const runnerJob = res.locals.runnerJob
180
181 const newRunnerJob = await retryTransactionWrapper(() => {
182 return sequelizeTypescript.transaction(async transaction => {
183 await runnerJob.reload({ transaction })
184
185 if (runnerJob.state !== RunnerJobState.PENDING) {
186 res.fail({
187 type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
188 message: 'This job is not in pending state anymore',
189 status: HttpStatusCode.CONFLICT_409
190 })
191
192 return undefined
193 }
194
195 runnerJob.state = RunnerJobState.PROCESSING
196 runnerJob.processingJobToken = generateRunnerJobToken()
197 runnerJob.startedAt = new Date()
198 runnerJob.runnerId = runner.id
199
200 return runnerJob.save({ transaction })
201 })
202 })
203 if (!newRunnerJob) return
204
205 newRunnerJob.Runner = runner as RunnerModel
206
207 const result: AcceptRunnerJobResult = {
208 job: {
209 ...newRunnerJob.toFormattedJSON(),
210
211 jobToken: newRunnerJob.processingJobToken
212 }
213 }
214
215 updateLastRunnerContact(req, runner)
216
217 logger.info(
218 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
219 lTags(runner.name, runnerJob.uuid, runnerJob.type)
220 )
221
222 return res.json(result)
223}
224
225async function abortRunnerJob (req: express.Request, res: express.Response) {
226 const runnerJob = res.locals.runnerJob
227 const runner = runnerJob.Runner
228 const body: AbortRunnerJobBody = req.body
229
230 logger.info(
231 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
232 { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
233 )
234
235 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
236 await new RunnerJobHandler().abort({ runnerJob })
237
238 updateLastRunnerContact(req, runnerJob.Runner)
239
240 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
241}
242
243async function errorRunnerJob (req: express.Request, res: express.Response) {
244 const runnerJob = res.locals.runnerJob
245 const runner = runnerJob.Runner
246 const body: ErrorRunnerJobBody = req.body
247
248 runnerJob.failures += 1
249
250 logger.error(
251 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
252 { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
253 )
254
255 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
256 await new RunnerJobHandler().error({ runnerJob, message: body.message })
257
258 updateLastRunnerContact(req, runnerJob.Runner)
259
260 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
261}
262
263// ---------------------------------------------------------------------------
264
265const jobUpdateBuilders: {
266 [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
267} = {
268 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
269 return {
270 ...payload,
271
272 masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path,
273 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path,
274 videoChunkFile: files['payload[videoChunkFile]']?.[0].path
275 }
276 }
277}
278
279async function updateRunnerJobController (req: express.Request, res: express.Response) {
280 const runnerJob = res.locals.runnerJob
281 const runner = runnerJob.Runner
282 const body: RunnerJobUpdateBody = req.body
283
284 if (runnerJob.state === RunnerJobState.COMPLETING || runnerJob.state === RunnerJobState.COMPLETED) {
285 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
286 }
287
288 const payloadBuilder = jobUpdateBuilders[runnerJob.type]
289 const updatePayload = payloadBuilder
290 ? payloadBuilder(body.payload, req.files as UploadFiles)
291 : undefined
292
293 logger.debug(
294 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
295 { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
296 )
297
298 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
299 await new RunnerJobHandler().update({
300 runnerJob,
301 progress: req.body.progress,
302 updatePayload
303 })
304
305 updateLastRunnerContact(req, runnerJob.Runner)
306
307 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
308}
309
310// ---------------------------------------------------------------------------
311
312const jobSuccessPayloadBuilders: {
313 [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
314} = {
315 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => {
316 return {
317 ...payload,
318
319 videoFile: files['payload[videoFile]'][0].path
320 }
321 },
322
323 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => {
324 return {
325 ...payload,
326
327 videoFile: files['payload[videoFile]'][0].path,
328 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
329 }
330 },
331
332 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => {
333 return {
334 ...payload,
335
336 videoFile: files['payload[videoFile]'][0].path
337 }
338 },
339
340 'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => {
341 return {
342 ...payload,
343
344 videoFile: files['payload[videoFile]'][0].path
345 }
346 },
347
348 'live-rtmp-hls-transcoding': () => ({})
349}
350
351async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
352 const runnerJob = res.locals.runnerJob
353 const runner = runnerJob.Runner
354 const body: RunnerJobSuccessBody = req.body
355
356 const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles)
357
358 logger.info(
359 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
360 { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
361 )
362
363 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
364 await new RunnerJobHandler().complete({ runnerJob, resultPayload })
365
366 updateLastRunnerContact(req, runnerJob.Runner)
367
368 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
369}
370
371// ---------------------------------------------------------------------------
372// Controllers for admins
373// ---------------------------------------------------------------------------
374
375async function cancelRunnerJob (req: express.Request, res: express.Response) {
376 const runnerJob = res.locals.runnerJob
377
378 logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
379
380 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
381 await new RunnerJobHandler().cancel({ runnerJob })
382
383 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
384}
385
386async function deleteRunnerJob (req: express.Request, res: express.Response) {
387 const runnerJob = res.locals.runnerJob
388
389 logger.info('Deleting job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
390
391 if (runnerJobCanBeCancelled(runnerJob)) {
392 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
393 await new RunnerJobHandler().cancel({ runnerJob })
394 }
395
396 await runnerJob.destroy()
397
398 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
399}
400
401async function listRunnerJobs (req: express.Request, res: express.Response) {
402 const query: ListRunnerJobsQuery = req.query
403
404 const resultList = await RunnerJobModel.listForApi({
405 start: query.start,
406 count: query.count,
407 sort: query.sort,
408 search: query.search,
409 stateOneOf: query.stateOneOf
410 })
411
412 return res.json({
413 total: resultList.total,
414 data: resultList.data.map(d => d.toFormattedAdminJSON())
415 })
416}