]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/controllers/api/runners/jobs.ts
More robust accept job controller
[github/Chocobozzz/PeerTube.git] / server / controllers / api / runners / jobs.ts
1 import express, { UploadFiles } from 'express'
2 import { retryTransactionWrapper } from '@server/helpers/database-utils'
3 import { createReqFiles } from '@server/helpers/express-utils'
4 import { logger, loggerTagsFactory } from '@server/helpers/logger'
5 import { generateRunnerJobToken } from '@server/helpers/token-generator'
6 import { MIMETYPES } from '@server/initializers/constants'
7 import { sequelizeTypescript } from '@server/initializers/database'
8 import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners'
9 import {
10 asyncMiddleware,
11 authenticate,
12 ensureUserHasRight,
13 paginationValidator,
14 runnerJobsSortValidator,
15 setDefaultPagination,
16 setDefaultSort
17 } from '@server/middlewares'
18 import {
19 abortRunnerJobValidator,
20 acceptRunnerJobValidator,
21 cancelRunnerJobValidator,
22 errorRunnerJobValidator,
23 getRunnerFromTokenValidator,
24 jobOfRunnerGetValidator,
25 runnerJobGetValidator,
26 successRunnerJobValidator,
27 updateRunnerJobValidator
28 } from '@server/middlewares/validators/runners'
29 import { RunnerModel } from '@server/models/runner/runner'
30 import { RunnerJobModel } from '@server/models/runner/runner-job'
31 import {
32 AbortRunnerJobBody,
33 AcceptRunnerJobResult,
34 ErrorRunnerJobBody,
35 HttpStatusCode,
36 ListRunnerJobsQuery,
37 LiveRTMPHLSTranscodingUpdatePayload,
38 RequestRunnerJobResult,
39 RunnerJobState,
40 RunnerJobSuccessBody,
41 RunnerJobSuccessPayload,
42 RunnerJobType,
43 RunnerJobUpdateBody,
44 RunnerJobUpdatePayload,
45 UserRight,
46 VideoStudioTranscodingSuccess,
47 VODAudioMergeTranscodingSuccess,
48 VODHLSTranscodingSuccess,
49 VODWebVideoTranscodingSuccess
50 } from '@shared/models'
51
// Builds a fresh "mime type -> extension" map covering both video files and m3u8 playlists.
// A new object is created on every call so each upload middleware gets its own copy.
function buildVideoAndPlaylistMimeTypeExt () {
  return {
    ...MIMETYPES.VIDEO.MIMETYPE_EXT,
    ...MIMETYPES.M3U8.MIMETYPE_EXT
  }
}

// Upload middleware of the "success" route: result video file and its resolution playlist
const postRunnerJobSuccessVideoFiles = createReqFiles(
  [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ],
  buildVideoAndPlaylistMimeTypeExt()
)

// Upload middleware of the "update" route: video chunk, resolution playlist and master playlist
const runnerJobUpdateVideoFiles = createReqFiles(
  [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ],
  buildVideoAndPlaylistMimeTypeExt()
)

// Logger tags factory shared by every controller of this file
const lTags = loggerTagsFactory('api', 'runner')
63
const runnerJobsRouter = express.Router()

// ---------------------------------------------------------------------------
// Routes used by remote runners
// ---------------------------------------------------------------------------

// List the jobs the runner could pick up
runnerJobsRouter.post(
  '/jobs/request',
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(requestRunnerJob)
)

// Assign a job to the runner
runnerJobsRouter.post(
  '/jobs/:jobUUID/accept',
  asyncMiddleware(runnerJobGetValidator),
  acceptRunnerJobValidator,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(acceptRunnerJob)
)

// Give up a job that was previously accepted
runnerJobsRouter.post(
  '/jobs/:jobUUID/abort',
  asyncMiddleware(jobOfRunnerGetValidator),
  abortRunnerJobValidator,
  asyncMiddleware(abortRunnerJob)
)

// Send progress and/or intermediate files of a job
// The upload middleware is registered before the validators
runnerJobsRouter.post(
  '/jobs/:jobUUID/update',
  runnerJobUpdateVideoFiles,
  asyncMiddleware(jobOfRunnerGetValidator),
  updateRunnerJobValidator,
  asyncMiddleware(updateRunnerJobController)
)

// Report that the job failed on the runner
runnerJobsRouter.post(
  '/jobs/:jobUUID/error',
  asyncMiddleware(jobOfRunnerGetValidator),
  errorRunnerJobValidator,
  asyncMiddleware(errorRunnerJob)
)

// Send the final result of a job
// The upload middleware is registered before the validators
runnerJobsRouter.post(
  '/jobs/:jobUUID/success',
  postRunnerJobSuccessVideoFiles,
  asyncMiddleware(jobOfRunnerGetValidator),
  successRunnerJobValidator,
  asyncMiddleware(postRunnerJobSuccess)
)

// ---------------------------------------------------------------------------
// Routes used by admins (authenticated users having the MANAGE_RUNNERS right)
// ---------------------------------------------------------------------------

// Cancel a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/cancel',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  asyncMiddleware(runnerJobGetValidator),
  cancelRunnerJobValidator,
  asyncMiddleware(cancelRunnerJob)
)

// Paginated and sorted list of jobs
runnerJobsRouter.get(
  '/jobs',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  paginationValidator,
  runnerJobsSortValidator,
  setDefaultSort,
  setDefaultPagination,
  asyncMiddleware(listRunnerJobs)
)

// ---------------------------------------------------------------------------

export { runnerJobsRouter }
135
136 // ---------------------------------------------------------------------------
137
138 // ---------------------------------------------------------------------------
139 // Controllers for runners
140 // ---------------------------------------------------------------------------
141
/**
 * Sends back the jobs returned by `RunnerJobModel.listAvailableJobs()` to the requesting runner.
 *
 * The runner is read from `res.locals.runner`, which must have been set by a previous middleware.
 * Only the uuid, type and payload of each job are exposed.
 * The runner last contact is refreshed before responding.
 */
async function requestRunnerJob (req: express.Request, res: express.Response) {
  const { runner } = res.locals
  const jobs = await RunnerJobModel.listAvailableJobs()

  logger.debug('Runner %s requests for a job.', runner.name, { availableJobs: jobs, ...lTags(runner.name) })

  const result: RequestRunnerJobResult = {
    availableJobs: jobs.map(job => {
      return {
        uuid: job.uuid,
        type: job.type,
        payload: job.payload
      }
    })
  }

  updateLastRunnerContact(req, runner)

  return res.json(result)
}
160
/**
 * Assigns the job stored in `res.locals.runnerJob` to the runner stored in `res.locals.runner`.
 *
 * The job is reloaded and updated inside a single transaction (itself wrapped by `retryTransactionWrapper`):
 *  - if the reloaded job is not pending anymore, a 409 response is sent and nothing is saved
 *  - otherwise the job moves to the processing state, gets a new processing token, a start date and the runner id
 *
 * On success, the response contains the formatted job and its job token.
 */
async function acceptRunnerJob (req: express.Request, res: express.Response) {
  const { runner, runnerJob } = res.locals

  const acceptedJob = await retryTransactionWrapper(() => {
    return sequelizeTypescript.transaction(async transaction => {
      // Work on the current database state of the job, not on the one loaded by the validator
      await runnerJob.reload({ transaction })

      const isStillPending = runnerJob.state === RunnerJobState.PENDING

      if (!isStillPending) {
        res.fail({
          message: 'This job is not in pending state anymore',
          status: HttpStatusCode.CONFLICT_409
        })

        // Tell the caller the response has already been sent
        return undefined
      }

      runnerJob.state = RunnerJobState.PROCESSING
      runnerJob.processingJobToken = generateRunnerJobToken()
      runnerJob.startedAt = new Date()
      runnerJob.runnerId = runner.id

      return runnerJob.save({ transaction })
    })
  })

  // The 409 response has been sent inside the transaction
  if (!acceptedJob) return

  acceptedJob.Runner = runner as RunnerModel

  const formattedJob = acceptedJob.toFormattedJSON()
  const result: AcceptRunnerJobResult = {
    job: {
      ...formattedJob,

      jobToken: acceptedJob.processingJobToken
    }
  }

  updateLastRunnerContact(req, runner)

  logger.info(
    'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    lTags(runner.name, runnerJob.uuid, runnerJob.type)
  )

  return res.json(result)
}
207
/**
 * Aborts the job stored in `res.locals.runnerJob` on behalf of the runner attached to it.
 *
 * The abort itself is delegated to the handler class returned by `getRunnerJobHandlerClass()`.
 * Responds with 204 once the handler is done and the runner last contact has been refreshed.
 */
async function abortRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: AbortRunnerJobBody = req.body

  logger.info(
    'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.abort({ runnerJob })

  // Read the runner from the job again: the handler received the job object and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
225
/**
 * Records an error reported by the runner for the job stored in `res.locals.runnerJob`.
 *
 * The failures counter of the job is incremented in memory, then the error processing is delegated
 * to the handler class returned by `getRunnerJobHandlerClass()`.
 * Responds with 204 once the handler is done and the runner last contact has been refreshed.
 */
async function errorRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: ErrorRunnerJobBody = req.body

  // Incremented before logging so the log contains the up to date total
  runnerJob.failures += 1

  logger.error(
    'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.error({ runnerJob, message: body.message })

  // Read the runner from the job again: the handler received the job object and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
245
246 // ---------------------------------------------------------------------------
247
/**
 * Per job type builders that merge the uploaded files into the update payload sent by the runner.
 *
 * Each builder receives the payload of the request body and the uploaded files of the request,
 * and returns the payload given to the job handler. Job types without an entry have no update payload.
 */
const jobUpdateBuilders: {
  [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
} = {
  'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
    // `files` is optional in the builder signature and every upload field is optional too:
    // use null-safe accesses everywhere so a request without any uploaded file
    // (or with an empty field) results in undefined paths instead of a TypeError
    return {
      ...payload,

      masterPlaylistFile: files?.['payload[masterPlaylistFile]']?.[0]?.path,
      resolutionPlaylistFile: files?.['payload[resolutionPlaylistFile]']?.[0]?.path,
      videoChunkFile: files?.['payload[videoChunkFile]']?.[0]?.path
    }
  }
}
261
/**
 * Handles an update sent by the runner for the job stored in `res.locals.runnerJob`.
 *
 * When `jobUpdateBuilders` has a builder for the job type, it is used to create the update payload
 * from the body payload and the uploaded files; otherwise the update payload is undefined.
 * The progress of the body and the update payload are then given to the job handler.
 * Responds with 204 once the handler is done and the runner last contact has been refreshed.
 */
async function updateRunnerJobController (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobUpdateBody = req.body

  // Not every job type has an update payload builder
  const buildUpdatePayload = jobUpdateBuilders[runnerJob.type]
  const updatePayload = buildUpdatePayload?.(body.payload, req.files as UploadFiles)

  logger.debug(
    'Remote runner %s is updating job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()

  await handler.update({
    runnerJob,
    progress: req.body.progress,
    updatePayload
  })

  // Read the runner from the job again: the handler received the job object and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
288
289 // ---------------------------------------------------------------------------
290
/**
 * Per job type builders that create the result payload given to the job handler
 * from the payload of the request body and the uploaded files of the request.
 *
 * Builders of VOD and studio jobs read the first uploaded file of each field they need
 * without any guard, so they throw if such a file is missing.
 */
const jobSuccessPayloadBuilders: {
  [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
} = {
  'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  // HLS jobs also send the playlist of the transcoded resolution
  'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path,
    resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
  }),

  'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  // Nothing to forward for live jobs: the result payload is always empty
  'live-rtmp-hls-transcoding': () => {
    return {}
  }
}
329
/**
 * Handles the success result sent by the runner for the job stored in `res.locals.runnerJob`.
 *
 * The result payload is created by the `jobSuccessPayloadBuilders` entry of the job type,
 * then given to the `complete` method of the job handler.
 * Responds with 204 once the handler is done and the runner last contact has been refreshed.
 */
async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobSuccessBody = req.body

  const buildResultPayload = jobSuccessPayloadBuilders[runnerJob.type]
  const resultPayload = buildResultPayload(body.payload, req.files as UploadFiles)

  logger.info(
    'Remote runner %s is sending success result for job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.complete({ runnerJob, resultPayload })

  // Read the runner from the job again: the handler received the job object and may have changed it
  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
349
350 // ---------------------------------------------------------------------------
351 // Controllers for admins
352 // ---------------------------------------------------------------------------
353
/**
 * Cancels the job stored in `res.locals.runnerJob`.
 *
 * The cancellation is delegated to the handler class returned by `getRunnerJobHandlerClass()`.
 * Responds with 204 once the handler is done.
 */
async function cancelRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals

  logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.cancel({ runnerJob })

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
364
/**
 * Lists runner jobs using the `start`, `count`, `sort` and `search` values of the query string.
 *
 * Responds with the total returned by `RunnerJobModel.listForApi()` and the jobs formatted for admins.
 */
async function listRunnerJobs (req: express.Request, res: express.Response) {
  const query: ListRunnerJobsQuery = req.query
  const { start, count, sort, search } = query

  const { total, data: jobs } = await RunnerJobModel.listForApi({ start, count, sort, search })

  return res.json({
    total,
    data: jobs.map(job => job.toFormattedAdminJSON())
  })
}