aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-01-21 15:58:17 +0100
committerChocobozzz <me@florianbigard.com>2021-01-21 15:58:17 +0100
commit24516aa26a6753517b379cf7b5104c1a24eccad6 (patch)
treea94b561faa63238f8ac69848f6b0379f0ade08a2 /server/lib/job-queue
parent3b01f4c0ac764ecb70efaadfd939ca868c28769c (diff)
downloadPeerTube-24516aa26a6753517b379cf7b5104c1a24eccad6.tar.gz
PeerTube-24516aa26a6753517b379cf7b5104c1a24eccad6.tar.zst
PeerTube-24516aa26a6753517b379cf7b5104c1a24eccad6.zip
Refactor transcoding job handlers
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts4
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts4
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts199
3 files changed, 136 insertions, 71 deletions
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 18823ee9c..22e4d0cf1 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -9,7 +9,7 @@ import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprob
9import { logger } from '../../../helpers/logger' 9import { logger } from '../../../helpers/logger'
10import { VideoModel } from '../../../models/video/video' 10import { VideoModel } from '../../../models/video/video'
11import { VideoFileModel } from '../../../models/video/video-file' 11import { VideoFileModel } from '../../../models/video/video-file'
12import { publishNewResolutionIfNeeded } from './video-transcoding' 12import { onNewWebTorrentFileResolution } from './video-transcoding'
13 13
14async function processVideoFileImport (job: Bull.Job) { 14async function processVideoFileImport (job: Bull.Job) {
15 const payload = job.data as VideoFileImportPayload 15 const payload = job.data as VideoFileImportPayload
@@ -24,7 +24,7 @@ async function processVideoFileImport (job: Bull.Job) {
24 24
25 await updateVideoFile(video, payload.filePath) 25 await updateVideoFile(video, payload.filePath)
26 26
27 await publishNewResolutionIfNeeded(video) 27 await onNewWebTorrentFileResolution(video)
28 return video 28 return video
29} 29}
30 30
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 8018e2277..db6cd3682 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -7,7 +7,7 @@ import { LiveManager } from '@server/lib/live-manager'
7import { generateVideoMiniature } from '@server/lib/thumbnail' 7import { generateVideoMiniature } from '@server/lib/thumbnail'
8import { publishAndFederateIfNeeded } from '@server/lib/video' 8import { publishAndFederateIfNeeded } from '@server/lib/video'
9import { getHLSDirectory } from '@server/lib/video-paths' 9import { getHLSDirectory } from '@server/lib/video-paths'
10import { generateHlsPlaylistFromTS } from '@server/lib/video-transcoding' 10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/video-transcoding'
11import { VideoModel } from '@server/models/video/video' 11import { VideoModel } from '@server/models/video/video'
12import { VideoFileModel } from '@server/models/video/video-file' 12import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoLiveModel } from '@server/models/video/video-live' 13import { VideoLiveModel } from '@server/models/video/video-live'
@@ -102,7 +102,7 @@ async function saveLive (video: MVideo, live: MVideoLive) {
102 102
103 const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) 103 const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe)
104 104
105 const outputPath = await generateHlsPlaylistFromTS({ 105 const outputPath = await generateHlsPlaylistResolutionFromTS({
106 video: videoWithFiles, 106 video: videoWithFiles,
107 concatenatedTsFilePath, 107 concatenatedTsFilePath,
108 resolution: videoFileResolution, 108 resolution: videoFileResolution,
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 083cec11a..0f6b3f753 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,8 +1,10 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
2import { publishAndFederateIfNeeded } from '@server/lib/video' 3import { publishAndFederateIfNeeded } from '@server/lib/video'
3import { getVideoFilePath } from '@server/lib/video-paths' 4import { getVideoFilePath } from '@server/lib/video-paths'
4import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' 5import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models'
5import { 6import {
7 HLSTranscodingPayload,
6 MergeAudioTranscodingPayload, 8 MergeAudioTranscodingPayload,
7 NewResolutionTranscodingPayload, 9 NewResolutionTranscodingPayload,
8 OptimizeTranscodingPayload, 10 OptimizeTranscodingPayload,
@@ -16,9 +18,31 @@ import { sequelizeTypescript } from '../../../initializers/database'
16import { VideoModel } from '../../../models/video/video' 18import { VideoModel } from '../../../models/video/video'
17import { federateVideoIfNeeded } from '../../activitypub/videos' 19import { federateVideoIfNeeded } from '../../activitypub/videos'
18import { Notifier } from '../../notifier' 20import { Notifier } from '../../notifier'
19import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' 21import {
22 generateHlsPlaylistResolution,
23 mergeAudioVideofile,
24 optimizeOriginalVideofile,
25 transcodeNewWebTorrentResolution
26} from '../../video-transcoding'
20import { JobQueue } from '../job-queue' 27import { JobQueue } from '../job-queue'
21import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' 28
29const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = {
30 // Deprecated, introduced in 3.1
31 'hls': handleHLSJob,
32 'new-resolution-to-hls': handleHLSJob,
33
34 // Deprecated, introduced in 3.1
35 'new-resolution': handleNewWebTorrentResolutionJob,
36 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob,
37
38 // Deprecated, introduced in 3.1
39 'merge-audio': handleWebTorrentMergeAudioJob,
40 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob,
41
42 // Deprecated, introduced in 3.1
43 'optimize': handleWebTorrentOptimizeJob,
44 'optimize-to-webtorrent': handleWebTorrentOptimizeJob
45}
22 46
23async function processVideoTranscoding (job: Bull.Job) { 47async function processVideoTranscoding (job: Bull.Job) {
24 const payload = job.data as VideoTranscodingPayload 48 const payload = job.data as VideoTranscodingPayload
@@ -31,42 +55,62 @@ async function processVideoTranscoding (job: Bull.Job) {
31 return undefined 55 return undefined
32 } 56 }
33 57
34 if (payload.type === 'hls') { 58 const handler = handlers[payload.type]
35 const videoFileInput = payload.copyCodecs
36 ? video.getWebTorrentFile(payload.resolution)
37 : video.getMaxQualityFile()
38 59
39 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() 60 if (!handler) {
40 const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) 61 throw new Error('Cannot find transcoding handler for ' + payload.type)
62 }
41 63
42 await generateHlsPlaylist({ 64 await handler(job, payload, video)
43 video, 65
44 videoInputPath, 66 return video
45 resolution: payload.resolution, 67}
46 copyCodecs: payload.copyCodecs,
47 isPortraitMode: payload.isPortraitMode || false,
48 job
49 })
50 68
51 await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) 69// ---------------------------------------------------------------------------
52 } else if (payload.type === 'new-resolution') { 70// Job handlers
53 await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false, job) 71// ---------------------------------------------------------------------------
54 72
55 await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) 73async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) {
56 } else if (payload.type === 'merge-audio') { 74 const videoFileInput = payload.copyCodecs
57 await mergeAudioVideofile(video, payload.resolution, job) 75 ? video.getWebTorrentFile(payload.resolution)
76 : video.getMaxQualityFile()
77
78 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
79 const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput)
80
81 await generateHlsPlaylistResolution({
82 video,
83 videoInputPath,
84 resolution: payload.resolution,
85 copyCodecs: payload.copyCodecs,
86 isPortraitMode: payload.isPortraitMode || false,
87 job
88 })
58 89
59 await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) 90 await retryTransactionWrapper(onHlsPlaylistGeneration, video)
60 } else { 91}
61 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
62 92
63 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload, transcodeType) 93async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) {
64 } 94 await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
65 95
66 return video 96 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload)
97}
98
99async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) {
100 await mergeAudioVideofile(video, payload.resolution, job)
101
102 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload)
67} 103}
68 104
69async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { 105async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) {
106 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
107
108 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType)
109}
110
111// ---------------------------------------------------------------------------
112
113async function onHlsPlaylistGeneration (video: MVideoFullLight) {
70 if (video === undefined) return undefined 114 if (video === undefined) return undefined
71 115
72 // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it 116 // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
@@ -82,13 +126,7 @@ async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) {
82 return publishAndFederateIfNeeded(video) 126 return publishAndFederateIfNeeded(video)
83} 127}
84 128
85async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) { 129async function onVideoFileOptimizer (
86 await publishAndFederateIfNeeded(video)
87
88 createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true }))
89}
90
91async function onVideoFileOptimizerSuccess (
92 videoArg: MVideoWithFile, 130 videoArg: MVideoWithFile,
93 payload: OptimizeTranscodingPayload, 131 payload: OptimizeTranscodingPayload,
94 transcodeType: TranscodeOptionsType 132 transcodeType: TranscodeOptionsType
@@ -113,7 +151,7 @@ async function onVideoFileOptimizerSuccess (
113 151
114 let videoPublished = false 152 let videoPublished = false
115 153
116 // Generate HLS version of the max quality file 154 // Generate HLS version of the original file
117 const originalFileHLSPayload = Object.assign({}, payload, { 155 const originalFileHLSPayload = Object.assign({}, payload, {
118 isPortraitMode, 156 isPortraitMode,
119 resolution: videoDatabase.getMaxQualityFile().resolution, 157 resolution: videoDatabase.getMaxQualityFile().resolution,
@@ -122,36 +160,11 @@ async function onVideoFileOptimizerSuccess (
122 }) 160 })
123 createHlsJobIfEnabled(originalFileHLSPayload) 161 createHlsJobIfEnabled(originalFileHLSPayload)
124 162
125 if (resolutionsEnabled.length !== 0) { 163 const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode)
126 for (const resolution of resolutionsEnabled) {
127 let dataInput: VideoTranscodingPayload
128
129 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
130 dataInput = {
131 type: 'new-resolution' as 'new-resolution',
132 videoUUID: videoDatabase.uuid,
133 resolution,
134 isPortraitMode
135 }
136 } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
137 dataInput = {
138 type: 'hls',
139 videoUUID: videoDatabase.uuid,
140 resolution,
141 isPortraitMode,
142 copyCodecs: false
143 }
144 }
145
146 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
147 }
148 164
149 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) 165 if (!hasNewResolutions) {
150 } else {
151 // No transcoding to do, it's now published 166 // No transcoding to do, it's now published
152 videoPublished = await videoDatabase.publishIfNeededAndSave(t) 167 videoPublished = await videoDatabase.publishIfNeededAndSave(t)
153
154 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
155 } 168 }
156 169
157 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) 170 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
@@ -163,11 +176,20 @@ async function onVideoFileOptimizerSuccess (
163 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) 176 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
164} 177}
165 178
179async function onNewWebTorrentFileResolution (
180 video: MVideoUUID,
181 payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
182) {
183 await publishAndFederateIfNeeded(video)
184
185 createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true }))
186}
187
166// --------------------------------------------------------------------------- 188// ---------------------------------------------------------------------------
167 189
168export { 190export {
169 processVideoTranscoding, 191 processVideoTranscoding,
170 publishNewResolutionIfNeeded 192 onNewWebTorrentFileResolution
171} 193}
172 194
173// --------------------------------------------------------------------------- 195// ---------------------------------------------------------------------------
@@ -175,8 +197,8 @@ export {
175function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { 197function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) {
176 // Generate HLS playlist? 198 // Generate HLS playlist?
177 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { 199 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
178 const hlsTranscodingPayload = { 200 const hlsTranscodingPayload: HLSTranscodingPayload = {
179 type: 'hls' as 'hls', 201 type: 'new-resolution-to-hls',
180 videoUUID: payload.videoUUID, 202 videoUUID: payload.videoUUID,
181 resolution: payload.resolution, 203 resolution: payload.resolution,
182 isPortraitMode: payload.isPortraitMode, 204 isPortraitMode: payload.isPortraitMode,
@@ -186,3 +208,46 @@ function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number
186 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) 208 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
187 } 209 }
188} 210}
211
212function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) {
213 // Create transcoding jobs if there are enabled resolutions
214 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
215 logger.info(
216 'Resolutions computed for video %s and origin file resolution of %d.', video.uuid, videoFileResolution,
217 { resolutions: resolutionsEnabled }
218 )
219
220 if (resolutionsEnabled.length === 0) {
221 logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid)
222
223 return false
224 }
225
226 for (const resolution of resolutionsEnabled) {
227 let dataInput: VideoTranscodingPayload
228
229 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
230 // WebTorrent will create subsequent HLS job
231 dataInput = {
232 type: 'new-resolution-to-webtorrent',
233 videoUUID: video.uuid,
234 resolution,
235 isPortraitMode
236 }
237 } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
238 dataInput = {
239 type: 'new-resolution-to-hls',
240 videoUUID: video.uuid,
241 resolution,
242 isPortraitMode,
243 copyCodecs: false
244 }
245 }
246
247 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
248 }
249
250 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
251
252 return true
253}