diff options
author | Chocobozzz <me@florianbigard.com> | 2021-02-08 10:51:10 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2021-02-08 15:38:45 +0100 |
commit | 9129b7694d577322327ee79e9b9aa64deee92765 (patch) | |
tree | eb23b7a952048c3725f29109d38c36368976dec0 /server/lib | |
parent | 81b46cbc3417c46263c210c61b51a84a457abaaa (diff) | |
download | PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.gz PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.zst PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.zip |
Allow to specify transcoding and import jobs concurrency
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 44 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 12 | ||||
-rw-r--r-- | server/lib/video-transcoding.ts | 4 |
3 files changed, 36 insertions, 24 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4718a7d5c..e248b645e 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -76,7 +76,7 @@ async function processVideoTranscoding (job: Bull.Job) { | |||
76 | // Job handlers | 76 | // Job handlers |
77 | // --------------------------------------------------------------------------- | 77 | // --------------------------------------------------------------------------- |
78 | 78 | ||
79 | async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { | 79 | async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { |
80 | const videoFileInput = payload.copyCodecs | 80 | const videoFileInput = payload.copyCodecs |
81 | ? video.getWebTorrentFile(payload.resolution) | 81 | ? video.getWebTorrentFile(payload.resolution) |
82 | : video.getMaxQualityFile() | 82 | : video.getMaxQualityFile() |
@@ -93,7 +93,7 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide | |||
93 | job | 93 | job |
94 | }) | 94 | }) |
95 | 95 | ||
96 | await retryTransactionWrapper(onHlsPlaylistGeneration, video, payload.resolution) | 96 | await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) |
97 | } | 97 | } |
98 | 98 | ||
99 | async function handleNewWebTorrentResolutionJob ( | 99 | async function handleNewWebTorrentResolutionJob ( |
@@ -110,9 +110,7 @@ async function handleNewWebTorrentResolutionJob ( | |||
110 | async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 110 | async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
111 | await mergeAudioVideofile(video, payload.resolution, job) | 111 | await mergeAudioVideofile(video, payload.resolution, job) |
112 | 112 | ||
113 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) | 113 | await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user) |
114 | |||
115 | await createLowerResolutionsJobs(video, user, payload.resolution, false) | ||
116 | } | 114 | } |
117 | 115 | ||
118 | async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 116 | async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
@@ -123,13 +121,11 @@ async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTran | |||
123 | 121 | ||
124 | // --------------------------------------------------------------------------- | 122 | // --------------------------------------------------------------------------- |
125 | 123 | ||
126 | async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: number) { | 124 | async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { |
127 | if (video === undefined) return undefined | 125 | if (video === undefined) return undefined |
128 | 126 | ||
129 | const maxQualityFile = video.getMaxQualityFile() | 127 | if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { |
130 | 128 | // Remove webtorrent files if not enabled | |
131 | // We generated the max quality HLS playlist, we don't need the webtorrent files anymore if the admin disabled it | ||
132 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false && video.hasWebTorrentFiles() && maxQualityFile.resolution === resolution) { | ||
133 | for (const file of video.VideoFiles) { | 129 | for (const file of video.VideoFiles) { |
134 | await video.removeFile(file) | 130 | await video.removeFile(file) |
135 | await video.removeTorrent(file) | 131 | await video.removeTorrent(file) |
@@ -137,6 +133,9 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb | |||
137 | } | 133 | } |
138 | 134 | ||
139 | video.VideoFiles = [] | 135 | video.VideoFiles = [] |
136 | |||
137 | // Create HLS new resolution jobs | ||
138 | await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') | ||
140 | } | 139 | } |
141 | 140 | ||
142 | return publishAndFederateIfNeeded(video) | 141 | return publishAndFederateIfNeeded(video) |
@@ -144,7 +143,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb | |||
144 | 143 | ||
145 | async function onVideoFileOptimizer ( | 144 | async function onVideoFileOptimizer ( |
146 | videoArg: MVideoWithFile, | 145 | videoArg: MVideoWithFile, |
147 | payload: OptimizeTranscodingPayload, | 146 | payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, |
148 | transcodeType: TranscodeOptionsType, | 147 | transcodeType: TranscodeOptionsType, |
149 | user: MUserId | 148 | user: MUserId |
150 | ) { | 149 | ) { |
@@ -166,11 +165,12 @@ async function onVideoFileOptimizer ( | |||
166 | isPortraitMode, | 165 | isPortraitMode, |
167 | resolution: videoDatabase.getMaxQualityFile().resolution, | 166 | resolution: videoDatabase.getMaxQualityFile().resolution, |
168 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | 167 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues |
169 | copyCodecs: transcodeType !== 'quick-transcode' | 168 | copyCodecs: transcodeType !== 'quick-transcode', |
169 | isMaxQuality: true | ||
170 | }) | 170 | }) |
171 | await createHlsJobIfEnabled(user, originalFileHLSPayload) | 171 | await createHlsJobIfEnabled(user, originalFileHLSPayload) |
172 | 172 | ||
173 | const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) | 173 | const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent') |
174 | 174 | ||
175 | if (!hasNewResolutions) { | 175 | if (!hasNewResolutions) { |
176 | // No transcoding to do, it's now published | 176 | // No transcoding to do, it's now published |
@@ -193,7 +193,7 @@ async function onNewWebTorrentFileResolution ( | |||
193 | ) { | 193 | ) { |
194 | await publishAndFederateIfNeeded(video) | 194 | await publishAndFederateIfNeeded(video) |
195 | 195 | ||
196 | await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) | 196 | await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) |
197 | } | 197 | } |
198 | 198 | ||
199 | // --------------------------------------------------------------------------- | 199 | // --------------------------------------------------------------------------- |
@@ -210,6 +210,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
210 | resolution: number | 210 | resolution: number |
211 | isPortraitMode?: boolean | 211 | isPortraitMode?: boolean |
212 | copyCodecs: boolean | 212 | copyCodecs: boolean |
213 | isMaxQuality: boolean | ||
213 | }) { | 214 | }) { |
214 | if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return | 215 | if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return |
215 | 216 | ||
@@ -222,7 +223,8 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
222 | videoUUID: payload.videoUUID, | 223 | videoUUID: payload.videoUUID, |
223 | resolution: payload.resolution, | 224 | resolution: payload.resolution, |
224 | isPortraitMode: payload.isPortraitMode, | 225 | isPortraitMode: payload.isPortraitMode, |
225 | copyCodecs: payload.copyCodecs | 226 | copyCodecs: payload.copyCodecs, |
227 | isMaxQuality: payload.isMaxQuality | ||
226 | } | 228 | } |
227 | 229 | ||
228 | return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) | 230 | return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) |
@@ -232,7 +234,8 @@ async function createLowerResolutionsJobs ( | |||
232 | video: MVideoFullLight, | 234 | video: MVideoFullLight, |
233 | user: MUserId, | 235 | user: MUserId, |
234 | videoFileResolution: number, | 236 | videoFileResolution: number, |
235 | isPortraitMode: boolean | 237 | isPortraitMode: boolean, |
238 | type: 'hls' | 'webtorrent' | ||
236 | ) { | 239 | ) { |
237 | // Create transcoding jobs if there are enabled resolutions | 240 | // Create transcoding jobs if there are enabled resolutions |
238 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') | 241 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') |
@@ -250,7 +253,7 @@ async function createLowerResolutionsJobs ( | |||
250 | for (const resolution of resolutionsEnabled) { | 253 | for (const resolution of resolutionsEnabled) { |
251 | let dataInput: VideoTranscodingPayload | 254 | let dataInput: VideoTranscodingPayload |
252 | 255 | ||
253 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { | 256 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { |
254 | // WebTorrent will create subsequent HLS job | 257 | // WebTorrent will create subsequent HLS job |
255 | dataInput = { | 258 | dataInput = { |
256 | type: 'new-resolution-to-webtorrent', | 259 | type: 'new-resolution-to-webtorrent', |
@@ -258,13 +261,16 @@ async function createLowerResolutionsJobs ( | |||
258 | resolution, | 261 | resolution, |
259 | isPortraitMode | 262 | isPortraitMode |
260 | } | 263 | } |
261 | } else if (CONFIG.TRANSCODING.HLS.ENABLED) { | 264 | } |
265 | |||
266 | if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { | ||
262 | dataInput = { | 267 | dataInput = { |
263 | type: 'new-resolution-to-hls', | 268 | type: 'new-resolution-to-hls', |
264 | videoUUID: video.uuid, | 269 | videoUUID: video.uuid, |
265 | resolution, | 270 | resolution, |
266 | isPortraitMode, | 271 | isPortraitMode, |
267 | copyCodecs: false | 272 | copyCodecs: false, |
273 | isMaxQuality: false | ||
268 | } | 274 | } |
269 | } | 275 | } |
270 | 276 | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 38b1d6f1f..72fed6072 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,5 +1,6 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 2 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
3 | import { CONFIG } from '@server/initializers/config' | ||
3 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 4 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
4 | import { | 5 | import { |
5 | ActivitypubFollowPayload, | 6 | ActivitypubFollowPayload, |
@@ -105,11 +106,11 @@ class JobQueue { | |||
105 | } | 106 | } |
106 | } | 107 | } |
107 | 108 | ||
108 | for (const handlerName of Object.keys(handlers)) { | 109 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
109 | const queue = new Bull(handlerName, queueOptions) | 110 | const queue = new Bull(handlerName, queueOptions) |
110 | const handler = handlers[handlerName] | 111 | const handler = handlers[handlerName] |
111 | 112 | ||
112 | queue.process(JOB_CONCURRENCY[handlerName], handler) | 113 | queue.process(this.getJobConcurrency(handlerName), handler) |
113 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 114 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) |
114 | 115 | ||
115 | queue.on('failed', (job, err) => { | 116 | queue.on('failed', (job, err) => { |
@@ -235,6 +236,13 @@ class JobQueue { | |||
235 | return jobTypes.filter(t => t === jobType) | 236 | return jobTypes.filter(t => t === jobType) |
236 | } | 237 | } |
237 | 238 | ||
239 | private getJobConcurrency (jobType: JobType) { | ||
240 | if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY | ||
241 | if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY | ||
242 | |||
243 | return JOB_CONCURRENCY[jobType] | ||
244 | } | ||
245 | |||
238 | static get Instance () { | 246 | static get Instance () { |
239 | return this.instance || (this.instance = new this()) | 247 | return this.instance || (this.instance = new this()) |
240 | } | 248 | } |
diff --git a/server/lib/video-transcoding.ts b/server/lib/video-transcoding.ts index 88a6e0673..a58c9dd20 100644 --- a/server/lib/video-transcoding.ts +++ b/server/lib/video-transcoding.ts | |||
@@ -272,7 +272,7 @@ async function generateHlsPlaylistCommon (options: { | |||
272 | const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC, job } = options | 272 | const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC, job } = options |
273 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | 273 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR |
274 | 274 | ||
275 | const videoTranscodedBasePath = join(transcodeDirectory, type, video.uuid) | 275 | const videoTranscodedBasePath = join(transcodeDirectory, type) |
276 | await ensureDir(videoTranscodedBasePath) | 276 | await ensureDir(videoTranscodedBasePath) |
277 | 277 | ||
278 | const videoFilename = generateVideoStreamingPlaylistName(video.uuid, resolution) | 278 | const videoFilename = generateVideoStreamingPlaylistName(video.uuid, resolution) |
@@ -337,8 +337,6 @@ async function generateHlsPlaylistCommon (options: { | |||
337 | await move(playlistFileTranscodePath, playlistPath, { overwrite: true }) | 337 | await move(playlistFileTranscodePath, playlistPath, { overwrite: true }) |
338 | // Move video file | 338 | // Move video file |
339 | await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true }) | 339 | await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true }) |
340 | // Cleanup directory | ||
341 | await remove(videoTranscodedBasePath) | ||
342 | 340 | ||
343 | const stats = await stat(videoFilePath) | 341 | const stats = await stat(videoFilePath) |
344 | 342 | ||