]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/controllers/api/runners/jobs.ts
Fix runner api rate limit bypass
[github/Chocobozzz/PeerTube.git] / server / controllers / api / runners / jobs.ts
CommitLineData
0c9668f7 1import express, { UploadFiles } from 'express'
d38541fd 2import { retryTransactionWrapper } from '@server/helpers/database-utils'
0c9668f7
C
3import { createReqFiles } from '@server/helpers/express-utils'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { generateRunnerJobToken } from '@server/helpers/token-generator'
6import { MIMETYPES } from '@server/initializers/constants'
7import { sequelizeTypescript } from '@server/initializers/database'
8import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners'
9import {
e915cde3 10 apiRateLimiter,
0c9668f7
C
11 asyncMiddleware,
12 authenticate,
13 ensureUserHasRight,
14 paginationValidator,
15 runnerJobsSortValidator,
16 setDefaultPagination,
17 setDefaultSort
18} from '@server/middlewares'
19import {
20 abortRunnerJobValidator,
21 acceptRunnerJobValidator,
5e47f6ab 22 cancelRunnerJobValidator,
0c9668f7
C
23 errorRunnerJobValidator,
24 getRunnerFromTokenValidator,
25 jobOfRunnerGetValidator,
26 runnerJobGetValidator,
27 successRunnerJobValidator,
28 updateRunnerJobValidator
29} from '@server/middlewares/validators/runners'
30import { RunnerModel } from '@server/models/runner/runner'
31import { RunnerJobModel } from '@server/models/runner/runner-job'
32import {
33 AbortRunnerJobBody,
34 AcceptRunnerJobResult,
35 ErrorRunnerJobBody,
36 HttpStatusCode,
37 ListRunnerJobsQuery,
38 LiveRTMPHLSTranscodingUpdatePayload,
39 RequestRunnerJobResult,
40 RunnerJobState,
41 RunnerJobSuccessBody,
42 RunnerJobSuccessPayload,
43 RunnerJobType,
44 RunnerJobUpdateBody,
45 RunnerJobUpdatePayload,
fe7019b2 46 ServerErrorCode,
0c9668f7 47 UserRight,
ab14f0e0 48 VideoStudioTranscodingSuccess,
0c9668f7
C
49 VODAudioMergeTranscodingSuccess,
50 VODHLSTranscodingSuccess,
51 VODWebVideoTranscodingSuccess
52} from '@shared/models'
53
// Upload middleware of the "job success" endpoint: accepts the `payload[videoFile]` and
// `payload[resolutionPlaylistFile]` fields, restricted to the video and M3U8 mimetypes
const postRunnerJobSuccessVideoFiles = createReqFiles(
  [
    'payload[videoFile]',
    'payload[resolutionPlaylistFile]'
  ],
  Object.assign({}, MIMETYPES.VIDEO.MIMETYPE_EXT, MIMETYPES.M3U8.MIMETYPE_EXT)
)

// Upload middleware of the "job update" endpoint: accepts the `payload[videoChunkFile]`,
// `payload[resolutionPlaylistFile]` and `payload[masterPlaylistFile]` fields, same mimetypes as above
const runnerJobUpdateVideoFiles = createReqFiles(
  [
    'payload[videoChunkFile]',
    'payload[resolutionPlaylistFile]',
    'payload[masterPlaylistFile]'
  ],
  Object.assign({}, MIMETYPES.VIDEO.MIMETYPE_EXT, MIMETYPES.M3U8.MIMETYPE_EXT)
)

// Logger tags shared by every controller of this file
const lTags = loggerTagsFactory('api', 'runner')

// Router exported by this module, all runner job endpoints are registered on it
const runnerJobsRouter = express.Router()
67
68// ---------------------------------------------------------------------------
69// Controllers for runners
70// ---------------------------------------------------------------------------
71
// A runner asks for the list of jobs it could process
runnerJobsRouter.post(
  '/jobs/request',
  apiRateLimiter,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(requestRunnerJob)
)

// A runner takes ownership of a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/accept',
  apiRateLimiter,
  asyncMiddleware(runnerJobGetValidator),
  acceptRunnerJobValidator,
  asyncMiddleware(getRunnerFromTokenValidator),
  asyncMiddleware(acceptRunnerJob)
)

// A runner gives up on a job it previously accepted
runnerJobsRouter.post(
  '/jobs/:jobUUID/abort',
  apiRateLimiter,
  asyncMiddleware(jobOfRunnerGetValidator),
  abortRunnerJobValidator,
  asyncMiddleware(abortRunnerJob)
)

// A runner sends progress and/or intermediate files for a job
runnerJobsRouter.post(
  '/jobs/:jobUUID/update',
  runnerJobUpdateVideoFiles,
  // Has to be after multer middleware to parse runner token
  apiRateLimiter,
  asyncMiddleware(jobOfRunnerGetValidator),
  updateRunnerJobValidator,
  asyncMiddleware(updateRunnerJobController)
)
100
// A runner reports that it failed to process a job
runnerJobsRouter.post('/jobs/:jobUUID/error',
  // Keep this endpoint behind the same rate limiter as the other runner endpoints of this router
  apiRateLimiter,
  asyncMiddleware(jobOfRunnerGetValidator),
  errorRunnerJobValidator,
  asyncMiddleware(errorRunnerJob)
)
106
// A runner uploads the result files of a job it completed
runnerJobsRouter.post(
  '/jobs/:jobUUID/success',
  postRunnerJobSuccessVideoFiles,
  // Has to be after multer middleware to parse runner token
  apiRateLimiter,
  asyncMiddleware(jobOfRunnerGetValidator),
  successRunnerJobValidator,
  asyncMiddleware(postRunnerJobSuccess)
)
114
115// ---------------------------------------------------------------------------
116// Controllers for admins
117// ---------------------------------------------------------------------------
118
// Cancel a job, requires the MANAGE_RUNNERS user right
runnerJobsRouter.post(
  '/jobs/:jobUUID/cancel',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  asyncMiddleware(runnerJobGetValidator),
  cancelRunnerJobValidator,
  asyncMiddleware(cancelRunnerJob)
)

// Paginated and sorted list of jobs, requires the MANAGE_RUNNERS user right
runnerJobsRouter.get(
  '/jobs',
  authenticate,
  ensureUserHasRight(UserRight.MANAGE_RUNNERS),
  paginationValidator,
  runnerJobsSortValidator,
  setDefaultSort,
  setDefaultPagination,
  asyncMiddleware(listRunnerJobs)
)
136
137// ---------------------------------------------------------------------------
138
// Only the router is public, controllers below stay private to this module
export { runnerJobsRouter }
142
143// ---------------------------------------------------------------------------
144
145// ---------------------------------------------------------------------------
146// Controllers for runners
147// ---------------------------------------------------------------------------
148
/**
 * Answers a runner with the jobs returned by `RunnerJobModel.listAvailableJobs()`.
 *
 * Reads the runner from `res.locals.runner` (presumably set by the route validator, to be confirmed)
 * and replies with a JSON `RequestRunnerJobResult` exposing the uuid, type and payload of each job.
 */
async function requestRunnerJob (req: express.Request, res: express.Response) {
  const { runner } = res.locals
  const jobs = await RunnerJobModel.listAvailableJobs()

  logger.debug('Runner %s requests for a job.', runner.name, { availableJobs: jobs, ...lTags(runner.name) })

  const result: RequestRunnerJobResult = {
    availableJobs: jobs.map(job => {
      return { uuid: job.uuid, type: job.type, payload: job.payload }
    })
  }

  updateLastRunnerContact(req, runner)

  return res.json(result)
}
167
/**
 * Assigns a pending job to the requesting runner.
 *
 * Inside a retried transaction the job is reloaded, and if it is still pending it is switched to
 * the processing state with a fresh job token, a start date and the runner id.
 * Replies with the formatted job and its token, or fails with a 409 if the job is not pending anymore.
 */
async function acceptRunnerJob (req: express.Request, res: express.Response) {
  const { runner, runnerJob } = res.locals

  const acceptedJob = await retryTransactionWrapper(() => {
    return sequelizeTypescript.transaction(async t => {
      // Check the state on data reloaded within the transaction
      await runnerJob.reload({ transaction: t })

      if (runnerJob.state === RunnerJobState.PENDING) {
        runnerJob.state = RunnerJobState.PROCESSING
        runnerJob.processingJobToken = generateRunnerJobToken()
        runnerJob.startedAt = new Date()
        runnerJob.runnerId = runner.id

        return runnerJob.save({ transaction: t })
      }

      res.fail({
        type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
        message: 'This job is not in pending state anymore',
        status: HttpStatusCode.CONFLICT_409
      })

      return undefined
    })
  })

  // The error response has already been sent from within the transaction
  if (!acceptedJob) return

  acceptedJob.Runner = runner as RunnerModel

  const result: AcceptRunnerJobResult = {
    job: {
      ...acceptedJob.toFormattedJSON(),

      jobToken: acceptedJob.processingJobToken
    }
  }

  updateLastRunnerContact(req, runner)

  const tags = lTags(runner.name, runnerJob.uuid, runnerJob.type)
  logger.info('Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, tags)

  return res.json(result)
}
215
/**
 * Handles a runner aborting its job: logs the reason sent in the body, delegates to the `abort()`
 * method of the handler class matching the job, then replies with a 204.
 */
async function abortRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const { reason }: AbortRunnerJobBody = req.body

  logger.info(
    'Remote runner %s is aborting job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    { reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.abort({ runnerJob })

  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
233
/**
 * Handles a runner reporting a job error: increments the in-memory failure counter of the job,
 * logs the error message, delegates to the `error()` method of the handler class matching the job,
 * then replies with a 204.
 */
async function errorRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: ErrorRunnerJobBody = req.body

  // Incremented before logging so the log reports the updated total
  runnerJob.failures += 1

  const logMeta = {
    errorMessage: body.message,
    totalFailures: runnerJob.failures,
    ...lTags(runner.name, runnerJob.uuid, runnerJob.type)
  }
  logger.error('Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, logMeta)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.error({ runnerJob, message: body.message })

  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
253
254// ---------------------------------------------------------------------------
255
// Per job type builders merging the update payload sent by the runner with the paths of the uploaded files.
// Job types without an entry have no specific update payload.
const jobUpdateBuilders: {
  [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload
} = {
  'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => {
    return {
      ...payload,

      // `files` is declared optional and any field may be absent or empty:
      // resolve to undefined instead of throwing in these cases
      masterPlaylistFile: files?.['payload[masterPlaylistFile]']?.[0]?.path,
      resolutionPlaylistFile: files?.['payload[resolutionPlaylistFile]']?.[0]?.path,
      videoChunkFile: files?.['payload[videoChunkFile]']?.[0]?.path
    }
  }
}
269
/**
 * Handles a job update sent by a runner: builds the update payload when a builder exists for the
 * job type, delegates to the `update()` method of the handler class matching the job with the
 * progress sent in the body, then replies with a 204.
 */
async function updateRunnerJobController (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobUpdateBody = req.body

  // Not every job type has an update payload builder
  const buildUpdatePayload = jobUpdateBuilders[runnerJob.type]

  let updatePayload: RunnerJobUpdatePayload | undefined
  if (buildUpdatePayload) {
    updatePayload = buildUpdatePayload(body.payload, req.files as UploadFiles)
  }

  const logMeta = { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  logger.debug('Remote runner %s is updating job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, logMeta)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.update({
    runnerJob,
    progress: req.body.progress,
    updatePayload
  })

  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
296
297// ---------------------------------------------------------------------------
298
// Per job type builders merging the success payload sent by the runner with the paths of the uploaded files.
// Every job type has an entry; builders reading files expect the corresponding upload fields to be present.
const jobSuccessPayloadBuilders: {
  [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload
} = {
  'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path,
    resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path
  }),

  'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  'video-studio-transcoding': (payload: VideoStudioTranscodingSuccess, files) => ({
    ...payload,

    videoFile: files['payload[videoFile]'][0].path
  }),

  // This job type has no result payload
  'live-rtmp-hls-transcoding': () => {
    return {}
  }
}
337
/**
 * Handles a runner reporting a job success: builds the result payload with the builder of the job
 * type, delegates to the `complete()` method of the handler class matching the job, then replies with a 204.
 */
async function postRunnerJobSuccess (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals
  const runner = runnerJob.Runner
  const body: RunnerJobSuccessBody = req.body

  const buildResultPayload = jobSuccessPayloadBuilders[runnerJob.type]
  const resultPayload = buildResultPayload(body.payload, req.files as UploadFiles)

  const logMeta = { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) }
  logger.info(
    'Remote runner %s is sending success result for job %s (%s)',
    runner.name,
    runnerJob.uuid,
    runnerJob.type,
    logMeta
  )

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.complete({ runnerJob, resultPayload })

  updateLastRunnerContact(req, runnerJob.Runner)

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
357
358// ---------------------------------------------------------------------------
359// Controllers for admins
360// ---------------------------------------------------------------------------
361
/**
 * Cancels the job found in `res.locals.runnerJob` by delegating to the `cancel()` method of the
 * handler class matching the job, then replies with a 204.
 */
async function cancelRunnerJob (req: express.Request, res: express.Response) {
  const { runnerJob } = res.locals

  const tags = lTags(runnerJob.uuid, runnerJob.type)
  logger.info('Cancelling job %s (%s)', runnerJob.uuid, runnerJob.type, tags)

  const HandlerClass = getRunnerJobHandlerClass(runnerJob)
  const handler = new HandlerClass()
  await handler.cancel({ runnerJob })

  return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
372
/**
 * Lists jobs using the `start`, `count`, `sort` and `search` query parameters and replies with
 * the total and the jobs in their admin JSON format.
 */
async function listRunnerJobs (req: express.Request, res: express.Response) {
  const { start, count, sort, search }: ListRunnerJobsQuery = req.query

  const { total, data } = await RunnerJobModel.listForApi({ start, count, sort, search })

  return res.json({
    total,
    data: data.map(job => job.toFormattedAdminJSON())
  })
}