diff options
author | Chocobozzz <me@florianbigard.com> | 2021-01-21 16:57:21 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2021-01-25 14:38:52 +0100 |
commit | 77d7e851dccf17dcc89e8fcc2db3f655d1e63f95 (patch) | |
tree | d09e045dfabe7ab1e170d1b0caa9decda8a7d39c /server/lib/job-queue | |
parent | 92c871b40554d5285232eb4392cebb63d127704a (diff) | |
download | PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.tar.gz PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.tar.zst PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.zip |
Add priority to transcoding jobs
(1 = highest priority)
100 for new resolutions
10 for original file optimization
Add a malus for transcoding jobs depending on how many uploads the user did in the
last 7 days
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-file-import.ts | 5 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 91 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 2 |
4 files changed, 65 insertions, 37 deletions
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 22e4d0cf1..582efea3a 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -3,6 +3,7 @@ import { copy, stat } from 'fs-extra' | |||
3 | import { extname } from 'path' | 3 | import { extname } from 'path' |
4 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | 4 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' |
5 | import { getVideoFilePath } from '@server/lib/video-paths' | 5 | import { getVideoFilePath } from '@server/lib/video-paths' |
6 | import { UserModel } from '@server/models/account/user' | ||
6 | import { MVideoFile, MVideoWithFile } from '@server/types/models' | 7 | import { MVideoFile, MVideoWithFile } from '@server/types/models' |
7 | import { VideoFileImportPayload } from '@shared/models' | 8 | import { VideoFileImportPayload } from '@shared/models' |
8 | import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' | 9 | import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' |
@@ -24,7 +25,9 @@ async function processVideoFileImport (job: Bull.Job) { | |||
24 | 25 | ||
25 | await updateVideoFile(video, payload.filePath) | 26 | await updateVideoFile(video, payload.filePath) |
26 | 27 | ||
27 | await onNewWebTorrentFileResolution(video) | 28 | const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) |
29 | await onNewWebTorrentFileResolution(video, user) | ||
30 | |||
28 | return video | 31 | return video |
29 | } | 32 | } |
30 | 33 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index c1d1866b0..f61fd773a 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { move, remove, stat } from 'fs-extra' | 2 | import { move, remove, stat } from 'fs-extra' |
3 | import { extname } from 'path' | 3 | import { extname } from 'path' |
4 | import { addOptimizeOrMergeAudioJob } from '@server/helpers/video' | ||
5 | import { isPostImportVideoAccepted } from '@server/lib/moderation' | 4 | import { isPostImportVideoAccepted } from '@server/lib/moderation' |
6 | import { Hooks } from '@server/lib/plugins/hooks' | 5 | import { Hooks } from '@server/lib/plugins/hooks' |
7 | import { isAbleToUploadVideo } from '@server/lib/user' | 6 | import { isAbleToUploadVideo } from '@server/lib/user' |
7 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' | ||
8 | import { getVideoFilePath } from '@server/lib/video-paths' | 8 | import { getVideoFilePath } from '@server/lib/video-paths' |
9 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 9 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' |
10 | import { | 10 | import { |
@@ -224,7 +224,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
224 | 224 | ||
225 | // Create transcoding jobs? | 225 | // Create transcoding jobs? |
226 | if (video.state === VideoState.TO_TRANSCODE) { | 226 | if (video.state === VideoState.TO_TRANSCODE) { |
227 | await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile) | 227 | await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) |
228 | } | 228 | } |
229 | 229 | ||
230 | } catch (err) { | 230 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 0f6b3f753..ee241ad03 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,8 +1,9 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' | 2 | import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' |
3 | import { publishAndFederateIfNeeded } from '@server/lib/video' | 3 | import { JOB_PRIORITY } from '@server/initializers/constants' |
4 | import { getJobTranscodingPriorityMalus, publishAndFederateIfNeeded } from '@server/lib/video' | ||
4 | import { getVideoFilePath } from '@server/lib/video-paths' | 5 | import { getVideoFilePath } from '@server/lib/video-paths' |
5 | import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' | 6 | import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' |
6 | import { | 7 | import { |
7 | HLSTranscodingPayload, | 8 | HLSTranscodingPayload, |
8 | MergeAudioTranscodingPayload, | 9 | MergeAudioTranscodingPayload, |
@@ -25,8 +26,11 @@ import { | |||
25 | transcodeNewWebTorrentResolution | 26 | transcodeNewWebTorrentResolution |
26 | } from '../../video-transcoding' | 27 | } from '../../video-transcoding' |
27 | import { JobQueue } from '../job-queue' | 28 | import { JobQueue } from '../job-queue' |
29 | import { UserModel } from '@server/models/account/user' | ||
28 | 30 | ||
29 | const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = { | 31 | type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<any> |
32 | |||
33 | const handlers: { [ id: string ]: HandlerFunction } = { | ||
30 | // Deprecated, introduced in 3.1 | 34 | // Deprecated, introduced in 3.1 |
31 | 'hls': handleHLSJob, | 35 | 'hls': handleHLSJob, |
32 | 'new-resolution-to-hls': handleHLSJob, | 36 | 'new-resolution-to-hls': handleHLSJob, |
@@ -55,13 +59,15 @@ async function processVideoTranscoding (job: Bull.Job) { | |||
55 | return undefined | 59 | return undefined |
56 | } | 60 | } |
57 | 61 | ||
62 | const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) | ||
63 | |||
58 | const handler = handlers[payload.type] | 64 | const handler = handlers[payload.type] |
59 | 65 | ||
60 | if (!handler) { | 66 | if (!handler) { |
61 | throw new Error('Cannot find transcoding handler for ' + payload.type) | 67 | throw new Error('Cannot find transcoding handler for ' + payload.type) |
62 | } | 68 | } |
63 | 69 | ||
64 | await handler(job, payload, video) | 70 | await handler(job, payload, video, user) |
65 | 71 | ||
66 | return video | 72 | return video |
67 | } | 73 | } |
@@ -90,22 +96,27 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide | |||
90 | await retryTransactionWrapper(onHlsPlaylistGeneration, video) | 96 | await retryTransactionWrapper(onHlsPlaylistGeneration, video) |
91 | } | 97 | } |
92 | 98 | ||
93 | async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { | 99 | async function handleNewWebTorrentResolutionJob ( |
100 | job: Bull.Job, | ||
101 | payload: NewResolutionTranscodingPayload, | ||
102 | video: MVideoFullLight, | ||
103 | user: MUserId | ||
104 | ) { | ||
94 | await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) | 105 | await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) |
95 | 106 | ||
96 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) | 107 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) |
97 | } | 108 | } |
98 | 109 | ||
99 | async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { | 110 | async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
100 | await mergeAudioVideofile(video, payload.resolution, job) | 111 | await mergeAudioVideofile(video, payload.resolution, job) |
101 | 112 | ||
102 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) | 113 | await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) |
103 | } | 114 | } |
104 | 115 | ||
105 | async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { | 116 | async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
106 | const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) | 117 | const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) |
107 | 118 | ||
108 | await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) | 119 | await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) |
109 | } | 120 | } |
110 | 121 | ||
111 | // --------------------------------------------------------------------------- | 122 | // --------------------------------------------------------------------------- |
@@ -129,7 +140,8 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight) { | |||
129 | async function onVideoFileOptimizer ( | 140 | async function onVideoFileOptimizer ( |
130 | videoArg: MVideoWithFile, | 141 | videoArg: MVideoWithFile, |
131 | payload: OptimizeTranscodingPayload, | 142 | payload: OptimizeTranscodingPayload, |
132 | transcodeType: TranscodeOptionsType | 143 | transcodeType: TranscodeOptionsType, |
144 | user: MUserId | ||
133 | ) { | 145 | ) { |
134 | if (videoArg === undefined) return undefined | 146 | if (videoArg === undefined) return undefined |
135 | 147 | ||
@@ -142,13 +154,6 @@ async function onVideoFileOptimizer ( | |||
142 | // Video does not exist anymore | 154 | // Video does not exist anymore |
143 | if (!videoDatabase) return undefined | 155 | if (!videoDatabase) return undefined |
144 | 156 | ||
145 | // Create transcoding jobs if there are enabled resolutions | ||
146 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') | ||
147 | logger.info( | ||
148 | 'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution, | ||
149 | { resolutions: resolutionsEnabled } | ||
150 | ) | ||
151 | |||
152 | let videoPublished = false | 157 | let videoPublished = false |
153 | 158 | ||
154 | // Generate HLS version of the original file | 159 | // Generate HLS version of the original file |
@@ -158,9 +163,9 @@ async function onVideoFileOptimizer ( | |||
158 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | 163 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues |
159 | copyCodecs: transcodeType !== 'quick-transcode' | 164 | copyCodecs: transcodeType !== 'quick-transcode' |
160 | }) | 165 | }) |
161 | createHlsJobIfEnabled(originalFileHLSPayload) | 166 | await createHlsJobIfEnabled(user, originalFileHLSPayload) |
162 | 167 | ||
163 | const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) | 168 | const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) |
164 | 169 | ||
165 | if (!hasNewResolutions) { | 170 | if (!hasNewResolutions) { |
166 | // No transcoding to do, it's now published | 171 | // No transcoding to do, it's now published |
@@ -178,11 +183,12 @@ async function onVideoFileOptimizer ( | |||
178 | 183 | ||
179 | async function onNewWebTorrentFileResolution ( | 184 | async function onNewWebTorrentFileResolution ( |
180 | video: MVideoUUID, | 185 | video: MVideoUUID, |
186 | user: MUserId, | ||
181 | payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload | 187 | payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload |
182 | ) { | 188 | ) { |
183 | await publishAndFederateIfNeeded(video) | 189 | await publishAndFederateIfNeeded(video) |
184 | 190 | ||
185 | createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) | 191 | await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) |
186 | } | 192 | } |
187 | 193 | ||
188 | // --------------------------------------------------------------------------- | 194 | // --------------------------------------------------------------------------- |
@@ -194,22 +200,35 @@ export { | |||
194 | 200 | ||
195 | // --------------------------------------------------------------------------- | 201 | // --------------------------------------------------------------------------- |
196 | 202 | ||
197 | function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { | 203 | async function createHlsJobIfEnabled (user: MUserId, payload: { |
198 | // Generate HLS playlist? | 204 | videoUUID: string |
199 | if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { | 205 | resolution: number |
200 | const hlsTranscodingPayload: HLSTranscodingPayload = { | 206 | isPortraitMode?: boolean |
201 | type: 'new-resolution-to-hls', | 207 | copyCodecs: boolean |
202 | videoUUID: payload.videoUUID, | 208 | }) { |
203 | resolution: payload.resolution, | 209 | if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return |
204 | isPortraitMode: payload.isPortraitMode, | 210 | |
205 | copyCodecs: payload.copyCodecs | 211 | const jobOptions = { |
206 | } | 212 | priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) |
213 | } | ||
207 | 214 | ||
208 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) | 215 | const hlsTranscodingPayload: HLSTranscodingPayload = { |
216 | type: 'new-resolution-to-hls', | ||
217 | videoUUID: payload.videoUUID, | ||
218 | resolution: payload.resolution, | ||
219 | isPortraitMode: payload.isPortraitMode, | ||
220 | copyCodecs: payload.copyCodecs | ||
209 | } | 221 | } |
222 | |||
223 | return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) | ||
210 | } | 224 | } |
211 | 225 | ||
212 | function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { | 226 | async function createLowerResolutionsJobs ( |
227 | video: MVideoFullLight, | ||
228 | user: MUserId, | ||
229 | videoFileResolution: number, | ||
230 | isPortraitMode: boolean | ||
231 | ) { | ||
213 | // Create transcoding jobs if there are enabled resolutions | 232 | // Create transcoding jobs if there are enabled resolutions |
214 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') | 233 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') |
215 | logger.info( | 234 | logger.info( |
@@ -244,7 +263,11 @@ function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution | |||
244 | } | 263 | } |
245 | } | 264 | } |
246 | 265 | ||
247 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 266 | const jobOptions = { |
267 | priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) | ||
268 | } | ||
269 | |||
270 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) | ||
248 | } | 271 | } |
249 | 272 | ||
250 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) | 273 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 5d0b797b0..38b1d6f1f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -47,6 +47,7 @@ type CreateJobArgument = | |||
47 | 47 | ||
48 | type CreateJobOptions = { | 48 | type CreateJobOptions = { |
49 | delay?: number | 49 | delay?: number |
50 | priority?: number | ||
50 | } | 51 | } |
51 | 52 | ||
52 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | 53 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
@@ -148,6 +149,7 @@ class JobQueue { | |||
148 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 149 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
149 | attempts: JOB_ATTEMPTS[obj.type], | 150 | attempts: JOB_ATTEMPTS[obj.type], |
150 | timeout: JOB_TTL[obj.type], | 151 | timeout: JOB_TTL[obj.type], |
152 | priority: options.priority, | ||
151 | delay: options.delay | 153 | delay: options.delay |
152 | } | 154 | } |
153 | 155 | ||