aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/controllers/api/runners/jobs.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/controllers/api/runners/jobs.ts')
-rw-r--r--server/controllers/api/runners/jobs.ts352
1 files changed, 352 insertions, 0 deletions
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts
new file mode 100644
index 000000000..7d488ec11
--- /dev/null
+++ b/server/controllers/api/runners/jobs.ts
@@ -0,0 +1,352 @@
1import express, { UploadFiles } from 'express'
2import { createReqFiles } from '@server/helpers/express-utils'
3import { logger, loggerTagsFactory } from '@server/helpers/logger'
4import { generateRunnerJobToken } from '@server/helpers/token-generator'
5import { MIMETYPES } from '@server/initializers/constants'
6import { sequelizeTypescript } from '@server/initializers/database'
7import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners'
8import {
9 asyncMiddleware,
10 authenticate,
11 ensureUserHasRight,
12 paginationValidator,
13 runnerJobsSortValidator,
14 setDefaultPagination,
15 setDefaultSort
16} from '@server/middlewares'
17import {
18 abortRunnerJobValidator,
19 acceptRunnerJobValidator,
20 errorRunnerJobValidator,
21 getRunnerFromTokenValidator,
22 jobOfRunnerGetValidator,
23 runnerJobGetValidator,
24 successRunnerJobValidator,
25 updateRunnerJobValidator
26} from '@server/middlewares/validators/runners'
27import { RunnerModel } from '@server/models/runner/runner'
28import { RunnerJobModel } from '@server/models/runner/runner-job'
29import {
30 AbortRunnerJobBody,
31 AcceptRunnerJobResult,
32 ErrorRunnerJobBody,
33 HttpStatusCode,
34 ListRunnerJobsQuery,
35 LiveRTMPHLSTranscodingUpdatePayload,
36 RequestRunnerJobResult,
37 RunnerJobState,
38 RunnerJobSuccessBody,
39 RunnerJobSuccessPayload,
40 RunnerJobType,
41 RunnerJobUpdateBody,
42 RunnerJobUpdatePayload,
43 UserRight,
44 VODAudioMergeTranscodingSuccess,
45 VODHLSTranscodingSuccess,
46 VODWebVideoTranscodingSuccess
47} from '@shared/models'
48
49const postRunnerJobSuccessVideoFiles = createReqFiles(
50 [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ],
51 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
52)
53
54const runnerJobUpdateVideoFiles = createReqFiles(
55 [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ],
56 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
57)
58
59const lTags = loggerTagsFactory('api', 'runner')
60
61const runnerJobsRouter = express.Router()
62
63// ---------------------------------------------------------------------------
64// Controllers for runners
65// ---------------------------------------------------------------------------
66
67runnerJobsRouter.post('/jobs/request',
68 asyncMiddleware(getRunnerFromTokenValidator),
69 asyncMiddleware(requestRunnerJob)
70)
71
72runnerJobsRouter.post('/jobs/:jobUUID/accept',
73 asyncMiddleware(runnerJobGetValidator),
74 acceptRunnerJobValidator,
75 asyncMiddleware(getRunnerFromTokenValidator),
76 asyncMiddleware(acceptRunnerJob)
77)
78
79runnerJobsRouter.post('/jobs/:jobUUID/abort',
80 asyncMiddleware(jobOfRunnerGetValidator),
81 abortRunnerJobValidator,
82 asyncMiddleware(abortRunnerJob)
83)
84
85runnerJobsRouter.post('/jobs/:jobUUID/update',
86 runnerJobUpdateVideoFiles,
87 asyncMiddleware(jobOfRunnerGetValidator),
88 updateRunnerJobValidator,
89 asyncMiddleware(updateRunnerJobController)
90)
91
92runnerJobsRouter.post('/jobs/:jobUUID/error',
93 asyncMiddleware(jobOfRunnerGetValidator),
94 errorRunnerJobValidator,
95 asyncMiddleware(errorRunnerJob)
96)
97
98runnerJobsRouter.post('/jobs/:jobUUID/success',
99 postRunnerJobSuccessVideoFiles,
100 asyncMiddleware(jobOfRunnerGetValidator),
101 successRunnerJobValidator,
102 asyncMiddleware(postRunnerJobSuccess)
103)
104
105// ---------------------------------------------------------------------------
106// Controllers for admins
107// ---------------------------------------------------------------------------
108
109runnerJobsRouter.post('/jobs/:jobUUID/cancel',
110 authenticate,
111 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
112 asyncMiddleware(runnerJobGetValidator),
113 asyncMiddleware(cancelRunnerJob)
114)
115
116runnerJobsRouter.get('/jobs',
117 authenticate,
118 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
119 paginationValidator,
120 runnerJobsSortValidator,
121 setDefaultSort,
122 setDefaultPagination,
123 asyncMiddleware(listRunnerJobs)
124)
125
126// ---------------------------------------------------------------------------
127
128export {
129 runnerJobsRouter
130}
131
132// ---------------------------------------------------------------------------
133
134// ---------------------------------------------------------------------------
135// Controllers for runners
136// ---------------------------------------------------------------------------
137
138async function requestRunnerJob (req: express.Request, res: express.Response) {
139 const runner = res.locals.runner
140 const availableJobs = await RunnerJobModel.listAvailableJobs()
141
142 logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })
143
144 const result: RequestRunnerJobResult = {
145 availableJobs: availableJobs.map(j => ({
146 uuid: j.uuid,
147 type: j.type,
148 payload: j.payload
149 }))
150 }
151
152 updateLastRunnerContact(req, runner)
153
154 return res.json(result)
155}
156
157async function acceptRunnerJob (req: express.Request, res: express.Response) {
158 const runner = res.locals.runner
159 const runnerJob = res.locals.runnerJob
160
161 runnerJob.state = RunnerJobState.PROCESSING
162 runnerJob.processingJobToken = generateRunnerJobToken()
163 runnerJob.startedAt = new Date()
164 runnerJob.runnerId = runner.id
165
166 const newRunnerJob = await sequelizeTypescript.transaction(transaction => {
167 return runnerJob.save({ transaction })
168 })
169 newRunnerJob.Runner = runner as RunnerModel
170
171 const result: AcceptRunnerJobResult = {
172 job: {
173 ...newRunnerJob.toFormattedJSON(),
174
175 jobToken: newRunnerJob.processingJobToken
176 }
177 }
178
179 updateLastRunnerContact(req, runner)
180
181 logger.info(
182 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
183 lTags(runner.name, runnerJob.uuid, runnerJob.type)
184 )
185
186 return res.json(result)
187}
188
189async function abortRunnerJob (req: express.Request, res: express.Response) {
190 const runnerJob = res.locals.runnerJob
191 const runner = runnerJob.Runner
192 const body: AbortRunnerJobBody = req.body
193
194 logger.info(
195 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
196 { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
197 )
198
199 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
200 await new RunnerJobHandler().abort({ runnerJob })
201
202 updateLastRunnerContact(req, runnerJob.Runner)
203
204 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
205}
206
207async function errorRunnerJob (req: express.Request, res: express.Response) {
208 const runnerJob = res.locals.runnerJob
209 const runner = runnerJob.Runner
210 const body: ErrorRunnerJobBody = req.body
211
212 runnerJob.failures += 1
213
214 logger.error(
215 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
216 { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
217 )
218
219 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
220 await new RunnerJobHandler().error({ runnerJob, message: body.message })
221
222 updateLastRunnerContact(req, runnerJob.Runner)
223
224 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
225}
226
227// ---------------------------------------------------------------------------
228
229const jobUpdateBuilders: {
230 [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
231} = {
232 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
233 return {
234 ...payload,
235
236 masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path,
237 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path,
238 videoChunkFile: files['payload[videoChunkFile]']?.[0].path
239 }
240 }
241}
242
243async function updateRunnerJobController (req: express.Request, res: express.Response) {
244 const runnerJob = res.locals.runnerJob
245 const runner = runnerJob.Runner
246 const body: RunnerJobUpdateBody = req.body
247
248 const payloadBuilder = jobUpdateBuilders[runnerJob.type]
249 const updatePayload = payloadBuilder
250 ? payloadBuilder(body.payload, req.files as UploadFiles)
251 : undefined
252
253 logger.debug(
254 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
255 { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
256 )
257
258 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
259 await new RunnerJobHandler().update({
260 runnerJob,
261 progress: req.body.progress,
262 updatePayload
263 })
264
265 updateLastRunnerContact(req, runnerJob.Runner)
266
267 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
268}
269
270// ---------------------------------------------------------------------------
271
272const jobSuccessPayloadBuilders: {
273 [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
274} = {
275 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => {
276 return {
277 ...payload,
278
279 videoFile: files['payload[videoFile]'][0].path
280 }
281 },
282
283 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => {
284 return {
285 ...payload,
286
287 videoFile: files['payload[videoFile]'][0].path,
288 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
289 }
290 },
291
292 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => {
293 return {
294 ...payload,
295
296 videoFile: files['payload[videoFile]'][0].path
297 }
298 },
299
300 'live-rtmp-hls-transcoding': () => ({})
301}
302
303async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
304 const runnerJob = res.locals.runnerJob
305 const runner = runnerJob.Runner
306 const body: RunnerJobSuccessBody = req.body
307
308 const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles)
309
310 logger.info(
311 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
312 { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
313 )
314
315 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
316 await new RunnerJobHandler().complete({ runnerJob, resultPayload })
317
318 updateLastRunnerContact(req, runnerJob.Runner)
319
320 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
321}
322
323// ---------------------------------------------------------------------------
324// Controllers for admins
325// ---------------------------------------------------------------------------
326
327async function cancelRunnerJob (req: express.Request, res: express.Response) {
328 const runnerJob = res.locals.runnerJob
329
330 logger.info('Cancelling job %s (%s)', runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
331
332 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
333 await new RunnerJobHandler().cancel({ runnerJob })
334
335 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
336}
337
338async function listRunnerJobs (req: express.Request, res: express.Response) {
339 const query: ListRunnerJobsQuery = req.query
340
341 const resultList = await RunnerJobModel.listForApi({
342 start: query.start,
343 count: query.count,
344 sort: query.sort,
345 search: query.search
346 })
347
348 return res.json({
349 total: resultList.total,
350 data: resultList.data.map(d => d.toFormattedAdminJSON())
351 })
352}