aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/controllers/api/videos
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-08 15:48:17 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:18:07 +0200
commitbd911b54b555b11df7e9849cf92d358bccfecf6e (patch)
tree23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server/controllers/api/videos
parent5a921e7b74910414626bfc9672b857e987e3ebed (diff)
downloadPeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip
Use bullmq job dependency
Diffstat (limited to 'server/controllers/api/videos')
-rw-r--r--server/controllers/api/videos/import.ts4
-rw-r--r--server/controllers/api/videos/index.ts2
-rw-r--r--server/controllers/api/videos/studio.ts2
-rw-r--r--server/controllers/api/videos/update.ts64
-rw-r--r--server/controllers/api/videos/upload.ts68
5 files changed, 75 insertions, 65 deletions
diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts
index b12953630..5a2e1006a 100644
--- a/server/controllers/api/videos/import.ts
+++ b/server/controllers/api/videos/import.ts
@@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to
163 videoImportId: videoImport.id, 163 videoImportId: videoImport.id,
164 magnetUri 164 magnetUri
165 } 165 }
166 await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) 166 await JobQueue.Instance.createJob({ type: 'video-import', payload })
167 167
168 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) 168 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
169 169
@@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response)
255 videoImportId: videoImport.id, 255 videoImportId: videoImport.id,
256 fileExt 256 fileExt
257 } 257 }
258 await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) 258 await JobQueue.Instance.createJob({ type: 'video-import', payload })
259 259
260 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) 260 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
261 261
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index eca72c397..b301515df 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) {
151 const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) 151 const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId })
152 152
153 if (video.isOutdated()) { 153 if (video.isOutdated()) {
154 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) 154 JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } })
155 } 155 }
156 156
157 return res.json(video.toFormattedDetailsJSON()) 157 return res.json(video.toFormattedDetailsJSON())
diff --git a/server/controllers/api/videos/studio.ts b/server/controllers/api/videos/studio.ts
index bff344f3f..6667532bf 100644
--- a/server/controllers/api/videos/studio.ts
+++ b/server/controllers/api/videos/studio.ts
@@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response)
71 tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) 71 tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files))
72 } 72 }
73 73
74 JobQueue.Instance.createJob({ type: 'video-studio-edition', payload }) 74 JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload })
75 75
76 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 76 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
77} 77}
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts
index 1545a2232..ab1a23d9a 100644
--- a/server/controllers/api/videos/update.ts
+++ b/server/controllers/api/videos/update.ts
@@ -1,7 +1,7 @@
1import express from 'express' 1import express from 'express'
2import { Transaction } from 'sequelize/types' 2import { Transaction } from 'sequelize/types'
3import { changeVideoChannelShare } from '@server/lib/activitypub/share' 3import { changeVideoChannelShare } from '@server/lib/activitypub/share'
4import { JobQueue } from '@server/lib/job-queue' 4import { CreateJobArgument, JobQueue } from '@server/lib/job-queue'
5import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' 5import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
6import { openapiOperationDoc } from '@server/middlewares/doc' 6import { openapiOperationDoc } from '@server/middlewares/doc'
7import { FilteredModelAttributes } from '@server/types' 7import { FilteredModelAttributes } from '@server/types'
@@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils'
13import { logger, loggerTagsFactory } from '../../../helpers/logger' 13import { logger, loggerTagsFactory } from '../../../helpers/logger'
14import { MIMETYPES } from '../../../initializers/constants' 14import { MIMETYPES } from '../../../initializers/constants'
15import { sequelizeTypescript } from '../../../initializers/database' 15import { sequelizeTypescript } from '../../../initializers/database'
16import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
17import { Notifier } from '../../../lib/notifier'
18import { Hooks } from '../../../lib/plugins/hooks' 16import { Hooks } from '../../../lib/plugins/hooks'
19import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' 17import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
20import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' 18import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares'
@@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) {
139 return { videoInstanceUpdated, isNewVideo } 137 return { videoInstanceUpdated, isNewVideo }
140 }) 138 })
141 139
142 const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate) 140 Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res })
143 141
144 await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t)) 142 await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo })
145
146 if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
147
148 Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res })
149 } catch (err) { 143 } catch (err) {
150 // Force fields we want to update 144 // Force fields we want to update
151 // If the transaction is retried, sequelize will think the object has not changed 145 // If the transaction is retried, sequelize will think the object has not changed
@@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide
192 } 186 }
193} 187}
194 188
195async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) { 189async function addVideoJobsAfterUpdate (options: {
196 if (video.isLive || !videoInfoToUpdate.name) return video 190 video: MVideoFullLight
191 videoInfoToUpdate: VideoUpdate
192 wasConfidentialVideo: boolean
193 isNewVideo: boolean
194}) {
195 const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options
196 const jobs: CreateJobArgument[] = []
197
198 if (!video.isLive && videoInfoToUpdate.name) {
197 199
198 for (const file of (video.VideoFiles || [])) { 200 for (const file of (video.VideoFiles || [])) {
199 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } 201 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
200 202
201 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 203 jobs.push({ type: 'manage-video-torrent', payload })
202 await JobQueue.Instance.waitJob(job) 204 }
203 }
204 205
205 const hls = video.getHLSPlaylist() 206 const hls = video.getHLSPlaylist()
206 207
207 for (const file of (hls?.VideoFiles || [])) { 208 for (const file of (hls?.VideoFiles || [])) {
208 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } 209 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
209 210
210 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 211 jobs.push({ type: 'manage-video-torrent', payload })
211 await JobQueue.Instance.waitJob(job) 212 }
213 }
214
215 jobs.push({
216 type: 'federate-video',
217 payload: {
218 videoUUID: video.uuid,
219 isNewVideo
220 }
221 })
222
223 if (wasConfidentialVideo) {
224 jobs.push({
225 type: 'notify',
226 payload: {
227 action: 'new-video',
228 videoUUID: video.uuid
229 }
230 })
212 } 231 }
213 232
214 // Refresh video since files have changed 233 return JobQueue.Instance.createSequentialJobFlow(...jobs)
215 return VideoModel.loadFull(video.id)
216} 234}
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts
index 4a9d7b619..cc171eece 100644
--- a/server/controllers/api/videos/upload.ts
+++ b/server/controllers/api/videos/upload.ts
@@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Redis } from '@server/lib/redis' 8import { Redis } from '@server/lib/redis'
9import { uploadx } from '@server/lib/uploadx' 9import { uploadx } from '@server/lib/uploadx'
10import { 10import {
11 addMoveToObjectStorageJob,
12 addOptimizeOrMergeAudioJob,
13 buildLocalVideoFromReq, 11 buildLocalVideoFromReq,
12 buildMoveToObjectStorageJob,
13 buildOptimizeOrMergeAudioJob,
14 buildVideoThumbnailsFromReq, 14 buildVideoThumbnailsFromReq,
15 setVideoTags 15 setVideoTags
16} from '@server/lib/video' 16} from '@server/lib/video'
@@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager'
18import { buildNextVideoState } from '@server/lib/video-state' 18import { buildNextVideoState } from '@server/lib/video-state'
19import { openapiOperationDoc } from '@server/middlewares/doc' 19import { openapiOperationDoc } from '@server/middlewares/doc'
20import { VideoSourceModel } from '@server/models/video/video-source' 20import { VideoSourceModel } from '@server/models/video/video-source'
21import { MVideoFile, MVideoFullLight } from '@server/types/models' 21import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
22import { getLowercaseExtension } from '@shared/core-utils' 22import { getLowercaseExtension } from '@shared/core-utils'
23import { isAudioFile, uuidToShort } from '@shared/extra-utils' 23import { isAudioFile, uuidToShort } from '@shared/extra-utils'
24import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models' 24import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models'
25import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' 25import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
26import { retryTransactionWrapper } from '../../../helpers/database-utils'
27import { createReqFiles } from '../../../helpers/express-utils' 26import { createReqFiles } from '../../../helpers/express-utils'
28import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' 27import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg'
29import { logger, loggerTagsFactory } from '../../../helpers/logger' 28import { logger, loggerTagsFactory } from '../../../helpers/logger'
30import { MIMETYPES } from '../../../initializers/constants' 29import { MIMETYPES } from '../../../initializers/constants'
31import { sequelizeTypescript } from '../../../initializers/database' 30import { sequelizeTypescript } from '../../../initializers/database'
32import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
33import { Notifier } from '../../../lib/notifier'
34import { Hooks } from '../../../lib/plugins/hooks' 31import { Hooks } from '../../../lib/plugins/hooks'
35import { generateVideoMiniature } from '../../../lib/thumbnail' 32import { generateVideoMiniature } from '../../../lib/thumbnail'
36import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' 33import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
@@ -216,22 +213,8 @@ async function addVideo (options: {
216 // Channel has a new content, set as updated 213 // Channel has a new content, set as updated
217 await videoCreated.VideoChannel.setAsUpdated() 214 await videoCreated.VideoChannel.setAsUpdated()
218 215
219 createTorrentFederate(videoCreated, videoFile) 216 addVideoJobsAfterUpload(videoCreated, videoFile, user)
220 .catch(err => { 217 .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
221 logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })
222
223 return videoCreated
224 }).then(refreshedVideo => {
225 if (!refreshedVideo) return
226
227 if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
228 return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined })
229 }
230
231 if (refreshedVideo.state === VideoState.TO_TRANSCODE) {
232 return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user })
233 }
234 }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
235 218
236 Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) 219 Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res })
237 220
@@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
266 return videoFile 249 return videoFile
267} 250}
268 251
269async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) { 252async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) {
270 const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } 253 return JobQueue.Instance.createSequentialJobFlow(
271 254 {
272 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 255 type: 'manage-video-torrent' as 'manage-video-torrent',
273 await JobQueue.Instance.waitJob(job) 256 payload: {
274 257 videoId: video.id,
275 const refreshedVideo = await VideoModel.loadFull(video.id) 258 videoFileId: videoFile.id,
276 if (!refreshedVideo) return 259 action: 'create'
277 260 }
278 // Only federate and notify after the torrent creation 261 },
279 Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) 262 {
263 type: 'federate-video' as 'federate-video',
264 payload: {
265 videoUUID: video.uuid,
266 isNewVideo: true
267 }
268 },
280 269
281 await retryTransactionWrapper(() => { 270 video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE
282 return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t)) 271 ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined })
283 }) 272 : undefined,
284 273
285 return refreshedVideo 274 video.state === VideoState.TO_TRANSCODE
275 ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
276 : undefined
277 )
286} 278}
287 279
288async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { 280async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) {