1 import express, { UploadFiles } from 'express'
2 import { createReqFiles } from '@server/helpers/express-utils'
3 import { logger, loggerTagsFactory } from '@server/helpers/logger'
4 import { generateRunnerJobToken } from '@server/helpers/token-generator'
5 import { MIMETYPES } from '@server/initializers/constants'
6 import { sequelizeTypescript } from '@server/initializers/database'
7 import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners'
13 runnerJobsSortValidator,
16 } from '@server/middlewares'
18 abortRunnerJobValidator,
19 acceptRunnerJobValidator,
20 cancelRunnerJobValidator,
21 errorRunnerJobValidator,
22 getRunnerFromTokenValidator,
23 jobOfRunnerGetValidator,
24 runnerJobGetValidator,
25 successRunnerJobValidator,
26 updateRunnerJobValidator
27 } from '@server/middlewares/validators/runners'
28 import { RunnerModel } from '@server/models/runner/runner'
29 import { RunnerJobModel } from '@server/models/runner/runner-job'
32 AcceptRunnerJobResult,
36 LiveRTMPHLSTranscodingUpdatePayload,
37 RequestRunnerJobResult,
40 RunnerJobSuccessPayload,
43 RunnerJobUpdatePayload,
45 VideoStudioTranscodingSuccess,
46 VODAudioMergeTranscodingSuccess,
47 VODHLSTranscodingSuccess,
48 VODWebVideoTranscodingSuccess
49 } from '@shared/models'
51 const postRunnerJobSuccessVideoFiles = createReqFiles(
52 [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ],
53 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
56 const runnerJobUpdateVideoFiles = createReqFiles(
57 [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ],
58 { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
61 const lTags = loggerTagsFactory('api', 'runner')
63 const runnerJobsRouter = express.Router()
65 // ---------------------------------------------------------------------------
66 // Controllers for runners
67 // ---------------------------------------------------------------------------
69 runnerJobsRouter.post('/jobs/request',
70 asyncMiddleware(getRunnerFromTokenValidator),
71 asyncMiddleware(requestRunnerJob)
74 runnerJobsRouter.post('/jobs/:jobUUID/accept',
75 asyncMiddleware(runnerJobGetValidator),
76 acceptRunnerJobValidator,
77 asyncMiddleware(getRunnerFromTokenValidator),
78 asyncMiddleware(acceptRunnerJob)
81 runnerJobsRouter.post('/jobs/:jobUUID/abort',
82 asyncMiddleware(jobOfRunnerGetValidator),
83 abortRunnerJobValidator,
84 asyncMiddleware(abortRunnerJob)
87 runnerJobsRouter.post('/jobs/:jobUUID/update',
88 runnerJobUpdateVideoFiles,
89 asyncMiddleware(jobOfRunnerGetValidator),
90 updateRunnerJobValidator,
91 asyncMiddleware(updateRunnerJobController)
94 runnerJobsRouter.post('/jobs/:jobUUID/error',
95 asyncMiddleware(jobOfRunnerGetValidator),
96 errorRunnerJobValidator,
97 asyncMiddleware(errorRunnerJob)
100 runnerJobsRouter.post('/jobs/:jobUUID/success',
101 postRunnerJobSuccessVideoFiles,
102 asyncMiddleware(jobOfRunnerGetValidator),
103 successRunnerJobValidator,
104 asyncMiddleware(postRunnerJobSuccess)
107 // ---------------------------------------------------------------------------
108 // Controllers for admins
109 // ---------------------------------------------------------------------------
111 runnerJobsRouter.post('/jobs/:jobUUID/cancel',
113 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
114 asyncMiddleware(runnerJobGetValidator),
115 cancelRunnerJobValidator,
116 asyncMiddleware(cancelRunnerJob)
119 runnerJobsRouter.get('/jobs',
121 ensureUserHasRight(UserRight.MANAGE_RUNNERS),
123 runnerJobsSortValidator,
125 setDefaultPagination,
126 asyncMiddleware(listRunnerJobs)
129 // ---------------------------------------------------------------------------
135 // ---------------------------------------------------------------------------
137 // ---------------------------------------------------------------------------
138 // Controllers for runners
139 // ---------------------------------------------------------------------------
141 async function requestRunnerJob (req: express.Request, res: express.Response) {
142 const runner = res.locals.runner
143 const availableJobs = await RunnerJobModel.listAvailableJobs()
145 logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })
147 const result: RequestRunnerJobResult = {
148 availableJobs: availableJobs.map(j => ({
155 updateLastRunnerContact(req, runner)
157 return res.json(result)
160 async function acceptRunnerJob (req: express.Request, res: express.Response) {
161 const runner = res.locals.runner
162 const runnerJob = res.locals.runnerJob
164 runnerJob.state = RunnerJobState.PROCESSING
165 runnerJob.processingJobToken = generateRunnerJobToken()
166 runnerJob.startedAt = new Date()
167 runnerJob.runnerId = runner.id
169 const newRunnerJob = await sequelizeTypescript.transaction(transaction => {
170 return runnerJob.save({ transaction })
172 newRunnerJob.Runner = runner as RunnerModel
174 const result: AcceptRunnerJobResult = {
176 ...newRunnerJob.toFormattedJSON(),
178 jobToken: newRunnerJob.processingJobToken
182 updateLastRunnerContact(req, runner)
185 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
186 lTags(runner.name, runnerJob.uuid, runnerJob.type)
189 return res.json(result)
192 async function abortRunnerJob (req: express.Request, res: express.Response) {
193 const runnerJob = res.locals.runnerJob
194 const runner = runnerJob.Runner
195 const body: AbortRunnerJobBody = req.body
198 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
199 { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
202 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
203 await new RunnerJobHandler().abort({ runnerJob })
205 updateLastRunnerContact(req, runnerJob.Runner)
207 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
210 async function errorRunnerJob (req: express.Request, res: express.Response) {
211 const runnerJob = res.locals.runnerJob
212 const runner = runnerJob.Runner
213 const body: ErrorRunnerJobBody = req.body
215 runnerJob.failures += 1
218 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
219 { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
222 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
223 await new RunnerJobHandler().error({ runnerJob, message: body.message })
225 updateLastRunnerContact(req, runnerJob.Runner)
227 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
230 // ---------------------------------------------------------------------------
232 const jobUpdateBuilders: {
233 [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
235 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
239 masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path,
240 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path,
241 videoChunkFile: files['payload[videoChunkFile]']?.[0].path
246 async function updateRunnerJobController (req: express.Request, res: express.Response) {
247 const runnerJob = res.locals.runnerJob
248 const runner = runnerJob.Runner
249 const body: RunnerJobUpdateBody = req.body
251 const payloadBuilder = jobUpdateBuilders[runnerJob.type]
252 const updatePayload = payloadBuilder
253 ? payloadBuilder(body.payload, req.files as UploadFiles)
257 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
258 { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
261 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
262 await new RunnerJobHandler().update({
264 progress: req.body.progress,
268 updateLastRunnerContact(req, runnerJob.Runner)
270 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
273 // ---------------------------------------------------------------------------
275 const jobSuccessPayloadBuilders: {
276 [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
278 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => {
282 videoFile: files['payload[videoFile]'][0].path
286 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => {
290 videoFile: files['payload[videoFile]'][0].path,
291 resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
295 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => {
299 videoFile: files['payload[videoFile]'][0].path
303 'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => {
307 videoFile: files['payload[videoFile]'][0].path
311 'live-rtmp-hls-transcoding': () => ({})
314 async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
315 const runnerJob = res.locals.runnerJob
316 const runner = runnerJob.Runner
317 const body: RunnerJobSuccessBody = req.body
319 const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles)
322 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type,
323 { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
326 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
327 await new RunnerJobHandler().complete({ runnerJob, resultPayload })
329 updateLastRunnerContact(req, runnerJob.Runner)
331 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
334 // ---------------------------------------------------------------------------
335 // Controllers for admins
336 // ---------------------------------------------------------------------------
338 async function cancelRunnerJob (req: express.Request, res: express.Response) {
339 const runnerJob = res.locals.runnerJob
341 logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))
343 const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
344 await new RunnerJobHandler().cancel({ runnerJob })
346 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
349 async function listRunnerJobs (req: express.Request, res: express.Response) {
350 const query: ListRunnerJobsQuery = req.query
352 const resultList = await RunnerJobModel.listForApi({
360 total: resultList.total,
361 data: resultList.data.map(d => d.toFormattedAdminJSON())