diff options
author | Chocobozzz <me@florianbigard.com> | 2021-01-21 15:58:17 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-01-21 15:58:17 +0100 |
commit | 24516aa26a6753517b379cf7b5104c1a24eccad6 (patch) | |
tree | a94b561faa63238f8ac69848f6b0379f0ade08a2 /server/lib/job-queue | |
parent | 3b01f4c0ac764ecb70efaadfd939ca868c28769c (diff) | |
download | PeerTube-24516aa26a6753517b379cf7b5104c1a24eccad6.tar.gz PeerTube-24516aa26a6753517b379cf7b5104c1a24eccad6.tar.zst PeerTube-24516aa26a6753517b379cf7b5104c1a24eccad6.zip |
Refactor transcoding job handlers
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-file-import.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-live-ending.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 199 |
3 files changed, 136 insertions, 71 deletions
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 18823ee9c..22e4d0cf1 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -9,7 +9,7 @@ import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprob | |||
9 | import { logger } from '../../../helpers/logger' | 9 | import { logger } from '../../../helpers/logger' |
10 | import { VideoModel } from '../../../models/video/video' | 10 | import { VideoModel } from '../../../models/video/video' |
11 | import { VideoFileModel } from '../../../models/video/video-file' | 11 | import { VideoFileModel } from '../../../models/video/video-file' |
12 | import { publishNewResolutionIfNeeded } from './video-transcoding' | 12 | import { onNewWebTorrentFileResolution } from './video-transcoding' |
13 | 13 | ||
14 | async function processVideoFileImport (job: Bull.Job) { | 14 | async function processVideoFileImport (job: Bull.Job) { |
15 | const payload = job.data as VideoFileImportPayload | 15 | const payload = job.data as VideoFileImportPayload |
@@ -24,7 +24,7 @@ async function processVideoFileImport (job: Bull.Job) { | |||
24 | 24 | ||
25 | await updateVideoFile(video, payload.filePath) | 25 | await updateVideoFile(video, payload.filePath) |
26 | 26 | ||
27 | await publishNewResolutionIfNeeded(video) | 27 | await onNewWebTorrentFileResolution(video) |
28 | return video | 28 | return video |
29 | } | 29 | } |
30 | 30 | ||
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 8018e2277..db6cd3682 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -7,7 +7,7 @@ import { LiveManager } from '@server/lib/live-manager' | |||
7 | import { generateVideoMiniature } from '@server/lib/thumbnail' | 7 | import { generateVideoMiniature } from '@server/lib/thumbnail' |
8 | import { publishAndFederateIfNeeded } from '@server/lib/video' | 8 | import { publishAndFederateIfNeeded } from '@server/lib/video' |
9 | import { getHLSDirectory } from '@server/lib/video-paths' | 9 | import { getHLSDirectory } from '@server/lib/video-paths' |
10 | import { generateHlsPlaylistFromTS } from '@server/lib/video-transcoding' | 10 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/video-transcoding' |
11 | import { VideoModel } from '@server/models/video/video' | 11 | import { VideoModel } from '@server/models/video/video' |
12 | import { VideoFileModel } from '@server/models/video/video-file' | 12 | import { VideoFileModel } from '@server/models/video/video-file' |
13 | import { VideoLiveModel } from '@server/models/video/video-live' | 13 | import { VideoLiveModel } from '@server/models/video/video-live' |
@@ -102,7 +102,7 @@ async function saveLive (video: MVideo, live: MVideoLive) { | |||
102 | 102 | ||
103 | const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) | 103 | const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) |
104 | 104 | ||
105 | const outputPath = await generateHlsPlaylistFromTS({ | 105 | const outputPath = await generateHlsPlaylistResolutionFromTS({ |
106 | video: videoWithFiles, | 106 | video: videoWithFiles, |
107 | concatenatedTsFilePath, | 107 | concatenatedTsFilePath, |
108 | resolution: videoFileResolution, | 108 | resolution: videoFileResolution, |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 083cec11a..0f6b3f753 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,8 +1,10 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' | ||
2 | import { publishAndFederateIfNeeded } from '@server/lib/video' | 3 | import { publishAndFederateIfNeeded } from '@server/lib/video' |
3 | import { getVideoFilePath } from '@server/lib/video-paths' | 4 | import { getVideoFilePath } from '@server/lib/video-paths' |
4 | import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' | 5 | import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' |
5 | import { | 6 | import { |
7 | HLSTranscodingPayload, | ||
6 | MergeAudioTranscodingPayload, | 8 | MergeAudioTranscodingPayload, |
7 | NewResolutionTranscodingPayload, | 9 | NewResolutionTranscodingPayload, |
8 | OptimizeTranscodingPayload, | 10 | OptimizeTranscodingPayload, |
@@ -16,9 +18,31 @@ import { sequelizeTypescript } from '../../../initializers/database' | |||
16 | import { VideoModel } from '../../../models/video/video' | 18 | import { VideoModel } from '../../../models/video/video' |
17 | import { federateVideoIfNeeded } from '../../activitypub/videos' | 19 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
18 | import { Notifier } from '../../notifier' | 20 | import { Notifier } from '../../notifier' |
19 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' | 21 | import { |
22 | generateHlsPlaylistResolution, | ||
23 | mergeAudioVideofile, | ||
24 | optimizeOriginalVideofile, | ||
25 | transcodeNewWebTorrentResolution | ||
26 | } from '../../video-transcoding' | ||
20 | import { JobQueue } from '../job-queue' | 27 | import { JobQueue } from '../job-queue' |
21 | import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' | 28 | |
29 | const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = { | ||
30 | // Deprecated, introduced in 3.1 | ||
31 | 'hls': handleHLSJob, | ||
32 | 'new-resolution-to-hls': handleHLSJob, | ||
33 | |||
34 | // Deprecated, introduced in 3.1 | ||
35 | 'new-resolution': handleNewWebTorrentResolutionJob, | ||
36 | 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob, | ||
37 | |||
38 | // Deprecated, introduced in 3.1 | ||
39 | 'merge-audio': handleWebTorrentMergeAudioJob, | ||
40 | 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob, | ||
41 | |||
42 | // Deprecated, introduced in 3.1 | ||
43 | 'optimize': handleWebTorrentOptimizeJob, | ||
44 | 'optimize-to-webtorrent': handleWebTorrentOptimizeJob | ||
45 | } | ||
22 | 46 | ||
23 | async function processVideoTranscoding (job: Bull.Job) { | 47 | async function processVideoTranscoding (job: Bull.Job) { |
24 | const payload = job.data as VideoTranscodingPayload | 48 | const payload = job.data as VideoTranscodingPayload |
@@ -31,42 +55,62 @@ async function processVideoTranscoding (job: Bull.Job) { | |||
31 | return undefined | 55 | return undefined |
32 | } | 56 | } |
33 | 57 | ||
34 | if (payload.type === 'hls') { | 58 | const handler = handlers[payload.type] |
35 | const videoFileInput = payload.copyCodecs | ||
36 | ? video.getWebTorrentFile(payload.resolution) | ||
37 | : video.getMaxQualityFile() | ||
38 | 59 | ||
39 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | 60 | if (!handler) { |
40 | const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) | 61 | throw new Error('Cannot find transcoding handler for ' + payload.type) |
62 | } | ||
41 | 63 | ||
42 | await generateHlsPlaylist({ | 64 | await handler(job, payload, video) |
43 | video, | 65 | |
44 | videoInputPath, | 66 | return video |
45 | resolution: payload.resolution, | 67 | } |
46 | copyCodecs: payload.copyCodecs, | ||
47 | isPortraitMode: payload.isPortraitMode || false, | ||
48 | job | ||
49 | }) | ||
50 | 68 | ||
51 | await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) | 69 | // --------------------------------------------------------------------------- |
52 | } else if (payload.type === 'new-resolution') { | 70 | // Job handlers |
53 | await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false, job) | 71 | // --------------------------------------------------------------------------- |
54 | 72 | ||
55 | await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) | 73 | async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { |
56 | } else if (payload.type === 'merge-audio') { | 74 | const videoFileInput = payload.copyCodecs |
57 | await mergeAudioVideofile(video, payload.resolution, job) | 75 | ? video.getWebTorrentFile(payload.resolution) |
76 | : video.getMaxQualityFile() | ||
77 | |||
78 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | ||
79 | const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) | ||
80 | |||
81 | await generateHlsPlaylistResolution({ | ||
82 | video, | ||
83 | videoInputPath, | ||
84 | resolution: payload.resolution, | ||
85 | copyCodecs: payload.copyCodecs, | ||
86 | isPortraitMode: payload.isPortraitMode || false, | ||
87 | job | ||
88 | }) | ||
58 | 89 | ||
59 | await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) | 90 | await retryTransactionWrapper(onHlsPlaylistGeneration, video) |
60 | } else { | 91 | } |
61 | const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) | ||
62 | 92 | ||
63 | await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload, transcodeType) | 93 | async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { |
64 | } | 94 | await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) |
65 | 95 | ||
66 | return video | 96 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) |
97 | } | ||
98 | |||
99 | async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { | ||
100 | await mergeAudioVideofile(video, payload.resolution, job) | ||
101 | |||
102 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) | ||
67 | } | 103 | } |
68 | 104 | ||
69 | async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { | 105 | async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { |
106 | const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) | ||
107 | |||
108 | await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) | ||
109 | } | ||
110 | |||
111 | // --------------------------------------------------------------------------- | ||
112 | |||
113 | async function onHlsPlaylistGeneration (video: MVideoFullLight) { | ||
70 | if (video === undefined) return undefined | 114 | if (video === undefined) return undefined |
71 | 115 | ||
72 | // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it | 116 | // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it |
@@ -82,13 +126,7 @@ async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { | |||
82 | return publishAndFederateIfNeeded(video) | 126 | return publishAndFederateIfNeeded(video) |
83 | } | 127 | } |
84 | 128 | ||
85 | async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) { | 129 | async function onVideoFileOptimizer ( |
86 | await publishAndFederateIfNeeded(video) | ||
87 | |||
88 | createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) | ||
89 | } | ||
90 | |||
91 | async function onVideoFileOptimizerSuccess ( | ||
92 | videoArg: MVideoWithFile, | 130 | videoArg: MVideoWithFile, |
93 | payload: OptimizeTranscodingPayload, | 131 | payload: OptimizeTranscodingPayload, |
94 | transcodeType: TranscodeOptionsType | 132 | transcodeType: TranscodeOptionsType |
@@ -113,7 +151,7 @@ async function onVideoFileOptimizerSuccess ( | |||
113 | 151 | ||
114 | let videoPublished = false | 152 | let videoPublished = false |
115 | 153 | ||
116 | // Generate HLS version of the max quality file | 154 | // Generate HLS version of the original file |
117 | const originalFileHLSPayload = Object.assign({}, payload, { | 155 | const originalFileHLSPayload = Object.assign({}, payload, { |
118 | isPortraitMode, | 156 | isPortraitMode, |
119 | resolution: videoDatabase.getMaxQualityFile().resolution, | 157 | resolution: videoDatabase.getMaxQualityFile().resolution, |
@@ -122,36 +160,11 @@ async function onVideoFileOptimizerSuccess ( | |||
122 | }) | 160 | }) |
123 | createHlsJobIfEnabled(originalFileHLSPayload) | 161 | createHlsJobIfEnabled(originalFileHLSPayload) |
124 | 162 | ||
125 | if (resolutionsEnabled.length !== 0) { | 163 | const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) |
126 | for (const resolution of resolutionsEnabled) { | ||
127 | let dataInput: VideoTranscodingPayload | ||
128 | |||
129 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { | ||
130 | dataInput = { | ||
131 | type: 'new-resolution' as 'new-resolution', | ||
132 | videoUUID: videoDatabase.uuid, | ||
133 | resolution, | ||
134 | isPortraitMode | ||
135 | } | ||
136 | } else if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
137 | dataInput = { | ||
138 | type: 'hls', | ||
139 | videoUUID: videoDatabase.uuid, | ||
140 | resolution, | ||
141 | isPortraitMode, | ||
142 | copyCodecs: false | ||
143 | } | ||
144 | } | ||
145 | |||
146 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | ||
147 | } | ||
148 | 164 | ||
149 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 165 | if (!hasNewResolutions) { |
150 | } else { | ||
151 | // No transcoding to do, it's now published | 166 | // No transcoding to do, it's now published |
152 | videoPublished = await videoDatabase.publishIfNeededAndSave(t) | 167 | videoPublished = await videoDatabase.publishIfNeededAndSave(t) |
153 | |||
154 | logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) | ||
155 | } | 168 | } |
156 | 169 | ||
157 | await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) | 170 | await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) |
@@ -163,11 +176,20 @@ async function onVideoFileOptimizerSuccess ( | |||
163 | if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) | 176 | if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) |
164 | } | 177 | } |
165 | 178 | ||
179 | async function onNewWebTorrentFileResolution ( | ||
180 | video: MVideoUUID, | ||
181 | payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload | ||
182 | ) { | ||
183 | await publishAndFederateIfNeeded(video) | ||
184 | |||
185 | createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) | ||
186 | } | ||
187 | |||
166 | // --------------------------------------------------------------------------- | 188 | // --------------------------------------------------------------------------- |
167 | 189 | ||
168 | export { | 190 | export { |
169 | processVideoTranscoding, | 191 | processVideoTranscoding, |
170 | publishNewResolutionIfNeeded | 192 | onNewWebTorrentFileResolution |
171 | } | 193 | } |
172 | 194 | ||
173 | // --------------------------------------------------------------------------- | 195 | // --------------------------------------------------------------------------- |
@@ -175,8 +197,8 @@ export { | |||
175 | function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { | 197 | function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { |
176 | // Generate HLS playlist? | 198 | // Generate HLS playlist? |
177 | if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { | 199 | if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { |
178 | const hlsTranscodingPayload = { | 200 | const hlsTranscodingPayload: HLSTranscodingPayload = { |
179 | type: 'hls' as 'hls', | 201 | type: 'new-resolution-to-hls', |
180 | videoUUID: payload.videoUUID, | 202 | videoUUID: payload.videoUUID, |
181 | resolution: payload.resolution, | 203 | resolution: payload.resolution, |
182 | isPortraitMode: payload.isPortraitMode, | 204 | isPortraitMode: payload.isPortraitMode, |
@@ -186,3 +208,46 @@ function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number | |||
186 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) | 208 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) |
187 | } | 209 | } |
188 | } | 210 | } |
211 | |||
212 | function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { | ||
213 | // Create transcoding jobs if there are enabled resolutions | ||
214 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') | ||
215 | logger.info( | ||
216 | 'Resolutions computed for video %s and origin file resolution of %d.', video.uuid, videoFileResolution, | ||
217 | { resolutions: resolutionsEnabled } | ||
218 | ) | ||
219 | |||
220 | if (resolutionsEnabled.length === 0) { | ||
221 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) | ||
222 | |||
223 | return false | ||
224 | } | ||
225 | |||
226 | for (const resolution of resolutionsEnabled) { | ||
227 | let dataInput: VideoTranscodingPayload | ||
228 | |||
229 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { | ||
230 | // WebTorrent will create subsequent HLS job | ||
231 | dataInput = { | ||
232 | type: 'new-resolution-to-webtorrent', | ||
233 | videoUUID: video.uuid, | ||
234 | resolution, | ||
235 | isPortraitMode | ||
236 | } | ||
237 | } else if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
238 | dataInput = { | ||
239 | type: 'new-resolution-to-hls', | ||
240 | videoUUID: video.uuid, | ||
241 | resolution, | ||
242 | isPortraitMode, | ||
243 | copyCodecs: false | ||
244 | } | ||
245 | } | ||
246 | |||
247 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | ||
248 | } | ||
249 | |||
250 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) | ||
251 | |||
252 | return true | ||
253 | } | ||