aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-08 15:48:17 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:18:07 +0200
commitbd911b54b555b11df7e9849cf92d358bccfecf6e (patch)
tree23e94b4acbe6819fedc1cb5e067b700cbdd880c3
parent5a921e7b74910414626bfc9672b857e987e3ebed (diff)
downloadPeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip
Use bullmq job dependency
-rw-r--r--scripts/create-import-video-file-job.ts2
-rw-r--r--server/controllers/api/accounts.ts2
-rw-r--r--server/controllers/api/server/follows.ts4
-rw-r--r--server/controllers/api/server/redundancy.ts2
-rw-r--r--server/controllers/api/users/my-subscriptions.ts2
-rw-r--r--server/controllers/api/video-channel.ts4
-rw-r--r--server/controllers/api/videos/import.ts4
-rw-r--r--server/controllers/api/videos/index.ts2
-rw-r--r--server/controllers/api/videos/studio.ts2
-rw-r--r--server/controllers/api/videos/update.ts64
-rw-r--r--server/controllers/api/videos/upload.ts68
-rw-r--r--server/initializers/constants.ts10
-rw-r--r--server/lib/activitypub/actors/get.ts4
-rw-r--r--server/lib/activitypub/follow.ts2
-rw-r--r--server/lib/activitypub/outbox.ts2
-rw-r--r--server/lib/activitypub/playlists/refresh.ts2
-rw-r--r--server/lib/activitypub/send/shared/send-utils.ts8
-rw-r--r--server/lib/activitypub/videos/get.ts2
-rw-r--r--server/lib/activitypub/videos/shared/video-sync-attributes.ts2
-rw-r--r--server/lib/emailer.ts10
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts2
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts2
-rw-r--r--server/lib/job-queue/handlers/email.ts2
-rw-r--r--server/lib/job-queue/handlers/federate-video.ts28
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts2
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts4
-rw-r--r--server/lib/job-queue/handlers/notify.ts27
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts7
-rw-r--r--server/lib/job-queue/handlers/video-import.ts15
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts2
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts10
-rw-r--r--server/lib/job-queue/job-queue.ts81
-rw-r--r--server/lib/live/live-manager.ts10
-rw-r--r--server/lib/notifier/notifier.ts2
-rw-r--r--server/lib/schedulers/auto-follow-index-instances.ts2
-rw-r--r--server/lib/video-state.ts7
-rw-r--r--server/lib/video.ts40
-rw-r--r--shared/models/server/job.model.ts17
42 files changed, 314 insertions, 152 deletions
diff --git a/scripts/create-import-video-file-job.ts b/scripts/create-import-video-file-job.ts
index 97e9c7933..cf974f240 100644
--- a/scripts/create-import-video-file-job.ts
+++ b/scripts/create-import-video-file-job.ts
@@ -45,6 +45,6 @@ async function run () {
45 } 45 }
46 46
47 JobQueue.Instance.init(true) 47 JobQueue.Instance.init(true)
48 await JobQueue.Instance.createJobWithPromise({ type: 'video-file-import', payload: dataInput }) 48 await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput })
49 console.log('Import job for video %s created.', video.uuid) 49 console.log('Import job for video %s created.', video.uuid)
50} 50}
diff --git a/server/controllers/api/accounts.ts b/server/controllers/api/accounts.ts
index 8d9f92d93..66cdaab82 100644
--- a/server/controllers/api/accounts.ts
+++ b/server/controllers/api/accounts.ts
@@ -119,7 +119,7 @@ function getAccount (req: express.Request, res: express.Response) {
119 const account = res.locals.account 119 const account = res.locals.account
120 120
121 if (account.isOutdated()) { 121 if (account.isOutdated()) {
122 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) 122 JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } })
123 } 123 }
124 124
125 return res.json(account.toFormattedJSON()) 125 return res.json(account.toFormattedJSON())
diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts
index 60d36ed59..87828813a 100644
--- a/server/controllers/api/server/follows.ts
+++ b/server/controllers/api/server/follows.ts
@@ -138,7 +138,7 @@ async function addFollow (req: express.Request, res: express.Response) {
138 followerActorId: follower.id 138 followerActorId: follower.id
139 } 139 }
140 140
141 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 141 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
142 } 142 }
143 143
144 for (const handle of handles) { 144 for (const handle of handles) {
@@ -150,7 +150,7 @@ async function addFollow (req: express.Request, res: express.Response) {
150 followerActorId: follower.id 150 followerActorId: follower.id
151 } 151 }
152 152
153 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 153 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
154 } 154 }
155 155
156 return res.status(HttpStatusCode.NO_CONTENT_204).end() 156 return res.status(HttpStatusCode.NO_CONTENT_204).end()
diff --git a/server/controllers/api/server/redundancy.ts b/server/controllers/api/server/redundancy.ts
index 9f43d3e4e..94e187cd4 100644
--- a/server/controllers/api/server/redundancy.ts
+++ b/server/controllers/api/server/redundancy.ts
@@ -85,7 +85,7 @@ async function addVideoRedundancy (req: express.Request, res: express.Response)
85 videoId: res.locals.onlyVideo.id 85 videoId: res.locals.onlyVideo.id
86 } 86 }
87 87
88 await JobQueue.Instance.createJobWithPromise({ 88 await JobQueue.Instance.createJob({
89 type: 'video-redundancy', 89 type: 'video-redundancy',
90 payload 90 payload
91 }) 91 })
diff --git a/server/controllers/api/users/my-subscriptions.ts b/server/controllers/api/users/my-subscriptions.ts
index fb1f68635..a750f9bd1 100644
--- a/server/controllers/api/users/my-subscriptions.ts
+++ b/server/controllers/api/users/my-subscriptions.ts
@@ -122,7 +122,7 @@ function addUserSubscription (req: express.Request, res: express.Response) {
122 followerActorId: user.Account.Actor.id 122 followerActorId: user.Account.Actor.id
123 } 123 }
124 124
125 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 125 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
126 126
127 return res.status(HttpStatusCode.NO_CONTENT_204).end() 127 return res.status(HttpStatusCode.NO_CONTENT_204).end()
128} 128}
diff --git a/server/controllers/api/video-channel.ts b/server/controllers/api/video-channel.ts
index 411ec8630..6b33e894d 100644
--- a/server/controllers/api/video-channel.ts
+++ b/server/controllers/api/video-channel.ts
@@ -245,7 +245,7 @@ async function addVideoChannel (req: express.Request, res: express.Response) {
245 }) 245 })
246 246
247 const payload = { actorId: videoChannelCreated.actorId } 247 const payload = { actorId: videoChannelCreated.actorId }
248 await JobQueue.Instance.createJobWithPromise({ type: 'actor-keys', payload }) 248 await JobQueue.Instance.createJob({ type: 'actor-keys', payload })
249 249
250 auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON())) 250 auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON()))
251 logger.info('Video channel %s created.', videoChannelCreated.Actor.url) 251 logger.info('Video channel %s created.', videoChannelCreated.Actor.url)
@@ -335,7 +335,7 @@ async function getVideoChannel (req: express.Request, res: express.Response) {
335 const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id }) 335 const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id })
336 336
337 if (videoChannel.isOutdated()) { 337 if (videoChannel.isOutdated()) {
338 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) 338 JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } })
339 } 339 }
340 340
341 return res.json(videoChannel.toFormattedJSON()) 341 return res.json(videoChannel.toFormattedJSON())
diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts
index b12953630..5a2e1006a 100644
--- a/server/controllers/api/videos/import.ts
+++ b/server/controllers/api/videos/import.ts
@@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to
163 videoImportId: videoImport.id, 163 videoImportId: videoImport.id,
164 magnetUri 164 magnetUri
165 } 165 }
166 await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) 166 await JobQueue.Instance.createJob({ type: 'video-import', payload })
167 167
168 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) 168 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
169 169
@@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response)
255 videoImportId: videoImport.id, 255 videoImportId: videoImport.id,
256 fileExt 256 fileExt
257 } 257 }
258 await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) 258 await JobQueue.Instance.createJob({ type: 'video-import', payload })
259 259
260 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) 260 auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
261 261
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index eca72c397..b301515df 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) {
151 const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) 151 const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId })
152 152
153 if (video.isOutdated()) { 153 if (video.isOutdated()) {
154 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) 154 JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } })
155 } 155 }
156 156
157 return res.json(video.toFormattedDetailsJSON()) 157 return res.json(video.toFormattedDetailsJSON())
diff --git a/server/controllers/api/videos/studio.ts b/server/controllers/api/videos/studio.ts
index bff344f3f..6667532bf 100644
--- a/server/controllers/api/videos/studio.ts
+++ b/server/controllers/api/videos/studio.ts
@@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response)
71 tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) 71 tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files))
72 } 72 }
73 73
74 JobQueue.Instance.createJob({ type: 'video-studio-edition', payload }) 74 JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload })
75 75
76 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 76 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
77} 77}
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts
index 1545a2232..ab1a23d9a 100644
--- a/server/controllers/api/videos/update.ts
+++ b/server/controllers/api/videos/update.ts
@@ -1,7 +1,7 @@
1import express from 'express' 1import express from 'express'
2import { Transaction } from 'sequelize/types' 2import { Transaction } from 'sequelize/types'
3import { changeVideoChannelShare } from '@server/lib/activitypub/share' 3import { changeVideoChannelShare } from '@server/lib/activitypub/share'
4import { JobQueue } from '@server/lib/job-queue' 4import { CreateJobArgument, JobQueue } from '@server/lib/job-queue'
5import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' 5import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
6import { openapiOperationDoc } from '@server/middlewares/doc' 6import { openapiOperationDoc } from '@server/middlewares/doc'
7import { FilteredModelAttributes } from '@server/types' 7import { FilteredModelAttributes } from '@server/types'
@@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils'
13import { logger, loggerTagsFactory } from '../../../helpers/logger' 13import { logger, loggerTagsFactory } from '../../../helpers/logger'
14import { MIMETYPES } from '../../../initializers/constants' 14import { MIMETYPES } from '../../../initializers/constants'
15import { sequelizeTypescript } from '../../../initializers/database' 15import { sequelizeTypescript } from '../../../initializers/database'
16import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
17import { Notifier } from '../../../lib/notifier'
18import { Hooks } from '../../../lib/plugins/hooks' 16import { Hooks } from '../../../lib/plugins/hooks'
19import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' 17import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
20import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' 18import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares'
@@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) {
139 return { videoInstanceUpdated, isNewVideo } 137 return { videoInstanceUpdated, isNewVideo }
140 }) 138 })
141 139
142 const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate) 140 Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res })
143 141
144 await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t)) 142 await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo })
145
146 if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
147
148 Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res })
149 } catch (err) { 143 } catch (err) {
150 // Force fields we want to update 144 // Force fields we want to update
151 // If the transaction is retried, sequelize will think the object has not changed 145 // If the transaction is retried, sequelize will think the object has not changed
@@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide
192 } 186 }
193} 187}
194 188
195async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) { 189async function addVideoJobsAfterUpdate (options: {
196 if (video.isLive || !videoInfoToUpdate.name) return video 190 video: MVideoFullLight
191 videoInfoToUpdate: VideoUpdate
192 wasConfidentialVideo: boolean
193 isNewVideo: boolean
194}) {
195 const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options
196 const jobs: CreateJobArgument[] = []
197
198 if (!video.isLive && videoInfoToUpdate.name) {
197 199
198 for (const file of (video.VideoFiles || [])) { 200 for (const file of (video.VideoFiles || [])) {
199 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } 201 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
200 202
201 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 203 jobs.push({ type: 'manage-video-torrent', payload })
202 await JobQueue.Instance.waitJob(job) 204 }
203 }
204 205
205 const hls = video.getHLSPlaylist() 206 const hls = video.getHLSPlaylist()
206 207
207 for (const file of (hls?.VideoFiles || [])) { 208 for (const file of (hls?.VideoFiles || [])) {
208 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } 209 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
209 210
210 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 211 jobs.push({ type: 'manage-video-torrent', payload })
211 await JobQueue.Instance.waitJob(job) 212 }
213 }
214
215 jobs.push({
216 type: 'federate-video',
217 payload: {
218 videoUUID: video.uuid,
219 isNewVideo
220 }
221 })
222
223 if (wasConfidentialVideo) {
224 jobs.push({
225 type: 'notify',
226 payload: {
227 action: 'new-video',
228 videoUUID: video.uuid
229 }
230 })
212 } 231 }
213 232
214 // Refresh video since files have changed 233 return JobQueue.Instance.createSequentialJobFlow(...jobs)
215 return VideoModel.loadFull(video.id)
216} 234}
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts
index 4a9d7b619..cc171eece 100644
--- a/server/controllers/api/videos/upload.ts
+++ b/server/controllers/api/videos/upload.ts
@@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Redis } from '@server/lib/redis' 8import { Redis } from '@server/lib/redis'
9import { uploadx } from '@server/lib/uploadx' 9import { uploadx } from '@server/lib/uploadx'
10import { 10import {
11 addMoveToObjectStorageJob,
12 addOptimizeOrMergeAudioJob,
13 buildLocalVideoFromReq, 11 buildLocalVideoFromReq,
12 buildMoveToObjectStorageJob,
13 buildOptimizeOrMergeAudioJob,
14 buildVideoThumbnailsFromReq, 14 buildVideoThumbnailsFromReq,
15 setVideoTags 15 setVideoTags
16} from '@server/lib/video' 16} from '@server/lib/video'
@@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager'
18import { buildNextVideoState } from '@server/lib/video-state' 18import { buildNextVideoState } from '@server/lib/video-state'
19import { openapiOperationDoc } from '@server/middlewares/doc' 19import { openapiOperationDoc } from '@server/middlewares/doc'
20import { VideoSourceModel } from '@server/models/video/video-source' 20import { VideoSourceModel } from '@server/models/video/video-source'
21import { MVideoFile, MVideoFullLight } from '@server/types/models' 21import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
22import { getLowercaseExtension } from '@shared/core-utils' 22import { getLowercaseExtension } from '@shared/core-utils'
23import { isAudioFile, uuidToShort } from '@shared/extra-utils' 23import { isAudioFile, uuidToShort } from '@shared/extra-utils'
24import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models' 24import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models'
25import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' 25import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
26import { retryTransactionWrapper } from '../../../helpers/database-utils'
27import { createReqFiles } from '../../../helpers/express-utils' 26import { createReqFiles } from '../../../helpers/express-utils'
28import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' 27import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg'
29import { logger, loggerTagsFactory } from '../../../helpers/logger' 28import { logger, loggerTagsFactory } from '../../../helpers/logger'
30import { MIMETYPES } from '../../../initializers/constants' 29import { MIMETYPES } from '../../../initializers/constants'
31import { sequelizeTypescript } from '../../../initializers/database' 30import { sequelizeTypescript } from '../../../initializers/database'
32import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
33import { Notifier } from '../../../lib/notifier'
34import { Hooks } from '../../../lib/plugins/hooks' 31import { Hooks } from '../../../lib/plugins/hooks'
35import { generateVideoMiniature } from '../../../lib/thumbnail' 32import { generateVideoMiniature } from '../../../lib/thumbnail'
36import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' 33import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
@@ -216,22 +213,8 @@ async function addVideo (options: {
216 // Channel has a new content, set as updated 213 // Channel has a new content, set as updated
217 await videoCreated.VideoChannel.setAsUpdated() 214 await videoCreated.VideoChannel.setAsUpdated()
218 215
219 createTorrentFederate(videoCreated, videoFile) 216 addVideoJobsAfterUpload(videoCreated, videoFile, user)
220 .catch(err => { 217 .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
221 logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })
222
223 return videoCreated
224 }).then(refreshedVideo => {
225 if (!refreshedVideo) return
226
227 if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
228 return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined })
229 }
230
231 if (refreshedVideo.state === VideoState.TO_TRANSCODE) {
232 return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user })
233 }
234 }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
235 218
236 Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) 219 Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res })
237 220
@@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
266 return videoFile 249 return videoFile
267} 250}
268 251
269async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) { 252async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) {
270 const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } 253 return JobQueue.Instance.createSequentialJobFlow(
271 254 {
272 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 255 type: 'manage-video-torrent' as 'manage-video-torrent',
273 await JobQueue.Instance.waitJob(job) 256 payload: {
274 257 videoId: video.id,
275 const refreshedVideo = await VideoModel.loadFull(video.id) 258 videoFileId: videoFile.id,
276 if (!refreshedVideo) return 259 action: 'create'
277 260 }
278 // Only federate and notify after the torrent creation 261 },
279 Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) 262 {
263 type: 'federate-video' as 'federate-video',
264 payload: {
265 videoUUID: video.uuid,
266 isNewVideo: true
267 }
268 },
280 269
281 await retryTransactionWrapper(() => { 270 video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE
282 return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t)) 271 ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined })
283 }) 272 : undefined,
284 273
285 return refreshedVideo 274 video.state === VideoState.TO_TRANSCODE
275 ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
276 : undefined
277 )
286} 278}
287 279
288async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { 280async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) {
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index db43c59be..a53c22662 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -156,7 +156,9 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
156 'video-live-ending': 1, 156 'video-live-ending': 1,
157 'video-studio-edition': 1, 157 'video-studio-edition': 1,
158 'manage-video-torrent': 1, 158 'manage-video-torrent': 1,
159 'move-to-object-storage': 3 159 'move-to-object-storage': 3,
160 'notify': 1,
161 'federate-video': 1
160} 162}
161// Excluded keys are jobs that can be configured by admins 163// Excluded keys are jobs that can be configured by admins
162const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { 164const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
@@ -175,7 +177,9 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
175 'video-live-ending': 10, 177 'video-live-ending': 10,
176 'video-studio-edition': 1, 178 'video-studio-edition': 1,
177 'manage-video-torrent': 1, 179 'manage-video-torrent': 1,
178 'move-to-object-storage': 1 180 'move-to-object-storage': 1,
181 'notify': 5,
182 'federate-video': 3
179} 183}
180const JOB_TTL: { [id in JobType]: number } = { 184const JOB_TTL: { [id in JobType]: number } = {
181 'activitypub-http-broadcast': 60000 * 10, // 10 minutes 185 'activitypub-http-broadcast': 60000 * 10, // 10 minutes
@@ -195,6 +199,8 @@ const JOB_TTL: { [id in JobType]: number } = {
195 'video-redundancy': 1000 * 3600 * 3, // 3 hours 199 'video-redundancy': 1000 * 3600 * 3, // 3 hours
196 'video-live-ending': 1000 * 60 * 10, // 10 minutes 200 'video-live-ending': 1000 * 60 * 10, // 10 minutes
197 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours 201 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
202 'notify': 60000 * 5, // 5 minutes
203 'federate-video': 60000 * 5, // 5 minutes
198 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours 204 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
199} 205}
200const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { 206const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
diff --git a/server/lib/activitypub/actors/get.ts b/server/lib/activitypub/actors/get.ts
index d2b651082..e73b7d707 100644
--- a/server/lib/activitypub/actors/get.ts
+++ b/server/lib/activitypub/actors/get.ts
@@ -110,7 +110,7 @@ async function loadActorFromDB (actorUrl: string, fetchType: ActorLoadByUrlType)
110async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) { 110async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) {
111 if ((created === true || refreshed === true) && updateCollections === true) { 111 if ((created === true || refreshed === true) && updateCollections === true) {
112 const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' } 112 const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' }
113 await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) 113 await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
114 } 114 }
115} 115}
116 116
@@ -118,6 +118,6 @@ async function schedulePlaylistFetchIfNeeded (actor: MActorAccountId, created: b
118 // We created a new account: fetch the playlists 118 // We created a new account: fetch the playlists
119 if (created === true && actor.Account && accountPlaylistsUrl) { 119 if (created === true && actor.Account && accountPlaylistsUrl) {
120 const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' } 120 const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' }
121 await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) 121 await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
122 } 122 }
123} 123}
diff --git a/server/lib/activitypub/follow.ts b/server/lib/activitypub/follow.ts
index 741b54df5..f6e2a48fd 100644
--- a/server/lib/activitypub/follow.ts
+++ b/server/lib/activitypub/follow.ts
@@ -27,7 +27,7 @@ async function autoFollowBackIfNeeded (actorFollow: MActorFollowActors, transact
27 isAutoFollow: true 27 isAutoFollow: true
28 } 28 }
29 29
30 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 30 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
31 } 31 }
32} 32}
33 33
diff --git a/server/lib/activitypub/outbox.ts b/server/lib/activitypub/outbox.ts
index ecdc33a77..5eef76871 100644
--- a/server/lib/activitypub/outbox.ts
+++ b/server/lib/activitypub/outbox.ts
@@ -16,7 +16,7 @@ async function addFetchOutboxJob (actor: Pick<ActorModel, 'id' | 'outboxUrl'>) {
16 type: 'activity' as 'activity' 16 type: 'activity' as 'activity'
17 } 17 }
18 18
19 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) 19 return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload })
20} 20}
21 21
22export { 22export {
diff --git a/server/lib/activitypub/playlists/refresh.ts b/server/lib/activitypub/playlists/refresh.ts
index 493e8c7ec..33260ea02 100644
--- a/server/lib/activitypub/playlists/refresh.ts
+++ b/server/lib/activitypub/playlists/refresh.ts
@@ -9,7 +9,7 @@ import { fetchRemoteVideoPlaylist } from './shared'
9function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { 9function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) {
10 if (!playlist.isOutdated()) return 10 if (!playlist.isOutdated()) return
11 11
12 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) 12 JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } })
13} 13}
14 14
15async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> { 15async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> {
diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts
index fcec63991..2bc1ef8f5 100644
--- a/server/lib/activitypub/send/shared/send-utils.ts
+++ b/server/lib/activitypub/send/shared/send-utils.ts
@@ -120,7 +120,7 @@ async function forwardActivity (
120 body: activity, 120 body: activity,
121 contextType: null 121 contextType: null
122 } 122 }
123 return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })) 123 return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload }))
124} 124}
125 125
126// --------------------------------------------------------------------------- 126// ---------------------------------------------------------------------------
@@ -205,7 +205,7 @@ function broadcastTo (options: {
205 contextType 205 contextType
206 } 206 }
207 207
208 JobQueue.Instance.createJob({ 208 JobQueue.Instance.createJobAsync({
209 type: parallelizable 209 type: parallelizable
210 ? 'activitypub-http-broadcast-parallel' 210 ? 'activitypub-http-broadcast-parallel'
211 : 'activitypub-http-broadcast', 211 : 'activitypub-http-broadcast',
@@ -222,7 +222,7 @@ function broadcastTo (options: {
222 contextType 222 contextType
223 } 223 }
224 224
225 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) 225 JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
226 } 226 }
227} 227}
228 228
@@ -243,7 +243,7 @@ function unicastTo (options: {
243 contextType 243 contextType
244 } 244 }
245 245
246 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) 246 JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
247} 247}
248 248
249// --------------------------------------------------------------------------- 249// ---------------------------------------------------------------------------
diff --git a/server/lib/activitypub/videos/get.ts b/server/lib/activitypub/videos/get.ts
index b74df132c..14ba55034 100644
--- a/server/lib/activitypub/videos/get.ts
+++ b/server/lib/activitypub/videos/get.ts
@@ -107,7 +107,7 @@ async function scheduleRefresh (video: MVideoThumbnail, fetchType: VideoLoadByUr
107 return refreshVideoIfNeeded(refreshOptions) 107 return refreshVideoIfNeeded(refreshOptions)
108 } 108 }
109 109
110 await JobQueue.Instance.createJobWithPromise({ 110 await JobQueue.Instance.createJob({
111 type: 'activitypub-refresher', 111 type: 'activitypub-refresher',
112 payload: { type: 'video', url: video.url } 112 payload: { type: 'video', url: video.url }
113 }) 113 })
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
index 8cf0c87a6..8ed1b6447 100644
--- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts
+++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
@@ -74,7 +74,7 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
74} 74}
75 75
76function createJob (payload: ActivitypubHttpFetcherPayload) { 76function createJob (payload: ActivitypubHttpFetcherPayload) {
77 return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) 77 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
78} 78}
79 79
80function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { 80function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
diff --git a/server/lib/emailer.ts b/server/lib/emailer.ts
index bd1089530..9e546de7f 100644
--- a/server/lib/emailer.ts
+++ b/server/lib/emailer.ts
@@ -66,7 +66,7 @@ class Emailer {
66 } 66 }
67 } 67 }
68 68
69 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 69 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
70 } 70 }
71 71
72 addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { 72 addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) {
@@ -80,7 +80,7 @@ class Emailer {
80 } 80 }
81 } 81 }
82 82
83 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 83 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
84 } 84 }
85 85
86 addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { 86 addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) {
@@ -94,7 +94,7 @@ class Emailer {
94 } 94 }
95 } 95 }
96 96
97 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 97 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
98 } 98 }
99 99
100 addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { 100 addUserBlockJob (user: MUser, blocked: boolean, reason?: string) {
@@ -108,7 +108,7 @@ class Emailer {
108 text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` 108 text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.`
109 } 109 }
110 110
111 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 111 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
112 } 112 }
113 113
114 addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { 114 addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) {
@@ -127,7 +127,7 @@ class Emailer {
127 } 127 }
128 } 128 }
129 129
130 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 130 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
131 } 131 }
132 132
133 async sendMail (options: EmailPayload) { 133 async sendMail (options: EmailPayload) {
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 944da5be1..a68c32ba0 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) {
17 const payload = job.data as ActivitypubFollowPayload 17 const payload = job.data as ActivitypubFollowPayload
18 const host = payload.host 18 const host = payload.host
19 19
20 logger.info('Processing ActivityPub follow in job %d.', job.id) 20 logger.info('Processing ActivityPub follow in job %s.', job.id)
21 21
22 let targetActor: MActorFull 22 let targetActor: MActorFull
23 if (!host || host === WEBSERVER.HOST) { 23 if (!host || host === WEBSERVER.HOST) {
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 354c608fb..13eff5211 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests'
8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' 8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
9 9
10async function processActivityPubHttpBroadcast (job: Job) { 10async function processActivityPubHttpBroadcast (job: Job) {
11 logger.info('Processing ActivityPub broadcast in job %d.', job.id) 11 logger.info('Processing ActivityPub broadcast in job %s.', job.id)
12 12
13 const payload = job.data as ActivitypubHttpBroadcastPayload 13 const payload = job.data as ActivitypubHttpBroadcastPayload
14 14
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index e0b841887..b6cb3c4a6 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share'
12import { addVideoComments } from '../../activitypub/video-comments' 12import { addVideoComments } from '../../activitypub/video-comments'
13 13
14async function processActivityPubHttpFetcher (job: Job) { 14async function processActivityPubHttpFetcher (job: Job) {
15 logger.info('Processing ActivityPub fetcher in job %d.', job.id) 15 logger.info('Processing ActivityPub fetcher in job %s.', job.id)
16 16
17 const payload = job.data as ActivitypubHttpFetcherPayload 17 const payload = job.data as ActivitypubHttpFetcherPayload
18 18
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 837a597a5..9e4e84002 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests'
6import { ActorFollowHealthCache } from '../../actor-follow-health-cache' 6import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
7 7
8async function processActivityPubHttpUnicast (job: Job) { 8async function processActivityPubHttpUnicast (job: Job) {
9 logger.info('Processing ActivityPub unicast in job %d.', job.id) 9 logger.info('Processing ActivityPub unicast in job %s.', job.id)
10 10
11 const payload = job.data as ActivitypubHttpUnicastPayload 11 const payload = job.data as ActivitypubHttpUnicastPayload
12 const uri = payload.uri 12 const uri = payload.uri
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 600f858a0..307e771ff 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors'
11async function refreshAPObject (job: Job) { 11async function refreshAPObject (job: Job) {
12 const payload = job.data as RefreshPayload 12 const payload = job.data as RefreshPayload
13 13
14 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) 14 logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url)
15 15
16 if (payload.type === 'video') return refreshVideo(payload.url) 16 if (payload.type === 'video') return refreshVideo(payload.url)
17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) 17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 4a5bad9fb..27a2d431b 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger'
6 6
7async function processActorKeys (job: Job) { 7async function processActorKeys (job: Job) {
8 const payload = job.data as ActorKeysPayload 8 const payload = job.data as ActorKeysPayload
9 logger.info('Processing actor keys in job %d.', job.id) 9 logger.info('Processing actor keys in job %s.', job.id)
10 10
11 const actor = await ActorModel.load(payload.actorId) 11 const actor = await ActorModel.load(payload.actorId)
12 12
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index b5b9475b1..567bcc076 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer'
5 5
6async function processEmail (job: Job) { 6async function processEmail (job: Job) {
7 const payload = job.data as EmailPayload 7 const payload = job.data as EmailPayload
8 logger.info('Processing email in job %d.', job.id) 8 logger.info('Processing email in job %s.', job.id)
9 9
10 return Emailer.Instance.sendMail(payload) 10 return Emailer.Instance.sendMail(payload)
11} 11}
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts
new file mode 100644
index 000000000..6aac36741
--- /dev/null
+++ b/server/lib/job-queue/handlers/federate-video.ts
@@ -0,0 +1,28 @@
1import { Job } from 'bullmq'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { sequelizeTypescript } from '@server/initializers/database'
4import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
5import { VideoModel } from '@server/models/video/video'
6import { FederateVideoPayload } from '@shared/models'
7import { logger } from '../../../helpers/logger'
8
9function processFederateVideo (job: Job) {
10 const payload = job.data as FederateVideoPayload
11
12 logger.info('Processing video federation in job %s.', job.id)
13
14 return retryTransactionWrapper(() => {
15 return sequelizeTypescript.transaction(async t => {
16 const video = await VideoModel.loadFull(payload.videoUUID, t)
17 if (!video) return
18
19 return federateVideoIfNeeded(video, payload.isNewVideo, t)
20 })
21 })
22}
23
24// ---------------------------------------------------------------------------
25
26export {
27 processFederateVideo
28}
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index 4505ca79e..03aa414c9 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger'
8 8
9async function processManageVideoTorrent (job: Job) { 9async function processManageVideoTorrent (job: Job) {
10 const payload = job.data as ManageVideoTorrentPayload 10 const payload = job.data as ManageVideoTorrentPayload
11 logger.info('Processing torrent in job %d.', job.id) 11 logger.info('Processing torrent in job %s.', job.id)
12 12
13 if (payload.action === 'create') return doCreateAction(payload) 13 if (payload.action === 'create') return doCreateAction(payload)
14 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) 14 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index d608fd865..25bdebeea 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage')
17 17
18export async function processMoveToObjectStorage (job: Job) { 18export async function processMoveToObjectStorage (job: Job) {
19 const payload = job.data as MoveObjectStoragePayload 19 const payload = job.data as MoveObjectStoragePayload
20 logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) 20 logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
21 21
22 const video = await VideoModel.loadWithFiles(payload.videoUUID) 22 const video = await VideoModel.loadWithFiles(payload.videoUUID)
23 // No video, maybe deleted? 23 // No video, maybe deleted?
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) {
43 43
44 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') 44 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
45 if (pendingMove === 0) { 45 if (pendingMove === 0) {
46 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) 46 logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
47 47
48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) 48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
49 } 49 }
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts
new file mode 100644
index 000000000..83605396c
--- /dev/null
+++ b/server/lib/job-queue/handlers/notify.ts
@@ -0,0 +1,27 @@
1import { Job } from 'bullmq'
2import { Notifier } from '@server/lib/notifier'
3import { VideoModel } from '@server/models/video/video'
4import { NotifyPayload } from '@shared/models'
5import { logger } from '../../../helpers/logger'
6
7async function processNotify (job: Job) {
8 const payload = job.data as NotifyPayload
9 logger.info('Processing %s notification in job %s.', payload.action, job.id)
10
11 if (payload.action === 'new-video') return doNotifyNewVideo(payload)
12}
13
14// ---------------------------------------------------------------------------
15
16export {
17 processNotify
18}
19
20// ---------------------------------------------------------------------------
21
22async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) {
23 const refreshedVideo = await VideoModel.loadFull(payload.videoUUID)
24 if (!refreshedVideo) return
25
26 Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
27}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 40c44cf52..d950f6407 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
6import { generateWebTorrentVideoFilename } from '@server/lib/paths' 6import { generateWebTorrentVideoFilename } from '@server/lib/paths'
7import { addMoveToObjectStorageJob } from '@server/lib/video' 7import { buildMoveToObjectStorageJob } from '@server/lib/video'
8import { VideoPathManager } from '@server/lib/video-path-manager' 8import { VideoPathManager } from '@server/lib/video-path-manager'
9import { VideoModel } from '@server/models/video/video' 9import { VideoModel } from '@server/models/video/video'
10import { VideoFileModel } from '@server/models/video/video-file' 10import { VideoFileModel } from '@server/models/video/video-file'
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils'
13import { VideoFileImportPayload, VideoStorage } from '@shared/models' 13import { VideoFileImportPayload, VideoStorage } from '@shared/models'
14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' 14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
15import { logger } from '../../../helpers/logger' 15import { logger } from '../../../helpers/logger'
16import { JobQueue } from '../job-queue'
16 17
17async function processVideoFileImport (job: Job) { 18async function processVideoFileImport (job: Job) {
18 const payload = job.data as VideoFileImportPayload 19 const payload = job.data as VideoFileImportPayload
19 logger.info('Processing video file import in job %d.', job.id) 20 logger.info('Processing video file import in job %s.', job.id)
20 21
21 const video = await VideoModel.loadFull(payload.videoUUID) 22 const video = await VideoModel.loadFull(payload.videoUUID)
22 // No video, maybe deleted? 23 // No video, maybe deleted?
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) {
28 await updateVideoFile(video, payload.filePath) 29 await updateVideoFile(video, payload.filePath)
29 30
30 if (CONFIG.OBJECT_STORAGE.ENABLED) { 31 if (CONFIG.OBJECT_STORAGE.ENABLED) {
31 await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) 32 await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state }))
32 } else { 33 } else {
33 await federateVideoIfNeeded(video, false) 34 await federateVideoIfNeeded(video, false)
34 } 35 }
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index e5cd35865..f4629159c 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Hooks } from '@server/lib/plugins/hooks' 8import { Hooks } from '@server/lib/plugins/hooks'
9import { ServerConfigManager } from '@server/lib/server-config-manager' 9import { ServerConfigManager } from '@server/lib/server-config-manager'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video'
12import { VideoPathManager } from '@server/lib/video-path-manager' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 13import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail'
39import { federateVideoIfNeeded } from '../../activitypub/videos' 39import { federateVideoIfNeeded } from '../../activitypub/videos'
40import { Notifier } from '../../notifier' 40import { Notifier } from '../../notifier'
41import { generateVideoMiniature } from '../../thumbnail' 41import { generateVideoMiniature } from '../../thumbnail'
42import { JobQueue } from '../job-queue'
42 43
43async function processVideoImport (job: Job) { 44async function processVideoImport (job: Job) {
44 const payload = job.data as VideoImportPayload 45 const payload = job.data as VideoImportPayload
@@ -65,7 +66,7 @@ export {
65// --------------------------------------------------------------------------- 66// ---------------------------------------------------------------------------
66 67
67async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { 68async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
68 logger.info('Processing torrent video import in job %d.', job.id) 69 logger.info('Processing torrent video import in job %s.', job.id)
69 70
70 const options = { type: payload.type, videoImportId: payload.videoImportId } 71 const options = { type: payload.type, videoImportId: payload.videoImportId }
71 72
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault,
77} 78}
78 79
79async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { 80async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
80 logger.info('Processing youtubeDL video import in job %d.', job.id) 81 logger.info('Processing youtubeDL video import in job %s.', job.id)
81 82
82 const options = { type: payload.type, videoImportId: videoImport.id } 83 const options = { type: payload.type, videoImportId: videoImport.id }
83 84
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
259 } 260 }
260 261
261 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 262 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
262 return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) 263 await JobQueue.Instance.createJob(
264 await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
265 )
263 } 266 }
264 267
265 // Create transcoding jobs? 268 // Create transcoding jobs?
266 if (video.state === VideoState.TO_TRANSCODE) { 269 if (video.state === VideoState.TO_TRANSCODE) {
267 await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) 270 await JobQueue.Instance.createJob(
271 await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
272 )
268 } 273 }
269 274
270 } catch (err) { 275 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
index 75ab2cd02..bac99fdb7 100644
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ b/server/lib/job-queue/handlers/video-redundancy.ts
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger'
5 5
6async function processVideoRedundancy (job: Job) { 6async function processVideoRedundancy (job: Job) {
7 const payload = job.data as VideoRedundancyPayload 7 const payload = job.data as VideoRedundancyPayload
8 logger.info('Processing video redundancy in job %d.', job.id) 8 logger.info('Processing video redundancy in job %s.', job.id)
9 9
10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) 10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId)
11} 11}
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 078243538..23f9a34cc 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { generateWebTorrentVideoFilename } from '@server/lib/paths' 8import { generateWebTorrentVideoFilename } from '@server/lib/paths'
9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' 9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { buildOptimizeOrMergeAudioJob } from '@server/lib/video'
12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' 12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
13import { VideoPathManager } from '@server/lib/video-path-manager' 13import { VideoPathManager } from '@server/lib/video-path-manager'
14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' 14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
@@ -36,6 +36,7 @@ import {
36 VideoStudioTaskWatermarkPayload 36 VideoStudioTaskWatermarkPayload
37} from '@shared/models' 37} from '@shared/models'
38import { logger, loggerTagsFactory } from '../../../helpers/logger' 38import { logger, loggerTagsFactory } from '../../../helpers/logger'
39import { JobQueue } from '../job-queue'
39 40
40const lTagsBase = loggerTagsFactory('video-edition') 41const lTagsBase = loggerTagsFactory('video-edition')
41 42
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) {
43 const payload = job.data as VideoStudioEditionPayload 44 const payload = job.data as VideoStudioEditionPayload
44 const lTags = lTagsBase(payload.videoUUID) 45 const lTags = lTagsBase(payload.videoUUID)
45 46
46 logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) 47 logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags)
47 48
48 const video = await VideoModel.loadFull(payload.videoUUID) 49 const video = await VideoModel.loadFull(payload.videoUUID)
49 50
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) {
100 await federateVideoIfNeeded(video, false, undefined) 101 await federateVideoIfNeeded(video, false, undefined)
101 102
102 const user = await UserModel.loadByVideoId(video.id) 103 const user = await UserModel.loadByVideoId(video.id)
103 await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) 104
105 await JobQueue.Instance.createJob(
106 await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
107 )
104} 108}
105 109
106// --------------------------------------------------------------------------- 110// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0cf5d53ce..50d732beb 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,6 @@
1import { 1import {
2 FlowJob,
3 FlowProducer,
2 Job, 4 Job,
3 JobsOptions, 5 JobsOptions,
4 Queue, 6 Queue,
@@ -13,7 +15,7 @@ import {
13import { jobStates } from '@server/helpers/custom-validators/jobs' 15import { jobStates } from '@server/helpers/custom-validators/jobs'
14import { CONFIG } from '@server/initializers/config' 16import { CONFIG } from '@server/initializers/config'
15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils' 18import { pick, timeoutPromise } from '@shared/core-utils'
17import { 19import {
18 ActivitypubFollowPayload, 20 ActivitypubFollowPayload,
19 ActivitypubHttpBroadcastPayload, 21 ActivitypubHttpBroadcastPayload,
@@ -22,10 +24,12 @@ import {
22 ActorKeysPayload, 24 ActorKeysPayload,
23 DeleteResumableUploadMetaFilePayload, 25 DeleteResumableUploadMetaFilePayload,
24 EmailPayload, 26 EmailPayload,
27 FederateVideoPayload,
25 JobState, 28 JobState,
26 JobType, 29 JobType,
27 ManageVideoTorrentPayload, 30 ManageVideoTorrentPayload,
28 MoveObjectStoragePayload, 31 MoveObjectStoragePayload,
32 NotifyPayload,
29 RefreshPayload, 33 RefreshPayload,
30 VideoFileImportPayload, 34 VideoFileImportPayload,
31 VideoImportPayload, 35 VideoImportPayload,
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
45import { refreshAPObject } from './handlers/activitypub-refresher' 49import { refreshAPObject } from './handlers/activitypub-refresher'
46import { processActorKeys } from './handlers/actor-keys' 50import { processActorKeys } from './handlers/actor-keys'
47import { processEmail } from './handlers/email' 51import { processEmail } from './handlers/email'
52import { processFederateVideo } from './handlers/federate-video'
48import { processManageVideoTorrent } from './handlers/manage-video-torrent' 53import { processManageVideoTorrent } from './handlers/manage-video-torrent'
49import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' 54import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
55import { processNotify } from './handlers/notify'
50import { processVideoFileImport } from './handlers/video-file-import' 56import { processVideoFileImport } from './handlers/video-file-import'
51import { processVideoImport } from './handlers/video-import' 57import { processVideoImport } from './handlers/video-import'
52import { processVideoLiveEnding } from './handlers/video-live-ending' 58import { processVideoLiveEnding } from './handlers/video-live-ending'
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
54import { processVideoTranscoding } from './handlers/video-transcoding' 60import { processVideoTranscoding } from './handlers/video-transcoding'
55import { processVideosViewsStats } from './handlers/video-views-stats' 61import { processVideosViewsStats } from './handlers/video-views-stats'
56 62
57type CreateJobArgument = 63export type CreateJobArgument =
58 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 64 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
59 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | 65 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
60 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 66 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
@@ -73,7 +79,9 @@ type CreateJobArgument =
73 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | 79 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
74 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | 80 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
75 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | 81 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
76 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } 82 { type: 'notify', payload: NotifyPayload } |
83 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
84 { type: 'federate-video', payload: FederateVideoPayload }
77 85
78export type CreateJobOptions = { 86export type CreateJobOptions = {
79 delay?: number 87 delay?: number
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
98 'video-redundancy': processVideoRedundancy, 106 'video-redundancy': processVideoRedundancy,
99 'move-to-object-storage': processMoveToObjectStorage, 107 'move-to-object-storage': processMoveToObjectStorage,
100 'manage-video-torrent': processManageVideoTorrent, 108 'manage-video-torrent': processManageVideoTorrent,
101 'video-studio-edition': processVideoStudioEdition 109 'notify': processNotify,
110 'video-studio-edition': processVideoStudioEdition,
111 'federate-video': processFederateVideo
102} 112}
103 113
104const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { 114const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
123 'video-live-ending', 133 'video-live-ending',
124 'move-to-object-storage', 134 'move-to-object-storage',
125 'manage-video-torrent', 135 'manage-video-torrent',
126 'video-studio-edition' 136 'video-studio-edition',
137 'notify',
138 'federate-video'
127] 139]
128 140
129const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) 141const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
@@ -137,6 +149,8 @@ class JobQueue {
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} 149 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {} 150 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139 151
152 private flowProducer: FlowProducer
153
140 private initialized = false 154 private initialized = false
141 private jobRedisPrefix: string 155 private jobRedisPrefix: string
142 156
@@ -157,6 +171,11 @@ class JobQueue {
157 this.buildQueueEvent(handlerName, produceOnly) 171 this.buildQueueEvent(handlerName, produceOnly)
158 } 172 }
159 173
174 this.flowProducer = new FlowProducer({
175 connection: this.getRedisConnection(),
176 prefix: this.jobRedisPrefix
177 })
178
160 this.addRepeatableJobs() 179 this.addRepeatableJobs()
161 } 180 }
162 181
@@ -243,6 +262,8 @@ class JobQueue {
243 } 262 }
244 } 263 }
245 264
265 // ---------------------------------------------------------------------------
266
246 async terminate () { 267 async terminate () {
247 const promises = Object.keys(this.workers) 268 const promises = Object.keys(this.workers)
248 .map(handlerName => { 269 .map(handlerName => {
@@ -278,28 +299,56 @@ class JobQueue {
278 } 299 }
279 } 300 }
280 301
281 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { 302 // ---------------------------------------------------------------------------
282 this.createJobWithPromise(obj, options) 303
283 .catch(err => logger.error('Cannot create job.', { err, obj })) 304 createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
305 this.createJob(options)
306 .catch(err => logger.error('Cannot create job.', { err, options }))
284 } 307 }
285 308
286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 309 async createJob (options: CreateJobArgument & CreateJobOptions) {
287 const queue: Queue = this.queues[obj.type] 310 const queue: Queue = this.queues[options.type]
288 if (queue === undefined) { 311 if (queue === undefined) {
289 logger.error('Unknown queue %s: cannot create job.', obj.type) 312 logger.error('Unknown queue %s: cannot create job.', options.type)
290 return 313 return
291 } 314 }
292 315
293 const jobArgs: JobsOptions = { 316 const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
317
318 return queue.add('job', options.payload, jobOptions)
319 }
320
321 async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
322 let lastJob: FlowJob
323
324 for (const job of jobs) {
325 if (!job) continue
326
327 lastJob = {
328 name: 'job',
329 data: job.payload,
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob
333 ? [ lastJob ]
334 : []
335 }
336 }
337
338 return this.flowProducer.add(lastJob)
339 }
340
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return {
294 backoff: { delay: 60 * 1000, type: 'exponential' }, 343 backoff: { delay: 60 * 1000, type: 'exponential' },
295 attempts: JOB_ATTEMPTS[obj.type], 344 attempts: JOB_ATTEMPTS[type],
296 priority: options.priority, 345 priority: options.priority,
297 delay: options.delay 346 delay: options.delay
298 } 347 }
299
300 return queue.add('job', obj.payload, jobArgs)
301 } 348 }
302 349
350 // ---------------------------------------------------------------------------
351
303 async listForApi (options: { 352 async listForApi (options: {
304 state?: JobState 353 state?: JobState
305 start: number 354 start: number
@@ -367,6 +416,8 @@ class JobQueue {
367 return Promise.all(promises) 416 return Promise.all(promises)
368 } 417 }
369 418
419 // ---------------------------------------------------------------------------
420
370 async removeOldJobs () { 421 async removeOldJobs () {
371 for (const key of Object.keys(this.queues)) { 422 for (const key of Object.keys(this.queues)) {
372 const queue: Queue = this.queues[key] 423 const queue: Queue = this.queues[key]
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 1410889a2..aadd8e308 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -408,7 +408,7 @@ class LiveManager {
408 await liveSession.save() 408 await liveSession.save()
409 } 409 }
410 410
411 JobQueue.Instance.createJob({ 411 JobQueue.Instance.createJobAsync({
412 type: 'video-live-ending', 412 type: 'video-live-ending',
413 payload: { 413 payload: {
414 videoId: fullVideo.id, 414 videoId: fullVideo.id,
@@ -421,8 +421,12 @@ class LiveManager {
421 streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, 421 streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,
422 422
423 publishedAt: fullVideo.publishedAt.toISOString() 423 publishedAt: fullVideo.publishedAt.toISOString()
424 } 424 },
425 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) 425
426 delay: cleanupNow
427 ? 0
428 : VIDEO_LIVE.CLEANUP_DELAY
429 })
426 430
427 fullVideo.state = live.permanentLive 431 fullVideo.state = live.permanentLive
428 ? VideoState.WAITING_FOR_LIVE 432 ? VideoState.WAITING_FOR_LIVE
diff --git a/server/lib/notifier/notifier.ts b/server/lib/notifier/notifier.ts
index d1c4c0215..66cfc31c4 100644
--- a/server/lib/notifier/notifier.ts
+++ b/server/lib/notifier/notifier.ts
@@ -242,7 +242,7 @@ class Notifier {
242 242
243 for (const to of toEmails) { 243 for (const to of toEmails) {
244 const payload = await object.createEmail(to) 244 const payload = await object.createEmail(to)
245 JobQueue.Instance.createJob({ type: 'email', payload }) 245 JobQueue.Instance.createJobAsync({ type: 'email', payload })
246 } 246 }
247 } 247 }
248 248
diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts
index d9f9c2de3..956ece749 100644
--- a/server/lib/schedulers/auto-follow-index-instances.ts
+++ b/server/lib/schedulers/auto-follow-index-instances.ts
@@ -59,7 +59,7 @@ export class AutoFollowIndexInstances extends AbstractScheduler {
59 isAutoFollow: true 59 isAutoFollow: true
60 } 60 }
61 61
62 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 62 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
63 } 63 }
64 } 64 }
65 65
diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts
index b5d8353b7..9ebbd7679 100644
--- a/server/lib/video-state.ts
+++ b/server/lib/video-state.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
2import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
3import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
4import { sequelizeTypescript } from '@server/initializers/database' 5import { sequelizeTypescript } from '@server/initializers/database'
@@ -7,9 +8,9 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
7import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' 8import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models'
8import { VideoState } from '@shared/models' 9import { VideoState } from '@shared/models'
9import { federateVideoIfNeeded } from './activitypub/videos' 10import { federateVideoIfNeeded } from './activitypub/videos'
11import { JobQueue } from './job-queue'
10import { Notifier } from './notifier' 12import { Notifier } from './notifier'
11import { addMoveToObjectStorageJob } from './video' 13import { buildMoveToObjectStorageJob } from './video'
12import { retryTransactionWrapper } from '@server/helpers/database-utils'
13 14
14function buildNextVideoState (currentState?: VideoState) { 15function buildNextVideoState (currentState?: VideoState) {
15 if (currentState === VideoState.PUBLISHED) { 16 if (currentState === VideoState.PUBLISHED) {
@@ -86,7 +87,7 @@ async function moveToExternalStorageState (options: {
86 logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) 87 logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
87 88
88 try { 89 try {
89 await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }) 90 await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }))
90 91
91 return true 92 return true
92 } catch (err) { 93 } catch (err) {
diff --git a/server/lib/video.ts b/server/lib/video.ts
index b843b11bc..f7d7aa186 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -1,5 +1,7 @@
1import { UploadFiles } from 'express' 1import { UploadFiles } from 'express'
2import memoizee from 'memoizee'
2import { Transaction } from 'sequelize/types' 3import { Transaction } from 'sequelize/types'
4import { CONFIG } from '@server/initializers/config'
3import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' 5import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants'
4import { TagModel } from '@server/models/video/tag' 6import { TagModel } from '@server/models/video/tag'
5import { VideoModel } from '@server/models/video/video' 7import { VideoModel } from '@server/models/video/video'
@@ -9,8 +11,6 @@ import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID
9import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
10import { CreateJobOptions, JobQueue } from './job-queue/job-queue' 12import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
11import { updateVideoMiniatureFromExisting } from './thumbnail' 13import { updateVideoMiniatureFromExisting } from './thumbnail'
12import { CONFIG } from '@server/initializers/config'
13import memoizee from 'memoizee'
14 14
15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { 15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
16 return { 16 return {
@@ -86,7 +86,7 @@ async function setVideoTags (options: {
86 86
87// --------------------------------------------------------------------------- 87// ---------------------------------------------------------------------------
88 88
89async function addOptimizeOrMergeAudioJob (options: { 89async function buildOptimizeOrMergeAudioJob (options: {
90 video: MVideoUUID 90 video: MVideoUUID
91 videoFile: MVideoFile 91 videoFile: MVideoFile
92 user: MUserId 92 user: MUserId
@@ -94,10 +94,10 @@ async function addOptimizeOrMergeAudioJob (options: {
94}) { 94}) {
95 const { video, videoFile, user, isNewVideo } = options 95 const { video, videoFile, user, isNewVideo } = options
96 96
97 let dataInput: VideoTranscodingPayload 97 let payload: VideoTranscodingPayload
98 98
99 if (videoFile.isAudio()) { 99 if (videoFile.isAudio()) {
100 dataInput = { 100 payload = {
101 type: 'merge-audio-to-webtorrent', 101 type: 'merge-audio-to-webtorrent',
102 resolution: DEFAULT_AUDIO_RESOLUTION, 102 resolution: DEFAULT_AUDIO_RESOLUTION,
103 videoUUID: video.uuid, 103 videoUUID: video.uuid,
@@ -105,24 +105,26 @@ async function addOptimizeOrMergeAudioJob (options: {
105 isNewVideo 105 isNewVideo
106 } 106 }
107 } else { 107 } else {
108 dataInput = { 108 payload = {
109 type: 'optimize-to-webtorrent', 109 type: 'optimize-to-webtorrent',
110 videoUUID: video.uuid, 110 videoUUID: video.uuid,
111 isNewVideo 111 isNewVideo
112 } 112 }
113 } 113 }
114 114
115 const jobOptions = { 115 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
116 priority: await getTranscodingJobPriority(user)
117 }
118 116
119 return addTranscodingJob(dataInput, jobOptions) 117 return {
118 type: 'video-transcoding' as 'video-transcoding',
119 priority: await getTranscodingJobPriority(user),
120 payload
121 }
120} 122}
121 123
122async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { 124async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
123 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') 125 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
124 126
125 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options) 127 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
126} 128}
127 129
128async function getTranscodingJobPriority (user: MUserId) { 130async function getTranscodingJobPriority (user: MUserId) {
@@ -136,7 +138,7 @@ async function getTranscodingJobPriority (user: MUserId) {
136 138
137// --------------------------------------------------------------------------- 139// ---------------------------------------------------------------------------
138 140
139async function addMoveToObjectStorageJob (options: { 141async function buildMoveToObjectStorageJob (options: {
140 video: MVideoUUID 142 video: MVideoUUID
141 previousVideoState: VideoState 143 previousVideoState: VideoState
142 isNewVideo?: boolean // Default true 144 isNewVideo?: boolean // Default true
@@ -145,8 +147,14 @@ async function addMoveToObjectStorageJob (options: {
145 147
146 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') 148 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove')
147 149
148 const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState } 150 return {
149 return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput }) 151 type: 'move-to-object-storage' as 'move-to-object-storage',
152 payload: {
153 videoUUID: video.uuid,
154 isNewVideo,
155 previousVideoState
156 }
157 }
150} 158}
151 159
152// --------------------------------------------------------------------------- 160// ---------------------------------------------------------------------------
@@ -173,9 +181,9 @@ export {
173 buildLocalVideoFromReq, 181 buildLocalVideoFromReq,
174 buildVideoThumbnailsFromReq, 182 buildVideoThumbnailsFromReq,
175 setVideoTags, 183 setVideoTags,
176 addOptimizeOrMergeAudioJob, 184 buildOptimizeOrMergeAudioJob,
177 addTranscodingJob, 185 addTranscodingJob,
178 addMoveToObjectStorageJob, 186 buildMoveToObjectStorageJob,
179 getTranscodingJobPriority, 187 getTranscodingJobPriority,
180 getCachedVideoDuration 188 getCachedVideoDuration
181} 189}
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts
index a924183f2..8c8f64de9 100644
--- a/shared/models/server/job.model.ts
+++ b/shared/models/server/job.model.ts
@@ -25,6 +25,8 @@ export type JobType =
25 | 'manage-video-torrent' 25 | 'manage-video-torrent'
26 | 'move-to-object-storage' 26 | 'move-to-object-storage'
27 | 'video-studio-edition' 27 | 'video-studio-edition'
28 | 'notify'
29 | 'federate-video'
28 30
29export interface Job { 31export interface Job {
30 id: number | string 32 id: number | string
@@ -214,3 +216,18 @@ export interface VideoStudioEditionPayload {
214 videoUUID: string 216 videoUUID: string
215 tasks: VideoStudioTaskPayload[] 217 tasks: VideoStudioTaskPayload[]
216} 218}
219
220// ---------------------------------------------------------------------------
221
222export type NotifyPayload =
223 {
224 action: 'new-video'
225 videoUUID: string
226 }
227
228// ---------------------------------------------------------------------------
229
230export interface FederateVideoPayload {
231 videoUUID: string
232 isNewVideo: boolean
233}