From 419b520ca4434d17f3505013174e195c3a316716 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 19 Jan 2022 14:23:00 +0100 Subject: Add ability to cancel & delete video imports --- server/controllers/api/jobs.ts | 26 ++++++- server/controllers/api/videos/import.ts | 48 ++++++++++++- server/initializers/constants.ts | 4 +- server/lib/job-queue/handlers/video-import.ts | 29 ++++---- server/lib/job-queue/job-queue.ts | 12 ++++ .../middlewares/validators/videos/video-imports.ts | 71 +++++++++++++++++-- server/tests/api/check-params/jobs.ts | 43 +++++++++++- server/tests/api/check-params/video-imports.ts | 66 +++++++++++++++++- server/tests/api/server/jobs.ts | 25 +++++++ server/tests/api/videos/video-imports.ts | 81 +++++++++++++++++++++- 10 files changed, 379 insertions(+), 26 deletions(-) (limited to 'server') diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index eebd195b0..c61b7362f 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -1,5 +1,5 @@ import express from 'express' -import { Job, JobState, JobType, ResultList, UserRight } from '@shared/models' +import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' import { isArray } from '../../helpers/custom-validators/misc' import { JobQueue } from '../../lib/job-queue' import { @@ -16,6 +16,18 @@ import { listJobsValidator } from '../../middlewares/validators/jobs' const jobsRouter = express.Router() +jobsRouter.post('/pause', + authenticate, + ensureUserHasRight(UserRight.MANAGE_JOBS), + asyncMiddleware(pauseJobQueue) +) + +jobsRouter.post('/resume', + authenticate, + ensureUserHasRight(UserRight.MANAGE_JOBS), + asyncMiddleware(resumeJobQueue) +) + jobsRouter.get('/:state?', openapiOperationDoc({ operationId: 'getJobs' }), authenticate, @@ -36,6 +48,18 @@ export { // --------------------------------------------------------------------------- +async function pauseJobQueue (req: express.Request, res: express.Response) { + await JobQueue.Instance.pause() + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function resumeJobQueue (req: express.Request, res: express.Response) { + await JobQueue.Instance.resume() + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + async function listJobs (req: express.Request, res: express.Response) { const state = req.params.state as JobState const asc = req.query.sort === 'createdAt' diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts index 08d69827b..8cbfd3286 100644 --- a/server/controllers/api/videos/import.ts +++ b/server/controllers/api/videos/import.ts @@ -19,7 +19,15 @@ import { MVideoWithBlacklistLight } from '@server/types/models' import { MVideoImportFormattable } from '@server/types/models/video/video-import' -import { ServerErrorCode, ThumbnailType, VideoImportCreate, VideoImportState, VideoPrivacy, VideoState } from '@shared/models' +import { + HttpStatusCode, + ServerErrorCode, + ThumbnailType, + VideoImportCreate, + VideoImportState, + VideoPrivacy, + VideoState +} from '@shared/models' import { auditLoggerFactory, getAuditIdFromRes, VideoImportAuditView } from '../../../helpers/audit-logger' import { moveAndProcessCaptionFile } from '../../../helpers/captions-utils' import { isArray } from '../../../helpers/custom-validators/misc' @@ -34,7 +42,14 @@ import { getLocalVideoActivityPubUrl } from '../../../lib/activitypub/url' import { JobQueue } from '../../../lib/job-queue/job-queue' import { updateVideoMiniatureFromExisting, updateVideoMiniatureFromUrl } from '../../../lib/thumbnail' import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' -import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videoImportAddValidator } from '../../../middlewares' +import { + asyncMiddleware, + asyncRetryTransactionMiddleware, + authenticate, + videoImportAddValidator, + videoImportCancelValidator, + videoImportDeleteValidator +} from '../../../middlewares' import { VideoModel } from '../../../models/video/video' import { VideoCaptionModel } from '../../../models/video/video-caption' import { VideoImportModel } from '../../../models/video/video-import' @@ -59,6 +74,18 @@ videoImportsRouter.post('/imports', asyncRetryTransactionMiddleware(addVideoImport) ) +videoImportsRouter.post('/imports/:id/cancel', + authenticate, + asyncMiddleware(videoImportCancelValidator), + asyncRetryTransactionMiddleware(cancelVideoImport) +) + +videoImportsRouter.delete('/imports/:id', + authenticate, + asyncMiddleware(videoImportDeleteValidator), + asyncRetryTransactionMiddleware(deleteVideoImport) +) + // --------------------------------------------------------------------------- export { @@ -67,6 +94,23 @@ export { // --------------------------------------------------------------------------- +async function deleteVideoImport (req: express.Request, res: express.Response) { + const videoImport = res.locals.videoImport + + await videoImport.destroy() + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + +async function cancelVideoImport (req: express.Request, res: express.Response) { + const videoImport = res.locals.videoImport + + videoImport.state = VideoImportState.CANCELLED + await videoImport.save() + + return res.sendStatus(HttpStatusCode.NO_CONTENT_204) +} + function addVideoImport (req: express.Request, res: express.Response) { if (req.body.targetUrl) return addYoutubeDLImport(req, res) diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index b2f511152..6a59bf805 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -441,7 +441,9 @@ const VIDEO_IMPORT_STATES: { [ id in VideoImportState ]: string } = { [VideoImportState.FAILED]: 'Failed', [VideoImportState.PENDING]: 'Pending', [VideoImportState.SUCCESS]: 'Success', - [VideoImportState.REJECTED]: 'Rejected' + [VideoImportState.REJECTED]: 'Rejected', + [VideoImportState.CANCELLED]: 'Cancelled', + [VideoImportState.PROCESSING]: 'Processing' } const ABUSE_STATES: { [ id in AbuseState ]: string } = { diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 2f74e9fbd..cb79725aa 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -42,8 +42,17 @@ import { generateVideoMiniature } from '../../thumbnail' async function processVideoImport (job: Job) { const payload = job.data as VideoImportPayload - if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, payload) - if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') return processTorrentImport(job, payload) + const videoImport = await getVideoImportOrDie(payload.videoImportId) + if (videoImport.state === VideoImportState.CANCELLED) { + logger.info('Do not process import since it has been cancelled', { payload }) + return + } + + videoImport.state = VideoImportState.PROCESSING + await videoImport.save() + + if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, videoImport, payload) + if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') return processTorrentImport(job, videoImport, payload) } // --------------------------------------------------------------------------- @@ -54,15 +63,11 @@ export { // --------------------------------------------------------------------------- -async function processTorrentImport (job: Job, payload: VideoImportTorrentPayload) { +async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { logger.info('Processing torrent video import in job %d.', job.id) - const videoImport = await getVideoImportOrDie(payload.videoImportId) + const options = { type: payload.type, videoImportId: payload.videoImportId } - const options = { - type: payload.type, - videoImportId: payload.videoImportId - } const target = { torrentName: videoImport.torrentName ? getSecureTorrentName(videoImport.torrentName) : undefined, uri: videoImport.magnetUri @@ -70,14 +75,10 @@ async function processTorrentImport (job: Job, payload: VideoImportTorrentPayloa return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options) } -async function processYoutubeDLImport (job: Job, payload: VideoImportYoutubeDLPayload) { +async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { logger.info('Processing youtubeDL video import in job %d.', job.id) - const videoImport = await getVideoImportOrDie(payload.videoImportId) - const options = { - type: payload.type, - videoImportId: videoImport.id - } + const options = { type: payload.type, videoImportId: videoImport.id } const youtubeDL = new YoutubeDLWrapper(videoImport.targetUrl, ServerConfigManager.Instance.getEnabledResolutions('vod')) diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index fbc599f12..22bd1f5d2 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -162,6 +162,18 @@ class JobQueue { } } + async pause () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].pause(true) + } + } + + async resume () { + for (const handler of Object.keys(this.queues)) { + await this.queues[handler].resume(true) + } + } + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { this.createJobWithPromise(obj, options) .catch(err => logger.error('Cannot create job.', { err, obj })) diff --git a/server/middlewares/validators/videos/video-imports.ts b/server/middlewares/validators/videos/video-imports.ts index e4b54283f..a3a5cc531 100644 --- a/server/middlewares/validators/videos/video-imports.ts +++ b/server/middlewares/validators/videos/video-imports.ts @@ -1,8 +1,10 @@ import express from 'express' -import { body } from 'express-validator' +import { body, param } from 'express-validator' +import { isValid as isIPValid, parse as parseIP } from 'ipaddr.js' import { isPreImportVideoAccepted } from '@server/lib/moderation' import { Hooks } from '@server/lib/plugins/hooks' -import { HttpStatusCode } from '@shared/models' +import { MUserAccountId, MVideoImport } from '@server/types/models' +import { HttpStatusCode, UserRight, VideoImportState } from '@shared/models' import { VideoImportCreate } from '@shared/models/videos/import/video-import-create.model' import { isIdValid, toIntOrNull } from '../../../helpers/custom-validators/misc' import { isVideoImportTargetUrlValid, isVideoImportTorrentFile } from '../../../helpers/custom-validators/video-imports' @@ -11,9 +13,8 @@ import { cleanUpReqFiles } from '../../../helpers/express-utils' import { logger } from '../../../helpers/logger' import { CONFIG } from '../../../initializers/config' import { CONSTRAINTS_FIELDS } from '../../../initializers/constants' -import { areValidationErrors, doesVideoChannelOfAccountExist } from '../shared' +import { areValidationErrors, doesVideoChannelOfAccountExist, doesVideoImportExist } from '../shared' import { getCommonVideoEditAttributes } from './videos' -import { isValid as isIPValid, parse as parseIP } from 'ipaddr.js' const videoImportAddValidator = getCommonVideoEditAttributes().concat([ body('channelId') @@ -95,10 +96,58 @@ const videoImportAddValidator = getCommonVideoEditAttributes().concat([ } ]) +const videoImportDeleteValidator = [ + param('id') + .custom(isIdValid).withMessage('Should have correct import id'), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + logger.debug('Checking videoImportDeleteValidator parameters', { parameters: req.params }) + + if (areValidationErrors(req, res)) return + + if (!await doesVideoImportExist(parseInt(req.params.id), res)) return + if (!checkUserCanManageImport(res.locals.oauth.token.user, res.locals.videoImport, res)) return + + if (res.locals.videoImport.state === VideoImportState.PENDING) { + return res.fail({ + status: HttpStatusCode.CONFLICT_409, + message: 'Cannot delete a pending video import. Cancel it or wait for the end of the import first.' + }) + } + + return next() + } +] + +const videoImportCancelValidator = [ + param('id') + .custom(isIdValid).withMessage('Should have correct import id'), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + logger.debug('Checking videoImportCancelValidator parameters', { parameters: req.params }) + + if (areValidationErrors(req, res)) return + + if (!await doesVideoImportExist(parseInt(req.params.id), res)) return + if (!checkUserCanManageImport(res.locals.oauth.token.user, res.locals.videoImport, res)) return + + if (res.locals.videoImport.state !== VideoImportState.PENDING) { + return res.fail({ + status: HttpStatusCode.CONFLICT_409, + message: 'Cannot cancel a non pending video import.' + }) + } + + return next() + } +] + // --------------------------------------------------------------------------- export { - videoImportAddValidator + videoImportAddValidator, + videoImportCancelValidator, + videoImportDeleteValidator } // --------------------------------------------------------------------------- @@ -132,3 +181,15 @@ async function isImportAccepted (req: express.Request, res: express.Response) { return true } + +function checkUserCanManageImport (user: MUserAccountId, videoImport: MVideoImport, res: express.Response) { + if (user.hasRight(UserRight.MANAGE_VIDEO_IMPORTS) === false && videoImport.userId !== user.id) { + res.fail({ + status: HttpStatusCode.FORBIDDEN_403, + message: 'Cannot manage video import of another user' + }) + return false + } + + return true +} diff --git a/server/tests/api/check-params/jobs.ts b/server/tests/api/check-params/jobs.ts index d85961d62..801b13d1e 100644 --- a/server/tests/api/check-params/jobs.ts +++ b/server/tests/api/check-params/jobs.ts @@ -3,7 +3,14 @@ import 'mocha' import { checkBadCountPagination, checkBadSortPagination, checkBadStartPagination } from '@server/tests/shared' import { HttpStatusCode } from '@shared/models' -import { cleanupTests, createSingleServer, makeGetRequest, PeerTubeServer, setAccessTokensToServers } from '@shared/server-commands' +import { + cleanupTests, + createSingleServer, + makeGetRequest, + makePostBodyRequest, + PeerTubeServer, + setAccessTokensToServers +} from '@shared/server-commands' describe('Test jobs API validators', function () { const path = '/api/v1/jobs/failed' @@ -76,7 +83,41 @@ describe('Test jobs API validators', function () { expectedStatus: HttpStatusCode.FORBIDDEN_403 }) }) + }) + + describe('When pausing/resuming the job queue', async function () { + const commands = [ 'pause', 'resume' ] + + it('Should fail with a non authenticated user', async function () { + for (const command of commands) { + await makePostBodyRequest({ + url: server.url, + path: '/api/v1/jobs/' + command, + expectedStatus: HttpStatusCode.UNAUTHORIZED_401 + }) + } + }) + it('Should fail with a non admin user', async function () { + for (const command of commands) { + await makePostBodyRequest({ + url: server.url, + path: '/api/v1/jobs/' + command, + expectedStatus: HttpStatusCode.UNAUTHORIZED_401 + }) + } + }) + + it('Should succeed with the correct params', async function () { + for (const command of commands) { + await makePostBodyRequest({ + url: server.url, + path: '/api/v1/jobs/' + command, + token: server.accessToken, + expectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + }) }) after(async function () { diff --git a/server/tests/api/check-params/video-imports.ts b/server/tests/api/check-params/video-imports.ts index da05793a0..156a612ee 100644 --- a/server/tests/api/check-params/video-imports.ts +++ b/server/tests/api/check-params/video-imports.ts @@ -12,7 +12,9 @@ import { makePostBodyRequest, makeUploadRequest, PeerTubeServer, - setAccessTokensToServers + setAccessTokensToServers, + setDefaultVideoChannel, + waitJobs } from '@shared/server-commands' describe('Test video imports API validator', function () { @@ -29,6 +31,7 @@ describe('Test video imports API validator', function () { server = await createSingleServer(1) await setAccessTokensToServers([ server ]) + await setDefaultVideoChannel([ server ]) const username = 'user1' const password = 'my super password' @@ -347,6 +350,67 @@ describe('Test video imports API validator', function () { }) }) + describe('Deleting/cancelling a video import', function () { + let importId: number + + async function importVideo () { + const attributes = { channelId: server.store.channel.id, targetUrl: FIXTURE_URLS.goodVideo } + const res = await server.imports.importVideo({ attributes }) + + return res.id + } + + before(async function () { + importId = await importVideo() + }) + + it('Should fail with an invalid import id', async function () { + await server.imports.cancel({ importId: 'artyom' as any, expectedStatus: HttpStatusCode.BAD_REQUEST_400 }) + await server.imports.delete({ importId: 'artyom' as any, expectedStatus: HttpStatusCode.BAD_REQUEST_400 }) + }) + + it('Should fail with an unknown import id', async function () { + await server.imports.cancel({ importId: 42, expectedStatus: HttpStatusCode.NOT_FOUND_404 }) + await server.imports.delete({ importId: 42, expectedStatus: HttpStatusCode.NOT_FOUND_404 }) + }) + + it('Should fail without token', async function () { + await server.imports.cancel({ importId, token: null, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 }) + await server.imports.delete({ importId, token: null, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 }) + }) + + it('Should fail with another user token', async function () { + await server.imports.cancel({ importId, token: userAccessToken, expectedStatus: HttpStatusCode.FORBIDDEN_403 }) + await server.imports.delete({ importId, token: userAccessToken, expectedStatus: HttpStatusCode.FORBIDDEN_403 }) + }) + + it('Should fail to cancel non pending import', async function () { + this.timeout(60000) + + await waitJobs([ server ]) + + await server.imports.cancel({ importId, expectedStatus: HttpStatusCode.CONFLICT_409 }) + }) + + it('Should succeed to delete an import', async function () { + await server.imports.delete({ importId }) + }) + + it('Should fail to delete a pending import', async function () { + await server.jobs.pauseJobQueue() + + importId = await importVideo() + + await server.imports.delete({ importId, expectedStatus: HttpStatusCode.CONFLICT_409 }) + }) + + it('Should succeed to cancel an import', async function () { + importId = await importVideo() + + await server.imports.cancel({ importId }) + }) + }) + after(async function () { await cleanupTests([ server ]) }) diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 4294e1fd5..bd8ffe188 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts @@ -11,6 +11,7 @@ import { setAccessTokensToServers, waitJobs } from '@shared/server-commands' +import { wait } from '@shared/core-utils' const expect = chai.expect @@ -91,6 +92,30 @@ describe('Test jobs', function () { expect(jobs.find(j => j.state === 'completed')).to.not.be.undefined }) + it('Should pause the job queue', async function () { + this.timeout(120000) + + await servers[1].jobs.pauseJobQueue() + + await servers[1].videos.upload({ attributes: { name: 'video2' } }) + + await wait(5000) + + const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' }) + expect(body.data).to.have.lengthOf(1) + }) + + it('Should resume the job queue', async function () { + this.timeout(120000) + + await servers[1].jobs.resumeJobQueue() + + await waitJobs(servers) + + const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' }) + expect(body.data).to.have.lengthOf(0) + }) + after(async function () { await cleanupTests(servers) }) diff --git a/server/tests/api/videos/video-imports.ts b/server/tests/api/videos/video-imports.ts index e8e0f01f1..ba21ab17a 100644 --- a/server/tests/api/videos/video-imports.ts +++ b/server/tests/api/videos/video-imports.ts @@ -6,7 +6,7 @@ import { pathExists, readdir, remove } from 'fs-extra' import { join } from 'path' import { FIXTURE_URLS, testCaptionFile, testImage } from '@server/tests/shared' import { areHttpImportTestsDisabled } from '@shared/core-utils' -import { VideoPrivacy, VideoResolution } from '@shared/models' +import { HttpStatusCode, Video, VideoImportState, VideoPrivacy, VideoResolution, VideoState } from '@shared/models' import { cleanupTests, createMultipleServers, @@ -382,6 +382,85 @@ describe('Test video imports', function () { runSuite('yt-dlp') + describe('Delete/cancel an import', function () { + let server: PeerTubeServer + + let finishedImportId: number + let finishedVideo: Video + let pendingImportId: number + + async function importVideo (name: string) { + const attributes = { name, channelId: server.store.channel.id, targetUrl: FIXTURE_URLS.goodVideo } + const res = await server.imports.importVideo({ attributes }) + + return res.id + } + + before(async function () { + this.timeout(120_000) + + server = await createSingleServer(1) + + await setAccessTokensToServers([ server ]) + await setDefaultVideoChannel([ server ]) + + finishedImportId = await importVideo('finished') + await waitJobs([ server ]) + + await server.jobs.pauseJobQueue() + pendingImportId = await importVideo('pending') + + const { data } = await server.imports.getMyVideoImports() + expect(data).to.have.lengthOf(2) + + finishedVideo = data.find(i => i.id === finishedImportId).video + }) + + it('Should delete a video import', async function () { + await server.imports.delete({ importId: finishedImportId }) + + const { data } = await server.imports.getMyVideoImports() + expect(data).to.have.lengthOf(1) + expect(data[0].id).to.equal(pendingImportId) + expect(data[0].state.id).to.equal(VideoImportState.PENDING) + }) + + it('Should not have deleted the associated video', async function () { + const video = await server.videos.get({ id: finishedVideo.id, token: server.accessToken, expectedStatus: HttpStatusCode.OK_200 }) + expect(video.name).to.equal('finished') + expect(video.state.id).to.equal(VideoState.PUBLISHED) + }) + + it('Should cancel a video import', async function () { + await server.imports.cancel({ importId: pendingImportId }) + + const { data } = await server.imports.getMyVideoImports() + expect(data).to.have.lengthOf(1) + expect(data[0].id).to.equal(pendingImportId) + expect(data[0].state.id).to.equal(VideoImportState.CANCELLED) + }) + + it('Should not have processed the cancelled video import', async function () { + this.timeout(60_000) + + await server.jobs.resumeJobQueue() + + await waitJobs([ server ]) + + const { data } = await server.imports.getMyVideoImports() + expect(data).to.have.lengthOf(1) + expect(data[0].id).to.equal(pendingImportId) + expect(data[0].state.id).to.equal(VideoImportState.CANCELLED) + expect(data[0].video.state.id).to.equal(VideoState.TO_IMPORT) + }) + + it('Should delete the cancelled video import', async function () { + await server.imports.delete({ importId: pendingImportId }) + const { data } = await server.imports.getMyVideoImports() + expect(data).to.have.lengthOf(0) + }) + }) + describe('Auto update', function () { let server: PeerTubeServer -- cgit v1.2.3