aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/hls.ts18
-rw-r--r--server/lib/job-queue/handlers/transcoding-job-builder.ts47
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-import.ts12
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts10
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts68
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts282
-rw-r--r--server/lib/job-queue/job-queue.ts63
-rw-r--r--server/lib/live/live-manager.ts94
-rw-r--r--server/lib/live/live-segment-sha-store.ts5
-rw-r--r--server/lib/live/live-utils.ts12
-rw-r--r--server/lib/live/shared/muxing-session.ts191
-rw-r--r--server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts101
-rw-r--r--server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts95
-rw-r--r--server/lib/live/shared/transcoding-wrapper/index.ts3
-rw-r--r--server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts20
-rw-r--r--server/lib/object-storage/index.ts1
-rw-r--r--server/lib/object-storage/proxy.ts97
-rw-r--r--server/lib/peertube-socket.ts32
-rw-r--r--server/lib/plugins/plugin-helpers-builder.ts2
-rw-r--r--server/lib/runners/index.ts3
-rw-r--r--server/lib/runners/job-handlers/abstract-job-handler.ts271
-rw-r--r--server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts71
-rw-r--r--server/lib/runners/job-handlers/index.ts6
-rw-r--r--server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts170
-rw-r--r--server/lib/runners/job-handlers/runner-job-handlers.ts18
-rw-r--r--server/lib/runners/job-handlers/shared/index.ts1
-rw-r--r--server/lib/runners/job-handlers/shared/vod-helpers.ts44
-rw-r--r--server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts97
-rw-r--r--server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts114
-rw-r--r--server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts84
-rw-r--r--server/lib/runners/runner-urls.ts9
-rw-r--r--server/lib/runners/runner.ts36
-rw-r--r--server/lib/schedulers/runner-job-watch-dog-scheduler.ts42
-rw-r--r--server/lib/server-config-manager.ts10
-rw-r--r--server/lib/transcoding/create-transcoding-job.ts36
-rw-r--r--server/lib/transcoding/default-transcoding-profiles.ts16
-rw-r--r--server/lib/transcoding/ended-transcoding.ts18
-rw-r--r--server/lib/transcoding/hls-transcoding.ts181
-rw-r--r--server/lib/transcoding/shared/ffmpeg-builder.ts18
-rw-r--r--server/lib/transcoding/shared/index.ts2
-rw-r--r--server/lib/transcoding/shared/job-builders/abstract-job-builder.ts38
-rw-r--r--server/lib/transcoding/shared/job-builders/index.ts2
-rw-r--r--server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts308
-rw-r--r--server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts189
-rw-r--r--server/lib/transcoding/transcoding-quick-transcode.ts61
-rw-r--r--server/lib/transcoding/transcoding-resolutions.ts52
-rw-r--r--server/lib/transcoding/transcoding.ts465
-rw-r--r--server/lib/transcoding/web-transcoding.ts273
-rw-r--r--server/lib/uploadx.ts5
-rw-r--r--server/lib/video-blacklist.ts2
-rw-r--r--server/lib/video-file.ts54
-rw-r--r--server/lib/video-studio.ts2
-rw-r--r--server/lib/video.ts63
54 files changed, 2902 insertions, 1014 deletions
diff --git a/server/lib/hls.ts b/server/lib/hls.ts
index 053b5d326..fc1d7e1b0 100644
--- a/server/lib/hls.ts
+++ b/server/lib/hls.ts
@@ -3,10 +3,11 @@ import { flatten } from 'lodash'
3import PQueue from 'p-queue' 3import PQueue from 'p-queue'
4import { basename, dirname, join } from 'path' 4import { basename, dirname, join } from 'path'
5import { MStreamingPlaylist, MStreamingPlaylistFilesVideo, MVideo } from '@server/types/models' 5import { MStreamingPlaylist, MStreamingPlaylistFilesVideo, MVideo } from '@server/types/models'
6import { uniqify } from '@shared/core-utils' 6import { uniqify, uuidRegex } from '@shared/core-utils'
7import { sha256 } from '@shared/extra-utils' 7import { sha256 } from '@shared/extra-utils'
8import { getVideoStreamDimensionsInfo } from '@shared/ffmpeg'
8import { VideoStorage } from '@shared/models' 9import { VideoStorage } from '@shared/models'
9import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamDimensionsInfo } from '../helpers/ffmpeg' 10import { getAudioStreamCodec, getVideoStreamCodec } from '../helpers/ffmpeg'
10import { logger } from '../helpers/logger' 11import { logger } from '../helpers/logger'
11import { doRequest, doRequestAndSaveToFile } from '../helpers/requests' 12import { doRequest, doRequestAndSaveToFile } from '../helpers/requests'
12import { generateRandomString } from '../helpers/utils' 13import { generateRandomString } from '../helpers/utils'
@@ -234,6 +235,16 @@ function downloadPlaylistSegments (playlistUrl: string, destinationDir: string,
234 235
235// --------------------------------------------------------------------------- 236// ---------------------------------------------------------------------------
236 237
238async function renameVideoFileInPlaylist (playlistPath: string, newVideoFilename: string) {
239 const content = await readFile(playlistPath, 'utf8')
240
241 const newContent = content.replace(new RegExp(`${uuidRegex}-\\d+-fragmented.mp4`, 'g'), newVideoFilename)
242
243 await writeFile(playlistPath, newContent, 'utf8')
244}
245
246// ---------------------------------------------------------------------------
247
237function injectQueryToPlaylistUrls (content: string, queryString: string) { 248function injectQueryToPlaylistUrls (content: string, queryString: string) {
238 return content.replace(/\.(m3u8|ts|mp4)/gm, '.$1?' + queryString) 249 return content.replace(/\.(m3u8|ts|mp4)/gm, '.$1?' + queryString)
239} 250}
@@ -247,7 +258,8 @@ export {
247 downloadPlaylistSegments, 258 downloadPlaylistSegments,
248 updateStreamingPlaylistsInfohashesIfNeeded, 259 updateStreamingPlaylistsInfohashesIfNeeded,
249 updatePlaylistAfterFileChange, 260 updatePlaylistAfterFileChange,
250 injectQueryToPlaylistUrls 261 injectQueryToPlaylistUrls,
262 renameVideoFileInPlaylist
251} 263}
252 264
253// --------------------------------------------------------------------------- 265// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts
new file mode 100644
index 000000000..8b4a877d7
--- /dev/null
+++ b/server/lib/job-queue/handlers/transcoding-job-builder.ts
@@ -0,0 +1,47 @@
1import { Job } from 'bullmq'
2import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
3import { UserModel } from '@server/models/user/user'
4import { VideoModel } from '@server/models/video/video'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { pick } from '@shared/core-utils'
7import { TranscodingJobBuilderPayload } from '@shared/models'
8import { logger } from '../../../helpers/logger'
9import { JobQueue } from '../job-queue'
10
11async function processTranscodingJobBuilder (job: Job) {
12 const payload = job.data as TranscodingJobBuilderPayload
13
14 logger.info('Processing transcoding job builder in job %s.', job.id)
15
16 if (payload.optimizeJob) {
17 const video = await VideoModel.loadFull(payload.videoUUID)
18 const user = await UserModel.loadByVideoId(video.id)
19 const videoFile = video.getMaxQualityFile()
20
21 await createOptimizeOrMergeAudioJobs({
22 ...pick(payload.optimizeJob, [ 'isNewVideo' ]),
23
24 video,
25 videoFile,
26 user
27 })
28 }
29
30 for (const job of (payload.jobs || [])) {
31 await JobQueue.Instance.createJob(job)
32
33 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
34 }
35
36 for (const sequentialJobs of (payload.sequentialJobs || [])) {
37 await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs)
38
39 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.length)
40 }
41}
42
43// ---------------------------------------------------------------------------
44
45export {
46 processTranscodingJobBuilder
47}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index d950f6407..9a4550e4d 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -10,8 +10,8 @@ import { VideoModel } from '@server/models/video/video'
10import { VideoFileModel } from '@server/models/video/video-file' 10import { VideoFileModel } from '@server/models/video/video-file'
11import { MVideoFullLight } from '@server/types/models' 11import { MVideoFullLight } from '@server/types/models'
12import { getLowercaseExtension } from '@shared/core-utils' 12import { getLowercaseExtension } from '@shared/core-utils'
13import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg'
13import { VideoFileImportPayload, VideoStorage } from '@shared/models' 14import { VideoFileImportPayload, VideoStorage } from '@shared/models'
14import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
15import { logger } from '../../../helpers/logger' 15import { logger } from '../../../helpers/logger'
16import { JobQueue } from '../job-queue' 16import { JobQueue } from '../job-queue'
17 17
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4d361c7b9..2a063282c 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -7,15 +7,16 @@ import { isPostImportVideoAccepted } from '@server/lib/moderation'
7import { generateWebTorrentVideoFilename } from '@server/lib/paths' 7import { generateWebTorrentVideoFilename } from '@server/lib/paths'
8import { Hooks } from '@server/lib/plugins/hooks' 8import { Hooks } from '@server/lib/plugins/hooks'
9import { ServerConfigManager } from '@server/lib/server-config-manager' 9import { ServerConfigManager } from '@server/lib/server-config-manager'
10import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
10import { isAbleToUploadVideo } from '@server/lib/user' 11import { isAbleToUploadVideo } from '@server/lib/user'
11import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' 12import { buildMoveToObjectStorageJob } from '@server/lib/video'
12import { VideoPathManager } from '@server/lib/video-path-manager' 13import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 14import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 15import { ThumbnailModel } from '@server/models/video/thumbnail'
15import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' 16import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
16import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 17import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
17import { getLowercaseExtension } from '@shared/core-utils' 18import { getLowercaseExtension } from '@shared/core-utils'
18import { isAudioFile } from '@shared/extra-utils' 19import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg'
19import { 20import {
20 ThumbnailType, 21 ThumbnailType,
21 VideoImportPayload, 22 VideoImportPayload,
@@ -28,7 +29,6 @@ import {
28 VideoResolution, 29 VideoResolution,
29 VideoState 30 VideoState
30} from '@shared/models' 31} from '@shared/models'
31import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '../../../helpers/ffmpeg'
32import { logger } from '../../../helpers/logger' 32import { logger } from '../../../helpers/logger'
33import { getSecureTorrentName } from '../../../helpers/utils' 33import { getSecureTorrentName } from '../../../helpers/utils'
34import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' 34import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent'
@@ -137,7 +137,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
137 137
138 const { resolution } = await isAudioFile(tempVideoPath, probe) 138 const { resolution } = await isAudioFile(tempVideoPath, probe)
139 ? { resolution: VideoResolution.H_NOVIDEO } 139 ? { resolution: VideoResolution.H_NOVIDEO }
140 : await getVideoStreamDimensionsInfo(tempVideoPath) 140 : await getVideoStreamDimensionsInfo(tempVideoPath, probe)
141 141
142 const fps = await getVideoStreamFPS(tempVideoPath, probe) 142 const fps = await getVideoStreamFPS(tempVideoPath, probe)
143 const duration = await getVideoStreamDuration(tempVideoPath, probe) 143 const duration = await getVideoStreamDuration(tempVideoPath, probe)
@@ -313,9 +313,7 @@ async function afterImportSuccess (options: {
313 } 313 }
314 314
315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? 315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs?
316 await JobQueue.Instance.createJob( 316 await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user })
317 await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
318 )
319 } 317 }
320} 318}
321 319
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 2f3a971bd..1bf43f592 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -1,25 +1,25 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { readdir, remove } from 'fs-extra' 2import { readdir, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 4import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' 6import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' 7import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
9import { generateVideoMiniature } from '@server/lib/thumbnail' 8import { generateVideoMiniature } from '@server/lib/thumbnail'
10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' 9import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding'
10import { VideoPathManager } from '@server/lib/video-path-manager'
11import { moveToNextState } from '@server/lib/video-state' 11import { moveToNextState } from '@server/lib/video-state'
12import { VideoModel } from '@server/models/video/video' 12import { VideoModel } from '@server/models/video/video'
13import { VideoBlacklistModel } from '@server/models/video/video-blacklist' 13import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
14import { VideoFileModel } from '@server/models/video/video-file' 14import { VideoFileModel } from '@server/models/video/video-file'
15import { VideoLiveModel } from '@server/models/video/video-live' 15import { VideoLiveModel } from '@server/models/video/video-live'
16import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
16import { VideoLiveSessionModel } from '@server/models/video/video-live-session' 17import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
17import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 18import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
18import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' 19import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
20import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg'
19import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' 21import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
20import { logger, loggerTagsFactory } from '../../../helpers/logger' 22import { logger, loggerTagsFactory } from '../../../helpers/logger'
21import { VideoPathManager } from '@server/lib/video-path-manager'
22import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
23 23
24const lTags = loggerTagsFactory('live', 'job') 24const lTags = loggerTagsFactory('live', 'job')
25 25
@@ -224,6 +224,7 @@ async function assignReplayFilesToVideo (options: {
224 const probe = await ffprobePromise(concatenatedTsFilePath) 224 const probe = await ffprobePromise(concatenatedTsFilePath)
225 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) 225 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe)
226 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) 226 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
227 const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe)
227 228
228 try { 229 try {
229 await generateHlsPlaylistResolutionFromTS({ 230 await generateHlsPlaylistResolutionFromTS({
@@ -231,6 +232,7 @@ async function assignReplayFilesToVideo (options: {
231 inputFileMutexReleaser, 232 inputFileMutexReleaser,
232 concatenatedTsFilePath, 233 concatenatedTsFilePath,
233 resolution, 234 resolution,
235 fps,
234 isAAC: audioStream?.codec_name === 'aac' 236 isAAC: audioStream?.codec_name === 'aac'
235 }) 237 })
236 } catch (err) { 238 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 3e208d83d..991d11ef1 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -1,15 +1,16 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { move, remove } from 'fs-extra' 2import { move, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' 4import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
5import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' 5import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config' 6import { CONFIG } from '@server/initializers/config'
7import { VIDEO_FILTERS } from '@server/initializers/constants'
7import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 8import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { generateWebTorrentVideoFilename } from '@server/lib/paths' 9import { generateWebTorrentVideoFilename } from '@server/lib/paths'
10import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' 11import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
10import { isAbleToUploadVideo } from '@server/lib/user' 12import { isAbleToUploadVideo } from '@server/lib/user'
11import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' 13import { buildFileMetadata, removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
12import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
13import { VideoPathManager } from '@server/lib/video-path-manager' 14import { VideoPathManager } from '@server/lib/video-path-manager'
14import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' 15import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
15import { UserModel } from '@server/models/user/user' 16import { UserModel } from '@server/models/user/user'
@@ -17,15 +18,8 @@ import { VideoModel } from '@server/models/video/video'
17import { VideoFileModel } from '@server/models/video/video-file' 18import { VideoFileModel } from '@server/models/video/video-file'
18import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' 19import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models'
19import { getLowercaseExtension, pick } from '@shared/core-utils' 20import { getLowercaseExtension, pick } from '@shared/core-utils'
20import { 21import { buildUUID, getFileSize } from '@shared/extra-utils'
21 buildFileMetadata, 22import { FFmpegEdition, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg'
22 buildUUID,
23 ffprobePromise,
24 getFileSize,
25 getVideoStreamDimensionsInfo,
26 getVideoStreamDuration,
27 getVideoStreamFPS
28} from '@shared/extra-utils'
29import { 23import {
30 VideoStudioEditionPayload, 24 VideoStudioEditionPayload,
31 VideoStudioTask, 25 VideoStudioTask,
@@ -36,7 +30,6 @@ import {
36 VideoStudioTaskWatermarkPayload 30 VideoStudioTaskWatermarkPayload
37} from '@shared/models' 31} from '@shared/models'
38import { logger, loggerTagsFactory } from '../../../helpers/logger' 32import { logger, loggerTagsFactory } from '../../../helpers/logger'
39import { JobQueue } from '../job-queue'
40 33
41const lTagsBase = loggerTagsFactory('video-edition') 34const lTagsBase = loggerTagsFactory('video-edition')
42 35
@@ -102,9 +95,7 @@ async function processVideoStudioEdition (job: Job) {
102 95
103 const user = await UserModel.loadByVideoId(video.id) 96 const user = await UserModel.loadByVideoId(video.id)
104 97
105 await JobQueue.Instance.createJob( 98 await createOptimizeOrMergeAudioJobs({ video, videoFile: newFile, isNewVideo: false, user })
106 await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
107 )
108} 99}
109 100
110// --------------------------------------------------------------------------- 101// ---------------------------------------------------------------------------
@@ -131,9 +122,9 @@ const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessor
131} 122}
132 123
133async function processTask (options: TaskProcessorOptions) { 124async function processTask (options: TaskProcessorOptions) {
134 const { video, task } = options 125 const { video, task, lTags } = options
135 126
136 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) 127 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags })
137 128
138 const processor = taskProcessors[options.task.name] 129 const processor = taskProcessors[options.task.name]
139 if (!process) throw new Error('Unknown task ' + task.name) 130 if (!process) throw new Error('Unknown task ' + task.name)
@@ -142,48 +133,53 @@ async function processTask (options: TaskProcessorOptions) {
142} 133}
143 134
144function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { 135function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
145 const { task } = options 136 const { task, lTags } = options
137
138 logger.debug('Will add intro/outro to the video.', { options, ...lTags })
146 139
147 return addIntroOutro({ 140 return buildFFmpegEdition().addIntroOutro({
148 ...pick(options, [ 'inputPath', 'outputPath' ]), 141 ...pick(options, [ 'inputPath', 'outputPath' ]),
149 142
150 introOutroPath: task.options.file, 143 introOutroPath: task.options.file,
151 type: task.name === 'add-intro' 144 type: task.name === 'add-intro'
152 ? 'intro' 145 ? 'intro'
153 : 'outro', 146 : 'outro'
154
155 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
156 profile: CONFIG.TRANSCODING.PROFILE
157 }) 147 })
158} 148}
159 149
160function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { 150function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
161 const { task } = options 151 const { task, lTags } = options
162 152
163 return cutVideo({ 153 logger.debug('Will cut the video.', { options, ...lTags })
154
155 return buildFFmpegEdition().cutVideo({
164 ...pick(options, [ 'inputPath', 'outputPath' ]), 156 ...pick(options, [ 'inputPath', 'outputPath' ]),
165 157
166 start: task.options.start, 158 start: task.options.start,
167 end: task.options.end, 159 end: task.options.end
168
169 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
170 profile: CONFIG.TRANSCODING.PROFILE
171 }) 160 })
172} 161}
173 162
174function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { 163function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
175 const { task } = options 164 const { task, lTags } = options
165
166 logger.debug('Will add watermark to the video.', { options, ...lTags })
176 167
177 return addWatermark({ 168 return buildFFmpegEdition().addWatermark({
178 ...pick(options, [ 'inputPath', 'outputPath' ]), 169 ...pick(options, [ 'inputPath', 'outputPath' ]),
179 170
180 watermarkPath: task.options.file, 171 watermarkPath: task.options.file,
181 172
182 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 173 videoFilters: {
183 profile: CONFIG.TRANSCODING.PROFILE 174 watermarkSizeRatio: VIDEO_FILTERS.WATERMARK.SIZE_RATIO,
175 horitonzalMarginRatio: VIDEO_FILTERS.WATERMARK.HORIZONTAL_MARGIN_RATIO,
176 verticalMarginRatio: VIDEO_FILTERS.WATERMARK.VERTICAL_MARGIN_RATIO
177 }
184 }) 178 })
185} 179}
186 180
181// ---------------------------------------------------------------------------
182
187async function buildNewFile (video: MVideoId, path: string) { 183async function buildNewFile (video: MVideoId, path: string) {
188 const videoFile = new VideoFileModel({ 184 const videoFile = new VideoFileModel({
189 extname: getLowercaseExtension(path), 185 extname: getLowercaseExtension(path),
@@ -223,3 +219,7 @@ async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStud
223 throw new Error('Quota exceeded for this user to edit the video') 219 throw new Error('Quota exceeded for this user to edit the video')
224 } 220 }
225} 221}
222
223function buildFFmpegEdition () {
224 return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
225}
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 3e6d23363..17b717275 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,13 +1,13 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding'
4import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding'
5import { removeAllWebTorrentFiles } from '@server/lib/video-file'
5import { VideoPathManager } from '@server/lib/video-path-manager' 6import { VideoPathManager } from '@server/lib/video-path-manager'
6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' 7import { moveToFailedTranscodingState } from '@server/lib/video-state'
7import { UserModel } from '@server/models/user/user' 8import { UserModel } from '@server/models/user/user'
8import { VideoJobInfoModel } from '@server/models/video/video-job-info' 9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
9import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' 10import { MUser, MUserId, MVideoFullLight } from '@server/types/models'
10import { pick } from '@shared/core-utils'
11import { 11import {
12 HLSTranscodingPayload, 12 HLSTranscodingPayload,
13 MergeAudioTranscodingPayload, 13 MergeAudioTranscodingPayload,
@@ -15,18 +15,8 @@ import {
15 OptimizeTranscodingPayload, 15 OptimizeTranscodingPayload,
16 VideoTranscodingPayload 16 VideoTranscodingPayload
17} from '@shared/models' 17} from '@shared/models'
18import { retryTransactionWrapper } from '../../../helpers/database-utils'
19import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg'
20import { logger, loggerTagsFactory } from '../../../helpers/logger' 18import { logger, loggerTagsFactory } from '../../../helpers/logger'
21import { CONFIG } from '../../../initializers/config'
22import { VideoModel } from '../../../models/video/video' 19import { VideoModel } from '../../../models/video/video'
23import {
24 generateHlsPlaylistResolution,
25 mergeAudioVideofile,
26 optimizeOriginalVideofile,
27 transcodeNewWebTorrentResolution
28} from '../../transcoding/transcoding'
29import { JobQueue } from '../job-queue'
30 20
31type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> 21type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
32 22
@@ -84,260 +74,72 @@ export {
84// Job handlers 74// Job handlers
85// --------------------------------------------------------------------------- 75// ---------------------------------------------------------------------------
86 76
87async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
88 logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
89
90 const videoFileInput = payload.copyCodecs
91 ? video.getWebTorrentFile(payload.resolution)
92 : video.getMaxQualityFile()
93
94 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
95
96 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
97
98 try {
99 await videoFileInput.getVideo().reload()
100
101 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
102 return generateHlsPlaylistResolution({
103 video,
104 videoInputPath,
105 inputFileMutexReleaser,
106 resolution: payload.resolution,
107 copyCodecs: payload.copyCodecs,
108 job
109 })
110 })
111 } finally {
112 inputFileMutexReleaser()
113 }
114
115 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
116
117 await onHlsPlaylistGeneration(video, user, payload)
118}
119
120async function handleNewWebTorrentResolutionJob (
121 job: Job,
122 payload: NewWebTorrentResolutionTranscodingPayload,
123 video: MVideoFullLight,
124 user: MUserId
125) {
126 logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
127
128 await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job })
129
130 logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
131
132 await onNewWebTorrentFileResolution(video, user, payload)
133}
134
135async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { 77async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
136 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) 78 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid))
137 79
138 await mergeAudioVideofile({ video, resolution: payload.resolution, job }) 80 await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job })
139 81
140 logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) 82 logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid))
141 83
142 await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) 84 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
143} 85}
144 86
145async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { 87async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
146 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) 88 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid))
147 89
148 const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) 90 await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job })
149 91
150 logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) 92 logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid))
151 93
152 await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) 94 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
153} 95}
154 96
155// --------------------------------------------------------------------------- 97async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) {
156 98 logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
157async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) {
158 if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
159 // Remove webtorrent files if not enabled
160 for (const file of video.VideoFiles) {
161 await video.removeWebTorrentFile(file)
162 await file.destroy()
163 }
164
165 video.VideoFiles = []
166
167 // Create HLS new resolution jobs
168 await createLowerResolutionsJobs({
169 video,
170 user,
171 videoFileResolution: payload.resolution,
172 hasAudio: payload.hasAudio,
173 isNewVideo: payload.isNewVideo ?? true,
174 type: 'hls'
175 })
176 }
177
178 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
179 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo })
180}
181 99
182async function onVideoFirstWebTorrentTranscoding ( 100 await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job })
183 videoArg: MVideoWithFile,
184 payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload,
185 transcodeType: TranscodeVODOptionsType,
186 user: MUserId
187) {
188 const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid)
189 101
190 try { 102 logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
191 // Maybe the video changed in database, refresh it
192 const videoDatabase = await VideoModel.loadFull(videoArg.uuid)
193 // Video does not exist anymore
194 if (!videoDatabase) return undefined
195
196 const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile()
197
198 // Generate HLS version of the original file
199 const originalFileHLSPayload = {
200 ...payload,
201
202 hasAudio: !!audioStream,
203 resolution: videoDatabase.getMaxQualityFile().resolution,
204 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
205 copyCodecs: transcodeType !== 'quick-transcode',
206 isMaxQuality: true
207 }
208 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
209 const hasNewResolutions = await createLowerResolutionsJobs({
210 video: videoDatabase,
211 user,
212 videoFileResolution: resolution,
213 hasAudio: !!audioStream,
214 type: 'webtorrent',
215 isNewVideo: payload.isNewVideo ?? true
216 })
217
218 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
219 103
220 // Move to next state if there are no other resolutions to generate 104 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
221 if (!hasHls && !hasNewResolutions) {
222 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo })
223 }
224 } finally {
225 mutexReleaser()
226 }
227} 105}
228 106
229async function onNewWebTorrentFileResolution ( 107async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) {
230 video: MVideo, 108 logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
231 user: MUserId,
232 payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload
233) {
234 if (payload.createHLSIfNeeded) {
235 await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload })
236 }
237
238 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
239 109
240 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) 110 const videoFileInput = payload.copyCodecs
241} 111 ? video.getWebTorrentFile(payload.resolution)
112 : video.getMaxQualityFile()
242 113
243// --------------------------------------------------------------------------- 114 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
244 115
245async function createHlsJobIfEnabled (user: MUserId, payload: { 116 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
246 videoUUID: string
247 resolution: number
248 hasAudio: boolean
249 copyCodecs: boolean
250 isMaxQuality: boolean
251 isNewVideo?: boolean
252}) {
253 if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
254
255 const jobOptions = {
256 priority: await getTranscodingJobPriority(user)
257 }
258 117
259 const hlsTranscodingPayload: HLSTranscodingPayload = { 118 try {
260 type: 'new-resolution-to-hls', 119 await videoFileInput.getVideo().reload()
261 autoDeleteWebTorrentIfNeeded: true,
262 120
263 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) 121 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
122 return generateHlsPlaylistResolution({
123 video,
124 videoInputPath,
125 inputFileMutexReleaser,
126 resolution: payload.resolution,
127 fps: payload.fps,
128 copyCodecs: payload.copyCodecs,
129 job
130 })
131 })
132 } finally {
133 inputFileMutexReleaser()
264 } 134 }
265 135
266 await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) 136 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
267
268 return true
269}
270
271async function createLowerResolutionsJobs (options: {
272 video: MVideoFullLight
273 user: MUserId
274 videoFileResolution: number
275 hasAudio: boolean
276 isNewVideo: boolean
277 type: 'hls' | 'webtorrent'
278}) {
279 const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options
280
281 // Create transcoding jobs if there are enabled resolutions
282 const resolutionsEnabled = await Hooks.wrapObject(
283 computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }),
284 'filter:transcoding.auto.resolutions-to-transcode.result',
285 options
286 )
287
288 const resolutionCreated: string[] = []
289
290 for (const resolution of resolutionsEnabled) {
291 let dataInput: VideoTranscodingPayload
292
293 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
294 // WebTorrent will create subsequent HLS job
295 dataInput = {
296 type: 'new-resolution-to-webtorrent',
297 videoUUID: video.uuid,
298 resolution,
299 hasAudio,
300 createHLSIfNeeded: true,
301 isNewVideo
302 }
303
304 resolutionCreated.push('webtorrent-' + resolution)
305 }
306
307 if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') {
308 dataInput = {
309 type: 'new-resolution-to-hls',
310 videoUUID: video.uuid,
311 resolution,
312 hasAudio,
313 copyCodecs: false,
314 isMaxQuality: false,
315 autoDeleteWebTorrentIfNeeded: true,
316 isNewVideo
317 }
318
319 resolutionCreated.push('hls-' + resolution)
320 }
321
322 if (!dataInput) continue
323
324 const jobOptions = {
325 priority: await getTranscodingJobPriority(user)
326 }
327
328 await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
329 }
330 137
331 if (resolutionCreated.length === 0) { 138 if (payload.deleteWebTorrentFiles === true) {
332 logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) 139 logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid))
333 140
334 return false 141 await removeAllWebTorrentFiles(video)
335 } 142 }
336 143
337 logger.info( 144 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
338 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution,
339 { resolutionCreated, ...lTags(video.uuid) }
340 )
341
342 return true
343} 145}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index cc6be0bd8..21bf0f226 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -31,6 +31,7 @@ import {
31 MoveObjectStoragePayload, 31 MoveObjectStoragePayload,
32 NotifyPayload, 32 NotifyPayload,
33 RefreshPayload, 33 RefreshPayload,
34 TranscodingJobBuilderPayload,
34 VideoChannelImportPayload, 35 VideoChannelImportPayload,
35 VideoFileImportPayload, 36 VideoFileImportPayload,
36 VideoImportPayload, 37 VideoImportPayload,
@@ -56,6 +57,7 @@ import { processFederateVideo } from './handlers/federate-video'
56import { processManageVideoTorrent } from './handlers/manage-video-torrent' 57import { processManageVideoTorrent } from './handlers/manage-video-torrent'
57import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' 58import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
58import { processNotify } from './handlers/notify' 59import { processNotify } from './handlers/notify'
60import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder'
59import { processVideoChannelImport } from './handlers/video-channel-import' 61import { processVideoChannelImport } from './handlers/video-channel-import'
60import { processVideoFileImport } from './handlers/video-file-import' 62import { processVideoFileImport } from './handlers/video-file-import'
61import { processVideoImport } from './handlers/video-import' 63import { processVideoImport } from './handlers/video-import'
@@ -69,11 +71,12 @@ export type CreateJobArgument =
69 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | 71 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
70 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 72 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
71 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | 73 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
72 { type: 'activitypub-http-cleaner', payload: {} } | 74 { type: 'activitypub-cleaner', payload: {} } |
73 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | 75 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
74 { type: 'video-file-import', payload: VideoFileImportPayload } | 76 { type: 'video-file-import', payload: VideoFileImportPayload } |
75 { type: 'video-transcoding', payload: VideoTranscodingPayload } | 77 { type: 'video-transcoding', payload: VideoTranscodingPayload } |
76 { type: 'email', payload: EmailPayload } | 78 { type: 'email', payload: EmailPayload } |
79 { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } |
77 { type: 'video-import', payload: VideoImportPayload } | 80 { type: 'video-import', payload: VideoImportPayload } |
78 { type: 'activitypub-refresher', payload: RefreshPayload } | 81 { type: 'activitypub-refresher', payload: RefreshPayload } |
79 { type: 'videos-views-stats', payload: {} } | 82 { type: 'videos-views-stats', payload: {} } |
@@ -96,28 +99,29 @@ export type CreateJobOptions = {
96} 99}
97 100
98const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { 101const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
99 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
100 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
101 'activitypub-http-unicast': processActivityPubHttpUnicast,
102 'activitypub-http-fetcher': processActivityPubHttpFetcher,
103 'activitypub-cleaner': processActivityPubCleaner, 102 'activitypub-cleaner': processActivityPubCleaner,
104 'activitypub-follow': processActivityPubFollow, 103 'activitypub-follow': processActivityPubFollow,
105 'video-file-import': processVideoFileImport, 104 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
106 'video-transcoding': processVideoTranscoding, 105 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
106 'activitypub-http-fetcher': processActivityPubHttpFetcher,
107 'activitypub-http-unicast': processActivityPubHttpUnicast,
108 'activitypub-refresher': refreshAPObject,
109 'actor-keys': processActorKeys,
110 'after-video-channel-import': processAfterVideoChannelImport,
107 'email': processEmail, 111 'email': processEmail,
112 'federate-video': processFederateVideo,
113 'transcoding-job-builder': processTranscodingJobBuilder,
114 'manage-video-torrent': processManageVideoTorrent,
115 'move-to-object-storage': processMoveToObjectStorage,
116 'notify': processNotify,
117 'video-channel-import': processVideoChannelImport,
118 'video-file-import': processVideoFileImport,
108 'video-import': processVideoImport, 119 'video-import': processVideoImport,
109 'videos-views-stats': processVideosViewsStats,
110 'activitypub-refresher': refreshAPObject,
111 'video-live-ending': processVideoLiveEnding, 120 'video-live-ending': processVideoLiveEnding,
112 'actor-keys': processActorKeys,
113 'video-redundancy': processVideoRedundancy, 121 'video-redundancy': processVideoRedundancy,
114 'move-to-object-storage': processMoveToObjectStorage,
115 'manage-video-torrent': processManageVideoTorrent,
116 'video-studio-edition': processVideoStudioEdition, 122 'video-studio-edition': processVideoStudioEdition,
117 'video-channel-import': processVideoChannelImport, 123 'video-transcoding': processVideoTranscoding,
118 'after-video-channel-import': processAfterVideoChannelImport, 124 'videos-views-stats': processVideosViewsStats
119 'notify': processNotify,
120 'federate-video': processFederateVideo
121} 125}
122 126
123const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { 127const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -125,28 +129,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> }
125} 129}
126 130
127const jobTypes: JobType[] = [ 131const jobTypes: JobType[] = [
132 'activitypub-cleaner',
128 'activitypub-follow', 133 'activitypub-follow',
129 'activitypub-http-broadcast',
130 'activitypub-http-broadcast-parallel', 134 'activitypub-http-broadcast-parallel',
135 'activitypub-http-broadcast',
131 'activitypub-http-fetcher', 136 'activitypub-http-fetcher',
132 'activitypub-http-unicast', 137 'activitypub-http-unicast',
133 'activitypub-cleaner', 138 'activitypub-refresher',
139 'actor-keys',
140 'after-video-channel-import',
134 'email', 141 'email',
135 'video-transcoding', 142 'federate-video',
143 'transcoding-job-builder',
144 'manage-video-torrent',
145 'move-to-object-storage',
146 'notify',
147 'video-channel-import',
136 'video-file-import', 148 'video-file-import',
137 'video-import', 149 'video-import',
138 'videos-views-stats',
139 'activitypub-refresher',
140 'video-redundancy',
141 'actor-keys',
142 'video-live-ending', 150 'video-live-ending',
143 'move-to-object-storage', 151 'video-redundancy',
144 'manage-video-torrent',
145 'video-studio-edition', 152 'video-studio-edition',
146 'video-channel-import', 153 'video-transcoding',
147 'after-video-channel-import', 154 'videos-views-stats'
148 'notify',
149 'federate-video'
150] 155]
151 156
152const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) 157const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 05274955d..aa32a9d52 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra'
2import { createServer, Server } from 'net' 2import { createServer, Server } from 'net'
3import { join } from 'path' 3import { join } from 'path'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import {
6 computeResolutionsToTranscode,
7 ffprobePromise,
8 getLiveSegmentTime,
9 getVideoStreamBitrate,
10 getVideoStreamDimensionsInfo,
11 getVideoStreamFPS,
12 hasAudioStream
13} from '@server/helpers/ffmpeg'
14import { logger, loggerTagsFactory } from '@server/helpers/logger' 5import { logger, loggerTagsFactory } from '@server/helpers/logger'
15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 6import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
16import { VIDEO_LIVE } from '@server/initializers/constants' 7import { VIDEO_LIVE } from '@server/initializers/constants'
8import { sequelizeTypescript } from '@server/initializers/database'
17import { UserModel } from '@server/models/user/user' 9import { UserModel } from '@server/models/user/user'
18import { VideoModel } from '@server/models/video/video' 10import { VideoModel } from '@server/models/video/video'
19import { VideoLiveModel } from '@server/models/video/video-live' 11import { VideoLiveModel } from '@server/models/video/video-live'
12import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
20import { VideoLiveSessionModel } from '@server/models/video/video-live-session' 13import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
22import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' 15import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models'
23import { pick, wait } from '@shared/core-utils' 16import { pick, wait } from '@shared/core-utils'
17import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg'
24import { LiveVideoError, VideoState } from '@shared/models' 18import { LiveVideoError, VideoState } from '@shared/models'
25import { federateVideoIfNeeded } from '../activitypub/videos' 19import { federateVideoIfNeeded } from '../activitypub/videos'
26import { JobQueue } from '../job-queue' 20import { JobQueue } from '../job-queue'
27import { getLiveReplayBaseDirectory } from '../paths' 21import { getLiveReplayBaseDirectory } from '../paths'
28import { PeerTubeSocket } from '../peertube-socket' 22import { PeerTubeSocket } from '../peertube-socket'
29import { Hooks } from '../plugins/hooks' 23import { Hooks } from '../plugins/hooks'
24import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions'
30import { LiveQuotaStore } from './live-quota-store' 25import { LiveQuotaStore } from './live-quota-store'
31import { cleanupAndDestroyPermanentLive } from './live-utils' 26import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils'
32import { MuxingSession } from './shared' 27import { MuxingSession } from './shared'
33import { sequelizeTypescript } from '@server/initializers/database' 28import { RunnerJobModel } from '@server/models/runner/runner-job'
34import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
35 29
36const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') 30const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
37const context = require('node-media-server/src/node_core_ctx') 31const context = require('node-media-server/src/node_core_ctx')
@@ -57,7 +51,7 @@ class LiveManager {
57 private static instance: LiveManager 51 private static instance: LiveManager
58 52
59 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
60 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<string, string>()
61 55
62 private rtmpServer: Server 56 private rtmpServer: Server
63 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -177,14 +171,19 @@ class LiveManager {
177 return !!this.rtmpServer 171 return !!this.rtmpServer
178 } 172 }
179 173
180 stopSessionOf (videoId: number, error: LiveVideoError | null) { 174 stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
181 const sessionId = this.videoSessions.get(videoId) 175 const sessionId = this.videoSessions.get(videoUUID)
182 if (!sessionId) return 176 if (!sessionId) {
177 logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID))
178 return
179 }
183 180
184 this.saveEndingSession(videoId, error) 181 logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) })
185 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
186 182
187 this.videoSessions.delete(videoId) 183 this.saveEndingSession(videoUUID, error)
184 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) }))
185
186 this.videoSessions.delete(videoUUID)
188 this.abortSession(sessionId) 187 this.abortSession(sessionId)
189 } 188 }
190 189
@@ -221,6 +220,11 @@ class LiveManager {
221 return this.abortSession(sessionId) 220 return this.abortSession(sessionId)
222 } 221 }
223 222
223 if (this.videoSessions.has(video.uuid)) {
224 logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
225 return this.abortSession(sessionId)
226 }
227
224 // Cleanup old potential live (could happen with a permanent live) 228 // Cleanup old potential live (could happen with a permanent live)
225 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) 229 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
226 if (oldStreamingPlaylist) { 230 if (oldStreamingPlaylist) {
@@ -229,7 +233,7 @@ class LiveManager {
229 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) 233 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
230 } 234 }
231 235
232 this.videoSessions.set(video.id, sessionId) 236 this.videoSessions.set(video.uuid, sessionId)
233 237
234 const now = Date.now() 238 const now = Date.now()
235 const probe = await ffprobePromise(inputUrl) 239 const probe = await ffprobePromise(inputUrl)
@@ -253,7 +257,7 @@ class LiveManager {
253 ) 257 )
254 258
255 logger.info( 259 logger.info(
256 'Will mux/transcode live video of original resolution %d.', resolution, 260 'Handling live video of original resolution %d.', resolution,
257 { allResolutions, ...lTags(sessionId, video.uuid) } 261 { allResolutions, ...lTags(sessionId, video.uuid) }
258 ) 262 )
259 263
@@ -301,44 +305,44 @@ class LiveManager {
301 305
302 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) 306 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
303 307
304 muxingSession.on('bad-socket-health', ({ videoId }) => { 308 muxingSession.on('bad-socket-health', ({ videoUUID }) => {
305 logger.error( 309 logger.error(
306 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + 310 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
307 ' Stopping session of video %s.', videoUUID, 311 ' Stopping session of video %s.', videoUUID,
308 localLTags 312 localLTags
309 ) 313 )
310 314
311 this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) 315 this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH)
312 }) 316 })
313 317
314 muxingSession.on('duration-exceeded', ({ videoId }) => { 318 muxingSession.on('duration-exceeded', ({ videoUUID }) => {
315 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) 319 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
316 320
317 this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) 321 this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED)
318 }) 322 })
319 323
320 muxingSession.on('quota-exceeded', ({ videoId }) => { 324 muxingSession.on('quota-exceeded', ({ videoUUID }) => {
321 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) 325 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
322 326
323 this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) 327 this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED)
324 }) 328 })
325 329
326 muxingSession.on('ffmpeg-error', ({ videoId }) => { 330 muxingSession.on('transcoding-error', ({ videoUUID }) => {
327 this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) 331 this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR)
328 }) 332 })
329 333
330 muxingSession.on('ffmpeg-end', ({ videoId }) => { 334 muxingSession.on('transcoding-end', ({ videoUUID }) => {
331 this.onMuxingFFmpegEnd(videoId, sessionId) 335 this.onMuxingFFmpegEnd(videoUUID, sessionId)
332 }) 336 })
333 337
334 muxingSession.on('after-cleanup', ({ videoId }) => { 338 muxingSession.on('after-cleanup', ({ videoUUID }) => {
335 this.muxingSessions.delete(sessionId) 339 this.muxingSessions.delete(sessionId)
336 340
337 LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) 341 LiveQuotaStore.Instance.removeLive(user.id, videoLive.id)
338 342
339 muxingSession.destroy() 343 muxingSession.destroy()
340 344
341 return this.onAfterMuxingCleanup({ videoId, liveSession }) 345 return this.onAfterMuxingCleanup({ videoUUID, liveSession })
342 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 346 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
343 }) 347 })
344 348
@@ -379,22 +383,24 @@ class LiveManager {
379 } 383 }
380 } 384 }
381 385
382 private onMuxingFFmpegEnd (videoId: number, sessionId: string) { 386 private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
383 this.videoSessions.delete(videoId) 387 this.videoSessions.delete(videoUUID)
384 388
385 this.saveEndingSession(videoId, null) 389 this.saveEndingSession(videoUUID, null)
386 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) 390 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
387 } 391 }
388 392
389 private async onAfterMuxingCleanup (options: { 393 private async onAfterMuxingCleanup (options: {
390 videoId: number | string 394 videoUUID: string
391 liveSession?: MVideoLiveSession 395 liveSession?: MVideoLiveSession
392 cleanupNow?: boolean // Default false 396 cleanupNow?: boolean // Default false
393 }) { 397 }) {
394 const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options 398 const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options
399
400 logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))
395 401
396 try { 402 try {
397 const fullVideo = await VideoModel.loadFull(videoId) 403 const fullVideo = await VideoModel.loadFull(videoUUID)
398 if (!fullVideo) return 404 if (!fullVideo) return
399 405
400 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 406 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
@@ -437,15 +443,17 @@ class LiveManager {
437 443
438 await federateVideoIfNeeded(fullVideo, false) 444 await federateVideoIfNeeded(fullVideo, false)
439 } catch (err) { 445 } catch (err) {
440 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) 446 logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
441 } 447 }
442 } 448 }
443 449
444 private async handleBrokenLives () { 450 private async handleBrokenLives () {
451 await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' })
452
445 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 453 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
446 454
447 for (const uuid of videoUUIDs) { 455 for (const uuid of videoUUIDs) {
448 await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) 456 await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true })
449 } 457 }
450 } 458 }
451 459
@@ -494,8 +502,8 @@ class LiveManager {
494 }) 502 })
495 } 503 }
496 504
497 private async saveEndingSession (videoId: number, error: LiveVideoError | null) { 505 private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) {
498 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) 506 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID)
499 if (!liveSession) return 507 if (!liveSession) return
500 508
501 liveSession.endDate = new Date() 509 liveSession.endDate = new Date()
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts
index 4d03754a9..251301141 100644
--- a/server/lib/live/live-segment-sha-store.ts
+++ b/server/lib/live/live-segment-sha-store.ts
@@ -52,7 +52,10 @@ class LiveSegmentShaStore {
52 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) 52 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
53 53
54 if (!this.segmentsSha256.has(segmentName)) { 54 if (!this.segmentsSha256.has(segmentName)) {
55 logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID)) 55 logger.warn(
56 'Unknown segment in live segment hash store for video %s and segment %s.',
57 this.videoUUID, segmentPath, lTags(this.videoUUID)
58 )
56 return 59 return
57 } 60 }
58 61
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index c0dec9829..3fb3ce1ce 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -1,8 +1,9 @@
1import { pathExists, readdir, remove } from 'fs-extra' 1import { pathExists, readdir, remove } from 'fs-extra'
2import { basename, join } from 'path' 2import { basename, join } from 'path'
3import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
4import { VIDEO_LIVE } from '@server/initializers/constants'
4import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' 5import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
5import { VideoStorage } from '@shared/models' 6import { LiveVideoLatencyMode, VideoStorage } from '@shared/models'
6import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage' 7import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage'
7import { getLiveDirectory } from '../paths' 8import { getLiveDirectory } from '../paths'
8 9
@@ -37,10 +38,19 @@ async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreaming
37 await cleanupTMPLiveFilesFromFilesystem(video) 38 await cleanupTMPLiveFilesFromFilesystem(video)
38} 39}
39 40
41function getLiveSegmentTime (latencyMode: LiveVideoLatencyMode) {
42 if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) {
43 return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY
44 }
45
46 return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY
47}
48
40export { 49export {
41 cleanupAndDestroyPermanentLive, 50 cleanupAndDestroyPermanentLive,
42 cleanupUnsavedNormalLive, 51 cleanupUnsavedNormalLive,
43 cleanupTMPLiveFiles, 52 cleanupTMPLiveFiles,
53 getLiveSegmentTime,
44 buildConcatenatedName 54 buildConcatenatedName
45} 55}
46 56
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 2727fc4a7..f3f8fc886 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -1,11 +1,10 @@
1import { mapSeries } from 'bluebird' 1import { mapSeries } from 'bluebird'
2import { FSWatcher, watch } from 'chokidar' 2import { FSWatcher, watch } from 'chokidar'
3import { FfmpegCommand } from 'fluent-ffmpeg' 3import { EventEmitter } from 'events'
4import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 4import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
5import PQueue from 'p-queue' 5import PQueue from 'p-queue'
6import { basename, join } from 'path' 6import { basename, join } from 'path'
7import { EventEmitter } from 'stream' 7import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 9import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
@@ -20,24 +19,24 @@ import {
20 getLiveDirectory, 19 getLiveDirectory,
21 getLiveReplayBaseDirectory 20 getLiveReplayBaseDirectory
22} from '../../paths' 21} from '../../paths'
23import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
24import { isAbleToUploadVideo } from '../../user' 22import { isAbleToUploadVideo } from '../../user'
25import { LiveQuotaStore } from '../live-quota-store' 23import { LiveQuotaStore } from '../live-quota-store'
26import { LiveSegmentShaStore } from '../live-segment-sha-store' 24import { LiveSegmentShaStore } from '../live-segment-sha-store'
27import { buildConcatenatedName } from '../live-utils' 25import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils'
26import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper'
28 27
29import memoizee = require('memoizee') 28import memoizee = require('memoizee')
30interface MuxingSessionEvents { 29interface MuxingSessionEvents {
31 'live-ready': (options: { videoId: number }) => void 30 'live-ready': (options: { videoUUID: string }) => void
32 31
33 'bad-socket-health': (options: { videoId: number }) => void 32 'bad-socket-health': (options: { videoUUID: string }) => void
34 'duration-exceeded': (options: { videoId: number }) => void 33 'duration-exceeded': (options: { videoUUID: string }) => void
35 'quota-exceeded': (options: { videoId: number }) => void 34 'quota-exceeded': (options: { videoUUID: string }) => void
36 35
37 'ffmpeg-end': (options: { videoId: number }) => void 36 'transcoding-end': (options: { videoUUID: string }) => void
38 'ffmpeg-error': (options: { videoId: number }) => void 37 'transcoding-error': (options: { videoUUID: string }) => void
39 38
40 'after-cleanup': (options: { videoId: number }) => void 39 'after-cleanup': (options: { videoUUID: string }) => void
41} 40}
42 41
43declare interface MuxingSession { 42declare interface MuxingSession {
@@ -52,7 +51,7 @@ declare interface MuxingSession {
52 51
53class MuxingSession extends EventEmitter { 52class MuxingSession extends EventEmitter {
54 53
55 private ffmpegCommand: FfmpegCommand 54 private transcodingWrapper: AbstractTranscodingWrapper
56 55
57 private readonly context: any 56 private readonly context: any
58 private readonly user: MUserId 57 private readonly user: MUserId
@@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter {
67 66
68 private readonly hasAudio: boolean 67 private readonly hasAudio: boolean
69 68
70 private readonly videoId: number
71 private readonly videoUUID: string 69 private readonly videoUUID: string
72 private readonly saveReplay: boolean 70 private readonly saveReplay: boolean
73 71
@@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter {
126 124
127 this.allResolutions = options.allResolutions 125 this.allResolutions = options.allResolutions
128 126
129 this.videoId = this.videoLive.Video.id
130 this.videoUUID = this.videoLive.Video.uuid 127 this.videoUUID = this.videoLive.Video.uuid
131 128
132 this.saveReplay = this.videoLive.saveReplay 129 this.saveReplay = this.videoLive.saveReplay
@@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter {
145 142
146 await this.prepareDirectories() 143 await this.prepareDirectories()
147 144
148 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED 145 this.transcodingWrapper = this.buildTranscodingWrapper()
149 ? await getLiveTranscodingCommand({
150 inputUrl: this.inputUrl,
151 146
152 outPath: this.outDirectory, 147 this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
153 masterPlaylistName: this.streamingPlaylist.playlistFilename, 148 this.transcodingWrapper.on('error', () => this.onTranscodingError())
154 149
155 latencyMode: this.videoLive.latencyMode, 150 await this.transcodingWrapper.run()
156
157 resolutions: this.allResolutions,
158 fps: this.fps,
159 bitrate: this.bitrate,
160 ratio: this.ratio,
161
162 hasAudio: this.hasAudio,
163
164 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
165 profile: CONFIG.LIVE.TRANSCODING.PROFILE
166 })
167 : getLiveMuxingCommand({
168 inputUrl: this.inputUrl,
169 outPath: this.outDirectory,
170 masterPlaylistName: this.streamingPlaylist.playlistFilename,
171 latencyMode: this.videoLive.latencyMode
172 })
173
174 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
175 151
176 this.watchMasterFile() 152 this.watchMasterFile()
177 this.watchTSFiles() 153 this.watchTSFiles()
178 this.watchM3U8File() 154 this.watchM3U8File()
179
180 let ffmpegShellCommand: string
181 this.ffmpegCommand.on('start', cmdline => {
182 ffmpegShellCommand = cmdline
183
184 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
185 })
186
187 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
188 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
189 })
190
191 this.ffmpegCommand.on('end', () => {
192 this.emit('ffmpeg-end', ({ videoId: this.videoId }))
193
194 this.onFFmpegEnded()
195 })
196
197 this.ffmpegCommand.run()
198 } 155 }
199 156
200 abort () { 157 abort () {
201 if (!this.ffmpegCommand) return 158 if (!this.transcodingWrapper) return
202 159
203 this.aborted = true 160 this.aborted = true
204 this.ffmpegCommand.kill('SIGINT') 161 this.transcodingWrapper.abort()
205 } 162 }
206 163
207 destroy () { 164 destroy () {
@@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter {
210 this.hasClientSocketInBadHealthWithCache.clear() 167 this.hasClientSocketInBadHealthWithCache.clear()
211 } 168 }
212 169
213 private onFFmpegError (options: {
214 err: any
215 stdout: string
216 stderr: string
217 ffmpegShellCommand: string
218 }) {
219 const { err, stdout, stderr, ffmpegShellCommand } = options
220
221 this.onFFmpegEnded()
222
223 // Don't care that we killed the ffmpeg process
224 if (err?.message?.includes('Exiting normally')) return
225
226 logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
227
228 this.emit('ffmpeg-error', ({ videoId: this.videoId }))
229 }
230
231 private onFFmpegEnded () {
232 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
233
234 setTimeout(() => {
235 // Wait latest segments generation, and close watchers
236
237 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
238 .then(() => {
239 // Process remaining segments hash
240 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
241 this.processSegments(this.segmentsToProcessPerPlaylist[key])
242 }
243 })
244 .catch(err => {
245 logger.error(
246 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
247 { err, ...this.lTags() }
248 )
249 })
250
251 this.emit('after-cleanup', { videoId: this.videoId })
252 }, 1000)
253 }
254
255 private watchMasterFile () { 170 private watchMasterFile () {
256 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 171 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
257 172
@@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter {
272 187
273 this.masterPlaylistCreated = true 188 this.masterPlaylistCreated = true
274 189
190 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
191
275 this.masterWatcher.close() 192 this.masterWatcher.close()
276 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 193 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
277 }) 194 })
@@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter {
318 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] 235 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
319 236
320 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { 237 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
321 this.emit('bad-socket-health', { videoId: this.videoId }) 238 this.emit('bad-socket-health', { videoUUID: this.videoUUID })
322 return 239 return
323 } 240 }
324 241
325 // Duration constraint check 242 // Duration constraint check
326 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 243 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
327 this.emit('duration-exceeded', { videoId: this.videoId }) 244 this.emit('duration-exceeded', { videoUUID: this.videoUUID })
328 return 245 return
329 } 246 }
330 247
331 // Check user quota if the user enabled replay saving 248 // Check user quota if the user enabled replay saving
332 if (await this.isQuotaExceeded(segmentPath) === true) { 249 if (await this.isQuotaExceeded(segmentPath) === true) {
333 this.emit('quota-exceeded', { videoId: this.videoId }) 250 this.emit('quota-exceeded', { videoUUID: this.videoUUID })
334 } 251 }
335 } 252 }
336 253
@@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter {
438 if (this.masterPlaylistCreated && !this.liveReady) { 355 if (this.masterPlaylistCreated && !this.liveReady) {
439 this.liveReady = true 356 this.liveReady = true
440 357
441 this.emit('live-ready', { videoId: this.videoId }) 358 this.emit('live-ready', { videoUUID: this.videoUUID })
442 } 359 }
443 } 360 }
444 361
362 private onTranscodingError () {
363 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
364 }
365
366 private onTranscodedEnded () {
367 this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
368
369 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
370
371 setTimeout(() => {
372 // Wait latest segments generation, and close watchers
373
374 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
375 .then(() => {
376 // Process remaining segments hash
377 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
378 this.processSegments(this.segmentsToProcessPerPlaylist[key])
379 }
380 })
381 .catch(err => {
382 logger.error(
383 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
384 { err, ...this.lTags() }
385 )
386 })
387
388 this.emit('after-cleanup', { videoUUID: this.videoUUID })
389 }, 1000)
390 }
391
445 private hasClientSocketInBadHealth (sessionId: string) { 392 private hasClientSocketInBadHealth (sessionId: string) {
446 const rtmpSession = this.context.sessions.get(sessionId) 393 const rtmpSession = this.context.sessions.get(sessionId)
447 394
@@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter {
503 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED 450 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
504 }) 451 })
505 } 452 }
453
454 private buildTranscodingWrapper () {
455 const options = {
456 streamingPlaylist: this.streamingPlaylist,
457 videoLive: this.videoLive,
458
459 lTags: this.lTags,
460
461 inputUrl: this.inputUrl,
462
463 toTranscode: this.allResolutions.map(resolution => ({
464 resolution,
465 fps: computeOutputFPS({ inputFPS: this.fps, resolution })
466 })),
467
468 fps: this.fps,
469 bitrate: this.bitrate,
470 ratio: this.ratio,
471 hasAudio: this.hasAudio,
472
473 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
474 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
475
476 outDirectory: this.outDirectory
477 }
478
479 return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
480 ? new RemoteTranscodingWrapper(options)
481 : new FFmpegTranscodingWrapper(options)
482 }
506} 483}
507 484
508// --------------------------------------------------------------------------- 485// ---------------------------------------------------------------------------
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
new file mode 100644
index 000000000..226ba4573
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
@@ -0,0 +1,101 @@
1import EventEmitter from 'events'
2import { LoggerTagsFn } from '@server/helpers/logger'
3import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models'
4import { LiveVideoError } from '@shared/models'
5
6interface TranscodingWrapperEvents {
7 'end': () => void
8
9 'error': (options: { err: Error }) => void
10}
11
12declare interface AbstractTranscodingWrapper {
13 on<U extends keyof TranscodingWrapperEvents>(
14 event: U, listener: TranscodingWrapperEvents[U]
15 ): this
16
17 emit<U extends keyof TranscodingWrapperEvents>(
18 event: U, ...args: Parameters<TranscodingWrapperEvents[U]>
19 ): boolean
20}
21
22interface AbstractTranscodingWrapperOptions {
23 streamingPlaylist: MStreamingPlaylistVideo
24 videoLive: MVideoLiveVideo
25
26 lTags: LoggerTagsFn
27
28 inputUrl: string
29 fps: number
30 toTranscode: {
31 resolution: number
32 fps: number
33 }[]
34
35 bitrate: number
36 ratio: number
37 hasAudio: boolean
38
39 segmentListSize: number
40 segmentDuration: number
41
42 outDirectory: string
43}
44
45abstract class AbstractTranscodingWrapper extends EventEmitter {
46 protected readonly videoLive: MVideoLiveVideo
47
48 protected readonly toTranscode: {
49 resolution: number
50 fps: number
51 }[]
52
53 protected readonly inputUrl: string
54 protected readonly fps: number
55 protected readonly bitrate: number
56 protected readonly ratio: number
57 protected readonly hasAudio: boolean
58
59 protected readonly segmentListSize: number
60 protected readonly segmentDuration: number
61
62 protected readonly videoUUID: string
63
64 protected readonly outDirectory: string
65
66 protected readonly lTags: LoggerTagsFn
67
68 protected readonly streamingPlaylist: MStreamingPlaylistVideo
69
70 constructor (options: AbstractTranscodingWrapperOptions) {
71 super()
72
73 this.lTags = options.lTags
74
75 this.videoLive = options.videoLive
76 this.videoUUID = options.videoLive.Video.uuid
77 this.streamingPlaylist = options.streamingPlaylist
78
79 this.inputUrl = options.inputUrl
80 this.fps = options.fps
81 this.toTranscode = options.toTranscode
82
83 this.bitrate = options.bitrate
84 this.ratio = options.ratio
85 this.hasAudio = options.hasAudio
86
87 this.segmentListSize = options.segmentListSize
88 this.segmentDuration = options.segmentDuration
89
90 this.outDirectory = options.outDirectory
91 }
92
93 abstract run (): Promise<void>
94
95 abstract abort (error?: LiveVideoError): void
96}
97
98export {
99 AbstractTranscodingWrapper,
100 AbstractTranscodingWrapperOptions
101}
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
new file mode 100644
index 000000000..1f4c12bd4
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
@@ -0,0 +1,95 @@
1import { FfmpegCommand } from 'fluent-ffmpeg'
2import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
3import { logger } from '@server/helpers/logger'
4import { CONFIG } from '@server/initializers/config'
5import { VIDEO_LIVE } from '@server/initializers/constants'
6import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
7import { FFmpegLive } from '@shared/ffmpeg'
8import { getLiveSegmentTime } from '../../live-utils'
9import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
10
11export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
12 private ffmpegCommand: FfmpegCommand
13 private ended = false
14
15 async run () {
16 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
17 ? await this.buildFFmpegLive().getLiveTranscodingCommand({
18 inputUrl: this.inputUrl,
19
20 outPath: this.outDirectory,
21 masterPlaylistName: this.streamingPlaylist.playlistFilename,
22
23 segmentListSize: this.segmentListSize,
24 segmentDuration: this.segmentDuration,
25
26 toTranscode: this.toTranscode,
27
28 bitrate: this.bitrate,
29 ratio: this.ratio,
30
31 hasAudio: this.hasAudio
32 })
33 : this.buildFFmpegLive().getLiveMuxingCommand({
34 inputUrl: this.inputUrl,
35 outPath: this.outDirectory,
36
37 masterPlaylistName: this.streamingPlaylist.playlistFilename,
38
39 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
40 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode)
41 })
42
43 logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags())
44
45 this.ffmpegCommand.run()
46
47 let ffmpegShellCommand: string
48 this.ffmpegCommand.on('start', cmdline => {
49 ffmpegShellCommand = cmdline
50
51 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
52 })
53
54 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
55 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
56 })
57
58 this.ffmpegCommand.on('end', () => {
59 this.onFFmpegEnded()
60 })
61
62 this.ffmpegCommand.run()
63 }
64
65 abort () {
66 // Nothing to do, ffmpeg will automatically exit
67 }
68
69 private onFFmpegError (options: {
70 err: any
71 stdout: string
72 stderr: string
73 ffmpegShellCommand: string
74 }) {
75 const { err, stdout, stderr, ffmpegShellCommand } = options
76
77 // Don't care that we killed the ffmpeg process
78 if (err?.message?.includes('Exiting normally')) return
79
80 logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
81
82 this.emit('error', { err })
83 }
84
85 private onFFmpegEnded () {
86 if (this.ended) return
87
88 this.ended = true
89 this.emit('end')
90 }
91
92 private buildFFmpegLive () {
93 return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
94 }
95}
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts
new file mode 100644
index 000000000..ae28fa1ca
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/index.ts
@@ -0,0 +1,3 @@
1export * from './abstract-transcoding-wrapper'
2export * from './ffmpeg-transcoding-wrapper'
3export * from './remote-transcoding-wrapper'
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
new file mode 100644
index 000000000..345eaf442
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
@@ -0,0 +1,20 @@
1import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners'
2import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
3
4export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
5 async run () {
6 await new LiveRTMPHLSTranscodingJobHandler().create({
7 rtmpUrl: this.inputUrl,
8 toTranscode: this.toTranscode,
9 video: this.videoLive.Video,
10 outputDirectory: this.outDirectory,
11 playlist: this.streamingPlaylist,
12 segmentListSize: this.segmentListSize,
13 segmentDuration: this.segmentDuration
14 })
15 }
16
17 abort () {
18 this.emit('end')
19 }
20}
diff --git a/server/lib/object-storage/index.ts b/server/lib/object-storage/index.ts
index 8b413a40e..6525f8dfb 100644
--- a/server/lib/object-storage/index.ts
+++ b/server/lib/object-storage/index.ts
@@ -1,3 +1,4 @@
1export * from './keys' 1export * from './keys'
2export * from './proxy'
2export * from './urls' 3export * from './urls'
3export * from './videos' 4export * from './videos'
diff --git a/server/lib/object-storage/proxy.ts b/server/lib/object-storage/proxy.ts
new file mode 100644
index 000000000..c782a8a25
--- /dev/null
+++ b/server/lib/object-storage/proxy.ts
@@ -0,0 +1,97 @@
1import express from 'express'
2import { PassThrough, pipeline } from 'stream'
3import { GetObjectCommandOutput } from '@aws-sdk/client-s3'
4import { buildReinjectVideoFileTokenQuery } from '@server/controllers/shared/m3u8-playlist'
5import { logger } from '@server/helpers/logger'
6import { StreamReplacer } from '@server/helpers/stream-replacer'
7import { MStreamingPlaylist, MVideo } from '@server/types/models'
8import { HttpStatusCode } from '@shared/models'
9import { injectQueryToPlaylistUrls } from '../hls'
10import { getHLSFileReadStream, getWebTorrentFileReadStream } from './videos'
11
12export async function proxifyWebTorrentFile (options: {
13 req: express.Request
14 res: express.Response
15 filename: string
16}) {
17 const { req, res, filename } = options
18
19 logger.debug('Proxifying WebTorrent file %s from object storage.', filename)
20
21 try {
22 const { response: s3Response, stream } = await getWebTorrentFileReadStream({
23 filename,
24 rangeHeader: req.header('range')
25 })
26
27 setS3Headers(res, s3Response)
28
29 return stream.pipe(res)
30 } catch (err) {
31 return handleObjectStorageFailure(res, err)
32 }
33}
34
35export async function proxifyHLS (options: {
36 req: express.Request
37 res: express.Response
38 playlist: MStreamingPlaylist
39 video: MVideo
40 filename: string
41 reinjectVideoFileToken: boolean
42}) {
43 const { req, res, playlist, video, filename, reinjectVideoFileToken } = options
44
45 logger.debug('Proxifying HLS file %s from object storage.', filename)
46
47 try {
48 const { response: s3Response, stream } = await getHLSFileReadStream({
49 playlist: playlist.withVideo(video),
50 filename,
51 rangeHeader: req.header('range')
52 })
53
54 setS3Headers(res, s3Response)
55
56 const streamReplacer = reinjectVideoFileToken
57 ? new StreamReplacer(line => injectQueryToPlaylistUrls(line, buildReinjectVideoFileTokenQuery(req, filename.endsWith('master.m3u8'))))
58 : new PassThrough()
59
60 return pipeline(
61 stream,
62 streamReplacer,
63 res,
64 err => {
65 if (!err) return
66
67 handleObjectStorageFailure(res, err)
68 }
69 )
70 } catch (err) {
71 return handleObjectStorageFailure(res, err)
72 }
73}
74
75// ---------------------------------------------------------------------------
76// Private
77// ---------------------------------------------------------------------------
78
79function handleObjectStorageFailure (res: express.Response, err: Error) {
80 if (err.name === 'NoSuchKey') {
81 logger.debug('Could not find key in object storage to proxify private HLS video file.', { err })
82 return res.sendStatus(HttpStatusCode.NOT_FOUND_404)
83 }
84
85 return res.fail({
86 status: HttpStatusCode.INTERNAL_SERVER_ERROR_500,
87 message: err.message,
88 type: err.name
89 })
90}
91
92function setS3Headers (res: express.Response, s3Response: GetObjectCommandOutput) {
93 if (s3Response.$metadata.httpStatusCode === HttpStatusCode.PARTIAL_CONTENT_206) {
94 res.setHeader('Content-Range', s3Response.ContentRange)
95 res.status(HttpStatusCode.PARTIAL_CONTENT_206)
96 }
97}
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 0398ca61d..ded7e9743 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http'
2import { Namespace, Server as SocketServer, Socket } from 'socket.io' 2import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3import { isIdValid } from '@server/helpers/custom-validators/misc' 3import { isIdValid } from '@server/helpers/custom-validators/misc'
4import { MVideo, MVideoImmutable } from '@server/types/models' 4import { MVideo, MVideoImmutable } from '@server/types/models'
5import { MRunner } from '@server/types/models/runners'
5import { UserNotificationModelForApi } from '@server/types/models/user' 6import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' 7import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger' 8import { logger } from '../helpers/logger'
8import { authenticateSocket } from '../middlewares' 9import { authenticateRunnerSocket, authenticateSocket } from '../middlewares'
10import { Debounce } from '@server/helpers/debounce'
9 11
10class PeerTubeSocket { 12class PeerTubeSocket {
11 13
@@ -13,6 +15,7 @@ class PeerTubeSocket {
13 15
14 private userNotificationSockets: { [ userId: number ]: Socket[] } = {} 16 private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
15 private liveVideosNamespace: Namespace 17 private liveVideosNamespace: Namespace
18 private readonly runnerSockets = new Set<Socket>()
16 19
17 private constructor () {} 20 private constructor () {}
18 21
@@ -24,7 +27,7 @@ class PeerTubeSocket {
24 .on('connection', socket => { 27 .on('connection', socket => {
25 const userId = socket.handshake.auth.user.id 28 const userId = socket.handshake.auth.user.id
26 29
27 logger.debug('User %d connected on the notification system.', userId) 30 logger.debug('User %d connected to the notification system.', userId)
28 31
29 if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] 32 if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
30 33
@@ -53,6 +56,22 @@ class PeerTubeSocket {
53 socket.leave(videoId) 56 socket.leave(videoId)
54 }) 57 })
55 }) 58 })
59
60 io.of('/runners')
61 .use(authenticateRunnerSocket)
62 .on('connection', socket => {
63 const runner: MRunner = socket.handshake.auth.runner
64
65 logger.debug(`New runner "${runner.name}" connected to the notification system.`)
66
67 this.runnerSockets.add(socket)
68
69 socket.on('disconnect', () => {
70 logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
71
72 this.runnerSockets.delete(socket)
73 })
74 })
56 } 75 }
57 76
58 sendNotification (userId: number, notification: UserNotificationModelForApi) { 77 sendNotification (userId: number, notification: UserNotificationModelForApi) {
@@ -89,6 +108,15 @@ class PeerTubeSocket {
89 .emit(type, data) 108 .emit(type, data)
90 } 109 }
91 110
111 @Debounce({ timeoutMS: 1000 })
112 sendAvailableJobsPingToRunners () {
113 logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
114
115 for (const runners of this.runnerSockets) {
116 runners.emit('available-jobs')
117 }
118 }
119
92 static get Instance () { 120 static get Instance () {
93 return this.instance || (this.instance = new this()) 121 return this.instance || (this.instance = new this())
94 } 122 }
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts
index 66383af46..92ef87cca 100644
--- a/server/lib/plugins/plugin-helpers-builder.ts
+++ b/server/lib/plugins/plugin-helpers-builder.ts
@@ -1,7 +1,6 @@
1import express from 'express' 1import express from 'express'
2import { Server } from 'http' 2import { Server } from 'http'
3import { join } from 'path' 3import { join } from 'path'
4import { ffprobePromise } from '@server/helpers/ffmpeg/ffprobe-utils'
5import { buildLogger } from '@server/helpers/logger' 4import { buildLogger } from '@server/helpers/logger'
6import { CONFIG } from '@server/initializers/config' 5import { CONFIG } from '@server/initializers/config'
7import { WEBSERVER } from '@server/initializers/constants' 6import { WEBSERVER } from '@server/initializers/constants'
@@ -16,6 +15,7 @@ import { VideoModel } from '@server/models/video/video'
16import { VideoBlacklistModel } from '@server/models/video/video-blacklist' 15import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
17import { MPlugin, MVideo, UserNotificationModelForApi } from '@server/types/models' 16import { MPlugin, MVideo, UserNotificationModelForApi } from '@server/types/models'
18import { PeerTubeHelpers } from '@server/types/plugins' 17import { PeerTubeHelpers } from '@server/types/plugins'
18import { ffprobePromise } from '@shared/ffmpeg'
19import { VideoBlacklistCreate, VideoStorage } from '@shared/models' 19import { VideoBlacklistCreate, VideoStorage } from '@shared/models'
20import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist' 20import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist'
21import { PeerTubeSocket } from '../peertube-socket' 21import { PeerTubeSocket } from '../peertube-socket'
diff --git a/server/lib/runners/index.ts b/server/lib/runners/index.ts
new file mode 100644
index 000000000..a737c7b59
--- /dev/null
+++ b/server/lib/runners/index.ts
@@ -0,0 +1,3 @@
1export * from './job-handlers'
2export * from './runner'
3export * from './runner-urls'
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts
new file mode 100644
index 000000000..73fc14574
--- /dev/null
+++ b/server/lib/runners/job-handlers/abstract-job-handler.ts
@@ -0,0 +1,271 @@
1import { retryTransactionWrapper } from '@server/helpers/database-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { RUNNER_JOBS } from '@server/initializers/constants'
4import { sequelizeTypescript } from '@server/initializers/database'
5import { PeerTubeSocket } from '@server/lib/peertube-socket'
6import { RunnerJobModel } from '@server/models/runner/runner-job'
7import { setAsUpdated } from '@server/models/shared'
8import { MRunnerJob } from '@server/types/models/runners'
9import { pick } from '@shared/core-utils'
10import {
11 RunnerJobLiveRTMPHLSTranscodingPayload,
12 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
13 RunnerJobState,
14 RunnerJobSuccessPayload,
15 RunnerJobType,
16 RunnerJobUpdatePayload,
17 RunnerJobVODAudioMergeTranscodingPayload,
18 RunnerJobVODAudioMergeTranscodingPrivatePayload,
19 RunnerJobVODHLSTranscodingPayload,
20 RunnerJobVODHLSTranscodingPrivatePayload,
21 RunnerJobVODWebVideoTranscodingPayload,
22 RunnerJobVODWebVideoTranscodingPrivatePayload
23} from '@shared/models'
24
25type CreateRunnerJobArg =
26 {
27 type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
28 payload: RunnerJobVODWebVideoTranscodingPayload
29 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
30 } |
31 {
32 type: Extract<RunnerJobType, 'vod-hls-transcoding'>
33 payload: RunnerJobVODHLSTranscodingPayload
34 privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
35 } |
36 {
37 type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
38 payload: RunnerJobVODAudioMergeTranscodingPayload
39 privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
40 } |
41 {
42 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
43 payload: RunnerJobLiveRTMPHLSTranscodingPayload
44 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
45 }
46
47export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
48
49 protected readonly lTags = loggerTagsFactory('runner')
50
51 // ---------------------------------------------------------------------------
52
53 abstract create (options: C): Promise<MRunnerJob>
54
55 protected async createRunnerJob (options: CreateRunnerJobArg & {
56 jobUUID: string
57 priority: number
58 dependsOnRunnerJob?: MRunnerJob
59 }): Promise<MRunnerJob> {
60 const { priority, dependsOnRunnerJob } = options
61
62 const runnerJob = new RunnerJobModel({
63 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
64
65 uuid: options.jobUUID,
66
67 state: dependsOnRunnerJob
68 ? RunnerJobState.WAITING_FOR_PARENT_JOB
69 : RunnerJobState.PENDING,
70
71 dependsOnRunnerJobId: dependsOnRunnerJob?.id,
72
73 priority
74 })
75
76 const job = await sequelizeTypescript.transaction(async transaction => {
77 return runnerJob.save({ transaction })
78 })
79
80 if (runnerJob.state === RunnerJobState.PENDING) {
81 PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
82 }
83
84 return job
85 }
86
87 // ---------------------------------------------------------------------------
88
89 protected abstract specificUpdate (options: {
90 runnerJob: MRunnerJob
91 updatePayload?: U
92 }): Promise<void> | void
93
94 async update (options: {
95 runnerJob: MRunnerJob
96 progress?: number
97 updatePayload?: U
98 }) {
99 const { runnerJob, progress } = options
100
101 await this.specificUpdate(options)
102
103 if (progress) runnerJob.progress = progress
104
105 await retryTransactionWrapper(() => {
106 return sequelizeTypescript.transaction(async transaction => {
107 if (runnerJob.changed()) {
108 return runnerJob.save({ transaction })
109 }
110
111 // Don't update the job too often
112 if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
113 await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
114 }
115 })
116 })
117 }
118
119 // ---------------------------------------------------------------------------
120
121 async complete (options: {
122 runnerJob: MRunnerJob
123 resultPayload: S
124 }) {
125 const { runnerJob } = options
126
127 try {
128 await this.specificComplete(options)
129
130 runnerJob.state = RunnerJobState.COMPLETED
131 } catch (err) {
132 logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
133
134 runnerJob.state = RunnerJobState.ERRORED
135 runnerJob.error = err.message
136 }
137
138 runnerJob.progress = null
139 runnerJob.finishedAt = new Date()
140
141 await retryTransactionWrapper(() => {
142 return sequelizeTypescript.transaction(async transaction => {
143 await runnerJob.save({ transaction })
144 })
145 })
146
147 const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)
148
149 if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
150 }
151
152 protected abstract specificComplete (options: {
153 runnerJob: MRunnerJob
154 resultPayload: S
155 }): Promise<void> | void
156
157 // ---------------------------------------------------------------------------
158
159 async cancel (options: {
160 runnerJob: MRunnerJob
161 fromParent?: boolean
162 }) {
163 const { runnerJob, fromParent } = options
164
165 await this.specificCancel(options)
166
167 const cancelState = fromParent
168 ? RunnerJobState.PARENT_CANCELLED
169 : RunnerJobState.CANCELLED
170
171 runnerJob.setToErrorOrCancel(cancelState)
172
173 await retryTransactionWrapper(() => {
174 return sequelizeTypescript.transaction(async transaction => {
175 await runnerJob.save({ transaction })
176 })
177 })
178
179 const children = await RunnerJobModel.listChildrenOf(runnerJob)
180 for (const child of children) {
181 logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))
182
183 await this.cancel({ runnerJob: child, fromParent: true })
184 }
185 }
186
187 protected abstract specificCancel (options: {
188 runnerJob: MRunnerJob
189 }): Promise<void> | void
190
191 // ---------------------------------------------------------------------------
192
193 protected abstract isAbortSupported (): boolean
194
195 async abort (options: {
196 runnerJob: MRunnerJob
197 }) {
198 const { runnerJob } = options
199
200 if (this.isAbortSupported() !== true) {
201 return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
202 }
203
204 await this.specificAbort(options)
205
206 runnerJob.resetToPending()
207
208 await retryTransactionWrapper(() => {
209 return sequelizeTypescript.transaction(async transaction => {
210 await runnerJob.save({ transaction })
211 })
212 })
213 }
214
215 protected setAbortState (runnerJob: MRunnerJob) {
216 runnerJob.resetToPending()
217 }
218
219 protected abstract specificAbort (options: {
220 runnerJob: MRunnerJob
221 }): Promise<void> | void
222
223 // ---------------------------------------------------------------------------
224
225 async error (options: {
226 runnerJob: MRunnerJob
227 message: string
228 fromParent?: boolean
229 }) {
230 const { runnerJob, message, fromParent } = options
231
232 const errorState = fromParent
233 ? RunnerJobState.PARENT_ERRORED
234 : RunnerJobState.ERRORED
235
236 const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
237 ? RunnerJobState.PENDING
238 : errorState
239
240 await this.specificError({ ...options, nextState })
241
242 if (nextState === errorState) {
243 runnerJob.setToErrorOrCancel(nextState)
244 runnerJob.error = message
245 } else {
246 runnerJob.resetToPending()
247 }
248
249 await retryTransactionWrapper(() => {
250 return sequelizeTypescript.transaction(async transaction => {
251 await runnerJob.save({ transaction })
252 })
253 })
254
255 if (runnerJob.state === errorState) {
256 const children = await RunnerJobModel.listChildrenOf(runnerJob)
257
258 for (const child of children) {
259 logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))
260
261 await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
262 }
263 }
264 }
265
266 protected abstract specificError (options: {
267 runnerJob: MRunnerJob
268 message: string
269 nextState: RunnerJobState
270 }): Promise<void> | void
271}
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
new file mode 100644
index 000000000..517645848
--- /dev/null
+++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
@@ -0,0 +1,71 @@
1
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { logger } from '@server/helpers/logger'
4import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { MRunnerJob } from '@server/types/models/runners'
7import {
8 LiveRTMPHLSTranscodingUpdatePayload,
9 RunnerJobSuccessPayload,
10 RunnerJobUpdatePayload,
11 RunnerJobVODPrivatePayload
12} from '@shared/models'
13import { AbstractJobHandler } from './abstract-job-handler'
14import { loadTranscodingRunnerVideo } from './shared'
15
16// eslint-disable-next-line max-len
17export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> {
18
19 // ---------------------------------------------------------------------------
20
21 protected isAbortSupported () {
22 return true
23 }
24
25 protected specificUpdate (_options: {
26 runnerJob: MRunnerJob
27 updatePayload?: LiveRTMPHLSTranscodingUpdatePayload
28 }) {
29 // empty
30 }
31
32 protected specificAbort (_options: {
33 runnerJob: MRunnerJob
34 }) {
35 // empty
36 }
37
38 protected async specificError (options: {
39 runnerJob: MRunnerJob
40 }) {
41 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
42 if (!video) return
43
44 await moveToFailedTranscodingState(video)
45
46 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
47 }
48
49 protected async specificCancel (options: {
50 runnerJob: MRunnerJob
51 }) {
52 const { runnerJob } = options
53
54 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
55 if (!video) return
56
57 const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
58
59 logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid))
60
61 if (pending === 0) {
62 logger.info(
63 `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`,
64 this.lTags(video.uuid)
65 )
66
67 const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload
68 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo })
69 }
70 }
71}
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts
new file mode 100644
index 000000000..0fca72b9a
--- /dev/null
+++ b/server/lib/runners/job-handlers/index.ts
@@ -0,0 +1,6 @@
1export * from './abstract-job-handler'
2export * from './live-rtmp-hls-transcoding-job-handler'
3export * from './vod-audio-merge-transcoding-job-handler'
4export * from './vod-hls-transcoding-job-handler'
5export * from './vod-web-video-transcoding-job-handler'
6export * from './runner-job-handlers'
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
new file mode 100644
index 000000000..c3d0e427d
--- /dev/null
+++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
@@ -0,0 +1,170 @@
1import { move, remove } from 'fs-extra'
2import { join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { JOB_PRIORITY } from '@server/initializers/constants'
5import { LiveManager } from '@server/lib/live'
6import { MStreamingPlaylist, MVideo } from '@server/types/models'
7import { MRunnerJob } from '@server/types/models/runners'
8import { buildUUID } from '@shared/extra-utils'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 LiveVideoError,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
15 RunnerJobState
16} from '@shared/models'
17import { AbstractJobHandler } from './abstract-job-handler'
18
19type CreateOptions = {
20 video: MVideo
21 playlist: MStreamingPlaylist
22
23 rtmpUrl: string
24
25 toTranscode: {
26 resolution: number
27 fps: number
28 }[]
29
30 segmentListSize: number
31 segmentDuration: number
32
33 outputDirectory: string
34}
35
36// eslint-disable-next-line max-len
37export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {
38
39 async create (options: CreateOptions) {
40 const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options
41
42 const jobUUID = buildUUID()
43 const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
44 input: {
45 rtmpUrl
46 },
47 output: {
48 toTranscode,
49 segmentListSize,
50 segmentDuration
51 }
52 }
53
54 const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
55 videoUUID: video.uuid,
56 masterPlaylistName: playlist.playlistFilename,
57 outputDirectory
58 }
59
60 const job = await this.createRunnerJob({
61 type: 'live-rtmp-hls-transcoding',
62 jobUUID,
63 payload,
64 privatePayload,
65 priority: JOB_PRIORITY.TRANSCODING
66 })
67
68 return job
69 }
70
71 // ---------------------------------------------------------------------------
72
73 async specificUpdate (options: {
74 runnerJob: MRunnerJob
75 updatePayload: LiveRTMPHLSTranscodingUpdatePayload
76 }) {
77 const { runnerJob, updatePayload } = options
78
79 const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
80 const outputDirectory = privatePayload.outputDirectory
81 const videoUUID = privatePayload.videoUUID
82
83 if (updatePayload.type === 'add-chunk') {
84 await move(
85 updatePayload.videoChunkFile as string,
86 join(outputDirectory, updatePayload.videoChunkFilename),
87 { overwrite: true }
88 )
89 } else if (updatePayload.type === 'remove-chunk') {
90 await remove(join(outputDirectory, updatePayload.videoChunkFilename))
91 }
92
93 if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) {
94 await move(
95 updatePayload.resolutionPlaylistFile as string,
96 join(outputDirectory, updatePayload.resolutionPlaylistFilename),
97 { overwrite: true }
98 )
99 }
100
101 if (updatePayload.masterPlaylistFile) {
102 await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true })
103 }
104
105 logger.info(
106 'Runner live RTMP to HLS job %s for %s updated.',
107 runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) }
108 )
109 }
110
111 // ---------------------------------------------------------------------------
112
113 protected specificComplete (options: {
114 runnerJob: MRunnerJob
115 }) {
116 return this.stopLive({
117 runnerJob: options.runnerJob,
118 type: 'ended'
119 })
120 }
121
122 // ---------------------------------------------------------------------------
123
124 protected isAbortSupported () {
125 return false
126 }
127
128 protected specificAbort () {
129 throw new Error('Not implemented')
130 }
131
132 protected specificError (options: {
133 runnerJob: MRunnerJob
134 nextState: RunnerJobState
135 }) {
136 return this.stopLive({
137 runnerJob: options.runnerJob,
138 type: 'errored'
139 })
140 }
141
142 protected specificCancel (options: {
143 runnerJob: MRunnerJob
144 }) {
145 return this.stopLive({
146 runnerJob: options.runnerJob,
147 type: 'cancelled'
148 })
149 }
150
151 private stopLive (options: {
152 runnerJob: MRunnerJob
153 type: 'ended' | 'errored' | 'cancelled'
154 }) {
155 const { runnerJob, type } = options
156
157 const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
158 const videoUUID = privatePayload.videoUUID
159
160 const errorType = {
161 ended: null,
162 errored: LiveVideoError.RUNNER_JOB_ERROR,
163 cancelled: LiveVideoError.RUNNER_JOB_CANCEL
164 }
165
166 LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type])
167
168 logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID))
169 }
170}
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts
new file mode 100644
index 000000000..7bad1bc77
--- /dev/null
+++ b/server/lib/runners/job-handlers/runner-job-handlers.ts
@@ -0,0 +1,18 @@
1import { MRunnerJob } from '@server/types/models/runners'
2import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models'
3import { AbstractJobHandler } from './abstract-job-handler'
4import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler'
5import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler'
6import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler'
7import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler'
8
9const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, RunnerJobUpdatePayload, RunnerJobSuccessPayload>> = {
10 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler,
11 'vod-hls-transcoding': VODHLSTranscodingJobHandler,
12 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler,
13 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler
14}
15
16export function getRunnerJobHandlerClass (job: MRunnerJob) {
17 return processors[job.type]
18}
diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts
new file mode 100644
index 000000000..348273ae2
--- /dev/null
+++ b/server/lib/runners/job-handlers/shared/index.ts
@@ -0,0 +1 @@
export * from './vod-helpers'
diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts
new file mode 100644
index 000000000..93ae89ff8
--- /dev/null
+++ b/server/lib/runners/job-handlers/shared/vod-helpers.ts
@@ -0,0 +1,44 @@
1import { move } from 'fs-extra'
2import { dirname, join } from 'path'
3import { logger, LoggerTagsFn } from '@server/helpers/logger'
4import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
5import { onWebTorrentVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding'
6import { buildNewFile } from '@server/lib/video-file'
7import { VideoModel } from '@server/models/video/video'
8import { MVideoFullLight } from '@server/types/models'
9import { MRunnerJob } from '@server/types/models/runners'
10import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models'
11
12export async function onVODWebVideoOrAudioMergeTranscodingJob (options: {
13 video: MVideoFullLight
14 videoFilePath: string
15 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload
16}) {
17 const { video, videoFilePath, privatePayload } = options
18
19 const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' })
20 videoFile.videoId = video.id
21
22 const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename)
23 await move(videoFilePath, newVideoFilePath)
24
25 await onWebTorrentVideoFileTranscoding({
26 video,
27 videoFile,
28 videoOutputPath: newVideoFilePath
29 })
30
31 await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video })
32}
33
34export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) {
35 const videoUUID = runnerJob.privatePayload.videoUUID
36
37 const video = await VideoModel.loadFull(videoUUID)
38 if (!video) {
39 logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID))
40 return undefined
41 }
42
43 return video
44}
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
new file mode 100644
index 000000000..a7b33f87e
--- /dev/null
+++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
@@ -0,0 +1,97 @@
1import { pick } from 'lodash'
2import { logger } from '@server/helpers/logger'
3import { VideoJobInfoModel } from '@server/models/video/video-job-info'
4import { MVideo } from '@server/types/models'
5import { MRunnerJob } from '@server/types/models/runners'
6import { buildUUID } from '@shared/extra-utils'
7import { getVideoStreamDuration } from '@shared/ffmpeg'
8import {
9 RunnerJobUpdatePayload,
10 RunnerJobVODAudioMergeTranscodingPayload,
11 RunnerJobVODWebVideoTranscodingPrivatePayload,
12 VODAudioMergeTranscodingSuccess
13} from '@shared/models'
14import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls'
15import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
16import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared'
17
18type CreateOptions = {
19 video: MVideo
20 isNewVideo: boolean
21 resolution: number
22 fps: number
23 priority: number
24 dependsOnRunnerJob?: MRunnerJob
25}
26
27// eslint-disable-next-line max-len
28export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODAudioMergeTranscodingSuccess> {
29
30 async create (options: CreateOptions) {
31 const { video, resolution, fps, priority, dependsOnRunnerJob } = options
32
33 const jobUUID = buildUUID()
34 const payload: RunnerJobVODAudioMergeTranscodingPayload = {
35 input: {
36 audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid),
37 previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid)
38 },
39 output: {
40 resolution,
41 fps
42 }
43 }
44
45 const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = {
46 ...pick(options, [ 'isNewVideo' ]),
47
48 videoUUID: video.uuid
49 }
50
51 const job = await this.createRunnerJob({
52 type: 'vod-audio-merge-transcoding',
53 jobUUID,
54 payload,
55 privatePayload,
56 priority,
57 dependsOnRunnerJob
58 })
59
60 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
61
62 return job
63 }
64
65 // ---------------------------------------------------------------------------
66
67 async specificComplete (options: {
68 runnerJob: MRunnerJob
69 resultPayload: VODAudioMergeTranscodingSuccess
70 }) {
71 const { runnerJob, resultPayload } = options
72 const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload
73
74 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
75 if (!video) return
76
77 const videoFilePath = resultPayload.videoFile as string
78
79 // ffmpeg generated a new video file, so update the video duration
80 // See https://trac.ffmpeg.org/ticket/5456
81 video.duration = await getVideoStreamDuration(videoFilePath)
82 await video.save()
83
84 // We can remove the old audio file
85 const oldAudioFile = video.VideoFiles[0]
86 await video.removeWebTorrentFile(oldAudioFile)
87 await oldAudioFile.destroy()
88 video.VideoFiles = []
89
90 await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload })
91
92 logger.info(
93 'Runner VOD audio merge transcoding job %s for %s ended.',
94 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
95 )
96 }
97}
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
new file mode 100644
index 000000000..02566b9d5
--- /dev/null
+++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
@@ -0,0 +1,114 @@
1import { move } from 'fs-extra'
2import { dirname, join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { renameVideoFileInPlaylist } from '@server/lib/hls'
5import { getHlsResolutionPlaylistFilename } from '@server/lib/paths'
6import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
7import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding'
8import { buildNewFile, removeAllWebTorrentFiles } from '@server/lib/video-file'
9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
10import { MVideo } from '@server/types/models'
11import { MRunnerJob } from '@server/types/models/runners'
12import { pick } from '@shared/core-utils'
13import { buildUUID } from '@shared/extra-utils'
14import {
15 RunnerJobUpdatePayload,
16 RunnerJobVODHLSTranscodingPayload,
17 RunnerJobVODHLSTranscodingPrivatePayload,
18 VODHLSTranscodingSuccess
19} from '@shared/models'
20import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
21import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
22import { loadTranscodingRunnerVideo } from './shared'
23
24type CreateOptions = {
25 video: MVideo
26 isNewVideo: boolean
27 deleteWebVideoFiles: boolean
28 resolution: number
29 fps: number
30 priority: number
31 dependsOnRunnerJob?: MRunnerJob
32}
33
34// eslint-disable-next-line max-len
35export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODHLSTranscodingSuccess> {
36
37 async create (options: CreateOptions) {
38 const { video, resolution, fps, dependsOnRunnerJob, priority } = options
39
40 const jobUUID = buildUUID()
41
42 const payload: RunnerJobVODHLSTranscodingPayload = {
43 input: {
44 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
45 },
46 output: {
47 resolution,
48 fps
49 }
50 }
51
52 const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = {
53 ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]),
54
55 videoUUID: video.uuid
56 }
57
58 const job = await this.createRunnerJob({
59 type: 'vod-hls-transcoding',
60 jobUUID,
61 payload,
62 privatePayload,
63 priority,
64 dependsOnRunnerJob
65 })
66
67 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
68
69 return job
70 }
71
72 // ---------------------------------------------------------------------------
73
74 async specificComplete (options: {
75 runnerJob: MRunnerJob
76 resultPayload: VODHLSTranscodingSuccess
77 }) {
78 const { runnerJob, resultPayload } = options
79 const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload
80
81 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
82 if (!video) return
83
84 const videoFilePath = resultPayload.videoFile as string
85 const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string
86
87 const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' })
88 const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename)
89 await move(videoFilePath, newVideoFilePath)
90
91 const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename)
92 const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename)
93 await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath)
94
95 await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename)
96
97 await onHLSVideoFileTranscoding({
98 video,
99 videoFile,
100 m3u8OutputPath: newResolutionPlaylistFilePath,
101 videoOutputPath: newVideoFilePath
102 })
103
104 await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video })
105
106 if (privatePayload.deleteWebVideoFiles === true) {
107 logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid))
108
109 await removeAllWebTorrentFiles(video)
110 }
111
112 logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid))
113 }
114}
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
new file mode 100644
index 000000000..57761a7a1
--- /dev/null
+++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
@@ -0,0 +1,84 @@
1import { pick } from 'lodash'
2import { logger } from '@server/helpers/logger'
3import { VideoJobInfoModel } from '@server/models/video/video-job-info'
4import { MVideo } from '@server/types/models'
5import { MRunnerJob } from '@server/types/models/runners'
6import { buildUUID } from '@shared/extra-utils'
7import {
8 RunnerJobUpdatePayload,
9 RunnerJobVODWebVideoTranscodingPayload,
10 RunnerJobVODWebVideoTranscodingPrivatePayload,
11 VODWebVideoTranscodingSuccess
12} from '@shared/models'
13import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
14import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
15import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared'
16
17type CreateOptions = {
18 video: MVideo
19 isNewVideo: boolean
20 resolution: number
21 fps: number
22 priority: number
23 dependsOnRunnerJob?: MRunnerJob
24}
25
26// eslint-disable-next-line max-len
27export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODWebVideoTranscodingSuccess> {
28
29 async create (options: CreateOptions) {
30 const { video, resolution, fps, priority, dependsOnRunnerJob } = options
31
32 const jobUUID = buildUUID()
33 const payload: RunnerJobVODWebVideoTranscodingPayload = {
34 input: {
35 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
36 },
37 output: {
38 resolution,
39 fps
40 }
41 }
42
43 const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = {
44 ...pick(options, [ 'isNewVideo' ]),
45
46 videoUUID: video.uuid
47 }
48
49 const job = await this.createRunnerJob({
50 type: 'vod-web-video-transcoding',
51 jobUUID,
52 payload,
53 privatePayload,
54 dependsOnRunnerJob,
55 priority
56 })
57
58 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
59
60 return job
61 }
62
63 // ---------------------------------------------------------------------------
64
65 async specificComplete (options: {
66 runnerJob: MRunnerJob
67 resultPayload: VODWebVideoTranscodingSuccess
68 }) {
69 const { runnerJob, resultPayload } = options
70 const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload
71
72 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
73 if (!video) return
74
75 const videoFilePath = resultPayload.videoFile as string
76
77 await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload })
78
79 logger.info(
80 'Runner VOD web video transcoding job %s for %s ended.',
81 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
82 )
83 }
84}
diff --git a/server/lib/runners/runner-urls.ts b/server/lib/runners/runner-urls.ts
new file mode 100644
index 000000000..329fb1170
--- /dev/null
+++ b/server/lib/runners/runner-urls.ts
@@ -0,0 +1,9 @@
1import { WEBSERVER } from '@server/initializers/constants'
2
3export function generateRunnerTranscodingVideoInputFileUrl (jobUUID: string, videoUUID: string) {
4 return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/max-quality'
5}
6
7export function generateRunnerTranscodingVideoPreviewFileUrl (jobUUID: string, videoUUID: string) {
8 return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/previews/max-quality'
9}
diff --git a/server/lib/runners/runner.ts b/server/lib/runners/runner.ts
new file mode 100644
index 000000000..74c814ba1
--- /dev/null
+++ b/server/lib/runners/runner.ts
@@ -0,0 +1,36 @@
1import express from 'express'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { logger, loggerTagsFactory } from '@server/helpers/logger'
4import { sequelizeTypescript } from '@server/initializers/database'
5import { MRunner } from '@server/types/models/runners'
6
7const lTags = loggerTagsFactory('runner')
8
9const updatingRunner = new Set<number>()
10
11function updateLastRunnerContact (req: express.Request, runner: MRunner) {
12 const now = new Date()
13
14 // Don't update last runner contact too often
15 if (now.getTime() - runner.lastContact.getTime() < 2000) return
16 if (updatingRunner.has(runner.id)) return
17
18 updatingRunner.add(runner.id)
19
20 runner.lastContact = now
21 runner.ip = req.ip
22
23 logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name))
24
25 retryTransactionWrapper(() => {
26 return sequelizeTypescript.transaction(async transaction => {
27 return runner.save({ transaction })
28 })
29 })
30 .catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) }))
31 .finally(() => updatingRunner.delete(runner.id))
32}
33
34export {
35 updateLastRunnerContact
36}
diff --git a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts
new file mode 100644
index 000000000..f7a26d2bc
--- /dev/null
+++ b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts
@@ -0,0 +1,42 @@
1import { CONFIG } from '@server/initializers/config'
2import { RunnerJobModel } from '@server/models/runner/runner-job'
3import { logger, loggerTagsFactory } from '../../helpers/logger'
4import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
5import { getRunnerJobHandlerClass } from '../runners'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('runner')
9
10export class RunnerJobWatchDogScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.RUNNER_JOB_WATCH_DOG
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const vodStalledJobs = await RunnerJobModel.listStalledJobs({
22 staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.VOD,
23 types: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]
24 })
25
26 const liveStalledJobs = await RunnerJobModel.listStalledJobs({
27 staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.LIVE,
28 types: [ 'live-rtmp-hls-transcoding' ]
29 })
30
31 for (const stalled of [ ...vodStalledJobs, ...liveStalledJobs ]) {
32 logger.info('Abort stalled runner job %s (%s)', stalled.uuid, stalled.type, lTags(stalled.uuid, stalled.type))
33
34 const Handler = getRunnerJobHandlerClass(stalled)
35 await new Handler().abort({ runnerJob: stalled })
36 }
37 }
38
39 static get Instance () {
40 return this.instance || (this.instance = new this())
41 }
42}
diff --git a/server/lib/server-config-manager.ts b/server/lib/server-config-manager.ts
index e87e2854f..ba7916363 100644
--- a/server/lib/server-config-manager.ts
+++ b/server/lib/server-config-manager.ts
@@ -126,11 +126,14 @@ class ServerConfigManager {
126 serverVersion: PEERTUBE_VERSION, 126 serverVersion: PEERTUBE_VERSION,
127 serverCommit: this.serverCommit, 127 serverCommit: this.serverCommit,
128 transcoding: { 128 transcoding: {
129 remoteRunners: {
130 enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED
131 },
129 hls: { 132 hls: {
130 enabled: CONFIG.TRANSCODING.HLS.ENABLED 133 enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.HLS.ENABLED
131 }, 134 },
132 webtorrent: { 135 webtorrent: {
133 enabled: CONFIG.TRANSCODING.WEBTORRENT.ENABLED 136 enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.WEBTORRENT.ENABLED
134 }, 137 },
135 enabledResolutions: this.getEnabledResolutions('vod'), 138 enabledResolutions: this.getEnabledResolutions('vod'),
136 profile: CONFIG.TRANSCODING.PROFILE, 139 profile: CONFIG.TRANSCODING.PROFILE,
@@ -150,6 +153,9 @@ class ServerConfigManager {
150 153
151 transcoding: { 154 transcoding: {
152 enabled: CONFIG.LIVE.TRANSCODING.ENABLED, 155 enabled: CONFIG.LIVE.TRANSCODING.ENABLED,
156 remoteRunners: {
157 enabled: CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
158 },
153 enabledResolutions: this.getEnabledResolutions('live'), 159 enabledResolutions: this.getEnabledResolutions('live'),
154 profile: CONFIG.LIVE.TRANSCODING.PROFILE, 160 profile: CONFIG.LIVE.TRANSCODING.PROFILE,
155 availableProfiles: VideoTranscodingProfilesManager.Instance.getAvailableProfiles('live') 161 availableProfiles: VideoTranscodingProfilesManager.Instance.getAvailableProfiles('live')
diff --git a/server/lib/transcoding/create-transcoding-job.ts b/server/lib/transcoding/create-transcoding-job.ts
new file mode 100644
index 000000000..46831a912
--- /dev/null
+++ b/server/lib/transcoding/create-transcoding-job.ts
@@ -0,0 +1,36 @@
1import { CONFIG } from '@server/initializers/config'
2import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
3import { TranscodingJobQueueBuilder, TranscodingRunnerJobBuilder } from './shared'
4
5export function createOptimizeOrMergeAudioJobs (options: {
6 video: MVideoFullLight
7 videoFile: MVideoFile
8 isNewVideo: boolean
9 user: MUserId
10}) {
11 return getJobBuilder().createOptimizeOrMergeAudioJobs(options)
12}
13
14// ---------------------------------------------------------------------------
15
16export function createTranscodingJobs (options: {
17 transcodingType: 'hls' | 'webtorrent'
18 video: MVideoFullLight
19 resolutions: number[]
20 isNewVideo: boolean
21 user: MUserId
22}) {
23 return getJobBuilder().createTranscodingJobs(options)
24}
25
26// ---------------------------------------------------------------------------
27// Private
28// ---------------------------------------------------------------------------
29
30function getJobBuilder () {
31 if (CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED === true) {
32 return new TranscodingRunnerJobBuilder()
33 }
34
35 return new TranscodingJobQueueBuilder()
36}
diff --git a/server/lib/transcoding/default-transcoding-profiles.ts b/server/lib/transcoding/default-transcoding-profiles.ts
index f47718819..5251784ac 100644
--- a/server/lib/transcoding/default-transcoding-profiles.ts
+++ b/server/lib/transcoding/default-transcoding-profiles.ts
@@ -1,15 +1,9 @@
1 1
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils' 3import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils'
4import { AvailableEncoders, EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '../../../shared/models/videos' 4import { buildStreamSuffix, FFmpegCommandWrapper, ffprobePromise, getAudioStream, getMaxAudioBitrate } from '@shared/ffmpeg'
5import { 5import { AvailableEncoders, EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '@shared/models'
6 buildStreamSuffix, 6import { canDoQuickAudioTranscode } from './transcoding-quick-transcode'
7 canDoQuickAudioTranscode,
8 ffprobePromise,
9 getAudioStream,
10 getMaxAudioBitrate,
11 resetSupportedEncoders
12} from '../../helpers/ffmpeg'
13 7
14/** 8/**
15 * 9 *
@@ -184,14 +178,14 @@ class VideoTranscodingProfilesManager {
184 addEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { 178 addEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) {
185 this.encodersPriorities[type][streamType].push({ name: encoder, priority }) 179 this.encodersPriorities[type][streamType].push({ name: encoder, priority })
186 180
187 resetSupportedEncoders() 181 FFmpegCommandWrapper.resetSupportedEncoders()
188 } 182 }
189 183
190 removeEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { 184 removeEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) {
191 this.encodersPriorities[type][streamType] = this.encodersPriorities[type][streamType] 185 this.encodersPriorities[type][streamType] = this.encodersPriorities[type][streamType]
192 .filter(o => o.name !== encoder && o.priority !== priority) 186 .filter(o => o.name !== encoder && o.priority !== priority)
193 187
194 resetSupportedEncoders() 188 FFmpegCommandWrapper.resetSupportedEncoders()
195 } 189 }
196 190
197 private getEncodersByPriority (type: 'vod' | 'live', streamType: 'audio' | 'video') { 191 private getEncodersByPriority (type: 'vod' | 'live', streamType: 'audio' | 'video') {
diff --git a/server/lib/transcoding/ended-transcoding.ts b/server/lib/transcoding/ended-transcoding.ts
new file mode 100644
index 000000000..d31674ede
--- /dev/null
+++ b/server/lib/transcoding/ended-transcoding.ts
@@ -0,0 +1,18 @@
1import { retryTransactionWrapper } from '@server/helpers/database-utils'
2import { VideoJobInfoModel } from '@server/models/video/video-job-info'
3import { MVideo } from '@server/types/models'
4import { moveToNextState } from '../video-state'
5
6export async function onTranscodingEnded (options: {
7 video: MVideo
8 isNewVideo: boolean
9 moveVideoToNextState: boolean
10}) {
11 const { video, isNewVideo, moveVideoToNextState } = options
12
13 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
14
15 if (moveVideoToNextState) {
16 await retryTransactionWrapper(moveToNextState, { video, isNewVideo })
17 }
18}
diff --git a/server/lib/transcoding/hls-transcoding.ts b/server/lib/transcoding/hls-transcoding.ts
new file mode 100644
index 000000000..cffa859c7
--- /dev/null
+++ b/server/lib/transcoding/hls-transcoding.ts
@@ -0,0 +1,181 @@
1import { MutexInterface } from 'async-mutex'
2import { Job } from 'bullmq'
3import { ensureDir, move, stat } from 'fs-extra'
4import { basename, extname as extnameUtil, join } from 'path'
5import { retryTransactionWrapper } from '@server/helpers/database-utils'
6import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
7import { sequelizeTypescript } from '@server/initializers/database'
8import { MVideo, MVideoFile } from '@server/types/models'
9import { pick } from '@shared/core-utils'
10import { getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg'
11import { VideoResolution } from '@shared/models'
12import { CONFIG } from '../../initializers/config'
13import { VideoFileModel } from '../../models/video/video-file'
14import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
15import { updatePlaylistAfterFileChange } from '../hls'
16import { generateHLSVideoFilename, getHlsResolutionPlaylistFilename } from '../paths'
17import { buildFileMetadata } from '../video-file'
18import { VideoPathManager } from '../video-path-manager'
19import { buildFFmpegVOD } from './shared'
20
21// Concat TS segments from a live video to a fragmented mp4 HLS playlist
22export async function generateHlsPlaylistResolutionFromTS (options: {
23 video: MVideo
24 concatenatedTsFilePath: string
25 resolution: VideoResolution
26 fps: number
27 isAAC: boolean
28 inputFileMutexReleaser: MutexInterface.Releaser
29}) {
30 return generateHlsPlaylistCommon({
31 type: 'hls-from-ts' as 'hls-from-ts',
32 inputPath: options.concatenatedTsFilePath,
33
34 ...pick(options, [ 'video', 'resolution', 'fps', 'inputFileMutexReleaser', 'isAAC' ])
35 })
36}
37
38// Generate an HLS playlist from an input file, and update the master playlist
39export function generateHlsPlaylistResolution (options: {
40 video: MVideo
41 videoInputPath: string
42 resolution: VideoResolution
43 fps: number
44 copyCodecs: boolean
45 inputFileMutexReleaser: MutexInterface.Releaser
46 job?: Job
47}) {
48 return generateHlsPlaylistCommon({
49 type: 'hls' as 'hls',
50 inputPath: options.videoInputPath,
51
52 ...pick(options, [ 'video', 'resolution', 'fps', 'copyCodecs', 'inputFileMutexReleaser', 'job' ])
53 })
54}
55
56export async function onHLSVideoFileTranscoding (options: {
57 video: MVideo
58 videoFile: MVideoFile
59 videoOutputPath: string
60 m3u8OutputPath: string
61}) {
62 const { video, videoFile, videoOutputPath, m3u8OutputPath } = options
63
64 // Create or update the playlist
65 const playlist = await retryTransactionWrapper(() => {
66 return sequelizeTypescript.transaction(async transaction => {
67 return VideoStreamingPlaylistModel.loadOrGenerate(video, transaction)
68 })
69 })
70 videoFile.videoStreamingPlaylistId = playlist.id
71
72 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
73
74 try {
75 // VOD transcoding is a long task, refresh video attributes
76 await video.reload()
77
78 const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, videoFile)
79 await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video))
80
81 // Move playlist file
82 const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, basename(m3u8OutputPath))
83 await move(m3u8OutputPath, resolutionPlaylistPath, { overwrite: true })
84 // Move video file
85 await move(videoOutputPath, videoFilePath, { overwrite: true })
86
87 // Update video duration if it was not set (in case of a live for example)
88 if (!video.duration) {
89 video.duration = await getVideoStreamDuration(videoFilePath)
90 await video.save()
91 }
92
93 const stats = await stat(videoFilePath)
94
95 videoFile.size = stats.size
96 videoFile.fps = await getVideoStreamFPS(videoFilePath)
97 videoFile.metadata = await buildFileMetadata(videoFilePath)
98
99 await createTorrentAndSetInfoHash(playlist, videoFile)
100
101 const oldFile = await VideoFileModel.loadHLSFile({
102 playlistId: playlist.id,
103 fps: videoFile.fps,
104 resolution: videoFile.resolution
105 })
106
107 if (oldFile) {
108 await video.removeStreamingPlaylistVideoFile(playlist, oldFile)
109 await oldFile.destroy()
110 }
111
112 const savedVideoFile = await VideoFileModel.customUpsert(videoFile, 'streaming-playlist', undefined)
113
114 await updatePlaylistAfterFileChange(video, playlist)
115
116 return { resolutionPlaylistPath, videoFile: savedVideoFile }
117 } finally {
118 mutexReleaser()
119 }
120}
121
122// ---------------------------------------------------------------------------
123
124async function generateHlsPlaylistCommon (options: {
125 type: 'hls' | 'hls-from-ts'
126 video: MVideo
127 inputPath: string
128
129 resolution: VideoResolution
130 fps: number
131
132 inputFileMutexReleaser: MutexInterface.Releaser
133
134 copyCodecs?: boolean
135 isAAC?: boolean
136
137 job?: Job
138}) {
139 const { type, video, inputPath, resolution, fps, copyCodecs, isAAC, job, inputFileMutexReleaser } = options
140 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
141
142 const videoTranscodedBasePath = join(transcodeDirectory, type)
143 await ensureDir(videoTranscodedBasePath)
144
145 const videoFilename = generateHLSVideoFilename(resolution)
146 const videoOutputPath = join(videoTranscodedBasePath, videoFilename)
147
148 const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFilename)
149 const m3u8OutputPath = join(videoTranscodedBasePath, resolutionPlaylistFilename)
150
151 const transcodeOptions = {
152 type,
153
154 inputPath,
155 outputPath: m3u8OutputPath,
156
157 resolution,
158 fps,
159 copyCodecs,
160
161 isAAC,
162
163 inputFileMutexReleaser,
164
165 hlsPlaylist: {
166 videoFilename
167 }
168 }
169
170 await buildFFmpegVOD(job).transcode(transcodeOptions)
171
172 const newVideoFile = new VideoFileModel({
173 resolution,
174 extname: extnameUtil(videoFilename),
175 size: 0,
176 filename: videoFilename,
177 fps: -1
178 })
179
180 await onHLSVideoFileTranscoding({ video, videoFile: newVideoFile, videoOutputPath, m3u8OutputPath })
181}
diff --git a/server/lib/transcoding/shared/ffmpeg-builder.ts b/server/lib/transcoding/shared/ffmpeg-builder.ts
new file mode 100644
index 000000000..441445ec4
--- /dev/null
+++ b/server/lib/transcoding/shared/ffmpeg-builder.ts
@@ -0,0 +1,18 @@
1import { Job } from 'bullmq'
2import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
3import { logger } from '@server/helpers/logger'
4import { FFmpegVOD } from '@shared/ffmpeg'
5import { VideoTranscodingProfilesManager } from '../default-transcoding-profiles'
6
7export function buildFFmpegVOD (job?: Job) {
8 return new FFmpegVOD({
9 ...getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()),
10
11 updateJobProgress: progress => {
12 if (!job) return
13
14 job.updateProgress(progress)
15 .catch(err => logger.error('Cannot update ffmpeg job progress', { err }))
16 }
17 })
18}
diff --git a/server/lib/transcoding/shared/index.ts b/server/lib/transcoding/shared/index.ts
new file mode 100644
index 000000000..f0b45bcbb
--- /dev/null
+++ b/server/lib/transcoding/shared/index.ts
@@ -0,0 +1,2 @@
1export * from './job-builders'
2export * from './ffmpeg-builder'
diff --git a/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts
new file mode 100644
index 000000000..f1e9efdcf
--- /dev/null
+++ b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts
@@ -0,0 +1,38 @@
1
2import { JOB_PRIORITY } from '@server/initializers/constants'
3import { VideoModel } from '@server/models/video/video'
4import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
5
6export abstract class AbstractJobBuilder {
7
8 abstract createOptimizeOrMergeAudioJobs (options: {
9 video: MVideoFullLight
10 videoFile: MVideoFile
11 isNewVideo: boolean
12 user: MUserId
13 }): Promise<any>
14
15 abstract createTranscodingJobs (options: {
16 transcodingType: 'hls' | 'webtorrent'
17 video: MVideoFullLight
18 resolutions: number[]
19 isNewVideo: boolean
20 user: MUserId | null
21 }): Promise<any>
22
23 protected async getTranscodingJobPriority (options: {
24 user: MUserId
25 fallback: number
26 }) {
27 const { user, fallback } = options
28
29 if (!user) return fallback
30
31 const now = new Date()
32 const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7)
33
34 const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek)
35
36 return JOB_PRIORITY.TRANSCODING + videoUploadedByUser
37 }
38}
diff --git a/server/lib/transcoding/shared/job-builders/index.ts b/server/lib/transcoding/shared/job-builders/index.ts
new file mode 100644
index 000000000..9b1c82adf
--- /dev/null
+++ b/server/lib/transcoding/shared/job-builders/index.ts
@@ -0,0 +1,2 @@
1export * from './transcoding-job-queue-builder'
2export * from './transcoding-runner-job-builder'
diff --git a/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts
new file mode 100644
index 000000000..7c892718b
--- /dev/null
+++ b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts
@@ -0,0 +1,308 @@
1import Bluebird from 'bluebird'
2import { computeOutputFPS } from '@server/helpers/ffmpeg'
3import { logger } from '@server/helpers/logger'
4import { CONFIG } from '@server/initializers/config'
5import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants'
6import { CreateJobArgument, JobQueue } from '@server/lib/job-queue'
7import { Hooks } from '@server/lib/plugins/hooks'
8import { VideoPathManager } from '@server/lib/video-path-manager'
9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
10import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models'
11import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg'
12import {
13 HLSTranscodingPayload,
14 MergeAudioTranscodingPayload,
15 NewWebTorrentResolutionTranscodingPayload,
16 OptimizeTranscodingPayload,
17 VideoTranscodingPayload
18} from '@shared/models'
19import { canDoQuickTranscode } from '../../transcoding-quick-transcode'
20import { computeResolutionsToTranscode } from '../../transcoding-resolutions'
21import { AbstractJobBuilder } from './abstract-job-builder'
22
23export class TranscodingJobQueueBuilder extends AbstractJobBuilder {
24
25 async createOptimizeOrMergeAudioJobs (options: {
26 video: MVideoFullLight
27 videoFile: MVideoFile
28 isNewVideo: boolean
29 user: MUserId
30 }) {
31 const { video, videoFile, isNewVideo, user } = options
32
33 let mergeOrOptimizePayload: MergeAudioTranscodingPayload | OptimizeTranscodingPayload
34 let nextTranscodingSequentialJobPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = []
35
36 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
37
38 try {
39 await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => {
40 const probe = await ffprobePromise(videoFilePath)
41
42 const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe)
43 const hasAudio = await hasAudioStream(videoFilePath, probe)
44 const quickTranscode = await canDoQuickTranscode(videoFilePath, probe)
45 const inputFPS = videoFile.isAudio()
46 ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value
47 : await getVideoStreamFPS(videoFilePath, probe)
48
49 const maxResolution = await isAudioFile(videoFilePath, probe)
50 ? DEFAULT_AUDIO_RESOLUTION
51 : resolution
52
53 if (CONFIG.TRANSCODING.HLS.ENABLED === true) {
54 nextTranscodingSequentialJobPayloads.push([
55 this.buildHLSJobPayload({
56 deleteWebTorrentFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false,
57
58 // We had some issues with a web video quick transcoded while producing a HLS version of it
59 copyCodecs: !quickTranscode,
60
61 resolution: maxResolution,
62 fps: computeOutputFPS({ inputFPS, resolution: maxResolution }),
63 videoUUID: video.uuid,
64 isNewVideo
65 })
66 ])
67 }
68
69 const lowerResolutionJobPayloads = await this.buildLowerResolutionJobPayloads({
70 video,
71 inputVideoResolution: maxResolution,
72 inputVideoFPS: inputFPS,
73 hasAudio,
74 isNewVideo
75 })
76
77 nextTranscodingSequentialJobPayloads = [ ...nextTranscodingSequentialJobPayloads, ...lowerResolutionJobPayloads ]
78
79 mergeOrOptimizePayload = videoFile.isAudio()
80 ? this.buildMergeAudioPayload({ videoUUID: video.uuid, isNewVideo })
81 : this.buildOptimizePayload({ videoUUID: video.uuid, isNewVideo, quickTranscode })
82 })
83 } finally {
84 mutexReleaser()
85 }
86
87 const nextTranscodingSequentialJobs = await Bluebird.mapSeries(nextTranscodingSequentialJobPayloads, payloads => {
88 return Bluebird.mapSeries(payloads, payload => {
89 return this.buildTranscodingJob({ payload, user })
90 })
91 })
92
93 const transcodingJobBuilderJob: CreateJobArgument = {
94 type: 'transcoding-job-builder',
95 payload: {
96 videoUUID: video.uuid,
97 sequentialJobs: nextTranscodingSequentialJobs
98 }
99 }
100
101 const mergeOrOptimizeJob = await this.buildTranscodingJob({ payload: mergeOrOptimizePayload, user })
102
103 return JobQueue.Instance.createSequentialJobFlow(...[ mergeOrOptimizeJob, transcodingJobBuilderJob ])
104 }
105
106 // ---------------------------------------------------------------------------
107
108 async createTranscodingJobs (options: {
109 transcodingType: 'hls' | 'webtorrent'
110 video: MVideoFullLight
111 resolutions: number[]
112 isNewVideo: boolean
113 user: MUserId | null
114 }) {
115 const { video, transcodingType, resolutions, isNewVideo } = options
116
117 const maxResolution = Math.max(...resolutions)
118 const childrenResolutions = resolutions.filter(r => r !== maxResolution)
119
120 logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution })
121
122 const { fps: inputFPS } = await video.probeMaxQualityFile()
123
124 const children = childrenResolutions.map(resolution => {
125 const fps = computeOutputFPS({ inputFPS, resolution })
126
127 if (transcodingType === 'hls') {
128 return this.buildHLSJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo })
129 }
130
131 if (transcodingType === 'webtorrent') {
132 return this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo })
133 }
134
135 throw new Error('Unknown transcoding type')
136 })
137
138 const fps = computeOutputFPS({ inputFPS, resolution: maxResolution })
139
140 const parent = transcodingType === 'hls'
141 ? this.buildHLSJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo })
142 : this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo })
143
144 // Process the last resolution after the other ones to prevent concurrency issue
145 // Because low resolutions use the biggest one as ffmpeg input
146 await this.createTranscodingJobsWithChildren({ videoUUID: video.uuid, parent, children, user: null })
147 }
148
149 // ---------------------------------------------------------------------------
150
151 private async createTranscodingJobsWithChildren (options: {
152 videoUUID: string
153 parent: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload)
154 children: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload)[]
155 user: MUserId | null
156 }) {
157 const { videoUUID, parent, children, user } = options
158
159 const parentJob = await this.buildTranscodingJob({ payload: parent, user })
160 const childrenJobs = await Bluebird.mapSeries(children, c => this.buildTranscodingJob({ payload: c, user }))
161
162 await JobQueue.Instance.createJobWithChildren(parentJob, childrenJobs)
163
164 await VideoJobInfoModel.increaseOrCreate(videoUUID, 'pendingTranscode', 1 + children.length)
165 }
166
167 private async buildTranscodingJob (options: {
168 payload: VideoTranscodingPayload
169 user: MUserId | null // null means we don't want priority
170 }) {
171 const { user, payload } = options
172
173 return {
174 type: 'video-transcoding' as 'video-transcoding',
175 priority: await this.getTranscodingJobPriority({ user, fallback: undefined }),
176 payload
177 }
178 }
179
180 private async buildLowerResolutionJobPayloads (options: {
181 video: MVideoWithFileThumbnail
182 inputVideoResolution: number
183 inputVideoFPS: number
184 hasAudio: boolean
185 isNewVideo: boolean
186 }) {
187 const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio } = options
188
189 // Create transcoding jobs if there are enabled resolutions
190 const resolutionsEnabled = await Hooks.wrapObject(
191 computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }),
192 'filter:transcoding.auto.resolutions-to-transcode.result',
193 options
194 )
195
196 const sequentialPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = []
197
198 for (const resolution of resolutionsEnabled) {
199 const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution })
200
201 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
202 const payloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[] = [
203 this.buildWebTorrentJobPayload({
204 videoUUID: video.uuid,
205 resolution,
206 fps,
207 isNewVideo
208 })
209 ]
210
211 // Create a subsequent job to create HLS resolution that will just copy web video codecs
212 if (CONFIG.TRANSCODING.HLS.ENABLED) {
213 payloads.push(
214 this.buildHLSJobPayload({
215 videoUUID: video.uuid,
216 resolution,
217 fps,
218 isNewVideo,
219 copyCodecs: true
220 })
221 )
222 }
223
224 sequentialPayloads.push(payloads)
225 } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
226 sequentialPayloads.push([
227 this.buildHLSJobPayload({
228 videoUUID: video.uuid,
229 resolution,
230 fps,
231 copyCodecs: false,
232 isNewVideo
233 })
234 ])
235 }
236 }
237
238 return sequentialPayloads
239 }
240
241 private buildHLSJobPayload (options: {
242 videoUUID: string
243 resolution: number
244 fps: number
245 isNewVideo: boolean
246 deleteWebTorrentFiles?: boolean // default false
247 copyCodecs?: boolean // default false
248 }): HLSTranscodingPayload {
249 const { videoUUID, resolution, fps, isNewVideo, deleteWebTorrentFiles = false, copyCodecs = false } = options
250
251 return {
252 type: 'new-resolution-to-hls',
253 videoUUID,
254 resolution,
255 fps,
256 copyCodecs,
257 isNewVideo,
258 deleteWebTorrentFiles
259 }
260 }
261
262 private buildWebTorrentJobPayload (options: {
263 videoUUID: string
264 resolution: number
265 fps: number
266 isNewVideo: boolean
267 }): NewWebTorrentResolutionTranscodingPayload {
268 const { videoUUID, resolution, fps, isNewVideo } = options
269
270 return {
271 type: 'new-resolution-to-webtorrent',
272 videoUUID,
273 isNewVideo,
274 resolution,
275 fps
276 }
277 }
278
279 private buildMergeAudioPayload (options: {
280 videoUUID: string
281 isNewVideo: boolean
282 }): MergeAudioTranscodingPayload {
283 const { videoUUID, isNewVideo } = options
284
285 return {
286 type: 'merge-audio-to-webtorrent',
287 resolution: DEFAULT_AUDIO_RESOLUTION,
288 fps: VIDEO_TRANSCODING_FPS.AUDIO_MERGE,
289 videoUUID,
290 isNewVideo
291 }
292 }
293
294 private buildOptimizePayload (options: {
295 videoUUID: string
296 quickTranscode: boolean
297 isNewVideo: boolean
298 }): OptimizeTranscodingPayload {
299 const { videoUUID, quickTranscode, isNewVideo } = options
300
301 return {
302 type: 'optimize-to-webtorrent',
303 videoUUID,
304 isNewVideo,
305 quickTranscode
306 }
307 }
308}
diff --git a/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts
new file mode 100644
index 000000000..c7a63d2e2
--- /dev/null
+++ b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts
@@ -0,0 +1,189 @@
1import { computeOutputFPS } from '@server/helpers/ffmpeg'
2import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { CONFIG } from '@server/initializers/config'
4import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants'
5import { Hooks } from '@server/lib/plugins/hooks'
6import { VODAudioMergeTranscodingJobHandler, VODHLSTranscodingJobHandler, VODWebVideoTranscodingJobHandler } from '@server/lib/runners'
7import { VideoPathManager } from '@server/lib/video-path-manager'
8import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models'
9import { MRunnerJob } from '@server/types/models/runners'
10import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg'
11import { computeResolutionsToTranscode } from '../../transcoding-resolutions'
12import { AbstractJobBuilder } from './abstract-job-builder'
13
14/**
15 *
16 * Class to build transcoding job in the local job queue
17 *
18 */
19
20const lTags = loggerTagsFactory('transcoding')
21
22export class TranscodingRunnerJobBuilder extends AbstractJobBuilder {
23
24 async createOptimizeOrMergeAudioJobs (options: {
25 video: MVideoFullLight
26 videoFile: MVideoFile
27 isNewVideo: boolean
28 user: MUserId
29 }) {
30 const { video, videoFile, isNewVideo, user } = options
31
32 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
33
34 try {
35 await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => {
36 const probe = await ffprobePromise(videoFilePath)
37
38 const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe)
39 const hasAudio = await hasAudioStream(videoFilePath, probe)
40 const inputFPS = videoFile.isAudio()
41 ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value
42 : await getVideoStreamFPS(videoFilePath, probe)
43
44 const maxResolution = await isAudioFile(videoFilePath, probe)
45 ? DEFAULT_AUDIO_RESOLUTION
46 : resolution
47
48 const fps = computeOutputFPS({ inputFPS, resolution: maxResolution })
49 const priority = await this.getTranscodingJobPriority({ user, fallback: 0 })
50
51 const mainRunnerJob = videoFile.isAudio()
52 ? await new VODAudioMergeTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority })
53 : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority })
54
55 if (CONFIG.TRANSCODING.HLS.ENABLED === true) {
56 await new VODHLSTranscodingJobHandler().create({
57 video,
58 deleteWebVideoFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false,
59 resolution: maxResolution,
60 fps,
61 isNewVideo,
62 dependsOnRunnerJob: mainRunnerJob,
63 priority: await this.getTranscodingJobPriority({ user, fallback: 0 })
64 })
65 }
66
67 await this.buildLowerResolutionJobPayloads({
68 video,
69 inputVideoResolution: maxResolution,
70 inputVideoFPS: inputFPS,
71 hasAudio,
72 isNewVideo,
73 mainRunnerJob,
74 user
75 })
76 })
77 } finally {
78 mutexReleaser()
79 }
80 }
81
82 // ---------------------------------------------------------------------------
83
84 async createTranscodingJobs (options: {
85 transcodingType: 'hls' | 'webtorrent'
86 video: MVideoFullLight
87 resolutions: number[]
88 isNewVideo: boolean
89 user: MUserId | null
90 }) {
91 const { video, transcodingType, resolutions, isNewVideo, user } = options
92
93 const maxResolution = Math.max(...resolutions)
94 const { fps: inputFPS } = await video.probeMaxQualityFile()
95 const maxFPS = computeOutputFPS({ inputFPS, resolution: maxResolution })
96 const priority = await this.getTranscodingJobPriority({ user, fallback: 0 })
97
98 const childrenResolutions = resolutions.filter(r => r !== maxResolution)
99
100 logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution })
101
102 // Process the last resolution before the other ones to prevent concurrency issue
103 // Because low resolutions use the biggest one as ffmpeg input
104 const mainJob = transcodingType === 'hls'
105 // eslint-disable-next-line max-len
106 ? await new VODHLSTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, deleteWebVideoFiles: false, priority })
107 : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, priority })
108
109 for (const resolution of childrenResolutions) {
110 const dependsOnRunnerJob = mainJob
111 const fps = computeOutputFPS({ inputFPS, resolution: maxResolution })
112
113 if (transcodingType === 'hls') {
114 await new VODHLSTranscodingJobHandler().create({
115 video,
116 resolution,
117 fps,
118 isNewVideo,
119 deleteWebVideoFiles: false,
120 dependsOnRunnerJob,
121 priority: await this.getTranscodingJobPriority({ user, fallback: 0 })
122 })
123 continue
124 }
125
126 if (transcodingType === 'webtorrent') {
127 await new VODWebVideoTranscodingJobHandler().create({
128 video,
129 resolution,
130 fps,
131 isNewVideo,
132 dependsOnRunnerJob,
133 priority: await this.getTranscodingJobPriority({ user, fallback: 0 })
134 })
135 continue
136 }
137
138 throw new Error('Unknown transcoding type')
139 }
140 }
141
142 private async buildLowerResolutionJobPayloads (options: {
143 mainRunnerJob: MRunnerJob
144 video: MVideoWithFileThumbnail
145 inputVideoResolution: number
146 inputVideoFPS: number
147 hasAudio: boolean
148 isNewVideo: boolean
149 user: MUserId
150 }) {
151 const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio, mainRunnerJob, user } = options
152
153 // Create transcoding jobs if there are enabled resolutions
154 const resolutionsEnabled = await Hooks.wrapObject(
155 computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }),
156 'filter:transcoding.auto.resolutions-to-transcode.result',
157 options
158 )
159
160 logger.debug('Lower resolutions build for %s.', video.uuid, { resolutionsEnabled, ...lTags(video.uuid) })
161
162 for (const resolution of resolutionsEnabled) {
163 const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution })
164
165 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
166 await new VODWebVideoTranscodingJobHandler().create({
167 video,
168 resolution,
169 fps,
170 isNewVideo,
171 dependsOnRunnerJob: mainRunnerJob,
172 priority: await this.getTranscodingJobPriority({ user, fallback: 0 })
173 })
174 }
175
176 if (CONFIG.TRANSCODING.HLS.ENABLED) {
177 await new VODHLSTranscodingJobHandler().create({
178 video,
179 resolution,
180 fps,
181 isNewVideo,
182 deleteWebVideoFiles: false,
183 dependsOnRunnerJob: mainRunnerJob,
184 priority: await this.getTranscodingJobPriority({ user, fallback: 0 })
185 })
186 }
187 }
188 }
189}
diff --git a/server/lib/transcoding/transcoding-quick-transcode.ts b/server/lib/transcoding/transcoding-quick-transcode.ts
new file mode 100644
index 000000000..b7f921890
--- /dev/null
+++ b/server/lib/transcoding/transcoding-quick-transcode.ts
@@ -0,0 +1,61 @@
1import { FfprobeData } from 'fluent-ffmpeg'
2import { CONFIG } from '@server/initializers/config'
3import { VIDEO_TRANSCODING_FPS } from '@server/initializers/constants'
4import { getMaxBitrate } from '@shared/core-utils'
5import {
6 ffprobePromise,
7 getAudioStream,
8 getMaxAudioBitrate,
9 getVideoStream,
10 getVideoStreamBitrate,
11 getVideoStreamDimensionsInfo,
12 getVideoStreamFPS
13} from '@shared/ffmpeg'
14
15export async function canDoQuickTranscode (path: string, existingProbe?: FfprobeData): Promise<boolean> {
16 if (CONFIG.TRANSCODING.PROFILE !== 'default') return false
17
18 const probe = existingProbe || await ffprobePromise(path)
19
20 return await canDoQuickVideoTranscode(path, probe) &&
21 await canDoQuickAudioTranscode(path, probe)
22}
23
24export async function canDoQuickAudioTranscode (path: string, probe?: FfprobeData): Promise<boolean> {
25 const parsedAudio = await getAudioStream(path, probe)
26
27 if (!parsedAudio.audioStream) return true
28
29 if (parsedAudio.audioStream['codec_name'] !== 'aac') return false
30
31 const audioBitrate = parsedAudio.bitrate
32 if (!audioBitrate) return false
33
34 const maxAudioBitrate = getMaxAudioBitrate('aac', audioBitrate)
35 if (maxAudioBitrate !== -1 && audioBitrate > maxAudioBitrate) return false
36
37 const channelLayout = parsedAudio.audioStream['channel_layout']
38 // Causes playback issues with Chrome
39 if (!channelLayout || channelLayout === 'unknown' || channelLayout === 'quad') return false
40
41 return true
42}
43
44export async function canDoQuickVideoTranscode (path: string, probe?: FfprobeData): Promise<boolean> {
45 const videoStream = await getVideoStream(path, probe)
46 const fps = await getVideoStreamFPS(path, probe)
47 const bitRate = await getVideoStreamBitrate(path, probe)
48 const resolutionData = await getVideoStreamDimensionsInfo(path, probe)
49
50 // If ffprobe did not manage to guess the bitrate
51 if (!bitRate) return false
52
53 // check video params
54 if (!videoStream) return false
55 if (videoStream['codec_name'] !== 'h264') return false
56 if (videoStream['pix_fmt'] !== 'yuv420p') return false
57 if (fps < VIDEO_TRANSCODING_FPS.MIN || fps > VIDEO_TRANSCODING_FPS.MAX) return false
58 if (bitRate > getMaxBitrate({ ...resolutionData, fps })) return false
59
60 return true
61}
diff --git a/server/lib/transcoding/transcoding-resolutions.ts b/server/lib/transcoding/transcoding-resolutions.ts
new file mode 100644
index 000000000..91f4d18d8
--- /dev/null
+++ b/server/lib/transcoding/transcoding-resolutions.ts
@@ -0,0 +1,52 @@
1import { CONFIG } from '@server/initializers/config'
2import { toEven } from '@shared/core-utils'
3import { VideoResolution } from '@shared/models'
4
5export function computeResolutionsToTranscode (options: {
6 input: number
7 type: 'vod' | 'live'
8 includeInput: boolean
9 strictLower: boolean
10 hasAudio: boolean
11}) {
12 const { input, type, includeInput, strictLower, hasAudio } = options
13
14 const configResolutions = type === 'vod'
15 ? CONFIG.TRANSCODING.RESOLUTIONS
16 : CONFIG.LIVE.TRANSCODING.RESOLUTIONS
17
18 const resolutionsEnabled = new Set<number>()
19
20 // Put in the order we want to proceed jobs
21 const availableResolutions: VideoResolution[] = [
22 VideoResolution.H_NOVIDEO,
23 VideoResolution.H_480P,
24 VideoResolution.H_360P,
25 VideoResolution.H_720P,
26 VideoResolution.H_240P,
27 VideoResolution.H_144P,
28 VideoResolution.H_1080P,
29 VideoResolution.H_1440P,
30 VideoResolution.H_4K
31 ]
32
33 for (const resolution of availableResolutions) {
34 // Resolution not enabled
35 if (configResolutions[resolution + 'p'] !== true) continue
36 // Too big resolution for input file
37 if (input < resolution) continue
38 // We only want lower resolutions than input file
39 if (strictLower && input === resolution) continue
40 // Audio resolutio but no audio in the video
41 if (resolution === VideoResolution.H_NOVIDEO && !hasAudio) continue
42
43 resolutionsEnabled.add(resolution)
44 }
45
46 if (includeInput) {
47 // Always use an even resolution to avoid issues with ffmpeg
48 resolutionsEnabled.add(toEven(input))
49 }
50
51 return Array.from(resolutionsEnabled)
52}
diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts
deleted file mode 100644
index c7b61e9ba..000000000
--- a/server/lib/transcoding/transcoding.ts
+++ /dev/null
@@ -1,465 +0,0 @@
1import { MutexInterface } from 'async-mutex'
2import { Job } from 'bullmq'
3import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
4import { basename, extname as extnameUtil, join } from 'path'
5import { toEven } from '@server/helpers/core-utils'
6import { retryTransactionWrapper } from '@server/helpers/database-utils'
7import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
8import { sequelizeTypescript } from '@server/initializers/database'
9import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
10import { pick } from '@shared/core-utils'
11import { VideoResolution, VideoStorage } from '../../../shared/models/videos'
12import {
13 buildFileMetadata,
14 canDoQuickTranscode,
15 computeResolutionsToTranscode,
16 ffprobePromise,
17 getVideoStreamDuration,
18 getVideoStreamFPS,
19 transcodeVOD,
20 TranscodeVODOptions,
21 TranscodeVODOptionsType
22} from '../../helpers/ffmpeg'
23import { CONFIG } from '../../initializers/config'
24import { VideoFileModel } from '../../models/video/video-file'
25import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
26import { updatePlaylistAfterFileChange } from '../hls'
27import { generateHLSVideoFilename, generateWebTorrentVideoFilename, getHlsResolutionPlaylistFilename } from '../paths'
28import { VideoPathManager } from '../video-path-manager'
29import { VideoTranscodingProfilesManager } from './default-transcoding-profiles'
30
31/**
32 *
33 * Functions that run transcoding functions, update the database, cleanup files, create torrent files...
34 * Mainly called by the job queue
35 *
36 */
37
38// Optimize the original video file and replace it. The resolution is not changed.
39async function optimizeOriginalVideofile (options: {
40 video: MVideoFullLight
41 inputVideoFile: MVideoFile
42 job: Job
43}) {
44 const { video, inputVideoFile, job } = options
45
46 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
47 const newExtname = '.mp4'
48
49 // Will be released by our transcodeVOD function once ffmpeg is ran
50 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
51
52 try {
53 await video.reload()
54
55 const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video)
56
57 const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => {
58 const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
59
60 const transcodeType: TranscodeVODOptionsType = await canDoQuickTranscode(videoInputPath)
61 ? 'quick-transcode'
62 : 'video'
63
64 const resolution = buildOriginalFileResolution(inputVideoFile.resolution)
65
66 const transcodeOptions: TranscodeVODOptions = {
67 type: transcodeType,
68
69 inputPath: videoInputPath,
70 outputPath: videoTranscodedPath,
71
72 inputFileMutexReleaser,
73
74 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
75 profile: CONFIG.TRANSCODING.PROFILE,
76
77 resolution,
78
79 job
80 }
81
82 // Could be very long!
83 await transcodeVOD(transcodeOptions)
84
85 // Important to do this before getVideoFilename() to take in account the new filename
86 inputVideoFile.resolution = resolution
87 inputVideoFile.extname = newExtname
88 inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname)
89 inputVideoFile.storage = VideoStorage.FILE_SYSTEM
90
91 const { videoFile } = await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile)
92 await remove(videoInputPath)
93
94 return { transcodeType, videoFile }
95 })
96
97 return result
98 } finally {
99 inputFileMutexReleaser()
100 }
101}
102
103// Transcode the original video file to a lower resolution compatible with WebTorrent
104async function transcodeNewWebTorrentResolution (options: {
105 video: MVideoFullLight
106 resolution: VideoResolution
107 job: Job
108}) {
109 const { video, resolution, job } = options
110
111 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
112 const newExtname = '.mp4'
113
114 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
115
116 try {
117 await video.reload()
118
119 const file = video.getMaxQualityFile().withVideoOrPlaylist(video)
120
121 const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => {
122 const newVideoFile = new VideoFileModel({
123 resolution,
124 extname: newExtname,
125 filename: generateWebTorrentVideoFilename(resolution, newExtname),
126 size: 0,
127 videoId: video.id
128 })
129
130 const videoTranscodedPath = join(transcodeDirectory, newVideoFile.filename)
131
132 const transcodeOptions = resolution === VideoResolution.H_NOVIDEO
133 ? {
134 type: 'only-audio' as 'only-audio',
135
136 inputPath: videoInputPath,
137 outputPath: videoTranscodedPath,
138
139 inputFileMutexReleaser,
140
141 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
142 profile: CONFIG.TRANSCODING.PROFILE,
143
144 resolution,
145
146 job
147 }
148 : {
149 type: 'video' as 'video',
150 inputPath: videoInputPath,
151 outputPath: videoTranscodedPath,
152
153 inputFileMutexReleaser,
154
155 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
156 profile: CONFIG.TRANSCODING.PROFILE,
157
158 resolution,
159
160 job
161 }
162
163 await transcodeVOD(transcodeOptions)
164
165 return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, newVideoFile)
166 })
167
168 return result
169 } finally {
170 inputFileMutexReleaser()
171 }
172}
173
174// Merge an image with an audio file to create a video
175async function mergeAudioVideofile (options: {
176 video: MVideoFullLight
177 resolution: VideoResolution
178 job: Job
179}) {
180 const { video, resolution, job } = options
181
182 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
183 const newExtname = '.mp4'
184
185 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
186
187 try {
188 await video.reload()
189
190 const inputVideoFile = video.getMinQualityFile()
191
192 const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video)
193
194 const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => {
195 const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
196
197 // If the user updates the video preview during transcoding
198 const previewPath = video.getPreview().getPath()
199 const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath))
200 await copyFile(previewPath, tmpPreviewPath)
201
202 const transcodeOptions = {
203 type: 'merge-audio' as 'merge-audio',
204
205 inputPath: tmpPreviewPath,
206 outputPath: videoTranscodedPath,
207
208 inputFileMutexReleaser,
209
210 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
211 profile: CONFIG.TRANSCODING.PROFILE,
212
213 audioPath: audioInputPath,
214 resolution,
215
216 job
217 }
218
219 try {
220 await transcodeVOD(transcodeOptions)
221
222 await remove(audioInputPath)
223 await remove(tmpPreviewPath)
224 } catch (err) {
225 await remove(tmpPreviewPath)
226 throw err
227 }
228
229 // Important to do this before getVideoFilename() to take in account the new file extension
230 inputVideoFile.extname = newExtname
231 inputVideoFile.resolution = resolution
232 inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname)
233
234 // ffmpeg generated a new video file, so update the video duration
235 // See https://trac.ffmpeg.org/ticket/5456
236 video.duration = await getVideoStreamDuration(videoTranscodedPath)
237 await video.save()
238
239 return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile)
240 })
241
242 return result
243 } finally {
244 inputFileMutexReleaser()
245 }
246}
247
248// Concat TS segments from a live video to a fragmented mp4 HLS playlist
249async function generateHlsPlaylistResolutionFromTS (options: {
250 video: MVideo
251 concatenatedTsFilePath: string
252 resolution: VideoResolution
253 isAAC: boolean
254 inputFileMutexReleaser: MutexInterface.Releaser
255}) {
256 return generateHlsPlaylistCommon({
257 type: 'hls-from-ts' as 'hls-from-ts',
258 inputPath: options.concatenatedTsFilePath,
259
260 ...pick(options, [ 'video', 'resolution', 'inputFileMutexReleaser', 'isAAC' ])
261 })
262}
263
264// Generate an HLS playlist from an input file, and update the master playlist
265function generateHlsPlaylistResolution (options: {
266 video: MVideo
267 videoInputPath: string
268 resolution: VideoResolution
269 copyCodecs: boolean
270 inputFileMutexReleaser: MutexInterface.Releaser
271 job?: Job
272}) {
273 return generateHlsPlaylistCommon({
274 type: 'hls' as 'hls',
275 inputPath: options.videoInputPath,
276
277 ...pick(options, [ 'video', 'resolution', 'copyCodecs', 'inputFileMutexReleaser', 'job' ])
278 })
279}
280
281// ---------------------------------------------------------------------------
282
283export {
284 generateHlsPlaylistResolution,
285 generateHlsPlaylistResolutionFromTS,
286 optimizeOriginalVideofile,
287 transcodeNewWebTorrentResolution,
288 mergeAudioVideofile
289}
290
291// ---------------------------------------------------------------------------
292
293async function onWebTorrentVideoFileTranscoding (
294 video: MVideoFullLight,
295 videoFile: MVideoFile,
296 transcodingPath: string,
297 newVideoFile: MVideoFile
298) {
299 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
300
301 try {
302 await video.reload()
303
304 const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile)
305
306 const stats = await stat(transcodingPath)
307
308 const probe = await ffprobePromise(transcodingPath)
309 const fps = await getVideoStreamFPS(transcodingPath, probe)
310 const metadata = await buildFileMetadata(transcodingPath, probe)
311
312 await move(transcodingPath, outputPath, { overwrite: true })
313
314 videoFile.size = stats.size
315 videoFile.fps = fps
316 videoFile.metadata = metadata
317
318 await createTorrentAndSetInfoHash(video, videoFile)
319
320 const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution })
321 if (oldFile) await video.removeWebTorrentFile(oldFile)
322
323 await VideoFileModel.customUpsert(videoFile, 'video', undefined)
324 video.VideoFiles = await video.$get('VideoFiles')
325
326 return { video, videoFile }
327 } finally {
328 mutexReleaser()
329 }
330}
331
332async function generateHlsPlaylistCommon (options: {
333 type: 'hls' | 'hls-from-ts'
334 video: MVideo
335 inputPath: string
336 resolution: VideoResolution
337
338 inputFileMutexReleaser: MutexInterface.Releaser
339
340 copyCodecs?: boolean
341 isAAC?: boolean
342
343 job?: Job
344}) {
345 const { type, video, inputPath, resolution, copyCodecs, isAAC, job, inputFileMutexReleaser } = options
346 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
347
348 const videoTranscodedBasePath = join(transcodeDirectory, type)
349 await ensureDir(videoTranscodedBasePath)
350
351 const videoFilename = generateHLSVideoFilename(resolution)
352 const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFilename)
353 const resolutionPlaylistFileTranscodePath = join(videoTranscodedBasePath, resolutionPlaylistFilename)
354
355 const transcodeOptions = {
356 type,
357
358 inputPath,
359 outputPath: resolutionPlaylistFileTranscodePath,
360
361 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
362 profile: CONFIG.TRANSCODING.PROFILE,
363
364 resolution,
365 copyCodecs,
366
367 isAAC,
368
369 inputFileMutexReleaser,
370
371 hlsPlaylist: {
372 videoFilename
373 },
374
375 job
376 }
377
378 await transcodeVOD(transcodeOptions)
379
380 // Create or update the playlist
381 const playlist = await retryTransactionWrapper(() => {
382 return sequelizeTypescript.transaction(async transaction => {
383 return VideoStreamingPlaylistModel.loadOrGenerate(video, transaction)
384 })
385 })
386
387 const newVideoFile = new VideoFileModel({
388 resolution,
389 extname: extnameUtil(videoFilename),
390 size: 0,
391 filename: videoFilename,
392 fps: -1,
393 videoStreamingPlaylistId: playlist.id
394 })
395
396 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
397
398 try {
399 // VOD transcoding is a long task, refresh video attributes
400 await video.reload()
401
402 const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, newVideoFile)
403 await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video))
404
405 // Move playlist file
406 const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, resolutionPlaylistFilename)
407 await move(resolutionPlaylistFileTranscodePath, resolutionPlaylistPath, { overwrite: true })
408 // Move video file
409 await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true })
410
411 // Update video duration if it was not set (in case of a live for example)
412 if (!video.duration) {
413 video.duration = await getVideoStreamDuration(videoFilePath)
414 await video.save()
415 }
416
417 const stats = await stat(videoFilePath)
418
419 newVideoFile.size = stats.size
420 newVideoFile.fps = await getVideoStreamFPS(videoFilePath)
421 newVideoFile.metadata = await buildFileMetadata(videoFilePath)
422
423 await createTorrentAndSetInfoHash(playlist, newVideoFile)
424
425 const oldFile = await VideoFileModel.loadHLSFile({
426 playlistId: playlist.id,
427 fps: newVideoFile.fps,
428 resolution: newVideoFile.resolution
429 })
430
431 if (oldFile) {
432 await video.removeStreamingPlaylistVideoFile(playlist, oldFile)
433 await oldFile.destroy()
434 }
435
436 const savedVideoFile = await VideoFileModel.customUpsert(newVideoFile, 'streaming-playlist', undefined)
437
438 await updatePlaylistAfterFileChange(video, playlist)
439
440 return { resolutionPlaylistPath, videoFile: savedVideoFile }
441 } finally {
442 mutexReleaser()
443 }
444}
445
446function buildOriginalFileResolution (inputResolution: number) {
447 if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) {
448 return toEven(inputResolution)
449 }
450
451 const resolutions = computeResolutionsToTranscode({
452 input: inputResolution,
453 type: 'vod',
454 includeInput: false,
455 strictLower: false,
456 // We don't really care about the audio resolution in this context
457 hasAudio: true
458 })
459
460 if (resolutions.length === 0) {
461 return toEven(inputResolution)
462 }
463
464 return Math.max(...resolutions)
465}
diff --git a/server/lib/transcoding/web-transcoding.ts b/server/lib/transcoding/web-transcoding.ts
new file mode 100644
index 000000000..d43d03b2a
--- /dev/null
+++ b/server/lib/transcoding/web-transcoding.ts
@@ -0,0 +1,273 @@
1import { Job } from 'bullmq'
2import { copyFile, move, remove, stat } from 'fs-extra'
3import { basename, join } from 'path'
4import { computeOutputFPS } from '@server/helpers/ffmpeg'
5import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
6import { MVideoFile, MVideoFullLight } from '@server/types/models'
7import { toEven } from '@shared/core-utils'
8import { ffprobePromise, getVideoStreamDuration, getVideoStreamFPS, TranscodeVODOptionsType } from '@shared/ffmpeg'
9import { VideoResolution, VideoStorage } from '@shared/models'
10import { CONFIG } from '../../initializers/config'
11import { VideoFileModel } from '../../models/video/video-file'
12import { generateWebTorrentVideoFilename } from '../paths'
13import { buildFileMetadata } from '../video-file'
14import { VideoPathManager } from '../video-path-manager'
15import { buildFFmpegVOD } from './shared'
16import { computeResolutionsToTranscode } from './transcoding-resolutions'
17
18// Optimize the original video file and replace it. The resolution is not changed.
19export async function optimizeOriginalVideofile (options: {
20 video: MVideoFullLight
21 inputVideoFile: MVideoFile
22 quickTranscode: boolean
23 job: Job
24}) {
25 const { video, inputVideoFile, quickTranscode, job } = options
26
27 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
28 const newExtname = '.mp4'
29
30 // Will be released by our transcodeVOD function once ffmpeg is ran
31 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
32
33 try {
34 await video.reload()
35
36 const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video)
37
38 const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => {
39 const videoOutputPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
40
41 const transcodeType: TranscodeVODOptionsType = quickTranscode
42 ? 'quick-transcode'
43 : 'video'
44
45 const resolution = buildOriginalFileResolution(inputVideoFile.resolution)
46 const fps = computeOutputFPS({ inputFPS: inputVideoFile.fps, resolution })
47
48 // Could be very long!
49 await buildFFmpegVOD(job).transcode({
50 type: transcodeType,
51
52 inputPath: videoInputPath,
53 outputPath: videoOutputPath,
54
55 inputFileMutexReleaser,
56
57 resolution,
58 fps
59 })
60
61 // Important to do this before getVideoFilename() to take in account the new filename
62 inputVideoFile.resolution = resolution
63 inputVideoFile.extname = newExtname
64 inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname)
65 inputVideoFile.storage = VideoStorage.FILE_SYSTEM
66
67 const { videoFile } = await onWebTorrentVideoFileTranscoding({
68 video,
69 videoFile: inputVideoFile,
70 videoOutputPath
71 })
72
73 await remove(videoInputPath)
74
75 return { transcodeType, videoFile }
76 })
77
78 return result
79 } finally {
80 inputFileMutexReleaser()
81 }
82}
83
84// Transcode the original video file to a lower resolution compatible with WebTorrent
85export async function transcodeNewWebTorrentResolution (options: {
86 video: MVideoFullLight
87 resolution: VideoResolution
88 fps: number
89 job: Job
90}) {
91 const { video, resolution, fps, job } = options
92
93 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
94 const newExtname = '.mp4'
95
96 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
97
98 try {
99 await video.reload()
100
101 const file = video.getMaxQualityFile().withVideoOrPlaylist(video)
102
103 const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => {
104 const newVideoFile = new VideoFileModel({
105 resolution,
106 extname: newExtname,
107 filename: generateWebTorrentVideoFilename(resolution, newExtname),
108 size: 0,
109 videoId: video.id
110 })
111
112 const videoOutputPath = join(transcodeDirectory, newVideoFile.filename)
113
114 const transcodeOptions = {
115 type: 'video' as 'video',
116
117 inputPath: videoInputPath,
118 outputPath: videoOutputPath,
119
120 inputFileMutexReleaser,
121
122 resolution,
123 fps
124 }
125
126 await buildFFmpegVOD(job).transcode(transcodeOptions)
127
128 return onWebTorrentVideoFileTranscoding({ video, videoFile: newVideoFile, videoOutputPath })
129 })
130
131 return result
132 } finally {
133 inputFileMutexReleaser()
134 }
135}
136
137// Merge an image with an audio file to create a video
138export async function mergeAudioVideofile (options: {
139 video: MVideoFullLight
140 resolution: VideoResolution
141 fps: number
142 job: Job
143}) {
144 const { video, resolution, fps, job } = options
145
146 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
147 const newExtname = '.mp4'
148
149 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
150
151 try {
152 await video.reload()
153
154 const inputVideoFile = video.getMinQualityFile()
155
156 const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video)
157
158 const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => {
159 const videoOutputPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
160
161 // If the user updates the video preview during transcoding
162 const previewPath = video.getPreview().getPath()
163 const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath))
164 await copyFile(previewPath, tmpPreviewPath)
165
166 const transcodeOptions = {
167 type: 'merge-audio' as 'merge-audio',
168
169 inputPath: tmpPreviewPath,
170 outputPath: videoOutputPath,
171
172 inputFileMutexReleaser,
173
174 audioPath: audioInputPath,
175 resolution,
176 fps
177 }
178
179 try {
180 await buildFFmpegVOD(job).transcode(transcodeOptions)
181
182 await remove(audioInputPath)
183 await remove(tmpPreviewPath)
184 } catch (err) {
185 await remove(tmpPreviewPath)
186 throw err
187 }
188
189 // Important to do this before getVideoFilename() to take in account the new file extension
190 inputVideoFile.extname = newExtname
191 inputVideoFile.resolution = resolution
192 inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname)
193
194 // ffmpeg generated a new video file, so update the video duration
195 // See https://trac.ffmpeg.org/ticket/5456
196 video.duration = await getVideoStreamDuration(videoOutputPath)
197 await video.save()
198
199 return onWebTorrentVideoFileTranscoding({
200 video,
201 videoFile: inputVideoFile,
202 videoOutputPath
203 })
204 })
205
206 return result
207 } finally {
208 inputFileMutexReleaser()
209 }
210}
211
212export async function onWebTorrentVideoFileTranscoding (options: {
213 video: MVideoFullLight
214 videoFile: MVideoFile
215 videoOutputPath: string
216}) {
217 const { video, videoFile, videoOutputPath } = options
218
219 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
220
221 try {
222 await video.reload()
223
224 const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, videoFile)
225
226 const stats = await stat(videoOutputPath)
227
228 const probe = await ffprobePromise(videoOutputPath)
229 const fps = await getVideoStreamFPS(videoOutputPath, probe)
230 const metadata = await buildFileMetadata(videoOutputPath, probe)
231
232 await move(videoOutputPath, outputPath, { overwrite: true })
233
234 videoFile.size = stats.size
235 videoFile.fps = fps
236 videoFile.metadata = metadata
237
238 await createTorrentAndSetInfoHash(video, videoFile)
239
240 const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution })
241 if (oldFile) await video.removeWebTorrentFile(oldFile)
242
243 await VideoFileModel.customUpsert(videoFile, 'video', undefined)
244 video.VideoFiles = await video.$get('VideoFiles')
245
246 return { video, videoFile }
247 } finally {
248 mutexReleaser()
249 }
250}
251
252// ---------------------------------------------------------------------------
253
254function buildOriginalFileResolution (inputResolution: number) {
255 if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) {
256 return toEven(inputResolution)
257 }
258
259 const resolutions = computeResolutionsToTranscode({
260 input: inputResolution,
261 type: 'vod',
262 includeInput: false,
263 strictLower: false,
264 // We don't really care about the audio resolution in this context
265 hasAudio: true
266 })
267
268 if (resolutions.length === 0) {
269 return toEven(inputResolution)
270 }
271
272 return Math.max(...resolutions)
273}
diff --git a/server/lib/uploadx.ts b/server/lib/uploadx.ts
index 58040cb6d..c7e0eb414 100644
--- a/server/lib/uploadx.ts
+++ b/server/lib/uploadx.ts
@@ -3,6 +3,7 @@ import { buildLogger } from '@server/helpers/logger'
3import { getResumableUploadPath } from '@server/helpers/upload' 3import { getResumableUploadPath } from '@server/helpers/upload'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
5import { LogLevel, Uploadx } from '@uploadx/core' 5import { LogLevel, Uploadx } from '@uploadx/core'
6import { extname } from 'path'
6 7
7const logger = buildLogger('uploadx') 8const logger = buildLogger('uploadx')
8 9
@@ -26,7 +27,9 @@ const uploadx = new Uploadx({
26 if (!res.locals.oauth) return undefined 27 if (!res.locals.oauth) return undefined
27 28
28 return res.locals.oauth.token.user.id + '' 29 return res.locals.oauth.token.user.id + ''
29 } 30 },
31
32 filename: file => `${file.userId}-${file.id}${extname(file.metadata.filename)}`
30}) 33})
31 34
32export { 35export {
diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts
index fd5837a3a..cb1ea834c 100644
--- a/server/lib/video-blacklist.ts
+++ b/server/lib/video-blacklist.ts
@@ -81,7 +81,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video
81 } 81 }
82 82
83 if (videoInstance.isLive) { 83 if (videoInstance.isLive) {
84 LiveManager.Instance.stopSessionOf(videoInstance.id, LiveVideoError.BLACKLISTED) 84 LiveManager.Instance.stopSessionOf(videoInstance.uuid, LiveVideoError.BLACKLISTED)
85 } 85 }
86 86
87 Notifier.Instance.notifyOnVideoBlacklist(blacklist) 87 Notifier.Instance.notifyOnVideoBlacklist(blacklist)
diff --git a/server/lib/video-file.ts b/server/lib/video-file.ts
index 2ab7190f1..8fcc3c253 100644
--- a/server/lib/video-file.ts
+++ b/server/lib/video-file.ts
@@ -1,6 +1,44 @@
1import { FfprobeData } from 'fluent-ffmpeg'
1import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { VideoFileModel } from '@server/models/video/video-file'
2import { MVideoWithAllFiles } from '@server/types/models' 4import { MVideoWithAllFiles } from '@server/types/models'
5import { getLowercaseExtension } from '@shared/core-utils'
6import { getFileSize } from '@shared/extra-utils'
7import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg'
8import { VideoFileMetadata, VideoResolution } from '@shared/models'
3import { lTags } from './object-storage/shared' 9import { lTags } from './object-storage/shared'
10import { generateHLSVideoFilename, generateWebTorrentVideoFilename } from './paths'
11
12async function buildNewFile (options: {
13 path: string
14 mode: 'web-video' | 'hls'
15}) {
16 const { path, mode } = options
17
18 const probe = await ffprobePromise(path)
19 const size = await getFileSize(path)
20
21 const videoFile = new VideoFileModel({
22 extname: getLowercaseExtension(path),
23 size,
24 metadata: await buildFileMetadata(path, probe)
25 })
26
27 if (await isAudioFile(path, probe)) {
28 videoFile.resolution = VideoResolution.H_NOVIDEO
29 } else {
30 videoFile.fps = await getVideoStreamFPS(path, probe)
31 videoFile.resolution = (await getVideoStreamDimensionsInfo(path, probe)).resolution
32 }
33
34 videoFile.filename = mode === 'web-video'
35 ? generateWebTorrentVideoFilename(videoFile.resolution, videoFile.extname)
36 : generateHLSVideoFilename(videoFile.resolution)
37
38 return videoFile
39}
40
41// ---------------------------------------------------------------------------
4 42
5async function removeHLSPlaylist (video: MVideoWithAllFiles) { 43async function removeHLSPlaylist (video: MVideoWithAllFiles) {
6 const hls = video.getHLSPlaylist() 44 const hls = video.getHLSPlaylist()
@@ -61,9 +99,23 @@ async function removeWebTorrentFile (video: MVideoWithAllFiles, fileToDeleteId:
61 return video 99 return video
62} 100}
63 101
102// ---------------------------------------------------------------------------
103
104async function buildFileMetadata (path: string, existingProbe?: FfprobeData) {
105 const metadata = existingProbe || await ffprobePromise(path)
106
107 return new VideoFileMetadata(metadata)
108}
109
110// ---------------------------------------------------------------------------
111
64export { 112export {
113 buildNewFile,
114
65 removeHLSPlaylist, 115 removeHLSPlaylist,
66 removeHLSFile, 116 removeHLSFile,
67 removeAllWebTorrentFiles, 117 removeAllWebTorrentFiles,
68 removeWebTorrentFile 118 removeWebTorrentFile,
119
120 buildFileMetadata
69} 121}
diff --git a/server/lib/video-studio.ts b/server/lib/video-studio.ts
index cdacd35f2..b392bdb00 100644
--- a/server/lib/video-studio.ts
+++ b/server/lib/video-studio.ts
@@ -1,5 +1,5 @@
1import { MVideoFullLight } from '@server/types/models' 1import { MVideoFullLight } from '@server/types/models'
2import { getVideoStreamDuration } from '@shared/extra-utils' 2import { getVideoStreamDuration } from '@shared/ffmpeg'
3import { VideoStudioTask } from '@shared/models' 3import { VideoStudioTask } from '@shared/models'
4 4
5function buildTaskFileFieldname (indice: number, fieldName = 'file') { 5function buildTaskFileFieldname (indice: number, fieldName = 'file') {
diff --git a/server/lib/video.ts b/server/lib/video.ts
index aacc41a7a..588dc553f 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -2,14 +2,14 @@ import { UploadFiles } from 'express'
2import memoizee from 'memoizee' 2import memoizee from 'memoizee'
3import { Transaction } from 'sequelize/types' 3import { Transaction } from 'sequelize/types'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
5import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' 5import { MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants'
6import { TagModel } from '@server/models/video/tag' 6import { TagModel } from '@server/models/video/tag'
7import { VideoModel } from '@server/models/video/video' 7import { VideoModel } from '@server/models/video/video'
8import { VideoJobInfoModel } from '@server/models/video/video-job-info' 8import { VideoJobInfoModel } from '@server/models/video/video-job-info'
9import { FilteredModelAttributes } from '@server/types' 9import { FilteredModelAttributes } from '@server/types'
10import { MThumbnail, MUserId, MVideoFile, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' 10import { MThumbnail, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
11import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 11import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState } from '@shared/models'
12import { CreateJobArgument, CreateJobOptions, JobQueue } from './job-queue/job-queue' 12import { CreateJobArgument, JobQueue } from './job-queue/job-queue'
13import { updateVideoMiniatureFromExisting } from './thumbnail' 13import { updateVideoMiniatureFromExisting } from './thumbnail'
14import { moveFilesIfPrivacyChanged } from './video-privacy' 14import { moveFilesIfPrivacyChanged } from './video-privacy'
15 15
@@ -87,58 +87,6 @@ async function setVideoTags (options: {
87 87
88// --------------------------------------------------------------------------- 88// ---------------------------------------------------------------------------
89 89
90async function buildOptimizeOrMergeAudioJob (options: {
91 video: MVideoUUID
92 videoFile: MVideoFile
93 user: MUserId
94 isNewVideo?: boolean // Default true
95}) {
96 const { video, videoFile, user, isNewVideo } = options
97
98 let payload: VideoTranscodingPayload
99
100 if (videoFile.isAudio()) {
101 payload = {
102 type: 'merge-audio-to-webtorrent',
103 resolution: DEFAULT_AUDIO_RESOLUTION,
104 videoUUID: video.uuid,
105 createHLSIfNeeded: true,
106 isNewVideo
107 }
108 } else {
109 payload = {
110 type: 'optimize-to-webtorrent',
111 videoUUID: video.uuid,
112 isNewVideo
113 }
114 }
115
116 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
117
118 return {
119 type: 'video-transcoding' as 'video-transcoding',
120 priority: await getTranscodingJobPriority(user),
121 payload
122 }
123}
124
125async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
126 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
127
128 return { type: 'video-transcoding' as 'video-transcoding', payload, ...options }
129}
130
131async function getTranscodingJobPriority (user: MUserId) {
132 const now = new Date()
133 const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7)
134
135 const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek)
136
137 return JOB_PRIORITY.TRANSCODING + videoUploadedByUser
138}
139
140// ---------------------------------------------------------------------------
141
142async function buildMoveToObjectStorageJob (options: { 90async function buildMoveToObjectStorageJob (options: {
143 video: MVideoUUID 91 video: MVideoUUID
144 previousVideoState: VideoState 92 previousVideoState: VideoState
@@ -235,10 +183,7 @@ export {
235 buildLocalVideoFromReq, 183 buildLocalVideoFromReq,
236 buildVideoThumbnailsFromReq, 184 buildVideoThumbnailsFromReq,
237 setVideoTags, 185 setVideoTags,
238 buildOptimizeOrMergeAudioJob,
239 buildTranscodingJob,
240 buildMoveToObjectStorageJob, 186 buildMoveToObjectStorageJob,
241 getTranscodingJobPriority,
242 addVideoJobsAfterUpdate, 187 addVideoJobsAfterUpdate,
243 getCachedVideoDuration 188 getCachedVideoDuration
244} 189}