diff options
Diffstat (limited to 'server/lib/job-queue')
15 files changed, 50 insertions, 50 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index 56e2b0ceb..d5e4508fe 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import * as Bull from 'bull' | 2 | import { Job } from 'bull' |
3 | import { checkUrlsSameHost } from '@server/helpers/activitypub' | 3 | import { checkUrlsSameHost } from '@server/helpers/activitypub' |
4 | import { | 4 | import { |
5 | isAnnounceActivityValid, | 5 | isAnnounceActivityValid, |
@@ -18,14 +18,14 @@ import { AccountVideoRateModel } from '../../../models/account/account-video-rat | |||
18 | 18 | ||
19 | // Job to clean remote interactions off local videos | 19 | // Job to clean remote interactions off local videos |
20 | 20 | ||
21 | async function processActivityPubCleaner (_job: Bull.Job) { | 21 | async function processActivityPubCleaner (_job: Job) { |
22 | logger.info('Processing ActivityPub cleaner.') | 22 | logger.info('Processing ActivityPub cleaner.') |
23 | 23 | ||
24 | { | 24 | { |
25 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() | 25 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() |
26 | const { bodyValidator, deleter, updater } = rateOptionsFactory() | 26 | const { bodyValidator, deleter, updater } = rateOptionsFactory() |
27 | 27 | ||
28 | await Bluebird.map(rateUrls, async rateUrl => { | 28 | await map(rateUrls, async rateUrl => { |
29 | try { | 29 | try { |
30 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | 30 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) |
31 | 31 | ||
@@ -44,7 +44,7 @@ async function processActivityPubCleaner (_job: Bull.Job) { | |||
44 | const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() | 44 | const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() |
45 | const { bodyValidator, deleter, updater } = shareOptionsFactory() | 45 | const { bodyValidator, deleter, updater } = shareOptionsFactory() |
46 | 46 | ||
47 | await Bluebird.map(shareUrls, async shareUrl => { | 47 | await map(shareUrls, async shareUrl => { |
48 | try { | 48 | try { |
49 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | 49 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) |
50 | } catch (err) { | 50 | } catch (err) { |
@@ -57,7 +57,7 @@ async function processActivityPubCleaner (_job: Bull.Job) { | |||
57 | const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() | 57 | const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() |
58 | const { bodyValidator, deleter, updater } = commentOptionsFactory() | 58 | const { bodyValidator, deleter, updater } = commentOptionsFactory() |
59 | 59 | ||
60 | await Bluebird.map(commentUrls, async commentUrl => { | 60 | await map(commentUrls, async commentUrl => { |
61 | try { | 61 | try { |
62 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | 62 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) |
63 | } catch (err) { | 63 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index f896d7af4..91e3d33c6 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' | 2 | import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' |
3 | import { ActivitypubFollowPayload } from '@shared/models' | 3 | import { ActivitypubFollowPayload } from '@shared/models' |
4 | import { sanitizeHost } from '../../../helpers/core-utils' | 4 | import { sanitizeHost } from '../../../helpers/core-utils' |
@@ -13,7 +13,7 @@ import { getOrCreateAPActor, loadActorUrlOrGetFromWebfinger } from '../../activi | |||
13 | import { sendFollow } from '../../activitypub/send' | 13 | import { sendFollow } from '../../activitypub/send' |
14 | import { Notifier } from '../../notifier' | 14 | import { Notifier } from '../../notifier' |
15 | 15 | ||
16 | async function processActivityPubFollow (job: Bull.Job) { | 16 | async function processActivityPubFollow (job: Job) { |
17 | const payload = job.data as ActivitypubFollowPayload | 17 | const payload = job.data as ActivitypubFollowPayload |
18 | const host = payload.host | 18 | const host = payload.host |
19 | 19 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index d4b328635..9b0bb6574 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import * as Bull from 'bull' | 2 | import { Job } from 'bull' |
3 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | 3 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | 4 | import { logger } from '../../../helpers/logger' |
5 | import { doRequest } from '../../../helpers/requests' | 5 | import { doRequest } from '../../../helpers/requests' |
@@ -7,7 +7,7 @@ import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | |||
7 | import { ActorFollowScoreCache } from '../../files-cache' | 7 | import { ActorFollowScoreCache } from '../../files-cache' |
8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Bull.Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
11 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 11 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) |
12 | 12 | ||
13 | const payload = job.data as ActivitypubHttpBroadcastPayload | 13 | const payload = job.data as ActivitypubHttpBroadcastPayload |
@@ -25,7 +25,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
25 | const badUrls: string[] = [] | 25 | const badUrls: string[] = [] |
26 | const goodUrls: string[] = [] | 26 | const goodUrls: string[] = [] |
27 | 27 | ||
28 | await Bluebird.map(payload.uris, uri => { | 28 | await map(payload.uris, uri => { |
29 | return doRequest(uri, options) | 29 | return doRequest(uri, options) |
30 | .then(() => goodUrls.push(uri)) | 30 | .then(() => goodUrls.push(uri)) |
31 | .catch(() => badUrls.push(uri)) | 31 | .catch(() => badUrls.push(uri)) |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index ab9675cae..46016a0a7 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' | 2 | import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | 4 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' |
@@ -13,7 +13,7 @@ import { addVideoShares } from '../../activitypub/share' | |||
13 | import { addVideoComments } from '../../activitypub/video-comments' | 13 | import { addVideoComments } from '../../activitypub/video-comments' |
14 | import { createRates } from '../../activitypub/video-rates' | 14 | import { createRates } from '../../activitypub/video-rates' |
15 | 15 | ||
16 | async function processActivityPubHttpFetcher (job: Bull.Job) { | 16 | async function processActivityPubHttpFetcher (job: Job) { |
17 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 17 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) |
18 | 18 | ||
19 | const payload = job.data as ActivitypubHttpFetcherPayload | 19 | const payload = job.data as ActivitypubHttpFetcherPayload |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 9e561c6b7..9be50837f 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -1,11 +1,11 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | 2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
5 | import { ActorFollowScoreCache } from '../../files-cache' | 5 | import { ActorFollowScoreCache } from '../../files-cache' |
6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Bull.Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
9 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 9 | logger.info('Processing ActivityPub unicast in job %d.', job.id) |
10 | 10 | ||
11 | const payload = job.data as ActivitypubHttpUnicastPayload | 11 | const payload = job.data as ActivitypubHttpUnicastPayload |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index d97e50ebc..5037992d2 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' | 2 | import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' |
3 | import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' | 3 | import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' |
4 | import { loadVideoByUrl } from '@server/lib/model-loaders' | 4 | import { loadVideoByUrl } from '@server/lib/model-loaders' |
@@ -8,7 +8,7 @@ import { ActorModel } from '../../../models/actor/actor' | |||
8 | import { VideoPlaylistModel } from '../../../models/video/video-playlist' | 8 | import { VideoPlaylistModel } from '../../../models/video/video-playlist' |
9 | import { refreshActorIfNeeded } from '../../activitypub/actors' | 9 | import { refreshActorIfNeeded } from '../../activitypub/actors' |
10 | 10 | ||
11 | async function refreshAPObject (job: Bull.Job) { | 11 | async function refreshAPObject (job: Job) { |
12 | const payload = job.data as RefreshPayload | 12 | const payload = job.data as RefreshPayload |
13 | 13 | ||
14 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) | 14 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) |
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 60ac61afd..9d5a65376 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' | 2 | import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' |
3 | import { ActorModel } from '@server/models/actor/actor' | 3 | import { ActorModel } from '@server/models/actor/actor' |
4 | import { ActorKeysPayload } from '@shared/models' | 4 | import { ActorKeysPayload } from '@shared/models' |
5 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
6 | 6 | ||
7 | async function processActorKeys (job: Bull.Job) { | 7 | async function processActorKeys (job: Job) { |
8 | const payload = job.data as ActorKeysPayload | 8 | const payload = job.data as ActorKeysPayload |
9 | logger.info('Processing actor keys in job %d.', job.id) | 9 | logger.info('Processing actor keys in job %d.', job.id) |
10 | 10 | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 3157731e2..6fc1caa84 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -1,9 +1,9 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { EmailPayload } from '@shared/models' | ||
2 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
3 | import { Emailer } from '../../emailer' | 4 | import { Emailer } from '../../emailer' |
4 | import { EmailPayload } from '@shared/models' | ||
5 | 5 | ||
6 | async function processEmail (job: Bull.Job) { | 6 | async function processEmail (job: Job) { |
7 | const payload = job.data as EmailPayload | 7 | const payload = job.data as EmailPayload |
8 | logger.info('Processing email in job %d.', job.id) | 8 | logger.info('Processing email in job %d.', job.id) |
9 | 9 | ||
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index f3b8726eb..0bebc0fc2 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { remove } from 'fs-extra' | 2 | import { remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { logger } from '@server/helpers/logger' | 4 | import { logger } from '@server/helpers/logger' |
@@ -12,7 +12,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
12 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' | 12 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' |
13 | import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared' | 13 | import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared' |
14 | 14 | ||
15 | export async function processMoveToObjectStorage (job: Bull.Job) { | 15 | export async function processMoveToObjectStorage (job: Job) { |
16 | const payload = job.data as MoveObjectStoragePayload | 16 | const payload = job.data as MoveObjectStoragePayload |
17 | logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) | 17 | logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) |
18 | 18 | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index e8ee1f759..e6c918e6c 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { copy, stat } from 'fs-extra' | 2 | import { copy, stat } from 'fs-extra' |
3 | import { getLowercaseExtension } from '@server/helpers/core-utils' | 3 | import { getLowercaseExtension } from '@server/helpers/core-utils' |
4 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | 4 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' |
@@ -16,7 +16,7 @@ import { VideoModel } from '../../../models/video/video' | |||
16 | import { VideoFileModel } from '../../../models/video/video-file' | 16 | import { VideoFileModel } from '../../../models/video/video-file' |
17 | import { createHlsJobIfEnabled } from './video-transcoding' | 17 | import { createHlsJobIfEnabled } from './video-transcoding' |
18 | 18 | ||
19 | async function processVideoFileImport (job: Bull.Job) { | 19 | async function processVideoFileImport (job: Job) { |
20 | const payload = job.data as VideoFileImportPayload | 20 | const payload = job.data as VideoFileImportPayload |
21 | logger.info('Processing video file import in job %d.', job.id) | 21 | logger.info('Processing video file import in job %d.', job.id) |
22 | 22 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index a5fa204f5..bdbf07a06 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { move, remove, stat } from 'fs-extra' | 2 | import { move, remove, stat } from 'fs-extra' |
3 | import { getLowercaseExtension } from '@server/helpers/core-utils' | 3 | import { getLowercaseExtension } from '@server/helpers/core-utils' |
4 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | 4 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
@@ -37,7 +37,7 @@ import { federateVideoIfNeeded } from '../../activitypub/videos' | |||
37 | import { Notifier } from '../../notifier' | 37 | import { Notifier } from '../../notifier' |
38 | import { generateVideoMiniature } from '../../thumbnail' | 38 | import { generateVideoMiniature } from '../../thumbnail' |
39 | 39 | ||
40 | async function processVideoImport (job: Bull.Job) { | 40 | async function processVideoImport (job: Job) { |
41 | const payload = job.data as VideoImportPayload | 41 | const payload = job.data as VideoImportPayload |
42 | 42 | ||
43 | if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, payload) | 43 | if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, payload) |
@@ -52,7 +52,7 @@ export { | |||
52 | 52 | ||
53 | // --------------------------------------------------------------------------- | 53 | // --------------------------------------------------------------------------- |
54 | 54 | ||
55 | async function processTorrentImport (job: Bull.Job, payload: VideoImportTorrentPayload) { | 55 | async function processTorrentImport (job: Job, payload: VideoImportTorrentPayload) { |
56 | logger.info('Processing torrent video import in job %d.', job.id) | 56 | logger.info('Processing torrent video import in job %d.', job.id) |
57 | 57 | ||
58 | const videoImport = await getVideoImportOrDie(payload.videoImportId) | 58 | const videoImport = await getVideoImportOrDie(payload.videoImportId) |
@@ -68,7 +68,7 @@ async function processTorrentImport (job: Bull.Job, payload: VideoImportTorrentP | |||
68 | return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options) | 68 | return processFile(() => downloadWebTorrentVideo(target, VIDEO_IMPORT_TIMEOUT), videoImport, options) |
69 | } | 69 | } |
70 | 70 | ||
71 | async function processYoutubeDLImport (job: Bull.Job, payload: VideoImportYoutubeDLPayload) { | 71 | async function processYoutubeDLImport (job: Job, payload: VideoImportYoutubeDLPayload) { |
72 | logger.info('Processing youtubeDL video import in job %d.', job.id) | 72 | logger.info('Processing youtubeDL video import in job %d.', job.id) |
73 | 73 | ||
74 | const videoImport = await getVideoImportOrDie(payload.videoImportId) | 74 | const videoImport = await getVideoImportOrDie(payload.videoImportId) |
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 9ccf724c2..a04cfa2c9 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { pathExists, readdir, remove } from 'fs-extra' | 2 | import { pathExists, readdir, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' | 4 | import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' |
@@ -17,7 +17,7 @@ import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' | |||
17 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' | 17 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' |
18 | import { logger } from '../../../helpers/logger' | 18 | import { logger } from '../../../helpers/logger' |
19 | 19 | ||
20 | async function processVideoLiveEnding (job: Bull.Job) { | 20 | async function processVideoLiveEnding (job: Job) { |
21 | const payload = job.data as VideoLiveEndingPayload | 21 | const payload = job.data as VideoLiveEndingPayload |
22 | 22 | ||
23 | function logError () { | 23 | function logError () { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 6296dab05..9cb7a6589 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -1,9 +1,9 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' | 2 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' |
4 | import { VideoRedundancyPayload } from '@shared/models' | 3 | import { VideoRedundancyPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | ||
5 | 5 | ||
6 | async function processVideoRedundancy (job: Bull.Job) { | 6 | async function processVideoRedundancy (job: Job) { |
7 | const payload = job.data as VideoRedundancyPayload | 7 | const payload = job.data as VideoRedundancyPayload |
8 | logger.info('Processing video redundancy in job %d.', job.id) | 8 | logger.info('Processing video redundancy in job %d.', job.id) |
9 | 9 | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 5a93c4ed1..20880cdc1 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import { Job } from 'bull' |
2 | import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' | 2 | import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' |
3 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 3 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' |
4 | import { VideoPathManager } from '@server/lib/video-path-manager' | 4 | import { VideoPathManager } from '@server/lib/video-path-manager' |
@@ -25,7 +25,7 @@ import { | |||
25 | transcodeNewWebTorrentResolution | 25 | transcodeNewWebTorrentResolution |
26 | } from '../../transcoding/video-transcoding' | 26 | } from '../../transcoding/video-transcoding' |
27 | 27 | ||
28 | type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 28 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
29 | 29 | ||
30 | const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { | 30 | const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { |
31 | 'new-resolution-to-hls': handleHLSJob, | 31 | 'new-resolution-to-hls': handleHLSJob, |
@@ -36,7 +36,7 @@ const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = | |||
36 | 36 | ||
37 | const lTags = loggerTagsFactory('transcoding') | 37 | const lTags = loggerTagsFactory('transcoding') |
38 | 38 | ||
39 | async function processVideoTranscoding (job: Bull.Job) { | 39 | async function processVideoTranscoding (job: Job) { |
40 | const payload = job.data as VideoTranscodingPayload | 40 | const payload = job.data as VideoTranscodingPayload |
41 | logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID)) | 41 | logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID)) |
42 | 42 | ||
@@ -64,7 +64,7 @@ async function processVideoTranscoding (job: Bull.Job) { | |||
64 | // Job handlers | 64 | // Job handlers |
65 | // --------------------------------------------------------------------------- | 65 | // --------------------------------------------------------------------------- |
66 | 66 | ||
67 | async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { | 67 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { |
68 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) | 68 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) |
69 | 69 | ||
70 | const videoFileInput = payload.copyCodecs | 70 | const videoFileInput = payload.copyCodecs |
@@ -90,7 +90,7 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide | |||
90 | } | 90 | } |
91 | 91 | ||
92 | async function handleNewWebTorrentResolutionJob ( | 92 | async function handleNewWebTorrentResolutionJob ( |
93 | job: Bull.Job, | 93 | job: Job, |
94 | payload: NewResolutionTranscodingPayload, | 94 | payload: NewResolutionTranscodingPayload, |
95 | video: MVideoFullLight, | 95 | video: MVideoFullLight, |
96 | user: MUserId | 96 | user: MUserId |
@@ -104,7 +104,7 @@ async function handleNewWebTorrentResolutionJob ( | |||
104 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) | 104 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) |
105 | } | 105 | } |
106 | 106 | ||
107 | async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 107 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
108 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) | 108 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) |
109 | 109 | ||
110 | await mergeAudioVideofile(video, payload.resolution, job) | 110 | await mergeAudioVideofile(video, payload.resolution, job) |
@@ -114,7 +114,7 @@ async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudio | |||
114 | await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) | 114 | await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) |
115 | } | 115 | } |
116 | 116 | ||
117 | async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 117 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
118 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) | 118 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) |
119 | 119 | ||
120 | const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) | 120 | const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 7a3a1bf82..4cda12b57 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Bull from 'bull' | 1 | import Bull, { Job, JobOptions, Queue } from 'bull' |
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 2 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
3 | import { CONFIG } from '@server/initializers/config' | 3 | import { CONFIG } from '@server/initializers/config' |
4 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 4 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
@@ -30,12 +30,12 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica | |||
30 | import { refreshAPObject } from './handlers/activitypub-refresher' | 30 | import { refreshAPObject } from './handlers/activitypub-refresher' |
31 | import { processActorKeys } from './handlers/actor-keys' | 31 | import { processActorKeys } from './handlers/actor-keys' |
32 | import { processEmail } from './handlers/email' | 32 | import { processEmail } from './handlers/email' |
33 | import { processMoveToObjectStorage } from './handlers/move-to-object-storage' | ||
33 | import { processVideoFileImport } from './handlers/video-file-import' | 34 | import { processVideoFileImport } from './handlers/video-file-import' |
34 | import { processVideoImport } from './handlers/video-import' | 35 | import { processVideoImport } from './handlers/video-import' |
35 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 36 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
36 | import { processVideoTranscoding } from './handlers/video-transcoding' | 37 | import { processVideoTranscoding } from './handlers/video-transcoding' |
37 | import { processVideosViews } from './handlers/video-views' | 38 | import { processVideosViews } from './handlers/video-views' |
38 | import { processMoveToObjectStorage } from './handlers/move-to-object-storage' | ||
39 | 39 | ||
40 | type CreateJobArgument = | 40 | type CreateJobArgument = |
41 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 41 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -59,7 +59,7 @@ export type CreateJobOptions = { | |||
59 | priority?: number | 59 | priority?: number |
60 | } | 60 | } |
61 | 61 | ||
62 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | 62 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { |
63 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 63 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
64 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 64 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
65 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 65 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
@@ -99,7 +99,7 @@ class JobQueue { | |||
99 | 99 | ||
100 | private static instance: JobQueue | 100 | private static instance: JobQueue |
101 | 101 | ||
102 | private queues: { [id in JobType]?: Bull.Queue } = {} | 102 | private queues: { [id in JobType]?: Queue } = {} |
103 | private initialized = false | 103 | private initialized = false |
104 | private jobRedisPrefix: string | 104 | private jobRedisPrefix: string |
105 | 105 | ||
@@ -160,7 +160,7 @@ class JobQueue { | |||
160 | return | 160 | return |
161 | } | 161 | } |
162 | 162 | ||
163 | const jobArgs: Bull.JobOptions = { | 163 | const jobArgs: JobOptions = { |
164 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 164 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
165 | attempts: JOB_ATTEMPTS[obj.type], | 165 | attempts: JOB_ATTEMPTS[obj.type], |
166 | timeout: JOB_TTL[obj.type], | 166 | timeout: JOB_TTL[obj.type], |
@@ -177,11 +177,11 @@ class JobQueue { | |||
177 | count: number | 177 | count: number |
178 | asc?: boolean | 178 | asc?: boolean |
179 | jobType: JobType | 179 | jobType: JobType |
180 | }): Promise<Bull.Job[]> { | 180 | }): Promise<Job[]> { |
181 | const { state, start, count, asc, jobType } = options | 181 | const { state, start, count, asc, jobType } = options |
182 | 182 | ||
183 | const states = state ? [ state ] : jobStates | 183 | const states = state ? [ state ] : jobStates |
184 | let results: Bull.Job[] = [] | 184 | let results: Job[] = [] |
185 | 185 | ||
186 | const filteredJobTypes = this.filterJobTypes(jobType) | 186 | const filteredJobTypes = this.filterJobTypes(jobType) |
187 | 187 | ||