diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts')
-rw-r--r-- | server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts new file mode 100644 index 000000000..7c892718b --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts | |||
@@ -0,0 +1,308 @@ | |||
1 | import Bluebird from 'bluebird' | ||
2 | import { computeOutputFPS } from '@server/helpers/ffmpeg' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { CONFIG } from '@server/initializers/config' | ||
5 | import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' | ||
6 | import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' | ||
7 | import { Hooks } from '@server/lib/plugins/hooks' | ||
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
10 | import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' | ||
11 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' | ||
12 | import { | ||
13 | HLSTranscodingPayload, | ||
14 | MergeAudioTranscodingPayload, | ||
15 | NewWebTorrentResolutionTranscodingPayload, | ||
16 | OptimizeTranscodingPayload, | ||
17 | VideoTranscodingPayload | ||
18 | } from '@shared/models' | ||
19 | import { canDoQuickTranscode } from '../../transcoding-quick-transcode' | ||
20 | import { computeResolutionsToTranscode } from '../../transcoding-resolutions' | ||
21 | import { AbstractJobBuilder } from './abstract-job-builder' | ||
22 | |||
23 | export class TranscodingJobQueueBuilder extends AbstractJobBuilder { | ||
24 | |||
25 | async createOptimizeOrMergeAudioJobs (options: { | ||
26 | video: MVideoFullLight | ||
27 | videoFile: MVideoFile | ||
28 | isNewVideo: boolean | ||
29 | user: MUserId | ||
30 | }) { | ||
31 | const { video, videoFile, isNewVideo, user } = options | ||
32 | |||
33 | let mergeOrOptimizePayload: MergeAudioTranscodingPayload | OptimizeTranscodingPayload | ||
34 | let nextTranscodingSequentialJobPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] | ||
35 | |||
36 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
37 | |||
38 | try { | ||
39 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { | ||
40 | const probe = await ffprobePromise(videoFilePath) | ||
41 | |||
42 | const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) | ||
43 | const hasAudio = await hasAudioStream(videoFilePath, probe) | ||
44 | const quickTranscode = await canDoQuickTranscode(videoFilePath, probe) | ||
45 | const inputFPS = videoFile.isAudio() | ||
46 | ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value | ||
47 | : await getVideoStreamFPS(videoFilePath, probe) | ||
48 | |||
49 | const maxResolution = await isAudioFile(videoFilePath, probe) | ||
50 | ? DEFAULT_AUDIO_RESOLUTION | ||
51 | : resolution | ||
52 | |||
53 | if (CONFIG.TRANSCODING.HLS.ENABLED === true) { | ||
54 | nextTranscodingSequentialJobPayloads.push([ | ||
55 | this.buildHLSJobPayload({ | ||
56 | deleteWebTorrentFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, | ||
57 | |||
58 | // We had some issues with a web video quick transcoded while producing a HLS version of it | ||
59 | copyCodecs: !quickTranscode, | ||
60 | |||
61 | resolution: maxResolution, | ||
62 | fps: computeOutputFPS({ inputFPS, resolution: maxResolution }), | ||
63 | videoUUID: video.uuid, | ||
64 | isNewVideo | ||
65 | }) | ||
66 | ]) | ||
67 | } | ||
68 | |||
69 | const lowerResolutionJobPayloads = await this.buildLowerResolutionJobPayloads({ | ||
70 | video, | ||
71 | inputVideoResolution: maxResolution, | ||
72 | inputVideoFPS: inputFPS, | ||
73 | hasAudio, | ||
74 | isNewVideo | ||
75 | }) | ||
76 | |||
77 | nextTranscodingSequentialJobPayloads = [ ...nextTranscodingSequentialJobPayloads, ...lowerResolutionJobPayloads ] | ||
78 | |||
79 | mergeOrOptimizePayload = videoFile.isAudio() | ||
80 | ? this.buildMergeAudioPayload({ videoUUID: video.uuid, isNewVideo }) | ||
81 | : this.buildOptimizePayload({ videoUUID: video.uuid, isNewVideo, quickTranscode }) | ||
82 | }) | ||
83 | } finally { | ||
84 | mutexReleaser() | ||
85 | } | ||
86 | |||
87 | const nextTranscodingSequentialJobs = await Bluebird.mapSeries(nextTranscodingSequentialJobPayloads, payloads => { | ||
88 | return Bluebird.mapSeries(payloads, payload => { | ||
89 | return this.buildTranscodingJob({ payload, user }) | ||
90 | }) | ||
91 | }) | ||
92 | |||
93 | const transcodingJobBuilderJob: CreateJobArgument = { | ||
94 | type: 'transcoding-job-builder', | ||
95 | payload: { | ||
96 | videoUUID: video.uuid, | ||
97 | sequentialJobs: nextTranscodingSequentialJobs | ||
98 | } | ||
99 | } | ||
100 | |||
101 | const mergeOrOptimizeJob = await this.buildTranscodingJob({ payload: mergeOrOptimizePayload, user }) | ||
102 | |||
103 | return JobQueue.Instance.createSequentialJobFlow(...[ mergeOrOptimizeJob, transcodingJobBuilderJob ]) | ||
104 | } | ||
105 | |||
106 | // --------------------------------------------------------------------------- | ||
107 | |||
108 | async createTranscodingJobs (options: { | ||
109 | transcodingType: 'hls' | 'webtorrent' | ||
110 | video: MVideoFullLight | ||
111 | resolutions: number[] | ||
112 | isNewVideo: boolean | ||
113 | user: MUserId | null | ||
114 | }) { | ||
115 | const { video, transcodingType, resolutions, isNewVideo } = options | ||
116 | |||
117 | const maxResolution = Math.max(...resolutions) | ||
118 | const childrenResolutions = resolutions.filter(r => r !== maxResolution) | ||
119 | |||
120 | logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) | ||
121 | |||
122 | const { fps: inputFPS } = await video.probeMaxQualityFile() | ||
123 | |||
124 | const children = childrenResolutions.map(resolution => { | ||
125 | const fps = computeOutputFPS({ inputFPS, resolution }) | ||
126 | |||
127 | if (transcodingType === 'hls') { | ||
128 | return this.buildHLSJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) | ||
129 | } | ||
130 | |||
131 | if (transcodingType === 'webtorrent') { | ||
132 | return this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) | ||
133 | } | ||
134 | |||
135 | throw new Error('Unknown transcoding type') | ||
136 | }) | ||
137 | |||
138 | const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) | ||
139 | |||
140 | const parent = transcodingType === 'hls' | ||
141 | ? this.buildHLSJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) | ||
142 | : this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) | ||
143 | |||
144 | // Process the last resolution after the other ones to prevent concurrency issue | ||
145 | // Because low resolutions use the biggest one as ffmpeg input | ||
146 | await this.createTranscodingJobsWithChildren({ videoUUID: video.uuid, parent, children, user: null }) | ||
147 | } | ||
148 | |||
149 | // --------------------------------------------------------------------------- | ||
150 | |||
151 | private async createTranscodingJobsWithChildren (options: { | ||
152 | videoUUID: string | ||
153 | parent: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload) | ||
154 | children: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload)[] | ||
155 | user: MUserId | null | ||
156 | }) { | ||
157 | const { videoUUID, parent, children, user } = options | ||
158 | |||
159 | const parentJob = await this.buildTranscodingJob({ payload: parent, user }) | ||
160 | const childrenJobs = await Bluebird.mapSeries(children, c => this.buildTranscodingJob({ payload: c, user })) | ||
161 | |||
162 | await JobQueue.Instance.createJobWithChildren(parentJob, childrenJobs) | ||
163 | |||
164 | await VideoJobInfoModel.increaseOrCreate(videoUUID, 'pendingTranscode', 1 + children.length) | ||
165 | } | ||
166 | |||
167 | private async buildTranscodingJob (options: { | ||
168 | payload: VideoTranscodingPayload | ||
169 | user: MUserId | null // null means we don't want priority | ||
170 | }) { | ||
171 | const { user, payload } = options | ||
172 | |||
173 | return { | ||
174 | type: 'video-transcoding' as 'video-transcoding', | ||
175 | priority: await this.getTranscodingJobPriority({ user, fallback: undefined }), | ||
176 | payload | ||
177 | } | ||
178 | } | ||
179 | |||
180 | private async buildLowerResolutionJobPayloads (options: { | ||
181 | video: MVideoWithFileThumbnail | ||
182 | inputVideoResolution: number | ||
183 | inputVideoFPS: number | ||
184 | hasAudio: boolean | ||
185 | isNewVideo: boolean | ||
186 | }) { | ||
187 | const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio } = options | ||
188 | |||
189 | // Create transcoding jobs if there are enabled resolutions | ||
190 | const resolutionsEnabled = await Hooks.wrapObject( | ||
191 | computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), | ||
192 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
193 | options | ||
194 | ) | ||
195 | |||
196 | const sequentialPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] | ||
197 | |||
198 | for (const resolution of resolutionsEnabled) { | ||
199 | const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) | ||
200 | |||
201 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { | ||
202 | const payloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[] = [ | ||
203 | this.buildWebTorrentJobPayload({ | ||
204 | videoUUID: video.uuid, | ||
205 | resolution, | ||
206 | fps, | ||
207 | isNewVideo | ||
208 | }) | ||
209 | ] | ||
210 | |||
211 | // Create a subsequent job to create HLS resolution that will just copy web video codecs | ||
212 | if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
213 | payloads.push( | ||
214 | this.buildHLSJobPayload({ | ||
215 | videoUUID: video.uuid, | ||
216 | resolution, | ||
217 | fps, | ||
218 | isNewVideo, | ||
219 | copyCodecs: true | ||
220 | }) | ||
221 | ) | ||
222 | } | ||
223 | |||
224 | sequentialPayloads.push(payloads) | ||
225 | } else if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
226 | sequentialPayloads.push([ | ||
227 | this.buildHLSJobPayload({ | ||
228 | videoUUID: video.uuid, | ||
229 | resolution, | ||
230 | fps, | ||
231 | copyCodecs: false, | ||
232 | isNewVideo | ||
233 | }) | ||
234 | ]) | ||
235 | } | ||
236 | } | ||
237 | |||
238 | return sequentialPayloads | ||
239 | } | ||
240 | |||
241 | private buildHLSJobPayload (options: { | ||
242 | videoUUID: string | ||
243 | resolution: number | ||
244 | fps: number | ||
245 | isNewVideo: boolean | ||
246 | deleteWebTorrentFiles?: boolean // default false | ||
247 | copyCodecs?: boolean // default false | ||
248 | }): HLSTranscodingPayload { | ||
249 | const { videoUUID, resolution, fps, isNewVideo, deleteWebTorrentFiles = false, copyCodecs = false } = options | ||
250 | |||
251 | return { | ||
252 | type: 'new-resolution-to-hls', | ||
253 | videoUUID, | ||
254 | resolution, | ||
255 | fps, | ||
256 | copyCodecs, | ||
257 | isNewVideo, | ||
258 | deleteWebTorrentFiles | ||
259 | } | ||
260 | } | ||
261 | |||
262 | private buildWebTorrentJobPayload (options: { | ||
263 | videoUUID: string | ||
264 | resolution: number | ||
265 | fps: number | ||
266 | isNewVideo: boolean | ||
267 | }): NewWebTorrentResolutionTranscodingPayload { | ||
268 | const { videoUUID, resolution, fps, isNewVideo } = options | ||
269 | |||
270 | return { | ||
271 | type: 'new-resolution-to-webtorrent', | ||
272 | videoUUID, | ||
273 | isNewVideo, | ||
274 | resolution, | ||
275 | fps | ||
276 | } | ||
277 | } | ||
278 | |||
279 | private buildMergeAudioPayload (options: { | ||
280 | videoUUID: string | ||
281 | isNewVideo: boolean | ||
282 | }): MergeAudioTranscodingPayload { | ||
283 | const { videoUUID, isNewVideo } = options | ||
284 | |||
285 | return { | ||
286 | type: 'merge-audio-to-webtorrent', | ||
287 | resolution: DEFAULT_AUDIO_RESOLUTION, | ||
288 | fps: VIDEO_TRANSCODING_FPS.AUDIO_MERGE, | ||
289 | videoUUID, | ||
290 | isNewVideo | ||
291 | } | ||
292 | } | ||
293 | |||
294 | private buildOptimizePayload (options: { | ||
295 | videoUUID: string | ||
296 | quickTranscode: boolean | ||
297 | isNewVideo: boolean | ||
298 | }): OptimizeTranscodingPayload { | ||
299 | const { videoUUID, quickTranscode, isNewVideo } = options | ||
300 | |||
301 | return { | ||
302 | type: 'optimize-to-webtorrent', | ||
303 | videoUUID, | ||
304 | isNewVideo, | ||
305 | quickTranscode | ||
306 | } | ||
307 | } | ||
308 | } | ||