// Source: github/Chocobozzz/PeerTube.git - server/controllers/api/runners/jobs.ts
// Captured from the git.immae.eu "blame" view; commit subject: "Fix peertube runner concurrency".
0c9668f7 1import express, { UploadFiles } from 'express'
d38541fd 2import { retryTransactionWrapper } from '@server/helpers/database-utils'
0c9668f7
C
3import { createReqFiles } from '@server/helpers/express-utils'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { generateRunnerJobToken } from '@server/helpers/token-generator'
6import { MIMETYPES } from '@server/initializers/constants'
7import { sequelizeTypescript } from '@server/initializers/database'
8import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners'
9import {
10 asyncMiddleware,
11 authenticate,
12 ensureUserHasRight,
13 paginationValidator,
14 runnerJobsSortValidator,
15 setDefaultPagination,
16 setDefaultSort
17} from '@server/middlewares'
18import {
19 abortRunnerJobValidator,
20 acceptRunnerJobValidator,
5e47f6ab 21 cancelRunnerJobValidator,
0c9668f7
C
22 errorRunnerJobValidator,
23 getRunnerFromTokenValidator,
24 jobOfRunnerGetValidator,
25 runnerJobGetValidator,
26 successRunnerJobValidator,
27 updateRunnerJobValidator
28} from '@server/middlewares/validators/runners'
29import { RunnerModel } from '@server/models/runner/runner'
30import { RunnerJobModel } from '@server/models/runner/runner-job'
31import {
32 AbortRunnerJobBody,
33 AcceptRunnerJobResult,
34 ErrorRunnerJobBody,
35 HttpStatusCode,
36 ListRunnerJobsQuery,
37 LiveRTMPHLSTranscodingUpdatePayload,
38 RequestRunnerJobResult,
39 RunnerJobState,
40 RunnerJobSuccessBody,
41 RunnerJobSuccessPayload,
42 RunnerJobType,
43 RunnerJobUpdateBody,
44 RunnerJobUpdatePayload,
fe7019b2 45 ServerErrorCode,
0c9668f7 46 UserRight,
ab14f0e0 47 VideoStudioTranscodingSuccess,
0c9668f7
C
48 VODAudioMergeTranscodingSuccess,
49 VODHLSTranscodingSuccess,
50 VODWebVideoTranscodingSuccess
51} from '@shared/models'
52
// Upload middleware used by the job "success" route. It is built by createReqFiles for the
// `payload[videoFile]` and `payload[resolutionPlaylistFile]` form fields, with the video and
// M3U8 mimetype/extension maps merged together.
const postRunnerJobSuccessVideoFiles = createReqFiles(
  [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ],
  { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
)
57
// Upload middleware used by the job "update" route. It is built by createReqFiles for the
// `payload[videoChunkFile]`, `payload[resolutionPlaylistFile]` and `payload[masterPlaylistFile]`
// form fields, with the video and M3U8 mimetype/extension maps merged together.
const runnerJobUpdateVideoFiles = createReqFiles(
  [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ],
  { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT }
)
62
// Logger tags factory shared by every handler of this file ('api' and 'runner' base tags)
const lTags = loggerTagsFactory('api', 'runner')

// Router holding every `/jobs` endpoint declared below
const runnerJobsRouter = express.Router()
66
// ---------------------------------------------------------------------------
// Controllers for runners
// ---------------------------------------------------------------------------

// Middlewares run in the order they are listed: validators first, controller last

// List the jobs a runner could pick
runnerJobsRouter.post('/jobs/request',
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(requestRunnerJob)
)

// Claim a job
runnerJobsRouter.post('/jobs/:jobUUID/accept',
  asyncMiddleware(runnerJobGetValidator),
  acceptRunnerJobValidator,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(acceptRunnerJob)
)

// Give up on a job
runnerJobsRouter.post('/jobs/:jobUUID/abort',
  asyncMiddleware(jobOfRunnerGetValidator),
  abortRunnerJobValidator,
  asyncMiddleware(abortRunnerJob)
)

// Report progress on a job; the upload middleware runs before the validators
runnerJobsRouter.post('/jobs/:jobUUID/update',
  runnerJobUpdateVideoFiles,
  asyncMiddleware(jobOfRunnerGetValidator),
  updateRunnerJobValidator,
  asyncMiddleware(updateRunnerJobController)
)

// Report a failure on a job
runnerJobsRouter.post('/jobs/:jobUUID/error',
  asyncMiddleware(jobOfRunnerGetValidator),
  errorRunnerJobValidator,
  asyncMiddleware(errorRunnerJob)
)

// Send the result of a job; the upload middleware runs before the validators
runnerJobsRouter.post('/jobs/:jobUUID/success',
  postRunnerJobSuccessVideoFiles,
  asyncMiddleware(jobOfRunnerGetValidator),
  successRunnerJobValidator,
  asyncMiddleware(postRunnerJobSuccess)
)
108
// ---------------------------------------------------------------------------
// Controllers for admins
// ---------------------------------------------------------------------------

// Both routes require an authenticated user holding the MANAGE_RUNNERS right

// Cancel a job
runnerJobsRouter.post('/jobs/:jobUUID/cancel',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  asyncMiddleware(runnerJobGetValidator),
  cancelRunnerJobValidator,
  asyncMiddleware(cancelRunnerJob)
)

// Paginated and sorted list of jobs
runnerJobsRouter.get('/jobs',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  paginationValidator,
  runnerJobsSortValidator,
  setDefaultSort,
  setDefaultPagination,
  asyncMiddleware(listRunnerJobs)
)
130
// ---------------------------------------------------------------------------

// The router is the only symbol exported by this file
export {
  runnerJobsRouter
}

// ---------------------------------------------------------------------------
138
// ---------------------------------------------------------------------------
// Controllers for runners
// ---------------------------------------------------------------------------

/**
 * Answers a runner with the jobs returned by `RunnerJobModel.listAvailableJobs()`.
 *
 * The runner is read from `res.locals.runner`, which is expected to be set by a
 * previous middleware of the route.
 * Only `uuid`, `type` and `payload` of each job are exposed in the response.
 */
async function requestRunnerJob (req: express.Request, res: express.Response) {
  const runner = res.locals.runner
  const availableJobs = await RunnerJobModel.listAvailableJobs()

  logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })

  const result: RequestRunnerJobResult = {
    availableJobs: availableJobs.map(j => ({
      uuid: j.uuid,
      type: j.type,
      payload: j.payload
    }))
  }

  // Not awaited, like every other call to this helper in this file
  updateLastRunnerContact(req, runner)

  return res.json(result)
}
161
/**
 * Assigns a pending job to the requesting runner.
 *
 * The job is reloaded inside a transaction and only updated when it is still in
 * PENDING state: it is then switched to PROCESSING, given a fresh job token, a start
 * date and the id of the runner.
 *
 * Responds with the formatted job and its `jobToken`, or with a 409
 * (`RUNNER_JOB_NOT_IN_PENDING_STATE`) when the job is not pending anymore.
 */
async function acceptRunnerJob (req: express.Request, res: express.Response) {
  const runner = res.locals.runner
  const runnerJob = res.locals.runnerJob

  const newRunnerJob = await retryTransactionWrapper(() => {
    return sequelizeTypescript.transaction(async transaction => {
      // Read the job state again inside the transaction instead of trusting the middleware copy
      await runnerJob.reload({ transaction })

      // Do not write the HTTP response from here: this callback is handed to
      // retryTransactionWrapper, and the transaction may still fail after it returns
      if (runnerJob.state !== RunnerJobState.PENDING) return undefined

      runnerJob.state = RunnerJobState.PROCESSING
      runnerJob.processingJobToken = generateRunnerJobToken()
      runnerJob.startedAt = new Date()
      runnerJob.runnerId = runner.id

      return runnerJob.save({ transaction })
    })
  })

  if (!newRunnerJob) {
    // Send the conflict exactly once, after the transaction has settled
    res.fail({
      type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
      message: 'This job is not in pending state anymore',
      status: HttpStatusCode.CONFLICT_409
    })

    return
  }

  newRunnerJob.Runner = runner as RunnerModel

  const result: AcceptRunnerJobResult = {
    job: {
      ...newRunnerJob.toFormattedJSON(),

      jobToken: newRunnerJob.processingJobToken
    }
  }

  updateLastRunnerContact(req, runner)

  logger.info(
    'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    lTags(runner.name, runnerJob.uuid, runnerJob.type)
  )

  return res.json(result)
}
209
/**
 * Handles a runner giving up on a job.
 *
 * Logs the reason sent in the body, then delegates to the `abort()` method of the
 * handler class returned by `getRunnerJobHandlerClass()` for this job.
 * Responds with 204.
 */
async function abortRunnerJob (req: express.Request, res: express.Response) {
  const runnerJob = res.locals.runnerJob
  const runner = runnerJob.Runner
  const body: AbortRunnerJobBody = req.body

  logger.info(
    'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
  await new RunnerJobHandler().abort({ runnerJob })

  updateLastRunnerContact(req, runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
227
/**
 * Handles a runner reporting an error on a job.
 *
 * Increments the in-memory `failures` counter of the job, logs the error message sent
 * in the body, then delegates to the `error()` method of the handler class returned by
 * `getRunnerJobHandlerClass()` for this job.
 * Responds with 204.
 */
async function errorRunnerJob (req: express.Request, res: express.Response) {
  const runnerJob = res.locals.runnerJob
  const runner = runnerJob.Runner
  const body: ErrorRunnerJobBody = req.body

  // Incremented before logging so `totalFailures` includes the current failure
  runnerJob.failures += 1

  logger.error(
    'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
  await new RunnerJobHandler().error({ runnerJob, message: body.message })

  updateLastRunnerContact(req, runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
247
// ---------------------------------------------------------------------------

// Per job type builders that merge the uploaded file paths into the update payload.
// Job types without an entry have no update payload.
const jobUpdateBuilders: {
  [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
} = {
  'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
    return {
      ...payload,

      // `files` is optional and any field may be absent or empty: leave the path undefined in these cases
      masterPlaylistFile: files?.['payload[masterPlaylistFile]']?.[0]?.path,
      resolutionPlaylistFile: files?.['payload[resolutionPlaylistFile]']?.[0]?.path,
      videoChunkFile: files?.['payload[videoChunkFile]']?.[0]?.path
    }
  }
}
263
/**
 * Handles a progress update sent by a runner.
 *
 * When `jobUpdateBuilders` has an entry for the job type, it is used to build the update
 * payload from the body payload and the uploaded files; otherwise the update payload is
 * `undefined`. The progress and the payload are then forwarded to the `update()` method of
 * the handler class returned by `getRunnerJobHandlerClass()` for this job.
 * Responds with 204.
 */
async function updateRunnerJobController (req: express.Request, res: express.Response) {
  const runnerJob = res.locals.runnerJob
  const runner = runnerJob.Runner
  const body: RunnerJobUpdateBody = req.body

  const payloadBuilder = jobUpdateBuilders[runnerJob.type]
  const updatePayload = payloadBuilder
    ? payloadBuilder(body.payload, req.files as UploadFiles)
    : undefined

  logger.debug(
    'Remote runner %s is updating job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
  await new RunnerJobHandler().update({
    runnerJob,
    progress: req.body.progress,
    updatePayload
  })

  updateLastRunnerContact(req, runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
290
// ---------------------------------------------------------------------------

// Per job type builders that merge the uploaded file paths into the success payload.
// Every job type must have an entry. The builders read the first file of each field without
// checking it exists, so the expected files are assumed to be validated before the controller
// runs (TODO confirm against successRunnerJobValidator).
const jobSuccessPayloadBuilders: {
  [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
} = {
  'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => {
    return {
      ...payload,

      videoFile: files['payload[videoFile]'][0].path
    }
  },

  'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => {
    return {
      ...payload,

      videoFile: files['payload[videoFile]'][0].path,
      resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
    }
  },

  'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => {
    return {
      ...payload,

      videoFile: files['payload[videoFile]'][0].path
    }
  },

  'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => {
    return {
      ...payload,

      videoFile: files['payload[videoFile]'][0].path
    }
  },

  // This job type has no success payload: the builder ignores its arguments
  'live-rtmp-hls-transcoding': () => ({})
}
331
/**
 * Handles a runner sending the result of a job.
 *
 * Builds the result payload with the `jobSuccessPayloadBuilders` entry of the job type
 * (body payload + uploaded files), then delegates to the `complete()` method of the handler
 * class returned by `getRunnerJobHandlerClass()` for this job.
 * Responds with 204.
 */
async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
  const runnerJob = res.locals.runnerJob
  const runner = runnerJob.Runner
  const body: RunnerJobSuccessBody = req.body

  const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles)

  logger.info(
    'Remote runner %s is sending success result for job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type,
    { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
  await new RunnerJobHandler().complete({ runnerJob, resultPayload })

  updateLastRunnerContact(req, runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
351
// ---------------------------------------------------------------------------
// Controllers for admins
// ---------------------------------------------------------------------------

/**
 * Cancels a job on behalf of an admin.
 *
 * Delegates to the `cancel()` method of the handler class returned by
 * `getRunnerJobHandlerClass()` for the job stored in `res.locals.runnerJob`.
 * Responds with 204.
 */
async function cancelRunnerJob (req: express.Request, res: express.Response) {
  const runnerJob = res.locals.runnerJob

  logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, lTags(runnerJob.uuid, runnerJob.type))

  const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob)
  await new RunnerJobHandler().cancel({ runnerJob })

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
366
/**
 * Lists jobs for admins.
 *
 * Forwards `start`, `count`, `sort` and `search` from the query string to
 * `RunnerJobModel.listForApi()` and responds with `{ total, data }`, each job being
 * serialized with `toFormattedAdminJSON()`.
 */
async function listRunnerJobs (req: express.Request, res: express.Response) {
  const query: ListRunnerJobsQuery = req.query

  const resultList = await RunnerJobModel.listForApi({
    start: query.start,
    count: query.count,
    sort: query.sort,
    search: query.search
  })

  return res.json({
    total: resultList.total,
    data: resultList.data.map(d => d.toFormattedAdminJSON())
  })
}