aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-02-08 10:51:10 +0100
committerChocobozzz <chocobozzz@cpy.re>2021-02-08 15:38:45 +0100
commit9129b7694d577322327ee79e9b9aa64deee92765 (patch)
treeeb23b7a952048c3725f29109d38c36368976dec0 /server/lib
parent81b46cbc3417c46263c210c61b51a84a457abaaa (diff)
downloadPeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.gz
PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.zst
PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.zip
Allow to specify transcoding and import jobs concurrency
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts44
-rw-r--r--server/lib/job-queue/job-queue.ts12
-rw-r--r--server/lib/video-transcoding.ts4
3 files changed, 36 insertions, 24 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 4718a7d5c..e248b645e 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -76,7 +76,7 @@ async function processVideoTranscoding (job: Bull.Job) {
76// Job handlers 76// Job handlers
77// --------------------------------------------------------------------------- 77// ---------------------------------------------------------------------------
78 78
79async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { 79async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
80 const videoFileInput = payload.copyCodecs 80 const videoFileInput = payload.copyCodecs
81 ? video.getWebTorrentFile(payload.resolution) 81 ? video.getWebTorrentFile(payload.resolution)
82 : video.getMaxQualityFile() 82 : video.getMaxQualityFile()
@@ -93,7 +93,7 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide
93 job 93 job
94 }) 94 })
95 95
96 await retryTransactionWrapper(onHlsPlaylistGeneration, video, payload.resolution) 96 await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload)
97} 97}
98 98
99async function handleNewWebTorrentResolutionJob ( 99async function handleNewWebTorrentResolutionJob (
@@ -110,9 +110,7 @@ async function handleNewWebTorrentResolutionJob (
110async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { 110async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
111 await mergeAudioVideofile(video, payload.resolution, job) 111 await mergeAudioVideofile(video, payload.resolution, job)
112 112
113 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) 113 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user)
114
115 await createLowerResolutionsJobs(video, user, payload.resolution, false)
116} 114}
117 115
118async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { 116async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
@@ -123,13 +121,11 @@ async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTran
123 121
124// --------------------------------------------------------------------------- 122// ---------------------------------------------------------------------------
125 123
126async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: number) { 124async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) {
127 if (video === undefined) return undefined 125 if (video === undefined) return undefined
128 126
129 const maxQualityFile = video.getMaxQualityFile() 127 if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
130 128 // Remove webtorrent files if not enabled
131 // We generated the max quality HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
132 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false && video.hasWebTorrentFiles() && maxQualityFile.resolution === resolution) {
133 for (const file of video.VideoFiles) { 129 for (const file of video.VideoFiles) {
134 await video.removeFile(file) 130 await video.removeFile(file)
135 await video.removeTorrent(file) 131 await video.removeTorrent(file)
@@ -137,6 +133,9 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb
137 } 133 }
138 134
139 video.VideoFiles = [] 135 video.VideoFiles = []
136
137 // Create HLS new resolution jobs
138 await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls')
140 } 139 }
141 140
142 return publishAndFederateIfNeeded(video) 141 return publishAndFederateIfNeeded(video)
@@ -144,7 +143,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, resolution: numb
144 143
145async function onVideoFileOptimizer ( 144async function onVideoFileOptimizer (
146 videoArg: MVideoWithFile, 145 videoArg: MVideoWithFile,
147 payload: OptimizeTranscodingPayload, 146 payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload,
148 transcodeType: TranscodeOptionsType, 147 transcodeType: TranscodeOptionsType,
149 user: MUserId 148 user: MUserId
150) { 149) {
@@ -166,11 +165,12 @@ async function onVideoFileOptimizer (
166 isPortraitMode, 165 isPortraitMode,
167 resolution: videoDatabase.getMaxQualityFile().resolution, 166 resolution: videoDatabase.getMaxQualityFile().resolution,
168 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 167 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
169 copyCodecs: transcodeType !== 'quick-transcode' 168 copyCodecs: transcodeType !== 'quick-transcode',
169 isMaxQuality: true
170 }) 170 })
171 await createHlsJobIfEnabled(user, originalFileHLSPayload) 171 await createHlsJobIfEnabled(user, originalFileHLSPayload)
172 172
173 const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) 173 const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode, 'webtorrent')
174 174
175 if (!hasNewResolutions) { 175 if (!hasNewResolutions) {
176 // No transcoding to do, it's now published 176 // No transcoding to do, it's now published
@@ -193,7 +193,7 @@ async function onNewWebTorrentFileResolution (
193) { 193) {
194 await publishAndFederateIfNeeded(video) 194 await publishAndFederateIfNeeded(video)
195 195
196 await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) 196 await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false }))
197} 197}
198 198
199// --------------------------------------------------------------------------- 199// ---------------------------------------------------------------------------
@@ -210,6 +210,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
210 resolution: number 210 resolution: number
211 isPortraitMode?: boolean 211 isPortraitMode?: boolean
212 copyCodecs: boolean 212 copyCodecs: boolean
213 isMaxQuality: boolean
213}) { 214}) {
214 if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return 215 if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return
215 216
@@ -222,7 +223,8 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
222 videoUUID: payload.videoUUID, 223 videoUUID: payload.videoUUID,
223 resolution: payload.resolution, 224 resolution: payload.resolution,
224 isPortraitMode: payload.isPortraitMode, 225 isPortraitMode: payload.isPortraitMode,
225 copyCodecs: payload.copyCodecs 226 copyCodecs: payload.copyCodecs,
227 isMaxQuality: payload.isMaxQuality
226 } 228 }
227 229
228 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) 230 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions)
@@ -232,7 +234,8 @@ async function createLowerResolutionsJobs (
232 video: MVideoFullLight, 234 video: MVideoFullLight,
233 user: MUserId, 235 user: MUserId,
234 videoFileResolution: number, 236 videoFileResolution: number,
235 isPortraitMode: boolean 237 isPortraitMode: boolean,
238 type: 'hls' | 'webtorrent'
236) { 239) {
237 // Create transcoding jobs if there are enabled resolutions 240 // Create transcoding jobs if there are enabled resolutions
238 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') 241 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
@@ -250,7 +253,7 @@ async function createLowerResolutionsJobs (
250 for (const resolution of resolutionsEnabled) { 253 for (const resolution of resolutionsEnabled) {
251 let dataInput: VideoTranscodingPayload 254 let dataInput: VideoTranscodingPayload
252 255
253 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { 256 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
254 // WebTorrent will create subsequent HLS job 257 // WebTorrent will create subsequent HLS job
255 dataInput = { 258 dataInput = {
256 type: 'new-resolution-to-webtorrent', 259 type: 'new-resolution-to-webtorrent',
@@ -258,13 +261,16 @@ async function createLowerResolutionsJobs (
258 resolution, 261 resolution,
259 isPortraitMode 262 isPortraitMode
260 } 263 }
261 } else if (CONFIG.TRANSCODING.HLS.ENABLED) { 264 }
265
266 if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') {
262 dataInput = { 267 dataInput = {
263 type: 'new-resolution-to-hls', 268 type: 'new-resolution-to-hls',
264 videoUUID: video.uuid, 269 videoUUID: video.uuid,
265 resolution, 270 resolution,
266 isPortraitMode, 271 isPortraitMode,
267 copyCodecs: false 272 copyCodecs: false,
273 isMaxQuality: false
268 } 274 }
269 } 275 }
270 276
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 38b1d6f1f..72fed6072 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,5 +1,6 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { jobStates } from '@server/helpers/custom-validators/jobs' 2import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { CONFIG } from '@server/initializers/config'
3import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
4import { 5import {
5 ActivitypubFollowPayload, 6 ActivitypubFollowPayload,
@@ -105,11 +106,11 @@ class JobQueue {
105 } 106 }
106 } 107 }
107 108
108 for (const handlerName of Object.keys(handlers)) { 109 for (const handlerName of (Object.keys(handlers) as JobType[])) {
109 const queue = new Bull(handlerName, queueOptions) 110 const queue = new Bull(handlerName, queueOptions)
110 const handler = handlers[handlerName] 111 const handler = handlers[handlerName]
111 112
112 queue.process(JOB_CONCURRENCY[handlerName], handler) 113 queue.process(this.getJobConcurrency(handlerName), handler)
113 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 114 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
114 115
115 queue.on('failed', (job, err) => { 116 queue.on('failed', (job, err) => {
@@ -235,6 +236,13 @@ class JobQueue {
235 return jobTypes.filter(t => t === jobType) 236 return jobTypes.filter(t => t === jobType)
236 } 237 }
237 238
239 private getJobConcurrency (jobType: JobType) {
240 if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
241 if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
242
243 return JOB_CONCURRENCY[jobType]
244 }
245
238 static get Instance () { 246 static get Instance () {
239 return this.instance || (this.instance = new this()) 247 return this.instance || (this.instance = new this())
240 } 248 }
diff --git a/server/lib/video-transcoding.ts b/server/lib/video-transcoding.ts
index 88a6e0673..a58c9dd20 100644
--- a/server/lib/video-transcoding.ts
+++ b/server/lib/video-transcoding.ts
@@ -272,7 +272,7 @@ async function generateHlsPlaylistCommon (options: {
272 const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC, job } = options 272 const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC, job } = options
273 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR 273 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
274 274
275 const videoTranscodedBasePath = join(transcodeDirectory, type, video.uuid) 275 const videoTranscodedBasePath = join(transcodeDirectory, type)
276 await ensureDir(videoTranscodedBasePath) 276 await ensureDir(videoTranscodedBasePath)
277 277
278 const videoFilename = generateVideoStreamingPlaylistName(video.uuid, resolution) 278 const videoFilename = generateVideoStreamingPlaylistName(video.uuid, resolution)
@@ -337,8 +337,6 @@ async function generateHlsPlaylistCommon (options: {
337 await move(playlistFileTranscodePath, playlistPath, { overwrite: true }) 337 await move(playlistFileTranscodePath, playlistPath, { overwrite: true })
338 // Move video file 338 // Move video file
339 await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true }) 339 await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true })
340 // Cleanup directory
341 await remove(videoTranscodedBasePath)
342 340
343 const stats = await stat(videoFilePath) 341 const stats = await stat(videoFilePath)
344 342