diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-08 15:48:17 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:18:07 +0200 |
commit | bd911b54b555b11df7e9849cf92d358bccfecf6e (patch) | |
tree | 23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server | |
parent | 5a921e7b74910414626bfc9672b857e987e3ebed (diff) | |
download | PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip |
Use bullmq job dependency
Diffstat (limited to 'server')
40 files changed, 296 insertions, 151 deletions
diff --git a/server/controllers/api/accounts.ts b/server/controllers/api/accounts.ts index 8d9f92d93..66cdaab82 100644 --- a/server/controllers/api/accounts.ts +++ b/server/controllers/api/accounts.ts | |||
@@ -119,7 +119,7 @@ function getAccount (req: express.Request, res: express.Response) { | |||
119 | const account = res.locals.account | 119 | const account = res.locals.account |
120 | 120 | ||
121 | if (account.isOutdated()) { | 121 | if (account.isOutdated()) { |
122 | JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) | 122 | JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) |
123 | } | 123 | } |
124 | 124 | ||
125 | return res.json(account.toFormattedJSON()) | 125 | return res.json(account.toFormattedJSON()) |
diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 60d36ed59..87828813a 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts | |||
@@ -138,7 +138,7 @@ async function addFollow (req: express.Request, res: express.Response) { | |||
138 | followerActorId: follower.id | 138 | followerActorId: follower.id |
139 | } | 139 | } |
140 | 140 | ||
141 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 141 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
142 | } | 142 | } |
143 | 143 | ||
144 | for (const handle of handles) { | 144 | for (const handle of handles) { |
@@ -150,7 +150,7 @@ async function addFollow (req: express.Request, res: express.Response) { | |||
150 | followerActorId: follower.id | 150 | followerActorId: follower.id |
151 | } | 151 | } |
152 | 152 | ||
153 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 153 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
154 | } | 154 | } |
155 | 155 | ||
156 | return res.status(HttpStatusCode.NO_CONTENT_204).end() | 156 | return res.status(HttpStatusCode.NO_CONTENT_204).end() |
diff --git a/server/controllers/api/server/redundancy.ts b/server/controllers/api/server/redundancy.ts index 9f43d3e4e..94e187cd4 100644 --- a/server/controllers/api/server/redundancy.ts +++ b/server/controllers/api/server/redundancy.ts | |||
@@ -85,7 +85,7 @@ async function addVideoRedundancy (req: express.Request, res: express.Response) | |||
85 | videoId: res.locals.onlyVideo.id | 85 | videoId: res.locals.onlyVideo.id |
86 | } | 86 | } |
87 | 87 | ||
88 | await JobQueue.Instance.createJobWithPromise({ | 88 | await JobQueue.Instance.createJob({ |
89 | type: 'video-redundancy', | 89 | type: 'video-redundancy', |
90 | payload | 90 | payload |
91 | }) | 91 | }) |
diff --git a/server/controllers/api/users/my-subscriptions.ts b/server/controllers/api/users/my-subscriptions.ts index fb1f68635..a750f9bd1 100644 --- a/server/controllers/api/users/my-subscriptions.ts +++ b/server/controllers/api/users/my-subscriptions.ts | |||
@@ -122,7 +122,7 @@ function addUserSubscription (req: express.Request, res: express.Response) { | |||
122 | followerActorId: user.Account.Actor.id | 122 | followerActorId: user.Account.Actor.id |
123 | } | 123 | } |
124 | 124 | ||
125 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 125 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
126 | 126 | ||
127 | return res.status(HttpStatusCode.NO_CONTENT_204).end() | 127 | return res.status(HttpStatusCode.NO_CONTENT_204).end() |
128 | } | 128 | } |
diff --git a/server/controllers/api/video-channel.ts b/server/controllers/api/video-channel.ts index 411ec8630..6b33e894d 100644 --- a/server/controllers/api/video-channel.ts +++ b/server/controllers/api/video-channel.ts | |||
@@ -245,7 +245,7 @@ async function addVideoChannel (req: express.Request, res: express.Response) { | |||
245 | }) | 245 | }) |
246 | 246 | ||
247 | const payload = { actorId: videoChannelCreated.actorId } | 247 | const payload = { actorId: videoChannelCreated.actorId } |
248 | await JobQueue.Instance.createJobWithPromise({ type: 'actor-keys', payload }) | 248 | await JobQueue.Instance.createJob({ type: 'actor-keys', payload }) |
249 | 249 | ||
250 | auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON())) | 250 | auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON())) |
251 | logger.info('Video channel %s created.', videoChannelCreated.Actor.url) | 251 | logger.info('Video channel %s created.', videoChannelCreated.Actor.url) |
@@ -335,7 +335,7 @@ async function getVideoChannel (req: express.Request, res: express.Response) { | |||
335 | const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id }) | 335 | const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id }) |
336 | 336 | ||
337 | if (videoChannel.isOutdated()) { | 337 | if (videoChannel.isOutdated()) { |
338 | JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) | 338 | JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) |
339 | } | 339 | } |
340 | 340 | ||
341 | return res.json(videoChannel.toFormattedJSON()) | 341 | return res.json(videoChannel.toFormattedJSON()) |
diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts index b12953630..5a2e1006a 100644 --- a/server/controllers/api/videos/import.ts +++ b/server/controllers/api/videos/import.ts | |||
@@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to | |||
163 | videoImportId: videoImport.id, | 163 | videoImportId: videoImport.id, |
164 | magnetUri | 164 | magnetUri |
165 | } | 165 | } |
166 | await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) | 166 | await JobQueue.Instance.createJob({ type: 'video-import', payload }) |
167 | 167 | ||
168 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) | 168 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) |
169 | 169 | ||
@@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response) | |||
255 | videoImportId: videoImport.id, | 255 | videoImportId: videoImport.id, |
256 | fileExt | 256 | fileExt |
257 | } | 257 | } |
258 | await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) | 258 | await JobQueue.Instance.createJob({ type: 'video-import', payload }) |
259 | 259 | ||
260 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) | 260 | auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) |
261 | 261 | ||
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index eca72c397..b301515df 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts | |||
@@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) { | |||
151 | const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) | 151 | const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) |
152 | 152 | ||
153 | if (video.isOutdated()) { | 153 | if (video.isOutdated()) { |
154 | JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) | 154 | JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) |
155 | } | 155 | } |
156 | 156 | ||
157 | return res.json(video.toFormattedDetailsJSON()) | 157 | return res.json(video.toFormattedDetailsJSON()) |
diff --git a/server/controllers/api/videos/studio.ts b/server/controllers/api/videos/studio.ts index bff344f3f..6667532bf 100644 --- a/server/controllers/api/videos/studio.ts +++ b/server/controllers/api/videos/studio.ts | |||
@@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response) | |||
71 | tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) | 71 | tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) |
72 | } | 72 | } |
73 | 73 | ||
74 | JobQueue.Instance.createJob({ type: 'video-studio-edition', payload }) | 74 | JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload }) |
75 | 75 | ||
76 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 76 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
77 | } | 77 | } |
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 1545a2232..ab1a23d9a 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import express from 'express' | 1 | import express from 'express' |
2 | import { Transaction } from 'sequelize/types' | 2 | import { Transaction } from 'sequelize/types' |
3 | import { changeVideoChannelShare } from '@server/lib/activitypub/share' | 3 | import { changeVideoChannelShare } from '@server/lib/activitypub/share' |
4 | import { JobQueue } from '@server/lib/job-queue' | 4 | import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' |
5 | import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' | 5 | import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' |
6 | import { openapiOperationDoc } from '@server/middlewares/doc' | 6 | import { openapiOperationDoc } from '@server/middlewares/doc' |
7 | import { FilteredModelAttributes } from '@server/types' | 7 | import { FilteredModelAttributes } from '@server/types' |
@@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils' | |||
13 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 13 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
14 | import { MIMETYPES } from '../../../initializers/constants' | 14 | import { MIMETYPES } from '../../../initializers/constants' |
15 | import { sequelizeTypescript } from '../../../initializers/database' | 15 | import { sequelizeTypescript } from '../../../initializers/database' |
16 | import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' | ||
17 | import { Notifier } from '../../../lib/notifier' | ||
18 | import { Hooks } from '../../../lib/plugins/hooks' | 16 | import { Hooks } from '../../../lib/plugins/hooks' |
19 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' | 17 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' |
20 | import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' | 18 | import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' |
@@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) { | |||
139 | return { videoInstanceUpdated, isNewVideo } | 137 | return { videoInstanceUpdated, isNewVideo } |
140 | }) | 138 | }) |
141 | 139 | ||
142 | const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate) | 140 | Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res }) |
143 | 141 | ||
144 | await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t)) | 142 | await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo }) |
145 | |||
146 | if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | ||
147 | |||
148 | Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res }) | ||
149 | } catch (err) { | 143 | } catch (err) { |
150 | // Force fields we want to update | 144 | // Force fields we want to update |
151 | // If the transaction is retried, sequelize will think the object has not changed | 145 | // If the transaction is retried, sequelize will think the object has not changed |
@@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide | |||
192 | } | 186 | } |
193 | } | 187 | } |
194 | 188 | ||
195 | async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) { | 189 | async function addVideoJobsAfterUpdate (options: { |
196 | if (video.isLive || !videoInfoToUpdate.name) return video | 190 | video: MVideoFullLight |
191 | videoInfoToUpdate: VideoUpdate | ||
192 | wasConfidentialVideo: boolean | ||
193 | isNewVideo: boolean | ||
194 | }) { | ||
195 | const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options | ||
196 | const jobs: CreateJobArgument[] = [] | ||
197 | |||
198 | if (!video.isLive && videoInfoToUpdate.name) { | ||
197 | 199 | ||
198 | for (const file of (video.VideoFiles || [])) { | 200 | for (const file of (video.VideoFiles || [])) { |
199 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } | 201 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } |
200 | 202 | ||
201 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 203 | jobs.push({ type: 'manage-video-torrent', payload }) |
202 | await JobQueue.Instance.waitJob(job) | 204 | } |
203 | } | ||
204 | 205 | ||
205 | const hls = video.getHLSPlaylist() | 206 | const hls = video.getHLSPlaylist() |
206 | 207 | ||
207 | for (const file of (hls?.VideoFiles || [])) { | 208 | for (const file of (hls?.VideoFiles || [])) { |
208 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } | 209 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } |
209 | 210 | ||
210 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 211 | jobs.push({ type: 'manage-video-torrent', payload }) |
211 | await JobQueue.Instance.waitJob(job) | 212 | } |
213 | } | ||
214 | |||
215 | jobs.push({ | ||
216 | type: 'federate-video', | ||
217 | payload: { | ||
218 | videoUUID: video.uuid, | ||
219 | isNewVideo | ||
220 | } | ||
221 | }) | ||
222 | |||
223 | if (wasConfidentialVideo) { | ||
224 | jobs.push({ | ||
225 | type: 'notify', | ||
226 | payload: { | ||
227 | action: 'new-video', | ||
228 | videoUUID: video.uuid | ||
229 | } | ||
230 | }) | ||
212 | } | 231 | } |
213 | 232 | ||
214 | // Refresh video since files have changed | 233 | return JobQueue.Instance.createSequentialJobFlow(...jobs) |
215 | return VideoModel.loadFull(video.id) | ||
216 | } | 234 | } |
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 4a9d7b619..cc171eece 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts | |||
@@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
8 | import { Redis } from '@server/lib/redis' | 8 | import { Redis } from '@server/lib/redis' |
9 | import { uploadx } from '@server/lib/uploadx' | 9 | import { uploadx } from '@server/lib/uploadx' |
10 | import { | 10 | import { |
11 | addMoveToObjectStorageJob, | ||
12 | addOptimizeOrMergeAudioJob, | ||
13 | buildLocalVideoFromReq, | 11 | buildLocalVideoFromReq, |
12 | buildMoveToObjectStorageJob, | ||
13 | buildOptimizeOrMergeAudioJob, | ||
14 | buildVideoThumbnailsFromReq, | 14 | buildVideoThumbnailsFromReq, |
15 | setVideoTags | 15 | setVideoTags |
16 | } from '@server/lib/video' | 16 | } from '@server/lib/video' |
@@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager' | |||
18 | import { buildNextVideoState } from '@server/lib/video-state' | 18 | import { buildNextVideoState } from '@server/lib/video-state' |
19 | import { openapiOperationDoc } from '@server/middlewares/doc' | 19 | import { openapiOperationDoc } from '@server/middlewares/doc' |
20 | import { VideoSourceModel } from '@server/models/video/video-source' | 20 | import { VideoSourceModel } from '@server/models/video/video-source' |
21 | import { MVideoFile, MVideoFullLight } from '@server/types/models' | 21 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' |
22 | import { getLowercaseExtension } from '@shared/core-utils' | 22 | import { getLowercaseExtension } from '@shared/core-utils' |
23 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' | 23 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' |
24 | import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models' | 24 | import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models' |
25 | import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' | 25 | import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' |
26 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
27 | import { createReqFiles } from '../../../helpers/express-utils' | 26 | import { createReqFiles } from '../../../helpers/express-utils' |
28 | import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' | 27 | import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' |
29 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 28 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
30 | import { MIMETYPES } from '../../../initializers/constants' | 29 | import { MIMETYPES } from '../../../initializers/constants' |
31 | import { sequelizeTypescript } from '../../../initializers/database' | 30 | import { sequelizeTypescript } from '../../../initializers/database' |
32 | import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' | ||
33 | import { Notifier } from '../../../lib/notifier' | ||
34 | import { Hooks } from '../../../lib/plugins/hooks' | 31 | import { Hooks } from '../../../lib/plugins/hooks' |
35 | import { generateVideoMiniature } from '../../../lib/thumbnail' | 32 | import { generateVideoMiniature } from '../../../lib/thumbnail' |
36 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' | 33 | import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' |
@@ -216,22 +213,8 @@ async function addVideo (options: { | |||
216 | // Channel has a new content, set as updated | 213 | // Channel has a new content, set as updated |
217 | await videoCreated.VideoChannel.setAsUpdated() | 214 | await videoCreated.VideoChannel.setAsUpdated() |
218 | 215 | ||
219 | createTorrentFederate(videoCreated, videoFile) | 216 | addVideoJobsAfterUpload(videoCreated, videoFile, user) |
220 | .catch(err => { | 217 | .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) |
221 | logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }) | ||
222 | |||
223 | return videoCreated | ||
224 | }).then(refreshedVideo => { | ||
225 | if (!refreshedVideo) return | ||
226 | |||
227 | if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
228 | return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined }) | ||
229 | } | ||
230 | |||
231 | if (refreshedVideo.state === VideoState.TO_TRANSCODE) { | ||
232 | return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user }) | ||
233 | } | ||
234 | }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) | ||
235 | 218 | ||
236 | Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) | 219 | Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) |
237 | 220 | ||
@@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { | |||
266 | return videoFile | 249 | return videoFile |
267 | } | 250 | } |
268 | 251 | ||
269 | async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) { | 252 | async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) { |
270 | const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } | 253 | return JobQueue.Instance.createSequentialJobFlow( |
271 | 254 | { | |
272 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 255 | type: 'manage-video-torrent' as 'manage-video-torrent', |
273 | await JobQueue.Instance.waitJob(job) | 256 | payload: { |
274 | 257 | videoId: video.id, | |
275 | const refreshedVideo = await VideoModel.loadFull(video.id) | 258 | videoFileId: videoFile.id, |
276 | if (!refreshedVideo) return | 259 | action: 'create' |
277 | 260 | } | |
278 | // Only federate and notify after the torrent creation | 261 | }, |
279 | Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | 262 | { |
263 | type: 'federate-video' as 'federate-video', | ||
264 | payload: { | ||
265 | videoUUID: video.uuid, | ||
266 | isNewVideo: true | ||
267 | } | ||
268 | }, | ||
280 | 269 | ||
281 | await retryTransactionWrapper(() => { | 270 | video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE |
282 | return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t)) | 271 | ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined }) |
283 | }) | 272 | : undefined, |
284 | 273 | ||
285 | return refreshedVideo | 274 | video.state === VideoState.TO_TRANSCODE |
275 | ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) | ||
276 | : undefined | ||
277 | ) | ||
286 | } | 278 | } |
287 | 279 | ||
288 | async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { | 280 | async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index db43c59be..a53c22662 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -156,7 +156,9 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
156 | 'video-live-ending': 1, | 156 | 'video-live-ending': 1, |
157 | 'video-studio-edition': 1, | 157 | 'video-studio-edition': 1, |
158 | 'manage-video-torrent': 1, | 158 | 'manage-video-torrent': 1, |
159 | 'move-to-object-storage': 3 | 159 | 'move-to-object-storage': 3, |
160 | 'notify': 1, | ||
161 | 'federate-video': 1 | ||
160 | } | 162 | } |
161 | // Excluded keys are jobs that can be configured by admins | 163 | // Excluded keys are jobs that can be configured by admins |
162 | const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { | 164 | const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { |
@@ -175,7 +177,9 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im | |||
175 | 'video-live-ending': 10, | 177 | 'video-live-ending': 10, |
176 | 'video-studio-edition': 1, | 178 | 'video-studio-edition': 1, |
177 | 'manage-video-torrent': 1, | 179 | 'manage-video-torrent': 1, |
178 | 'move-to-object-storage': 1 | 180 | 'move-to-object-storage': 1, |
181 | 'notify': 5, | ||
182 | 'federate-video': 3 | ||
179 | } | 183 | } |
180 | const JOB_TTL: { [id in JobType]: number } = { | 184 | const JOB_TTL: { [id in JobType]: number } = { |
181 | 'activitypub-http-broadcast': 60000 * 10, // 10 minutes | 185 | 'activitypub-http-broadcast': 60000 * 10, // 10 minutes |
@@ -195,6 +199,8 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
195 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours | 199 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours |
196 | 'video-live-ending': 1000 * 60 * 10, // 10 minutes | 200 | 'video-live-ending': 1000 * 60 * 10, // 10 minutes |
197 | 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours | 201 | 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours |
202 | 'notify': 60000 * 5, // 5 minutes | ||
203 | 'federate-video': 60000 * 5, // 5 minutes | ||
198 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours | 204 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours |
199 | } | 205 | } |
200 | const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { | 206 | const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { |
diff --git a/server/lib/activitypub/actors/get.ts b/server/lib/activitypub/actors/get.ts index d2b651082..e73b7d707 100644 --- a/server/lib/activitypub/actors/get.ts +++ b/server/lib/activitypub/actors/get.ts | |||
@@ -110,7 +110,7 @@ async function loadActorFromDB (actorUrl: string, fetchType: ActorLoadByUrlType) | |||
110 | async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) { | 110 | async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) { |
111 | if ((created === true || refreshed === true) && updateCollections === true) { | 111 | if ((created === true || refreshed === true) && updateCollections === true) { |
112 | const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' } | 112 | const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' } |
113 | await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) | 113 | await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
114 | } | 114 | } |
115 | } | 115 | } |
116 | 116 | ||
@@ -118,6 +118,6 @@ async function schedulePlaylistFetchIfNeeded (actor: MActorAccountId, created: b | |||
118 | // We created a new account: fetch the playlists | 118 | // We created a new account: fetch the playlists |
119 | if (created === true && actor.Account && accountPlaylistsUrl) { | 119 | if (created === true && actor.Account && accountPlaylistsUrl) { |
120 | const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' } | 120 | const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' } |
121 | await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) | 121 | await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
122 | } | 122 | } |
123 | } | 123 | } |
diff --git a/server/lib/activitypub/follow.ts b/server/lib/activitypub/follow.ts index 741b54df5..f6e2a48fd 100644 --- a/server/lib/activitypub/follow.ts +++ b/server/lib/activitypub/follow.ts | |||
@@ -27,7 +27,7 @@ async function autoFollowBackIfNeeded (actorFollow: MActorFollowActors, transact | |||
27 | isAutoFollow: true | 27 | isAutoFollow: true |
28 | } | 28 | } |
29 | 29 | ||
30 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 30 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
31 | } | 31 | } |
32 | } | 32 | } |
33 | 33 | ||
diff --git a/server/lib/activitypub/outbox.ts b/server/lib/activitypub/outbox.ts index ecdc33a77..5eef76871 100644 --- a/server/lib/activitypub/outbox.ts +++ b/server/lib/activitypub/outbox.ts | |||
@@ -16,7 +16,7 @@ async function addFetchOutboxJob (actor: Pick<ActorModel, 'id' | 'outboxUrl'>) { | |||
16 | type: 'activity' as 'activity' | 16 | type: 'activity' as 'activity' |
17 | } | 17 | } |
18 | 18 | ||
19 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | 19 | return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload }) |
20 | } | 20 | } |
21 | 21 | ||
22 | export { | 22 | export { |
diff --git a/server/lib/activitypub/playlists/refresh.ts b/server/lib/activitypub/playlists/refresh.ts index 493e8c7ec..33260ea02 100644 --- a/server/lib/activitypub/playlists/refresh.ts +++ b/server/lib/activitypub/playlists/refresh.ts | |||
@@ -9,7 +9,7 @@ import { fetchRemoteVideoPlaylist } from './shared' | |||
9 | function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { | 9 | function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { |
10 | if (!playlist.isOutdated()) return | 10 | if (!playlist.isOutdated()) return |
11 | 11 | ||
12 | JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) | 12 | JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) |
13 | } | 13 | } |
14 | 14 | ||
15 | async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> { | 15 | async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> { |
diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts index fcec63991..2bc1ef8f5 100644 --- a/server/lib/activitypub/send/shared/send-utils.ts +++ b/server/lib/activitypub/send/shared/send-utils.ts | |||
@@ -120,7 +120,7 @@ async function forwardActivity ( | |||
120 | body: activity, | 120 | body: activity, |
121 | contextType: null | 121 | contextType: null |
122 | } | 122 | } |
123 | return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })) | 123 | return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload })) |
124 | } | 124 | } |
125 | 125 | ||
126 | // --------------------------------------------------------------------------- | 126 | // --------------------------------------------------------------------------- |
@@ -205,7 +205,7 @@ function broadcastTo (options: { | |||
205 | contextType | 205 | contextType |
206 | } | 206 | } |
207 | 207 | ||
208 | JobQueue.Instance.createJob({ | 208 | JobQueue.Instance.createJobAsync({ |
209 | type: parallelizable | 209 | type: parallelizable |
210 | ? 'activitypub-http-broadcast-parallel' | 210 | ? 'activitypub-http-broadcast-parallel' |
211 | : 'activitypub-http-broadcast', | 211 | : 'activitypub-http-broadcast', |
@@ -222,7 +222,7 @@ function broadcastTo (options: { | |||
222 | contextType | 222 | contextType |
223 | } | 223 | } |
224 | 224 | ||
225 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | 225 | JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload }) |
226 | } | 226 | } |
227 | } | 227 | } |
228 | 228 | ||
@@ -243,7 +243,7 @@ function unicastTo (options: { | |||
243 | contextType | 243 | contextType |
244 | } | 244 | } |
245 | 245 | ||
246 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | 246 | JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload }) |
247 | } | 247 | } |
248 | 248 | ||
249 | // --------------------------------------------------------------------------- | 249 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/activitypub/videos/get.ts b/server/lib/activitypub/videos/get.ts index b74df132c..14ba55034 100644 --- a/server/lib/activitypub/videos/get.ts +++ b/server/lib/activitypub/videos/get.ts | |||
@@ -107,7 +107,7 @@ async function scheduleRefresh (video: MVideoThumbnail, fetchType: VideoLoadByUr | |||
107 | return refreshVideoIfNeeded(refreshOptions) | 107 | return refreshVideoIfNeeded(refreshOptions) |
108 | } | 108 | } |
109 | 109 | ||
110 | await JobQueue.Instance.createJobWithPromise({ | 110 | await JobQueue.Instance.createJob({ |
111 | type: 'activitypub-refresher', | 111 | type: 'activitypub-refresher', |
112 | payload: { type: 'video', url: video.url } | 112 | payload: { type: 'video', url: video.url } |
113 | }) | 113 | }) |
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8cf0c87a6..8ed1b6447 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts | |||
@@ -74,7 +74,7 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi | |||
74 | } | 74 | } |
75 | 75 | ||
76 | function createJob (payload: ActivitypubHttpFetcherPayload) { | 76 | function createJob (payload: ActivitypubHttpFetcherPayload) { |
77 | return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) | 77 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
78 | } | 78 | } |
79 | 79 | ||
80 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { | 80 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { |
diff --git a/server/lib/emailer.ts b/server/lib/emailer.ts index bd1089530..9e546de7f 100644 --- a/server/lib/emailer.ts +++ b/server/lib/emailer.ts | |||
@@ -66,7 +66,7 @@ class Emailer { | |||
66 | } | 66 | } |
67 | } | 67 | } |
68 | 68 | ||
69 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 69 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
70 | } | 70 | } |
71 | 71 | ||
72 | addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { | 72 | addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { |
@@ -80,7 +80,7 @@ class Emailer { | |||
80 | } | 80 | } |
81 | } | 81 | } |
82 | 82 | ||
83 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 83 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
84 | } | 84 | } |
85 | 85 | ||
86 | addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { | 86 | addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { |
@@ -94,7 +94,7 @@ class Emailer { | |||
94 | } | 94 | } |
95 | } | 95 | } |
96 | 96 | ||
97 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 97 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
98 | } | 98 | } |
99 | 99 | ||
100 | addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { | 100 | addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { |
@@ -108,7 +108,7 @@ class Emailer { | |||
108 | text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` | 108 | text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` |
109 | } | 109 | } |
110 | 110 | ||
111 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 111 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
112 | } | 112 | } |
113 | 113 | ||
114 | addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { | 114 | addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { |
@@ -127,7 +127,7 @@ class Emailer { | |||
127 | } | 127 | } |
128 | } | 128 | } |
129 | 129 | ||
130 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 130 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
131 | } | 131 | } |
132 | 132 | ||
133 | async sendMail (options: EmailPayload) { | 133 | async sendMail (options: EmailPayload) { |
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 944da5be1..a68c32ba0 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) { | |||
17 | const payload = job.data as ActivitypubFollowPayload | 17 | const payload = job.data as ActivitypubFollowPayload |
18 | const host = payload.host | 18 | const host = payload.host |
19 | 19 | ||
20 | logger.info('Processing ActivityPub follow in job %d.', job.id) | 20 | logger.info('Processing ActivityPub follow in job %s.', job.id) |
21 | 21 | ||
22 | let targetActor: MActorFull | 22 | let targetActor: MActorFull |
23 | if (!host || host === WEBSERVER.HOST) { | 23 | if (!host || host === WEBSERVER.HOST) { |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 354c608fb..13eff5211 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests' | |||
8 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | 8 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
11 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 11 | logger.info('Processing ActivityPub broadcast in job %s.', job.id) |
12 | 12 | ||
13 | const payload = job.data as ActivitypubHttpBroadcastPayload | 13 | const payload = job.data as ActivitypubHttpBroadcastPayload |
14 | 14 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index e0b841887..b6cb3c4a6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share' | |||
12 | import { addVideoComments } from '../../activitypub/video-comments' | 12 | import { addVideoComments } from '../../activitypub/video-comments' |
13 | 13 | ||
14 | async function processActivityPubHttpFetcher (job: Job) { | 14 | async function processActivityPubHttpFetcher (job: Job) { |
15 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 15 | logger.info('Processing ActivityPub fetcher in job %s.', job.id) |
16 | 16 | ||
17 | const payload = job.data as ActivitypubHttpFetcherPayload | 17 | const payload = job.data as ActivitypubHttpFetcherPayload |
18 | 18 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 837a597a5..9e4e84002 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests' | |||
6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | 6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
9 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 9 | logger.info('Processing ActivityPub unicast in job %s.', job.id) |
10 | 10 | ||
11 | const payload = job.data as ActivitypubHttpUnicastPayload | 11 | const payload = job.data as ActivitypubHttpUnicastPayload |
12 | const uri = payload.uri | 12 | const uri = payload.uri |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 600f858a0..307e771ff 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors' | |||
11 | async function refreshAPObject (job: Job) { | 11 | async function refreshAPObject (job: Job) { |
12 | const payload = job.data as RefreshPayload | 12 | const payload = job.data as RefreshPayload |
13 | 13 | ||
14 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) | 14 | logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url) |
15 | 15 | ||
16 | if (payload.type === 'video') return refreshVideo(payload.url) | 16 | if (payload.type === 'video') return refreshVideo(payload.url) |
17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) | 17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) |
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 4a5bad9fb..27a2d431b 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' | |||
6 | 6 | ||
7 | async function processActorKeys (job: Job) { | 7 | async function processActorKeys (job: Job) { |
8 | const payload = job.data as ActorKeysPayload | 8 | const payload = job.data as ActorKeysPayload |
9 | logger.info('Processing actor keys in job %d.', job.id) | 9 | logger.info('Processing actor keys in job %s.', job.id) |
10 | 10 | ||
11 | const actor = await ActorModel.load(payload.actorId) | 11 | const actor = await ActorModel.load(payload.actorId) |
12 | 12 | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index b5b9475b1..567bcc076 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer' | |||
5 | 5 | ||
6 | async function processEmail (job: Job) { | 6 | async function processEmail (job: Job) { |
7 | const payload = job.data as EmailPayload | 7 | const payload = job.data as EmailPayload |
8 | logger.info('Processing email in job %d.', job.id) | 8 | logger.info('Processing email in job %s.', job.id) |
9 | 9 | ||
10 | return Emailer.Instance.sendMail(payload) | 10 | return Emailer.Instance.sendMail(payload) |
11 | } | 11 | } |
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts new file mode 100644 index 000000000..6aac36741 --- /dev/null +++ b/server/lib/job-queue/handlers/federate-video.ts | |||
@@ -0,0 +1,28 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { sequelizeTypescript } from '@server/initializers/database' | ||
4 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { FederateVideoPayload } from '@shared/models' | ||
7 | import { logger } from '../../../helpers/logger' | ||
8 | |||
9 | function processFederateVideo (job: Job) { | ||
10 | const payload = job.data as FederateVideoPayload | ||
11 | |||
12 | logger.info('Processing video federation in job %s.', job.id) | ||
13 | |||
14 | return retryTransactionWrapper(() => { | ||
15 | return sequelizeTypescript.transaction(async t => { | ||
16 | const video = await VideoModel.loadFull(payload.videoUUID, t) | ||
17 | if (!video) return | ||
18 | |||
19 | return federateVideoIfNeeded(video, payload.isNewVideo, t) | ||
20 | }) | ||
21 | }) | ||
22 | } | ||
23 | |||
24 | // --------------------------------------------------------------------------- | ||
25 | |||
26 | export { | ||
27 | processFederateVideo | ||
28 | } | ||
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index 4505ca79e..03aa414c9 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts | |||
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger' | |||
8 | 8 | ||
9 | async function processManageVideoTorrent (job: Job) { | 9 | async function processManageVideoTorrent (job: Job) { |
10 | const payload = job.data as ManageVideoTorrentPayload | 10 | const payload = job.data as ManageVideoTorrentPayload |
11 | logger.info('Processing torrent in job %d.', job.id) | 11 | logger.info('Processing torrent in job %s.', job.id) |
12 | 12 | ||
13 | if (payload.action === 'create') return doCreateAction(payload) | 13 | if (payload.action === 'create') return doCreateAction(payload) |
14 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) | 14 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) |
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index d608fd865..25bdebeea 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage') | |||
17 | 17 | ||
18 | export async function processMoveToObjectStorage (job: Job) { | 18 | export async function processMoveToObjectStorage (job: Job) { |
19 | const payload = job.data as MoveObjectStoragePayload | 19 | const payload = job.data as MoveObjectStoragePayload |
20 | logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) | 20 | logger.info('Moving video %s in job %s.', payload.videoUUID, job.id) |
21 | 21 | ||
22 | const video = await VideoModel.loadWithFiles(payload.videoUUID) | 22 | const video = await VideoModel.loadWithFiles(payload.videoUUID) |
23 | // No video, maybe deleted? | 23 | // No video, maybe deleted? |
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) { | |||
43 | 43 | ||
44 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | 44 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') |
45 | if (pendingMove === 0) { | 45 | if (pendingMove === 0) { |
46 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) | 46 | logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags) |
47 | 47 | ||
48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) | 48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) |
49 | } | 49 | } |
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts new file mode 100644 index 000000000..83605396c --- /dev/null +++ b/server/lib/job-queue/handlers/notify.ts | |||
@@ -0,0 +1,27 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { Notifier } from '@server/lib/notifier' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { NotifyPayload } from '@shared/models' | ||
5 | import { logger } from '../../../helpers/logger' | ||
6 | |||
7 | async function processNotify (job: Job) { | ||
8 | const payload = job.data as NotifyPayload | ||
9 | logger.info('Processing %s notification in job %s.', payload.action, job.id) | ||
10 | |||
11 | if (payload.action === 'new-video') return doNotifyNewVideo(payload) | ||
12 | } | ||
13 | |||
14 | // --------------------------------------------------------------------------- | ||
15 | |||
16 | export { | ||
17 | processNotify | ||
18 | } | ||
19 | |||
20 | // --------------------------------------------------------------------------- | ||
21 | |||
22 | async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) { | ||
23 | const refreshedVideo = await VideoModel.loadFull(payload.videoUUID) | ||
24 | if (!refreshedVideo) return | ||
25 | |||
26 | Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | ||
27 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 40c44cf52..d950f6407 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
4 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
6 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 6 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
7 | import { addMoveToObjectStorageJob } from '@server/lib/video' | 7 | import { buildMoveToObjectStorageJob } from '@server/lib/video' |
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | 8 | import { VideoPathManager } from '@server/lib/video-path-manager' |
9 | import { VideoModel } from '@server/models/video/video' | 9 | import { VideoModel } from '@server/models/video/video' |
10 | import { VideoFileModel } from '@server/models/video/video-file' | 10 | import { VideoFileModel } from '@server/models/video/video-file' |
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils' | |||
13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' | 13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' |
14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' | 14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' |
15 | import { logger } from '../../../helpers/logger' | 15 | import { logger } from '../../../helpers/logger' |
16 | import { JobQueue } from '../job-queue' | ||
16 | 17 | ||
17 | async function processVideoFileImport (job: Job) { | 18 | async function processVideoFileImport (job: Job) { |
18 | const payload = job.data as VideoFileImportPayload | 19 | const payload = job.data as VideoFileImportPayload |
19 | logger.info('Processing video file import in job %d.', job.id) | 20 | logger.info('Processing video file import in job %s.', job.id) |
20 | 21 | ||
21 | const video = await VideoModel.loadFull(payload.videoUUID) | 22 | const video = await VideoModel.loadFull(payload.videoUUID) |
22 | // No video, maybe deleted? | 23 | // No video, maybe deleted? |
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) { | |||
28 | await updateVideoFile(video, payload.filePath) | 29 | await updateVideoFile(video, payload.filePath) |
29 | 30 | ||
30 | if (CONFIG.OBJECT_STORAGE.ENABLED) { | 31 | if (CONFIG.OBJECT_STORAGE.ENABLED) { |
31 | await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) | 32 | await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state })) |
32 | } else { | 33 | } else { |
33 | await federateVideoIfNeeded(video, false) | 34 | await federateVideoIfNeeded(video, false) |
34 | } | 35 | } |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index e5cd35865..f4629159c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
8 | import { Hooks } from '@server/lib/plugins/hooks' | 8 | import { Hooks } from '@server/lib/plugins/hooks' |
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | 9 | import { ServerConfigManager } from '@server/lib/server-config-manager' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video' |
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 12 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 13 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 14 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail' | |||
39 | import { federateVideoIfNeeded } from '../../activitypub/videos' | 39 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
40 | import { Notifier } from '../../notifier' | 40 | import { Notifier } from '../../notifier' |
41 | import { generateVideoMiniature } from '../../thumbnail' | 41 | import { generateVideoMiniature } from '../../thumbnail' |
42 | import { JobQueue } from '../job-queue' | ||
42 | 43 | ||
43 | async function processVideoImport (job: Job) { | 44 | async function processVideoImport (job: Job) { |
44 | const payload = job.data as VideoImportPayload | 45 | const payload = job.data as VideoImportPayload |
@@ -65,7 +66,7 @@ export { | |||
65 | // --------------------------------------------------------------------------- | 66 | // --------------------------------------------------------------------------- |
66 | 67 | ||
67 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { | 68 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { |
68 | logger.info('Processing torrent video import in job %d.', job.id) | 69 | logger.info('Processing torrent video import in job %s.', job.id) |
69 | 70 | ||
70 | const options = { type: payload.type, videoImportId: payload.videoImportId } | 71 | const options = { type: payload.type, videoImportId: payload.videoImportId } |
71 | 72 | ||
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, | |||
77 | } | 78 | } |
78 | 79 | ||
79 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { | 80 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { |
80 | logger.info('Processing youtubeDL video import in job %d.', job.id) | 81 | logger.info('Processing youtubeDL video import in job %s.', job.id) |
81 | 82 | ||
82 | const options = { type: payload.type, videoImportId: videoImport.id } | 83 | const options = { type: payload.type, videoImportId: videoImport.id } |
83 | 84 | ||
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
259 | } | 260 | } |
260 | 261 | ||
261 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | 262 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { |
262 | return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | 263 | await JobQueue.Instance.createJob( |
264 | await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | ||
265 | ) | ||
263 | } | 266 | } |
264 | 267 | ||
265 | // Create transcoding jobs? | 268 | // Create transcoding jobs? |
266 | if (video.state === VideoState.TO_TRANSCODE) { | 269 | if (video.state === VideoState.TO_TRANSCODE) { |
267 | await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | 270 | await JobQueue.Instance.createJob( |
271 | await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | ||
272 | ) | ||
268 | } | 273 | } |
269 | 274 | ||
270 | } catch (err) { | 275 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 75ab2cd02..bac99fdb7 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger' | |||
5 | 5 | ||
6 | async function processVideoRedundancy (job: Job) { | 6 | async function processVideoRedundancy (job: Job) { |
7 | const payload = job.data as VideoRedundancyPayload | 7 | const payload = job.data as VideoRedundancyPayload |
8 | logger.info('Processing video redundancy in job %d.', job.id) | 8 | logger.info('Processing video redundancy in job %s.', job.id) |
9 | 9 | ||
10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | 10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) |
11 | } | 11 | } |
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 078243538..23f9a34cc 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts | |||
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | |||
8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | 9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' |
12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' | 12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' |
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | 13 | import { VideoPathManager } from '@server/lib/video-path-manager' |
14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' | 14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' |
@@ -36,6 +36,7 @@ import { | |||
36 | VideoStudioTaskWatermarkPayload | 36 | VideoStudioTaskWatermarkPayload |
37 | } from '@shared/models' | 37 | } from '@shared/models' |
38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
39 | import { JobQueue } from '../job-queue' | ||
39 | 40 | ||
40 | const lTagsBase = loggerTagsFactory('video-edition') | 41 | const lTagsBase = loggerTagsFactory('video-edition') |
41 | 42 | ||
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) { | |||
43 | const payload = job.data as VideoStudioEditionPayload | 44 | const payload = job.data as VideoStudioEditionPayload |
44 | const lTags = lTagsBase(payload.videoUUID) | 45 | const lTags = lTagsBase(payload.videoUUID) |
45 | 46 | ||
46 | logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) | 47 | logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags) |
47 | 48 | ||
48 | const video = await VideoModel.loadFull(payload.videoUUID) | 49 | const video = await VideoModel.loadFull(payload.videoUUID) |
49 | 50 | ||
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) { | |||
100 | await federateVideoIfNeeded(video, false, undefined) | 101 | await federateVideoIfNeeded(video, false, undefined) |
101 | 102 | ||
102 | const user = await UserModel.loadByVideoId(video.id) | 103 | const user = await UserModel.loadByVideoId(video.id) |
103 | await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | 104 | |
105 | await JobQueue.Instance.createJob( | ||
106 | await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | ||
107 | ) | ||
104 | } | 108 | } |
105 | 109 | ||
106 | // --------------------------------------------------------------------------- | 110 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0cf5d53ce..50d732beb 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,6 @@ | |||
1 | import { | 1 | import { |
2 | FlowJob, | ||
3 | FlowProducer, | ||
2 | Job, | 4 | Job, |
3 | JobsOptions, | 5 | JobsOptions, |
4 | Queue, | 6 | Queue, |
@@ -13,7 +15,7 @@ import { | |||
13 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 15 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
14 | import { CONFIG } from '@server/initializers/config' | 16 | import { CONFIG } from '@server/initializers/config' |
15 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
16 | import { timeoutPromise } from '@shared/core-utils' | 18 | import { pick, timeoutPromise } from '@shared/core-utils' |
17 | import { | 19 | import { |
18 | ActivitypubFollowPayload, | 20 | ActivitypubFollowPayload, |
19 | ActivitypubHttpBroadcastPayload, | 21 | ActivitypubHttpBroadcastPayload, |
@@ -22,10 +24,12 @@ import { | |||
22 | ActorKeysPayload, | 24 | ActorKeysPayload, |
23 | DeleteResumableUploadMetaFilePayload, | 25 | DeleteResumableUploadMetaFilePayload, |
24 | EmailPayload, | 26 | EmailPayload, |
27 | FederateVideoPayload, | ||
25 | JobState, | 28 | JobState, |
26 | JobType, | 29 | JobType, |
27 | ManageVideoTorrentPayload, | 30 | ManageVideoTorrentPayload, |
28 | MoveObjectStoragePayload, | 31 | MoveObjectStoragePayload, |
32 | NotifyPayload, | ||
29 | RefreshPayload, | 33 | RefreshPayload, |
30 | VideoFileImportPayload, | 34 | VideoFileImportPayload, |
31 | VideoImportPayload, | 35 | VideoImportPayload, |
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica | |||
45 | import { refreshAPObject } from './handlers/activitypub-refresher' | 49 | import { refreshAPObject } from './handlers/activitypub-refresher' |
46 | import { processActorKeys } from './handlers/actor-keys' | 50 | import { processActorKeys } from './handlers/actor-keys' |
47 | import { processEmail } from './handlers/email' | 51 | import { processEmail } from './handlers/email' |
52 | import { processFederateVideo } from './handlers/federate-video' | ||
48 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 53 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
49 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 54 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
55 | import { processNotify } from './handlers/notify' | ||
50 | import { processVideoFileImport } from './handlers/video-file-import' | 56 | import { processVideoFileImport } from './handlers/video-file-import' |
51 | import { processVideoImport } from './handlers/video-import' | 57 | import { processVideoImport } from './handlers/video-import' |
52 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 58 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition' | |||
54 | import { processVideoTranscoding } from './handlers/video-transcoding' | 60 | import { processVideoTranscoding } from './handlers/video-transcoding' |
55 | import { processVideosViewsStats } from './handlers/video-views-stats' | 61 | import { processVideosViewsStats } from './handlers/video-views-stats' |
56 | 62 | ||
57 | type CreateJobArgument = | 63 | export type CreateJobArgument = |
58 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 64 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
59 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | 65 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
60 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 66 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
@@ -73,7 +79,9 @@ type CreateJobArgument = | |||
73 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | | 79 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | |
74 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | | 80 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | |
75 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | | 81 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | |
76 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | 82 | { type: 'notify', payload: NotifyPayload } | |
83 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
84 | { type: 'federate-video', payload: FederateVideoPayload } | ||
77 | 85 | ||
78 | export type CreateJobOptions = { | 86 | export type CreateJobOptions = { |
79 | delay?: number | 87 | delay?: number |
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
98 | 'video-redundancy': processVideoRedundancy, | 106 | 'video-redundancy': processVideoRedundancy, |
99 | 'move-to-object-storage': processMoveToObjectStorage, | 107 | 'move-to-object-storage': processMoveToObjectStorage, |
100 | 'manage-video-torrent': processManageVideoTorrent, | 108 | 'manage-video-torrent': processManageVideoTorrent, |
101 | 'video-studio-edition': processVideoStudioEdition | 109 | 'notify': processNotify, |
110 | 'video-studio-edition': processVideoStudioEdition, | ||
111 | 'federate-video': processFederateVideo | ||
102 | } | 112 | } |
103 | 113 | ||
104 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | 114 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [ | |||
123 | 'video-live-ending', | 133 | 'video-live-ending', |
124 | 'move-to-object-storage', | 134 | 'move-to-object-storage', |
125 | 'manage-video-torrent', | 135 | 'manage-video-torrent', |
126 | 'video-studio-edition' | 136 | 'video-studio-edition', |
137 | 'notify', | ||
138 | 'federate-video' | ||
127 | ] | 139 | ] |
128 | 140 | ||
129 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | 141 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |
@@ -137,6 +149,8 @@ class JobQueue { | |||
137 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | 149 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} |
138 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | 150 | private queueEvents: { [id in JobType]?: QueueEvents } = {} |
139 | 151 | ||
152 | private flowProducer: FlowProducer | ||
153 | |||
140 | private initialized = false | 154 | private initialized = false |
141 | private jobRedisPrefix: string | 155 | private jobRedisPrefix: string |
142 | 156 | ||
@@ -157,6 +171,11 @@ class JobQueue { | |||
157 | this.buildQueueEvent(handlerName, produceOnly) | 171 | this.buildQueueEvent(handlerName, produceOnly) |
158 | } | 172 | } |
159 | 173 | ||
174 | this.flowProducer = new FlowProducer({ | ||
175 | connection: this.getRedisConnection(), | ||
176 | prefix: this.jobRedisPrefix | ||
177 | }) | ||
178 | |||
160 | this.addRepeatableJobs() | 179 | this.addRepeatableJobs() |
161 | } | 180 | } |
162 | 181 | ||
@@ -243,6 +262,8 @@ class JobQueue { | |||
243 | } | 262 | } |
244 | } | 263 | } |
245 | 264 | ||
265 | // --------------------------------------------------------------------------- | ||
266 | |||
246 | async terminate () { | 267 | async terminate () { |
247 | const promises = Object.keys(this.workers) | 268 | const promises = Object.keys(this.workers) |
248 | .map(handlerName => { | 269 | .map(handlerName => { |
@@ -278,28 +299,56 @@ class JobQueue { | |||
278 | } | 299 | } |
279 | } | 300 | } |
280 | 301 | ||
281 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { | 302 | // --------------------------------------------------------------------------- |
282 | this.createJobWithPromise(obj, options) | 303 | |
283 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 304 | createJobAsync (options: CreateJobArgument & CreateJobOptions): void { |
305 | this.createJob(options) | ||
306 | .catch(err => logger.error('Cannot create job.', { err, options })) | ||
284 | } | 307 | } |
285 | 308 | ||
286 | async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | 309 | async createJob (options: CreateJobArgument & CreateJobOptions) { |
287 | const queue: Queue = this.queues[obj.type] | 310 | const queue: Queue = this.queues[options.type] |
288 | if (queue === undefined) { | 311 | if (queue === undefined) { |
289 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 312 | logger.error('Unknown queue %s: cannot create job.', options.type) |
290 | return | 313 | return |
291 | } | 314 | } |
292 | 315 | ||
293 | const jobArgs: JobsOptions = { | 316 | const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) |
317 | |||
318 | return queue.add('job', options.payload, jobOptions) | ||
319 | } | ||
320 | |||
321 | async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { | ||
322 | let lastJob: FlowJob | ||
323 | |||
324 | for (const job of jobs) { | ||
325 | if (!job) continue | ||
326 | |||
327 | lastJob = { | ||
328 | name: 'job', | ||
329 | data: job.payload, | ||
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | ||
333 | ? [ lastJob ] | ||
334 | : [] | ||
335 | } | ||
336 | } | ||
337 | |||
338 | return this.flowProducer.add(lastJob) | ||
339 | } | ||
340 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | ||
342 | return { | ||
294 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 343 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
295 | attempts: JOB_ATTEMPTS[obj.type], | 344 | attempts: JOB_ATTEMPTS[type], |
296 | priority: options.priority, | 345 | priority: options.priority, |
297 | delay: options.delay | 346 | delay: options.delay |
298 | } | 347 | } |
299 | |||
300 | return queue.add('job', obj.payload, jobArgs) | ||
301 | } | 348 | } |
302 | 349 | ||
350 | // --------------------------------------------------------------------------- | ||
351 | |||
303 | async listForApi (options: { | 352 | async listForApi (options: { |
304 | state?: JobState | 353 | state?: JobState |
305 | start: number | 354 | start: number |
@@ -367,6 +416,8 @@ class JobQueue { | |||
367 | return Promise.all(promises) | 416 | return Promise.all(promises) |
368 | } | 417 | } |
369 | 418 | ||
419 | // --------------------------------------------------------------------------- | ||
420 | |||
370 | async removeOldJobs () { | 421 | async removeOldJobs () { |
371 | for (const key of Object.keys(this.queues)) { | 422 | for (const key of Object.keys(this.queues)) { |
372 | const queue: Queue = this.queues[key] | 423 | const queue: Queue = this.queues[key] |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1410889a2..aadd8e308 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -408,7 +408,7 @@ class LiveManager { | |||
408 | await liveSession.save() | 408 | await liveSession.save() |
409 | } | 409 | } |
410 | 410 | ||
411 | JobQueue.Instance.createJob({ | 411 | JobQueue.Instance.createJobAsync({ |
412 | type: 'video-live-ending', | 412 | type: 'video-live-ending', |
413 | payload: { | 413 | payload: { |
414 | videoId: fullVideo.id, | 414 | videoId: fullVideo.id, |
@@ -421,8 +421,12 @@ class LiveManager { | |||
421 | streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, | 421 | streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, |
422 | 422 | ||
423 | publishedAt: fullVideo.publishedAt.toISOString() | 423 | publishedAt: fullVideo.publishedAt.toISOString() |
424 | } | 424 | }, |
425 | }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) | 425 | |
426 | delay: cleanupNow | ||
427 | ? 0 | ||
428 | : VIDEO_LIVE.CLEANUP_DELAY | ||
429 | }) | ||
426 | 430 | ||
427 | fullVideo.state = live.permanentLive | 431 | fullVideo.state = live.permanentLive |
428 | ? VideoState.WAITING_FOR_LIVE | 432 | ? VideoState.WAITING_FOR_LIVE |
diff --git a/server/lib/notifier/notifier.ts b/server/lib/notifier/notifier.ts index d1c4c0215..66cfc31c4 100644 --- a/server/lib/notifier/notifier.ts +++ b/server/lib/notifier/notifier.ts | |||
@@ -242,7 +242,7 @@ class Notifier { | |||
242 | 242 | ||
243 | for (const to of toEmails) { | 243 | for (const to of toEmails) { |
244 | const payload = await object.createEmail(to) | 244 | const payload = await object.createEmail(to) |
245 | JobQueue.Instance.createJob({ type: 'email', payload }) | 245 | JobQueue.Instance.createJobAsync({ type: 'email', payload }) |
246 | } | 246 | } |
247 | } | 247 | } |
248 | 248 | ||
diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts index d9f9c2de3..956ece749 100644 --- a/server/lib/schedulers/auto-follow-index-instances.ts +++ b/server/lib/schedulers/auto-follow-index-instances.ts | |||
@@ -59,7 +59,7 @@ export class AutoFollowIndexInstances extends AbstractScheduler { | |||
59 | isAutoFollow: true | 59 | isAutoFollow: true |
60 | } | 60 | } |
61 | 61 | ||
62 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 62 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
63 | } | 63 | } |
64 | } | 64 | } |
65 | 65 | ||
diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts index b5d8353b7..9ebbd7679 100644 --- a/server/lib/video-state.ts +++ b/server/lib/video-state.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
2 | import { logger } from '@server/helpers/logger' | 3 | import { logger } from '@server/helpers/logger' |
3 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
4 | import { sequelizeTypescript } from '@server/initializers/database' | 5 | import { sequelizeTypescript } from '@server/initializers/database' |
@@ -7,9 +8,9 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
7 | import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' | 8 | import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' |
8 | import { VideoState } from '@shared/models' | 9 | import { VideoState } from '@shared/models' |
9 | import { federateVideoIfNeeded } from './activitypub/videos' | 10 | import { federateVideoIfNeeded } from './activitypub/videos' |
11 | import { JobQueue } from './job-queue' | ||
10 | import { Notifier } from './notifier' | 12 | import { Notifier } from './notifier' |
11 | import { addMoveToObjectStorageJob } from './video' | 13 | import { buildMoveToObjectStorageJob } from './video' |
12 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
13 | 14 | ||
14 | function buildNextVideoState (currentState?: VideoState) { | 15 | function buildNextVideoState (currentState?: VideoState) { |
15 | if (currentState === VideoState.PUBLISHED) { | 16 | if (currentState === VideoState.PUBLISHED) { |
@@ -86,7 +87,7 @@ async function moveToExternalStorageState (options: { | |||
86 | logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) | 87 | logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) |
87 | 88 | ||
88 | try { | 89 | try { |
89 | await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }) | 90 | await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo })) |
90 | 91 | ||
91 | return true | 92 | return true |
92 | } catch (err) { | 93 | } catch (err) { |
diff --git a/server/lib/video.ts b/server/lib/video.ts index b843b11bc..f7d7aa186 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts | |||
@@ -1,5 +1,7 @@ | |||
1 | import { UploadFiles } from 'express' | 1 | import { UploadFiles } from 'express' |
2 | import memoizee from 'memoizee' | ||
2 | import { Transaction } from 'sequelize/types' | 3 | import { Transaction } from 'sequelize/types' |
4 | import { CONFIG } from '@server/initializers/config' | ||
3 | import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' | 5 | import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' |
4 | import { TagModel } from '@server/models/video/tag' | 6 | import { TagModel } from '@server/models/video/tag' |
5 | import { VideoModel } from '@server/models/video/video' | 7 | import { VideoModel } from '@server/models/video/video' |
@@ -9,8 +11,6 @@ import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID | |||
9 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' | 11 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' |
10 | import { CreateJobOptions, JobQueue } from './job-queue/job-queue' | 12 | import { CreateJobOptions, JobQueue } from './job-queue/job-queue' |
11 | import { updateVideoMiniatureFromExisting } from './thumbnail' | 13 | import { updateVideoMiniatureFromExisting } from './thumbnail' |
12 | import { CONFIG } from '@server/initializers/config' | ||
13 | import memoizee from 'memoizee' | ||
14 | 14 | ||
15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { | 15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { |
16 | return { | 16 | return { |
@@ -86,7 +86,7 @@ async function setVideoTags (options: { | |||
86 | 86 | ||
87 | // --------------------------------------------------------------------------- | 87 | // --------------------------------------------------------------------------- |
88 | 88 | ||
89 | async function addOptimizeOrMergeAudioJob (options: { | 89 | async function buildOptimizeOrMergeAudioJob (options: { |
90 | video: MVideoUUID | 90 | video: MVideoUUID |
91 | videoFile: MVideoFile | 91 | videoFile: MVideoFile |
92 | user: MUserId | 92 | user: MUserId |
@@ -94,10 +94,10 @@ async function addOptimizeOrMergeAudioJob (options: { | |||
94 | }) { | 94 | }) { |
95 | const { video, videoFile, user, isNewVideo } = options | 95 | const { video, videoFile, user, isNewVideo } = options |
96 | 96 | ||
97 | let dataInput: VideoTranscodingPayload | 97 | let payload: VideoTranscodingPayload |
98 | 98 | ||
99 | if (videoFile.isAudio()) { | 99 | if (videoFile.isAudio()) { |
100 | dataInput = { | 100 | payload = { |
101 | type: 'merge-audio-to-webtorrent', | 101 | type: 'merge-audio-to-webtorrent', |
102 | resolution: DEFAULT_AUDIO_RESOLUTION, | 102 | resolution: DEFAULT_AUDIO_RESOLUTION, |
103 | videoUUID: video.uuid, | 103 | videoUUID: video.uuid, |
@@ -105,24 +105,26 @@ async function addOptimizeOrMergeAudioJob (options: { | |||
105 | isNewVideo | 105 | isNewVideo |
106 | } | 106 | } |
107 | } else { | 107 | } else { |
108 | dataInput = { | 108 | payload = { |
109 | type: 'optimize-to-webtorrent', | 109 | type: 'optimize-to-webtorrent', |
110 | videoUUID: video.uuid, | 110 | videoUUID: video.uuid, |
111 | isNewVideo | 111 | isNewVideo |
112 | } | 112 | } |
113 | } | 113 | } |
114 | 114 | ||
115 | const jobOptions = { | 115 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') |
116 | priority: await getTranscodingJobPriority(user) | ||
117 | } | ||
118 | 116 | ||
119 | return addTranscodingJob(dataInput, jobOptions) | 117 | return { |
118 | type: 'video-transcoding' as 'video-transcoding', | ||
119 | priority: await getTranscodingJobPriority(user), | ||
120 | payload | ||
121 | } | ||
120 | } | 122 | } |
121 | 123 | ||
122 | async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { | 124 | async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { |
123 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | 125 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') |
124 | 126 | ||
125 | return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options) | 127 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) |
126 | } | 128 | } |
127 | 129 | ||
128 | async function getTranscodingJobPriority (user: MUserId) { | 130 | async function getTranscodingJobPriority (user: MUserId) { |
@@ -136,7 +138,7 @@ async function getTranscodingJobPriority (user: MUserId) { | |||
136 | 138 | ||
137 | // --------------------------------------------------------------------------- | 139 | // --------------------------------------------------------------------------- |
138 | 140 | ||
139 | async function addMoveToObjectStorageJob (options: { | 141 | async function buildMoveToObjectStorageJob (options: { |
140 | video: MVideoUUID | 142 | video: MVideoUUID |
141 | previousVideoState: VideoState | 143 | previousVideoState: VideoState |
142 | isNewVideo?: boolean // Default true | 144 | isNewVideo?: boolean // Default true |
@@ -145,8 +147,14 @@ async function addMoveToObjectStorageJob (options: { | |||
145 | 147 | ||
146 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') | 148 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') |
147 | 149 | ||
148 | const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState } | 150 | return { |
149 | return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput }) | 151 | type: 'move-to-object-storage' as 'move-to-object-storage', |
152 | payload: { | ||
153 | videoUUID: video.uuid, | ||
154 | isNewVideo, | ||
155 | previousVideoState | ||
156 | } | ||
157 | } | ||
150 | } | 158 | } |
151 | 159 | ||
152 | // --------------------------------------------------------------------------- | 160 | // --------------------------------------------------------------------------- |
@@ -173,9 +181,9 @@ export { | |||
173 | buildLocalVideoFromReq, | 181 | buildLocalVideoFromReq, |
174 | buildVideoThumbnailsFromReq, | 182 | buildVideoThumbnailsFromReq, |
175 | setVideoTags, | 183 | setVideoTags, |
176 | addOptimizeOrMergeAudioJob, | 184 | buildOptimizeOrMergeAudioJob, |
177 | addTranscodingJob, | 185 | addTranscodingJob, |
178 | addMoveToObjectStorageJob, | 186 | buildMoveToObjectStorageJob, |
179 | getTranscodingJobPriority, | 187 | getTranscodingJobPriority, |
180 | getCachedVideoDuration | 188 | getCachedVideoDuration |
181 | } | 189 | } |