aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/actors/get.ts4
-rw-r--r--server/lib/activitypub/follow.ts2
-rw-r--r--server/lib/activitypub/outbox.ts2
-rw-r--r--server/lib/activitypub/playlists/refresh.ts2
-rw-r--r--server/lib/activitypub/send/shared/send-utils.ts8
-rw-r--r--server/lib/activitypub/videos/get.ts2
-rw-r--r--server/lib/activitypub/videos/shared/video-sync-attributes.ts2
-rw-r--r--server/lib/emailer.ts10
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts2
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts2
-rw-r--r--server/lib/job-queue/handlers/email.ts2
-rw-r--r--server/lib/job-queue/handlers/federate-video.ts28
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts2
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts4
-rw-r--r--server/lib/job-queue/handlers/notify.ts27
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts7
-rw-r--r--server/lib/job-queue/handlers/video-import.ts15
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts2
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts10
-rw-r--r--server/lib/job-queue/job-queue.ts81
-rw-r--r--server/lib/live/live-manager.ts10
-rw-r--r--server/lib/notifier/notifier.ts2
-rw-r--r--server/lib/schedulers/auto-follow-index-instances.ts2
-rw-r--r--server/lib/video-state.ts7
-rw-r--r--server/lib/video.ts40
29 files changed, 206 insertions, 77 deletions
diff --git a/server/lib/activitypub/actors/get.ts b/server/lib/activitypub/actors/get.ts
index d2b651082..e73b7d707 100644
--- a/server/lib/activitypub/actors/get.ts
+++ b/server/lib/activitypub/actors/get.ts
@@ -110,7 +110,7 @@ async function loadActorFromDB (actorUrl: string, fetchType: ActorLoadByUrlType)
110async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) { 110async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) {
111 if ((created === true || refreshed === true) && updateCollections === true) { 111 if ((created === true || refreshed === true) && updateCollections === true) {
112 const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' } 112 const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' }
113 await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) 113 await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
114 } 114 }
115} 115}
116 116
@@ -118,6 +118,6 @@ async function schedulePlaylistFetchIfNeeded (actor: MActorAccountId, created: b
118 // We created a new account: fetch the playlists 118 // We created a new account: fetch the playlists
119 if (created === true && actor.Account && accountPlaylistsUrl) { 119 if (created === true && actor.Account && accountPlaylistsUrl) {
120 const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' } 120 const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' }
121 await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) 121 await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
122 } 122 }
123} 123}
diff --git a/server/lib/activitypub/follow.ts b/server/lib/activitypub/follow.ts
index 741b54df5..f6e2a48fd 100644
--- a/server/lib/activitypub/follow.ts
+++ b/server/lib/activitypub/follow.ts
@@ -27,7 +27,7 @@ async function autoFollowBackIfNeeded (actorFollow: MActorFollowActors, transact
27 isAutoFollow: true 27 isAutoFollow: true
28 } 28 }
29 29
30 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 30 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
31 } 31 }
32} 32}
33 33
diff --git a/server/lib/activitypub/outbox.ts b/server/lib/activitypub/outbox.ts
index ecdc33a77..5eef76871 100644
--- a/server/lib/activitypub/outbox.ts
+++ b/server/lib/activitypub/outbox.ts
@@ -16,7 +16,7 @@ async function addFetchOutboxJob (actor: Pick<ActorModel, 'id' | 'outboxUrl'>) {
16 type: 'activity' as 'activity' 16 type: 'activity' as 'activity'
17 } 17 }
18 18
19 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) 19 return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload })
20} 20}
21 21
22export { 22export {
diff --git a/server/lib/activitypub/playlists/refresh.ts b/server/lib/activitypub/playlists/refresh.ts
index 493e8c7ec..33260ea02 100644
--- a/server/lib/activitypub/playlists/refresh.ts
+++ b/server/lib/activitypub/playlists/refresh.ts
@@ -9,7 +9,7 @@ import { fetchRemoteVideoPlaylist } from './shared'
9function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { 9function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) {
10 if (!playlist.isOutdated()) return 10 if (!playlist.isOutdated()) return
11 11
12 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) 12 JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } })
13} 13}
14 14
15async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> { 15async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> {
diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts
index fcec63991..2bc1ef8f5 100644
--- a/server/lib/activitypub/send/shared/send-utils.ts
+++ b/server/lib/activitypub/send/shared/send-utils.ts
@@ -120,7 +120,7 @@ async function forwardActivity (
120 body: activity, 120 body: activity,
121 contextType: null 121 contextType: null
122 } 122 }
123 return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })) 123 return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload }))
124} 124}
125 125
126// --------------------------------------------------------------------------- 126// ---------------------------------------------------------------------------
@@ -205,7 +205,7 @@ function broadcastTo (options: {
205 contextType 205 contextType
206 } 206 }
207 207
208 JobQueue.Instance.createJob({ 208 JobQueue.Instance.createJobAsync({
209 type: parallelizable 209 type: parallelizable
210 ? 'activitypub-http-broadcast-parallel' 210 ? 'activitypub-http-broadcast-parallel'
211 : 'activitypub-http-broadcast', 211 : 'activitypub-http-broadcast',
@@ -222,7 +222,7 @@ function broadcastTo (options: {
222 contextType 222 contextType
223 } 223 }
224 224
225 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) 225 JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
226 } 226 }
227} 227}
228 228
@@ -243,7 +243,7 @@ function unicastTo (options: {
243 contextType 243 contextType
244 } 244 }
245 245
246 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) 246 JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
247} 247}
248 248
249// --------------------------------------------------------------------------- 249// ---------------------------------------------------------------------------
diff --git a/server/lib/activitypub/videos/get.ts b/server/lib/activitypub/videos/get.ts
index b74df132c..14ba55034 100644
--- a/server/lib/activitypub/videos/get.ts
+++ b/server/lib/activitypub/videos/get.ts
@@ -107,7 +107,7 @@ async function scheduleRefresh (video: MVideoThumbnail, fetchType: VideoLoadByUr
107 return refreshVideoIfNeeded(refreshOptions) 107 return refreshVideoIfNeeded(refreshOptions)
108 } 108 }
109 109
110 await JobQueue.Instance.createJobWithPromise({ 110 await JobQueue.Instance.createJob({
111 type: 'activitypub-refresher', 111 type: 'activitypub-refresher',
112 payload: { type: 'video', url: video.url } 112 payload: { type: 'video', url: video.url }
113 }) 113 })
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
index 8cf0c87a6..8ed1b6447 100644
--- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts
+++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
@@ -74,7 +74,7 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
74} 74}
75 75
76function createJob (payload: ActivitypubHttpFetcherPayload) { 76function createJob (payload: ActivitypubHttpFetcherPayload) {
77 return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) 77 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
78} 78}
79 79
80function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { 80function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
diff --git a/server/lib/emailer.ts b/server/lib/emailer.ts
index bd1089530..9e546de7f 100644
--- a/server/lib/emailer.ts
+++ b/server/lib/emailer.ts
@@ -66,7 +66,7 @@ class Emailer {
66 } 66 }
67 } 67 }
68 68
69 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 69 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
70 } 70 }
71 71
72 addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { 72 addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) {
@@ -80,7 +80,7 @@ class Emailer {
80 } 80 }
81 } 81 }
82 82
83 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 83 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
84 } 84 }
85 85
86 addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { 86 addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) {
@@ -94,7 +94,7 @@ class Emailer {
94 } 94 }
95 } 95 }
96 96
97 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 97 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
98 } 98 }
99 99
100 addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { 100 addUserBlockJob (user: MUser, blocked: boolean, reason?: string) {
@@ -108,7 +108,7 @@ class Emailer {
108 text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` 108 text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.`
109 } 109 }
110 110
111 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 111 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
112 } 112 }
113 113
114 addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { 114 addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) {
@@ -127,7 +127,7 @@ class Emailer {
127 } 127 }
128 } 128 }
129 129
130 return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) 130 return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
131 } 131 }
132 132
133 async sendMail (options: EmailPayload) { 133 async sendMail (options: EmailPayload) {
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 944da5be1..a68c32ba0 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) {
17 const payload = job.data as ActivitypubFollowPayload 17 const payload = job.data as ActivitypubFollowPayload
18 const host = payload.host 18 const host = payload.host
19 19
20 logger.info('Processing ActivityPub follow in job %d.', job.id) 20 logger.info('Processing ActivityPub follow in job %s.', job.id)
21 21
22 let targetActor: MActorFull 22 let targetActor: MActorFull
23 if (!host || host === WEBSERVER.HOST) { 23 if (!host || host === WEBSERVER.HOST) {
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 354c608fb..13eff5211 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests'
8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' 8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
9 9
10async function processActivityPubHttpBroadcast (job: Job) { 10async function processActivityPubHttpBroadcast (job: Job) {
11 logger.info('Processing ActivityPub broadcast in job %d.', job.id) 11 logger.info('Processing ActivityPub broadcast in job %s.', job.id)
12 12
13 const payload = job.data as ActivitypubHttpBroadcastPayload 13 const payload = job.data as ActivitypubHttpBroadcastPayload
14 14
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index e0b841887..b6cb3c4a6 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share'
12import { addVideoComments } from '../../activitypub/video-comments' 12import { addVideoComments } from '../../activitypub/video-comments'
13 13
14async function processActivityPubHttpFetcher (job: Job) { 14async function processActivityPubHttpFetcher (job: Job) {
15 logger.info('Processing ActivityPub fetcher in job %d.', job.id) 15 logger.info('Processing ActivityPub fetcher in job %s.', job.id)
16 16
17 const payload = job.data as ActivitypubHttpFetcherPayload 17 const payload = job.data as ActivitypubHttpFetcherPayload
18 18
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 837a597a5..9e4e84002 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests'
6import { ActorFollowHealthCache } from '../../actor-follow-health-cache' 6import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
7 7
8async function processActivityPubHttpUnicast (job: Job) { 8async function processActivityPubHttpUnicast (job: Job) {
9 logger.info('Processing ActivityPub unicast in job %d.', job.id) 9 logger.info('Processing ActivityPub unicast in job %s.', job.id)
10 10
11 const payload = job.data as ActivitypubHttpUnicastPayload 11 const payload = job.data as ActivitypubHttpUnicastPayload
12 const uri = payload.uri 12 const uri = payload.uri
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 600f858a0..307e771ff 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors'
11async function refreshAPObject (job: Job) { 11async function refreshAPObject (job: Job) {
12 const payload = job.data as RefreshPayload 12 const payload = job.data as RefreshPayload
13 13
14 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) 14 logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url)
15 15
16 if (payload.type === 'video') return refreshVideo(payload.url) 16 if (payload.type === 'video') return refreshVideo(payload.url)
17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) 17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 4a5bad9fb..27a2d431b 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger'
6 6
7async function processActorKeys (job: Job) { 7async function processActorKeys (job: Job) {
8 const payload = job.data as ActorKeysPayload 8 const payload = job.data as ActorKeysPayload
9 logger.info('Processing actor keys in job %d.', job.id) 9 logger.info('Processing actor keys in job %s.', job.id)
10 10
11 const actor = await ActorModel.load(payload.actorId) 11 const actor = await ActorModel.load(payload.actorId)
12 12
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index b5b9475b1..567bcc076 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer'
5 5
6async function processEmail (job: Job) { 6async function processEmail (job: Job) {
7 const payload = job.data as EmailPayload 7 const payload = job.data as EmailPayload
8 logger.info('Processing email in job %d.', job.id) 8 logger.info('Processing email in job %s.', job.id)
9 9
10 return Emailer.Instance.sendMail(payload) 10 return Emailer.Instance.sendMail(payload)
11} 11}
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts
new file mode 100644
index 000000000..6aac36741
--- /dev/null
+++ b/server/lib/job-queue/handlers/federate-video.ts
@@ -0,0 +1,28 @@
1import { Job } from 'bullmq'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { sequelizeTypescript } from '@server/initializers/database'
4import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
5import { VideoModel } from '@server/models/video/video'
6import { FederateVideoPayload } from '@shared/models'
7import { logger } from '../../../helpers/logger'
8
9function processFederateVideo (job: Job) {
10 const payload = job.data as FederateVideoPayload
11
12 logger.info('Processing video federation in job %s.', job.id)
13
14 return retryTransactionWrapper(() => {
15 return sequelizeTypescript.transaction(async t => {
16 const video = await VideoModel.loadFull(payload.videoUUID, t)
17 if (!video) return
18
19 return federateVideoIfNeeded(video, payload.isNewVideo, t)
20 })
21 })
22}
23
24// ---------------------------------------------------------------------------
25
26export {
27 processFederateVideo
28}
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index 4505ca79e..03aa414c9 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger'
8 8
9async function processManageVideoTorrent (job: Job) { 9async function processManageVideoTorrent (job: Job) {
10 const payload = job.data as ManageVideoTorrentPayload 10 const payload = job.data as ManageVideoTorrentPayload
11 logger.info('Processing torrent in job %d.', job.id) 11 logger.info('Processing torrent in job %s.', job.id)
12 12
13 if (payload.action === 'create') return doCreateAction(payload) 13 if (payload.action === 'create') return doCreateAction(payload)
14 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) 14 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index d608fd865..25bdebeea 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage')
17 17
18export async function processMoveToObjectStorage (job: Job) { 18export async function processMoveToObjectStorage (job: Job) {
19 const payload = job.data as MoveObjectStoragePayload 19 const payload = job.data as MoveObjectStoragePayload
20 logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) 20 logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
21 21
22 const video = await VideoModel.loadWithFiles(payload.videoUUID) 22 const video = await VideoModel.loadWithFiles(payload.videoUUID)
23 // No video, maybe deleted? 23 // No video, maybe deleted?
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) {
43 43
44 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') 44 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
45 if (pendingMove === 0) { 45 if (pendingMove === 0) {
46 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) 46 logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
47 47
48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) 48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
49 } 49 }
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts
new file mode 100644
index 000000000..83605396c
--- /dev/null
+++ b/server/lib/job-queue/handlers/notify.ts
@@ -0,0 +1,27 @@
1import { Job } from 'bullmq'
2import { Notifier } from '@server/lib/notifier'
3import { VideoModel } from '@server/models/video/video'
4import { NotifyPayload } from '@shared/models'
5import { logger } from '../../../helpers/logger'
6
7async function processNotify (job: Job) {
8 const payload = job.data as NotifyPayload
9 logger.info('Processing %s notification in job %s.', payload.action, job.id)
10
11 if (payload.action === 'new-video') return doNotifyNewVideo(payload)
12}
13
14// ---------------------------------------------------------------------------
15
16export {
17 processNotify
18}
19
20// ---------------------------------------------------------------------------
21
22async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) {
23 const refreshedVideo = await VideoModel.loadFull(payload.videoUUID)
24 if (!refreshedVideo) return
25
26 Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
27}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 40c44cf52..d950f6407 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
6import { generateWebTorrentVideoFilename } from '@server/lib/paths' 6import { generateWebTorrentVideoFilename } from '@server/lib/paths'
7import { addMoveToObjectStorageJob } from '@server/lib/video' 7import { buildMoveToObjectStorageJob } from '@server/lib/video'
8import { VideoPathManager } from '@server/lib/video-path-manager' 8import { VideoPathManager } from '@server/lib/video-path-manager'
9import { VideoModel } from '@server/models/video/video' 9import { VideoModel } from '@server/models/video/video'
10import { VideoFileModel } from '@server/models/video/video-file' 10import { VideoFileModel } from '@server/models/video/video-file'
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils'
13import { VideoFileImportPayload, VideoStorage } from '@shared/models' 13import { VideoFileImportPayload, VideoStorage } from '@shared/models'
14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' 14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
15import { logger } from '../../../helpers/logger' 15import { logger } from '../../../helpers/logger'
16import { JobQueue } from '../job-queue'
16 17
17async function processVideoFileImport (job: Job) { 18async function processVideoFileImport (job: Job) {
18 const payload = job.data as VideoFileImportPayload 19 const payload = job.data as VideoFileImportPayload
19 logger.info('Processing video file import in job %d.', job.id) 20 logger.info('Processing video file import in job %s.', job.id)
20 21
21 const video = await VideoModel.loadFull(payload.videoUUID) 22 const video = await VideoModel.loadFull(payload.videoUUID)
22 // No video, maybe deleted? 23 // No video, maybe deleted?
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) {
28 await updateVideoFile(video, payload.filePath) 29 await updateVideoFile(video, payload.filePath)
29 30
30 if (CONFIG.OBJECT_STORAGE.ENABLED) { 31 if (CONFIG.OBJECT_STORAGE.ENABLED) {
31 await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) 32 await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state }))
32 } else { 33 } else {
33 await federateVideoIfNeeded(video, false) 34 await federateVideoIfNeeded(video, false)
34 } 35 }
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index e5cd35865..f4629159c 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Hooks } from '@server/lib/plugins/hooks' 8import { Hooks } from '@server/lib/plugins/hooks'
9import { ServerConfigManager } from '@server/lib/server-config-manager' 9import { ServerConfigManager } from '@server/lib/server-config-manager'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video'
12import { VideoPathManager } from '@server/lib/video-path-manager' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 13import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail'
39import { federateVideoIfNeeded } from '../../activitypub/videos' 39import { federateVideoIfNeeded } from '../../activitypub/videos'
40import { Notifier } from '../../notifier' 40import { Notifier } from '../../notifier'
41import { generateVideoMiniature } from '../../thumbnail' 41import { generateVideoMiniature } from '../../thumbnail'
42import { JobQueue } from '../job-queue'
42 43
43async function processVideoImport (job: Job) { 44async function processVideoImport (job: Job) {
44 const payload = job.data as VideoImportPayload 45 const payload = job.data as VideoImportPayload
@@ -65,7 +66,7 @@ export {
65// --------------------------------------------------------------------------- 66// ---------------------------------------------------------------------------
66 67
67async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { 68async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
68 logger.info('Processing torrent video import in job %d.', job.id) 69 logger.info('Processing torrent video import in job %s.', job.id)
69 70
70 const options = { type: payload.type, videoImportId: payload.videoImportId } 71 const options = { type: payload.type, videoImportId: payload.videoImportId }
71 72
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault,
77} 78}
78 79
79async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { 80async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
80 logger.info('Processing youtubeDL video import in job %d.', job.id) 81 logger.info('Processing youtubeDL video import in job %s.', job.id)
81 82
82 const options = { type: payload.type, videoImportId: videoImport.id } 83 const options = { type: payload.type, videoImportId: videoImport.id }
83 84
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
259 } 260 }
260 261
261 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 262 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
262 return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) 263 await JobQueue.Instance.createJob(
264 await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
265 )
263 } 266 }
264 267
265 // Create transcoding jobs? 268 // Create transcoding jobs?
266 if (video.state === VideoState.TO_TRANSCODE) { 269 if (video.state === VideoState.TO_TRANSCODE) {
267 await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) 270 await JobQueue.Instance.createJob(
271 await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
272 )
268 } 273 }
269 274
270 } catch (err) { 275 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
index 75ab2cd02..bac99fdb7 100644
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ b/server/lib/job-queue/handlers/video-redundancy.ts
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger'
5 5
6async function processVideoRedundancy (job: Job) { 6async function processVideoRedundancy (job: Job) {
7 const payload = job.data as VideoRedundancyPayload 7 const payload = job.data as VideoRedundancyPayload
8 logger.info('Processing video redundancy in job %d.', job.id) 8 logger.info('Processing video redundancy in job %s.', job.id)
9 9
10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) 10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId)
11} 11}
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 078243538..23f9a34cc 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { generateWebTorrentVideoFilename } from '@server/lib/paths' 8import { generateWebTorrentVideoFilename } from '@server/lib/paths'
9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' 9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { buildOptimizeOrMergeAudioJob } from '@server/lib/video'
12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' 12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
13import { VideoPathManager } from '@server/lib/video-path-manager' 13import { VideoPathManager } from '@server/lib/video-path-manager'
14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' 14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
@@ -36,6 +36,7 @@ import {
36 VideoStudioTaskWatermarkPayload 36 VideoStudioTaskWatermarkPayload
37} from '@shared/models' 37} from '@shared/models'
38import { logger, loggerTagsFactory } from '../../../helpers/logger' 38import { logger, loggerTagsFactory } from '../../../helpers/logger'
39import { JobQueue } from '../job-queue'
39 40
40const lTagsBase = loggerTagsFactory('video-edition') 41const lTagsBase = loggerTagsFactory('video-edition')
41 42
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) {
43 const payload = job.data as VideoStudioEditionPayload 44 const payload = job.data as VideoStudioEditionPayload
44 const lTags = lTagsBase(payload.videoUUID) 45 const lTags = lTagsBase(payload.videoUUID)
45 46
46 logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) 47 logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags)
47 48
48 const video = await VideoModel.loadFull(payload.videoUUID) 49 const video = await VideoModel.loadFull(payload.videoUUID)
49 50
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) {
100 await federateVideoIfNeeded(video, false, undefined) 101 await federateVideoIfNeeded(video, false, undefined)
101 102
102 const user = await UserModel.loadByVideoId(video.id) 103 const user = await UserModel.loadByVideoId(video.id)
103 await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) 104
105 await JobQueue.Instance.createJob(
106 await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
107 )
104} 108}
105 109
106// --------------------------------------------------------------------------- 110// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0cf5d53ce..50d732beb 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,6 @@
1import { 1import {
2 FlowJob,
3 FlowProducer,
2 Job, 4 Job,
3 JobsOptions, 5 JobsOptions,
4 Queue, 6 Queue,
@@ -13,7 +15,7 @@ import {
13import { jobStates } from '@server/helpers/custom-validators/jobs' 15import { jobStates } from '@server/helpers/custom-validators/jobs'
14import { CONFIG } from '@server/initializers/config' 16import { CONFIG } from '@server/initializers/config'
15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils' 18import { pick, timeoutPromise } from '@shared/core-utils'
17import { 19import {
18 ActivitypubFollowPayload, 20 ActivitypubFollowPayload,
19 ActivitypubHttpBroadcastPayload, 21 ActivitypubHttpBroadcastPayload,
@@ -22,10 +24,12 @@ import {
22 ActorKeysPayload, 24 ActorKeysPayload,
23 DeleteResumableUploadMetaFilePayload, 25 DeleteResumableUploadMetaFilePayload,
24 EmailPayload, 26 EmailPayload,
27 FederateVideoPayload,
25 JobState, 28 JobState,
26 JobType, 29 JobType,
27 ManageVideoTorrentPayload, 30 ManageVideoTorrentPayload,
28 MoveObjectStoragePayload, 31 MoveObjectStoragePayload,
32 NotifyPayload,
29 RefreshPayload, 33 RefreshPayload,
30 VideoFileImportPayload, 34 VideoFileImportPayload,
31 VideoImportPayload, 35 VideoImportPayload,
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
45import { refreshAPObject } from './handlers/activitypub-refresher' 49import { refreshAPObject } from './handlers/activitypub-refresher'
46import { processActorKeys } from './handlers/actor-keys' 50import { processActorKeys } from './handlers/actor-keys'
47import { processEmail } from './handlers/email' 51import { processEmail } from './handlers/email'
52import { processFederateVideo } from './handlers/federate-video'
48import { processManageVideoTorrent } from './handlers/manage-video-torrent' 53import { processManageVideoTorrent } from './handlers/manage-video-torrent'
49import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' 54import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
55import { processNotify } from './handlers/notify'
50import { processVideoFileImport } from './handlers/video-file-import' 56import { processVideoFileImport } from './handlers/video-file-import'
51import { processVideoImport } from './handlers/video-import' 57import { processVideoImport } from './handlers/video-import'
52import { processVideoLiveEnding } from './handlers/video-live-ending' 58import { processVideoLiveEnding } from './handlers/video-live-ending'
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
54import { processVideoTranscoding } from './handlers/video-transcoding' 60import { processVideoTranscoding } from './handlers/video-transcoding'
55import { processVideosViewsStats } from './handlers/video-views-stats' 61import { processVideosViewsStats } from './handlers/video-views-stats'
56 62
57type CreateJobArgument = 63export type CreateJobArgument =
58 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 64 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
59 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | 65 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
60 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 66 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
@@ -73,7 +79,9 @@ type CreateJobArgument =
73 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | 79 { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
74 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | 80 { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
75 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | 81 { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
76 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } 82 { type: 'notify', payload: NotifyPayload } |
83 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
84 { type: 'federate-video', payload: FederateVideoPayload }
77 85
78export type CreateJobOptions = { 86export type CreateJobOptions = {
79 delay?: number 87 delay?: number
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
98 'video-redundancy': processVideoRedundancy, 106 'video-redundancy': processVideoRedundancy,
99 'move-to-object-storage': processMoveToObjectStorage, 107 'move-to-object-storage': processMoveToObjectStorage,
100 'manage-video-torrent': processManageVideoTorrent, 108 'manage-video-torrent': processManageVideoTorrent,
101 'video-studio-edition': processVideoStudioEdition 109 'notify': processNotify,
110 'video-studio-edition': processVideoStudioEdition,
111 'federate-video': processFederateVideo
102} 112}
103 113
104const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { 114const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
123 'video-live-ending', 133 'video-live-ending',
124 'move-to-object-storage', 134 'move-to-object-storage',
125 'manage-video-torrent', 135 'manage-video-torrent',
126 'video-studio-edition' 136 'video-studio-edition',
137 'notify',
138 'federate-video'
127] 139]
128 140
129const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) 141const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
@@ -137,6 +149,8 @@ class JobQueue {
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} 149 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {} 150 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139 151
152 private flowProducer: FlowProducer
153
140 private initialized = false 154 private initialized = false
141 private jobRedisPrefix: string 155 private jobRedisPrefix: string
142 156
@@ -157,6 +171,11 @@ class JobQueue {
157 this.buildQueueEvent(handlerName, produceOnly) 171 this.buildQueueEvent(handlerName, produceOnly)
158 } 172 }
159 173
174 this.flowProducer = new FlowProducer({
175 connection: this.getRedisConnection(),
176 prefix: this.jobRedisPrefix
177 })
178
160 this.addRepeatableJobs() 179 this.addRepeatableJobs()
161 } 180 }
162 181
@@ -243,6 +262,8 @@ class JobQueue {
243 } 262 }
244 } 263 }
245 264
265 // ---------------------------------------------------------------------------
266
246 async terminate () { 267 async terminate () {
247 const promises = Object.keys(this.workers) 268 const promises = Object.keys(this.workers)
248 .map(handlerName => { 269 .map(handlerName => {
@@ -278,28 +299,56 @@ class JobQueue {
278 } 299 }
279 } 300 }
280 301
281 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { 302 // ---------------------------------------------------------------------------
282 this.createJobWithPromise(obj, options) 303
283 .catch(err => logger.error('Cannot create job.', { err, obj })) 304 createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
305 this.createJob(options)
306 .catch(err => logger.error('Cannot create job.', { err, options }))
284 } 307 }
285 308
286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 309 async createJob (options: CreateJobArgument & CreateJobOptions) {
287 const queue: Queue = this.queues[obj.type] 310 const queue: Queue = this.queues[options.type]
288 if (queue === undefined) { 311 if (queue === undefined) {
289 logger.error('Unknown queue %s: cannot create job.', obj.type) 312 logger.error('Unknown queue %s: cannot create job.', options.type)
290 return 313 return
291 } 314 }
292 315
293 const jobArgs: JobsOptions = { 316 const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
317
318 return queue.add('job', options.payload, jobOptions)
319 }
320
321 async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
322 let lastJob: FlowJob
323
324 for (const job of jobs) {
325 if (!job) continue
326
327 lastJob = {
328 name: 'job',
329 data: job.payload,
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob
333 ? [ lastJob ]
334 : []
335 }
336 }
337
338 return this.flowProducer.add(lastJob)
339 }
340
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return {
294 backoff: { delay: 60 * 1000, type: 'exponential' }, 343 backoff: { delay: 60 * 1000, type: 'exponential' },
295 attempts: JOB_ATTEMPTS[obj.type], 344 attempts: JOB_ATTEMPTS[type],
296 priority: options.priority, 345 priority: options.priority,
297 delay: options.delay 346 delay: options.delay
298 } 347 }
299
300 return queue.add('job', obj.payload, jobArgs)
301 } 348 }
302 349
350 // ---------------------------------------------------------------------------
351
303 async listForApi (options: { 352 async listForApi (options: {
304 state?: JobState 353 state?: JobState
305 start: number 354 start: number
@@ -367,6 +416,8 @@ class JobQueue {
367 return Promise.all(promises) 416 return Promise.all(promises)
368 } 417 }
369 418
419 // ---------------------------------------------------------------------------
420
370 async removeOldJobs () { 421 async removeOldJobs () {
371 for (const key of Object.keys(this.queues)) { 422 for (const key of Object.keys(this.queues)) {
372 const queue: Queue = this.queues[key] 423 const queue: Queue = this.queues[key]
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 1410889a2..aadd8e308 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -408,7 +408,7 @@ class LiveManager {
408 await liveSession.save() 408 await liveSession.save()
409 } 409 }
410 410
411 JobQueue.Instance.createJob({ 411 JobQueue.Instance.createJobAsync({
412 type: 'video-live-ending', 412 type: 'video-live-ending',
413 payload: { 413 payload: {
414 videoId: fullVideo.id, 414 videoId: fullVideo.id,
@@ -421,8 +421,12 @@ class LiveManager {
421 streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, 421 streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,
422 422
423 publishedAt: fullVideo.publishedAt.toISOString() 423 publishedAt: fullVideo.publishedAt.toISOString()
424 } 424 },
425 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) 425
426 delay: cleanupNow
427 ? 0
428 : VIDEO_LIVE.CLEANUP_DELAY
429 })
426 430
427 fullVideo.state = live.permanentLive 431 fullVideo.state = live.permanentLive
428 ? VideoState.WAITING_FOR_LIVE 432 ? VideoState.WAITING_FOR_LIVE
diff --git a/server/lib/notifier/notifier.ts b/server/lib/notifier/notifier.ts
index d1c4c0215..66cfc31c4 100644
--- a/server/lib/notifier/notifier.ts
+++ b/server/lib/notifier/notifier.ts
@@ -242,7 +242,7 @@ class Notifier {
242 242
243 for (const to of toEmails) { 243 for (const to of toEmails) {
244 const payload = await object.createEmail(to) 244 const payload = await object.createEmail(to)
245 JobQueue.Instance.createJob({ type: 'email', payload }) 245 JobQueue.Instance.createJobAsync({ type: 'email', payload })
246 } 246 }
247 } 247 }
248 248
diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts
index d9f9c2de3..956ece749 100644
--- a/server/lib/schedulers/auto-follow-index-instances.ts
+++ b/server/lib/schedulers/auto-follow-index-instances.ts
@@ -59,7 +59,7 @@ export class AutoFollowIndexInstances extends AbstractScheduler {
59 isAutoFollow: true 59 isAutoFollow: true
60 } 60 }
61 61
62 JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) 62 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
63 } 63 }
64 } 64 }
65 65
diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts
index b5d8353b7..9ebbd7679 100644
--- a/server/lib/video-state.ts
+++ b/server/lib/video-state.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
2import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
3import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
4import { sequelizeTypescript } from '@server/initializers/database' 5import { sequelizeTypescript } from '@server/initializers/database'
@@ -7,9 +8,9 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
7import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' 8import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models'
8import { VideoState } from '@shared/models' 9import { VideoState } from '@shared/models'
9import { federateVideoIfNeeded } from './activitypub/videos' 10import { federateVideoIfNeeded } from './activitypub/videos'
11import { JobQueue } from './job-queue'
10import { Notifier } from './notifier' 12import { Notifier } from './notifier'
11import { addMoveToObjectStorageJob } from './video' 13import { buildMoveToObjectStorageJob } from './video'
12import { retryTransactionWrapper } from '@server/helpers/database-utils'
13 14
14function buildNextVideoState (currentState?: VideoState) { 15function buildNextVideoState (currentState?: VideoState) {
15 if (currentState === VideoState.PUBLISHED) { 16 if (currentState === VideoState.PUBLISHED) {
@@ -86,7 +87,7 @@ async function moveToExternalStorageState (options: {
86 logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) 87 logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
87 88
88 try { 89 try {
89 await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }) 90 await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }))
90 91
91 return true 92 return true
92 } catch (err) { 93 } catch (err) {
diff --git a/server/lib/video.ts b/server/lib/video.ts
index b843b11bc..f7d7aa186 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -1,5 +1,7 @@
1import { UploadFiles } from 'express' 1import { UploadFiles } from 'express'
2import memoizee from 'memoizee'
2import { Transaction } from 'sequelize/types' 3import { Transaction } from 'sequelize/types'
4import { CONFIG } from '@server/initializers/config'
3import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' 5import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants'
4import { TagModel } from '@server/models/video/tag' 6import { TagModel } from '@server/models/video/tag'
5import { VideoModel } from '@server/models/video/video' 7import { VideoModel } from '@server/models/video/video'
@@ -9,8 +11,6 @@ import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID
9import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
10import { CreateJobOptions, JobQueue } from './job-queue/job-queue' 12import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
11import { updateVideoMiniatureFromExisting } from './thumbnail' 13import { updateVideoMiniatureFromExisting } from './thumbnail'
12import { CONFIG } from '@server/initializers/config'
13import memoizee from 'memoizee'
14 14
15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { 15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
16 return { 16 return {
@@ -86,7 +86,7 @@ async function setVideoTags (options: {
86 86
87// --------------------------------------------------------------------------- 87// ---------------------------------------------------------------------------
88 88
89async function addOptimizeOrMergeAudioJob (options: { 89async function buildOptimizeOrMergeAudioJob (options: {
90 video: MVideoUUID 90 video: MVideoUUID
91 videoFile: MVideoFile 91 videoFile: MVideoFile
92 user: MUserId 92 user: MUserId
@@ -94,10 +94,10 @@ async function addOptimizeOrMergeAudioJob (options: {
94}) { 94}) {
95 const { video, videoFile, user, isNewVideo } = options 95 const { video, videoFile, user, isNewVideo } = options
96 96
97 let dataInput: VideoTranscodingPayload 97 let payload: VideoTranscodingPayload
98 98
99 if (videoFile.isAudio()) { 99 if (videoFile.isAudio()) {
100 dataInput = { 100 payload = {
101 type: 'merge-audio-to-webtorrent', 101 type: 'merge-audio-to-webtorrent',
102 resolution: DEFAULT_AUDIO_RESOLUTION, 102 resolution: DEFAULT_AUDIO_RESOLUTION,
103 videoUUID: video.uuid, 103 videoUUID: video.uuid,
@@ -105,24 +105,26 @@ async function addOptimizeOrMergeAudioJob (options: {
105 isNewVideo 105 isNewVideo
106 } 106 }
107 } else { 107 } else {
108 dataInput = { 108 payload = {
109 type: 'optimize-to-webtorrent', 109 type: 'optimize-to-webtorrent',
110 videoUUID: video.uuid, 110 videoUUID: video.uuid,
111 isNewVideo 111 isNewVideo
112 } 112 }
113 } 113 }
114 114
115 const jobOptions = { 115 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
116 priority: await getTranscodingJobPriority(user)
117 }
118 116
119 return addTranscodingJob(dataInput, jobOptions) 117 return {
118 type: 'video-transcoding' as 'video-transcoding',
119 priority: await getTranscodingJobPriority(user),
120 payload
121 }
120} 122}
121 123
122async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { 124async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
123 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') 125 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
124 126
125 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options) 127 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
126} 128}
127 129
128async function getTranscodingJobPriority (user: MUserId) { 130async function getTranscodingJobPriority (user: MUserId) {
@@ -136,7 +138,7 @@ async function getTranscodingJobPriority (user: MUserId) {
136 138
137// --------------------------------------------------------------------------- 139// ---------------------------------------------------------------------------
138 140
139async function addMoveToObjectStorageJob (options: { 141async function buildMoveToObjectStorageJob (options: {
140 video: MVideoUUID 142 video: MVideoUUID
141 previousVideoState: VideoState 143 previousVideoState: VideoState
142 isNewVideo?: boolean // Default true 144 isNewVideo?: boolean // Default true
@@ -145,8 +147,14 @@ async function addMoveToObjectStorageJob (options: {
145 147
146 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') 148 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove')
147 149
148 const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState } 150 return {
149 return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput }) 151 type: 'move-to-object-storage' as 'move-to-object-storage',
152 payload: {
153 videoUUID: video.uuid,
154 isNewVideo,
155 previousVideoState
156 }
157 }
150} 158}
151 159
152// --------------------------------------------------------------------------- 160// ---------------------------------------------------------------------------
@@ -173,9 +181,9 @@ export {
173 buildLocalVideoFromReq, 181 buildLocalVideoFromReq,
174 buildVideoThumbnailsFromReq, 182 buildVideoThumbnailsFromReq,
175 setVideoTags, 183 setVideoTags,
176 addOptimizeOrMergeAudioJob, 184 buildOptimizeOrMergeAudioJob,
177 addTranscodingJob, 185 addTranscodingJob,
178 addMoveToObjectStorageJob, 186 buildMoveToObjectStorageJob,
179 getTranscodingJobPriority, 187 getTranscodingJobPriority,
180 getCachedVideoDuration 188 getCachedVideoDuration
181} 189}