aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/job-queue/handlers
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r--server/lib/job-queue/handlers/transcoding-job-builder.ts47
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-import.ts12
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts10
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts68
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts282
6 files changed, 135 insertions, 286 deletions
diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts
new file mode 100644
index 000000000..8b4a877d7
--- /dev/null
+++ b/server/lib/job-queue/handlers/transcoding-job-builder.ts
@@ -0,0 +1,47 @@
1import { Job } from 'bullmq'
2import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
3import { UserModel } from '@server/models/user/user'
4import { VideoModel } from '@server/models/video/video'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { pick } from '@shared/core-utils'
7import { TranscodingJobBuilderPayload } from '@shared/models'
8import { logger } from '../../../helpers/logger'
9import { JobQueue } from '../job-queue'
10
11async function processTranscodingJobBuilder (job: Job) {
12 const payload = job.data as TranscodingJobBuilderPayload
13
14 logger.info('Processing transcoding job builder in job %s.', job.id)
15
16 if (payload.optimizeJob) {
17 const video = await VideoModel.loadFull(payload.videoUUID)
18 const user = await UserModel.loadByVideoId(video.id)
19 const videoFile = video.getMaxQualityFile()
20
21 await createOptimizeOrMergeAudioJobs({
22 ...pick(payload.optimizeJob, [ 'isNewVideo' ]),
23
24 video,
25 videoFile,
26 user
27 })
28 }
29
30 for (const job of (payload.jobs || [])) {
31 await JobQueue.Instance.createJob(job)
32
33 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
34 }
35
36 for (const sequentialJobs of (payload.sequentialJobs || [])) {
37 await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs)
38
39 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.length)
40 }
41}
42
43// ---------------------------------------------------------------------------
44
45export {
46 processTranscodingJobBuilder
47}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index d950f6407..9a4550e4d 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -10,8 +10,8 @@ import { VideoModel } from '@server/models/video/video'
10import { VideoFileModel } from '@server/models/video/video-file' 10import { VideoFileModel } from '@server/models/video/video-file'
11import { MVideoFullLight } from '@server/types/models' 11import { MVideoFullLight } from '@server/types/models'
12import { getLowercaseExtension } from '@shared/core-utils' 12import { getLowercaseExtension } from '@shared/core-utils'
13import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg'
13import { VideoFileImportPayload, VideoStorage } from '@shared/models' 14import { VideoFileImportPayload, VideoStorage } from '@shared/models'
14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
15import { logger } from '../../../helpers/logger' 15import { logger } from '../../../helpers/logger'
16import { JobQueue } from '../job-queue' 16import { JobQueue } from '../job-queue'
17 17
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4d361c7b9..2a063282c 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -7,15 +7,16 @@ import { isPostImportVideoAccepted } from '@server/lib/moderation'
7import { generateWebTorrentVideoFilename } from '@server/lib/paths' 7import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Hooks } from '@server/lib/plugins/hooks' 8import { Hooks } from '@server/lib/plugins/hooks'
9import { ServerConfigManager } from '@server/lib/server-config-manager' 9import { ServerConfigManager } from '@server/lib/server-config-manager'
10import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
10import { isAbleToUploadVideo } from '@server/lib/user' 11import { isAbleToUploadVideo } from '@server/lib/user'
11import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' 12import { buildMoveToObjectStorageJob } from '@server/lib/video'
12import { VideoPathManager } from '@server/lib/video-path-manager' 13import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 14import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 15import { ThumbnailModel } from '@server/models/video/thumbnail'
15import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' 16import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
16import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 17import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
17import { getLowercaseExtension } from '@shared/core-utils' 18import { getLowercaseExtension } from '@shared/core-utils'
18import { isAudioFile } from '@shared/extra-utils' 19import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg'
19import { 20import {
20 ThumbnailType, 21 ThumbnailType,
21 VideoImportPayload, 22 VideoImportPayload,
@@ -28,7 +29,6 @@ import {
28 VideoResolution, 29 VideoResolution,
29 VideoState 30 VideoState
30} from '@shared/models' 31} from '@shared/models'
31import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '../../../helpers/ffmpeg'
32import { logger } from '../../../helpers/logger' 32import { logger } from '../../../helpers/logger'
33import { getSecureTorrentName } from '../../../helpers/utils' 33import { getSecureTorrentName } from '../../../helpers/utils'
34import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' 34import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent'
@@ -137,7 +137,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
137 137
138 const { resolution } = await isAudioFile(tempVideoPath, probe) 138 const { resolution } = await isAudioFile(tempVideoPath, probe)
139 ? { resolution: VideoResolution.H_NOVIDEO } 139 ? { resolution: VideoResolution.H_NOVIDEO }
140 : await getVideoStreamDimensionsInfo(tempVideoPath) 140 : await getVideoStreamDimensionsInfo(tempVideoPath, probe)
141 141
142 const fps = await getVideoStreamFPS(tempVideoPath, probe) 142 const fps = await getVideoStreamFPS(tempVideoPath, probe)
143 const duration = await getVideoStreamDuration(tempVideoPath, probe) 143 const duration = await getVideoStreamDuration(tempVideoPath, probe)
@@ -313,9 +313,7 @@ async function afterImportSuccess (options: {
313 } 313 }
314 314
315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? 315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs?
316 await JobQueue.Instance.createJob( 316 await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user })
317 await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
318 )
319 } 317 }
320} 318}
321 319
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 2f3a971bd..1bf43f592 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -1,25 +1,25 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { readdir, remove } from 'fs-extra' 2import { readdir, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 4import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' 6import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' 7import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
9import { generateVideoMiniature } from '@server/lib/thumbnail' 8import { generateVideoMiniature } from '@server/lib/thumbnail'
10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' 9import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding'
10import { VideoPathManager } from '@server/lib/video-path-manager'
11import { moveToNextState } from '@server/lib/video-state' 11import { moveToNextState } from '@server/lib/video-state'
12import { VideoModel } from '@server/models/video/video' 12import { VideoModel } from '@server/models/video/video'
13import { VideoBlacklistModel } from '@server/models/video/video-blacklist' 13import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
14import { VideoFileModel } from '@server/models/video/video-file' 14import { VideoFileModel } from '@server/models/video/video-file'
15import { VideoLiveModel } from '@server/models/video/video-live' 15import { VideoLiveModel } from '@server/models/video/video-live'
16import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
16import { VideoLiveSessionModel } from '@server/models/video/video-live-session' 17import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
17import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 18import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
18import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' 19import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
20import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg'
19import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' 21import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
20import { logger, loggerTagsFactory } from '../../../helpers/logger' 22import { logger, loggerTagsFactory } from '../../../helpers/logger'
21import { VideoPathManager } from '@server/lib/video-path-manager'
22import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
23 23
24const lTags = loggerTagsFactory('live', 'job') 24const lTags = loggerTagsFactory('live', 'job')
25 25
@@ -224,6 +224,7 @@ async function assignReplayFilesToVideo (options: {
224 const probe = await ffprobePromise(concatenatedTsFilePath) 224 const probe = await ffprobePromise(concatenatedTsFilePath)
225 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) 225 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe)
226 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) 226 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
227 const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe)
227 228
228 try { 229 try {
229 await generateHlsPlaylistResolutionFromTS({ 230 await generateHlsPlaylistResolutionFromTS({
@@ -231,6 +232,7 @@ async function assignReplayFilesToVideo (options: {
231 inputFileMutexReleaser, 232 inputFileMutexReleaser,
232 concatenatedTsFilePath, 233 concatenatedTsFilePath,
233 resolution, 234 resolution,
235 fps,
234 isAAC: audioStream?.codec_name === 'aac' 236 isAAC: audioStream?.codec_name === 'aac'
235 }) 237 })
236 } catch (err) { 238 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 3e208d83d..991d11ef1 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -1,15 +1,16 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { move, remove } from 'fs-extra' 2import { move, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' 4import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
5import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' 5import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config' 6import { CONFIG } from '@server/initializers/config'
7import { VIDEO_FILTERS } from '@server/initializers/constants'
7import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 8import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { generateWebTorrentVideoFilename } from '@server/lib/paths' 9import { generateWebTorrentVideoFilename } from '@server/lib/paths'
10import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' 11import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
10import { isAbleToUploadVideo } from '@server/lib/user' 12import { isAbleToUploadVideo } from '@server/lib/user'
11import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' 13import { buildFileMetadata, removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
13import { VideoPathManager } from '@server/lib/video-path-manager' 14import { VideoPathManager } from '@server/lib/video-path-manager'
14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' 15import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
15import { UserModel } from '@server/models/user/user' 16import { UserModel } from '@server/models/user/user'
@@ -17,15 +18,8 @@ import { VideoModel } from '@server/models/video/video'
17import { VideoFileModel } from '@server/models/video/video-file' 18import { VideoFileModel } from '@server/models/video/video-file'
18import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' 19import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models'
19import { getLowercaseExtension, pick } from '@shared/core-utils' 20import { getLowercaseExtension, pick } from '@shared/core-utils'
20import { 21import { buildUUID, getFileSize } from '@shared/extra-utils'
21 buildFileMetadata, 22import { FFmpegEdition, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg'
22 buildUUID,
23 ffprobePromise,
24 getFileSize,
25 getVideoStreamDimensionsInfo,
26 getVideoStreamDuration,
27 getVideoStreamFPS
28} from '@shared/extra-utils'
29import { 23import {
30 VideoStudioEditionPayload, 24 VideoStudioEditionPayload,
31 VideoStudioTask, 25 VideoStudioTask,
@@ -36,7 +30,6 @@ import {
36 VideoStudioTaskWatermarkPayload 30 VideoStudioTaskWatermarkPayload
37} from '@shared/models' 31} from '@shared/models'
38import { logger, loggerTagsFactory } from '../../../helpers/logger' 32import { logger, loggerTagsFactory } from '../../../helpers/logger'
39import { JobQueue } from '../job-queue'
40 33
41const lTagsBase = loggerTagsFactory('video-edition') 34const lTagsBase = loggerTagsFactory('video-edition')
42 35
@@ -102,9 +95,7 @@ async function processVideoStudioEdition (job: Job) {
102 95
103 const user = await UserModel.loadByVideoId(video.id) 96 const user = await UserModel.loadByVideoId(video.id)
104 97
105 await JobQueue.Instance.createJob( 98 await createOptimizeOrMergeAudioJobs({ video, videoFile: newFile, isNewVideo: false, user })
106 await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
107 )
108} 99}
109 100
110// --------------------------------------------------------------------------- 101// ---------------------------------------------------------------------------
@@ -131,9 +122,9 @@ const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessor
131} 122}
132 123
133async function processTask (options: TaskProcessorOptions) { 124async function processTask (options: TaskProcessorOptions) {
134 const { video, task } = options 125 const { video, task, lTags } = options
135 126
136 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) 127 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags })
137 128
138 const processor = taskProcessors[options.task.name] 129 const processor = taskProcessors[options.task.name]
139 if (!process) throw new Error('Unknown task ' + task.name) 130 if (!process) throw new Error('Unknown task ' + task.name)
@@ -142,48 +133,53 @@ async function processTask (options: TaskProcessorOptions) {
142} 133}
143 134
144function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { 135function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
145 const { task } = options 136 const { task, lTags } = options
137
138 logger.debug('Will add intro/outro to the video.', { options, ...lTags })
146 139
147 return addIntroOutro({ 140 return buildFFmpegEdition().addIntroOutro({
148 ...pick(options, [ 'inputPath', 'outputPath' ]), 141 ...pick(options, [ 'inputPath', 'outputPath' ]),
149 142
150 introOutroPath: task.options.file, 143 introOutroPath: task.options.file,
151 type: task.name === 'add-intro' 144 type: task.name === 'add-intro'
152 ? 'intro' 145 ? 'intro'
153 : 'outro', 146 : 'outro'
154
155 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
156 profile: CONFIG.TRANSCODING.PROFILE
157 }) 147 })
158} 148}
159 149
160function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { 150function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
161 const { task } = options 151 const { task, lTags } = options
162 152
163 return cutVideo({ 153 logger.debug('Will cut the video.', { options, ...lTags })
154
155 return buildFFmpegEdition().cutVideo({
164 ...pick(options, [ 'inputPath', 'outputPath' ]), 156 ...pick(options, [ 'inputPath', 'outputPath' ]),
165 157
166 start: task.options.start, 158 start: task.options.start,
167 end: task.options.end, 159 end: task.options.end
168
169 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
170 profile: CONFIG.TRANSCODING.PROFILE
171 }) 160 })
172} 161}
173 162
174function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { 163function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
175 const { task } = options 164 const { task, lTags } = options
165
166 logger.debug('Will add watermark to the video.', { options, ...lTags })
176 167
177 return addWatermark({ 168 return buildFFmpegEdition().addWatermark({
178 ...pick(options, [ 'inputPath', 'outputPath' ]), 169 ...pick(options, [ 'inputPath', 'outputPath' ]),
179 170
180 watermarkPath: task.options.file, 171 watermarkPath: task.options.file,
181 172
182 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 173 videoFilters: {
183 profile: CONFIG.TRANSCODING.PROFILE 174 watermarkSizeRatio: VIDEO_FILTERS.WATERMARK.SIZE_RATIO,
175 horitonzalMarginRatio: VIDEO_FILTERS.WATERMARK.HORIZONTAL_MARGIN_RATIO,
176 verticalMarginRatio: VIDEO_FILTERS.WATERMARK.VERTICAL_MARGIN_RATIO
177 }
184 }) 178 })
185} 179}
186 180
181// ---------------------------------------------------------------------------
182
187async function buildNewFile (video: MVideoId, path: string) { 183async function buildNewFile (video: MVideoId, path: string) {
188 const videoFile = new VideoFileModel({ 184 const videoFile = new VideoFileModel({
189 extname: getLowercaseExtension(path), 185 extname: getLowercaseExtension(path),
@@ -223,3 +219,7 @@ async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStud
223 throw new Error('Quota exceeded for this user to edit the video') 219 throw new Error('Quota exceeded for this user to edit the video')
224 } 220 }
225} 221}
222
223function buildFFmpegEdition () {
224 return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
225}
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 3e6d23363..17b717275 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,13 +1,13 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding'
4import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding'
5import { removeAllWebTorrentFiles } from '@server/lib/video-file'
5import { VideoPathManager } from '@server/lib/video-path-manager' 6import { VideoPathManager } from '@server/lib/video-path-manager'
6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' 7import { moveToFailedTranscodingState } from '@server/lib/video-state'
7import { UserModel } from '@server/models/user/user' 8import { UserModel } from '@server/models/user/user'
8import { VideoJobInfoModel } from '@server/models/video/video-job-info' 9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
9import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' 10import { MUser, MUserId, MVideoFullLight } from '@server/types/models'
10import { pick } from '@shared/core-utils'
11import { 11import {
12 HLSTranscodingPayload, 12 HLSTranscodingPayload,
13 MergeAudioTranscodingPayload, 13 MergeAudioTranscodingPayload,
@@ -15,18 +15,8 @@ import {
15 OptimizeTranscodingPayload, 15 OptimizeTranscodingPayload,
16 VideoTranscodingPayload 16 VideoTranscodingPayload
17} from '@shared/models' 17} from '@shared/models'
18import { retryTransactionWrapper } from '../../../helpers/database-utils'
19import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg'
20import { logger, loggerTagsFactory } from '../../../helpers/logger' 18import { logger, loggerTagsFactory } from '../../../helpers/logger'
21import { CONFIG } from '../../../initializers/config'
22import { VideoModel } from '../../../models/video/video' 19import { VideoModel } from '../../../models/video/video'
23import {
24 generateHlsPlaylistResolution,
25 mergeAudioVideofile,
26 optimizeOriginalVideofile,
27 transcodeNewWebTorrentResolution
28} from '../../transcoding/transcoding'
29import { JobQueue } from '../job-queue'
30 20
31type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> 21type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
32 22
@@ -84,260 +74,72 @@ export {
84// Job handlers 74// Job handlers
85// --------------------------------------------------------------------------- 75// ---------------------------------------------------------------------------
86 76
87async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
88 logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
89
90 const videoFileInput = payload.copyCodecs
91 ? video.getWebTorrentFile(payload.resolution)
92 : video.getMaxQualityFile()
93
94 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
95
96 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
97
98 try {
99 await videoFileInput.getVideo().reload()
100
101 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
102 return generateHlsPlaylistResolution({
103 video,
104 videoInputPath,
105 inputFileMutexReleaser,
106 resolution: payload.resolution,
107 copyCodecs: payload.copyCodecs,
108 job
109 })
110 })
111 } finally {
112 inputFileMutexReleaser()
113 }
114
115 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
116
117 await onHlsPlaylistGeneration(video, user, payload)
118}
119
120async function handleNewWebTorrentResolutionJob (
121 job: Job,
122 payload: NewWebTorrentResolutionTranscodingPayload,
123 video: MVideoFullLight,
124 user: MUserId
125) {
126 logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
127
128 await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job })
129
130 logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
131
132 await onNewWebTorrentFileResolution(video, user, payload)
133}
134
135async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { 77async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
136 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) 78 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid))
137 79
138 await mergeAudioVideofile({ video, resolution: payload.resolution, job }) 80 await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job })
139 81
140 logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) 82 logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid))
141 83
142 await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) 84 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
143} 85}
144 86
145async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { 87async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
146 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) 88 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid))
147 89
148 const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) 90 await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job })
149 91
150 logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) 92 logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid))
151 93
152 await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) 94 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
153} 95}
154 96
155// --------------------------------------------------------------------------- 97async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) {
156 98 logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
157async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) {
158 if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
159 // Remove webtorrent files if not enabled
160 for (const file of video.VideoFiles) {
161 await video.removeWebTorrentFile(file)
162 await file.destroy()
163 }
164
165 video.VideoFiles = []
166
167 // Create HLS new resolution jobs
168 await createLowerResolutionsJobs({
169 video,
170 user,
171 videoFileResolution: payload.resolution,
172 hasAudio: payload.hasAudio,
173 isNewVideo: payload.isNewVideo ?? true,
174 type: 'hls'
175 })
176 }
177
178 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
179 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo })
180}
181 99
182async function onVideoFirstWebTorrentTranscoding ( 100 await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job })
183 videoArg: MVideoWithFile,
184 payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload,
185 transcodeType: TranscodeVODOptionsType,
186 user: MUserId
187) {
188 const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid)
189 101
190 try { 102 logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
191 // Maybe the video changed in database, refresh it
192 const videoDatabase = await VideoModel.loadFull(videoArg.uuid)
193 // Video does not exist anymore
194 if (!videoDatabase) return undefined
195
196 const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile()
197
198 // Generate HLS version of the original file
199 const originalFileHLSPayload = {
200 ...payload,
201
202 hasAudio: !!audioStream,
203 resolution: videoDatabase.getMaxQualityFile().resolution,
204 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
205 copyCodecs: transcodeType !== 'quick-transcode',
206 isMaxQuality: true
207 }
208 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
209 const hasNewResolutions = await createLowerResolutionsJobs({
210 video: videoDatabase,
211 user,
212 videoFileResolution: resolution,
213 hasAudio: !!audioStream,
214 type: 'webtorrent',
215 isNewVideo: payload.isNewVideo ?? true
216 })
217
218 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
219 103
220 // Move to next state if there are no other resolutions to generate 104 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
221 if (!hasHls && !hasNewResolutions) {
222 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo })
223 }
224 } finally {
225 mutexReleaser()
226 }
227} 105}
228 106
229async function onNewWebTorrentFileResolution ( 107async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) {
230 video: MVideo, 108 logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
231 user: MUserId,
232 payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload
233) {
234 if (payload.createHLSIfNeeded) {
235 await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload })
236 }
237
238 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
239 109
240 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) 110 const videoFileInput = payload.copyCodecs
241} 111 ? video.getWebTorrentFile(payload.resolution)
112 : video.getMaxQualityFile()
242 113
243// --------------------------------------------------------------------------- 114 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
244 115
245async function createHlsJobIfEnabled (user: MUserId, payload: { 116 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
246 videoUUID: string
247 resolution: number
248 hasAudio: boolean
249 copyCodecs: boolean
250 isMaxQuality: boolean
251 isNewVideo?: boolean
252}) {
253 if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
254
255 const jobOptions = {
256 priority: await getTranscodingJobPriority(user)
257 }
258 117
259 const hlsTranscodingPayload: HLSTranscodingPayload = { 118 try {
260 type: 'new-resolution-to-hls', 119 await videoFileInput.getVideo().reload()
261 autoDeleteWebTorrentIfNeeded: true,
262 120
263 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) 121 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
122 return generateHlsPlaylistResolution({
123 video,
124 videoInputPath,
125 inputFileMutexReleaser,
126 resolution: payload.resolution,
127 fps: payload.fps,
128 copyCodecs: payload.copyCodecs,
129 job
130 })
131 })
132 } finally {
133 inputFileMutexReleaser()
264 } 134 }
265 135
266 await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) 136 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
267
268 return true
269}
270
271async function createLowerResolutionsJobs (options: {
272 video: MVideoFullLight
273 user: MUserId
274 videoFileResolution: number
275 hasAudio: boolean
276 isNewVideo: boolean
277 type: 'hls' | 'webtorrent'
278}) {
279 const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options
280
281 // Create transcoding jobs if there are enabled resolutions
282 const resolutionsEnabled = await Hooks.wrapObject(
283 computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }),
284 'filter:transcoding.auto.resolutions-to-transcode.result',
285 options
286 )
287
288 const resolutionCreated: string[] = []
289
290 for (const resolution of resolutionsEnabled) {
291 let dataInput: VideoTranscodingPayload
292
293 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
294 // WebTorrent will create subsequent HLS job
295 dataInput = {
296 type: 'new-resolution-to-webtorrent',
297 videoUUID: video.uuid,
298 resolution,
299 hasAudio,
300 createHLSIfNeeded: true,
301 isNewVideo
302 }
303
304 resolutionCreated.push('webtorrent-' + resolution)
305 }
306
307 if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') {
308 dataInput = {
309 type: 'new-resolution-to-hls',
310 videoUUID: video.uuid,
311 resolution,
312 hasAudio,
313 copyCodecs: false,
314 isMaxQuality: false,
315 autoDeleteWebTorrentIfNeeded: true,
316 isNewVideo
317 }
318
319 resolutionCreated.push('hls-' + resolution)
320 }
321
322 if (!dataInput) continue
323
324 const jobOptions = {
325 priority: await getTranscodingJobPriority(user)
326 }
327
328 await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
329 }
330 137
331 if (resolutionCreated.length === 0) { 138 if (payload.deleteWebTorrentFiles === true) {
332 logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) 139 logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid))
333 140
334 return false 141 await removeAllWebTorrentFiles(video)
335 } 142 }
336 143
337 logger.info( 144 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
338 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution,
339 { resolutionCreated, ...lTags(video.uuid) }
340 )
341
342 return true
343} 145}