aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-08 15:48:17 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:18:07 +0200
commitbd911b54b555b11df7e9849cf92d358bccfecf6e (patch)
tree23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server/lib/job-queue
parent5a921e7b74910414626bfc9672b857e987e3ebed (diff)
downloadPeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst
PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip
Use bullmq job dependency
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts2
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts2
-rw-r--r--server/lib/job-queue/handlers/email.ts2
-rw-r--r--server/lib/job-queue/handlers/federate-video.ts28
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts2
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts4
-rw-r--r--server/lib/job-queue/handlers/notify.ts27
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts7
-rw-r--r--server/lib/job-queue/handlers/video-import.ts15
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts2
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts10
-rw-r--r--server/lib/job-queue/job-queue.ts81
16 files changed, 153 insertions, 37 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 944da5be1..a68c32ba0 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) {
17 const payload = job.data as ActivitypubFollowPayload 17 const payload = job.data as ActivitypubFollowPayload
18 const host = payload.host 18 const host = payload.host
19 19
20 logger.info('Processing ActivityPub follow in job %d.', job.id) 20 logger.info('Processing ActivityPub follow in job %s.', job.id)
21 21
22 let targetActor: MActorFull 22 let targetActor: MActorFull
23 if (!host || host === WEBSERVER.HOST) { 23 if (!host || host === WEBSERVER.HOST) {
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 354c608fb..13eff5211 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests'
8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' 8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
9 9
10async function processActivityPubHttpBroadcast (job: Job) { 10async function processActivityPubHttpBroadcast (job: Job) {
11 logger.info('Processing ActivityPub broadcast in job %d.', job.id) 11 logger.info('Processing ActivityPub broadcast in job %s.', job.id)
12 12
13 const payload = job.data as ActivitypubHttpBroadcastPayload 13 const payload = job.data as ActivitypubHttpBroadcastPayload
14 14
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index e0b841887..b6cb3c4a6 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share'
12import { addVideoComments } from '../../activitypub/video-comments' 12import { addVideoComments } from '../../activitypub/video-comments'
13 13
14async function processActivityPubHttpFetcher (job: Job) { 14async function processActivityPubHttpFetcher (job: Job) {
15 logger.info('Processing ActivityPub fetcher in job %d.', job.id) 15 logger.info('Processing ActivityPub fetcher in job %s.', job.id)
16 16
17 const payload = job.data as ActivitypubHttpFetcherPayload 17 const payload = job.data as ActivitypubHttpFetcherPayload
18 18
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 837a597a5..9e4e84002 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests'
6import { ActorFollowHealthCache } from '../../actor-follow-health-cache' 6import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
7 7
8async function processActivityPubHttpUnicast (job: Job) { 8async function processActivityPubHttpUnicast (job: Job) {
9 logger.info('Processing ActivityPub unicast in job %d.', job.id) 9 logger.info('Processing ActivityPub unicast in job %s.', job.id)
10 10
11 const payload = job.data as ActivitypubHttpUnicastPayload 11 const payload = job.data as ActivitypubHttpUnicastPayload
12 const uri = payload.uri 12 const uri = payload.uri
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 600f858a0..307e771ff 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors'
11async function refreshAPObject (job: Job) { 11async function refreshAPObject (job: Job) {
12 const payload = job.data as RefreshPayload 12 const payload = job.data as RefreshPayload
13 13
14 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) 14 logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url)
15 15
16 if (payload.type === 'video') return refreshVideo(payload.url) 16 if (payload.type === 'video') return refreshVideo(payload.url)
17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) 17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 4a5bad9fb..27a2d431b 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger'
6 6
7async function processActorKeys (job: Job) { 7async function processActorKeys (job: Job) {
8 const payload = job.data as ActorKeysPayload 8 const payload = job.data as ActorKeysPayload
9 logger.info('Processing actor keys in job %d.', job.id) 9 logger.info('Processing actor keys in job %s.', job.id)
10 10
11 const actor = await ActorModel.load(payload.actorId) 11 const actor = await ActorModel.load(payload.actorId)
12 12
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index b5b9475b1..567bcc076 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer'
5 5
6async function processEmail (job: Job) { 6async function processEmail (job: Job) {
7 const payload = job.data as EmailPayload 7 const payload = job.data as EmailPayload
8 logger.info('Processing email in job %d.', job.id) 8 logger.info('Processing email in job %s.', job.id)
9 9
10 return Emailer.Instance.sendMail(payload) 10 return Emailer.Instance.sendMail(payload)
11} 11}
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts
new file mode 100644
index 000000000..6aac36741
--- /dev/null
+++ b/server/lib/job-queue/handlers/federate-video.ts
@@ -0,0 +1,28 @@
1import { Job } from 'bullmq'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { sequelizeTypescript } from '@server/initializers/database'
4import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
5import { VideoModel } from '@server/models/video/video'
6import { FederateVideoPayload } from '@shared/models'
7import { logger } from '../../../helpers/logger'
8
9function processFederateVideo (job: Job) {
10 const payload = job.data as FederateVideoPayload
11
12 logger.info('Processing video federation in job %s.', job.id)
13
14 return retryTransactionWrapper(() => {
15 return sequelizeTypescript.transaction(async t => {
16 const video = await VideoModel.loadFull(payload.videoUUID, t)
17 if (!video) return
18
19 return federateVideoIfNeeded(video, payload.isNewVideo, t)
20 })
21 })
22}
23
24// ---------------------------------------------------------------------------
25
26export {
27 processFederateVideo
28}
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index 4505ca79e..03aa414c9 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger'
8 8
9async function processManageVideoTorrent (job: Job) { 9async function processManageVideoTorrent (job: Job) {
10 const payload = job.data as ManageVideoTorrentPayload 10 const payload = job.data as ManageVideoTorrentPayload
11 logger.info('Processing torrent in job %d.', job.id) 11 logger.info('Processing torrent in job %s.', job.id)
12 12
13 if (payload.action === 'create') return doCreateAction(payload) 13 if (payload.action === 'create') return doCreateAction(payload)
14 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) 14 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index d608fd865..25bdebeea 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage')
17 17
18export async function processMoveToObjectStorage (job: Job) { 18export async function processMoveToObjectStorage (job: Job) {
19 const payload = job.data as MoveObjectStoragePayload 19 const payload = job.data as MoveObjectStoragePayload
20 logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) 20 logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
21 21
22 const video = await VideoModel.loadWithFiles(payload.videoUUID) 22 const video = await VideoModel.loadWithFiles(payload.videoUUID)
23 // No video, maybe deleted? 23 // No video, maybe deleted?
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) {
43 43
44 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') 44 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
45 if (pendingMove === 0) { 45 if (pendingMove === 0) {
46 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) 46 logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
47 47
48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) 48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
49 } 49 }
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts
new file mode 100644
index 000000000..83605396c
--- /dev/null
+++ b/server/lib/job-queue/handlers/notify.ts
@@ -0,0 +1,27 @@
1import { Job } from 'bullmq'
2import { Notifier } from '@server/lib/notifier'
3import { VideoModel } from '@server/models/video/video'
4import { NotifyPayload } from '@shared/models'
5import { logger } from '../../../helpers/logger'
6
7async function processNotify (job: Job) {
8 const payload = job.data as NotifyPayload
9 logger.info('Processing %s notification in job %s.', payload.action, job.id)
10
11 if (payload.action === 'new-video') return doNotifyNewVideo(payload)
12}
13
14// ---------------------------------------------------------------------------
15
16export {
17 processNotify
18}
19
20// ---------------------------------------------------------------------------
21
22async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) {
23 const refreshedVideo = await VideoModel.loadFull(payload.videoUUID)
24 if (!refreshedVideo) return
25
26 Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
27}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 40c44cf52..d950f6407 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
6import { generateWebTorrentVideoFilename } from '@server/lib/paths' 6import { generateWebTorrentVideoFilename } from '@server/lib/paths'
7import { addMoveToObjectStorageJob } from '@server/lib/video' 7import { buildMoveToObjectStorageJob } from '@server/lib/video'
8import { VideoPathManager } from '@server/lib/video-path-manager' 8import { VideoPathManager } from '@server/lib/video-path-manager'
9import { VideoModel } from '@server/models/video/video' 9import { VideoModel } from '@server/models/video/video'
10import { VideoFileModel } from '@server/models/video/video-file' 10import { VideoFileModel } from '@server/models/video/video-file'
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils'
13import { VideoFileImportPayload, VideoStorage } from '@shared/models' 13import { VideoFileImportPayload, VideoStorage } from '@shared/models'
14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' 14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
15import { logger } from '../../../helpers/logger' 15import { logger } from '../../../helpers/logger'
16import { JobQueue } from '../job-queue'
16 17
17async function processVideoFileImport (job: Job) { 18async function processVideoFileImport (job: Job) {
18 const payload = job.data as VideoFileImportPayload 19 const payload = job.data as VideoFileImportPayload
19 logger.info('Processing video file import in job %d.', job.id) 20 logger.info('Processing video file import in job %s.', job.id)
20 21
21 const video = await VideoModel.loadFull(payload.videoUUID) 22 const video = await VideoModel.loadFull(payload.videoUUID)
22 // No video, maybe deleted? 23 // No video, maybe deleted?
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) {
28 await updateVideoFile(video, payload.filePath) 29 await updateVideoFile(video, payload.filePath)
29 30
30 if (CONFIG.OBJECT_STORAGE.ENABLED) { 31 if (CONFIG.OBJECT_STORAGE.ENABLED) {
31 await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) 32 await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state }))
32 } else { 33 } else {
33 await federateVideoIfNeeded(video, false) 34 await federateVideoIfNeeded(video, false)
34 } 35 }
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index e5cd35865..f4629159c 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Hooks } from '@server/lib/plugins/hooks' 8import { Hooks } from '@server/lib/plugins/hooks'
9import { ServerConfigManager } from '@server/lib/server-config-manager' 9import { ServerConfigManager } from '@server/lib/server-config-manager'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video'
12import { VideoPathManager } from '@server/lib/video-path-manager' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 13import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail'
39import { federateVideoIfNeeded } from '../../activitypub/videos' 39import { federateVideoIfNeeded } from '../../activitypub/videos'
40import { Notifier } from '../../notifier' 40import { Notifier } from '../../notifier'
41import { generateVideoMiniature } from '../../thumbnail' 41import { generateVideoMiniature } from '../../thumbnail'
42import { JobQueue } from '../job-queue'
42 43
43async function processVideoImport (job: Job) { 44async function processVideoImport (job: Job) {
44 const payload = job.data as VideoImportPayload 45 const payload = job.data as VideoImportPayload
@@ -65,7 +66,7 @@ export {
65// --------------------------------------------------------------------------- 66// ---------------------------------------------------------------------------
66 67
67async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { 68async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
68 logger.info('Processing torrent video import in job %d.', job.id) 69 logger.info('Processing torrent video import in job %s.', job.id)
69 70
70 const options = { type: payload.type, videoImportId: payload.videoImportId } 71 const options = { type: payload.type, videoImportId: payload.videoImportId }
71 72
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault,
77} 78}
78 79
79async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { 80async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
80 logger.info('Processing youtubeDL video import in job %d.', job.id) 81 logger.info('Processing youtubeDL video import in job %s.', job.id)
81 82
82 const options = { type: payload.type, videoImportId: videoImport.id } 83 const options = { type: payload.type, videoImportId: videoImport.id }
83 84
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
259 } 260 }
260 261
261 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 262 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
262 return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) 263 await JobQueue.Instance.createJob(
264 await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
265 )
263 } 266 }
264 267
265 // Create transcoding jobs? 268 // Create transcoding jobs?
266 if (video.state === VideoState.TO_TRANSCODE) { 269 if (video.state === VideoState.TO_TRANSCODE) {
267 await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) 270 await JobQueue.Instance.createJob(
271 await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
272 )
268 } 273 }
269 274
270 } catch (err) { 275 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
index 75ab2cd02..bac99fdb7 100644
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ b/server/lib/job-queue/handlers/video-redundancy.ts
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger'
5 5
6async function processVideoRedundancy (job: Job) { 6async function processVideoRedundancy (job: Job) {
7 const payload = job.data as VideoRedundancyPayload 7 const payload = job.data as VideoRedundancyPayload
8 logger.info('Processing video redundancy in job %d.', job.id) 8 logger.info('Processing video redundancy in job %s.', job.id)
9 9
10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) 10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId)
11} 11}
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 078243538..23f9a34cc 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { generateWebTorrentVideoFilename } from '@server/lib/paths' 8import { generateWebTorrentVideoFilename } from '@server/lib/paths'
9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' 9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { buildOptimizeOrMergeAudioJob } from '@server/lib/video'
12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' 12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
13import { VideoPathManager } from '@server/lib/video-path-manager' 13import { VideoPathManager } from '@server/lib/video-path-manager'
14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' 14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
@@ -36,6 +36,7 @@ import {
36 VideoStudioTaskWatermarkPayload 36 VideoStudioTaskWatermarkPayload
37} from '@shared/models' 37} from '@shared/models'
38import { logger, loggerTagsFactory } from '../../../helpers/logger' 38import { logger, loggerTagsFactory } from '../../../helpers/logger'
39import { JobQueue } from '../job-queue'
39 40
40const lTagsBase = loggerTagsFactory('video-edition') 41const lTagsBase = loggerTagsFactory('video-edition')
41 42
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) {
43 const payload = job.data as VideoStudioEditionPayload 44 const payload = job.data as VideoStudioEditionPayload
44 const lTags = lTagsBase(payload.videoUUID) 45 const lTags = lTagsBase(payload.videoUUID)
45 46
46 logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) 47 logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags)
47 48
48 const video = await VideoModel.loadFull(payload.videoUUID) 49 const video = await VideoModel.loadFull(payload.videoUUID)
49 50
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) {
100 await federateVideoIfNeeded(video, false, undefined) 101 await federateVideoIfNeeded(video, false, undefined)
101 102
102 const user = await UserModel.loadByVideoId(video.id) 103 const user = await UserModel.loadByVideoId(video.id)
103 await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) 104
105 await JobQueue.Instance.createJob(
106 await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
107 )
104} 108}
105 109
106// --------------------------------------------------------------------------- 110// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0cf5d53ce..50d732beb 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,6 @@
1import { 1import {
2 FlowJob,
3 FlowProducer,
2 Job, 4 Job,
3 JobsOptions, 5 JobsOptions,
4 Queue, 6 Queue,
@@ -13,7 +15,7 @@ import {
13import { jobStates } from '@server/helpers/custom-validators/jobs' 15import { jobStates } from '@server/helpers/custom-validators/jobs'
14import { CONFIG } from '@server/initializers/config' 16import { CONFIG } from '@server/initializers/config'
15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils' 18import { pick, timeoutPromise } from '@shared/core-utils'
17import { 19import {
18 ActivitypubFollowPayload, 20 ActivitypubFollowPayload,
19 ActivitypubHttpBroadcastPayload, 21 ActivitypubHttpBroadcastPayload,
@@ -22,10 +24,12 @@ import {
22 ActorKeysPayload, 24 ActorKeysPayload,
23 DeleteResumableUploadMetaFilePayload, 25 DeleteResumableUploadMetaFilePayload,
24 EmailPayload, 26 EmailPayload,
27 FederateVideoPayload,
25 JobState, 28 JobState,
26 JobType, 29 JobType,
27 ManageVideoTorrentPayload, 30 ManageVideoTorrentPayload,
28 MoveObjectStoragePayload, 31 MoveObjectStoragePayload,
32 NotifyPayload,
29 RefreshPayload, 33 RefreshPayload,
30 VideoFileImportPayload, 34 VideoFileImportPayload,
31 VideoImportPayload, 35 VideoImportPayload,
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
45import { refreshAPObject } from './handlers/activitypub-refresher' 49import { refreshAPObject } from './handlers/activitypub-refresher'
46import { processActorKeys } from './handlers/actor-keys' 50import { processActorKeys } from './handlers/actor-keys'
47import { processEmail } from './handlers/email' 51import { processEmail } from './handlers/email'
52import { processFederateVideo } from './handlers/federate-video'
48import { processManageVideoTorrent } from './handlers/manage-video-torrent' 53import { processManageVideoTorrent } from './handlers/manage-video-torrent'
49import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' 54import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
55import { processNotify } from './handlers/notify'
50import { processVideoFileImport } from './handlers/video-file-import' 56import { processVideoFileImport } from './handlers/video-file-import'
51import { processVideoImport } from './handlers/video-import' 57import { processVideoImport } from './handlers/video-import'
52import { processVideoLiveEnding } from './handlers/video-live-ending' 58import { processVideoLiveEnding } from './handlers/video-live-ending'
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
54import { processVideoTranscoding } from './handlers/video-transcoding' 60import { processVideoTranscoding } from './handlers/video-transcoding'
55import { processVideosViewsStats } from './handlers/video-views-stats' 61import { processVideosViewsStats } from './handlers/video-views-stats'
56 62
57type CreateJobArgument = 63export type CreateJobArgument =
58 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 64 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
59 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | 65 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
60 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 66 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
@@ -73,7 +79,9 @@ type CreateJobArgument =
73 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | 79 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
74 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | 80 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
75 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | 81 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
76 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } 82 { type: 'notify', payload: NotifyPayload } |
83 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
84 { type: 'federate-video', payload: FederateVideoPayload }
77 85
78export type CreateJobOptions = { 86export type CreateJobOptions = {
79 delay?: number 87 delay?: number
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
98 'video-redundancy': processVideoRedundancy, 106 'video-redundancy': processVideoRedundancy,
99 'move-to-object-storage': processMoveToObjectStorage, 107 'move-to-object-storage': processMoveToObjectStorage,
100 'manage-video-torrent': processManageVideoTorrent, 108 'manage-video-torrent': processManageVideoTorrent,
101 'video-studio-edition': processVideoStudioEdition 109 'notify': processNotify,
110 'video-studio-edition': processVideoStudioEdition,
111 'federate-video': processFederateVideo
102} 112}
103 113
104const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { 114const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
123 'video-live-ending', 133 'video-live-ending',
124 'move-to-object-storage', 134 'move-to-object-storage',
125 'manage-video-torrent', 135 'manage-video-torrent',
126 'video-studio-edition' 136 'video-studio-edition',
137 'notify',
138 'federate-video'
127] 139]
128 140
129const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) 141const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
@@ -137,6 +149,8 @@ class JobQueue {
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} 149 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {} 150 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139 151
152 private flowProducer: FlowProducer
153
140 private initialized = false 154 private initialized = false
141 private jobRedisPrefix: string 155 private jobRedisPrefix: string
142 156
@@ -157,6 +171,11 @@ class JobQueue {
157 this.buildQueueEvent(handlerName, produceOnly) 171 this.buildQueueEvent(handlerName, produceOnly)
158 } 172 }
159 173
174 this.flowProducer = new FlowProducer({
175 connection: this.getRedisConnection(),
176 prefix: this.jobRedisPrefix
177 })
178
160 this.addRepeatableJobs() 179 this.addRepeatableJobs()
161 } 180 }
162 181
@@ -243,6 +262,8 @@ class JobQueue {
243 } 262 }
244 } 263 }
245 264
265 // ---------------------------------------------------------------------------
266
246 async terminate () { 267 async terminate () {
247 const promises = Object.keys(this.workers) 268 const promises = Object.keys(this.workers)
248 .map(handlerName => { 269 .map(handlerName => {
@@ -278,28 +299,56 @@ class JobQueue {
278 } 299 }
279 } 300 }
280 301
281 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { 302 // ---------------------------------------------------------------------------
282 this.createJobWithPromise(obj, options) 303
283 .catch(err => logger.error('Cannot create job.', { err, obj })) 304 createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
305 this.createJob(options)
306 .catch(err => logger.error('Cannot create job.', { err, options }))
284 } 307 }
285 308
286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 309 async createJob (options: CreateJobArgument & CreateJobOptions) {
287 const queue: Queue = this.queues[obj.type] 310 const queue: Queue = this.queues[options.type]
288 if (queue === undefined) { 311 if (queue === undefined) {
289 logger.error('Unknown queue %s: cannot create job.', obj.type) 312 logger.error('Unknown queue %s: cannot create job.', options.type)
290 return 313 return
291 } 314 }
292 315
293 const jobArgs: JobsOptions = { 316 const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
317
318 return queue.add('job', options.payload, jobOptions)
319 }
320
321 async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
322 let lastJob: FlowJob
323
324 for (const job of jobs) {
325 if (!job) continue
326
327 lastJob = {
328 name: 'job',
329 data: job.payload,
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob
333 ? [ lastJob ]
334 : []
335 }
336 }
337
338 return this.flowProducer.add(lastJob)
339 }
340
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return {
294 backoff: { delay: 60 * 1000, type: 'exponential' }, 343 backoff: { delay: 60 * 1000, type: 'exponential' },
295 attempts: JOB_ATTEMPTS[obj.type], 344 attempts: JOB_ATTEMPTS[type],
296 priority: options.priority, 345 priority: options.priority,
297 delay: options.delay 346 delay: options.delay
298 } 347 }
299
300 return queue.add('job', obj.payload, jobArgs)
301 } 348 }
302 349
350 // ---------------------------------------------------------------------------
351
303 async listForApi (options: { 352 async listForApi (options: {
304 state?: JobState 353 state?: JobState
305 start: number 354 start: number
@@ -367,6 +416,8 @@ class JobQueue {
367 return Promise.all(promises) 416 return Promise.all(promises)
368 } 417 }
369 418
419 // ---------------------------------------------------------------------------
420
370 async removeOldJobs () { 421 async removeOldJobs () {
371 for (const key of Object.keys(this.queues)) { 422 for (const key of Object.keys(this.queues)) {
372 const queue: Queue = this.queues[key] 423 const queue: Queue = this.queues[key]