diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-08 15:48:17 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:18:07 +0200 |
commit | bd911b54b555b11df7e9849cf92d358bccfecf6e (patch) | |
tree | 23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server/controllers/api/videos | |
parent | 5a921e7b74910414626bfc9672b857e987e3ebed (diff) | |
download | PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip |
Use bullmq job dependency
Diffstat (limited to 'server/controllers/api/videos')
-rw-r--r-- | server/controllers/api/videos/import.ts | 4 | ||||
-rw-r--r-- | server/controllers/api/videos/index.ts | 2 | ||||
-rw-r--r-- | server/controllers/api/videos/studio.ts | 2 | ||||
-rw-r--r-- | server/controllers/api/videos/update.ts | 64 | ||||
-rw-r--r-- | server/controllers/api/videos/upload.ts | 68 |
5 files changed, 75 insertions, 65 deletions
diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts index b12953630..5a2e1006a 100644 --- a/server/controllers/api/videos/import.ts +++ b/server/controllers/api/videos/import.ts | |||
@@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to | |||
163 | videoImportId: videoImport.id, | 163 | videoImportId: videoImport.id, |
164 | magnetUri | 164 | magnetUri |
165 | } | 165 | } |
166 | await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) | 166 | await JobQueue.Instance.createJob({ type: 'video-import', payload }) |
167 | 167 | ||
168 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) | 168 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) |
169 | 169 | ||
@@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response) | |||
255 | videoImportId: videoImport.id, | 255 | videoImportId: videoImport.id, |
256 | fileExt | 256 | fileExt |
257 | } | 257 | } |
258 | await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) | 258 | await JobQueue.Instance.createJob({ type: 'video-import', payload }) |
259 | 259 | ||
260 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) | 260 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) |
261 | 261 | ||
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index eca72c397..b301515df 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts | |||
@@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) { | |||
151 | const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) | 151 | const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) |
152 | 152 | ||
153 | if (video.isOutdated()) { | 153 | if (video.isOutdated()) { |
154 | JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) | 154 | JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) |
155 | } | 155 | } |
156 | 156 | ||
157 | return res.json(video.toFormattedDetailsJSON()) | 157 | return res.json(video.toFormattedDetailsJSON()) |
diff --git a/server/controllers/api/videos/studio.ts b/server/controllers/api/videos/studio.ts index bff344f3f..6667532bf 100644 --- a/server/controllers/api/videos/studio.ts +++ b/server/controllers/api/videos/studio.ts | |||
@@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response) | |||
71 | tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) | 71 | tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) |
72 | } | 72 | } |
73 | 73 | ||
74 | JobQueue.Instance.createJob({ type: 'video-studio-edition', payload }) | 74 | JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload }) |
75 | 75 | ||
76 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 76 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
77 | } | 77 | } |
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 1545a2232..ab1a23d9a 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import express from 'express' | 1 | import express from 'express' |
2 | import { Transaction } from 'sequelize/types' | 2 | import { Transaction } from 'sequelize/types' |
3 | import { changeVideoChannelShare } from '@server/lib/activitypub/share' | 3 | import { changeVideoChannelShare } from '@server/lib/activitypub/share' |
4 | import { JobQueue } from '@server/lib/job-queue' | 4 | import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' |
5 | import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' | 5 | import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' |
6 | import { openapiOperationDoc } from '@server/middlewares/doc' | 6 | import { openapiOperationDoc } from '@server/middlewares/doc' |
7 | import { FilteredModelAttributes } from '@server/types' | 7 | import { FilteredModelAttributes } from '@server/types' |
@@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils' | |||
13 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 13 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
14 | import { MIMETYPES } from '../../../initializers/constants' | 14 | import { MIMETYPES } from '../../../initializers/constants' |
15 | import { sequelizeTypescript } from '../../../initializers/database' | 15 | import { sequelizeTypescript } from '../../../initializers/database' |
16 | import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' | ||
17 | import { Notifier } from '../../../lib/notifier' | ||
18 | import { Hooks } from '../../../lib/plugins/hooks' | 16 | import { Hooks } from '../../../lib/plugins/hooks' |
19 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' | 17 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' |
20 | import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' | 18 | import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' |
@@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) { | |||
139 | return { videoInstanceUpdated, isNewVideo } | 137 | return { videoInstanceUpdated, isNewVideo } |
140 | }) | 138 | }) |
141 | 139 | ||
142 | const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate) | 140 | Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res }) |
143 | 141 | ||
144 | await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t)) | 142 | await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo }) |
145 | |||
146 | if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | ||
147 | |||
148 | Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res }) | ||
149 | } catch (err) { | 143 | } catch (err) { |
150 | // Force fields we want to update | 144 | // Force fields we want to update |
151 | // If the transaction is retried, sequelize will think the object has not changed | 145 | // If the transaction is retried, sequelize will think the object has not changed |
@@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide | |||
192 | } | 186 | } |
193 | } | 187 | } |
194 | 188 | ||
195 | async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) { | 189 | async function addVideoJobsAfterUpdate (options: { |
196 | if (video.isLive || !videoInfoToUpdate.name) return video | 190 | video: MVideoFullLight |
191 | videoInfoToUpdate: VideoUpdate | ||
192 | wasConfidentialVideo: boolean | ||
193 | isNewVideo: boolean | ||
194 | }) { | ||
195 | const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options | ||
196 | const jobs: CreateJobArgument[] = [] | ||
197 | |||
198 | if (!video.isLive && videoInfoToUpdate.name) { | ||
197 | 199 | ||
198 | for (const file of (video.VideoFiles || [])) { | 200 | for (const file of (video.VideoFiles || [])) { |
199 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } | 201 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } |
200 | 202 | ||
201 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 203 | jobs.push({ type: 'manage-video-torrent', payload }) |
202 | await JobQueue.Instance.waitJob(job) | 204 | } |
203 | } | ||
204 | 205 | ||
205 | const hls = video.getHLSPlaylist() | 206 | const hls = video.getHLSPlaylist() |
206 | 207 | ||
207 | for (const file of (hls?.VideoFiles || [])) { | 208 | for (const file of (hls?.VideoFiles || [])) { |
208 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } | 209 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } |
209 | 210 | ||
210 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 211 | jobs.push({ type: 'manage-video-torrent', payload }) |
211 | await JobQueue.Instance.waitJob(job) | 212 | } |
213 | } | ||
214 | |||
215 | jobs.push({ | ||
216 | type: 'federate-video', | ||
217 | payload: { | ||
218 | videoUUID: video.uuid, | ||
219 | isNewVideo | ||
220 | } | ||
221 | }) | ||
222 | |||
223 | if (wasConfidentialVideo) { | ||
224 | jobs.push({ | ||
225 | type: 'notify', | ||
226 | payload: { | ||
227 | action: 'new-video', | ||
228 | videoUUID: video.uuid | ||
229 | } | ||
230 | }) | ||
212 | } | 231 | } |
213 | 232 | ||
214 | // Refresh video since files have changed | 233 | return JobQueue.Instance.createSequentialJobFlow(...jobs) |
215 | return VideoModel.loadFull(video.id) | ||
216 | } | 234 | } |
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 4a9d7b619..cc171eece 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts | |||
@@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
8 | import { Redis } from '@server/lib/redis' | 8 | import { Redis } from '@server/lib/redis' |
9 | import { uploadx } from '@server/lib/uploadx' | 9 | import { uploadx } from '@server/lib/uploadx' |
10 | import { | 10 | import { |
11 | addMoveToObjectStorageJob, | ||
12 | addOptimizeOrMergeAudioJob, | ||
13 | buildLocalVideoFromReq, | 11 | buildLocalVideoFromReq, |
12 | buildMoveToObjectStorageJob, | ||
13 | buildOptimizeOrMergeAudioJob, | ||
14 | buildVideoThumbnailsFromReq, | 14 | buildVideoThumbnailsFromReq, |
15 | setVideoTags | 15 | setVideoTags |
16 | } from '@server/lib/video' | 16 | } from '@server/lib/video' |
@@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager' | |||
18 | import { buildNextVideoState } from '@server/lib/video-state' | 18 | import { buildNextVideoState } from '@server/lib/video-state' |
19 | import { openapiOperationDoc } from '@server/middlewares/doc' | 19 | import { openapiOperationDoc } from '@server/middlewares/doc' |
20 | import { VideoSourceModel } from '@server/models/video/video-source' | 20 | import { VideoSourceModel } from '@server/models/video/video-source' |
21 | import { MVideoFile, MVideoFullLight } from '@server/types/models' | 21 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' |
22 | import { getLowercaseExtension } from '@shared/core-utils' | 22 | import { getLowercaseExtension } from '@shared/core-utils' |
23 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' | 23 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' |
24 | import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models' | 24 | import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models' |
25 | import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' | 25 | import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' |
26 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
27 | import { createReqFiles } from '../../../helpers/express-utils' | 26 | import { createReqFiles } from '../../../helpers/express-utils' |
28 | import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' | 27 | import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' |
29 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 28 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
30 | import { MIMETYPES } from '../../../initializers/constants' | 29 | import { MIMETYPES } from '../../../initializers/constants' |
31 | import { sequelizeTypescript } from '../../../initializers/database' | 30 | import { sequelizeTypescript } from '../../../initializers/database' |
32 | import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' | ||
33 | import { Notifier } from '../../../lib/notifier' | ||
34 | import { Hooks } from '../../../lib/plugins/hooks' | 31 | import { Hooks } from '../../../lib/plugins/hooks' |
35 | import { generateVideoMiniature } from '../../../lib/thumbnail' | 32 | import { generateVideoMiniature } from '../../../lib/thumbnail' |
36 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' | 33 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' |
@@ -216,22 +213,8 @@ async function addVideo (options: { | |||
216 | // Channel has a new content, set as updated | 213 | // Channel has a new content, set as updated |
217 | await videoCreated.VideoChannel.setAsUpdated() | 214 | await videoCreated.VideoChannel.setAsUpdated() |
218 | 215 | ||
219 | createTorrentFederate(videoCreated, videoFile) | 216 | addVideoJobsAfterUpload(videoCreated, videoFile, user) |
220 | .catch(err => { | 217 | .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) |
221 | logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }) | ||
222 | |||
223 | return videoCreated | ||
224 | }).then(refreshedVideo => { | ||
225 | if (!refreshedVideo) return | ||
226 | |||
227 | if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
228 | return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined }) | ||
229 | } | ||
230 | |||
231 | if (refreshedVideo.state === VideoState.TO_TRANSCODE) { | ||
232 | return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user }) | ||
233 | } | ||
234 | }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) | ||
235 | 218 | ||
236 | Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) | 219 | Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) |
237 | 220 | ||
@@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { | |||
266 | return videoFile | 249 | return videoFile |
267 | } | 250 | } |
268 | 251 | ||
269 | async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) { | 252 | async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) { |
270 | const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } | 253 | return JobQueue.Instance.createSequentialJobFlow( |
271 | 254 | { | |
272 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 255 | type: 'manage-video-torrent' as 'manage-video-torrent', |
273 | await JobQueue.Instance.waitJob(job) | 256 | payload: { |
274 | 257 | videoId: video.id, | |
275 | const refreshedVideo = await VideoModel.loadFull(video.id) | 258 | videoFileId: videoFile.id, |
276 | if (!refreshedVideo) return | 259 | action: 'create' |
277 | 260 | } | |
278 | // Only federate and notify after the torrent creation | 261 | }, |
279 | Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | 262 | { |
263 | type: 'federate-video' as 'federate-video', | ||
264 | payload: { | ||
265 | videoUUID: video.uuid, | ||
266 | isNewVideo: true | ||
267 | } | ||
268 | }, | ||
280 | 269 | ||
281 | await retryTransactionWrapper(() => { | 270 | video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE |
282 | return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t)) | 271 | ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined }) |
283 | }) | 272 | : undefined, |
284 | 273 | ||
285 | return refreshedVideo | 274 | video.state === VideoState.TO_TRANSCODE |
275 | ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) | ||
276 | : undefined | ||
277 | ) | ||
286 | } | 278 | } |
287 | 279 | ||
288 | async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { | 280 | async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { |