From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/controllers/api/config.ts | 6 + server/controllers/api/index.ts | 2 + server/controllers/api/jobs.ts | 3 + server/controllers/api/runners/index.ts | 18 ++ server/controllers/api/runners/jobs-files.ts | 84 +++++ server/controllers/api/runners/jobs.ts | 352 +++++++++++++++++++++ server/controllers/api/runners/manage-runners.ts | 107 +++++++ .../controllers/api/runners/registration-tokens.ts | 87 +++++ server/controllers/api/videos/transcoding.ts | 87 +---- server/controllers/api/videos/upload.ts | 71 ++--- server/controllers/bots.ts | 6 +- server/controllers/object-storage-proxy.ts | 87 +---- 12 files changed, 710 insertions(+), 200 deletions(-) create mode 100644 server/controllers/api/runners/index.ts create mode 100644 server/controllers/api/runners/jobs-files.ts create mode 100644 server/controllers/api/runners/jobs.ts create mode 100644 server/controllers/api/runners/manage-runners.ts create mode 100644 server/controllers/api/runners/registration-tokens.ts (limited to 'server/controllers') diff --git a/server/controllers/api/config.ts b/server/controllers/api/config.ts index 60d168d12..0b9aaffda 100644 --- a/server/controllers/api/config.ts +++ b/server/controllers/api/config.ts @@ -217,6 +217,9 @@ function customConfig (): CustomConfig { }, transcoding: { enabled: CONFIG.TRANSCODING.ENABLED, + remoteRunners: { + enabled: CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED + }, allowAdditionalExtensions: CONFIG.TRANSCODING.ALLOW_ADDITIONAL_EXTENSIONS, allowAudioFiles: CONFIG.TRANSCODING.ALLOW_AUDIO_FILES, threads: CONFIG.TRANSCODING.THREADS, @@ -252,6 +255,9 @@ function customConfig (): CustomConfig { maxUserLives: CONFIG.LIVE.MAX_USER_LIVES, transcoding: { enabled: CONFIG.LIVE.TRANSCODING.ENABLED, + remoteRunners: { + enabled: CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED + }, threads: CONFIG.LIVE.TRANSCODING.THREADS, profile: CONFIG.LIVE.TRANSCODING.PROFILE, resolutions: { diff --git a/server/controllers/api/index.ts b/server/controllers/api/index.ts index e1d197c8a..646f9597e 100644 --- a/server/controllers/api/index.ts +++ b/server/controllers/api/index.ts @@ -15,6 +15,7 @@ import { metricsRouter } from './metrics' import { oauthClientsRouter } from './oauth-clients' import { overviewsRouter } from './overviews' import { pluginRouter } from './plugins' +import { runnersRouter } from './runners' import { searchRouter } from './search' import { serverRouter } from './server' import { usersRouter } from './users' @@ -55,6 +56,7 @@ apiRouter.use('/overviews', overviewsRouter) apiRouter.use('/plugins', pluginRouter) apiRouter.use('/custom-pages', customPageRouter) apiRouter.use('/blocklist', blocklistRouter) +apiRouter.use('/runners', runnersRouter) apiRouter.use('/ping', pong) apiRouter.use('/*', badRequest) diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index 6a53e3083..b63e2f962 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -93,6 +93,9 @@ async function formatJob (job: BullJob, state?: JobState): Promise { state: state || await job.getState(), type: job.queueName as JobType, data: job.data, + parent: job.parent + ? { id: job.parent.id } + : undefined, progress: job.progress as number, priority: job.opts.priority, error, diff --git a/server/controllers/api/runners/index.ts b/server/controllers/api/runners/index.ts new file mode 100644 index 000000000..c98ded354 --- /dev/null +++ b/server/controllers/api/runners/index.ts @@ -0,0 +1,18 @@ +import express from 'express' +import { runnerJobsRouter } from './jobs' +import { runnerJobFilesRouter } from './jobs-files' +import { manageRunnersRouter } from './manage-runners' +import { runnerRegistrationTokensRouter } from './registration-tokens' + +const runnersRouter = express.Router() + +runnersRouter.use('/', manageRunnersRouter) +runnersRouter.use('/', runnerJobsRouter) +runnersRouter.use('/', runnerJobFilesRouter) +runnersRouter.use('/', runnerRegistrationTokensRouter) + +// --------------------------------------------------------------------------- + +export { + runnersRouter +} diff --git a/server/controllers/api/runners/jobs-files.ts b/server/controllers/api/runners/jobs-files.ts new file mode 100644 index 000000000..e43ce35f5 --- /dev/null +++ b/server/controllers/api/runners/jobs-files.ts @@ -0,0 +1,84 @@ +import express from 'express' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { proxifyHLS, proxifyWebTorrentFile } from '@server/lib/object-storage' +import { VideoPathManager } from '@server/lib/video-path-manager' +import { asyncMiddleware } from '@server/middlewares' +import { jobOfRunnerGetValidator } from '@server/middlewares/validators/runners' +import { runnerJobGetVideoTranscodingFileValidator } from '@server/middlewares/validators/runners/job-files' +import { VideoStorage } from '@shared/models' + +const lTags = loggerTagsFactory('api', 'runner') + +const runnerJobFilesRouter = express.Router() + +runnerJobFilesRouter.post('/jobs/:jobUUID/files/videos/:videoId/max-quality', + asyncMiddleware(jobOfRunnerGetValidator), + asyncMiddleware(runnerJobGetVideoTranscodingFileValidator), + asyncMiddleware(getMaxQualityVideoFile) +) + +runnerJobFilesRouter.post('/jobs/:jobUUID/files/videos/:videoId/previews/max-quality', + asyncMiddleware(jobOfRunnerGetValidator), + asyncMiddleware(runnerJobGetVideoTranscodingFileValidator), + getMaxQualityVideoPreview +) + +// --------------------------------------------------------------------------- + +export { + runnerJobFilesRouter +} + +// --------------------------------------------------------------------------- + +async function getMaxQualityVideoFile (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + const runner = runnerJob.Runner + const video = res.locals.videoAll + + logger.info( + 'Get max quality file of video %s of job %s for runner %s', video.uuid, runnerJob.uuid, runner.name, + lTags(runner.name, runnerJob.id, runnerJob.type) + ) + + const file = video.getMaxQualityFile() + + if (file.storage === VideoStorage.OBJECT_STORAGE) { + if (file.isHLS()) { + return proxifyHLS({ + req, + res, + filename: file.filename, + playlist: video.getHLSPlaylist(), + reinjectVideoFileToken: false, + video + }) + } + + // Web video + return proxifyWebTorrentFile({ + req, + res, + filename: file.filename + }) + } + + return VideoPathManager.Instance.makeAvailableVideoFile(file, videoPath => { + return res.sendFile(videoPath) + }) +} + +function getMaxQualityVideoPreview (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + const runner = runnerJob.Runner + const video = res.locals.videoAll + + logger.info( + 'Get max quality preview file of video %s of job %s for runner %s', video.uuid, runnerJob.uuid, runner.name, + lTags(runner.name, runnerJob.id, runnerJob.type) + ) + + const file = video.getPreview() + + return res.sendFile(file.getPath()) +} diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts new file mode 100644 index 000000000..7d488ec11 --- /dev/null +++ b/server/controllers/api/runners/jobs.ts @@ -0,0 +1,352 @@ +import express, { UploadFiles } from 'express' +import { createReqFiles } from '@server/helpers/express-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { generateRunnerJobToken } from '@server/helpers/token-generator' +import { MIMETYPES } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { getRunnerJobHandlerClass, updateLastRunnerContact } from '@server/lib/runners' +import { + asyncMiddleware, + authenticate, + ensureUserHasRight, + paginationValidator, + runnerJobsSortValidator, + setDefaultPagination, + setDefaultSort +} from '@server/middlewares' +import { + abortRunnerJobValidator, + acceptRunnerJobValidator, + errorRunnerJobValidator, + getRunnerFromTokenValidator, + jobOfRunnerGetValidator, + runnerJobGetValidator, + successRunnerJobValidator, + updateRunnerJobValidator +} from '@server/middlewares/validators/runners' +import { RunnerModel } from '@server/models/runner/runner' +import { RunnerJobModel } from '@server/models/runner/runner-job' +import { + AbortRunnerJobBody, + AcceptRunnerJobResult, + ErrorRunnerJobBody, + HttpStatusCode, + ListRunnerJobsQuery, + LiveRTMPHLSTranscodingUpdatePayload, + RequestRunnerJobResult, + RunnerJobState, + RunnerJobSuccessBody, + RunnerJobSuccessPayload, + RunnerJobType, + RunnerJobUpdateBody, + RunnerJobUpdatePayload, + UserRight, + VODAudioMergeTranscodingSuccess, + VODHLSTranscodingSuccess, + VODWebVideoTranscodingSuccess +} from '@shared/models' + +const postRunnerJobSuccessVideoFiles = createReqFiles( + [ 'payload[videoFile]', 'payload[resolutionPlaylistFile]' ], + { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } +) + +const runnerJobUpdateVideoFiles = createReqFiles( + [ 'payload[videoChunkFile]', 'payload[resolutionPlaylistFile]', 'payload[masterPlaylistFile]' ], + { ...MIMETYPES.VIDEO.MIMETYPE_EXT, ...MIMETYPES.M3U8.MIMETYPE_EXT } +) + +const lTags = loggerTagsFactory('api', 'runner') + +const runnerJobsRouter = express.Router() + +// --------------------------------------------------------------------------- +// Controllers for runners +// --------------------------------------------------------------------------- + +runnerJobsRouter.post('/jobs/request', + asyncMiddleware(getRunnerFromTokenValidator), + asyncMiddleware(requestRunnerJob) +) + +runnerJobsRouter.post('/jobs/:jobUUID/accept', + asyncMiddleware(runnerJobGetValidator), + acceptRunnerJobValidator, + asyncMiddleware(getRunnerFromTokenValidator), + asyncMiddleware(acceptRunnerJob) +) + +runnerJobsRouter.post('/jobs/:jobUUID/abort', + asyncMiddleware(jobOfRunnerGetValidator), + abortRunnerJobValidator, + asyncMiddleware(abortRunnerJob) +) + +runnerJobsRouter.post('/jobs/:jobUUID/update', + runnerJobUpdateVideoFiles, + asyncMiddleware(jobOfRunnerGetValidator), + updateRunnerJobValidator, + asyncMiddleware(updateRunnerJobController) +) + +runnerJobsRouter.post('/jobs/:jobUUID/error', + asyncMiddleware(jobOfRunnerGetValidator), + errorRunnerJobValidator, + asyncMiddleware(errorRunnerJob) +) + +runnerJobsRouter.post('/jobs/:jobUUID/success', + postRunnerJobSuccessVideoFiles, + asyncMiddleware(jobOfRunnerGetValidator), + successRunnerJobValidator, + asyncMiddleware(postRunnerJobSuccess) +) + +// --------------------------------------------------------------------------- +// Controllers for admins +// --------------------------------------------------------------------------- + +runnerJobsRouter.post('/jobs/:jobUUID/cancel', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + asyncMiddleware(runnerJobGetValidator), + asyncMiddleware(cancelRunnerJob) +) + +runnerJobsRouter.get('/jobs', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + paginationValidator, + runnerJobsSortValidator, + setDefaultSort, + setDefaultPagination, + asyncMiddleware(listRunnerJobs) +) + +// --------------------------------------------------------------------------- + +export { + runnerJobsRouter +} + +// --------------------------------------------------------------------------- + +// --------------------------------------------------------------------------- +// Controllers for runners +// --------------------------------------------------------------------------- + +async function requestRunnerJob (req: express.Request, res: express.Response) { + const runner = res.locals.runner + const availableJobs = await RunnerJobModel.listAvailableJobs() + + logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) }) + + const result: RequestRunnerJobResult = { + availableJobs: availableJobs.map(j => ({ + uuid: j.uuid, + type: j.type, + payload: j.payload + })) + } + + updateLastRunnerContact(req, runner) + + return res.json(result) +} + +async function acceptRunnerJob (req: express.Request, res: express.Response) { + const runner = res.locals.runner + const runnerJob = res.locals.runnerJob + + runnerJob.state = RunnerJobState.PROCESSING + runnerJob.processingJobToken = generateRunnerJobToken() + runnerJob.startedAt = new Date() + runnerJob.runnerId = runner.id + + const newRunnerJob = await sequelizeTypescript.transaction(transaction => { + return runnerJob.save({ transaction }) + }) + newRunnerJob.Runner = runner as RunnerModel + + const result: AcceptRunnerJobResult = { + job: { + ...newRunnerJob.toFormattedJSON(), + + jobToken: newRunnerJob.processingJobToken + } + } + + updateLastRunnerContact(req, runner) + + logger.info( + 'Remote runner %s has accepted job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, + lTags(runner.name, runnerJob.uuid, runnerJob.type) + ) + + return res.json(result) +} + +async function abortRunnerJob (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + const runner = runnerJob.Runner + const body: AbortRunnerJobBody = req.body + + logger.info( + 'Remote runner %s is aborting job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, + { reason: body.reason, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } + ) + + const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) + await new RunnerJobHandler().abort({ runnerJob }) + + updateLastRunnerContact(req, runnerJob.Runner) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function errorRunnerJob (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + const runner = runnerJob.Runner + const body: ErrorRunnerJobBody = req.body + + runnerJob.failures += 1 + + logger.error( + 'Remote runner %s had an error with job %s (%s)', runner.name, runnerJob.uuid, runnerJob.type, + { errorMessage: body.message, totalFailures: runnerJob.failures, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } + ) + + const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) + await new RunnerJobHandler().error({ runnerJob, message: body.message }) + + updateLastRunnerContact(req, runnerJob.Runner) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +// --------------------------------------------------------------------------- + +const jobUpdateBuilders: { + [id in RunnerJobType]?: (payload: RunnerJobUpdatePayload, files?: UploadFiles) => RunnerJobUpdatePayload +} = { + 'live-rtmp-hls-transcoding': (payload: LiveRTMPHLSTranscodingUpdatePayload, files) => { + return { + ...payload, + + masterPlaylistFile: files['payload[masterPlaylistFile]']?.[0].path, + resolutionPlaylistFile: files['payload[resolutionPlaylistFile]']?.[0].path, + videoChunkFile: files['payload[videoChunkFile]']?.[0].path + } + } +} + +async function updateRunnerJobController (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + const runner = runnerJob.Runner + const body: RunnerJobUpdateBody = req.body + + const payloadBuilder = jobUpdateBuilders[runnerJob.type] + const updatePayload = payloadBuilder + ? payloadBuilder(body.payload, req.files as UploadFiles) + : undefined + + logger.debug( + 'Remote runner %s is updating job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, + { body, updatePayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } + ) + + const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) + await new RunnerJobHandler().update({ + runnerJob, + progress: req.body.progress, + updatePayload + }) + + updateLastRunnerContact(req, runnerJob.Runner) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +// --------------------------------------------------------------------------- + +const jobSuccessPayloadBuilders: { + [id in RunnerJobType]: (payload: RunnerJobSuccessPayload, files?: UploadFiles) => RunnerJobSuccessPayload +} = { + 'vod-web-video-transcoding': (payload: VODWebVideoTranscodingSuccess, files) => { + return { + ...payload, + + videoFile: files['payload[videoFile]'][0].path + } + }, + + 'vod-hls-transcoding': (payload: VODHLSTranscodingSuccess, files) => { + return { + ...payload, + + videoFile: files['payload[videoFile]'][0].path, + resolutionPlaylistFile: files['payload[resolutionPlaylistFile]'][0].path + } + }, + + 'vod-audio-merge-transcoding': (payload: VODAudioMergeTranscodingSuccess, files) => { + return { + ...payload, + + videoFile: files['payload[videoFile]'][0].path + } + }, + + 'live-rtmp-hls-transcoding': () => ({}) +} + +async function postRunnerJobSuccess (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + const runner = runnerJob.Runner + const body: RunnerJobSuccessBody = req.body + + const resultPayload = jobSuccessPayloadBuilders[runnerJob.type](body.payload, req.files as UploadFiles) + + logger.info( + 'Remote runner %s is sending success result for job %s (%s)', runnerJob.Runner.name, runnerJob.uuid, runnerJob.type, + { resultPayload, ...lTags(runner.name, runnerJob.uuid, runnerJob.type) } + ) + + const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) + await new RunnerJobHandler().complete({ runnerJob, resultPayload }) + + updateLastRunnerContact(req, runnerJob.Runner) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +// --------------------------------------------------------------------------- +// Controllers for admins +// --------------------------------------------------------------------------- + +async function cancelRunnerJob (req: express.Request, res: express.Response) { + const runnerJob = res.locals.runnerJob + + logger.info('Cancelling job %s (%s)', runnerJob.type, lTags(runnerJob.uuid, runnerJob.type)) + + const RunnerJobHandler = getRunnerJobHandlerClass(runnerJob) + await new RunnerJobHandler().cancel({ runnerJob }) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function listRunnerJobs (req: express.Request, res: express.Response) { + const query: ListRunnerJobsQuery = req.query + + const resultList = await RunnerJobModel.listForApi({ + start: query.start, + count: query.count, + sort: query.sort, + search: query.search + }) + + return res.json({ + total: resultList.total, + data: resultList.data.map(d => d.toFormattedAdminJSON()) + }) +} diff --git a/server/controllers/api/runners/manage-runners.ts b/server/controllers/api/runners/manage-runners.ts new file mode 100644 index 000000000..eb08c4b1d --- /dev/null +++ b/server/controllers/api/runners/manage-runners.ts @@ -0,0 +1,107 @@ +import express from 'express' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { generateRunnerToken } from '@server/helpers/token-generator' +import { + asyncMiddleware, + authenticate, + ensureUserHasRight, + paginationValidator, + runnersSortValidator, + setDefaultPagination, + setDefaultSort +} from '@server/middlewares' +import { deleteRunnerValidator, getRunnerFromTokenValidator, registerRunnerValidator } from '@server/middlewares/validators/runners' +import { RunnerModel } from '@server/models/runner/runner' +import { HttpStatusCode, ListRunnersQuery, RegisterRunnerBody, UserRight } from '@shared/models' + +const lTags = loggerTagsFactory('api', 'runner') + +const manageRunnersRouter = express.Router() + +manageRunnersRouter.post('/register', + asyncMiddleware(registerRunnerValidator), + asyncMiddleware(registerRunner) +) +manageRunnersRouter.post('/unregister', + asyncMiddleware(getRunnerFromTokenValidator), + asyncMiddleware(unregisterRunner) +) + +manageRunnersRouter.delete('/:runnerId', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + asyncMiddleware(deleteRunnerValidator), + asyncMiddleware(deleteRunner) +) + +manageRunnersRouter.get('/', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + paginationValidator, + runnersSortValidator, + setDefaultSort, + setDefaultPagination, + asyncMiddleware(listRunners) +) + +// --------------------------------------------------------------------------- + +export { + manageRunnersRouter +} + +// --------------------------------------------------------------------------- + +async function registerRunner (req: express.Request, res: express.Response) { + const body: RegisterRunnerBody = req.body + + const runnerToken = generateRunnerToken() + + const runner = new RunnerModel({ + runnerToken, + name: body.name, + description: body.description, + lastContact: new Date(), + ip: req.ip, + runnerRegistrationTokenId: res.locals.runnerRegistrationToken.id + }) + + await runner.save() + + logger.info('Registered new runner %s', runner.name, { ...lTags(runner.name) }) + + return res.json({ id: runner.id, runnerToken }) +} +async function unregisterRunner (req: express.Request, res: express.Response) { + const runner = res.locals.runner + await runner.destroy() + + logger.info('Unregistered runner %s', runner.name, { ...lTags(runner.name) }) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function deleteRunner (req: express.Request, res: express.Response) { + const runner = res.locals.runner + + await runner.destroy() + + logger.info('Deleted runner %s', runner.name, { ...lTags(runner.name) }) + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function listRunners (req: express.Request, res: express.Response) { + const query: ListRunnersQuery = req.query + + const resultList = await RunnerModel.listForApi({ + start: query.start, + count: query.count, + sort: query.sort + }) + + return res.json({ + total: resultList.total, + data: resultList.data.map(d => d.toFormattedJSON()) + }) +} diff --git a/server/controllers/api/runners/registration-tokens.ts b/server/controllers/api/runners/registration-tokens.ts new file mode 100644 index 000000000..5ac3773fe --- /dev/null +++ b/server/controllers/api/runners/registration-tokens.ts @@ -0,0 +1,87 @@ +import express from 'express' +import { generateRunnerRegistrationToken } from '@server/helpers/token-generator' +import { + asyncMiddleware, + authenticate, + ensureUserHasRight, + paginationValidator, + runnerRegistrationTokensSortValidator, + setDefaultPagination, + setDefaultSort +} from '@server/middlewares' +import { deleteRegistrationTokenValidator } from '@server/middlewares/validators/runners' +import { RunnerRegistrationTokenModel } from '@server/models/runner/runner-registration-token' +import { HttpStatusCode, ListRunnerRegistrationTokensQuery, UserRight } from '@shared/models' +import { logger, loggerTagsFactory } from '@server/helpers/logger' + +const lTags = loggerTagsFactory('api', 'runner') + +const runnerRegistrationTokensRouter = express.Router() + +runnerRegistrationTokensRouter.post('/registration-tokens/generate', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + asyncMiddleware(generateRegistrationToken) +) + +runnerRegistrationTokensRouter.delete('/registration-tokens/:id', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + asyncMiddleware(deleteRegistrationTokenValidator), + asyncMiddleware(deleteRegistrationToken) +) + +runnerRegistrationTokensRouter.get('/registration-tokens', + authenticate, + ensureUserHasRight(UserRight.MANAGE_RUNNERS), + paginationValidator, + runnerRegistrationTokensSortValidator, + setDefaultSort, + setDefaultPagination, + asyncMiddleware(listRegistrationTokens) +) + +// --------------------------------------------------------------------------- + +export { + runnerRegistrationTokensRouter +} + +// --------------------------------------------------------------------------- + +async function generateRegistrationToken (req: express.Request, res: express.Response) { + logger.info('Generating new runner registration token.', lTags()) + + const registrationToken = new RunnerRegistrationTokenModel({ + registrationToken: generateRunnerRegistrationToken() + }) + + await registrationToken.save() + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function deleteRegistrationToken (req: express.Request, res: express.Response) { + logger.info('Removing runner registration token.', lTags()) + + const runnerRegistrationToken = res.locals.runnerRegistrationToken + + await runnerRegistrationToken.destroy() + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function listRegistrationTokens (req: express.Request, res: express.Response) { + const query: ListRunnerRegistrationTokensQuery = req.query + + const resultList = await RunnerRegistrationTokenModel.listForApi({ + start: query.start, + count: query.count, + sort: query.sort + }) + + return res.json({ + total: resultList.total, + data: resultList.data.map(d => d.toFormattedJSON()) + }) +} diff --git a/server/controllers/api/videos/transcoding.ts b/server/controllers/api/videos/transcoding.ts index 8c9a5322b..54f484b2b 100644 --- a/server/controllers/api/videos/transcoding.ts +++ b/server/controllers/api/videos/transcoding.ts @@ -1,10 +1,8 @@ -import Bluebird from 'bluebird' import express from 'express' -import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' -import { JobQueue } from '@server/lib/job-queue' import { Hooks } from '@server/lib/plugins/hooks' -import { buildTranscodingJob } from '@server/lib/video' +import { createTranscodingJobs } from '@server/lib/transcoding/create-transcoding-job' +import { computeResolutionsToTranscode } from '@server/lib/transcoding/transcoding-resolutions' import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models' import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares' @@ -47,82 +45,13 @@ async function createTranscoding (req: express.Request, res: express.Response) { video.state = VideoState.TO_TRANSCODE await video.save() - const childrenResolutions = resolutions.filter(r => r !== maxResolution) - - logger.info('Manually creating transcoding jobs for %s.', body.transcodingType, { childrenResolutions, maxResolution }) - - const children = await Bluebird.mapSeries(childrenResolutions, resolution => { - if (body.transcodingType === 'hls') { - return buildHLSJobOption({ - videoUUID: video.uuid, - hasAudio, - resolution, - isMaxQuality: false - }) - } - - if (body.transcodingType === 'webtorrent') { - return buildWebTorrentJobOption({ - videoUUID: video.uuid, - hasAudio, - resolution - }) - } - }) - - const parent = body.transcodingType === 'hls' - ? await buildHLSJobOption({ - videoUUID: video.uuid, - hasAudio, - resolution: maxResolution, - isMaxQuality: false - }) - : await buildWebTorrentJobOption({ - videoUUID: video.uuid, - hasAudio, - resolution: maxResolution - }) - - // Porcess the last resolution after the other ones to prevent concurrency issue - // Because low resolutions use the biggest one as ffmpeg input - await JobQueue.Instance.createJobWithChildren(parent, children) - - return res.sendStatus(HttpStatusCode.NO_CONTENT_204) -} - -function buildHLSJobOption (options: { - videoUUID: string - hasAudio: boolean - resolution: number - isMaxQuality: boolean -}) { - const { videoUUID, hasAudio, resolution, isMaxQuality } = options - - return buildTranscodingJob({ - type: 'new-resolution-to-hls', - videoUUID, - resolution, - hasAudio, - copyCodecs: false, + await createTranscodingJobs({ + video, + resolutions, + transcodingType: body.transcodingType, isNewVideo: false, - autoDeleteWebTorrentIfNeeded: false, - isMaxQuality + user: null // Don't specify priority since these transcoding jobs are fired by the admin }) -} -function buildWebTorrentJobOption (options: { - videoUUID: string - hasAudio: boolean - resolution: number -}) { - const { videoUUID, hasAudio, resolution } = options - - return buildTranscodingJob({ - type: 'new-resolution-to-webtorrent', - videoUUID, - isNewVideo: false, - resolution, - hasAudio, - createHLSIfNeeded: false - }) + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) } diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 43313a143..885ac8b81 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts @@ -3,28 +3,20 @@ import { move } from 'fs-extra' import { basename } from 'path' import { getResumableUploadPath } from '@server/helpers/upload' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' -import { JobQueue } from '@server/lib/job-queue' -import { generateWebTorrentVideoFilename } from '@server/lib/paths' +import { CreateJobArgument, CreateJobOptions, JobQueue } from '@server/lib/job-queue' import { Redis } from '@server/lib/redis' import { uploadx } from '@server/lib/uploadx' -import { - buildLocalVideoFromReq, - buildMoveToObjectStorageJob, - buildOptimizeOrMergeAudioJob, - buildVideoThumbnailsFromReq, - setVideoTags -} from '@server/lib/video' +import { buildLocalVideoFromReq, buildMoveToObjectStorageJob, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' +import { buildNewFile } from '@server/lib/video-file' import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { openapiOperationDoc } from '@server/middlewares/doc' import { VideoSourceModel } from '@server/models/video/video-source' import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' -import { getLowercaseExtension } from '@shared/core-utils' -import { isAudioFile, uuidToShort } from '@shared/extra-utils' -import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models' +import { uuidToShort } from '@shared/extra-utils' +import { HttpStatusCode, VideoCreate, VideoState } from '@shared/models' import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' import { createReqFiles } from '../../../helpers/express-utils' -import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' import { logger, loggerTagsFactory } from '../../../helpers/logger' import { MIMETYPES } from '../../../initializers/constants' import { sequelizeTypescript } from '../../../initializers/database' @@ -41,7 +33,6 @@ import { } from '../../../middlewares' import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' import { VideoModel } from '../../../models/video/video' -import { VideoFileModel } from '../../../models/video/video-file' const lTags = loggerTagsFactory('api', 'video') const auditLogger = auditLoggerFactory('videos') @@ -148,7 +139,7 @@ async function addVideo (options: { video.VideoChannel = videoChannel video.url = getLocalVideoActivityPubUrl(video) // We use the UUID, so set the URL after building the object - const videoFile = await buildNewFile(videoPhysicalFile) + const videoFile = await buildNewFile({ path: videoPhysicalFile.path, mode: 'web-video' }) const originalFilename = videoPhysicalFile.originalname // Move physical file @@ -227,30 +218,8 @@ async function addVideo (options: { } } -async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { - const videoFile = new VideoFileModel({ - extname: getLowercaseExtension(videoPhysicalFile.filename), - size: videoPhysicalFile.size, - videoStreamingPlaylistId: null, - metadata: await buildFileMetadata(videoPhysicalFile.path) - }) - - const probe = await ffprobePromise(videoPhysicalFile.path) - - if (await isAudioFile(videoPhysicalFile.path, probe)) { - videoFile.resolution = VideoResolution.H_NOVIDEO - } else { - videoFile.fps = await getVideoStreamFPS(videoPhysicalFile.path, probe) - videoFile.resolution = (await getVideoStreamDimensionsInfo(videoPhysicalFile.path, probe)).resolution - } - - videoFile.filename = generateWebTorrentVideoFilename(videoFile.resolution, videoFile.extname) - - return videoFile -} - async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) { - return JobQueue.Instance.createSequentialJobFlow( + const jobs: (CreateJobArgument & CreateJobOptions)[] = [ { type: 'manage-video-torrent' as 'manage-video-torrent', payload: { @@ -274,16 +243,26 @@ async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVide videoUUID: video.uuid, isNewVideo: true } - }, + } + ] - video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE - ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined }) - : undefined, + if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { + jobs.push(await buildMoveToObjectStorageJob({ video, previousVideoState: undefined })) + } + + if (video.state === VideoState.TO_TRANSCODE) { + jobs.push({ + type: 'transcoding-job-builder' as 'transcoding-job-builder', + payload: { + videoUUID: video.uuid, + optimizeJob: { + isNewVideo: true + } + } + }) + } - video.state === VideoState.TO_TRANSCODE - ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) - : undefined - ) + return JobQueue.Instance.createSequentialJobFlow(...jobs) } async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { diff --git a/server/controllers/bots.ts b/server/controllers/bots.ts index a5ce1d79f..2b825a730 100644 --- a/server/controllers/bots.ts +++ b/server/controllers/bots.ts @@ -1,8 +1,8 @@ -import { getServerActor } from '@server/models/application/application' -import { logger } from '@uploadx/core' import express from 'express' import { truncate } from 'lodash' -import { SitemapStream, streamToPromise, ErrorLevel } from 'sitemap' +import { ErrorLevel, SitemapStream, streamToPromise } from 'sitemap' +import { logger } from '@server/helpers/logger' +import { getServerActor } from '@server/models/application/application' import { buildNSFWFilter } from '../helpers/express-utils' import { ROUTE_CACHE_LIFETIME, WEBSERVER } from '../initializers/constants' import { asyncMiddleware } from '../middlewares' diff --git a/server/controllers/object-storage-proxy.ts b/server/controllers/object-storage-proxy.ts index c530b57f8..8e2cc4af9 100644 --- a/server/controllers/object-storage-proxy.ts +++ b/server/controllers/object-storage-proxy.ts @@ -1,11 +1,7 @@ import cors from 'cors' import express from 'express' -import { PassThrough, pipeline } from 'stream' -import { logger } from '@server/helpers/logger' -import { StreamReplacer } from '@server/helpers/stream-replacer' import { OBJECT_STORAGE_PROXY_PATHS } from '@server/initializers/constants' -import { injectQueryToPlaylistUrls } from '@server/lib/hls' -import { getHLSFileReadStream, getWebTorrentFileReadStream } from '@server/lib/object-storage' +import { proxifyHLS, proxifyWebTorrentFile } from '@server/lib/object-storage' import { asyncMiddleware, ensureCanAccessPrivateVideoHLSFiles, @@ -13,9 +9,7 @@ import { ensurePrivateObjectStorageProxyIsEnabled, optionalAuthenticate } from '@server/middlewares' -import { HttpStatusCode } from '@shared/models' -import { buildReinjectVideoFileTokenQuery, doReinjectVideoFileToken } from './shared/m3u8-playlist' -import { GetObjectCommandOutput } from '@aws-sdk/client-s3' +import { doReinjectVideoFileToken } from './shared/m3u8-playlist' const objectStorageProxyRouter = express.Router() @@ -25,14 +19,14 @@ objectStorageProxyRouter.get(OBJECT_STORAGE_PROXY_PATHS.PRIVATE_WEBSEED + ':file ensurePrivateObjectStorageProxyIsEnabled, optionalAuthenticate, asyncMiddleware(ensureCanAccessVideoPrivateWebTorrentFiles), - asyncMiddleware(proxifyWebTorrent) + asyncMiddleware(proxifyWebTorrentController) ) objectStorageProxyRouter.get(OBJECT_STORAGE_PROXY_PATHS.STREAMING_PLAYLISTS.PRIVATE_HLS + ':videoUUID/:filename', ensurePrivateObjectStorageProxyIsEnabled, optionalAuthenticate, asyncMiddleware(ensureCanAccessPrivateVideoHLSFiles), - asyncMiddleware(proxifyHLS) + asyncMiddleware(proxifyHLSController) ) // --------------------------------------------------------------------------- @@ -41,76 +35,25 @@ export { objectStorageProxyRouter } -async function proxifyWebTorrent (req: express.Request, res: express.Response) { +function proxifyWebTorrentController (req: express.Request, res: express.Response) { const filename = req.params.filename - logger.debug('Proxifying WebTorrent file %s from object storage.', filename) - - try { - const { response: s3Response, stream } = await getWebTorrentFileReadStream({ - filename, - rangeHeader: req.header('range') - }) - - setS3Headers(res, s3Response) - - return stream.pipe(res) - } catch (err) { - return handleObjectStorageFailure(res, err) - } + return proxifyWebTorrentFile({ req, res, filename }) } -async function proxifyHLS (req: express.Request, res: express.Response) { +function proxifyHLSController (req: express.Request, res: express.Response) { const playlist = res.locals.videoStreamingPlaylist const video = res.locals.onlyVideo const filename = req.params.filename - logger.debug('Proxifying HLS file %s from object storage.', filename) - - try { - const { response: s3Response, stream } = await getHLSFileReadStream({ - playlist: playlist.withVideo(video), - filename, - rangeHeader: req.header('range') - }) - - setS3Headers(res, s3Response) - - const streamReplacer = filename.endsWith('.m3u8') && doReinjectVideoFileToken(req) - ? new StreamReplacer(line => injectQueryToPlaylistUrls(line, buildReinjectVideoFileTokenQuery(req, filename.endsWith('master.m3u8')))) - : new PassThrough() + const reinjectVideoFileToken = filename.endsWith('.m3u8') && doReinjectVideoFileToken(req) - return pipeline( - stream, - streamReplacer, - res, - err => { - if (!err) return - - handleObjectStorageFailure(res, err) - } - ) - } catch (err) { - return handleObjectStorageFailure(res, err) - } -} - -function handleObjectStorageFailure (res: express.Response, err: Error) { - if (err.name === 'NoSuchKey') { - logger.debug('Could not find key in object storage to proxify private HLS video file.', { err }) - return res.sendStatus(HttpStatusCode.NOT_FOUND_404) - } - - return res.fail({ - status: HttpStatusCode.INTERNAL_SERVER_ERROR_500, - message: err.message, - type: err.name + return proxifyHLS({ + req, + res, + playlist, + video, + filename, + reinjectVideoFileToken }) } - -function setS3Headers (res: express.Response, s3Response: GetObjectCommandOutput) { - if (s3Response.$metadata.httpStatusCode === HttpStatusCode.PARTIAL_CONTENT_206) { - res.setHeader('Content-Range', s3Response.ContentRange) - res.status(HttpStatusCode.PARTIAL_CONTENT_206) - } -} -- cgit v1.2.3