aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-01-19 14:23:00 +0100
committerChocobozzz <me@florianbigard.com>2022-01-19 14:31:05 +0100
commit419b520ca4434d17f3505013174e195c3a316716 (patch)
tree24dbf663c4e11e970cb780f96e6eb3efe023b222 /server/lib/job-queue
parent52435e467a0b30175a10af1dd3ae10d7d564d8ae (diff)
downloadPeerTube-419b520ca4434d17f3505013174e195c3a316716.tar.gz
PeerTube-419b520ca4434d17f3505013174e195c3a316716.tar.zst
PeerTube-419b520ca4434d17f3505013174e195c3a316716.zip
Add ability to cancel & delete video imports
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-import.ts29
-rw-r--r--server/lib/job-queue/job-queue.ts12
2 files changed, 27 insertions, 14 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 2f74e9fbd..cb79725aa 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -42,8 +42,17 @@ import { generateVideoMiniature } from '../../thumbnail'
42async function processVideoImport (job: Job) { 42async function processVideoImport (job: Job) {
43 const payload = job.data as VideoImportPayload 43 const payload = job.data as VideoImportPayload
44 44
45 if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, payload) 45 const videoImport = await getVideoImportOrDie(payload.videoImportId)
46 if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') return processTorrentImport(job, payload) 46 if (videoImport.state === VideoImportState.CANCELLED) {
47 logger.info('Do not process import since it has been cancelled', { payload })
48 return
49 }
50
51 videoImport.state = VideoImportState.PROCESSING
52 await videoImport.save()
53
54 if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, videoImport, payload)
55 if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') return processTorrentImport(job, videoImport, payload)
47} 56}
48 57
49// --------------------------------------------------------------------------- 58// ---------------------------------------------------------------------------
@@ -54,15 +63,11 @@ export {
54 63
55// --------------------------------------------------------------------------- 64// ---------------------------------------------------------------------------
56 65
57async function processTorrentImport (job: Job, payload: VideoImportTorrentPayload) { 66async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
58 logger.info('Processing torrent video import in job %d.', job.id) 67 logger.info('Processing torrent video import in job %d.', job.id)
59 68
60 const videoImport = await getVideoImportOrDie(payload.videoImportId) 69 const options = { type: payload.type, videoImportId: payload.videoImportId }
61 70
62 const options = {
63 type: payload.type,
64 videoImportId: payload.videoImportId
65 }
66 const target = { 71 const target = {
67 torrentName: videoImport.torrentName ? getSecureTorrentName(videoImport.torrentName) : undefined, 72 torrentName: videoImport.torrentName ? getSecureTorrentName(videoImport.torrentName) : undefined,
68 uri: videoImport.magnetUri 73 uri: videoImport.magnetUri
@@ -70,14 +75,10 @@ async function processTorrentImport (job: Job, payload: VideoImportTorrentPayloa
70 return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options) 75 return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options)
71} 76}
72 77
73async function processYoutubeDLImport (job: Job, payload: VideoImportYoutubeDLPayload) { 78async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
74 logger.info('Processing youtubeDL video import in job %d.', job.id) 79 logger.info('Processing youtubeDL video import in job %d.', job.id)
75 80
76 const videoImport = await getVideoImportOrDie(payload.videoImportId) 81 const options = { type: payload.type, videoImportId: videoImport.id }
77 const options = {
78 type: payload.type,
79 videoImportId: videoImport.id
80 }
81 82
82 const youtubeDL = new YoutubeDLWrapper(videoImport.targetUrl, ServerConfigManager.Instance.getEnabledResolutions('vod')) 83 const youtubeDL = new YoutubeDLWrapper(videoImport.targetUrl, ServerConfigManager.Instance.getEnabledResolutions('vod'))
83 84
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index fbc599f12..22bd1f5d2 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -162,6 +162,18 @@ class JobQueue {
162 } 162 }
163 } 163 }
164 164
165 async pause () {
166 for (const handler of Object.keys(this.queues)) {
167 await this.queues[handler].pause(true)
168 }
169 }
170
171 async resume () {
172 for (const handler of Object.keys(this.queues)) {
173 await this.queues[handler].resume(true)
174 }
175 }
176
165 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { 177 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
166 this.createJobWithPromise(obj, options) 178 this.createJobWithPromise(obj, options)
167 .catch(err => logger.error('Cannot create job.', { err, obj })) 179 .catch(err => logger.error('Cannot create job.', { err, obj }))