aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-01-21 16:57:21 +0100
committerChocobozzz <chocobozzz@cpy.re>2021-01-25 14:38:52 +0100
commit77d7e851dccf17dcc89e8fcc2db3f655d1e63f95 (patch)
treed09e045dfabe7ab1e170d1b0caa9decda8a7d39c /server/lib/job-queue
parent92c871b40554d5285232eb4392cebb63d127704a (diff)
downloadPeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.tar.gz
PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.tar.zst
PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.zip
Add priority to transcoding jobs
(1 = highest priority) 100 for new resolutions 10 for original file optimization Add a malus for transcoding jobs depending on how many uploads the user did in the last 7 days
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts5
-rw-r--r--server/lib/job-queue/handlers/video-import.ts4
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts91
-rw-r--r--server/lib/job-queue/job-queue.ts2
4 files changed, 65 insertions, 37 deletions
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 22e4d0cf1..582efea3a 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -3,6 +3,7 @@ import { copy, stat } from 'fs-extra'
3import { extname } from 'path' 3import { extname } from 'path'
4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
5import { getVideoFilePath } from '@server/lib/video-paths' 5import { getVideoFilePath } from '@server/lib/video-paths'
6import { UserModel } from '@server/models/account/user'
6import { MVideoFile, MVideoWithFile } from '@server/types/models' 7import { MVideoFile, MVideoWithFile } from '@server/types/models'
7import { VideoFileImportPayload } from '@shared/models' 8import { VideoFileImportPayload } from '@shared/models'
8import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' 9import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils'
@@ -24,7 +25,9 @@ async function processVideoFileImport (job: Bull.Job) {
24 25
25 await updateVideoFile(video, payload.filePath) 26 await updateVideoFile(video, payload.filePath)
26 27
27 await onNewWebTorrentFileResolution(video) 28 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
29 await onNewWebTorrentFileResolution(video, user)
30
28 return video 31 return video
29} 32}
30 33
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index c1d1866b0..f61fd773a 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,10 +1,10 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { extname } from 'path' 3import { extname } from 'path'
4import { addOptimizeOrMergeAudioJob } from '@server/helpers/video'
5import { isPostImportVideoAccepted } from '@server/lib/moderation' 4import { isPostImportVideoAccepted } from '@server/lib/moderation'
6import { Hooks } from '@server/lib/plugins/hooks' 5import { Hooks } from '@server/lib/plugins/hooks'
7import { isAbleToUploadVideo } from '@server/lib/user' 6import { isAbleToUploadVideo } from '@server/lib/user'
7import { addOptimizeOrMergeAudioJob } from '@server/lib/video'
8import { getVideoFilePath } from '@server/lib/video-paths' 8import { getVideoFilePath } from '@server/lib/video-paths'
9import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 9import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
10import { 10import {
@@ -224,7 +224,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
224 224
225 // Create transcoding jobs? 225 // Create transcoding jobs?
226 if (video.state === VideoState.TO_TRANSCODE) { 226 if (video.state === VideoState.TO_TRANSCODE) {
227 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile) 227 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User)
228 } 228 }
229 229
230 } catch (err) { 230 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 0f6b3f753..ee241ad03 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,8 +1,9 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' 2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
3import { publishAndFederateIfNeeded } from '@server/lib/video' 3import { JOB_PRIORITY } from '@server/initializers/constants'
4import { getJobTranscodingPriorityMalus, publishAndFederateIfNeeded } from '@server/lib/video'
4import { getVideoFilePath } from '@server/lib/video-paths' 5import { getVideoFilePath } from '@server/lib/video-paths'
5import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' 6import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models'
6import { 7import {
7 HLSTranscodingPayload, 8 HLSTranscodingPayload,
8 MergeAudioTranscodingPayload, 9 MergeAudioTranscodingPayload,
@@ -25,8 +26,11 @@ import {
25 transcodeNewWebTorrentResolution 26 transcodeNewWebTorrentResolution
26} from '../../video-transcoding' 27} from '../../video-transcoding'
27import { JobQueue } from '../job-queue' 28import { JobQueue } from '../job-queue'
29import { UserModel } from '@server/models/account/user'
28 30
29const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = { 31type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<any>
32
33const handlers: { [ id: string ]: HandlerFunction } = {
30 // Deprecated, introduced in 3.1 34 // Deprecated, introduced in 3.1
31 'hls': handleHLSJob, 35 'hls': handleHLSJob,
32 'new-resolution-to-hls': handleHLSJob, 36 'new-resolution-to-hls': handleHLSJob,
@@ -55,13 +59,15 @@ async function processVideoTranscoding (job: Bull.Job) {
55 return undefined 59 return undefined
56 } 60 }
57 61
62 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
63
58 const handler = handlers[payload.type] 64 const handler = handlers[payload.type]
59 65
60 if (!handler) { 66 if (!handler) {
61 throw new Error('Cannot find transcoding handler for ' + payload.type) 67 throw new Error('Cannot find transcoding handler for ' + payload.type)
62 } 68 }
63 69
64 await handler(job, payload, video) 70 await handler(job, payload, video, user)
65 71
66 return video 72 return video
67} 73}
@@ -90,22 +96,27 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide
90 await retryTransactionWrapper(onHlsPlaylistGeneration, video) 96 await retryTransactionWrapper(onHlsPlaylistGeneration, video)
91} 97}
92 98
93async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { 99async function handleNewWebTorrentResolutionJob (
100 job: Bull.Job,
101 payload: NewResolutionTranscodingPayload,
102 video: MVideoFullLight,
103 user: MUserId
104) {
94 await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) 105 await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
95 106
96 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) 107 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
97} 108}
98 109
99async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { 110async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
100 await mergeAudioVideofile(video, payload.resolution, job) 111 await mergeAudioVideofile(video, payload.resolution, job)
101 112
102 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) 113 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
103} 114}
104 115
105async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { 116async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
106 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) 117 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
107 118
108 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) 119 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user)
109} 120}
110 121
111// --------------------------------------------------------------------------- 122// ---------------------------------------------------------------------------
@@ -129,7 +140,8 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight) {
129async function onVideoFileOptimizer ( 140async function onVideoFileOptimizer (
130 videoArg: MVideoWithFile, 141 videoArg: MVideoWithFile,
131 payload: OptimizeTranscodingPayload, 142 payload: OptimizeTranscodingPayload,
132 transcodeType: TranscodeOptionsType 143 transcodeType: TranscodeOptionsType,
144 user: MUserId
133) { 145) {
134 if (videoArg === undefined) return undefined 146 if (videoArg === undefined) return undefined
135 147
@@ -142,13 +154,6 @@ async function onVideoFileOptimizer (
142 // Video does not exist anymore 154 // Video does not exist anymore
143 if (!videoDatabase) return undefined 155 if (!videoDatabase) return undefined
144 156
145 // Create transcoding jobs if there are enabled resolutions
146 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
147 logger.info(
148 'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution,
149 { resolutions: resolutionsEnabled }
150 )
151
152 let videoPublished = false 157 let videoPublished = false
153 158
154 // Generate HLS version of the original file 159 // Generate HLS version of the original file
@@ -158,9 +163,9 @@ async function onVideoFileOptimizer (
158 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 163 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
159 copyCodecs: transcodeType !== 'quick-transcode' 164 copyCodecs: transcodeType !== 'quick-transcode'
160 }) 165 })
161 createHlsJobIfEnabled(originalFileHLSPayload) 166 await createHlsJobIfEnabled(user, originalFileHLSPayload)
162 167
163 const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) 168 const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode)
164 169
165 if (!hasNewResolutions) { 170 if (!hasNewResolutions) {
166 // No transcoding to do, it's now published 171 // No transcoding to do, it's now published
@@ -178,11 +183,12 @@ async function onVideoFileOptimizer (
178 183
179async function onNewWebTorrentFileResolution ( 184async function onNewWebTorrentFileResolution (
180 video: MVideoUUID, 185 video: MVideoUUID,
186 user: MUserId,
181 payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload 187 payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
182) { 188) {
183 await publishAndFederateIfNeeded(video) 189 await publishAndFederateIfNeeded(video)
184 190
185 createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) 191 await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true }))
186} 192}
187 193
188// --------------------------------------------------------------------------- 194// ---------------------------------------------------------------------------
@@ -194,22 +200,35 @@ export {
194 200
195// --------------------------------------------------------------------------- 201// ---------------------------------------------------------------------------
196 202
197function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { 203async function createHlsJobIfEnabled (user: MUserId, payload: {
198 // Generate HLS playlist? 204 videoUUID: string
199 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { 205 resolution: number
200 const hlsTranscodingPayload: HLSTranscodingPayload = { 206 isPortraitMode?: boolean
201 type: 'new-resolution-to-hls', 207 copyCodecs: boolean
202 videoUUID: payload.videoUUID, 208}) {
203 resolution: payload.resolution, 209 if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return
204 isPortraitMode: payload.isPortraitMode, 210
205 copyCodecs: payload.copyCodecs 211 const jobOptions = {
206 } 212 priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user)
213 }
207 214
208 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) 215 const hlsTranscodingPayload: HLSTranscodingPayload = {
216 type: 'new-resolution-to-hls',
217 videoUUID: payload.videoUUID,
218 resolution: payload.resolution,
219 isPortraitMode: payload.isPortraitMode,
220 copyCodecs: payload.copyCodecs
209 } 221 }
222
223 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions)
210} 224}
211 225
212function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { 226async function createLowerResolutionsJobs (
227 video: MVideoFullLight,
228 user: MUserId,
229 videoFileResolution: number,
230 isPortraitMode: boolean
231) {
213 // Create transcoding jobs if there are enabled resolutions 232 // Create transcoding jobs if there are enabled resolutions
214 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') 233 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
215 logger.info( 234 logger.info(
@@ -244,7 +263,11 @@ function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution
244 } 263 }
245 } 264 }
246 265
247 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) 266 const jobOptions = {
267 priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user)
268 }
269
270 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions)
248 } 271 }
249 272
250 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) 273 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 5d0b797b0..38b1d6f1f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -47,6 +47,7 @@ type CreateJobArgument =
47 47
48type CreateJobOptions = { 48type CreateJobOptions = {
49 delay?: number 49 delay?: number
50 priority?: number
50} 51}
51 52
52const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { 53const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
@@ -148,6 +149,7 @@ class JobQueue {
148 backoff: { delay: 60 * 1000, type: 'exponential' }, 149 backoff: { delay: 60 * 1000, type: 'exponential' },
149 attempts: JOB_ATTEMPTS[obj.type], 150 attempts: JOB_ATTEMPTS[obj.type],
150 timeout: JOB_TTL[obj.type], 151 timeout: JOB_TTL[obj.type],
152 priority: options.priority,
151 delay: options.delay 153 delay: options.delay
152 } 154 }
153 155