diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-08 10:42:08 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:18:07 +0200 |
commit | 5a921e7b74910414626bfc9672b857e987e3ebed (patch) | |
tree | f627e2ccc11c55bcba9e630951e72c5f94864c12 /server/controllers/api | |
parent | 5e2afe4290103bf0d54ae7b3e62781f2a00487c9 (diff) | |
download | PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.gz PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.zst PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.zip |
Move to bullmq
Diffstat (limited to 'server/controllers/api')
-rw-r--r-- | server/controllers/api/jobs.ts | 13 | ||||
-rw-r--r-- | server/controllers/api/videos/update.ts | 4 | ||||
-rw-r--r-- | server/controllers/api/videos/upload.ts | 4 |
3 files changed, 11 insertions, 10 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index c61b7362f..6a53e3083 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | import { Job as BullJob } from 'bullmq' | ||
1 | import express from 'express' | 2 | import express from 'express' |
2 | import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' | 3 | import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' |
3 | import { isArray } from '../../helpers/custom-validators/misc' | 4 | import { isArray } from '../../helpers/custom-validators/misc' |
@@ -25,7 +26,7 @@ jobsRouter.post('/pause', | |||
25 | jobsRouter.post('/resume', | 26 | jobsRouter.post('/resume', |
26 | authenticate, | 27 | authenticate, |
27 | ensureUserHasRight(UserRight.MANAGE_JOBS), | 28 | ensureUserHasRight(UserRight.MANAGE_JOBS), |
28 | asyncMiddleware(resumeJobQueue) | 29 | resumeJobQueue |
29 | ) | 30 | ) |
30 | 31 | ||
31 | jobsRouter.get('/:state?', | 32 | jobsRouter.get('/:state?', |
@@ -54,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) { | |||
54 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 55 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
55 | } | 56 | } |
56 | 57 | ||
57 | async function resumeJobQueue (req: express.Request, res: express.Response) { | 58 | function resumeJobQueue (req: express.Request, res: express.Response) { |
58 | await JobQueue.Instance.resume() | 59 | JobQueue.Instance.resume() |
59 | 60 | ||
60 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 61 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
61 | } | 62 | } |
@@ -82,7 +83,7 @@ async function listJobs (req: express.Request, res: express.Response) { | |||
82 | return res.json(result) | 83 | return res.json(result) |
83 | } | 84 | } |
84 | 85 | ||
85 | async function formatJob (job: any, state?: JobState): Promise<Job> { | 86 | async function formatJob (job: BullJob, state?: JobState): Promise<Job> { |
86 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 | 87 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 |
87 | ? job.stacktrace[0] | 88 | ? job.stacktrace[0] |
88 | : null | 89 | : null |
@@ -90,9 +91,9 @@ async function formatJob (job: any, state?: JobState): Promise<Job> { | |||
90 | return { | 91 | return { |
91 | id: job.id, | 92 | id: job.id, |
92 | state: state || await job.getState(), | 93 | state: state || await job.getState(), |
93 | type: job.queue.name as JobType, | 94 | type: job.queueName as JobType, |
94 | data: job.data, | 95 | data: job.data, |
95 | progress: await job.progress(), | 96 | progress: job.progress as number, |
96 | priority: job.opts.priority, | 97 | priority: job.opts.priority, |
97 | error, | 98 | error, |
98 | createdAt: new Date(job.timestamp), | 99 | createdAt: new Date(job.timestamp), |
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 65a7321fd..1545a2232 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts | |||
@@ -199,7 +199,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo | |||
199 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } | 199 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } |
200 | 200 | ||
201 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 201 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) |
202 | await job.finished() | 202 | await JobQueue.Instance.waitJob(job) |
203 | } | 203 | } |
204 | 204 | ||
205 | const hls = video.getHLSPlaylist() | 205 | const hls = video.getHLSPlaylist() |
@@ -208,7 +208,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo | |||
208 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } | 208 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } |
209 | 209 | ||
210 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 210 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) |
211 | await job.finished() | 211 | await JobQueue.Instance.waitJob(job) |
212 | } | 212 | } |
213 | 213 | ||
214 | // Refresh video since files have changed | 214 | // Refresh video since files have changed |
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 3ce66c9ca..4a9d7b619 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts | |||
@@ -17,6 +17,7 @@ import { | |||
17 | import { VideoPathManager } from '@server/lib/video-path-manager' | 17 | import { VideoPathManager } from '@server/lib/video-path-manager' |
18 | import { buildNextVideoState } from '@server/lib/video-state' | 18 | import { buildNextVideoState } from '@server/lib/video-state' |
19 | import { openapiOperationDoc } from '@server/middlewares/doc' | 19 | import { openapiOperationDoc } from '@server/middlewares/doc' |
20 | import { VideoSourceModel } from '@server/models/video/video-source' | ||
20 | import { MVideoFile, MVideoFullLight } from '@server/types/models' | 21 | import { MVideoFile, MVideoFullLight } from '@server/types/models' |
21 | import { getLowercaseExtension } from '@shared/core-utils' | 22 | import { getLowercaseExtension } from '@shared/core-utils' |
22 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' | 23 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' |
@@ -44,7 +45,6 @@ import { | |||
44 | import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' | 45 | import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' |
45 | import { VideoModel } from '../../../models/video/video' | 46 | import { VideoModel } from '../../../models/video/video' |
46 | import { VideoFileModel } from '../../../models/video/video-file' | 47 | import { VideoFileModel } from '../../../models/video/video-file' |
47 | import { VideoSourceModel } from '@server/models/video/video-source' | ||
48 | 48 | ||
49 | const lTags = loggerTagsFactory('api', 'video') | 49 | const lTags = loggerTagsFactory('api', 'video') |
50 | const auditLogger = auditLoggerFactory('videos') | 50 | const auditLogger = auditLoggerFactory('videos') |
@@ -270,7 +270,7 @@ async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoF | |||
270 | const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } | 270 | const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } |
271 | 271 | ||
272 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 272 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) |
273 | await job.finished() | 273 | await JobQueue.Instance.waitJob(job) |
274 | 274 | ||
275 | const refreshedVideo = await VideoModel.loadFull(video.id) | 275 | const refreshedVideo = await VideoModel.loadFull(video.id) |
276 | if (!refreshedVideo) return | 276 | if (!refreshedVideo) return |