aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers/video-transcoding.ts
diff options
context:
space:
mode:
authorJelle Besseling <jelle@pingiun.com>2021-08-17 08:26:20 +0200
committerGitHub <noreply@github.com>2021-08-17 08:26:20 +0200
commit0305db28c98fd6cf43a3c50ba92c76215e99d512 (patch)
tree33b753a19728d9f453c1aa4f19b36ac797e5fe80 /server/lib/job-queue/handlers/video-transcoding.ts
parentf88ae8f5bc223579313b28582de9101944a4a814 (diff)
downloadPeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.tar.gz
PeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.tar.zst
PeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.zip
Add support for saving video files to object storage (#4290)
* Add support for saving video files to object storage * Add support for custom url generation on s3 stored files Uses two config keys to support url generation that doesn't directly go to (compatible s3). Can be used to generate urls to any cache server or CDN. * Upload files to s3 concurrently and delete originals afterwards * Only publish after move to object storage is complete * Use base url instead of url template * Fix mistyped config field * Add rudenmentary way to download before transcode * Implement Chocobozzz suggestions https://github.com/Chocobozzz/PeerTube/pull/4290#issuecomment-891670478 The remarks in question: Try to use objectStorage prefix instead of s3 prefix for your function/variables/config names Prefer to use a tree for the config: s3.streaming_playlists_bucket -> object_storage.streaming_playlists.bucket Use uppercase for config: S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket -> OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET (maybe BUCKET_NAME instead of BUCKET) I suggest to rename moveJobsRunning to pendingMovingJobs (or better, create a dedicated videoJobInfo table with a pendingMove & videoId columns so we could also use this table to track pending transcoding jobs) https://github.com/Chocobozzz/PeerTube/pull/4290/files#diff-3e26d41ca4bda1de8e1747af70ca2af642abcc1e9e0bfb94239ff2165acfbde5R19 uses a string instead of an integer I think we should store the origin object storage URL in fileUrl, without base_url injection. Instead, inject the base_url at "runtime" so admins can easily change this configuration without running a script to update DB URLs * Import correct function * Support multipart upload * Remove import of node 15.0 module stream/promises * Extend maximum upload job length Using the same value as for redundancy downloading seems logical * Use dynamic part size for really large uploads Also adds very small part size for local testing * Fix decreasePendingMove query * Resolve various PR comments * Move to object storage after optimize * Make upload size configurable and increase default * Prune webtorrent files that are stored in object storage * Move files after transcoding jobs * Fix federation * Add video path manager * Support move to external storage job in client * Fix live object storage tests Co-authored-by: Chocobozzz <me@florianbigard.com>
Diffstat (limited to 'server/lib/job-queue/handlers/video-transcoding.ts')
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts128
1 files changed, 74 insertions, 54 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 876d1460c..b3149dde8 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,9 +1,11 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' 2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
3import { getTranscodingJobPriority, publishAndFederateIfNeeded } from '@server/lib/video' 3import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
4import { getVideoFilePath } from '@server/lib/video-paths' 4import { VideoPathManager } from '@server/lib/video-path-manager'
5import { moveToNextState } from '@server/lib/video-state'
5import { UserModel } from '@server/models/user/user' 6import { UserModel } from '@server/models/user/user'
6import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' 7import { VideoJobInfoModel } from '@server/models/video/video-job-info'
8import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models'
7import { 9import {
8 HLSTranscodingPayload, 10 HLSTranscodingPayload,
9 MergeAudioTranscodingPayload, 11 MergeAudioTranscodingPayload,
@@ -16,17 +18,14 @@ import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils'
16import { logger } from '../../../helpers/logger' 18import { logger } from '../../../helpers/logger'
17import { CONFIG } from '../../../initializers/config' 19import { CONFIG } from '../../../initializers/config'
18import { VideoModel } from '../../../models/video/video' 20import { VideoModel } from '../../../models/video/video'
19import { federateVideoIfNeeded } from '../../activitypub/videos'
20import { Notifier } from '../../notifier'
21import { 21import {
22 generateHlsPlaylistResolution, 22 generateHlsPlaylistResolution,
23 mergeAudioVideofile, 23 mergeAudioVideofile,
24 optimizeOriginalVideofile, 24 optimizeOriginalVideofile,
25 transcodeNewWebTorrentResolution 25 transcodeNewWebTorrentResolution
26} from '../../transcoding/video-transcoding' 26} from '../../transcoding/video-transcoding'
27import { JobQueue } from '../job-queue'
28 27
29type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<any> 28type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
30 29
31const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { 30const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
32 'new-resolution-to-hls': handleHLSJob, 31 'new-resolution-to-hls': handleHLSJob,
@@ -69,15 +68,16 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide
69 : video.getMaxQualityFile() 68 : video.getMaxQualityFile()
70 69
71 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() 70 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
72 const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput)
73 71
74 await generateHlsPlaylistResolution({ 72 await VideoPathManager.Instance.makeAvailableVideoFile(videoOrStreamingPlaylist, videoFileInput, videoInputPath => {
75 video, 73 return generateHlsPlaylistResolution({
76 videoInputPath, 74 video,
77 resolution: payload.resolution, 75 videoInputPath,
78 copyCodecs: payload.copyCodecs, 76 resolution: payload.resolution,
79 isPortraitMode: payload.isPortraitMode || false, 77 copyCodecs: payload.copyCodecs,
80 job 78 isPortraitMode: payload.isPortraitMode || false,
79 job
80 })
81 }) 81 })
82 82
83 await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) 83 await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload)
@@ -101,7 +101,7 @@ async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudio
101} 101}
102 102
103async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { 103async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
104 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) 104 const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
105 105
106 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) 106 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user)
107} 107}
@@ -121,10 +121,18 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay
121 video.VideoFiles = [] 121 video.VideoFiles = []
122 122
123 // Create HLS new resolution jobs 123 // Create HLS new resolution jobs
124 await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') 124 await createLowerResolutionsJobs({
125 video,
126 user,
127 videoFileResolution: payload.resolution,
128 isPortraitMode: payload.isPortraitMode,
129 isNewVideo: payload.isNewVideo ?? true,
130 type: 'hls'
131 })
125 } 132 }
126 133
127 return publishAndFederateIfNeeded(video) 134 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
135 await moveToNextState(video, payload.isNewVideo)
128} 136}
129 137
130async function onVideoFileOptimizer ( 138async function onVideoFileOptimizer (
@@ -143,58 +151,54 @@ async function onVideoFileOptimizer (
143 // Video does not exist anymore 151 // Video does not exist anymore
144 if (!videoDatabase) return undefined 152 if (!videoDatabase) return undefined
145 153
146 let videoPublished = false
147
148 // Generate HLS version of the original file 154 // Generate HLS version of the original file
149 const originalFileHLSPayload = Object.assign({}, payload, { 155 const originalFileHLSPayload = {
156 ...payload,
157
150 isPortraitMode, 158 isPortraitMode,
151 resolution: videoDatabase.getMaxQualityFile().resolution, 159 resolution: videoDatabase.getMaxQualityFile().resolution,
152 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 160 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
153 copyCodecs: transcodeType !== 'quick-transcode', 161 copyCodecs: transcodeType !== 'quick-transcode',
154 isMaxQuality: true 162 isMaxQuality: true
155 }) 163 }
156 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) 164 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
165 const hasNewResolutions = await createLowerResolutionsJobs({
166 video: videoDatabase,
167 user,
168 videoFileResolution: resolution,
169 isPortraitMode,
170 type: 'webtorrent',
171 isNewVideo: payload.isNewVideo ?? true
172 })
157 173
158 const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, resolution, isPortraitMode, 'webtorrent') 174 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
159 175
176 // Move to next state if there are no other resolutions to generate
160 if (!hasHls && !hasNewResolutions) { 177 if (!hasHls && !hasNewResolutions) {
161 // No transcoding to do, it's now published 178 await moveToNextState(videoDatabase, payload.isNewVideo)
162 videoPublished = await videoDatabase.publishIfNeededAndSave(undefined)
163 } 179 }
164
165 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo)
166
167 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
168 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
169} 180}
170 181
171async function onNewWebTorrentFileResolution ( 182async function onNewWebTorrentFileResolution (
172 video: MVideoUUID, 183 video: MVideo,
173 user: MUserId, 184 user: MUserId,
174 payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload 185 payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
175) { 186) {
176 await publishAndFederateIfNeeded(video) 187 await createHlsJobIfEnabled(user, { ...payload, copyCodecs: true, isMaxQuality: false })
188 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
177 189
178 await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) 190 await moveToNextState(video, payload.isNewVideo)
179} 191}
180 192
181// ---------------------------------------------------------------------------
182
183export {
184 processVideoTranscoding,
185 onNewWebTorrentFileResolution
186}
187
188// ---------------------------------------------------------------------------
189
190async function createHlsJobIfEnabled (user: MUserId, payload: { 193async function createHlsJobIfEnabled (user: MUserId, payload: {
191 videoUUID: string 194 videoUUID: string
192 resolution: number 195 resolution: number
193 isPortraitMode?: boolean 196 isPortraitMode?: boolean
194 copyCodecs: boolean 197 copyCodecs: boolean
195 isMaxQuality: boolean 198 isMaxQuality: boolean
199 isNewVideo?: boolean
196}) { 200}) {
197 if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false 201 if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
198 202
199 const jobOptions = { 203 const jobOptions = {
200 priority: await getTranscodingJobPriority(user) 204 priority: await getTranscodingJobPriority(user)
@@ -206,21 +210,35 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
206 resolution: payload.resolution, 210 resolution: payload.resolution,
207 isPortraitMode: payload.isPortraitMode, 211 isPortraitMode: payload.isPortraitMode,
208 copyCodecs: payload.copyCodecs, 212 copyCodecs: payload.copyCodecs,
209 isMaxQuality: payload.isMaxQuality 213 isMaxQuality: payload.isMaxQuality,
214 isNewVideo: payload.isNewVideo
210 } 215 }
211 216
212 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) 217 await addTranscodingJob(hlsTranscodingPayload, jobOptions)
213 218
214 return true 219 return true
215} 220}
216 221
217async function createLowerResolutionsJobs ( 222// ---------------------------------------------------------------------------
218 video: MVideoFullLight, 223
219 user: MUserId, 224export {
220 videoFileResolution: number, 225 processVideoTranscoding,
221 isPortraitMode: boolean, 226 createHlsJobIfEnabled,
227 onNewWebTorrentFileResolution
228}
229
230// ---------------------------------------------------------------------------
231
232async function createLowerResolutionsJobs (options: {
233 video: MVideoFullLight
234 user: MUserId
235 videoFileResolution: number
236 isPortraitMode: boolean
237 isNewVideo: boolean
222 type: 'hls' | 'webtorrent' 238 type: 'hls' | 'webtorrent'
223) { 239}) {
240 const { video, user, videoFileResolution, isPortraitMode, isNewVideo, type } = options
241
224 // Create transcoding jobs if there are enabled resolutions 242 // Create transcoding jobs if there are enabled resolutions
225 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') 243 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
226 const resolutionCreated: number[] = [] 244 const resolutionCreated: number[] = []
@@ -234,7 +252,8 @@ async function createLowerResolutionsJobs (
234 type: 'new-resolution-to-webtorrent', 252 type: 'new-resolution-to-webtorrent',
235 videoUUID: video.uuid, 253 videoUUID: video.uuid,
236 resolution, 254 resolution,
237 isPortraitMode 255 isPortraitMode,
256 isNewVideo
238 } 257 }
239 } 258 }
240 259
@@ -245,7 +264,8 @@ async function createLowerResolutionsJobs (
245 resolution, 264 resolution,
246 isPortraitMode, 265 isPortraitMode,
247 copyCodecs: false, 266 copyCodecs: false,
248 isMaxQuality: false 267 isMaxQuality: false,
268 isNewVideo
249 } 269 }
250 } 270 }
251 271
@@ -257,7 +277,7 @@ async function createLowerResolutionsJobs (
257 priority: await getTranscodingJobPriority(user) 277 priority: await getTranscodingJobPriority(user)
258 } 278 }
259 279
260 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) 280 await addTranscodingJob(dataInput, jobOptions)
261 } 281 }
262 282
263 if (resolutionCreated.length === 0) { 283 if (resolutionCreated.length === 0) {