aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorJelle Besseling <jelle@pingiun.com>2021-08-17 08:26:20 +0200
committerGitHub <noreply@github.com>2021-08-17 08:26:20 +0200
commit0305db28c98fd6cf43a3c50ba92c76215e99d512 (patch)
tree33b753a19728d9f453c1aa4f19b36ac797e5fe80 /server/lib/job-queue
parentf88ae8f5bc223579313b28582de9101944a4a814 (diff)
downloadPeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.tar.gz
PeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.tar.zst
PeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.zip
Add support for saving video files to object storage (#4290)
* Add support for saving video files to object storage * Add support for custom url generation on s3 stored files Uses two config keys to support url generation that doesn't directly go to (compatible s3). Can be used to generate urls to any cache server or CDN. * Upload files to s3 concurrently and delete originals afterwards * Only publish after move to object storage is complete * Use base url instead of url template * Fix mistyped config field * Add rudenmentary way to download before transcode * Implement Chocobozzz suggestions https://github.com/Chocobozzz/PeerTube/pull/4290#issuecomment-891670478 The remarks in question: Try to use objectStorage prefix instead of s3 prefix for your function/variables/config names Prefer to use a tree for the config: s3.streaming_playlists_bucket -> object_storage.streaming_playlists.bucket Use uppercase for config: S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket -> OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET (maybe BUCKET_NAME instead of BUCKET) I suggest to rename moveJobsRunning to pendingMovingJobs (or better, create a dedicated videoJobInfo table with a pendingMove & videoId columns so we could also use this table to track pending transcoding jobs) https://github.com/Chocobozzz/PeerTube/pull/4290/files#diff-3e26d41ca4bda1de8e1747af70ca2af642abcc1e9e0bfb94239ff2165acfbde5R19 uses a string instead of an integer I think we should store the origin object storage URL in fileUrl, without base_url injection. Instead, inject the base_url at "runtime" so admins can easily change this configuration without running a script to update DB URLs * Import correct function * Support multipart upload * Remove import of node 15.0 module stream/promises * Extend maximum upload job length Using the same value as for redundancy downloading seems logical * Use dynamic part size for really large uploads Also adds very small part size for local testing * Fix decreasePendingMove query * Resolve various PR comments * Move to object storage after optimize * Make upload size configurable and increase default * Prune webtorrent files that are stored in object storage * Move files after transcoding jobs * Fix federation * Add video path manager * Support move to external storage job in client * Fix live object storage tests Co-authored-by: Chocobozzz <me@florianbigard.com>
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts114
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts27
-rw-r--r--server/lib/job-queue/handlers/video-import.ts16
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts18
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts128
-rw-r--r--server/lib/job-queue/job-queue.ts13
6 files changed, 234 insertions, 82 deletions
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
new file mode 100644
index 000000000..a0c58d211
--- /dev/null
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -0,0 +1,114 @@
1import * as Bull from 'bull'
2import { remove } from 'fs-extra'
3import { join } from 'path'
4import { logger } from '@server/helpers/logger'
5import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config'
7import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage'
8import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
9import { moveToNextState } from '@server/lib/video-state'
10import { VideoModel } from '@server/models/video/video'
11import { VideoJobInfoModel } from '@server/models/video/video-job-info'
12import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
13import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared'
14
15export async function processMoveToObjectStorage (job: Bull.Job) {
16 const payload = job.data as MoveObjectStoragePayload
17 logger.info('Moving video %s in job %d.', payload.videoUUID, job.id)
18
19 const video = await VideoModel.loadWithFiles(payload.videoUUID)
20 // No video, maybe deleted?
21 if (!video) {
22 logger.info('Can\'t process job %d, video does not exist.', job.id)
23 return undefined
24 }
25
26 if (video.VideoFiles) {
27 await moveWebTorrentFiles(video)
28 }
29
30 if (video.VideoStreamingPlaylists) {
31 await moveHLSFiles(video)
32 }
33
34 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
35 if (pendingMove === 0) {
36 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id)
37 await doAfterLastJob(video, payload.isNewVideo)
38 }
39
40 return payload.videoUUID
41}
42
43// ---------------------------------------------------------------------------
44
45async function moveWebTorrentFiles (video: MVideoWithAllFiles) {
46 for (const file of video.VideoFiles) {
47 if (file.storage !== VideoStorage.FILE_SYSTEM) continue
48
49 const fileUrl = await storeWebTorrentFile(file.filename)
50
51 const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename)
52 await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath })
53 }
54}
55
56async function moveHLSFiles (video: MVideoWithAllFiles) {
57 for (const playlist of video.VideoStreamingPlaylists) {
58
59 for (const file of playlist.VideoFiles) {
60 if (file.storage !== VideoStorage.FILE_SYSTEM) continue
61
62 // Resolution playlist
63 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
64 await storeHLSFile(playlist, video, playlistFilename)
65
66 // Resolution fragmented file
67 const fileUrl = await storeHLSFile(playlist, video, file.filename)
68
69 const oldPath = join(getHLSDirectory(video), file.filename)
70
71 await onFileMoved({ videoOrPlaylist: Object.assign(playlist, { Video: video }), file, fileUrl, oldPath })
72 }
73 }
74}
75
76async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) {
77 for (const playlist of video.VideoStreamingPlaylists) {
78 if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue
79
80 // Master playlist
81 playlist.playlistUrl = await storeHLSFile(playlist, video, playlist.playlistFilename)
82 // Sha256 segments file
83 playlist.segmentsSha256Url = await storeHLSFile(playlist, video, playlist.segmentsSha256Filename)
84
85 playlist.storage = VideoStorage.OBJECT_STORAGE
86
87 await playlist.save()
88 }
89
90 // Remove empty hls video directory
91 if (video.VideoStreamingPlaylists) {
92 await remove(getHLSDirectory(video))
93 }
94
95 await moveToNextState(video, isNewVideo)
96}
97
98async function onFileMoved (options: {
99 videoOrPlaylist: MVideo | MStreamingPlaylistVideo
100 file: MVideoFile
101 fileUrl: string
102 oldPath: string
103}) {
104 const { videoOrPlaylist, file, fileUrl, oldPath } = options
105
106 file.fileUrl = fileUrl
107 file.storage = VideoStorage.OBJECT_STORAGE
108
109 await createTorrentAndSetInfoHash(videoOrPlaylist, file)
110 await file.save()
111
112 logger.debug('Removing %s because it\'s now on object storage', oldPath)
113 await remove(oldPath)
114}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 2f4abf730..e8ee1f759 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -2,15 +2,19 @@ import * as Bull from 'bull'
2import { copy, stat } from 'fs-extra' 2import { copy, stat } from 'fs-extra'
3import { getLowercaseExtension } from '@server/helpers/core-utils' 3import { getLowercaseExtension } from '@server/helpers/core-utils'
4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
5import { generateWebTorrentVideoFilename, getVideoFilePath } from '@server/lib/video-paths' 5import { CONFIG } from '@server/initializers/config'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { addMoveToObjectStorageJob } from '@server/lib/video'
9import { VideoPathManager } from '@server/lib/video-path-manager'
6import { UserModel } from '@server/models/user/user' 10import { UserModel } from '@server/models/user/user'
7import { MVideoFullLight } from '@server/types/models' 11import { MVideoFullLight } from '@server/types/models'
8import { VideoFileImportPayload } from '@shared/models' 12import { VideoFileImportPayload, VideoStorage } from '@shared/models'
9import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' 13import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils'
10import { logger } from '../../../helpers/logger' 14import { logger } from '../../../helpers/logger'
11import { VideoModel } from '../../../models/video/video' 15import { VideoModel } from '../../../models/video/video'
12import { VideoFileModel } from '../../../models/video/video-file' 16import { VideoFileModel } from '../../../models/video/video-file'
13import { onNewWebTorrentFileResolution } from './video-transcoding' 17import { createHlsJobIfEnabled } from './video-transcoding'
14 18
15async function processVideoFileImport (job: Bull.Job) { 19async function processVideoFileImport (job: Bull.Job) {
16 const payload = job.data as VideoFileImportPayload 20 const payload = job.data as VideoFileImportPayload
@@ -29,15 +33,19 @@ async function processVideoFileImport (job: Bull.Job) {
29 33
30 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) 34 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
31 35
32 const newResolutionPayload = { 36 await createHlsJobIfEnabled(user, {
33 type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent',
34 videoUUID: video.uuid, 37 videoUUID: video.uuid,
35 resolution: data.resolution, 38 resolution: data.resolution,
36 isPortraitMode: data.isPortraitMode, 39 isPortraitMode: data.isPortraitMode,
37 copyCodecs: false, 40 copyCodecs: true,
38 isNewVideo: false 41 isMaxQuality: false
42 })
43
44 if (CONFIG.OBJECT_STORAGE.ENABLED) {
45 await addMoveToObjectStorageJob(video)
46 } else {
47 await federateVideoIfNeeded(video, false)
39 } 48 }
40 await onNewWebTorrentFileResolution(video, user, newResolutionPayload)
41 49
42 return video 50 return video
43} 51}
@@ -72,12 +80,13 @@ async function updateVideoFile (video: MVideoFullLight, inputFilePath: string) {
72 resolution, 80 resolution,
73 extname: fileExt, 81 extname: fileExt,
74 filename: generateWebTorrentVideoFilename(resolution, fileExt), 82 filename: generateWebTorrentVideoFilename(resolution, fileExt),
83 storage: VideoStorage.FILE_SYSTEM,
75 size, 84 size,
76 fps, 85 fps,
77 videoId: video.id 86 videoId: video.id
78 }) 87 })
79 88
80 const outputPath = getVideoFilePath(video, newVideoFile) 89 const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile)
81 await copy(inputFilePath, outputPath) 90 await copy(inputFilePath, outputPath)
82 91
83 video.VideoFiles.push(newVideoFile) 92 video.VideoFiles.push(newVideoFile)
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index fec553f2b..a5fa204f5 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -4,11 +4,13 @@ import { getLowercaseExtension } from '@server/helpers/core-utils'
4import { retryTransactionWrapper } from '@server/helpers/database-utils' 4import { retryTransactionWrapper } from '@server/helpers/database-utils'
5import { YoutubeDL } from '@server/helpers/youtube-dl' 5import { YoutubeDL } from '@server/helpers/youtube-dl'
6import { isPostImportVideoAccepted } from '@server/lib/moderation' 6import { isPostImportVideoAccepted } from '@server/lib/moderation'
7import { generateWebTorrentVideoFilename } from '@server/lib/paths'
7import { Hooks } from '@server/lib/plugins/hooks' 8import { Hooks } from '@server/lib/plugins/hooks'
8import { ServerConfigManager } from '@server/lib/server-config-manager' 9import { ServerConfigManager } from '@server/lib/server-config-manager'
9import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
10import { addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video'
11import { generateWebTorrentVideoFilename, getVideoFilePath } from '@server/lib/video-paths' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state'
12import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
13import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 15import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
14import { 16import {
@@ -25,7 +27,6 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro
25import { logger } from '../../../helpers/logger' 27import { logger } from '../../../helpers/logger'
26import { getSecureTorrentName } from '../../../helpers/utils' 28import { getSecureTorrentName } from '../../../helpers/utils'
27import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' 29import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent'
28import { CONFIG } from '../../../initializers/config'
29import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants' 30import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants'
30import { sequelizeTypescript } from '../../../initializers/database' 31import { sequelizeTypescript } from '../../../initializers/database'
31import { VideoModel } from '../../../models/video/video' 32import { VideoModel } from '../../../models/video/video'
@@ -100,7 +101,6 @@ type ProcessFileOptions = {
100} 101}
101async function processFile (downloader: () => Promise<string>, videoImport: MVideoImportDefault, options: ProcessFileOptions) { 102async function processFile (downloader: () => Promise<string>, videoImport: MVideoImportDefault, options: ProcessFileOptions) {
102 let tempVideoPath: string 103 let tempVideoPath: string
103 let videoDestFile: string
104 let videoFile: VideoFileModel 104 let videoFile: VideoFileModel
105 105
106 try { 106 try {
@@ -159,7 +159,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
159 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles }) 159 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles })
160 160
161 // Move file 161 // Move file
162 videoDestFile = getVideoFilePath(videoImportWithFiles.Video, videoFile) 162 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
163 await move(tempVideoPath, videoDestFile) 163 await move(tempVideoPath, videoDestFile)
164 tempVideoPath = null // This path is not used anymore 164 tempVideoPath = null // This path is not used anymore
165 165
@@ -204,7 +204,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
204 204
205 // Update video DB object 205 // Update video DB object
206 video.duration = duration 206 video.duration = duration
207 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED 207 video.state = buildNextVideoState(video.state)
208 await video.save({ transaction: t }) 208 await video.save({ transaction: t })
209 209
210 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) 210 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
@@ -245,6 +245,10 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
245 Notifier.Instance.notifyOnNewVideoIfNeeded(video) 245 Notifier.Instance.notifyOnNewVideoIfNeeded(video)
246 } 246 }
247 247
248 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
249 return addMoveToObjectStorageJob(videoImportUpdated.Video)
250 }
251
248 // Create transcoding jobs? 252 // Create transcoding jobs?
249 if (video.state === VideoState.TO_TRANSCODE) { 253 if (video.state === VideoState.TO_TRANSCODE) {
250 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) 254 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User)
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index aa5bd573a..9ccf724c2 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -4,10 +4,11 @@ import { join } from 'path'
4import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' 4import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
5import { VIDEO_LIVE } from '@server/initializers/constants' 5import { VIDEO_LIVE } from '@server/initializers/constants'
6import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live' 6import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live'
7import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveDirectory } from '@server/lib/paths'
7import { generateVideoMiniature } from '@server/lib/thumbnail' 8import { generateVideoMiniature } from '@server/lib/thumbnail'
8import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/video-transcoding' 9import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/video-transcoding'
9import { publishAndFederateIfNeeded } from '@server/lib/video' 10import { VideoPathManager } from '@server/lib/video-path-manager'
10import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHLSDirectory } from '@server/lib/video-paths' 11import { moveToNextState } from '@server/lib/video-state'
11import { VideoModel } from '@server/models/video/video' 12import { VideoModel } from '@server/models/video/video'
12import { VideoFileModel } from '@server/models/video/video-file' 13import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoLiveModel } from '@server/models/video/video-live' 14import { VideoLiveModel } from '@server/models/video/video-live'
@@ -55,16 +56,15 @@ export {
55// --------------------------------------------------------------------------- 56// ---------------------------------------------------------------------------
56 57
57async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MStreamingPlaylist) { 58async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MStreamingPlaylist) {
58 const hlsDirectory = getHLSDirectory(video, false) 59 const replayDirectory = VideoPathManager.Instance.getFSHLSOutputPath(video, VIDEO_LIVE.REPLAY_DIRECTORY)
59 const replayDirectory = join(hlsDirectory, VIDEO_LIVE.REPLAY_DIRECTORY)
60 60
61 const rootFiles = await readdir(hlsDirectory) 61 const rootFiles = await readdir(getLiveDirectory(video))
62 62
63 const playlistFiles = rootFiles.filter(file => { 63 const playlistFiles = rootFiles.filter(file => {
64 return file.endsWith('.m3u8') && file !== streamingPlaylist.playlistFilename 64 return file.endsWith('.m3u8') && file !== streamingPlaylist.playlistFilename
65 }) 65 })
66 66
67 await cleanupLiveFiles(hlsDirectory) 67 await cleanupTMPLiveFiles(getLiveDirectory(video))
68 68
69 await live.destroy() 69 await live.destroy()
70 70
@@ -98,7 +98,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt
98 98
99 const { resolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) 99 const { resolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe)
100 100
101 const outputPath = await generateHlsPlaylistResolutionFromTS({ 101 const { resolutionPlaylistPath: outputPath } = await generateHlsPlaylistResolutionFromTS({
102 video: videoWithFiles, 102 video: videoWithFiles,
103 concatenatedTsFilePath, 103 concatenatedTsFilePath,
104 resolution, 104 resolution,
@@ -133,10 +133,10 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt
133 }) 133 })
134 } 134 }
135 135
136 await publishAndFederateIfNeeded(videoWithFiles, true) 136 await moveToNextState(videoWithFiles, false)
137} 137}
138 138
139async function cleanupLiveFiles (hlsDirectory: string) { 139async function cleanupTMPLiveFiles (hlsDirectory: string) {
140 if (!await pathExists(hlsDirectory)) return 140 if (!await pathExists(hlsDirectory)) return
141 141
142 const files = await readdir(hlsDirectory) 142 const files = await readdir(hlsDirectory)
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 876d1460c..b3149dde8 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,9 +1,11 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' 2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
3import { getTranscodingJobPriority, publishAndFederateIfNeeded } from '@server/lib/video' 3import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
4import { getVideoFilePath } from '@server/lib/video-paths' 4import { VideoPathManager } from '@server/lib/video-path-manager'
5import { moveToNextState } from '@server/lib/video-state'
5import { UserModel } from '@server/models/user/user' 6import { UserModel } from '@server/models/user/user'
6import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' 7import { VideoJobInfoModel } from '@server/models/video/video-job-info'
8import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models'
7import { 9import {
8 HLSTranscodingPayload, 10 HLSTranscodingPayload,
9 MergeAudioTranscodingPayload, 11 MergeAudioTranscodingPayload,
@@ -16,17 +18,14 @@ import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils'
16import { logger } from '../../../helpers/logger' 18import { logger } from '../../../helpers/logger'
17import { CONFIG } from '../../../initializers/config' 19import { CONFIG } from '../../../initializers/config'
18import { VideoModel } from '../../../models/video/video' 20import { VideoModel } from '../../../models/video/video'
19import { federateVideoIfNeeded } from '../../activitypub/videos'
20import { Notifier } from '../../notifier'
21import { 21import {
22 generateHlsPlaylistResolution, 22 generateHlsPlaylistResolution,
23 mergeAudioVideofile, 23 mergeAudioVideofile,
24 optimizeOriginalVideofile, 24 optimizeOriginalVideofile,
25 transcodeNewWebTorrentResolution 25 transcodeNewWebTorrentResolution
26} from '../../transcoding/video-transcoding' 26} from '../../transcoding/video-transcoding'
27import { JobQueue } from '../job-queue'
28 27
29type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<any> 28type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
30 29
31const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { 30const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
32 'new-resolution-to-hls': handleHLSJob, 31 'new-resolution-to-hls': handleHLSJob,
@@ -69,15 +68,16 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide
69 : video.getMaxQualityFile() 68 : video.getMaxQualityFile()
70 69
71 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() 70 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
72 const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput)
73 71
74 await generateHlsPlaylistResolution({ 72 await VideoPathManager.Instance.makeAvailableVideoFile(videoOrStreamingPlaylist, videoFileInput, videoInputPath => {
75 video, 73 return generateHlsPlaylistResolution({
76 videoInputPath, 74 video,
77 resolution: payload.resolution, 75 videoInputPath,
78 copyCodecs: payload.copyCodecs, 76 resolution: payload.resolution,
79 isPortraitMode: payload.isPortraitMode || false, 77 copyCodecs: payload.copyCodecs,
80 job 78 isPortraitMode: payload.isPortraitMode || false,
79 job
80 })
81 }) 81 })
82 82
83 await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) 83 await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload)
@@ -101,7 +101,7 @@ async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudio
101} 101}
102 102
103async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { 103async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
104 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) 104 const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
105 105
106 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) 106 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user)
107} 107}
@@ -121,10 +121,18 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay
121 video.VideoFiles = [] 121 video.VideoFiles = []
122 122
123 // Create HLS new resolution jobs 123 // Create HLS new resolution jobs
124 await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') 124 await createLowerResolutionsJobs({
125 video,
126 user,
127 videoFileResolution: payload.resolution,
128 isPortraitMode: payload.isPortraitMode,
129 isNewVideo: payload.isNewVideo ?? true,
130 type: 'hls'
131 })
125 } 132 }
126 133
127 return publishAndFederateIfNeeded(video) 134 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
135 await moveToNextState(video, payload.isNewVideo)
128} 136}
129 137
130async function onVideoFileOptimizer ( 138async function onVideoFileOptimizer (
@@ -143,58 +151,54 @@ async function onVideoFileOptimizer (
143 // Video does not exist anymore 151 // Video does not exist anymore
144 if (!videoDatabase) return undefined 152 if (!videoDatabase) return undefined
145 153
146 let videoPublished = false
147
148 // Generate HLS version of the original file 154 // Generate HLS version of the original file
149 const originalFileHLSPayload = Object.assign({}, payload, { 155 const originalFileHLSPayload = {
156 ...payload,
157
150 isPortraitMode, 158 isPortraitMode,
151 resolution: videoDatabase.getMaxQualityFile().resolution, 159 resolution: videoDatabase.getMaxQualityFile().resolution,
152 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 160 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
153 copyCodecs: transcodeType !== 'quick-transcode', 161 copyCodecs: transcodeType !== 'quick-transcode',
154 isMaxQuality: true 162 isMaxQuality: true
155 }) 163 }
156 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) 164 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
165 const hasNewResolutions = await createLowerResolutionsJobs({
166 video: videoDatabase,
167 user,
168 videoFileResolution: resolution,
169 isPortraitMode,
170 type: 'webtorrent',
171 isNewVideo: payload.isNewVideo ?? true
172 })
157 173
158 const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, resolution, isPortraitMode, 'webtorrent') 174 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
159 175
176 // Move to next state if there are no other resolutions to generate
160 if (!hasHls && !hasNewResolutions) { 177 if (!hasHls && !hasNewResolutions) {
161 // No transcoding to do, it's now published 178 await moveToNextState(videoDatabase, payload.isNewVideo)
162 videoPublished = await videoDatabase.publishIfNeededAndSave(undefined)
163 } 179 }
164
165 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo)
166
167 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
168 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
169} 180}
170 181
171async function onNewWebTorrentFileResolution ( 182async function onNewWebTorrentFileResolution (
172 video: MVideoUUID, 183 video: MVideo,
173 user: MUserId, 184 user: MUserId,
174 payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload 185 payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
175) { 186) {
176 await publishAndFederateIfNeeded(video) 187 await createHlsJobIfEnabled(user, { ...payload, copyCodecs: true, isMaxQuality: false })
188 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
177 189
178 await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) 190 await moveToNextState(video, payload.isNewVideo)
179} 191}
180 192
181// ---------------------------------------------------------------------------
182
183export {
184 processVideoTranscoding,
185 onNewWebTorrentFileResolution
186}
187
188// ---------------------------------------------------------------------------
189
190async function createHlsJobIfEnabled (user: MUserId, payload: { 193async function createHlsJobIfEnabled (user: MUserId, payload: {
191 videoUUID: string 194 videoUUID: string
192 resolution: number 195 resolution: number
193 isPortraitMode?: boolean 196 isPortraitMode?: boolean
194 copyCodecs: boolean 197 copyCodecs: boolean
195 isMaxQuality: boolean 198 isMaxQuality: boolean
199 isNewVideo?: boolean
196}) { 200}) {
197 if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false 201 if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
198 202
199 const jobOptions = { 203 const jobOptions = {
200 priority: await getTranscodingJobPriority(user) 204 priority: await getTranscodingJobPriority(user)
@@ -206,21 +210,35 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
206 resolution: payload.resolution, 210 resolution: payload.resolution,
207 isPortraitMode: payload.isPortraitMode, 211 isPortraitMode: payload.isPortraitMode,
208 copyCodecs: payload.copyCodecs, 212 copyCodecs: payload.copyCodecs,
209 isMaxQuality: payload.isMaxQuality 213 isMaxQuality: payload.isMaxQuality,
214 isNewVideo: payload.isNewVideo
210 } 215 }
211 216
212 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) 217 await addTranscodingJob(hlsTranscodingPayload, jobOptions)
213 218
214 return true 219 return true
215} 220}
216 221
217async function createLowerResolutionsJobs ( 222// ---------------------------------------------------------------------------
218 video: MVideoFullLight, 223
219 user: MUserId, 224export {
220 videoFileResolution: number, 225 processVideoTranscoding,
221 isPortraitMode: boolean, 226 createHlsJobIfEnabled,
227 onNewWebTorrentFileResolution
228}
229
230// ---------------------------------------------------------------------------
231
232async function createLowerResolutionsJobs (options: {
233 video: MVideoFullLight
234 user: MUserId
235 videoFileResolution: number
236 isPortraitMode: boolean
237 isNewVideo: boolean
222 type: 'hls' | 'webtorrent' 238 type: 'hls' | 'webtorrent'
223) { 239}) {
240 const { video, user, videoFileResolution, isPortraitMode, isNewVideo, type } = options
241
224 // Create transcoding jobs if there are enabled resolutions 242 // Create transcoding jobs if there are enabled resolutions
225 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') 243 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
226 const resolutionCreated: number[] = [] 244 const resolutionCreated: number[] = []
@@ -234,7 +252,8 @@ async function createLowerResolutionsJobs (
234 type: 'new-resolution-to-webtorrent', 252 type: 'new-resolution-to-webtorrent',
235 videoUUID: video.uuid, 253 videoUUID: video.uuid,
236 resolution, 254 resolution,
237 isPortraitMode 255 isPortraitMode,
256 isNewVideo
238 } 257 }
239 } 258 }
240 259
@@ -245,7 +264,8 @@ async function createLowerResolutionsJobs (
245 resolution, 264 resolution,
246 isPortraitMode, 265 isPortraitMode,
247 copyCodecs: false, 266 copyCodecs: false,
248 isMaxQuality: false 267 isMaxQuality: false,
268 isNewVideo
249 } 269 }
250 } 270 }
251 271
@@ -257,7 +277,7 @@ async function createLowerResolutionsJobs (
257 priority: await getTranscodingJobPriority(user) 277 priority: await getTranscodingJobPriority(user)
258 } 278 }
259 279
260 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) 280 await addTranscodingJob(dataInput, jobOptions)
261 } 281 }
262 282
263 if (resolutionCreated.length === 0) { 283 if (resolutionCreated.length === 0) {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 42e8347b1..7a3a1bf82 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -11,6 +11,7 @@ import {
11 EmailPayload, 11 EmailPayload,
12 JobState, 12 JobState,
13 JobType, 13 JobType,
14 MoveObjectStoragePayload,
14 RefreshPayload, 15 RefreshPayload,
15 VideoFileImportPayload, 16 VideoFileImportPayload,
16 VideoImportPayload, 17 VideoImportPayload,
@@ -34,6 +35,7 @@ import { processVideoImport } from './handlers/video-import'
34import { processVideoLiveEnding } from './handlers/video-live-ending' 35import { processVideoLiveEnding } from './handlers/video-live-ending'
35import { processVideoTranscoding } from './handlers/video-transcoding' 36import { processVideoTranscoding } from './handlers/video-transcoding'
36import { processVideosViews } from './handlers/video-views' 37import { processVideosViews } from './handlers/video-views'
38import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
37 39
38type CreateJobArgument = 40type CreateJobArgument =
39 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 41 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -49,9 +51,10 @@ type CreateJobArgument =
49 { type: 'videos-views', payload: {} } | 51 { type: 'videos-views', payload: {} } |
50 { type: 'video-live-ending', payload: VideoLiveEndingPayload } | 52 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
51 { type: 'actor-keys', payload: ActorKeysPayload } | 53 { type: 'actor-keys', payload: ActorKeysPayload } |
52 { type: 'video-redundancy', payload: VideoRedundancyPayload } 54 { type: 'video-redundancy', payload: VideoRedundancyPayload } |
55 { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
53 56
54type CreateJobOptions = { 57export type CreateJobOptions = {
55 delay?: number 58 delay?: number
56 priority?: number 59 priority?: number
57} 60}
@@ -70,7 +73,8 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
70 'activitypub-refresher': refreshAPObject, 73 'activitypub-refresher': refreshAPObject,
71 'video-live-ending': processVideoLiveEnding, 74 'video-live-ending': processVideoLiveEnding,
72 'actor-keys': processActorKeys, 75 'actor-keys': processActorKeys,
73 'video-redundancy': processVideoRedundancy 76 'video-redundancy': processVideoRedundancy,
77 'move-to-object-storage': processMoveToObjectStorage
74} 78}
75 79
76const jobTypes: JobType[] = [ 80const jobTypes: JobType[] = [
@@ -87,7 +91,8 @@ const jobTypes: JobType[] = [
87 'activitypub-refresher', 91 'activitypub-refresher',
88 'video-redundancy', 92 'video-redundancy',
89 'actor-keys', 93 'actor-keys',
90 'video-live-ending' 94 'video-live-ending',
95 'move-to-object-storage'
91] 96]
92 97
93class JobQueue { 98class JobQueue {