aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts12
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts8
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts4
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts4
-rw-r--r--server/lib/job-queue/handlers/email.ts6
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts4
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts4
-rw-r--r--server/lib/job-queue/handlers/video-import.ts8
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts4
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts6
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts14
-rw-r--r--server/lib/job-queue/job-queue.ts14
15 files changed, 50 insertions, 50 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index 56e2b0ceb..d5e4508fe 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -1,5 +1,5 @@
1import * as Bluebird from 'bluebird' 1import { map } from 'bluebird'
2import * as Bull from 'bull' 2import { Job } from 'bull'
3import { checkUrlsSameHost } from '@server/helpers/activitypub' 3import { checkUrlsSameHost } from '@server/helpers/activitypub'
4import { 4import {
5 isAnnounceActivityValid, 5 isAnnounceActivityValid,
@@ -18,14 +18,14 @@ import { AccountVideoRateModel } from '../../../models/account/account-video-rat
18 18
19// Job to clean remote interactions off local videos 19// Job to clean remote interactions off local videos
20 20
21async function processActivityPubCleaner (_job: Bull.Job) { 21async function processActivityPubCleaner (_job: Job) {
22 logger.info('Processing ActivityPub cleaner.') 22 logger.info('Processing ActivityPub cleaner.')
23 23
24 { 24 {
25 const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() 25 const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
26 const { bodyValidator, deleter, updater } = rateOptionsFactory() 26 const { bodyValidator, deleter, updater } = rateOptionsFactory()
27 27
28 await Bluebird.map(rateUrls, async rateUrl => { 28 await map(rateUrls, async rateUrl => {
29 try { 29 try {
30 const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) 30 const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
31 31
@@ -44,7 +44,7 @@ async function processActivityPubCleaner (_job: Bull.Job) {
44 const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() 44 const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
45 const { bodyValidator, deleter, updater } = shareOptionsFactory() 45 const { bodyValidator, deleter, updater } = shareOptionsFactory()
46 46
47 await Bluebird.map(shareUrls, async shareUrl => { 47 await map(shareUrls, async shareUrl => {
48 try { 48 try {
49 await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) 49 await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter)
50 } catch (err) { 50 } catch (err) {
@@ -57,7 +57,7 @@ async function processActivityPubCleaner (_job: Bull.Job) {
57 const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() 57 const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
58 const { bodyValidator, deleter, updater } = commentOptionsFactory() 58 const { bodyValidator, deleter, updater } = commentOptionsFactory()
59 59
60 await Bluebird.map(commentUrls, async commentUrl => { 60 await map(commentUrls, async commentUrl => {
61 try { 61 try {
62 await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) 62 await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter)
63 } catch (err) { 63 } catch (err) {
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index f896d7af4..91e3d33c6 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' 2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url'
3import { ActivitypubFollowPayload } from '@shared/models' 3import { ActivitypubFollowPayload } from '@shared/models'
4import { sanitizeHost } from '../../../helpers/core-utils' 4import { sanitizeHost } from '../../../helpers/core-utils'
@@ -13,7 +13,7 @@ import { getOrCreateAPActor, loadActorUrlOrGetFromWebfinger } from '../../activi
13import { sendFollow } from '../../activitypub/send' 13import { sendFollow } from '../../activitypub/send'
14import { Notifier } from '../../notifier' 14import { Notifier } from '../../notifier'
15 15
16async function processActivityPubFollow (job: Bull.Job) { 16async function processActivityPubFollow (job: Job) {
17 const payload = job.data as ActivitypubFollowPayload 17 const payload = job.data as ActivitypubFollowPayload
18 const host = payload.host 18 const host = payload.host
19 19
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index d4b328635..9b0bb6574 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,5 +1,5 @@
1import * as Bluebird from 'bluebird' 1import { map } from 'bluebird'
2import * as Bull from 'bull' 2import { Job } from 'bull'
3import { ActivitypubHttpBroadcastPayload } from '@shared/models' 3import { ActivitypubHttpBroadcastPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
5import { doRequest } from '../../../helpers/requests' 5import { doRequest } from '../../../helpers/requests'
@@ -7,7 +7,7 @@ import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
7import { ActorFollowScoreCache } from '../../files-cache' 7import { ActorFollowScoreCache } from '../../files-cache'
8import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 8import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
9 9
10async function processActivityPubHttpBroadcast (job: Bull.Job) { 10async function processActivityPubHttpBroadcast (job: Job) {
11 logger.info('Processing ActivityPub broadcast in job %d.', job.id) 11 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
12 12
13 const payload = job.data as ActivitypubHttpBroadcastPayload 13 const payload = job.data as ActivitypubHttpBroadcastPayload
@@ -25,7 +25,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) {
25 const badUrls: string[] = [] 25 const badUrls: string[] = []
26 const goodUrls: string[] = [] 26 const goodUrls: string[] = []
27 27
28 await Bluebird.map(payload.uris, uri => { 28 await map(payload.uris, uri => {
29 return doRequest(uri, options) 29 return doRequest(uri, options)
30 .then(() => goodUrls.push(uri)) 30 .then(() => goodUrls.push(uri))
31 .catch(() => badUrls.push(uri)) 31 .catch(() => badUrls.push(uri))
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index ab9675cae..46016a0a7 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' 2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { AccountVideoRateModel } from '../../../models/account/account-video-rate' 4import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
@@ -13,7 +13,7 @@ import { addVideoShares } from '../../activitypub/share'
13import { addVideoComments } from '../../activitypub/video-comments' 13import { addVideoComments } from '../../activitypub/video-comments'
14import { createRates } from '../../activitypub/video-rates' 14import { createRates } from '../../activitypub/video-rates'
15 15
16async function processActivityPubHttpFetcher (job: Bull.Job) { 16async function processActivityPubHttpFetcher (job: Job) {
17 logger.info('Processing ActivityPub fetcher in job %d.', job.id) 17 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
18 18
19 const payload = job.data as ActivitypubHttpFetcherPayload 19 const payload = job.data as ActivitypubHttpFetcherPayload
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 9e561c6b7..9be50837f 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,11 +1,11 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { ActivitypubHttpUnicastPayload } from '@shared/models' 2import { ActivitypubHttpUnicastPayload } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
5import { ActorFollowScoreCache } from '../../files-cache' 5import { ActorFollowScoreCache } from '../../files-cache'
6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7 7
8async function processActivityPubHttpUnicast (job: Bull.Job) { 8async function processActivityPubHttpUnicast (job: Job) {
9 logger.info('Processing ActivityPub unicast in job %d.', job.id) 9 logger.info('Processing ActivityPub unicast in job %d.', job.id)
10 10
11 const payload = job.data as ActivitypubHttpUnicastPayload 11 const payload = job.data as ActivitypubHttpUnicastPayload
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index d97e50ebc..5037992d2 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' 2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists'
3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' 3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos'
4import { loadVideoByUrl } from '@server/lib/model-loaders' 4import { loadVideoByUrl } from '@server/lib/model-loaders'
@@ -8,7 +8,7 @@ import { ActorModel } from '../../../models/actor/actor'
8import { VideoPlaylistModel } from '../../../models/video/video-playlist' 8import { VideoPlaylistModel } from '../../../models/video/video-playlist'
9import { refreshActorIfNeeded } from '../../activitypub/actors' 9import { refreshActorIfNeeded } from '../../activitypub/actors'
10 10
11async function refreshAPObject (job: Bull.Job) { 11async function refreshAPObject (job: Job) {
12 const payload = job.data as RefreshPayload 12 const payload = job.data as RefreshPayload
13 13
14 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) 14 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 60ac61afd..9d5a65376 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -1,10 +1,10 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' 2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors'
3import { ActorModel } from '@server/models/actor/actor' 3import { ActorModel } from '@server/models/actor/actor'
4import { ActorKeysPayload } from '@shared/models' 4import { ActorKeysPayload } from '@shared/models'
5import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
6 6
7async function processActorKeys (job: Bull.Job) { 7async function processActorKeys (job: Job) {
8 const payload = job.data as ActorKeysPayload 8 const payload = job.data as ActorKeysPayload
9 logger.info('Processing actor keys in job %d.', job.id) 9 logger.info('Processing actor keys in job %d.', job.id)
10 10
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 3157731e2..6fc1caa84 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -1,9 +1,9 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { EmailPayload } from '@shared/models'
2import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
3import { Emailer } from '../../emailer' 4import { Emailer } from '../../emailer'
4import { EmailPayload } from '@shared/models'
5 5
6async function processEmail (job: Bull.Job) { 6async function processEmail (job: Job) {
7 const payload = job.data as EmailPayload 7 const payload = job.data as EmailPayload
8 logger.info('Processing email in job %d.', job.id) 8 logger.info('Processing email in job %d.', job.id)
9 9
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index f3b8726eb..0bebc0fc2 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { remove } from 'fs-extra' 2import { remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { logger } from '@server/helpers/logger' 4import { logger } from '@server/helpers/logger'
@@ -12,7 +12,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
12import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' 12import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
13import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared' 13import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared'
14 14
15export async function processMoveToObjectStorage (job: Bull.Job) { 15export async function processMoveToObjectStorage (job: Job) {
16 const payload = job.data as MoveObjectStoragePayload 16 const payload = job.data as MoveObjectStoragePayload
17 logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) 17 logger.info('Moving video %s in job %d.', payload.videoUUID, job.id)
18 18
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index e8ee1f759..e6c918e6c 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { copy, stat } from 'fs-extra' 2import { copy, stat } from 'fs-extra'
3import { getLowercaseExtension } from '@server/helpers/core-utils' 3import { getLowercaseExtension } from '@server/helpers/core-utils'
4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
@@ -16,7 +16,7 @@ import { VideoModel } from '../../../models/video/video'
16import { VideoFileModel } from '../../../models/video/video-file' 16import { VideoFileModel } from '../../../models/video/video-file'
17import { createHlsJobIfEnabled } from './video-transcoding' 17import { createHlsJobIfEnabled } from './video-transcoding'
18 18
19async function processVideoFileImport (job: Bull.Job) { 19async function processVideoFileImport (job: Job) {
20 const payload = job.data as VideoFileImportPayload 20 const payload = job.data as VideoFileImportPayload
21 logger.info('Processing video file import in job %d.', job.id) 21 logger.info('Processing video file import in job %d.', job.id)
22 22
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index a5fa204f5..bdbf07a06 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { getLowercaseExtension } from '@server/helpers/core-utils' 3import { getLowercaseExtension } from '@server/helpers/core-utils'
4import { retryTransactionWrapper } from '@server/helpers/database-utils' 4import { retryTransactionWrapper } from '@server/helpers/database-utils'
@@ -37,7 +37,7 @@ import { federateVideoIfNeeded } from '../../activitypub/videos'
37import { Notifier } from '../../notifier' 37import { Notifier } from '../../notifier'
38import { generateVideoMiniature } from '../../thumbnail' 38import { generateVideoMiniature } from '../../thumbnail'
39 39
40async function processVideoImport (job: Bull.Job) { 40async function processVideoImport (job: Job) {
41 const payload = job.data as VideoImportPayload 41 const payload = job.data as VideoImportPayload
42 42
43 if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, payload) 43 if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, payload)
@@ -52,7 +52,7 @@ export {
52 52
53// --------------------------------------------------------------------------- 53// ---------------------------------------------------------------------------
54 54
55async function processTorrentImport (job: Bull.Job, payload: VideoImportTorrentPayload) { 55async function processTorrentImport (job: Job, payload: VideoImportTorrentPayload) {
56 logger.info('Processing torrent video import in job %d.', job.id) 56 logger.info('Processing torrent video import in job %d.', job.id)
57 57
58 const videoImport = await getVideoImportOrDie(payload.videoImportId) 58 const videoImport = await getVideoImportOrDie(payload.videoImportId)
@@ -68,7 +68,7 @@ async function processTorrentImport (job: Bull.Job, payload: VideoImportTorrentP
68 return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options) 68 return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options)
69} 69}
70 70
71async function processYoutubeDLImport (job: Bull.Job, payload: VideoImportYoutubeDLPayload) { 71async function processYoutubeDLImport (job: Job, payload: VideoImportYoutubeDLPayload) {
72 logger.info('Processing youtubeDL video import in job %d.', job.id) 72 logger.info('Processing youtubeDL video import in job %d.', job.id)
73 73
74 const videoImport = await getVideoImportOrDie(payload.videoImportId) 74 const videoImport = await getVideoImportOrDie(payload.videoImportId)
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 9ccf724c2..a04cfa2c9 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { pathExists, readdir, remove } from 'fs-extra' 2import { pathExists, readdir, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' 4import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
@@ -17,7 +17,7 @@ import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models'
17import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' 17import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
18import { logger } from '../../../helpers/logger' 18import { logger } from '../../../helpers/logger'
19 19
20async function processVideoLiveEnding (job: Bull.Job) { 20async function processVideoLiveEnding (job: Job) {
21 const payload = job.data as VideoLiveEndingPayload 21 const payload = job.data as VideoLiveEndingPayload
22 22
23 function logError () { 23 function logError () {
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
index 6296dab05..9cb7a6589 100644
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ b/server/lib/job-queue/handlers/video-redundancy.ts
@@ -1,9 +1,9 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { logger } from '../../../helpers/logger'
3import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' 2import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler'
4import { VideoRedundancyPayload } from '@shared/models' 3import { VideoRedundancyPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger'
5 5
6async function processVideoRedundancy (job: Bull.Job) { 6async function processVideoRedundancy (job: Job) {
7 const payload = job.data as VideoRedundancyPayload 7 const payload = job.data as VideoRedundancyPayload
8 logger.info('Processing video redundancy in job %d.', job.id) 8 logger.info('Processing video redundancy in job %d.', job.id)
9 9
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 5a93c4ed1..20880cdc1 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import { Job } from 'bull'
2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' 2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
3import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 3import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
4import { VideoPathManager } from '@server/lib/video-path-manager' 4import { VideoPathManager } from '@server/lib/video-path-manager'
@@ -25,7 +25,7 @@ import {
25 transcodeNewWebTorrentResolution 25 transcodeNewWebTorrentResolution
26} from '../../transcoding/video-transcoding' 26} from '../../transcoding/video-transcoding'
27 27
28type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> 28type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
29 29
30const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { 30const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
31 'new-resolution-to-hls': handleHLSJob, 31 'new-resolution-to-hls': handleHLSJob,
@@ -36,7 +36,7 @@ const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } =
36 36
37const lTags = loggerTagsFactory('transcoding') 37const lTags = loggerTagsFactory('transcoding')
38 38
39async function processVideoTranscoding (job: Bull.Job) { 39async function processVideoTranscoding (job: Job) {
40 const payload = job.data as VideoTranscodingPayload 40 const payload = job.data as VideoTranscodingPayload
41 logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID)) 41 logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID))
42 42
@@ -64,7 +64,7 @@ async function processVideoTranscoding (job: Bull.Job) {
64// Job handlers 64// Job handlers
65// --------------------------------------------------------------------------- 65// ---------------------------------------------------------------------------
66 66
67async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { 67async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
68 logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) 68 logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
69 69
70 const videoFileInput = payload.copyCodecs 70 const videoFileInput = payload.copyCodecs
@@ -90,7 +90,7 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide
90} 90}
91 91
92async function handleNewWebTorrentResolutionJob ( 92async function handleNewWebTorrentResolutionJob (
93 job: Bull.Job, 93 job: Job,
94 payload: NewResolutionTranscodingPayload, 94 payload: NewResolutionTranscodingPayload,
95 video: MVideoFullLight, 95 video: MVideoFullLight,
96 user: MUserId 96 user: MUserId
@@ -104,7 +104,7 @@ async function handleNewWebTorrentResolutionJob (
104 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) 104 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
105} 105}
106 106
107async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { 107async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
108 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) 108 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid))
109 109
110 await mergeAudioVideofile(video, payload.resolution, job) 110 await mergeAudioVideofile(video, payload.resolution, job)
@@ -114,7 +114,7 @@ async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudio
114 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) 114 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user)
115} 115}
116 116
117async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { 117async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
118 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) 118 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid))
119 119
120 const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) 120 const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 7a3a1bf82..4cda12b57 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,4 @@
1import * as Bull from 'bull' 1import Bull, { Job, JobOptions, Queue } from 'bull'
2import { jobStates } from '@server/helpers/custom-validators/jobs' 2import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { CONFIG } from '@server/initializers/config' 3import { CONFIG } from '@server/initializers/config'
4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
@@ -30,12 +30,12 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
30import { refreshAPObject } from './handlers/activitypub-refresher' 30import { refreshAPObject } from './handlers/activitypub-refresher'
31import { processActorKeys } from './handlers/actor-keys' 31import { processActorKeys } from './handlers/actor-keys'
32import { processEmail } from './handlers/email' 32import { processEmail } from './handlers/email'
33import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
33import { processVideoFileImport } from './handlers/video-file-import' 34import { processVideoFileImport } from './handlers/video-file-import'
34import { processVideoImport } from './handlers/video-import' 35import { processVideoImport } from './handlers/video-import'
35import { processVideoLiveEnding } from './handlers/video-live-ending' 36import { processVideoLiveEnding } from './handlers/video-live-ending'
36import { processVideoTranscoding } from './handlers/video-transcoding' 37import { processVideoTranscoding } from './handlers/video-transcoding'
37import { processVideosViews } from './handlers/video-views' 38import { processVideosViews } from './handlers/video-views'
38import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
39 39
40type CreateJobArgument = 40type CreateJobArgument =
41 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 41 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -59,7 +59,7 @@ export type CreateJobOptions = {
59 priority?: number 59 priority?: number
60} 60}
61 61
62const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { 62const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
63 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 63 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
64 'activitypub-http-unicast': processActivityPubHttpUnicast, 64 'activitypub-http-unicast': processActivityPubHttpUnicast,
65 'activitypub-http-fetcher': processActivityPubHttpFetcher, 65 'activitypub-http-fetcher': processActivityPubHttpFetcher,
@@ -99,7 +99,7 @@ class JobQueue {
99 99
100 private static instance: JobQueue 100 private static instance: JobQueue
101 101
102 private queues: { [id in JobType]?: Bull.Queue } = {} 102 private queues: { [id in JobType]?: Queue } = {}
103 private initialized = false 103 private initialized = false
104 private jobRedisPrefix: string 104 private jobRedisPrefix: string
105 105
@@ -160,7 +160,7 @@ class JobQueue {
160 return 160 return
161 } 161 }
162 162
163 const jobArgs: Bull.JobOptions = { 163 const jobArgs: JobOptions = {
164 backoff: { delay: 60 * 1000, type: 'exponential' }, 164 backoff: { delay: 60 * 1000, type: 'exponential' },
165 attempts: JOB_ATTEMPTS[obj.type], 165 attempts: JOB_ATTEMPTS[obj.type],
166 timeout: JOB_TTL[obj.type], 166 timeout: JOB_TTL[obj.type],
@@ -177,11 +177,11 @@ class JobQueue {
177 count: number 177 count: number
178 asc?: boolean 178 asc?: boolean
179 jobType: JobType 179 jobType: JobType
180 }): Promise<Bull.Job[]> { 180 }): Promise<Job[]> {
181 const { state, start, count, asc, jobType } = options 181 const { state, start, count, asc, jobType } = options
182 182
183 const states = state ? [ state ] : jobStates 183 const states = state ? [ state ] : jobStates
184 let results: Bull.Job[] = [] 184 let results: Job[] = []
185 185
186 const filteredJobTypes = this.filterJobTypes(jobType) 186 const filteredJobTypes = this.filterJobTypes(jobType)
187 187