aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts7
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts5
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts24
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts14
-rw-r--r--server/lib/job-queue/handlers/email.ts11
-rw-r--r--server/lib/job-queue/handlers/utils/activitypub-http-utils.ts4
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts78
-rw-r--r--server/lib/job-queue/handlers/video-import.ts48
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts (renamed from server/lib/job-queue/handlers/video-file.ts)98
-rw-r--r--server/lib/job-queue/job-queue.ts18
11 files changed, 216 insertions, 95 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index b4d381062..b3defb617 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -1,6 +1,6 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { CONFIG, REMOTE_SCHEME, sequelizeTypescript } from '../../../initializers' 3import { REMOTE_SCHEME, WEBSERVER } from '../../../initializers/constants'
4import { sendFollow } from '../../activitypub/send' 4import { sendFollow } from '../../activitypub/send'
5import { sanitizeHost } from '../../../helpers/core-utils' 5import { sanitizeHost } from '../../../helpers/core-utils'
6import { loadActorUrlOrGetFromWebfinger } from '../../../helpers/webfinger' 6import { loadActorUrlOrGetFromWebfinger } from '../../../helpers/webfinger'
@@ -9,6 +9,7 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils'
9import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 9import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
10import { ActorModel } from '../../../models/activitypub/actor' 10import { ActorModel } from '../../../models/activitypub/actor'
11import { Notifier } from '../../notifier' 11import { Notifier } from '../../notifier'
12import { sequelizeTypescript } from '../../../initializers/database'
12 13
13export type ActivitypubFollowPayload = { 14export type ActivitypubFollowPayload = {
14 followerActorId: number 15 followerActorId: number
@@ -23,7 +24,7 @@ async function processActivityPubFollow (job: Bull.Job) {
23 logger.info('Processing ActivityPub follow in job %d.', job.id) 24 logger.info('Processing ActivityPub follow in job %d.', job.id)
24 25
25 let targetActor: ActorModel 26 let targetActor: ActorModel
26 if (!host || host === CONFIG.WEBSERVER.HOST) { 27 if (!host || host === WEBSERVER.HOST) {
27 targetActor = await ActorModel.loadLocalByName(payload.name) 28 targetActor = await ActorModel.loadLocalByName(payload.name)
28 } else { 29 } else {
29 const sanitizedHost = sanitizeHost(host, REMOTE_SCHEME.HTTP) 30 const sanitizedHost = sanitizeHost(host, REMOTE_SCHEME.HTTP)
@@ -73,5 +74,5 @@ async function follow (fromActor: ActorModel, targetActor: ActorModel) {
73 return actorFollow 74 return actorFollow
74 }) 75 })
75 76
76 if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewFollow(actorFollow) 77 if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewUserFollow(actorFollow)
77} 78}
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 9493945ff..0ff7b44a0 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -2,10 +2,9 @@ import * as Bull from 'bull'
2import * as Bluebird from 'bluebird' 2import * as Bluebird from 'bluebird'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
5import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 5import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' 6import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants'
8import { ActorFollowScoreCache } from '../../cache' 7import { ActorFollowScoreCache } from '../../files-cache'
9 8
10export type ActivitypubHttpBroadcastPayload = { 9export type ActivitypubHttpBroadcastPayload = {
11 uris: string[] 10 uris: string[]
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 67ccfa995..23d33c26f 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,17 +1,24 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import * as Bluebird from 'bluebird'
2import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
3import { processActivities } from '../../activitypub/process' 4import { processActivities } from '../../activitypub/process'
4import { addVideoComments } from '../../activitypub/video-comments' 5import { addVideoComments } from '../../activitypub/video-comments'
5import { crawlCollectionPage } from '../../activitypub/crawl' 6import { crawlCollectionPage } from '../../activitypub/crawl'
6import { VideoModel } from '../../../models/video/video' 7import { VideoModel } from '../../../models/video/video'
7import { addVideoShares, createRates } from '../../activitypub' 8import { addVideoShares, createRates } from '../../activitypub'
9import { createAccountPlaylists } from '../../activitypub/playlist'
10import { AccountModel } from '../../../models/account/account'
11import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
12import { VideoShareModel } from '../../../models/video/video-share'
13import { VideoCommentModel } from '../../../models/video/video-comment'
8 14
9type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' 15type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' | 'account-playlists'
10 16
11export type ActivitypubHttpFetcherPayload = { 17export type ActivitypubHttpFetcherPayload = {
12 uri: string 18 uri: string
13 type: FetchType 19 type: FetchType
14 videoId?: number 20 videoId?: number
21 accountId?: number
15} 22}
16 23
17async function processActivityPubHttpFetcher (job: Bull.Job) { 24async function processActivityPubHttpFetcher (job: Bull.Job) {
@@ -22,15 +29,26 @@ async function processActivityPubHttpFetcher (job: Bull.Job) {
22 let video: VideoModel 29 let video: VideoModel
23 if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) 30 if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
24 31
32 let account: AccountModel
33 if (payload.accountId) account = await AccountModel.load(payload.accountId)
34
25 const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = { 35 const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = {
26 'activity': items => processActivities(items, { outboxUrl: payload.uri }), 36 'activity': items => processActivities(items, { outboxUrl: payload.uri }),
27 'video-likes': items => createRates(items, video, 'like'), 37 'video-likes': items => createRates(items, video, 'like'),
28 'video-dislikes': items => createRates(items, video, 'dislike'), 38 'video-dislikes': items => createRates(items, video, 'dislike'),
29 'video-shares': items => addVideoShares(items, video), 39 'video-shares': items => addVideoShares(items, video),
30 'video-comments': items => addVideoComments(items, video) 40 'video-comments': items => addVideoComments(items, video),
41 'account-playlists': items => createAccountPlaylists(items, account)
42 }
43
44 const cleanerType: { [ id in FetchType ]?: (crawlStartDate: Date) => Bluebird<any> } = {
45 'video-likes': crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'like' as 'like', crawlStartDate),
46 'video-dislikes': crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'dislike' as 'dislike', crawlStartDate),
47 'video-shares': crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate),
48 'video-comments': crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
31 } 49 }
32 50
33 return crawlCollectionPage(payload.uri, fetcherType[payload.type]) 51 return crawlCollectionPage(payload.uri, fetcherType[payload.type], cleanerType[payload.type])
34} 52}
35 53
36// --------------------------------------------------------------------------- 54// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 3973dcdc8..c70ce3be9 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -2,8 +2,8 @@ import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 4import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
5import { JOB_REQUEST_TIMEOUT } from '../../../initializers' 5import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants'
6import { ActorFollowScoreCache } from '../../cache' 6import { ActorFollowScoreCache } from '../../files-cache'
7 7
8export type ActivitypubHttpUnicastPayload = { 8export type ActivitypubHttpUnicastPayload = {
9 uri: string 9 uri: string
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 454b975fe..4d6c38cfa 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -1,11 +1,12 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { fetchVideoByUrl } from '../../../helpers/video' 3import { fetchVideoByUrl } from '../../../helpers/video'
4import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub' 4import { refreshActorIfNeeded, refreshVideoIfNeeded, refreshVideoPlaylistIfNeeded } from '../../activitypub'
5import { ActorModel } from '../../../models/activitypub/actor' 5import { ActorModel } from '../../../models/activitypub/actor'
6import { VideoPlaylistModel } from '../../../models/video/video-playlist'
6 7
7export type RefreshPayload = { 8export type RefreshPayload = {
8 type: 'video' | 'actor' 9 type: 'video' | 'video-playlist' | 'actor'
9 url: string 10 url: string
10} 11}
11 12
@@ -15,13 +16,13 @@ async function refreshAPObject (job: Bull.Job) {
15 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) 16 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
16 17
17 if (payload.type === 'video') return refreshVideo(payload.url) 18 if (payload.type === 'video') return refreshVideo(payload.url)
19 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
18 if (payload.type === 'actor') return refreshActor(payload.url) 20 if (payload.type === 'actor') return refreshActor(payload.url)
19} 21}
20 22
21// --------------------------------------------------------------------------- 23// ---------------------------------------------------------------------------
22 24
23export { 25export {
24 refreshActor,
25 refreshAPObject 26 refreshAPObject
26} 27}
27 28
@@ -50,5 +51,12 @@ async function refreshActor (actorUrl: string) {
50 if (actor) { 51 if (actor) {
51 await refreshActorIfNeeded(actor, fetchType) 52 await refreshActorIfNeeded(actor, fetchType)
52 } 53 }
54}
55
56async function refreshVideoPlaylist (playlistUrl: string) {
57 const playlist = await VideoPlaylistModel.loadByUrlAndPopulateAccount(playlistUrl)
53 58
59 if (playlist) {
60 await refreshVideoPlaylistIfNeeded(playlist)
61 }
54} 62}
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 2ba39a156..62701222c 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -1,15 +1,8 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { Emailer } from '../../emailer' 3import { Emailer, SendEmailOptions } from '../../emailer'
4 4
5export type EmailPayload = { 5export type EmailPayload = SendEmailOptions
6 to: string[]
7 subject: string
8 text: string
9
10 fromDisplayName?: string
11 replyTo?: string
12}
13 6
14async function processEmail (job: Bull.Job) { 7async function processEmail (job: Bull.Job) {
15 const payload = job.data as EmailPayload 8 const payload = job.data as EmailPayload
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
index 4961d4502..cdee1f6fd 100644
--- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
+++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
@@ -2,7 +2,7 @@ import { buildSignedActivity } from '../../../../helpers/activitypub'
2import { getServerActor } from '../../../../helpers/utils' 2import { getServerActor } from '../../../../helpers/utils'
3import { ActorModel } from '../../../../models/activitypub/actor' 3import { ActorModel } from '../../../../models/activitypub/actor'
4import { sha256 } from '../../../../helpers/core-utils' 4import { sha256 } from '../../../../helpers/core-utils'
5import { HTTP_SIGNATURE } from '../../../../initializers' 5import { HTTP_SIGNATURE } from '../../../../initializers/constants'
6 6
7type Payload = { body: any, signatureActorId?: number } 7type Payload = { body: any, signatureActorId?: number }
8 8
@@ -28,7 +28,7 @@ async function buildSignedRequestOptions (payload: Payload) {
28 actor = await getServerActor() 28 actor = await getServerActor()
29 } 29 }
30 30
31 const keyId = actor.getWebfingerUrl() 31 const keyId = actor.url
32 return { 32 return {
33 algorithm: HTTP_SIGNATURE.ALGORITHM, 33 algorithm: HTTP_SIGNATURE.ALGORITHM,
34 authorizationHeaderName: HTTP_SIGNATURE.HEADER_NAME, 34 authorizationHeaderName: HTTP_SIGNATURE.HEADER_NAME,
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
new file mode 100644
index 000000000..921d9a083
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -0,0 +1,78 @@
1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video'
4import { publishVideoIfNeeded } from './video-transcoding'
5import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils'
6import { copy, stat } from 'fs-extra'
7import { VideoFileModel } from '../../../models/video/video-file'
8import { extname } from 'path'
9
10export type VideoFileImportPayload = {
11 videoUUID: string,
12 filePath: string
13}
14
15async function processVideoFileImport (job: Bull.Job) {
16 const payload = job.data as VideoFileImportPayload
17 logger.info('Processing video file import in job %d.', job.id)
18
19 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
20 // No video, maybe deleted?
21 if (!video) {
22 logger.info('Do not process job %d, video does not exist.', job.id)
23 return undefined
24 }
25
26 await updateVideoFile(video, payload.filePath)
27
28 await publishVideoIfNeeded(video)
29 return video
30}
31
32// ---------------------------------------------------------------------------
33
34export {
35 processVideoFileImport
36}
37
38// ---------------------------------------------------------------------------
39
40async function updateVideoFile (video: VideoModel, inputFilePath: string) {
41 const { videoFileResolution } = await getVideoFileResolution(inputFilePath)
42 const { size } = await stat(inputFilePath)
43 const fps = await getVideoFileFPS(inputFilePath)
44
45 let updatedVideoFile = new VideoFileModel({
46 resolution: videoFileResolution,
47 extname: extname(inputFilePath),
48 size,
49 fps,
50 videoId: video.id
51 })
52
53 const currentVideoFile = video.VideoFiles.find(videoFile => videoFile.resolution === updatedVideoFile.resolution)
54
55 if (currentVideoFile) {
56 // Remove old file and old torrent
57 await video.removeFile(currentVideoFile)
58 await video.removeTorrent(currentVideoFile)
59 // Remove the old video file from the array
60 video.VideoFiles = video.VideoFiles.filter(f => f !== currentVideoFile)
61
62 // Update the database
63 currentVideoFile.set('extname', updatedVideoFile.extname)
64 currentVideoFile.set('size', updatedVideoFile.size)
65 currentVideoFile.set('fps', updatedVideoFile.fps)
66
67 updatedVideoFile = currentVideoFile
68 }
69
70 const outputPath = video.getVideoFilePath(updatedVideoFile)
71 await copy(inputFilePath, outputPath)
72
73 await video.createTorrentAndSetInfoHash(updatedVideoFile)
74
75 await updatedVideoFile.save()
76
77 video.VideoFiles.push(updatedVideoFile)
78}
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 12004dcd7..1650916a6 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -6,16 +6,20 @@ import { VideoImportState } from '../../../../shared/models/videos'
6import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils' 6import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils'
7import { extname, join } from 'path' 7import { extname, join } from 'path'
8import { VideoFileModel } from '../../../models/video/video-file' 8import { VideoFileModel } from '../../../models/video/video-file'
9import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' 9import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants'
10import { downloadImage } from '../../../helpers/requests'
11import { VideoState } from '../../../../shared' 10import { VideoState } from '../../../../shared'
12import { JobQueue } from '../index' 11import { JobQueue } from '../index'
13import { federateVideoIfNeeded } from '../../activitypub' 12import { federateVideoIfNeeded } from '../../activitypub'
14import { VideoModel } from '../../../models/video/video' 13import { VideoModel } from '../../../models/video/video'
15import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' 14import { downloadWebTorrentVideo } from '../../../helpers/webtorrent'
16import { getSecureTorrentName } from '../../../helpers/utils' 15import { getSecureTorrentName } from '../../../helpers/utils'
17import { remove, move, stat } from 'fs-extra' 16import { move, remove, stat } from 'fs-extra'
18import { Notifier } from '../../notifier' 17import { Notifier } from '../../notifier'
18import { CONFIG } from '../../../initializers/config'
19import { sequelizeTypescript } from '../../../initializers/database'
20import { ThumbnailModel } from '../../../models/video/thumbnail'
21import { createVideoMiniatureFromUrl, generateVideoMiniature } from '../../thumbnail'
22import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type'
19 23
20type VideoImportYoutubeDLPayload = { 24type VideoImportYoutubeDLPayload = {
21 type: 'youtube-dl' 25 type: 'youtube-dl'
@@ -144,25 +148,19 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
144 tempVideoPath = null // This path is not used anymore 148 tempVideoPath = null // This path is not used anymore
145 149
146 // Process thumbnail 150 // Process thumbnail
147 if (options.downloadThumbnail) { 151 let thumbnailModel: ThumbnailModel
148 if (options.thumbnailUrl) { 152 if (options.downloadThumbnail && options.thumbnailUrl) {
149 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE) 153 thumbnailModel = await createVideoMiniatureFromUrl(options.thumbnailUrl, videoImport.Video, ThumbnailType.MINIATURE)
150 } else { 154 } else if (options.generateThumbnail || options.downloadThumbnail) {
151 await videoImport.Video.createThumbnail(videoFile) 155 thumbnailModel = await generateVideoMiniature(videoImport.Video, videoFile, ThumbnailType.MINIATURE)
152 }
153 } else if (options.generateThumbnail) {
154 await videoImport.Video.createThumbnail(videoFile)
155 } 156 }
156 157
157 // Process preview 158 // Process preview
158 if (options.downloadPreview) { 159 let previewModel: ThumbnailModel
159 if (options.thumbnailUrl) { 160 if (options.downloadPreview && options.thumbnailUrl) {
160 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE) 161 previewModel = await createVideoMiniatureFromUrl(options.thumbnailUrl, videoImport.Video, ThumbnailType.PREVIEW)
161 } else { 162 } else if (options.generatePreview || options.downloadPreview) {
162 await videoImport.Video.createPreview(videoFile) 163 previewModel = await generateVideoMiniature(videoImport.Video, videoFile, ThumbnailType.PREVIEW)
163 }
164 } else if (options.generatePreview) {
165 await videoImport.Video.createPreview(videoFile)
166 } 164 }
167 165
168 // Create torrent 166 // Create torrent
@@ -182,6 +180,9 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
182 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED 180 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED
183 await video.save({ transaction: t }) 181 await video.save({ transaction: t })
184 182
183 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
184 if (previewModel) await video.addAndSaveThumbnail(previewModel, t)
185
185 // Now we can federate the video (reload from database, we need more attributes) 186 // Now we can federate the video (reload from database, we need more attributes)
186 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 187 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
187 await federateVideoIfNeeded(videoForFederation, true, t) 188 await federateVideoIfNeeded(videoForFederation, true, t)
@@ -196,9 +197,14 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
196 return videoImportUpdated 197 return videoImportUpdated
197 }) 198 })
198 199
199 Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video)
200 Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) 200 Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true)
201 201
202 if (videoImportUpdated.Video.VideoBlacklist) {
203 Notifier.Instance.notifyOnVideoAutoBlacklist(videoImportUpdated.Video)
204 } else {
205 Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video)
206 }
207
202 // Create transcoding jobs? 208 // Create transcoding jobs?
203 if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { 209 if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) {
204 // Put uuid because we don't have id auto incremented for now 210 // Put uuid because we don't have id auto incremented for now
@@ -207,7 +213,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
207 isNewVideo: true 213 isNewVideo: true
208 } 214 }
209 215
210 await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) 216 await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
211 } 217 }
212 218
213 } catch (err) { 219 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 593e43cc5..48cac517e 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -8,40 +8,20 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils'
8import { sequelizeTypescript } from '../../../initializers' 8import { sequelizeTypescript } from '../../../initializers'
9import * as Bluebird from 'bluebird' 9import * as Bluebird from 'bluebird'
10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' 10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
11import { importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' 11import { generateHlsPlaylist, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
12import { Notifier } from '../../notifier' 12import { Notifier } from '../../notifier'
13import { CONFIG } from '../../../initializers/config'
13 14
14export type VideoFilePayload = { 15export type VideoTranscodingPayload = {
15 videoUUID: string 16 videoUUID: string
16 isNewVideo?: boolean
17 resolution?: VideoResolution 17 resolution?: VideoResolution
18 isNewVideo?: boolean
18 isPortraitMode?: boolean 19 isPortraitMode?: boolean
20 generateHlsPlaylist?: boolean
19} 21}
20 22
21export type VideoFileImportPayload = { 23async function processVideoTranscoding (job: Bull.Job) {
22 videoUUID: string, 24 const payload = job.data as VideoTranscodingPayload
23 filePath: string
24}
25
26async function processVideoFileImport (job: Bull.Job) {
27 const payload = job.data as VideoFileImportPayload
28 logger.info('Processing video file import in job %d.', job.id)
29
30 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
31 // No video, maybe deleted?
32 if (!video) {
33 logger.info('Do not process job %d, video does not exist.', job.id)
34 return undefined
35 }
36
37 await importVideoFile(video, payload.filePath)
38
39 await onVideoFileTranscoderOrImportSuccess(video)
40 return video
41}
42
43async function processVideoFile (job: Bull.Job) {
44 const payload = job.data as VideoFilePayload
45 logger.info('Processing video file in job %d.', job.id) 25 logger.info('Processing video file in job %d.', job.id)
46 26
47 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) 27 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
@@ -51,23 +31,38 @@ async function processVideoFile (job: Bull.Job) {
51 return undefined 31 return undefined
52 } 32 }
53 33
54 // Transcoding in other resolution 34 if (payload.generateHlsPlaylist) {
55 if (payload.resolution) { 35 await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
36
37 await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
38 } else if (payload.resolution) { // Transcoding in other resolution
56 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) 39 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
57 40
58 await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video) 41 await retryTransactionWrapper(publishVideoIfNeeded, video, payload)
59 } else { 42 } else {
60 await optimizeVideofile(video) 43 await optimizeVideofile(video)
61 44
62 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload.isNewVideo) 45 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
63 } 46 }
64 47
65 return video 48 return video
66} 49}
67 50
68async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { 51async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
69 if (video === undefined) return undefined 52 if (video === undefined) return undefined
70 53
54 await sequelizeTypescript.transaction(async t => {
55 // Maybe the video changed in database, refresh it
56 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
57 // Video does not exist anymore
58 if (!videoDatabase) return undefined
59
60 // If the video was not published, we consider it is a new one for other instances
61 await federateVideoIfNeeded(videoDatabase, false, t)
62 })
63}
64
65async function publishVideoIfNeeded (video: VideoModel, payload?: VideoTranscodingPayload) {
71 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { 66 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
72 // Maybe the video changed in database, refresh it 67 // Maybe the video changed in database, refresh it
73 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 68 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
@@ -93,11 +88,13 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) {
93 88
94 if (videoPublished) { 89 if (videoPublished) {
95 Notifier.Instance.notifyOnNewVideo(videoDatabase) 90 Notifier.Instance.notifyOnNewVideo(videoDatabase)
96 Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) 91 Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
97 } 92 }
93
94 await createHlsJobIfEnabled(payload)
98} 95}
99 96
100async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) { 97async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoTranscodingPayload) {
101 if (videoArg === undefined) return undefined 98 if (videoArg === undefined) return undefined
102 99
103 // Outside the transaction (IO on disk) 100 // Outside the transaction (IO on disk)
@@ -119,7 +116,7 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo
119 let videoPublished = false 116 let videoPublished = false
120 117
121 if (resolutionsEnabled.length !== 0) { 118 if (resolutionsEnabled.length !== 0) {
122 const tasks: Bluebird<Bull.Job<any>>[] = [] 119 const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = []
123 120
124 for (const resolution of resolutionsEnabled) { 121 for (const resolution of resolutionsEnabled) {
125 const dataInput = { 122 const dataInput = {
@@ -127,7 +124,7 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo
127 resolution 124 resolution
128 } 125 }
129 126
130 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) 127 const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
131 tasks.push(p) 128 tasks.push(p)
132 } 129 }
133 130
@@ -144,18 +141,37 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo
144 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) 141 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
145 } 142 }
146 143
147 await federateVideoIfNeeded(videoDatabase, isNewVideo, t) 144 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
148 145
149 return { videoDatabase, videoPublished } 146 return { videoDatabase, videoPublished }
150 }) 147 })
151 148
152 if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) 149 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
153 if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) 150 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
151
152 await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }))
154} 153}
155 154
156// --------------------------------------------------------------------------- 155// ---------------------------------------------------------------------------
157 156
158export { 157export {
159 processVideoFile, 158 processVideoTranscoding,
160 processVideoFileImport 159 publishVideoIfNeeded
160}
161
162// ---------------------------------------------------------------------------
163
164function createHlsJobIfEnabled (payload?: VideoTranscodingPayload) {
165 // Generate HLS playlist?
166 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
167 const hlsTranscodingPayload = {
168 videoUUID: payload.videoUUID,
169 resolution: payload.resolution,
170 isPortraitMode: payload.isPortraitMode,
171
172 generateHlsPlaylist: true
173 }
174
175 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
176 }
161} 177}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index ba9cbe0d9..3c810da98 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -2,16 +2,17 @@ import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { Redis } from '../redis' 4import { Redis } from '../redis'
5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' 5import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
9import { EmailPayload, processEmail } from './handlers/email' 9import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' 10import { processVideoTranscoding, VideoTranscodingPayload } from './handlers/video-transcoding'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import' 12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
13import { processVideosViews } from './handlers/video-views' 13import { processVideosViews } from './handlers/video-views'
14import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' 14import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
15import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import'
15 16
16type CreateJobArgument = 17type CreateJobArgument =
17 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 18 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -19,19 +20,20 @@ type CreateJobArgument =
19 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | 20 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
20 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | 21 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
21 { type: 'video-file-import', payload: VideoFileImportPayload } | 22 { type: 'video-file-import', payload: VideoFileImportPayload } |
22 { type: 'video-file', payload: VideoFilePayload } | 23 { type: 'video-transcoding', payload: VideoTranscodingPayload } |
23 { type: 'email', payload: EmailPayload } | 24 { type: 'email', payload: EmailPayload } |
24 { type: 'video-import', payload: VideoImportPayload } | 25 { type: 'video-import', payload: VideoImportPayload } |
25 { type: 'activitypub-refresher', payload: RefreshPayload } | 26 { type: 'activitypub-refresher', payload: RefreshPayload } |
26 { type: 'videos-views', payload: {} } 27 { type: 'videos-views', payload: {} }
27 28
28const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 29const handlers: { [ id in (JobType | 'video-file') ]: (job: Bull.Job) => Promise<any>} = {
29 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 30 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
30 'activitypub-http-unicast': processActivityPubHttpUnicast, 31 'activitypub-http-unicast': processActivityPubHttpUnicast,
31 'activitypub-http-fetcher': processActivityPubHttpFetcher, 32 'activitypub-http-fetcher': processActivityPubHttpFetcher,
32 'activitypub-follow': processActivityPubFollow, 33 'activitypub-follow': processActivityPubFollow,
33 'video-file-import': processVideoFileImport, 34 'video-file-import': processVideoFileImport,
34 'video-file': processVideoFile, 35 'video-transcoding': processVideoTranscoding,
36 'video-file': processVideoTranscoding, // TODO: remove it (changed in 1.3)
35 'email': processEmail, 37 'email': processEmail,
36 'video-import': processVideoImport, 38 'video-import': processVideoImport,
37 'videos-views': processVideosViews, 39 'videos-views': processVideosViews,
@@ -44,7 +46,7 @@ const jobTypes: JobType[] = [
44 'activitypub-http-fetcher', 46 'activitypub-http-fetcher',
45 'activitypub-http-unicast', 47 'activitypub-http-unicast',
46 'email', 48 'email',
47 'video-file', 49 'video-transcoding',
48 'video-file-import', 50 'video-file-import',
49 'video-import', 51 'video-import',
50 'videos-views', 52 'videos-views',
@@ -66,10 +68,10 @@ class JobQueue {
66 if (this.initialized === true) return 68 if (this.initialized === true) return
67 this.initialized = true 69 this.initialized = true
68 70
69 this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST 71 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
70 const queueOptions = { 72 const queueOptions = {
71 prefix: this.jobRedisPrefix, 73 prefix: this.jobRedisPrefix,
72 redis: Redis.getRedisClient(), 74 redis: Redis.getRedisClientOptions(),
73 settings: { 75 settings: {
74 maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts 76 maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
75 } 77 }