From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/middlewares/auth.ts | 27 +++- server/middlewares/doc.ts | 2 +- server/middlewares/error.ts | 6 +- server/middlewares/rate-limiter.ts | 28 +++- server/middlewares/validators/config.ts | 2 + server/middlewares/validators/runners/index.ts | 3 + server/middlewares/validators/runners/job-files.ts | 27 ++++ server/middlewares/validators/runners/jobs.ts | 156 +++++++++++++++++++++ .../validators/runners/registration-token.ts | 37 +++++ server/middlewares/validators/runners/runners.ts | 95 +++++++++++++ server/middlewares/validators/sort.ts | 4 + server/middlewares/validators/videos/video-live.ts | 9 ++ .../middlewares/validators/videos/video-studio.ts | 2 +- server/middlewares/validators/videos/videos.ts | 2 +- 14 files changed, 387 insertions(+), 13 deletions(-) create mode 100644 server/middlewares/validators/runners/index.ts create mode 100644 server/middlewares/validators/runners/job-files.ts create mode 100644 server/middlewares/validators/runners/jobs.ts create mode 100644 server/middlewares/validators/runners/registration-token.ts create mode 100644 server/middlewares/validators/runners/runners.ts (limited to 'server/middlewares') diff --git a/server/middlewares/auth.ts b/server/middlewares/auth.ts index e6025c8ce..0eefa2a8e 100644 --- a/server/middlewares/auth.ts +++ b/server/middlewares/auth.ts @@ -1,6 +1,7 @@ import express from 'express' import { Socket } from 'socket.io' import { getAccessToken } from '@server/lib/auth/oauth-model' +import { RunnerModel } from '@server/models/runner/runner' import { HttpStatusCode } from '../../shared/models/http/http-error-codes' import { logger } from '../helpers/logger' import { handleOAuthAuthenticate } from '../lib/auth/oauth' @@ -27,7 +28,7 @@ function authenticate (req: express.Request, res: express.Response, next: expres function authenticateSocket (socket: Socket, next: (err?: any) => void) { const accessToken = socket.handshake.query['accessToken'] - logger.debug('Checking socket access token %s.', accessToken) + logger.debug('Checking access token in runner.') if (!accessToken) return next(new Error('No access token provided')) if (typeof accessToken !== 'string') return next(new Error('Access token is invalid')) @@ -73,9 +74,31 @@ function optionalAuthenticate (req: express.Request, res: express.Response, next // --------------------------------------------------------------------------- +function authenticateRunnerSocket (socket: Socket, next: (err?: any) => void) { + const runnerToken = socket.handshake.auth['runnerToken'] + + logger.debug('Checking runner token in socket.') + + if (!runnerToken) return next(new Error('No runner token provided')) + if (typeof runnerToken !== 'string') return next(new Error('Runner token is invalid')) + + RunnerModel.loadByToken(runnerToken) + .then(runner => { + if (!runner) return next(new Error('Invalid runner token.')) + + socket.handshake.auth.runner = runner + + return next() + }) + .catch(err => logger.error('Cannot get runner token.', { err })) +} + +// --------------------------------------------------------------------------- + export { authenticate, authenticateSocket, authenticatePromise, - optionalAuthenticate + optionalAuthenticate, + authenticateRunnerSocket } diff --git a/server/middlewares/doc.ts b/server/middlewares/doc.ts index c43f41977..eef76acaa 100644 --- a/server/middlewares/doc.ts +++ b/server/middlewares/doc.ts @@ -5,7 +5,7 @@ function openapiOperationDoc (options: { operationId?: string }) { return (req: express.Request, res: express.Response, next: express.NextFunction) => { - res.locals.docUrl = options.url || 'https://docs.joinpeertube.org/api/rest-reference.html#operation/' + options.operationId + res.locals.docUrl = options.url || 'https://docs.joinpeertube.org/api-rest-reference.html#operation/' + options.operationId if (next) return next() } diff --git a/server/middlewares/error.ts b/server/middlewares/error.ts index 540edaeeb..94762e355 100644 --- a/server/middlewares/error.ts +++ b/server/middlewares/error.ts @@ -5,7 +5,7 @@ import { HttpStatusCode } from '@shared/models' function apiFailMiddleware (req: express.Request, res: express.Response, next: express.NextFunction) { res.fail = options => { - const { status = HttpStatusCode.BAD_REQUEST_400, message, title, type, data, instance } = options + const { status = HttpStatusCode.BAD_REQUEST_400, message, title, type, data, instance, tags } = options const extension = new ProblemDocumentExtension({ ...data, @@ -31,11 +31,11 @@ function apiFailMiddleware (req: express.Request, res: express.Response, next: e detail: message, type: type - ? `https://docs.joinpeertube.org/api/rest-reference.html#section/Errors/${type}` + ? `https://docs.joinpeertube.org/api-rest-reference.html#section/Errors/${type}` : undefined }, extension) - logger.debug('Bad HTTP request.', { json }) + logger.debug('Bad HTTP request.', { json, tags }) res.json(json) } diff --git a/server/middlewares/rate-limiter.ts b/server/middlewares/rate-limiter.ts index bc9513969..1eef8b360 100644 --- a/server/middlewares/rate-limiter.ts +++ b/server/middlewares/rate-limiter.ts @@ -1,10 +1,12 @@ +import express from 'express' +import RateLimit, { Options as RateLimitHandlerOptions } from 'express-rate-limit' +import { RunnerModel } from '@server/models/runner/runner' import { UserRole } from '@shared/models' -import RateLimit from 'express-rate-limit' import { optionalAuthenticate } from './auth' const whitelistRoles = new Set([ UserRole.ADMINISTRATOR, UserRole.MODERATOR ]) -function buildRateLimiter (options: { +export function buildRateLimiter (options: { windowMs: number max: number skipFailedRequests?: boolean @@ -15,17 +17,33 @@ function buildRateLimiter (options: { skipFailedRequests: options.skipFailedRequests, handler: (req, res, next, options) => { + // Bypass rate limit for registered runners + if (req.body?.runnerToken) { + return RunnerModel.loadByToken(req.body.runnerToken) + .then(runner => { + if (runner) return next() + + return sendRateLimited(res, options) + }) + } + + // Bypass rate limit for admins/moderators return optionalAuthenticate(req, res, () => { if (res.locals.authenticated === true && whitelistRoles.has(res.locals.oauth.token.User.role)) { return next() } - return res.status(options.statusCode).send(options.message) + return sendRateLimited(res, options) }) } }) } -export { - buildRateLimiter +// --------------------------------------------------------------------------- +// Private +// --------------------------------------------------------------------------- + +function sendRateLimited (res: express.Response, options: RateLimitHandlerOptions) { + return res.status(options.statusCode).send(options.message) + } diff --git a/server/middlewares/validators/config.ts b/server/middlewares/validators/config.ts index 4a9d1cb54..b3e7e5011 100644 --- a/server/middlewares/validators/config.ts +++ b/server/middlewares/validators/config.ts @@ -54,6 +54,7 @@ const customConfigUpdateValidator = [ body('transcoding.resolutions.1080p').isBoolean(), body('transcoding.resolutions.1440p').isBoolean(), body('transcoding.resolutions.2160p').isBoolean(), + body('transcoding.remoteRunners.enabled').isBoolean(), body('transcoding.alwaysTranscodeOriginalResolution').isBoolean(), @@ -97,6 +98,7 @@ const customConfigUpdateValidator = [ body('live.transcoding.resolutions.1440p').isBoolean(), body('live.transcoding.resolutions.2160p').isBoolean(), body('live.transcoding.alwaysTranscodeOriginalResolution').isBoolean(), + body('live.transcoding.remoteRunners.enabled').isBoolean(), body('search.remoteUri.users').isBoolean(), body('search.remoteUri.anonymous').isBoolean(), diff --git a/server/middlewares/validators/runners/index.ts b/server/middlewares/validators/runners/index.ts new file mode 100644 index 000000000..9a9629a80 --- /dev/null +++ b/server/middlewares/validators/runners/index.ts @@ -0,0 +1,3 @@ +export * from './jobs' +export * from './registration-token' +export * from './runners' diff --git a/server/middlewares/validators/runners/job-files.ts b/server/middlewares/validators/runners/job-files.ts new file mode 100644 index 000000000..56afa39aa --- /dev/null +++ b/server/middlewares/validators/runners/job-files.ts @@ -0,0 +1,27 @@ +import express from 'express' +import { HttpStatusCode } from '@shared/models' +import { areValidationErrors, doesVideoExist, isValidVideoIdParam } from '../shared' + +const tags = [ 'runner' ] + +export const runnerJobGetVideoTranscodingFileValidator = [ + isValidVideoIdParam('videoId'), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res)) return + + if (!await doesVideoExist(req.params.videoId, res, 'all')) return + + const runnerJob = res.locals.runnerJob + + if (runnerJob.privatePayload.videoUUID !== res.locals.videoAll.uuid) { + return res.fail({ + status: HttpStatusCode.FORBIDDEN_403, + message: 'Job is not associated to this video', + tags: [ ...tags, res.locals.videoAll.uuid ] + }) + } + + return next() + } +] diff --git a/server/middlewares/validators/runners/jobs.ts b/server/middlewares/validators/runners/jobs.ts new file mode 100644 index 000000000..8cb87e946 --- /dev/null +++ b/server/middlewares/validators/runners/jobs.ts @@ -0,0 +1,156 @@ +import express from 'express' +import { body, param } from 'express-validator' +import { isUUIDValid } from '@server/helpers/custom-validators/misc' +import { + isRunnerJobAbortReasonValid, + isRunnerJobErrorMessageValid, + isRunnerJobProgressValid, + isRunnerJobSuccessPayloadValid, + isRunnerJobTokenValid, + isRunnerJobUpdatePayloadValid +} from '@server/helpers/custom-validators/runners/jobs' +import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners' +import { cleanUpReqFiles } from '@server/helpers/express-utils' +import { RunnerJobModel } from '@server/models/runner/runner-job' +import { HttpStatusCode, RunnerJobState, RunnerJobSuccessBody, RunnerJobUpdateBody, ServerErrorCode } from '@shared/models' +import { areValidationErrors } from '../shared' + +const tags = [ 'runner' ] + +export const acceptRunnerJobValidator = [ + (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (res.locals.runnerJob.state !== RunnerJobState.PENDING) { + return res.fail({ + status: HttpStatusCode.BAD_REQUEST_400, + message: 'This runner job is not in pending state', + tags + }) + } + + return next() + } +] + +export const abortRunnerJobValidator = [ + body('reason').custom(isRunnerJobAbortReasonValid), + + (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + return next() + } +] + +export const updateRunnerJobValidator = [ + body('progress').optional().custom(isRunnerJobProgressValid), + + (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req) + + const body = req.body as RunnerJobUpdateBody + + if (isRunnerJobUpdatePayloadValid(body.payload, res.locals.runnerJob.type, req.files) !== true) { + cleanUpReqFiles(req) + + return res.fail({ + status: HttpStatusCode.BAD_REQUEST_400, + message: 'Payload is invalid', + tags + }) + } + + return next() + } +] + +export const errorRunnerJobValidator = [ + body('message').custom(isRunnerJobErrorMessageValid), + + (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + return next() + } +] + +export const successRunnerJobValidator = [ + (req: express.Request, res: express.Response, next: express.NextFunction) => { + const body = req.body as RunnerJobSuccessBody + + if (isRunnerJobSuccessPayloadValid(body.payload, res.locals.runnerJob.type, req.files) !== true) { + cleanUpReqFiles(req) + + return res.fail({ + status: HttpStatusCode.BAD_REQUEST_400, + message: 'Payload is invalid', + tags + }) + } + + return next() + } +] + +export const runnerJobGetValidator = [ + param('jobUUID').custom(isUUIDValid), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + const runnerJob = await RunnerJobModel.loadWithRunner(req.params.jobUUID) + + if (!runnerJob) { + return res.fail({ + status: HttpStatusCode.NOT_FOUND_404, + message: 'Unknown runner job', + tags + }) + } + + res.locals.runnerJob = runnerJob + + return next() + } +] + +export const jobOfRunnerGetValidator = [ + param('jobUUID').custom(isUUIDValid), + + body('runnerToken').custom(isRunnerTokenValid), + body('jobToken').custom(isRunnerJobTokenValid), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req) + + const runnerJob = await RunnerJobModel.loadByRunnerAndJobTokensWithRunner({ + uuid: req.params.jobUUID, + runnerToken: req.body.runnerToken, + jobToken: req.body.jobToken + }) + + if (!runnerJob) { + cleanUpReqFiles(req) + + return res.fail({ + status: HttpStatusCode.NOT_FOUND_404, + message: 'Unknown runner job', + tags + }) + } + + if (runnerJob.state !== RunnerJobState.PROCESSING) { + cleanUpReqFiles(req) + + return res.fail({ + status: HttpStatusCode.BAD_REQUEST_400, + type: ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE, + message: 'Job is not in "processing" state', + tags + }) + } + + res.locals.runnerJob = runnerJob + + return next() + } +] diff --git a/server/middlewares/validators/runners/registration-token.ts b/server/middlewares/validators/runners/registration-token.ts new file mode 100644 index 000000000..cc31d4a7e --- /dev/null +++ b/server/middlewares/validators/runners/registration-token.ts @@ -0,0 +1,37 @@ +import express from 'express' +import { param } from 'express-validator' +import { isIdValid } from '@server/helpers/custom-validators/misc' +import { RunnerRegistrationTokenModel } from '@server/models/runner/runner-registration-token' +import { forceNumber } from '@shared/core-utils' +import { HttpStatusCode } from '@shared/models' +import { areValidationErrors } from '../shared/utils' + +const tags = [ 'runner' ] + +const deleteRegistrationTokenValidator = [ + param('id').custom(isIdValid), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + const registrationToken = await RunnerRegistrationTokenModel.load(forceNumber(req.params.id)) + + if (!registrationToken) { + return res.fail({ + status: HttpStatusCode.NOT_FOUND_404, + message: 'Registration token not found', + tags + }) + } + + res.locals.runnerRegistrationToken = registrationToken + + return next() + } +] + +// --------------------------------------------------------------------------- + +export { + deleteRegistrationTokenValidator +} diff --git a/server/middlewares/validators/runners/runners.ts b/server/middlewares/validators/runners/runners.ts new file mode 100644 index 000000000..71a1275d2 --- /dev/null +++ b/server/middlewares/validators/runners/runners.ts @@ -0,0 +1,95 @@ +import express from 'express' +import { body, param } from 'express-validator' +import { isIdValid } from '@server/helpers/custom-validators/misc' +import { + isRunnerDescriptionValid, + isRunnerNameValid, + isRunnerRegistrationTokenValid, + isRunnerTokenValid +} from '@server/helpers/custom-validators/runners/runners' +import { RunnerModel } from '@server/models/runner/runner' +import { RunnerRegistrationTokenModel } from '@server/models/runner/runner-registration-token' +import { forceNumber } from '@shared/core-utils' +import { HttpStatusCode, RegisterRunnerBody, ServerErrorCode } from '@shared/models' +import { areValidationErrors } from '../shared/utils' + +const tags = [ 'runner' ] + +const registerRunnerValidator = [ + body('registrationToken').custom(isRunnerRegistrationTokenValid), + body('name').custom(isRunnerNameValid), + body('description').optional().custom(isRunnerDescriptionValid), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + const body: RegisterRunnerBody = req.body + + const runnerRegistrationToken = await RunnerRegistrationTokenModel.loadByRegistrationToken(body.registrationToken) + + if (!runnerRegistrationToken) { + return res.fail({ + status: HttpStatusCode.NOT_FOUND_404, + message: 'Registration token is invalid', + tags + }) + } + + res.locals.runnerRegistrationToken = runnerRegistrationToken + + return next() + } +] + +const deleteRunnerValidator = [ + param('runnerId').custom(isIdValid), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + const runner = await RunnerModel.load(forceNumber(req.params.runnerId)) + + if (!runner) { + return res.fail({ + status: HttpStatusCode.NOT_FOUND_404, + message: 'Runner not found', + tags + }) + } + + res.locals.runner = runner + + return next() + } +] + +const getRunnerFromTokenValidator = [ + body('runnerToken').custom(isRunnerTokenValid), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + if (areValidationErrors(req, res, { tags })) return + + const runner = await RunnerModel.loadByToken(req.body.runnerToken) + + if (!runner) { + return res.fail({ + status: HttpStatusCode.NOT_FOUND_404, + message: 'Unknown runner token', + type: ServerErrorCode.UNKNOWN_RUNNER_TOKEN, + tags + }) + } + + res.locals.runner = runner + + return next() + } +] + +// --------------------------------------------------------------------------- + +export { + registerRunnerValidator, + deleteRunnerValidator, + getRunnerFromTokenValidator +} diff --git a/server/middlewares/validators/sort.ts b/server/middlewares/validators/sort.ts index e6cc46317..959f663ac 100644 --- a/server/middlewares/validators/sort.ts +++ b/server/middlewares/validators/sort.ts @@ -34,6 +34,10 @@ export const videoChannelsFollowersSortValidator = checkSortFactory(SORTABLE_COL export const userRegistrationsSortValidator = checkSortFactory(SORTABLE_COLUMNS.USER_REGISTRATIONS) +export const runnersSortValidator = checkSortFactory(SORTABLE_COLUMNS.RUNNERS) +export const runnerRegistrationTokensSortValidator = checkSortFactory(SORTABLE_COLUMNS.RUNNER_REGISTRATION_TOKENS) +export const runnerJobsSortValidator = checkSortFactory(SORTABLE_COLUMNS.RUNNER_JOBS) + // --------------------------------------------------------------------------- function checkSortFactory (columns: string[], tags: string[] = []) { diff --git a/server/middlewares/validators/videos/video-live.ts b/server/middlewares/validators/videos/video-live.ts index e80fe1593..2aff831a8 100644 --- a/server/middlewares/validators/videos/video-live.ts +++ b/server/middlewares/validators/videos/video-live.ts @@ -115,6 +115,15 @@ const videoLiveAddValidator = getCommonVideoEditAttributes().concat([ }) } + if (body.saveReplay && !body.replaySettings?.privacy) { + cleanUpReqFiles(req) + + return res.fail({ + status: HttpStatusCode.BAD_REQUEST_400, + message: 'Live replay is enabled but privacy replay setting is missing' + }) + } + const user = res.locals.oauth.token.User if (!await doesVideoChannelOfAccountExist(body.channelId, user, res)) return cleanUpReqFiles(req) diff --git a/server/middlewares/validators/videos/video-studio.ts b/server/middlewares/validators/videos/video-studio.ts index b3e2d8101..4397e887e 100644 --- a/server/middlewares/validators/videos/video-studio.ts +++ b/server/middlewares/validators/videos/video-studio.ts @@ -10,7 +10,7 @@ import { import { cleanUpReqFiles } from '@server/helpers/express-utils' import { CONFIG } from '@server/initializers/config' import { approximateIntroOutroAdditionalSize, getTaskFile } from '@server/lib/video-studio' -import { isAudioFile } from '@shared/extra-utils' +import { isAudioFile } from '@shared/ffmpeg' import { HttpStatusCode, UserRight, VideoState, VideoStudioCreateEdition, VideoStudioTask } from '@shared/models' import { areValidationErrors, checkUserCanManageVideo, checkUserQuota, doesVideoExist } from '../shared' diff --git a/server/middlewares/validators/videos/videos.ts b/server/middlewares/validators/videos/videos.ts index d3014e8e7..794e1d4f1 100644 --- a/server/middlewares/validators/videos/videos.ts +++ b/server/middlewares/validators/videos/videos.ts @@ -7,6 +7,7 @@ import { getServerActor } from '@server/models/application/application' import { ExpressPromiseHandler } from '@server/types/express-handler' import { MUserAccountId, MVideoFullLight } from '@server/types/models' import { arrayify, getAllPrivacies } from '@shared/core-utils' +import { getVideoStreamDuration } from '@shared/ffmpeg' import { HttpStatusCode, ServerErrorCode, UserRight, VideoInclude, VideoState } from '@shared/models' import { exists, @@ -37,7 +38,6 @@ import { isVideoSupportValid } from '../../../helpers/custom-validators/videos' import { cleanUpReqFiles } from '../../../helpers/express-utils' -import { getVideoStreamDuration } from '../../../helpers/ffmpeg' import { logger } from '../../../helpers/logger' import { deleteFileAndCatch } from '../../../helpers/utils' import { getVideoWithAttributes } from '../../../helpers/video' -- cgit v1.2.3