diff options
Diffstat (limited to 'server/lib')
29 files changed, 206 insertions, 77 deletions
diff --git a/server/lib/activitypub/actors/get.ts b/server/lib/activitypub/actors/get.ts index d2b651082..e73b7d707 100644 --- a/server/lib/activitypub/actors/get.ts +++ b/server/lib/activitypub/actors/get.ts | |||
@@ -110,7 +110,7 @@ async function loadActorFromDB (actorUrl: string, fetchType: ActorLoadByUrlType) | |||
110 | async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) { | 110 | async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) { |
111 | if ((created === true || refreshed === true) && updateCollections === true) { | 111 | if ((created === true || refreshed === true) && updateCollections === true) { |
112 | const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' } | 112 | const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' } |
113 | await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) | 113 | await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
114 | } | 114 | } |
115 | } | 115 | } |
116 | 116 | ||
@@ -118,6 +118,6 @@ async function schedulePlaylistFetchIfNeeded (actor: MActorAccountId, created: b | |||
118 | // We created a new account: fetch the playlists | 118 | // We created a new account: fetch the playlists |
119 | if (created === true && actor.Account && accountPlaylistsUrl) { | 119 | if (created === true && actor.Account && accountPlaylistsUrl) { |
120 | const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' } | 120 | const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' } |
121 | await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) | 121 | await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
122 | } | 122 | } |
123 | } | 123 | } |
diff --git a/server/lib/activitypub/follow.ts b/server/lib/activitypub/follow.ts index 741b54df5..f6e2a48fd 100644 --- a/server/lib/activitypub/follow.ts +++ b/server/lib/activitypub/follow.ts | |||
@@ -27,7 +27,7 @@ async function autoFollowBackIfNeeded (actorFollow: MActorFollowActors, transact | |||
27 | isAutoFollow: true | 27 | isAutoFollow: true |
28 | } | 28 | } |
29 | 29 | ||
30 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 30 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
31 | } | 31 | } |
32 | } | 32 | } |
33 | 33 | ||
diff --git a/server/lib/activitypub/outbox.ts b/server/lib/activitypub/outbox.ts index ecdc33a77..5eef76871 100644 --- a/server/lib/activitypub/outbox.ts +++ b/server/lib/activitypub/outbox.ts | |||
@@ -16,7 +16,7 @@ async function addFetchOutboxJob (actor: Pick<ActorModel, 'id' | 'outboxUrl'>) { | |||
16 | type: 'activity' as 'activity' | 16 | type: 'activity' as 'activity' |
17 | } | 17 | } |
18 | 18 | ||
19 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | 19 | return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload }) |
20 | } | 20 | } |
21 | 21 | ||
22 | export { | 22 | export { |
diff --git a/server/lib/activitypub/playlists/refresh.ts b/server/lib/activitypub/playlists/refresh.ts index 493e8c7ec..33260ea02 100644 --- a/server/lib/activitypub/playlists/refresh.ts +++ b/server/lib/activitypub/playlists/refresh.ts | |||
@@ -9,7 +9,7 @@ import { fetchRemoteVideoPlaylist } from './shared' | |||
9 | function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { | 9 | function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { |
10 | if (!playlist.isOutdated()) return | 10 | if (!playlist.isOutdated()) return |
11 | 11 | ||
12 | JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) | 12 | JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) |
13 | } | 13 | } |
14 | 14 | ||
15 | async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> { | 15 | async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> { |
diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts index fcec63991..2bc1ef8f5 100644 --- a/server/lib/activitypub/send/shared/send-utils.ts +++ b/server/lib/activitypub/send/shared/send-utils.ts | |||
@@ -120,7 +120,7 @@ async function forwardActivity ( | |||
120 | body: activity, | 120 | body: activity, |
121 | contextType: null | 121 | contextType: null |
122 | } | 122 | } |
123 | return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })) | 123 | return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload })) |
124 | } | 124 | } |
125 | 125 | ||
126 | // --------------------------------------------------------------------------- | 126 | // --------------------------------------------------------------------------- |
@@ -205,7 +205,7 @@ function broadcastTo (options: { | |||
205 | contextType | 205 | contextType |
206 | } | 206 | } |
207 | 207 | ||
208 | JobQueue.Instance.createJob({ | 208 | JobQueue.Instance.createJobAsync({ |
209 | type: parallelizable | 209 | type: parallelizable |
210 | ? 'activitypub-http-broadcast-parallel' | 210 | ? 'activitypub-http-broadcast-parallel' |
211 | : 'activitypub-http-broadcast', | 211 | : 'activitypub-http-broadcast', |
@@ -222,7 +222,7 @@ function broadcastTo (options: { | |||
222 | contextType | 222 | contextType |
223 | } | 223 | } |
224 | 224 | ||
225 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | 225 | JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload }) |
226 | } | 226 | } |
227 | } | 227 | } |
228 | 228 | ||
@@ -243,7 +243,7 @@ function unicastTo (options: { | |||
243 | contextType | 243 | contextType |
244 | } | 244 | } |
245 | 245 | ||
246 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | 246 | JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload }) |
247 | } | 247 | } |
248 | 248 | ||
249 | // --------------------------------------------------------------------------- | 249 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/activitypub/videos/get.ts b/server/lib/activitypub/videos/get.ts index b74df132c..14ba55034 100644 --- a/server/lib/activitypub/videos/get.ts +++ b/server/lib/activitypub/videos/get.ts | |||
@@ -107,7 +107,7 @@ async function scheduleRefresh (video: MVideoThumbnail, fetchType: VideoLoadByUr | |||
107 | return refreshVideoIfNeeded(refreshOptions) | 107 | return refreshVideoIfNeeded(refreshOptions) |
108 | } | 108 | } |
109 | 109 | ||
110 | await JobQueue.Instance.createJobWithPromise({ | 110 | await JobQueue.Instance.createJob({ |
111 | type: 'activitypub-refresher', | 111 | type: 'activitypub-refresher', |
112 | payload: { type: 'video', url: video.url } | 112 | payload: { type: 'video', url: video.url } |
113 | }) | 113 | }) |
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8cf0c87a6..8ed1b6447 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts | |||
@@ -74,7 +74,7 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi | |||
74 | } | 74 | } |
75 | 75 | ||
76 | function createJob (payload: ActivitypubHttpFetcherPayload) { | 76 | function createJob (payload: ActivitypubHttpFetcherPayload) { |
77 | return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) | 77 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
78 | } | 78 | } |
79 | 79 | ||
80 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { | 80 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { |
diff --git a/server/lib/emailer.ts b/server/lib/emailer.ts index bd1089530..9e546de7f 100644 --- a/server/lib/emailer.ts +++ b/server/lib/emailer.ts | |||
@@ -66,7 +66,7 @@ class Emailer { | |||
66 | } | 66 | } |
67 | } | 67 | } |
68 | 68 | ||
69 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 69 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
70 | } | 70 | } |
71 | 71 | ||
72 | addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { | 72 | addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { |
@@ -80,7 +80,7 @@ class Emailer { | |||
80 | } | 80 | } |
81 | } | 81 | } |
82 | 82 | ||
83 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 83 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
84 | } | 84 | } |
85 | 85 | ||
86 | addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { | 86 | addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { |
@@ -94,7 +94,7 @@ class Emailer { | |||
94 | } | 94 | } |
95 | } | 95 | } |
96 | 96 | ||
97 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 97 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
98 | } | 98 | } |
99 | 99 | ||
100 | addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { | 100 | addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { |
@@ -108,7 +108,7 @@ class Emailer { | |||
108 | text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` | 108 | text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` |
109 | } | 109 | } |
110 | 110 | ||
111 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 111 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
112 | } | 112 | } |
113 | 113 | ||
114 | addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { | 114 | addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { |
@@ -127,7 +127,7 @@ class Emailer { | |||
127 | } | 127 | } |
128 | } | 128 | } |
129 | 129 | ||
130 | return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) | 130 | return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) |
131 | } | 131 | } |
132 | 132 | ||
133 | async sendMail (options: EmailPayload) { | 133 | async sendMail (options: EmailPayload) { |
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 944da5be1..a68c32ba0 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) { | |||
17 | const payload = job.data as ActivitypubFollowPayload | 17 | const payload = job.data as ActivitypubFollowPayload |
18 | const host = payload.host | 18 | const host = payload.host |
19 | 19 | ||
20 | logger.info('Processing ActivityPub follow in job %d.', job.id) | 20 | logger.info('Processing ActivityPub follow in job %s.', job.id) |
21 | 21 | ||
22 | let targetActor: MActorFull | 22 | let targetActor: MActorFull |
23 | if (!host || host === WEBSERVER.HOST) { | 23 | if (!host || host === WEBSERVER.HOST) { |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 354c608fb..13eff5211 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests' | |||
8 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | 8 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
11 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 11 | logger.info('Processing ActivityPub broadcast in job %s.', job.id) |
12 | 12 | ||
13 | const payload = job.data as ActivitypubHttpBroadcastPayload | 13 | const payload = job.data as ActivitypubHttpBroadcastPayload |
14 | 14 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index e0b841887..b6cb3c4a6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share' | |||
12 | import { addVideoComments } from '../../activitypub/video-comments' | 12 | import { addVideoComments } from '../../activitypub/video-comments' |
13 | 13 | ||
14 | async function processActivityPubHttpFetcher (job: Job) { | 14 | async function processActivityPubHttpFetcher (job: Job) { |
15 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 15 | logger.info('Processing ActivityPub fetcher in job %s.', job.id) |
16 | 16 | ||
17 | const payload = job.data as ActivitypubHttpFetcherPayload | 17 | const payload = job.data as ActivitypubHttpFetcherPayload |
18 | 18 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 837a597a5..9e4e84002 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests' | |||
6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | 6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
9 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 9 | logger.info('Processing ActivityPub unicast in job %s.', job.id) |
10 | 10 | ||
11 | const payload = job.data as ActivitypubHttpUnicastPayload | 11 | const payload = job.data as ActivitypubHttpUnicastPayload |
12 | const uri = payload.uri | 12 | const uri = payload.uri |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 600f858a0..307e771ff 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors' | |||
11 | async function refreshAPObject (job: Job) { | 11 | async function refreshAPObject (job: Job) { |
12 | const payload = job.data as RefreshPayload | 12 | const payload = job.data as RefreshPayload |
13 | 13 | ||
14 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) | 14 | logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url) |
15 | 15 | ||
16 | if (payload.type === 'video') return refreshVideo(payload.url) | 16 | if (payload.type === 'video') return refreshVideo(payload.url) |
17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) | 17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) |
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 4a5bad9fb..27a2d431b 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' | |||
6 | 6 | ||
7 | async function processActorKeys (job: Job) { | 7 | async function processActorKeys (job: Job) { |
8 | const payload = job.data as ActorKeysPayload | 8 | const payload = job.data as ActorKeysPayload |
9 | logger.info('Processing actor keys in job %d.', job.id) | 9 | logger.info('Processing actor keys in job %s.', job.id) |
10 | 10 | ||
11 | const actor = await ActorModel.load(payload.actorId) | 11 | const actor = await ActorModel.load(payload.actorId) |
12 | 12 | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index b5b9475b1..567bcc076 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer' | |||
5 | 5 | ||
6 | async function processEmail (job: Job) { | 6 | async function processEmail (job: Job) { |
7 | const payload = job.data as EmailPayload | 7 | const payload = job.data as EmailPayload |
8 | logger.info('Processing email in job %d.', job.id) | 8 | logger.info('Processing email in job %s.', job.id) |
9 | 9 | ||
10 | return Emailer.Instance.sendMail(payload) | 10 | return Emailer.Instance.sendMail(payload) |
11 | } | 11 | } |
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts new file mode 100644 index 000000000..6aac36741 --- /dev/null +++ b/server/lib/job-queue/handlers/federate-video.ts | |||
@@ -0,0 +1,28 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { sequelizeTypescript } from '@server/initializers/database' | ||
4 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { FederateVideoPayload } from '@shared/models' | ||
7 | import { logger } from '../../../helpers/logger' | ||
8 | |||
9 | function processFederateVideo (job: Job) { | ||
10 | const payload = job.data as FederateVideoPayload | ||
11 | |||
12 | logger.info('Processing video federation in job %s.', job.id) | ||
13 | |||
14 | return retryTransactionWrapper(() => { | ||
15 | return sequelizeTypescript.transaction(async t => { | ||
16 | const video = await VideoModel.loadFull(payload.videoUUID, t) | ||
17 | if (!video) return | ||
18 | |||
19 | return federateVideoIfNeeded(video, payload.isNewVideo, t) | ||
20 | }) | ||
21 | }) | ||
22 | } | ||
23 | |||
24 | // --------------------------------------------------------------------------- | ||
25 | |||
26 | export { | ||
27 | processFederateVideo | ||
28 | } | ||
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index 4505ca79e..03aa414c9 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts | |||
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger' | |||
8 | 8 | ||
9 | async function processManageVideoTorrent (job: Job) { | 9 | async function processManageVideoTorrent (job: Job) { |
10 | const payload = job.data as ManageVideoTorrentPayload | 10 | const payload = job.data as ManageVideoTorrentPayload |
11 | logger.info('Processing torrent in job %d.', job.id) | 11 | logger.info('Processing torrent in job %s.', job.id) |
12 | 12 | ||
13 | if (payload.action === 'create') return doCreateAction(payload) | 13 | if (payload.action === 'create') return doCreateAction(payload) |
14 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) | 14 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) |
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index d608fd865..25bdebeea 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage') | |||
17 | 17 | ||
18 | export async function processMoveToObjectStorage (job: Job) { | 18 | export async function processMoveToObjectStorage (job: Job) { |
19 | const payload = job.data as MoveObjectStoragePayload | 19 | const payload = job.data as MoveObjectStoragePayload |
20 | logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) | 20 | logger.info('Moving video %s in job %s.', payload.videoUUID, job.id) |
21 | 21 | ||
22 | const video = await VideoModel.loadWithFiles(payload.videoUUID) | 22 | const video = await VideoModel.loadWithFiles(payload.videoUUID) |
23 | // No video, maybe deleted? | 23 | // No video, maybe deleted? |
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) { | |||
43 | 43 | ||
44 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | 44 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') |
45 | if (pendingMove === 0) { | 45 | if (pendingMove === 0) { |
46 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) | 46 | logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags) |
47 | 47 | ||
48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) | 48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) |
49 | } | 49 | } |
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts new file mode 100644 index 000000000..83605396c --- /dev/null +++ b/server/lib/job-queue/handlers/notify.ts | |||
@@ -0,0 +1,27 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { Notifier } from '@server/lib/notifier' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { NotifyPayload } from '@shared/models' | ||
5 | import { logger } from '../../../helpers/logger' | ||
6 | |||
7 | async function processNotify (job: Job) { | ||
8 | const payload = job.data as NotifyPayload | ||
9 | logger.info('Processing %s notification in job %s.', payload.action, job.id) | ||
10 | |||
11 | if (payload.action === 'new-video') return doNotifyNewVideo(payload) | ||
12 | } | ||
13 | |||
14 | // --------------------------------------------------------------------------- | ||
15 | |||
16 | export { | ||
17 | processNotify | ||
18 | } | ||
19 | |||
20 | // --------------------------------------------------------------------------- | ||
21 | |||
22 | async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) { | ||
23 | const refreshedVideo = await VideoModel.loadFull(payload.videoUUID) | ||
24 | if (!refreshedVideo) return | ||
25 | |||
26 | Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | ||
27 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 40c44cf52..d950f6407 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
4 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
6 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 6 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
7 | import { addMoveToObjectStorageJob } from '@server/lib/video' | 7 | import { buildMoveToObjectStorageJob } from '@server/lib/video' |
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | 8 | import { VideoPathManager } from '@server/lib/video-path-manager' |
9 | import { VideoModel } from '@server/models/video/video' | 9 | import { VideoModel } from '@server/models/video/video' |
10 | import { VideoFileModel } from '@server/models/video/video-file' | 10 | import { VideoFileModel } from '@server/models/video/video-file' |
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils' | |||
13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' | 13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' |
14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' | 14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' |
15 | import { logger } from '../../../helpers/logger' | 15 | import { logger } from '../../../helpers/logger' |
16 | import { JobQueue } from '../job-queue' | ||
16 | 17 | ||
17 | async function processVideoFileImport (job: Job) { | 18 | async function processVideoFileImport (job: Job) { |
18 | const payload = job.data as VideoFileImportPayload | 19 | const payload = job.data as VideoFileImportPayload |
19 | logger.info('Processing video file import in job %d.', job.id) | 20 | logger.info('Processing video file import in job %s.', job.id) |
20 | 21 | ||
21 | const video = await VideoModel.loadFull(payload.videoUUID) | 22 | const video = await VideoModel.loadFull(payload.videoUUID) |
22 | // No video, maybe deleted? | 23 | // No video, maybe deleted? |
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) { | |||
28 | await updateVideoFile(video, payload.filePath) | 29 | await updateVideoFile(video, payload.filePath) |
29 | 30 | ||
30 | if (CONFIG.OBJECT_STORAGE.ENABLED) { | 31 | if (CONFIG.OBJECT_STORAGE.ENABLED) { |
31 | await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) | 32 | await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state })) |
32 | } else { | 33 | } else { |
33 | await federateVideoIfNeeded(video, false) | 34 | await federateVideoIfNeeded(video, false) |
34 | } | 35 | } |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index e5cd35865..f4629159c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
8 | import { Hooks } from '@server/lib/plugins/hooks' | 8 | import { Hooks } from '@server/lib/plugins/hooks' |
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | 9 | import { ServerConfigManager } from '@server/lib/server-config-manager' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video' |
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 12 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 13 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 14 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail' | |||
39 | import { federateVideoIfNeeded } from '../../activitypub/videos' | 39 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
40 | import { Notifier } from '../../notifier' | 40 | import { Notifier } from '../../notifier' |
41 | import { generateVideoMiniature } from '../../thumbnail' | 41 | import { generateVideoMiniature } from '../../thumbnail' |
42 | import { JobQueue } from '../job-queue' | ||
42 | 43 | ||
43 | async function processVideoImport (job: Job) { | 44 | async function processVideoImport (job: Job) { |
44 | const payload = job.data as VideoImportPayload | 45 | const payload = job.data as VideoImportPayload |
@@ -65,7 +66,7 @@ export { | |||
65 | // --------------------------------------------------------------------------- | 66 | // --------------------------------------------------------------------------- |
66 | 67 | ||
67 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { | 68 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { |
68 | logger.info('Processing torrent video import in job %d.', job.id) | 69 | logger.info('Processing torrent video import in job %s.', job.id) |
69 | 70 | ||
70 | const options = { type: payload.type, videoImportId: payload.videoImportId } | 71 | const options = { type: payload.type, videoImportId: payload.videoImportId } |
71 | 72 | ||
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, | |||
77 | } | 78 | } |
78 | 79 | ||
79 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { | 80 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { |
80 | logger.info('Processing youtubeDL video import in job %d.', job.id) | 81 | logger.info('Processing youtubeDL video import in job %s.', job.id) |
81 | 82 | ||
82 | const options = { type: payload.type, videoImportId: videoImport.id } | 83 | const options = { type: payload.type, videoImportId: videoImport.id } |
83 | 84 | ||
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
259 | } | 260 | } |
260 | 261 | ||
261 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | 262 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { |
262 | return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | 263 | await JobQueue.Instance.createJob( |
264 | await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | ||
265 | ) | ||
263 | } | 266 | } |
264 | 267 | ||
265 | // Create transcoding jobs? | 268 | // Create transcoding jobs? |
266 | if (video.state === VideoState.TO_TRANSCODE) { | 269 | if (video.state === VideoState.TO_TRANSCODE) { |
267 | await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | 270 | await JobQueue.Instance.createJob( |
271 | await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | ||
272 | ) | ||
268 | } | 273 | } |
269 | 274 | ||
270 | } catch (err) { | 275 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 75ab2cd02..bac99fdb7 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger' | |||
5 | 5 | ||
6 | async function processVideoRedundancy (job: Job) { | 6 | async function processVideoRedundancy (job: Job) { |
7 | const payload = job.data as VideoRedundancyPayload | 7 | const payload = job.data as VideoRedundancyPayload |
8 | logger.info('Processing video redundancy in job %d.', job.id) | 8 | logger.info('Processing video redundancy in job %s.', job.id) |
9 | 9 | ||
10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | 10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) |
11 | } | 11 | } |
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 078243538..23f9a34cc 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts | |||
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | |||
8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | 9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' |
12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' | 12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' |
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | 13 | import { VideoPathManager } from '@server/lib/video-path-manager' |
14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' | 14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' |
@@ -36,6 +36,7 @@ import { | |||
36 | VideoStudioTaskWatermarkPayload | 36 | VideoStudioTaskWatermarkPayload |
37 | } from '@shared/models' | 37 | } from '@shared/models' |
38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
39 | import { JobQueue } from '../job-queue' | ||
39 | 40 | ||
40 | const lTagsBase = loggerTagsFactory('video-edition') | 41 | const lTagsBase = loggerTagsFactory('video-edition') |
41 | 42 | ||
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) { | |||
43 | const payload = job.data as VideoStudioEditionPayload | 44 | const payload = job.data as VideoStudioEditionPayload |
44 | const lTags = lTagsBase(payload.videoUUID) | 45 | const lTags = lTagsBase(payload.videoUUID) |
45 | 46 | ||
46 | logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) | 47 | logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags) |
47 | 48 | ||
48 | const video = await VideoModel.loadFull(payload.videoUUID) | 49 | const video = await VideoModel.loadFull(payload.videoUUID) |
49 | 50 | ||
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) { | |||
100 | await federateVideoIfNeeded(video, false, undefined) | 101 | await federateVideoIfNeeded(video, false, undefined) |
101 | 102 | ||
102 | const user = await UserModel.loadByVideoId(video.id) | 103 | const user = await UserModel.loadByVideoId(video.id) |
103 | await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | 104 | |
105 | await JobQueue.Instance.createJob( | ||
106 | await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | ||
107 | ) | ||
104 | } | 108 | } |
105 | 109 | ||
106 | // --------------------------------------------------------------------------- | 110 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0cf5d53ce..50d732beb 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,6 @@ | |||
1 | import { | 1 | import { |
2 | FlowJob, | ||
3 | FlowProducer, | ||
2 | Job, | 4 | Job, |
3 | JobsOptions, | 5 | JobsOptions, |
4 | Queue, | 6 | Queue, |
@@ -13,7 +15,7 @@ import { | |||
13 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 15 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
14 | import { CONFIG } from '@server/initializers/config' | 16 | import { CONFIG } from '@server/initializers/config' |
15 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
16 | import { timeoutPromise } from '@shared/core-utils' | 18 | import { pick, timeoutPromise } from '@shared/core-utils' |
17 | import { | 19 | import { |
18 | ActivitypubFollowPayload, | 20 | ActivitypubFollowPayload, |
19 | ActivitypubHttpBroadcastPayload, | 21 | ActivitypubHttpBroadcastPayload, |
@@ -22,10 +24,12 @@ import { | |||
22 | ActorKeysPayload, | 24 | ActorKeysPayload, |
23 | DeleteResumableUploadMetaFilePayload, | 25 | DeleteResumableUploadMetaFilePayload, |
24 | EmailPayload, | 26 | EmailPayload, |
27 | FederateVideoPayload, | ||
25 | JobState, | 28 | JobState, |
26 | JobType, | 29 | JobType, |
27 | ManageVideoTorrentPayload, | 30 | ManageVideoTorrentPayload, |
28 | MoveObjectStoragePayload, | 31 | MoveObjectStoragePayload, |
32 | NotifyPayload, | ||
29 | RefreshPayload, | 33 | RefreshPayload, |
30 | VideoFileImportPayload, | 34 | VideoFileImportPayload, |
31 | VideoImportPayload, | 35 | VideoImportPayload, |
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica | |||
45 | import { refreshAPObject } from './handlers/activitypub-refresher' | 49 | import { refreshAPObject } from './handlers/activitypub-refresher' |
46 | import { processActorKeys } from './handlers/actor-keys' | 50 | import { processActorKeys } from './handlers/actor-keys' |
47 | import { processEmail } from './handlers/email' | 51 | import { processEmail } from './handlers/email' |
52 | import { processFederateVideo } from './handlers/federate-video' | ||
48 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 53 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
49 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 54 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
55 | import { processNotify } from './handlers/notify' | ||
50 | import { processVideoFileImport } from './handlers/video-file-import' | 56 | import { processVideoFileImport } from './handlers/video-file-import' |
51 | import { processVideoImport } from './handlers/video-import' | 57 | import { processVideoImport } from './handlers/video-import' |
52 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 58 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition' | |||
54 | import { processVideoTranscoding } from './handlers/video-transcoding' | 60 | import { processVideoTranscoding } from './handlers/video-transcoding' |
55 | import { processVideosViewsStats } from './handlers/video-views-stats' | 61 | import { processVideosViewsStats } from './handlers/video-views-stats' |
56 | 62 | ||
57 | type CreateJobArgument = | 63 | export type CreateJobArgument = |
58 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 64 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
59 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | 65 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
60 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 66 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
@@ -73,7 +79,9 @@ type CreateJobArgument = | |||
73 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | | 79 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | |
74 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | | 80 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | |
75 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | | 81 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | |
76 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | 82 | { type: 'notify', payload: NotifyPayload } | |
83 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
84 | { type: 'federate-video', payload: FederateVideoPayload } | ||
77 | 85 | ||
78 | export type CreateJobOptions = { | 86 | export type CreateJobOptions = { |
79 | delay?: number | 87 | delay?: number |
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
98 | 'video-redundancy': processVideoRedundancy, | 106 | 'video-redundancy': processVideoRedundancy, |
99 | 'move-to-object-storage': processMoveToObjectStorage, | 107 | 'move-to-object-storage': processMoveToObjectStorage, |
100 | 'manage-video-torrent': processManageVideoTorrent, | 108 | 'manage-video-torrent': processManageVideoTorrent, |
101 | 'video-studio-edition': processVideoStudioEdition | 109 | 'notify': processNotify, |
110 | 'video-studio-edition': processVideoStudioEdition, | ||
111 | 'federate-video': processFederateVideo | ||
102 | } | 112 | } |
103 | 113 | ||
104 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | 114 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [ | |||
123 | 'video-live-ending', | 133 | 'video-live-ending', |
124 | 'move-to-object-storage', | 134 | 'move-to-object-storage', |
125 | 'manage-video-torrent', | 135 | 'manage-video-torrent', |
126 | 'video-studio-edition' | 136 | 'video-studio-edition', |
137 | 'notify', | ||
138 | 'federate-video' | ||
127 | ] | 139 | ] |
128 | 140 | ||
129 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | 141 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |
@@ -137,6 +149,8 @@ class JobQueue { | |||
137 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | 149 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} |
138 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | 150 | private queueEvents: { [id in JobType]?: QueueEvents } = {} |
139 | 151 | ||
152 | private flowProducer: FlowProducer | ||
153 | |||
140 | private initialized = false | 154 | private initialized = false |
141 | private jobRedisPrefix: string | 155 | private jobRedisPrefix: string |
142 | 156 | ||
@@ -157,6 +171,11 @@ class JobQueue { | |||
157 | this.buildQueueEvent(handlerName, produceOnly) | 171 | this.buildQueueEvent(handlerName, produceOnly) |
158 | } | 172 | } |
159 | 173 | ||
174 | this.flowProducer = new FlowProducer({ | ||
175 | connection: this.getRedisConnection(), | ||
176 | prefix: this.jobRedisPrefix | ||
177 | }) | ||
178 | |||
160 | this.addRepeatableJobs() | 179 | this.addRepeatableJobs() |
161 | } | 180 | } |
162 | 181 | ||
@@ -243,6 +262,8 @@ class JobQueue { | |||
243 | } | 262 | } |
244 | } | 263 | } |
245 | 264 | ||
265 | // --------------------------------------------------------------------------- | ||
266 | |||
246 | async terminate () { | 267 | async terminate () { |
247 | const promises = Object.keys(this.workers) | 268 | const promises = Object.keys(this.workers) |
248 | .map(handlerName => { | 269 | .map(handlerName => { |
@@ -278,28 +299,56 @@ class JobQueue { | |||
278 | } | 299 | } |
279 | } | 300 | } |
280 | 301 | ||
281 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { | 302 | // --------------------------------------------------------------------------- |
282 | this.createJobWithPromise(obj, options) | 303 | |
283 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 304 | createJobAsync (options: CreateJobArgument & CreateJobOptions): void { |
305 | this.createJob(options) | ||
306 | .catch(err => logger.error('Cannot create job.', { err, options })) | ||
284 | } | 307 | } |
285 | 308 | ||
286 | async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | 309 | async createJob (options: CreateJobArgument & CreateJobOptions) { |
287 | const queue: Queue = this.queues[obj.type] | 310 | const queue: Queue = this.queues[options.type] |
288 | if (queue === undefined) { | 311 | if (queue === undefined) { |
289 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 312 | logger.error('Unknown queue %s: cannot create job.', options.type) |
290 | return | 313 | return |
291 | } | 314 | } |
292 | 315 | ||
293 | const jobArgs: JobsOptions = { | 316 | const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) |
317 | |||
318 | return queue.add('job', options.payload, jobOptions) | ||
319 | } | ||
320 | |||
321 | async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { | ||
322 | let lastJob: FlowJob | ||
323 | |||
324 | for (const job of jobs) { | ||
325 | if (!job) continue | ||
326 | |||
327 | lastJob = { | ||
328 | name: 'job', | ||
329 | data: job.payload, | ||
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | ||
333 | ? [ lastJob ] | ||
334 | : [] | ||
335 | } | ||
336 | } | ||
337 | |||
338 | return this.flowProducer.add(lastJob) | ||
339 | } | ||
340 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | ||
342 | return { | ||
294 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 343 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
295 | attempts: JOB_ATTEMPTS[obj.type], | 344 | attempts: JOB_ATTEMPTS[type], |
296 | priority: options.priority, | 345 | priority: options.priority, |
297 | delay: options.delay | 346 | delay: options.delay |
298 | } | 347 | } |
299 | |||
300 | return queue.add('job', obj.payload, jobArgs) | ||
301 | } | 348 | } |
302 | 349 | ||
350 | // --------------------------------------------------------------------------- | ||
351 | |||
303 | async listForApi (options: { | 352 | async listForApi (options: { |
304 | state?: JobState | 353 | state?: JobState |
305 | start: number | 354 | start: number |
@@ -367,6 +416,8 @@ class JobQueue { | |||
367 | return Promise.all(promises) | 416 | return Promise.all(promises) |
368 | } | 417 | } |
369 | 418 | ||
419 | // --------------------------------------------------------------------------- | ||
420 | |||
370 | async removeOldJobs () { | 421 | async removeOldJobs () { |
371 | for (const key of Object.keys(this.queues)) { | 422 | for (const key of Object.keys(this.queues)) { |
372 | const queue: Queue = this.queues[key] | 423 | const queue: Queue = this.queues[key] |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1410889a2..aadd8e308 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -408,7 +408,7 @@ class LiveManager { | |||
408 | await liveSession.save() | 408 | await liveSession.save() |
409 | } | 409 | } |
410 | 410 | ||
411 | JobQueue.Instance.createJob({ | 411 | JobQueue.Instance.createJobAsync({ |
412 | type: 'video-live-ending', | 412 | type: 'video-live-ending', |
413 | payload: { | 413 | payload: { |
414 | videoId: fullVideo.id, | 414 | videoId: fullVideo.id, |
@@ -421,8 +421,12 @@ class LiveManager { | |||
421 | streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, | 421 | streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, |
422 | 422 | ||
423 | publishedAt: fullVideo.publishedAt.toISOString() | 423 | publishedAt: fullVideo.publishedAt.toISOString() |
424 | } | 424 | }, |
425 | }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) | 425 | |
426 | delay: cleanupNow | ||
427 | ? 0 | ||
428 | : VIDEO_LIVE.CLEANUP_DELAY | ||
429 | }) | ||
426 | 430 | ||
427 | fullVideo.state = live.permanentLive | 431 | fullVideo.state = live.permanentLive |
428 | ? VideoState.WAITING_FOR_LIVE | 432 | ? VideoState.WAITING_FOR_LIVE |
diff --git a/server/lib/notifier/notifier.ts b/server/lib/notifier/notifier.ts index d1c4c0215..66cfc31c4 100644 --- a/server/lib/notifier/notifier.ts +++ b/server/lib/notifier/notifier.ts | |||
@@ -242,7 +242,7 @@ class Notifier { | |||
242 | 242 | ||
243 | for (const to of toEmails) { | 243 | for (const to of toEmails) { |
244 | const payload = await object.createEmail(to) | 244 | const payload = await object.createEmail(to) |
245 | JobQueue.Instance.createJob({ type: 'email', payload }) | 245 | JobQueue.Instance.createJobAsync({ type: 'email', payload }) |
246 | } | 246 | } |
247 | } | 247 | } |
248 | 248 | ||
diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts index d9f9c2de3..956ece749 100644 --- a/server/lib/schedulers/auto-follow-index-instances.ts +++ b/server/lib/schedulers/auto-follow-index-instances.ts | |||
@@ -59,7 +59,7 @@ export class AutoFollowIndexInstances extends AbstractScheduler { | |||
59 | isAutoFollow: true | 59 | isAutoFollow: true |
60 | } | 60 | } |
61 | 61 | ||
62 | JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) | 62 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) |
63 | } | 63 | } |
64 | } | 64 | } |
65 | 65 | ||
diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts index b5d8353b7..9ebbd7679 100644 --- a/server/lib/video-state.ts +++ b/server/lib/video-state.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
2 | import { logger } from '@server/helpers/logger' | 3 | import { logger } from '@server/helpers/logger' |
3 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
4 | import { sequelizeTypescript } from '@server/initializers/database' | 5 | import { sequelizeTypescript } from '@server/initializers/database' |
@@ -7,9 +8,9 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
7 | import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' | 8 | import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' |
8 | import { VideoState } from '@shared/models' | 9 | import { VideoState } from '@shared/models' |
9 | import { federateVideoIfNeeded } from './activitypub/videos' | 10 | import { federateVideoIfNeeded } from './activitypub/videos' |
11 | import { JobQueue } from './job-queue' | ||
10 | import { Notifier } from './notifier' | 12 | import { Notifier } from './notifier' |
11 | import { addMoveToObjectStorageJob } from './video' | 13 | import { buildMoveToObjectStorageJob } from './video' |
12 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
13 | 14 | ||
14 | function buildNextVideoState (currentState?: VideoState) { | 15 | function buildNextVideoState (currentState?: VideoState) { |
15 | if (currentState === VideoState.PUBLISHED) { | 16 | if (currentState === VideoState.PUBLISHED) { |
@@ -86,7 +87,7 @@ async function moveToExternalStorageState (options: { | |||
86 | logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) | 87 | logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) |
87 | 88 | ||
88 | try { | 89 | try { |
89 | await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }) | 90 | await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo })) |
90 | 91 | ||
91 | return true | 92 | return true |
92 | } catch (err) { | 93 | } catch (err) { |
diff --git a/server/lib/video.ts b/server/lib/video.ts index b843b11bc..f7d7aa186 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts | |||
@@ -1,5 +1,7 @@ | |||
1 | import { UploadFiles } from 'express' | 1 | import { UploadFiles } from 'express' |
2 | import memoizee from 'memoizee' | ||
2 | import { Transaction } from 'sequelize/types' | 3 | import { Transaction } from 'sequelize/types' |
4 | import { CONFIG } from '@server/initializers/config' | ||
3 | import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' | 5 | import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' |
4 | import { TagModel } from '@server/models/video/tag' | 6 | import { TagModel } from '@server/models/video/tag' |
5 | import { VideoModel } from '@server/models/video/video' | 7 | import { VideoModel } from '@server/models/video/video' |
@@ -9,8 +11,6 @@ import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID | |||
9 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' | 11 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' |
10 | import { CreateJobOptions, JobQueue } from './job-queue/job-queue' | 12 | import { CreateJobOptions, JobQueue } from './job-queue/job-queue' |
11 | import { updateVideoMiniatureFromExisting } from './thumbnail' | 13 | import { updateVideoMiniatureFromExisting } from './thumbnail' |
12 | import { CONFIG } from '@server/initializers/config' | ||
13 | import memoizee from 'memoizee' | ||
14 | 14 | ||
15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { | 15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { |
16 | return { | 16 | return { |
@@ -86,7 +86,7 @@ async function setVideoTags (options: { | |||
86 | 86 | ||
87 | // --------------------------------------------------------------------------- | 87 | // --------------------------------------------------------------------------- |
88 | 88 | ||
89 | async function addOptimizeOrMergeAudioJob (options: { | 89 | async function buildOptimizeOrMergeAudioJob (options: { |
90 | video: MVideoUUID | 90 | video: MVideoUUID |
91 | videoFile: MVideoFile | 91 | videoFile: MVideoFile |
92 | user: MUserId | 92 | user: MUserId |
@@ -94,10 +94,10 @@ async function addOptimizeOrMergeAudioJob (options: { | |||
94 | }) { | 94 | }) { |
95 | const { video, videoFile, user, isNewVideo } = options | 95 | const { video, videoFile, user, isNewVideo } = options |
96 | 96 | ||
97 | let dataInput: VideoTranscodingPayload | 97 | let payload: VideoTranscodingPayload |
98 | 98 | ||
99 | if (videoFile.isAudio()) { | 99 | if (videoFile.isAudio()) { |
100 | dataInput = { | 100 | payload = { |
101 | type: 'merge-audio-to-webtorrent', | 101 | type: 'merge-audio-to-webtorrent', |
102 | resolution: DEFAULT_AUDIO_RESOLUTION, | 102 | resolution: DEFAULT_AUDIO_RESOLUTION, |
103 | videoUUID: video.uuid, | 103 | videoUUID: video.uuid, |
@@ -105,24 +105,26 @@ async function addOptimizeOrMergeAudioJob (options: { | |||
105 | isNewVideo | 105 | isNewVideo |
106 | } | 106 | } |
107 | } else { | 107 | } else { |
108 | dataInput = { | 108 | payload = { |
109 | type: 'optimize-to-webtorrent', | 109 | type: 'optimize-to-webtorrent', |
110 | videoUUID: video.uuid, | 110 | videoUUID: video.uuid, |
111 | isNewVideo | 111 | isNewVideo |
112 | } | 112 | } |
113 | } | 113 | } |
114 | 114 | ||
115 | const jobOptions = { | 115 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') |
116 | priority: await getTranscodingJobPriority(user) | ||
117 | } | ||
118 | 116 | ||
119 | return addTranscodingJob(dataInput, jobOptions) | 117 | return { |
118 | type: 'video-transcoding' as 'video-transcoding', | ||
119 | priority: await getTranscodingJobPriority(user), | ||
120 | payload | ||
121 | } | ||
120 | } | 122 | } |
121 | 123 | ||
122 | async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { | 124 | async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { |
123 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | 125 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') |
124 | 126 | ||
125 | return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options) | 127 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) |
126 | } | 128 | } |
127 | 129 | ||
128 | async function getTranscodingJobPriority (user: MUserId) { | 130 | async function getTranscodingJobPriority (user: MUserId) { |
@@ -136,7 +138,7 @@ async function getTranscodingJobPriority (user: MUserId) { | |||
136 | 138 | ||
137 | // --------------------------------------------------------------------------- | 139 | // --------------------------------------------------------------------------- |
138 | 140 | ||
139 | async function addMoveToObjectStorageJob (options: { | 141 | async function buildMoveToObjectStorageJob (options: { |
140 | video: MVideoUUID | 142 | video: MVideoUUID |
141 | previousVideoState: VideoState | 143 | previousVideoState: VideoState |
142 | isNewVideo?: boolean // Default true | 144 | isNewVideo?: boolean // Default true |
@@ -145,8 +147,14 @@ async function addMoveToObjectStorageJob (options: { | |||
145 | 147 | ||
146 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') | 148 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') |
147 | 149 | ||
148 | const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState } | 150 | return { |
149 | return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput }) | 151 | type: 'move-to-object-storage' as 'move-to-object-storage', |
152 | payload: { | ||
153 | videoUUID: video.uuid, | ||
154 | isNewVideo, | ||
155 | previousVideoState | ||
156 | } | ||
157 | } | ||
150 | } | 158 | } |
151 | 159 | ||
152 | // --------------------------------------------------------------------------- | 160 | // --------------------------------------------------------------------------- |
@@ -173,9 +181,9 @@ export { | |||
173 | buildLocalVideoFromReq, | 181 | buildLocalVideoFromReq, |
174 | buildVideoThumbnailsFromReq, | 182 | buildVideoThumbnailsFromReq, |
175 | setVideoTags, | 183 | setVideoTags, |
176 | addOptimizeOrMergeAudioJob, | 184 | buildOptimizeOrMergeAudioJob, |
177 | addTranscodingJob, | 185 | addTranscodingJob, |
178 | addMoveToObjectStorageJob, | 186 | buildMoveToObjectStorageJob, |
179 | getTranscodingJobPriority, | 187 | getTranscodingJobPriority, |
180 | getCachedVideoDuration | 188 | getCachedVideoDuration |
181 | } | 189 | } |