]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/controllers/api/runners/jobs.ts
Support studio transcoding in peertube runner
[github/Chocobozzz/PeerTube.git] / server / controllers / api / runners / jobs.ts
CommitLineData
0c9668f7
C
1import express, { UploadFiles } from 'express'
2import { createReqFiles } from '@server/helpers/express-utils'
3import { logger, loggerTagsFactory } from '@server/helpers/logger'
4import { generateRunnerJobToken } from '@server/helpers/token-generator'
5import { MIMETYPES } from '@server/initializers/constants'
6import { sequelizeTypescript } from '@server/initializers/database'
7import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners'
8import {
9 asyncMiddleware,
10 authenticate,
11 ensureUserHasRight,
12 paginationValidator,
13 runnerJobsSortValidator,
14 setDefaultPagination,
15 setDefaultSort
16} from '@server/middlewares'
17import {
18 abortRunnerJobValidator,
19 acceptRunnerJobValidator,
5e47f6ab 20 cancelRunnerJobValidator,
0c9668f7
C
21 errorRunnerJobValidator,
22 getRunnerFromTokenValidator,
23 jobOfRunnerGetValidator,
24 runnerJobGetValidator,
25 successRunnerJobValidator,
26 updateRunnerJobValidator
27} from '@server/middlewares/validators/runners'
28import { RunnerModel } from '@server/models/runner/runner'
29import { RunnerJobModel } from '@server/models/runner/runner-job'
30import {
31 AbortRunnerJobBody,
32 AcceptRunnerJobResult,
33 ErrorRunnerJobBody,
34 HttpStatusCode,
35 ListRunnerJobsQuery,
36 LiveRTMPHLSTranscodingUpdatePayload,
37 RequestRunnerJobResult,
38 RunnerJobState,
39 RunnerJobSuccessBody,
40 RunnerJobSuccessPayload,
41 RunnerJobType,
42 RunnerJobUpdateBody,
43 RunnerJobUpdatePayload,
44 UserRight,
5e47f6ab 45 VideoEditionTranscodingSuccess,
0c9668f7
C
46 VODAudioMergeTranscodingSuccess,
47 VODHLSTranscodingSuccess,
48 VODWebVideoTranscodingSuccess
49} from '@shared/models'
50
51const postRunnerJobSuccessVideoFiles = createReqFiles(
52 [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ],
53 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
54)
55
56const runnerJobUpdateVideoFiles = createReqFiles(
57 [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ],
58 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
59)
60
61const lTags = loggerTagsFactory('api', 'runner')
62
63const runnerJobsRouter = express.Router()
64
65// ---------------------------------------------------------------------------
66// Controllers for runners
67// ---------------------------------------------------------------------------
68
69runnerJobsRouter.post('/jobs/request',
70 asyncMiddleware(getRunnerFromTokenValidator),
71 asyncMiddleware(requestRunnerJob)
72)
73
74runnerJobsRouter.post('/jobs/:jobUUID/accept',
75 asyncMiddleware(runnerJobGetValidator),
76 acceptRunnerJobValidator,
77 asyncMiddleware(getRunnerFromTokenValidator),
78 asyncMiddleware(acceptRunnerJob)
79)
80
81runnerJobsRouter.post('/jobs/:jobUUID/abort',
82 asyncMiddleware(jobOfRunnerGetValidator),
83 abortRunnerJobValidator,
84 asyncMiddleware(abortRunnerJob)
85)
86
87runnerJobsRouter.post('/jobs/:jobUUID/update',
88 runnerJobUpdateVideoFiles,
89 asyncMiddleware(jobOfRunnerGetValidator),
90 updateRunnerJobValidator,
91 asyncMiddleware(updateRunnerJobController)
92)
93
94runnerJobsRouter.post('/jobs/:jobUUID/error',
95 asyncMiddleware(jobOfRunnerGetValidator),
96 errorRunnerJobValidator,
97 asyncMiddleware(errorRunnerJob)
98)
99
100runnerJobsRouter.post('/jobs/:jobUUID/success',
101 postRunnerJobSuccessVideoFiles,
102 asyncMiddleware(jobOfRunnerGetValidator),
103 successRunnerJobValidator,
104 asyncMiddleware(postRunnerJobSuccess)
105)
106
107// ---------------------------------------------------------------------------
108// Controllers for admins
109// ---------------------------------------------------------------------------
110
111runnerJobsRouter.post('/jobs/:jobUUID/cancel',
112 authenticate,
113 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
114 asyncMiddleware(runnerJobGetValidator),
5e47f6ab 115 cancelRunnerJobValidator,
0c9668f7
C
116 asyncMiddleware(cancelRunnerJob)
117)
118
119runnerJobsRouter.get('/jobs',
120 authenticate,
121 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
122 paginationValidator,
123 runnerJobsSortValidator,
124 setDefaultSort,
125 setDefaultPagination,
126 asyncMiddleware(listRunnerJobs)
127)
128
129// ---------------------------------------------------------------------------
130
131export {
132 runnerJobsRouter
133}
134
135// ---------------------------------------------------------------------------
136
137// ---------------------------------------------------------------------------
138// Controllers for runners
139// ---------------------------------------------------------------------------
140
141async function requestRunnerJob (req: express.Request, res: express.Response) {
142 const runner = res.locals.runner
143 const availableJobs = await RunnerJobModel.listAvailableJobs()
144
145 logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })
146
147 const result: RequestRunnerJobResult = {
148 availableJobs: availableJobs.map(j => ({
149 uuid: j.uuid,
150 type: j.type,
151 payload: j.payload
152 }))
153 }
154
155 updateLastRunnerContact(req, runner)
156
157 return res.json(result)
158}
159
160async function acceptRunnerJob (req: express.Request, res: express.Response) {
161 const runner = res.locals.runner
162 const runnerJob = res.locals.runnerJob
163
164 runnerJob.state = RunnerJobState.PROCESSING
165 runnerJob.processingJobToken = generateRunnerJobToken()
166 runnerJob.startedAt = new Date()
167 runnerJob.runnerId = runner.id
168
169 const newRunnerJob = await sequelizeTypescript.transaction(transaction => {
170 return runnerJob.save({ transaction })
171 })
172 newRunnerJob.Runner = runner as RunnerModel
173
174 const result: AcceptRunnerJobResult = {
175 job: {
176 ...newRunnerJob.toFormattedJSON(),
177
178 jobToken: newRunnerJob.processingJobToken
179 }
180 }
181
182 updateLastRunnerContact(req, runner)
183
184 logger.info(
185 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
186 lTags(runner.name, runnerJob.uuid, runnerJob.type)
187 )
188
189 return res.json(result)
190}
191
192async function abortRunnerJob (req: express.Request, res: express.Response) {
193 const runnerJob = res.locals.runnerJob
194 const runner = runnerJob.Runner
195 const body: AbortRunnerJobBody = req.body
196
197 logger.info(
198 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
199 { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
200 )
201
202 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
203 await new RunnerJobHandler().abort({ runnerJob })
204
205 updateLastRunnerContact(req, runnerJob.Runner)
206
207 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
208}
209
210async function errorRunnerJob (req: express.Request, res: express.Response) {
211 const runnerJob = res.locals.runnerJob
212 const runner = runnerJob.Runner
213 const body: ErrorRunnerJobBody = req.body
214
215 runnerJob.failures += 1
216
217 logger.error(
218 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
219 { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
220 )
221
222 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
223 await new RunnerJobHandler().error({ runnerJob, message: body.message })
224
225 updateLastRunnerContact(req, runnerJob.Runner)
226
227 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
228}
229
230// ---------------------------------------------------------------------------
231
232const jobUpdateBuilders: {
233 [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
234} = {
235 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
236 return {
237 ...payload,
238
239 masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path,
240 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path,
241 videoChunkFile: files['payload[videoChunkFile]']?.[0].path
242 }
243 }
244}
245
246async function updateRunnerJobController (req: express.Request, res: express.Response) {
247 const runnerJob = res.locals.runnerJob
248 const runner = runnerJob.Runner
249 const body: RunnerJobUpdateBody = req.body
250
251 const payloadBuilder = jobUpdateBuilders[runnerJob.type]
252 const updatePayload = payloadBuilder
253 ? payloadBuilder(body.payload, req.files as UploadFiles)
254 : undefined
255
256 logger.debug(
257 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
258 { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
259 )
260
261 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
262 await new RunnerJobHandler().update({
263 runnerJob,
264 progress: req.body.progress,
265 updatePayload
266 })
267
268 updateLastRunnerContact(req, runnerJob.Runner)
269
270 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
271}
272
273// ---------------------------------------------------------------------------
274
275const jobSuccessPayloadBuilders: {
276 [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
277} = {
278 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => {
279 return {
280 ...payload,
281
282 videoFile: files['payload[videoFile]'][0].path
283 }
284 },
285
286 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => {
287 return {
288 ...payload,
289
290 videoFile: files['payload[videoFile]'][0].path,
291 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
292 }
293 },
294
295 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => {
296 return {
297 ...payload,
298
299 videoFile: files['payload[videoFile]'][0].path
300 }
301 },
302
5e47f6ab
C
303 'video-edition-transcoding': (payload: VideoEditionTranscodingSuccess, files) => {
304 return {
305 ...payload,
306
307 videoFile: files['payload[videoFile]'][0].path
308 }
309 },
310
0c9668f7
C
311 'live-rtmp-hls-transcoding': () => ({})
312}
313
314async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
315 const runnerJob = res.locals.runnerJob
316 const runner = runnerJob.Runner
317 const body: RunnerJobSuccessBody = req.body
318
319 const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles)
320
321 logger.info(
322 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
323 { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
324 )
325
326 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
327 await new RunnerJobHandler().complete({ runnerJob, resultPayload })
328
329 updateLastRunnerContact(req, runnerJob.Runner)
330
331 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
332}
333
334// ---------------------------------------------------------------------------
335// Controllers for admins
336// ---------------------------------------------------------------------------
337
338async function cancelRunnerJob (req: express.Request, res: express.Response) {
339 const runnerJob = res.locals.runnerJob
340
5e47f6ab 341 logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
0c9668f7
C
342
343 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
344 await new RunnerJobHandler().cancel({ runnerJob })
345
346 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
347}
348
349async function listRunnerJobs (req: express.Request, res: express.Response) {
350 const query: ListRunnerJobsQuery = req.query
351
352 const resultList = await RunnerJobModel.listForApi({
353 start: query.start,
354 count: query.count,
355 sort: query.sort,
356 search: query.search
357 })
358
359 return res.json({
360 total: resultList.total,
361 data: resultList.data.map(d => d.toFormattedAdminJSON())
362 })
363}