diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/job-queue/handlers/video-transcoding.ts | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/job-queue/handlers/video-transcoding.ts')
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 282 |
1 files changed, 42 insertions, 240 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 3e6d23363..17b717275 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding' |
4 | import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding' |
5 | import { removeAllWebTorrentFiles } from '@server/lib/video-file' | ||
5 | import { VideoPathManager } from '@server/lib/video-path-manager' | 6 | import { VideoPathManager } from '@server/lib/video-path-manager' |
6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 7 | import { moveToFailedTranscodingState } from '@server/lib/video-state' |
7 | import { UserModel } from '@server/models/user/user' | 8 | import { UserModel } from '@server/models/user/user' |
8 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
9 | import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' | 10 | import { MUser, MUserId, MVideoFullLight } from '@server/types/models' |
10 | import { pick } from '@shared/core-utils' | ||
11 | import { | 11 | import { |
12 | HLSTranscodingPayload, | 12 | HLSTranscodingPayload, |
13 | MergeAudioTranscodingPayload, | 13 | MergeAudioTranscodingPayload, |
@@ -15,18 +15,8 @@ import { | |||
15 | OptimizeTranscodingPayload, | 15 | OptimizeTranscodingPayload, |
16 | VideoTranscodingPayload | 16 | VideoTranscodingPayload |
17 | } from '@shared/models' | 17 | } from '@shared/models' |
18 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
19 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg' | ||
20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
21 | import { CONFIG } from '../../../initializers/config' | ||
22 | import { VideoModel } from '../../../models/video/video' | 19 | import { VideoModel } from '../../../models/video/video' |
23 | import { | ||
24 | generateHlsPlaylistResolution, | ||
25 | mergeAudioVideofile, | ||
26 | optimizeOriginalVideofile, | ||
27 | transcodeNewWebTorrentResolution | ||
28 | } from '../../transcoding/transcoding' | ||
29 | import { JobQueue } from '../job-queue' | ||
30 | 20 | ||
31 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 21 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
32 | 22 | ||
@@ -84,260 +74,72 @@ export { | |||
84 | // Job handlers | 74 | // Job handlers |
85 | // --------------------------------------------------------------------------- | 75 | // --------------------------------------------------------------------------- |
86 | 76 | ||
87 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { | ||
88 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) | ||
89 | |||
90 | const videoFileInput = payload.copyCodecs | ||
91 | ? video.getWebTorrentFile(payload.resolution) | ||
92 | : video.getMaxQualityFile() | ||
93 | |||
94 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | ||
95 | |||
96 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
97 | |||
98 | try { | ||
99 | await videoFileInput.getVideo().reload() | ||
100 | |||
101 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { | ||
102 | return generateHlsPlaylistResolution({ | ||
103 | video, | ||
104 | videoInputPath, | ||
105 | inputFileMutexReleaser, | ||
106 | resolution: payload.resolution, | ||
107 | copyCodecs: payload.copyCodecs, | ||
108 | job | ||
109 | }) | ||
110 | }) | ||
111 | } finally { | ||
112 | inputFileMutexReleaser() | ||
113 | } | ||
114 | |||
115 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | ||
116 | |||
117 | await onHlsPlaylistGeneration(video, user, payload) | ||
118 | } | ||
119 | |||
120 | async function handleNewWebTorrentResolutionJob ( | ||
121 | job: Job, | ||
122 | payload: NewWebTorrentResolutionTranscodingPayload, | ||
123 | video: MVideoFullLight, | ||
124 | user: MUserId | ||
125 | ) { | ||
126 | logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) | ||
127 | |||
128 | await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job }) | ||
129 | |||
130 | logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | ||
131 | |||
132 | await onNewWebTorrentFileResolution(video, user, payload) | ||
133 | } | ||
134 | |||
135 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 77 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
136 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) | 78 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) |
137 | 79 | ||
138 | await mergeAudioVideofile({ video, resolution: payload.resolution, job }) | 80 | await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job }) |
139 | 81 | ||
140 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 82 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
141 | 83 | ||
142 | await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) | 84 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
143 | } | 85 | } |
144 | 86 | ||
145 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 87 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
146 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) | 88 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) |
147 | 89 | ||
148 | const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) | 90 | await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job }) |
149 | 91 | ||
150 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 92 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
151 | 93 | ||
152 | await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) | 94 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
153 | } | 95 | } |
154 | 96 | ||
155 | // --------------------------------------------------------------------------- | 97 | async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) { |
156 | 98 | logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) | |
157 | async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { | ||
158 | if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { | ||
159 | // Remove webtorrent files if not enabled | ||
160 | for (const file of video.VideoFiles) { | ||
161 | await video.removeWebTorrentFile(file) | ||
162 | await file.destroy() | ||
163 | } | ||
164 | |||
165 | video.VideoFiles = [] | ||
166 | |||
167 | // Create HLS new resolution jobs | ||
168 | await createLowerResolutionsJobs({ | ||
169 | video, | ||
170 | user, | ||
171 | videoFileResolution: payload.resolution, | ||
172 | hasAudio: payload.hasAudio, | ||
173 | isNewVideo: payload.isNewVideo ?? true, | ||
174 | type: 'hls' | ||
175 | }) | ||
176 | } | ||
177 | |||
178 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
179 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) | ||
180 | } | ||
181 | 99 | ||
182 | async function onVideoFirstWebTorrentTranscoding ( | 100 | await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job }) |
183 | videoArg: MVideoWithFile, | ||
184 | payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, | ||
185 | transcodeType: TranscodeVODOptionsType, | ||
186 | user: MUserId | ||
187 | ) { | ||
188 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) | ||
189 | 101 | ||
190 | try { | 102 | logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
191 | // Maybe the video changed in database, refresh it | ||
192 | const videoDatabase = await VideoModel.loadFull(videoArg.uuid) | ||
193 | // Video does not exist anymore | ||
194 | if (!videoDatabase) return undefined | ||
195 | |||
196 | const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile() | ||
197 | |||
198 | // Generate HLS version of the original file | ||
199 | const originalFileHLSPayload = { | ||
200 | ...payload, | ||
201 | |||
202 | hasAudio: !!audioStream, | ||
203 | resolution: videoDatabase.getMaxQualityFile().resolution, | ||
204 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | ||
205 | copyCodecs: transcodeType !== 'quick-transcode', | ||
206 | isMaxQuality: true | ||
207 | } | ||
208 | const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) | ||
209 | const hasNewResolutions = await createLowerResolutionsJobs({ | ||
210 | video: videoDatabase, | ||
211 | user, | ||
212 | videoFileResolution: resolution, | ||
213 | hasAudio: !!audioStream, | ||
214 | type: 'webtorrent', | ||
215 | isNewVideo: payload.isNewVideo ?? true | ||
216 | }) | ||
217 | |||
218 | await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') | ||
219 | 103 | ||
220 | // Move to next state if there are no other resolutions to generate | 104 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
221 | if (!hasHls && !hasNewResolutions) { | ||
222 | await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) | ||
223 | } | ||
224 | } finally { | ||
225 | mutexReleaser() | ||
226 | } | ||
227 | } | 105 | } |
228 | 106 | ||
229 | async function onNewWebTorrentFileResolution ( | 107 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { |
230 | video: MVideo, | 108 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) |
231 | user: MUserId, | ||
232 | payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload | ||
233 | ) { | ||
234 | if (payload.createHLSIfNeeded) { | ||
235 | await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) | ||
236 | } | ||
237 | |||
238 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
239 | 109 | ||
240 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) | 110 | const videoFileInput = payload.copyCodecs |
241 | } | 111 | ? video.getWebTorrentFile(payload.resolution) |
112 | : video.getMaxQualityFile() | ||
242 | 113 | ||
243 | // --------------------------------------------------------------------------- | 114 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() |
244 | 115 | ||
245 | async function createHlsJobIfEnabled (user: MUserId, payload: { | 116 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) |
246 | videoUUID: string | ||
247 | resolution: number | ||
248 | hasAudio: boolean | ||
249 | copyCodecs: boolean | ||
250 | isMaxQuality: boolean | ||
251 | isNewVideo?: boolean | ||
252 | }) { | ||
253 | if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false | ||
254 | |||
255 | const jobOptions = { | ||
256 | priority: await getTranscodingJobPriority(user) | ||
257 | } | ||
258 | 117 | ||
259 | const hlsTranscodingPayload: HLSTranscodingPayload = { | 118 | try { |
260 | type: 'new-resolution-to-hls', | 119 | await videoFileInput.getVideo().reload() |
261 | autoDeleteWebTorrentIfNeeded: true, | ||
262 | 120 | ||
263 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) | 121 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { |
122 | return generateHlsPlaylistResolution({ | ||
123 | video, | ||
124 | videoInputPath, | ||
125 | inputFileMutexReleaser, | ||
126 | resolution: payload.resolution, | ||
127 | fps: payload.fps, | ||
128 | copyCodecs: payload.copyCodecs, | ||
129 | job | ||
130 | }) | ||
131 | }) | ||
132 | } finally { | ||
133 | inputFileMutexReleaser() | ||
264 | } | 134 | } |
265 | 135 | ||
266 | await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) | 136 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
267 | |||
268 | return true | ||
269 | } | ||
270 | |||
271 | async function createLowerResolutionsJobs (options: { | ||
272 | video: MVideoFullLight | ||
273 | user: MUserId | ||
274 | videoFileResolution: number | ||
275 | hasAudio: boolean | ||
276 | isNewVideo: boolean | ||
277 | type: 'hls' | 'webtorrent' | ||
278 | }) { | ||
279 | const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options | ||
280 | |||
281 | // Create transcoding jobs if there are enabled resolutions | ||
282 | const resolutionsEnabled = await Hooks.wrapObject( | ||
283 | computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), | ||
284 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
285 | options | ||
286 | ) | ||
287 | |||
288 | const resolutionCreated: string[] = [] | ||
289 | |||
290 | for (const resolution of resolutionsEnabled) { | ||
291 | let dataInput: VideoTranscodingPayload | ||
292 | |||
293 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { | ||
294 | // WebTorrent will create subsequent HLS job | ||
295 | dataInput = { | ||
296 | type: 'new-resolution-to-webtorrent', | ||
297 | videoUUID: video.uuid, | ||
298 | resolution, | ||
299 | hasAudio, | ||
300 | createHLSIfNeeded: true, | ||
301 | isNewVideo | ||
302 | } | ||
303 | |||
304 | resolutionCreated.push('webtorrent-' + resolution) | ||
305 | } | ||
306 | |||
307 | if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { | ||
308 | dataInput = { | ||
309 | type: 'new-resolution-to-hls', | ||
310 | videoUUID: video.uuid, | ||
311 | resolution, | ||
312 | hasAudio, | ||
313 | copyCodecs: false, | ||
314 | isMaxQuality: false, | ||
315 | autoDeleteWebTorrentIfNeeded: true, | ||
316 | isNewVideo | ||
317 | } | ||
318 | |||
319 | resolutionCreated.push('hls-' + resolution) | ||
320 | } | ||
321 | |||
322 | if (!dataInput) continue | ||
323 | |||
324 | const jobOptions = { | ||
325 | priority: await getTranscodingJobPriority(user) | ||
326 | } | ||
327 | |||
328 | await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) | ||
329 | } | ||
330 | 137 | ||
331 | if (resolutionCreated.length === 0) { | 138 | if (payload.deleteWebTorrentFiles === true) { |
332 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) | 139 | logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid)) |
333 | 140 | ||
334 | return false | 141 | await removeAllWebTorrentFiles(video) |
335 | } | 142 | } |
336 | 143 | ||
337 | logger.info( | 144 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
338 | 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, | ||
339 | { resolutionCreated, ...lTags(video.uuid) } | ||
340 | ) | ||
341 | |||
342 | return true | ||
343 | } | 145 | } |