diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-08 15:48:17 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:18:07 +0200 |
commit | bd911b54b555b11df7e9849cf92d358bccfecf6e (patch) | |
tree | 23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server/lib/job-queue | |
parent | 5a921e7b74910414626bfc9672b857e987e3ebed (diff) | |
download | PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip |
Use bullmq job dependency
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-follow.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-refresher.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/actor-keys.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/email.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/federate-video.ts | 28 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/manage-video-torrent.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/move-to-object-storage.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/notify.ts | 27 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file-import.ts | 7 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 15 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-redundancy.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-studio-edition.ts | 10 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 81 |
16 files changed, 153 insertions, 37 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 944da5be1..a68c32ba0 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) { | |||
17 | const payload = job.data as ActivitypubFollowPayload | 17 | const payload = job.data as ActivitypubFollowPayload |
18 | const host = payload.host | 18 | const host = payload.host |
19 | 19 | ||
20 | logger.info('Processing ActivityPub follow in job %d.', job.id) | 20 | logger.info('Processing ActivityPub follow in job %s.', job.id) |
21 | 21 | ||
22 | let targetActor: MActorFull | 22 | let targetActor: MActorFull |
23 | if (!host || host === WEBSERVER.HOST) { | 23 | if (!host || host === WEBSERVER.HOST) { |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 354c608fb..13eff5211 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests' | |||
8 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | 8 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
11 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 11 | logger.info('Processing ActivityPub broadcast in job %s.', job.id) |
12 | 12 | ||
13 | const payload = job.data as ActivitypubHttpBroadcastPayload | 13 | const payload = job.data as ActivitypubHttpBroadcastPayload |
14 | 14 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index e0b841887..b6cb3c4a6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share' | |||
12 | import { addVideoComments } from '../../activitypub/video-comments' | 12 | import { addVideoComments } from '../../activitypub/video-comments' |
13 | 13 | ||
14 | async function processActivityPubHttpFetcher (job: Job) { | 14 | async function processActivityPubHttpFetcher (job: Job) { |
15 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 15 | logger.info('Processing ActivityPub fetcher in job %s.', job.id) |
16 | 16 | ||
17 | const payload = job.data as ActivitypubHttpFetcherPayload | 17 | const payload = job.data as ActivitypubHttpFetcherPayload |
18 | 18 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 837a597a5..9e4e84002 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests' | |||
6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | 6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
9 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 9 | logger.info('Processing ActivityPub unicast in job %s.', job.id) |
10 | 10 | ||
11 | const payload = job.data as ActivitypubHttpUnicastPayload | 11 | const payload = job.data as ActivitypubHttpUnicastPayload |
12 | const uri = payload.uri | 12 | const uri = payload.uri |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 600f858a0..307e771ff 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors' | |||
11 | async function refreshAPObject (job: Job) { | 11 | async function refreshAPObject (job: Job) { |
12 | const payload = job.data as RefreshPayload | 12 | const payload = job.data as RefreshPayload |
13 | 13 | ||
14 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) | 14 | logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url) |
15 | 15 | ||
16 | if (payload.type === 'video') return refreshVideo(payload.url) | 16 | if (payload.type === 'video') return refreshVideo(payload.url) |
17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) | 17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) |
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 4a5bad9fb..27a2d431b 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' | |||
6 | 6 | ||
7 | async function processActorKeys (job: Job) { | 7 | async function processActorKeys (job: Job) { |
8 | const payload = job.data as ActorKeysPayload | 8 | const payload = job.data as ActorKeysPayload |
9 | logger.info('Processing actor keys in job %d.', job.id) | 9 | logger.info('Processing actor keys in job %s.', job.id) |
10 | 10 | ||
11 | const actor = await ActorModel.load(payload.actorId) | 11 | const actor = await ActorModel.load(payload.actorId) |
12 | 12 | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index b5b9475b1..567bcc076 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer' | |||
5 | 5 | ||
6 | async function processEmail (job: Job) { | 6 | async function processEmail (job: Job) { |
7 | const payload = job.data as EmailPayload | 7 | const payload = job.data as EmailPayload |
8 | logger.info('Processing email in job %d.', job.id) | 8 | logger.info('Processing email in job %s.', job.id) |
9 | 9 | ||
10 | return Emailer.Instance.sendMail(payload) | 10 | return Emailer.Instance.sendMail(payload) |
11 | } | 11 | } |
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts new file mode 100644 index 000000000..6aac36741 --- /dev/null +++ b/server/lib/job-queue/handlers/federate-video.ts | |||
@@ -0,0 +1,28 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { sequelizeTypescript } from '@server/initializers/database' | ||
4 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { FederateVideoPayload } from '@shared/models' | ||
7 | import { logger } from '../../../helpers/logger' | ||
8 | |||
9 | function processFederateVideo (job: Job) { | ||
10 | const payload = job.data as FederateVideoPayload | ||
11 | |||
12 | logger.info('Processing video federation in job %s.', job.id) | ||
13 | |||
14 | return retryTransactionWrapper(() => { | ||
15 | return sequelizeTypescript.transaction(async t => { | ||
16 | const video = await VideoModel.loadFull(payload.videoUUID, t) | ||
17 | if (!video) return | ||
18 | |||
19 | return federateVideoIfNeeded(video, payload.isNewVideo, t) | ||
20 | }) | ||
21 | }) | ||
22 | } | ||
23 | |||
24 | // --------------------------------------------------------------------------- | ||
25 | |||
26 | export { | ||
27 | processFederateVideo | ||
28 | } | ||
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index 4505ca79e..03aa414c9 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts | |||
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger' | |||
8 | 8 | ||
9 | async function processManageVideoTorrent (job: Job) { | 9 | async function processManageVideoTorrent (job: Job) { |
10 | const payload = job.data as ManageVideoTorrentPayload | 10 | const payload = job.data as ManageVideoTorrentPayload |
11 | logger.info('Processing torrent in job %d.', job.id) | 11 | logger.info('Processing torrent in job %s.', job.id) |
12 | 12 | ||
13 | if (payload.action === 'create') return doCreateAction(payload) | 13 | if (payload.action === 'create') return doCreateAction(payload) |
14 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) | 14 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) |
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index d608fd865..25bdebeea 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage') | |||
17 | 17 | ||
18 | export async function processMoveToObjectStorage (job: Job) { | 18 | export async function processMoveToObjectStorage (job: Job) { |
19 | const payload = job.data as MoveObjectStoragePayload | 19 | const payload = job.data as MoveObjectStoragePayload |
20 | logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) | 20 | logger.info('Moving video %s in job %s.', payload.videoUUID, job.id) |
21 | 21 | ||
22 | const video = await VideoModel.loadWithFiles(payload.videoUUID) | 22 | const video = await VideoModel.loadWithFiles(payload.videoUUID) |
23 | // No video, maybe deleted? | 23 | // No video, maybe deleted? |
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) { | |||
43 | 43 | ||
44 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | 44 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') |
45 | if (pendingMove === 0) { | 45 | if (pendingMove === 0) { |
46 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) | 46 | logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags) |
47 | 47 | ||
48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) | 48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) |
49 | } | 49 | } |
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts new file mode 100644 index 000000000..83605396c --- /dev/null +++ b/server/lib/job-queue/handlers/notify.ts | |||
@@ -0,0 +1,27 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { Notifier } from '@server/lib/notifier' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { NotifyPayload } from '@shared/models' | ||
5 | import { logger } from '../../../helpers/logger' | ||
6 | |||
7 | async function processNotify (job: Job) { | ||
8 | const payload = job.data as NotifyPayload | ||
9 | logger.info('Processing %s notification in job %s.', payload.action, job.id) | ||
10 | |||
11 | if (payload.action === 'new-video') return doNotifyNewVideo(payload) | ||
12 | } | ||
13 | |||
14 | // --------------------------------------------------------------------------- | ||
15 | |||
16 | export { | ||
17 | processNotify | ||
18 | } | ||
19 | |||
20 | // --------------------------------------------------------------------------- | ||
21 | |||
22 | async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) { | ||
23 | const refreshedVideo = await VideoModel.loadFull(payload.videoUUID) | ||
24 | if (!refreshedVideo) return | ||
25 | |||
26 | Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | ||
27 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 40c44cf52..d950f6407 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
4 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
6 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 6 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
7 | import { addMoveToObjectStorageJob } from '@server/lib/video' | 7 | import { buildMoveToObjectStorageJob } from '@server/lib/video' |
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | 8 | import { VideoPathManager } from '@server/lib/video-path-manager' |
9 | import { VideoModel } from '@server/models/video/video' | 9 | import { VideoModel } from '@server/models/video/video' |
10 | import { VideoFileModel } from '@server/models/video/video-file' | 10 | import { VideoFileModel } from '@server/models/video/video-file' |
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils' | |||
13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' | 13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' |
14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' | 14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' |
15 | import { logger } from '../../../helpers/logger' | 15 | import { logger } from '../../../helpers/logger' |
16 | import { JobQueue } from '../job-queue' | ||
16 | 17 | ||
17 | async function processVideoFileImport (job: Job) { | 18 | async function processVideoFileImport (job: Job) { |
18 | const payload = job.data as VideoFileImportPayload | 19 | const payload = job.data as VideoFileImportPayload |
19 | logger.info('Processing video file import in job %d.', job.id) | 20 | logger.info('Processing video file import in job %s.', job.id) |
20 | 21 | ||
21 | const video = await VideoModel.loadFull(payload.videoUUID) | 22 | const video = await VideoModel.loadFull(payload.videoUUID) |
22 | // No video, maybe deleted? | 23 | // No video, maybe deleted? |
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) { | |||
28 | await updateVideoFile(video, payload.filePath) | 29 | await updateVideoFile(video, payload.filePath) |
29 | 30 | ||
30 | if (CONFIG.OBJECT_STORAGE.ENABLED) { | 31 | if (CONFIG.OBJECT_STORAGE.ENABLED) { |
31 | await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) | 32 | await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state })) |
32 | } else { | 33 | } else { |
33 | await federateVideoIfNeeded(video, false) | 34 | await federateVideoIfNeeded(video, false) |
34 | } | 35 | } |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index e5cd35865..f4629159c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
8 | import { Hooks } from '@server/lib/plugins/hooks' | 8 | import { Hooks } from '@server/lib/plugins/hooks' |
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | 9 | import { ServerConfigManager } from '@server/lib/server-config-manager' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video' |
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 12 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 13 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 14 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail' | |||
39 | import { federateVideoIfNeeded } from '../../activitypub/videos' | 39 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
40 | import { Notifier } from '../../notifier' | 40 | import { Notifier } from '../../notifier' |
41 | import { generateVideoMiniature } from '../../thumbnail' | 41 | import { generateVideoMiniature } from '../../thumbnail' |
42 | import { JobQueue } from '../job-queue' | ||
42 | 43 | ||
43 | async function processVideoImport (job: Job) { | 44 | async function processVideoImport (job: Job) { |
44 | const payload = job.data as VideoImportPayload | 45 | const payload = job.data as VideoImportPayload |
@@ -65,7 +66,7 @@ export { | |||
65 | // --------------------------------------------------------------------------- | 66 | // --------------------------------------------------------------------------- |
66 | 67 | ||
67 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { | 68 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { |
68 | logger.info('Processing torrent video import in job %d.', job.id) | 69 | logger.info('Processing torrent video import in job %s.', job.id) |
69 | 70 | ||
70 | const options = { type: payload.type, videoImportId: payload.videoImportId } | 71 | const options = { type: payload.type, videoImportId: payload.videoImportId } |
71 | 72 | ||
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, | |||
77 | } | 78 | } |
78 | 79 | ||
79 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { | 80 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { |
80 | logger.info('Processing youtubeDL video import in job %d.', job.id) | 81 | logger.info('Processing youtubeDL video import in job %s.', job.id) |
81 | 82 | ||
82 | const options = { type: payload.type, videoImportId: videoImport.id } | 83 | const options = { type: payload.type, videoImportId: videoImport.id } |
83 | 84 | ||
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
259 | } | 260 | } |
260 | 261 | ||
261 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | 262 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { |
262 | return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | 263 | await JobQueue.Instance.createJob( |
264 | await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | ||
265 | ) | ||
263 | } | 266 | } |
264 | 267 | ||
265 | // Create transcoding jobs? | 268 | // Create transcoding jobs? |
266 | if (video.state === VideoState.TO_TRANSCODE) { | 269 | if (video.state === VideoState.TO_TRANSCODE) { |
267 | await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | 270 | await JobQueue.Instance.createJob( |
271 | await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | ||
272 | ) | ||
268 | } | 273 | } |
269 | 274 | ||
270 | } catch (err) { | 275 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 75ab2cd02..bac99fdb7 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger' | |||
5 | 5 | ||
6 | async function processVideoRedundancy (job: Job) { | 6 | async function processVideoRedundancy (job: Job) { |
7 | const payload = job.data as VideoRedundancyPayload | 7 | const payload = job.data as VideoRedundancyPayload |
8 | logger.info('Processing video redundancy in job %d.', job.id) | 8 | logger.info('Processing video redundancy in job %s.', job.id) |
9 | 9 | ||
10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | 10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) |
11 | } | 11 | } |
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 078243538..23f9a34cc 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts | |||
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | |||
8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | 9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' |
12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' | 12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' |
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | 13 | import { VideoPathManager } from '@server/lib/video-path-manager' |
14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' | 14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' |
@@ -36,6 +36,7 @@ import { | |||
36 | VideoStudioTaskWatermarkPayload | 36 | VideoStudioTaskWatermarkPayload |
37 | } from '@shared/models' | 37 | } from '@shared/models' |
38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
39 | import { JobQueue } from '../job-queue' | ||
39 | 40 | ||
40 | const lTagsBase = loggerTagsFactory('video-edition') | 41 | const lTagsBase = loggerTagsFactory('video-edition') |
41 | 42 | ||
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) { | |||
43 | const payload = job.data as VideoStudioEditionPayload | 44 | const payload = job.data as VideoStudioEditionPayload |
44 | const lTags = lTagsBase(payload.videoUUID) | 45 | const lTags = lTagsBase(payload.videoUUID) |
45 | 46 | ||
46 | logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) | 47 | logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags) |
47 | 48 | ||
48 | const video = await VideoModel.loadFull(payload.videoUUID) | 49 | const video = await VideoModel.loadFull(payload.videoUUID) |
49 | 50 | ||
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) { | |||
100 | await federateVideoIfNeeded(video, false, undefined) | 101 | await federateVideoIfNeeded(video, false, undefined) |
101 | 102 | ||
102 | const user = await UserModel.loadByVideoId(video.id) | 103 | const user = await UserModel.loadByVideoId(video.id) |
103 | await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | 104 | |
105 | await JobQueue.Instance.createJob( | ||
106 | await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | ||
107 | ) | ||
104 | } | 108 | } |
105 | 109 | ||
106 | // --------------------------------------------------------------------------- | 110 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0cf5d53ce..50d732beb 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,6 @@ | |||
1 | import { | 1 | import { |
2 | FlowJob, | ||
3 | FlowProducer, | ||
2 | Job, | 4 | Job, |
3 | JobsOptions, | 5 | JobsOptions, |
4 | Queue, | 6 | Queue, |
@@ -13,7 +15,7 @@ import { | |||
13 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 15 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
14 | import { CONFIG } from '@server/initializers/config' | 16 | import { CONFIG } from '@server/initializers/config' |
15 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
16 | import { timeoutPromise } from '@shared/core-utils' | 18 | import { pick, timeoutPromise } from '@shared/core-utils' |
17 | import { | 19 | import { |
18 | ActivitypubFollowPayload, | 20 | ActivitypubFollowPayload, |
19 | ActivitypubHttpBroadcastPayload, | 21 | ActivitypubHttpBroadcastPayload, |
@@ -22,10 +24,12 @@ import { | |||
22 | ActorKeysPayload, | 24 | ActorKeysPayload, |
23 | DeleteResumableUploadMetaFilePayload, | 25 | DeleteResumableUploadMetaFilePayload, |
24 | EmailPayload, | 26 | EmailPayload, |
27 | FederateVideoPayload, | ||
25 | JobState, | 28 | JobState, |
26 | JobType, | 29 | JobType, |
27 | ManageVideoTorrentPayload, | 30 | ManageVideoTorrentPayload, |
28 | MoveObjectStoragePayload, | 31 | MoveObjectStoragePayload, |
32 | NotifyPayload, | ||
29 | RefreshPayload, | 33 | RefreshPayload, |
30 | VideoFileImportPayload, | 34 | VideoFileImportPayload, |
31 | VideoImportPayload, | 35 | VideoImportPayload, |
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica | |||
45 | import { refreshAPObject } from './handlers/activitypub-refresher' | 49 | import { refreshAPObject } from './handlers/activitypub-refresher' |
46 | import { processActorKeys } from './handlers/actor-keys' | 50 | import { processActorKeys } from './handlers/actor-keys' |
47 | import { processEmail } from './handlers/email' | 51 | import { processEmail } from './handlers/email' |
52 | import { processFederateVideo } from './handlers/federate-video' | ||
48 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 53 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
49 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 54 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
55 | import { processNotify } from './handlers/notify' | ||
50 | import { processVideoFileImport } from './handlers/video-file-import' | 56 | import { processVideoFileImport } from './handlers/video-file-import' |
51 | import { processVideoImport } from './handlers/video-import' | 57 | import { processVideoImport } from './handlers/video-import' |
52 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 58 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition' | |||
54 | import { processVideoTranscoding } from './handlers/video-transcoding' | 60 | import { processVideoTranscoding } from './handlers/video-transcoding' |
55 | import { processVideosViewsStats } from './handlers/video-views-stats' | 61 | import { processVideosViewsStats } from './handlers/video-views-stats' |
56 | 62 | ||
57 | type CreateJobArgument = | 63 | export type CreateJobArgument = |
58 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 64 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
59 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | 65 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
60 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 66 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
@@ -73,7 +79,9 @@ type CreateJobArgument = | |||
73 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | | 79 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | |
74 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | | 80 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | |
75 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | | 81 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | |
76 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | 82 | { type: 'notify', payload: NotifyPayload } | |
83 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
84 | { type: 'federate-video', payload: FederateVideoPayload } | ||
77 | 85 | ||
78 | export type CreateJobOptions = { | 86 | export type CreateJobOptions = { |
79 | delay?: number | 87 | delay?: number |
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
98 | 'video-redundancy': processVideoRedundancy, | 106 | 'video-redundancy': processVideoRedundancy, |
99 | 'move-to-object-storage': processMoveToObjectStorage, | 107 | 'move-to-object-storage': processMoveToObjectStorage, |
100 | 'manage-video-torrent': processManageVideoTorrent, | 108 | 'manage-video-torrent': processManageVideoTorrent, |
101 | 'video-studio-edition': processVideoStudioEdition | 109 | 'notify': processNotify, |
110 | 'video-studio-edition': processVideoStudioEdition, | ||
111 | 'federate-video': processFederateVideo | ||
102 | } | 112 | } |
103 | 113 | ||
104 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | 114 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [ | |||
123 | 'video-live-ending', | 133 | 'video-live-ending', |
124 | 'move-to-object-storage', | 134 | 'move-to-object-storage', |
125 | 'manage-video-torrent', | 135 | 'manage-video-torrent', |
126 | 'video-studio-edition' | 136 | 'video-studio-edition', |
137 | 'notify', | ||
138 | 'federate-video' | ||
127 | ] | 139 | ] |
128 | 140 | ||
129 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | 141 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |
@@ -137,6 +149,8 @@ class JobQueue { | |||
137 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | 149 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} |
138 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | 150 | private queueEvents: { [id in JobType]?: QueueEvents } = {} |
139 | 151 | ||
152 | private flowProducer: FlowProducer | ||
153 | |||
140 | private initialized = false | 154 | private initialized = false |
141 | private jobRedisPrefix: string | 155 | private jobRedisPrefix: string |
142 | 156 | ||
@@ -157,6 +171,11 @@ class JobQueue { | |||
157 | this.buildQueueEvent(handlerName, produceOnly) | 171 | this.buildQueueEvent(handlerName, produceOnly) |
158 | } | 172 | } |
159 | 173 | ||
174 | this.flowProducer = new FlowProducer({ | ||
175 | connection: this.getRedisConnection(), | ||
176 | prefix: this.jobRedisPrefix | ||
177 | }) | ||
178 | |||
160 | this.addRepeatableJobs() | 179 | this.addRepeatableJobs() |
161 | } | 180 | } |
162 | 181 | ||
@@ -243,6 +262,8 @@ class JobQueue { | |||
243 | } | 262 | } |
244 | } | 263 | } |
245 | 264 | ||
265 | // --------------------------------------------------------------------------- | ||
266 | |||
246 | async terminate () { | 267 | async terminate () { |
247 | const promises = Object.keys(this.workers) | 268 | const promises = Object.keys(this.workers) |
248 | .map(handlerName => { | 269 | .map(handlerName => { |
@@ -278,28 +299,56 @@ class JobQueue { | |||
278 | } | 299 | } |
279 | } | 300 | } |
280 | 301 | ||
281 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { | 302 | // --------------------------------------------------------------------------- |
282 | this.createJobWithPromise(obj, options) | 303 | |
283 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 304 | createJobAsync (options: CreateJobArgument & CreateJobOptions): void { |
305 | this.createJob(options) | ||
306 | .catch(err => logger.error('Cannot create job.', { err, options })) | ||
284 | } | 307 | } |
285 | 308 | ||
286 | async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | 309 | async createJob (options: CreateJobArgument & CreateJobOptions) { |
287 | const queue: Queue = this.queues[obj.type] | 310 | const queue: Queue = this.queues[options.type] |
288 | if (queue === undefined) { | 311 | if (queue === undefined) { |
289 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 312 | logger.error('Unknown queue %s: cannot create job.', options.type) |
290 | return | 313 | return |
291 | } | 314 | } |
292 | 315 | ||
293 | const jobArgs: JobsOptions = { | 316 | const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) |
317 | |||
318 | return queue.add('job', options.payload, jobOptions) | ||
319 | } | ||
320 | |||
321 | async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { | ||
322 | let lastJob: FlowJob | ||
323 | |||
324 | for (const job of jobs) { | ||
325 | if (!job) continue | ||
326 | |||
327 | lastJob = { | ||
328 | name: 'job', | ||
329 | data: job.payload, | ||
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | ||
333 | ? [ lastJob ] | ||
334 | : [] | ||
335 | } | ||
336 | } | ||
337 | |||
338 | return this.flowProducer.add(lastJob) | ||
339 | } | ||
340 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | ||
342 | return { | ||
294 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 343 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
295 | attempts: JOB_ATTEMPTS[obj.type], | 344 | attempts: JOB_ATTEMPTS[type], |
296 | priority: options.priority, | 345 | priority: options.priority, |
297 | delay: options.delay | 346 | delay: options.delay |
298 | } | 347 | } |
299 | |||
300 | return queue.add('job', obj.payload, jobArgs) | ||
301 | } | 348 | } |
302 | 349 | ||
350 | // --------------------------------------------------------------------------- | ||
351 | |||
303 | async listForApi (options: { | 352 | async listForApi (options: { |
304 | state?: JobState | 353 | state?: JobState |
305 | start: number | 354 | start: number |
@@ -367,6 +416,8 @@ class JobQueue { | |||
367 | return Promise.all(promises) | 416 | return Promise.all(promises) |
368 | } | 417 | } |
369 | 418 | ||
419 | // --------------------------------------------------------------------------- | ||
420 | |||
370 | async removeOldJobs () { | 421 | async removeOldJobs () { |
371 | for (const key of Object.keys(this.queues)) { | 422 | for (const key of Object.keys(this.queues)) { |
372 | const queue: Queue = this.queues[key] | 423 | const queue: Queue = this.queues[key] |