diff options
Diffstat (limited to 'server/lib/job-queue/handlers')
12 files changed, 68 insertions, 148 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 4a7cda0a2..7034c10d0 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -11,13 +11,7 @@ import { ActorModel } from '../../../models/activitypub/actor' | |||
11 | import { Notifier } from '../../notifier' | 11 | import { Notifier } from '../../notifier' |
12 | import { sequelizeTypescript } from '../../../initializers/database' | 12 | import { sequelizeTypescript } from '../../../initializers/database' |
13 | import { MActor, MActorFollowActors, MActorFull } from '../../../typings/models' | 13 | import { MActor, MActorFollowActors, MActorFull } from '../../../typings/models' |
14 | 14 | import { ActivitypubFollowPayload } from '@shared/models' | |
15 | export type ActivitypubFollowPayload = { | ||
16 | followerActorId: number | ||
17 | name: string | ||
18 | host: string | ||
19 | isAutoFollow?: boolean | ||
20 | } | ||
21 | 15 | ||
22 | async function processActivityPubFollow (job: Bull.Job) { | 16 | async function processActivityPubFollow (job: Bull.Job) { |
23 | const payload = job.data as ActivitypubFollowPayload | 17 | const payload = job.data as ActivitypubFollowPayload |
@@ -34,6 +28,11 @@ async function processActivityPubFollow (job: Bull.Job) { | |||
34 | targetActor = await getOrCreateActorAndServerAndModel(actorUrl, 'all') | 28 | targetActor = await getOrCreateActorAndServerAndModel(actorUrl, 'all') |
35 | } | 29 | } |
36 | 30 | ||
31 | if (payload.assertIsChannel && !targetActor.VideoChannel) { | ||
32 | logger.warn('Do not follow %s@%s because it is not a channel.', payload.name, host) | ||
33 | return | ||
34 | } | ||
35 | |||
37 | const fromActor = await ActorModel.load(payload.followerActorId) | 36 | const fromActor = await ActorModel.load(payload.followerActorId) |
38 | 37 | ||
39 | return retryTransactionWrapper(follow, fromActor, targetActor, payload.isAutoFollow) | 38 | return retryTransactionWrapper(follow, fromActor, targetActor, payload.isAutoFollow) |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 0ff7b44a0..e4d3dbbff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -5,12 +5,7 @@ import { doRequest } from '../../../helpers/requests' | |||
5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
6 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' | 6 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' |
7 | import { ActorFollowScoreCache } from '../../files-cache' | 7 | import { ActorFollowScoreCache } from '../../files-cache' |
8 | 8 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | |
9 | export type ActivitypubHttpBroadcastPayload = { | ||
10 | uris: string[] | ||
11 | signatureActorId?: number | ||
12 | body: any | ||
13 | } | ||
14 | 9 | ||
15 | async function processActivityPubHttpBroadcast (job: Bull.Job) { | 10 | async function processActivityPubHttpBroadcast (job: Bull.Job) { |
16 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 11 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 0182c5169..524aadc27 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -5,22 +5,15 @@ import { processActivities } from '../../activitypub/process' | |||
5 | import { addVideoComments } from '../../activitypub/video-comments' | 5 | import { addVideoComments } from '../../activitypub/video-comments' |
6 | import { crawlCollectionPage } from '../../activitypub/crawl' | 6 | import { crawlCollectionPage } from '../../activitypub/crawl' |
7 | import { VideoModel } from '../../../models/video/video' | 7 | import { VideoModel } from '../../../models/video/video' |
8 | import { addVideoShares, createRates } from '../../activitypub' | 8 | import { addVideoShares } from '../../activitypub/share' |
9 | import { createRates } from '../../activitypub/video-rates' | ||
9 | import { createAccountPlaylists } from '../../activitypub/playlist' | 10 | import { createAccountPlaylists } from '../../activitypub/playlist' |
10 | import { AccountModel } from '../../../models/account/account' | 11 | import { AccountModel } from '../../../models/account/account' |
11 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | 12 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' |
12 | import { VideoShareModel } from '../../../models/video/video-share' | 13 | import { VideoShareModel } from '../../../models/video/video-share' |
13 | import { VideoCommentModel } from '../../../models/video/video-comment' | 14 | import { VideoCommentModel } from '../../../models/video/video-comment' |
14 | import { MAccountDefault, MVideoFullLight } from '../../../typings/models' | 15 | import { MAccountDefault, MVideoFullLight } from '../../../typings/models' |
15 | 16 | import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' | |
16 | type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' | 'account-playlists' | ||
17 | |||
18 | export type ActivitypubHttpFetcherPayload = { | ||
19 | uri: string | ||
20 | type: FetchType | ||
21 | videoId?: number | ||
22 | accountId?: number | ||
23 | } | ||
24 | 17 | ||
25 | async function processActivityPubHttpFetcher (job: Bull.Job) { | 18 | async function processActivityPubHttpFetcher (job: Bull.Job) { |
26 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 19 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index c70ce3be9..b65eeb677 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -4,12 +4,7 @@ import { doRequest } from '../../../helpers/requests' | |||
4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' | 5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' |
6 | import { ActorFollowScoreCache } from '../../files-cache' | 6 | import { ActorFollowScoreCache } from '../../files-cache' |
7 | 7 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | |
8 | export type ActivitypubHttpUnicastPayload = { | ||
9 | uri: string | ||
10 | signatureActorId?: number | ||
11 | body: any | ||
12 | } | ||
13 | 8 | ||
14 | async function processActivityPubHttpUnicast (job: Bull.Job) { | 9 | async function processActivityPubHttpUnicast (job: Bull.Job) { |
15 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 10 | logger.info('Processing ActivityPub unicast in job %d.', job.id) |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 4d6c38cfa..666e56868 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -1,14 +1,12 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { fetchVideoByUrl } from '../../../helpers/video' | 3 | import { fetchVideoByUrl } from '../../../helpers/video' |
4 | import { refreshActorIfNeeded, refreshVideoIfNeeded, refreshVideoPlaylistIfNeeded } from '../../activitypub' | 4 | import { refreshActorIfNeeded } from '../../activitypub/actor' |
5 | import { refreshVideoIfNeeded } from '../../activitypub/videos' | ||
5 | import { ActorModel } from '../../../models/activitypub/actor' | 6 | import { ActorModel } from '../../../models/activitypub/actor' |
6 | import { VideoPlaylistModel } from '../../../models/video/video-playlist' | 7 | import { VideoPlaylistModel } from '../../../models/video/video-playlist' |
7 | 8 | import { RefreshPayload } from '@shared/models' | |
8 | export type RefreshPayload = { | 9 | import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlist' |
9 | type: 'video' | 'video-playlist' | 'actor' | ||
10 | url: string | ||
11 | } | ||
12 | 10 | ||
13 | async function refreshAPObject (job: Bull.Job) { | 11 | async function refreshAPObject (job: Bull.Job) { |
14 | const payload = job.data as RefreshPayload | 12 | const payload = job.data as RefreshPayload |
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 62701222c..3157731e2 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -1,8 +1,7 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { Emailer, SendEmailOptions } from '../../emailer' | 3 | import { Emailer } from '../../emailer' |
4 | 4 | import { EmailPayload } from '@shared/models' | |
5 | export type EmailPayload = SendEmailOptions | ||
6 | 5 | ||
7 | async function processEmail (job: Bull.Job) { | 6 | async function processEmail (job: Bull.Job) { |
8 | const payload = job.data as EmailPayload | 7 | const payload = job.data as EmailPayload |
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts index d3bde6e6a..bcb49a731 100644 --- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -1,11 +1,12 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | 1 | import { buildSignedActivity } from '../../../../helpers/activitypub' |
2 | import { getServerActor } from '../../../../helpers/utils' | ||
3 | import { ActorModel } from '../../../../models/activitypub/actor' | 2 | import { ActorModel } from '../../../../models/activitypub/actor' |
4 | import { sha256 } from '../../../../helpers/core-utils' | 3 | import { ACTIVITY_PUB, HTTP_SIGNATURE } from '../../../../initializers/constants' |
5 | import { HTTP_SIGNATURE } from '../../../../initializers/constants' | ||
6 | import { MActor } from '../../../../typings/models' | 4 | import { MActor } from '../../../../typings/models' |
5 | import { getServerActor } from '@server/models/application/application' | ||
6 | import { buildDigest } from '@server/helpers/peertube-crypto' | ||
7 | import { ContextType } from '@shared/models/activitypub/context' | ||
7 | 8 | ||
8 | type Payload = { body: any, signatureActorId?: number } | 9 | type Payload = { body: any, contextType?: ContextType, signatureActorId?: number } |
9 | 10 | ||
10 | async function computeBody (payload: Payload) { | 11 | async function computeBody (payload: Payload) { |
11 | let body = payload.body | 12 | let body = payload.body |
@@ -13,7 +14,7 @@ async function computeBody (payload: Payload) { | |||
13 | if (payload.signatureActorId) { | 14 | if (payload.signatureActorId) { |
14 | const actorSignature = await ActorModel.load(payload.signatureActorId) | 15 | const actorSignature = await ActorModel.load(payload.signatureActorId) |
15 | if (!actorSignature) throw new Error('Unknown signature actor id.') | 16 | if (!actorSignature) throw new Error('Unknown signature actor id.') |
16 | body = await buildSignedActivity(actorSignature, payload.body) | 17 | body = await buildSignedActivity(actorSignature, payload.body, payload.contextType) |
17 | } | 18 | } |
18 | 19 | ||
19 | return body | 20 | return body |
@@ -42,18 +43,13 @@ async function buildSignedRequestOptions (payload: Payload) { | |||
42 | 43 | ||
43 | function buildGlobalHeaders (body: any) { | 44 | function buildGlobalHeaders (body: any) { |
44 | return { | 45 | return { |
45 | 'Digest': buildDigest(body) | 46 | 'Digest': buildDigest(body), |
47 | 'Content-Type': 'application/activity+json', | ||
48 | 'Accept': ACTIVITY_PUB.ACCEPT_HEADER | ||
46 | } | 49 | } |
47 | } | 50 | } |
48 | 51 | ||
49 | function buildDigest (body: any) { | ||
50 | const rawBody = typeof body === 'string' ? body : JSON.stringify(body) | ||
51 | |||
52 | return 'SHA-256=' + sha256(rawBody, 'base64') | ||
53 | } | ||
54 | |||
55 | export { | 52 | export { |
56 | buildDigest, | ||
57 | buildGlobalHeaders, | 53 | buildGlobalHeaders, |
58 | computeBody, | 54 | computeBody, |
59 | buildSignedRequestOptions | 55 | buildSignedRequestOptions |
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 99c991e72..ae11f1de3 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -9,11 +9,7 @@ import { extname } from 'path' | |||
9 | import { MVideoFile, MVideoWithFile } from '@server/typings/models' | 9 | import { MVideoFile, MVideoWithFile } from '@server/typings/models' |
10 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | 10 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' |
11 | import { getVideoFilePath } from '@server/lib/video-paths' | 11 | import { getVideoFilePath } from '@server/lib/video-paths' |
12 | 12 | import { VideoFileImportPayload } from '@shared/models' | |
13 | export type VideoFileImportPayload = { | ||
14 | videoUUID: string, | ||
15 | filePath: string | ||
16 | } | ||
17 | 13 | ||
18 | async function processVideoFileImport (job: Bull.Job) { | 14 | async function processVideoFileImport (job: Bull.Job) { |
19 | const payload = job.data as VideoFileImportPayload | 15 | const payload = job.data as VideoFileImportPayload |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 1fca17584..ad549c6fc 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -7,9 +7,8 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro | |||
7 | import { extname } from 'path' | 7 | import { extname } from 'path' |
8 | import { VideoFileModel } from '../../../models/video/video-file' | 8 | import { VideoFileModel } from '../../../models/video/video-file' |
9 | import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants' | 9 | import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants' |
10 | import { VideoState } from '../../../../shared' | 10 | import { VideoImportPayload, VideoImportTorrentPayload, VideoImportYoutubeDLPayload, VideoState } from '../../../../shared' |
11 | import { JobQueue } from '../index' | 11 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
12 | import { federateVideoIfNeeded } from '../../activitypub' | ||
13 | import { VideoModel } from '../../../models/video/video' | 12 | import { VideoModel } from '../../../models/video/video' |
14 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | 13 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' |
15 | import { getSecureTorrentName } from '../../../helpers/utils' | 14 | import { getSecureTorrentName } from '../../../helpers/utils' |
@@ -17,27 +16,12 @@ import { move, remove, stat } from 'fs-extra' | |||
17 | import { Notifier } from '../../notifier' | 16 | import { Notifier } from '../../notifier' |
18 | import { CONFIG } from '../../../initializers/config' | 17 | import { CONFIG } from '../../../initializers/config' |
19 | import { sequelizeTypescript } from '../../../initializers/database' | 18 | import { sequelizeTypescript } from '../../../initializers/database' |
20 | import { createVideoMiniatureFromUrl, generateVideoMiniature } from '../../thumbnail' | 19 | import { generateVideoMiniature } from '../../thumbnail' |
21 | import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type' | 20 | import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type' |
22 | import { MThumbnail } from '../../../typings/models/video/thumbnail' | 21 | import { MThumbnail } from '../../../typings/models/video/thumbnail' |
23 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/typings/models/video/video-import' | 22 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/typings/models/video/video-import' |
24 | import { getVideoFilePath } from '@server/lib/video-paths' | 23 | import { getVideoFilePath } from '@server/lib/video-paths' |
25 | 24 | import { addOptimizeOrMergeAudioJob } from '@server/helpers/video' | |
26 | type VideoImportYoutubeDLPayload = { | ||
27 | type: 'youtube-dl' | ||
28 | videoImportId: number | ||
29 | |||
30 | thumbnailUrl: string | ||
31 | downloadThumbnail: boolean | ||
32 | downloadPreview: boolean | ||
33 | } | ||
34 | |||
35 | type VideoImportTorrentPayload = { | ||
36 | type: 'magnet-uri' | 'torrent-file' | ||
37 | videoImportId: number | ||
38 | } | ||
39 | |||
40 | export type VideoImportPayload = VideoImportYoutubeDLPayload | VideoImportTorrentPayload | ||
41 | 25 | ||
42 | async function processVideoImport (job: Bull.Job) { | 26 | async function processVideoImport (job: Bull.Job) { |
43 | const payload = job.data as VideoImportPayload | 27 | const payload = job.data as VideoImportPayload |
@@ -62,9 +46,6 @@ async function processTorrentImport (job: Bull.Job, payload: VideoImportTorrentP | |||
62 | const options = { | 46 | const options = { |
63 | videoImportId: payload.videoImportId, | 47 | videoImportId: payload.videoImportId, |
64 | 48 | ||
65 | downloadThumbnail: false, | ||
66 | downloadPreview: false, | ||
67 | |||
68 | generateThumbnail: true, | 49 | generateThumbnail: true, |
69 | generatePreview: true | 50 | generatePreview: true |
70 | } | 51 | } |
@@ -82,15 +63,11 @@ async function processYoutubeDLImport (job: Bull.Job, payload: VideoImportYoutub | |||
82 | const options = { | 63 | const options = { |
83 | videoImportId: videoImport.id, | 64 | videoImportId: videoImport.id, |
84 | 65 | ||
85 | downloadThumbnail: payload.downloadThumbnail, | 66 | generateThumbnail: payload.generateThumbnail, |
86 | downloadPreview: payload.downloadPreview, | 67 | generatePreview: payload.generatePreview |
87 | thumbnailUrl: payload.thumbnailUrl, | ||
88 | |||
89 | generateThumbnail: false, | ||
90 | generatePreview: false | ||
91 | } | 68 | } |
92 | 69 | ||
93 | return processFile(() => downloadYoutubeDLVideo(videoImport.targetUrl, VIDEO_IMPORT_TIMEOUT), videoImport, options) | 70 | return processFile(() => downloadYoutubeDLVideo(videoImport.targetUrl, payload.fileExt, VIDEO_IMPORT_TIMEOUT), videoImport, options) |
94 | } | 71 | } |
95 | 72 | ||
96 | async function getVideoImportOrDie (videoImportId: number) { | 73 | async function getVideoImportOrDie (videoImportId: number) { |
@@ -105,10 +82,6 @@ async function getVideoImportOrDie (videoImportId: number) { | |||
105 | type ProcessFileOptions = { | 82 | type ProcessFileOptions = { |
106 | videoImportId: number | 83 | videoImportId: number |
107 | 84 | ||
108 | downloadThumbnail: boolean | ||
109 | downloadPreview: boolean | ||
110 | thumbnailUrl?: string | ||
111 | |||
112 | generateThumbnail: boolean | 85 | generateThumbnail: boolean |
113 | generatePreview: boolean | 86 | generatePreview: boolean |
114 | } | 87 | } |
@@ -153,17 +126,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
153 | 126 | ||
154 | // Process thumbnail | 127 | // Process thumbnail |
155 | let thumbnailModel: MThumbnail | 128 | let thumbnailModel: MThumbnail |
156 | if (options.downloadThumbnail && options.thumbnailUrl) { | 129 | if (options.generateThumbnail) { |
157 | thumbnailModel = await createVideoMiniatureFromUrl(options.thumbnailUrl, videoImportWithFiles.Video, ThumbnailType.MINIATURE) | ||
158 | } else if (options.generateThumbnail || options.downloadThumbnail) { | ||
159 | thumbnailModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.MINIATURE) | 130 | thumbnailModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.MINIATURE) |
160 | } | 131 | } |
161 | 132 | ||
162 | // Process preview | 133 | // Process preview |
163 | let previewModel: MThumbnail | 134 | let previewModel: MThumbnail |
164 | if (options.downloadPreview && options.thumbnailUrl) { | 135 | if (options.generatePreview) { |
165 | previewModel = await createVideoMiniatureFromUrl(options.thumbnailUrl, videoImportWithFiles.Video, ThumbnailType.PREVIEW) | ||
166 | } else if (options.generatePreview || options.downloadPreview) { | ||
167 | previewModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.PREVIEW) | 136 | previewModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.PREVIEW) |
168 | } | 137 | } |
169 | 138 | ||
@@ -214,14 +183,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
214 | 183 | ||
215 | // Create transcoding jobs? | 184 | // Create transcoding jobs? |
216 | if (video.state === VideoState.TO_TRANSCODE) { | 185 | if (video.state === VideoState.TO_TRANSCODE) { |
217 | // Put uuid because we don't have id auto incremented for now | 186 | await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile) |
218 | const dataInput = { | ||
219 | type: 'optimize' as 'optimize', | ||
220 | videoUUID: videoImportUpdated.Video.uuid, | ||
221 | isNewVideo: true | ||
222 | } | ||
223 | |||
224 | await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | ||
225 | } | 187 | } |
226 | 188 | ||
227 | } catch (err) { | 189 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts new file mode 100644 index 000000000..6296dab05 --- /dev/null +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -0,0 +1,17 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' | ||
4 | import { VideoRedundancyPayload } from '@shared/models' | ||
5 | |||
6 | async function processVideoRedundancy (job: Bull.Job) { | ||
7 | const payload = job.data as VideoRedundancyPayload | ||
8 | logger.info('Processing video redundancy in job %d.', job.id) | ||
9 | |||
10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | ||
11 | } | ||
12 | |||
13 | // --------------------------------------------------------------------------- | ||
14 | |||
15 | export { | ||
16 | processVideoRedundancy | ||
17 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 39b9fac98..46d52e1cf 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,48 +1,22 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { VideoResolution } from '../../../../shared' | 2 | import { |
3 | MergeAudioTranscodingPayload, | ||
4 | NewResolutionTranscodingPayload, | ||
5 | OptimizeTranscodingPayload, | ||
6 | VideoTranscodingPayload | ||
7 | } from '../../../../shared' | ||
3 | import { logger } from '../../../helpers/logger' | 8 | import { logger } from '../../../helpers/logger' |
4 | import { VideoModel } from '../../../models/video/video' | 9 | import { VideoModel } from '../../../models/video/video' |
5 | import { JobQueue } from '../job-queue' | 10 | import { JobQueue } from '../job-queue' |
6 | import { federateVideoIfNeeded } from '../../activitypub' | 11 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 12 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
8 | import { sequelizeTypescript } from '../../../initializers' | 13 | import { sequelizeTypescript } from '../../../initializers/database' |
9 | import * as Bluebird from 'bluebird' | ||
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | 14 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' |
11 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' | 15 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' |
12 | import { Notifier } from '../../notifier' | 16 | import { Notifier } from '../../notifier' |
13 | import { CONFIG } from '../../../initializers/config' | 17 | import { CONFIG } from '../../../initializers/config' |
14 | import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/typings/models' | 18 | import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/typings/models' |
15 | 19 | ||
16 | interface BaseTranscodingPayload { | ||
17 | videoUUID: string | ||
18 | isNewVideo?: boolean | ||
19 | } | ||
20 | |||
21 | interface HLSTranscodingPayload extends BaseTranscodingPayload { | ||
22 | type: 'hls' | ||
23 | isPortraitMode?: boolean | ||
24 | resolution: VideoResolution | ||
25 | copyCodecs: boolean | ||
26 | } | ||
27 | |||
28 | interface NewResolutionTranscodingPayload extends BaseTranscodingPayload { | ||
29 | type: 'new-resolution' | ||
30 | isPortraitMode?: boolean | ||
31 | resolution: VideoResolution | ||
32 | } | ||
33 | |||
34 | interface MergeAudioTranscodingPayload extends BaseTranscodingPayload { | ||
35 | type: 'merge-audio' | ||
36 | resolution: VideoResolution | ||
37 | } | ||
38 | |||
39 | interface OptimizeTranscodingPayload extends BaseTranscodingPayload { | ||
40 | type: 'optimize' | ||
41 | } | ||
42 | |||
43 | export type VideoTranscodingPayload = HLSTranscodingPayload | NewResolutionTranscodingPayload | ||
44 | | OptimizeTranscodingPayload | MergeAudioTranscodingPayload | ||
45 | |||
46 | async function processVideoTranscoding (job: Bull.Job) { | 20 | async function processVideoTranscoding (job: Bull.Job) { |
47 | const payload = job.data as VideoTranscodingPayload | 21 | const payload = job.data as VideoTranscodingPayload |
48 | logger.info('Processing video file in job %d.', job.id) | 22 | logger.info('Processing video file in job %d.', job.id) |
@@ -105,7 +79,7 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
105 | 79 | ||
106 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { | 80 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
107 | // Maybe the video changed in database, refresh it | 81 | // Maybe the video changed in database, refresh it |
108 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) | 82 | const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) |
109 | // Video does not exist anymore | 83 | // Video does not exist anymore |
110 | if (!videoDatabase) return undefined | 84 | if (!videoDatabase) return undefined |
111 | 85 | ||
@@ -118,12 +92,11 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
118 | 92 | ||
119 | let videoPublished = false | 93 | let videoPublished = false |
120 | 94 | ||
95 | // Generate HLS version of the max quality file | ||
121 | const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getMaxQualityFile().resolution }) | 96 | const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getMaxQualityFile().resolution }) |
122 | await createHlsJobIfEnabled(hlsPayload) | 97 | await createHlsJobIfEnabled(hlsPayload) |
123 | 98 | ||
124 | if (resolutionsEnabled.length !== 0) { | 99 | if (resolutionsEnabled.length !== 0) { |
125 | const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = [] | ||
126 | |||
127 | for (const resolution of resolutionsEnabled) { | 100 | for (const resolution of resolutionsEnabled) { |
128 | let dataInput: VideoTranscodingPayload | 101 | let dataInput: VideoTranscodingPayload |
129 | 102 | ||
@@ -143,12 +116,9 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
143 | } | 116 | } |
144 | } | 117 | } |
145 | 118 | ||
146 | const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 119 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) |
147 | tasks.push(p) | ||
148 | } | 120 | } |
149 | 121 | ||
150 | await Promise.all(tasks) | ||
151 | |||
152 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 122 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
153 | } else { | 123 | } else { |
154 | // No transcoding to do, it's now published | 124 | // No transcoding to do, it's now published |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index 73fa5ed04..7211df237 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -3,7 +3,7 @@ import { logger } from '../../../helpers/logger' | |||
3 | import { VideoModel } from '../../../models/video/video' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-views' | 4 | import { VideoViewModel } from '../../../models/video/video-views' |
5 | import { isTestInstance } from '../../../helpers/core-utils' | 5 | import { isTestInstance } from '../../../helpers/core-utils' |
6 | import { federateVideoIfNeeded } from '../../activitypub' | 6 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
7 | 7 | ||
8 | async function processVideosViews () { | 8 | async function processVideosViews () { |
9 | const lastHour = new Date() | 9 | const lastHour = new Date() |
@@ -23,6 +23,8 @@ async function processVideosViews () { | |||
23 | for (const videoId of videoIds) { | 23 | for (const videoId of videoIds) { |
24 | try { | 24 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 25 | const views = await Redis.Instance.getVideoViews(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
27 | |||
26 | if (views) { | 28 | if (views) { |
27 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) |
28 | 30 | ||
@@ -52,8 +54,6 @@ async function processVideosViews () { | |||
52 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) |
53 | } | 55 | } |
54 | } | 56 | } |
55 | |||
56 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
57 | } catch (err) { | 57 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 59 | } |