diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib')
54 files changed, 2902 insertions, 1014 deletions
diff --git a/server/lib/hls.ts b/server/lib/hls.ts index 053b5d326..fc1d7e1b0 100644 --- a/server/lib/hls.ts +++ b/server/lib/hls.ts | |||
@@ -3,10 +3,11 @@ import { flatten } from 'lodash' | |||
3 | import PQueue from 'p-queue' | 3 | import PQueue from 'p-queue' |
4 | import { basename, dirname, join } from 'path' | 4 | import { basename, dirname, join } from 'path' |
5 | import { MStreamingPlaylist, MStreamingPlaylistFilesVideo, MVideo } from '@server/types/models' | 5 | import { MStreamingPlaylist, MStreamingPlaylistFilesVideo, MVideo } from '@server/types/models' |
6 | import { uniqify } from '@shared/core-utils' | 6 | import { uniqify, uuidRegex } from '@shared/core-utils' |
7 | import { sha256 } from '@shared/extra-utils' | 7 | import { sha256 } from '@shared/extra-utils' |
8 | import { getVideoStreamDimensionsInfo } from '@shared/ffmpeg' | ||
8 | import { VideoStorage } from '@shared/models' | 9 | import { VideoStorage } from '@shared/models' |
9 | import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamDimensionsInfo } from '../helpers/ffmpeg' | 10 | import { getAudioStreamCodec, getVideoStreamCodec } from '../helpers/ffmpeg' |
10 | import { logger } from '../helpers/logger' | 11 | import { logger } from '../helpers/logger' |
11 | import { doRequest, doRequestAndSaveToFile } from '../helpers/requests' | 12 | import { doRequest, doRequestAndSaveToFile } from '../helpers/requests' |
12 | import { generateRandomString } from '../helpers/utils' | 13 | import { generateRandomString } from '../helpers/utils' |
@@ -234,6 +235,16 @@ function downloadPlaylistSegments (playlistUrl: string, destinationDir: string, | |||
234 | 235 | ||
235 | // --------------------------------------------------------------------------- | 236 | // --------------------------------------------------------------------------- |
236 | 237 | ||
238 | async function renameVideoFileInPlaylist (playlistPath: string, newVideoFilename: string) { | ||
239 | const content = await readFile(playlistPath, 'utf8') | ||
240 | |||
241 | const newContent = content.replace(new RegExp(`${uuidRegex}-\\d+-fragmented.mp4`, 'g'), newVideoFilename) | ||
242 | |||
243 | await writeFile(playlistPath, newContent, 'utf8') | ||
244 | } | ||
245 | |||
246 | // --------------------------------------------------------------------------- | ||
247 | |||
237 | function injectQueryToPlaylistUrls (content: string, queryString: string) { | 248 | function injectQueryToPlaylistUrls (content: string, queryString: string) { |
238 | return content.replace(/\.(m3u8|ts|mp4)/gm, '.$1?' + queryString) | 249 | return content.replace(/\.(m3u8|ts|mp4)/gm, '.$1?' + queryString) |
239 | } | 250 | } |
@@ -247,7 +258,8 @@ export { | |||
247 | downloadPlaylistSegments, | 258 | downloadPlaylistSegments, |
248 | updateStreamingPlaylistsInfohashesIfNeeded, | 259 | updateStreamingPlaylistsInfohashesIfNeeded, |
249 | updatePlaylistAfterFileChange, | 260 | updatePlaylistAfterFileChange, |
250 | injectQueryToPlaylistUrls | 261 | injectQueryToPlaylistUrls, |
262 | renameVideoFileInPlaylist | ||
251 | } | 263 | } |
252 | 264 | ||
253 | // --------------------------------------------------------------------------- | 265 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts new file mode 100644 index 000000000..8b4a877d7 --- /dev/null +++ b/server/lib/job-queue/handlers/transcoding-job-builder.ts | |||
@@ -0,0 +1,47 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
3 | import { UserModel } from '@server/models/user/user' | ||
4 | import { VideoModel } from '@server/models/video/video' | ||
5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
6 | import { pick } from '@shared/core-utils' | ||
7 | import { TranscodingJobBuilderPayload } from '@shared/models' | ||
8 | import { logger } from '../../../helpers/logger' | ||
9 | import { JobQueue } from '../job-queue' | ||
10 | |||
11 | async function processTranscodingJobBuilder (job: Job) { | ||
12 | const payload = job.data as TranscodingJobBuilderPayload | ||
13 | |||
14 | logger.info('Processing transcoding job builder in job %s.', job.id) | ||
15 | |||
16 | if (payload.optimizeJob) { | ||
17 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
18 | const user = await UserModel.loadByVideoId(video.id) | ||
19 | const videoFile = video.getMaxQualityFile() | ||
20 | |||
21 | await createOptimizeOrMergeAudioJobs({ | ||
22 | ...pick(payload.optimizeJob, [ 'isNewVideo' ]), | ||
23 | |||
24 | video, | ||
25 | videoFile, | ||
26 | user | ||
27 | }) | ||
28 | } | ||
29 | |||
30 | for (const job of (payload.jobs || [])) { | ||
31 | await JobQueue.Instance.createJob(job) | ||
32 | |||
33 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | ||
34 | } | ||
35 | |||
36 | for (const sequentialJobs of (payload.sequentialJobs || [])) { | ||
37 | await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs) | ||
38 | |||
39 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.length) | ||
40 | } | ||
41 | } | ||
42 | |||
43 | // --------------------------------------------------------------------------- | ||
44 | |||
45 | export { | ||
46 | processTranscodingJobBuilder | ||
47 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index d950f6407..9a4550e4d 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -10,8 +10,8 @@ import { VideoModel } from '@server/models/video/video' | |||
10 | import { VideoFileModel } from '@server/models/video/video-file' | 10 | import { VideoFileModel } from '@server/models/video/video-file' |
11 | import { MVideoFullLight } from '@server/types/models' | 11 | import { MVideoFullLight } from '@server/types/models' |
12 | import { getLowercaseExtension } from '@shared/core-utils' | 12 | import { getLowercaseExtension } from '@shared/core-utils' |
13 | import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' | ||
13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' | 14 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' |
14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' | ||
15 | import { logger } from '../../../helpers/logger' | 15 | import { logger } from '../../../helpers/logger' |
16 | import { JobQueue } from '../job-queue' | 16 | import { JobQueue } from '../job-queue' |
17 | 17 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4d361c7b9..2a063282c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -7,15 +7,16 @@ import { isPostImportVideoAccepted } from '@server/lib/moderation' | |||
7 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 7 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
8 | import { Hooks } from '@server/lib/plugins/hooks' | 8 | import { Hooks } from '@server/lib/plugins/hooks' |
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | 9 | import { ServerConfigManager } from '@server/lib/server-config-manager' |
10 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 11 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' | 12 | import { buildMoveToObjectStorageJob } from '@server/lib/video' |
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 13 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 14 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 15 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
15 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' | 16 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' |
16 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 17 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' |
17 | import { getLowercaseExtension } from '@shared/core-utils' | 18 | import { getLowercaseExtension } from '@shared/core-utils' |
18 | import { isAudioFile } from '@shared/extra-utils' | 19 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' |
19 | import { | 20 | import { |
20 | ThumbnailType, | 21 | ThumbnailType, |
21 | VideoImportPayload, | 22 | VideoImportPayload, |
@@ -28,7 +29,6 @@ import { | |||
28 | VideoResolution, | 29 | VideoResolution, |
29 | VideoState | 30 | VideoState |
30 | } from '@shared/models' | 31 | } from '@shared/models' |
31 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '../../../helpers/ffmpeg' | ||
32 | import { logger } from '../../../helpers/logger' | 32 | import { logger } from '../../../helpers/logger' |
33 | import { getSecureTorrentName } from '../../../helpers/utils' | 33 | import { getSecureTorrentName } from '../../../helpers/utils' |
34 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | 34 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' |
@@ -137,7 +137,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
137 | 137 | ||
138 | const { resolution } = await isAudioFile(tempVideoPath, probe) | 138 | const { resolution } = await isAudioFile(tempVideoPath, probe) |
139 | ? { resolution: VideoResolution.H_NOVIDEO } | 139 | ? { resolution: VideoResolution.H_NOVIDEO } |
140 | : await getVideoStreamDimensionsInfo(tempVideoPath) | 140 | : await getVideoStreamDimensionsInfo(tempVideoPath, probe) |
141 | 141 | ||
142 | const fps = await getVideoStreamFPS(tempVideoPath, probe) | 142 | const fps = await getVideoStreamFPS(tempVideoPath, probe) |
143 | const duration = await getVideoStreamDuration(tempVideoPath, probe) | 143 | const duration = await getVideoStreamDuration(tempVideoPath, probe) |
@@ -313,9 +313,7 @@ async function afterImportSuccess (options: { | |||
313 | } | 313 | } |
314 | 314 | ||
315 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? | 315 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? |
316 | await JobQueue.Instance.createJob( | 316 | await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user }) |
317 | await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) | ||
318 | ) | ||
319 | } | 317 | } |
320 | } | 318 | } |
321 | 319 | ||
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 2f3a971bd..1bf43f592 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -1,25 +1,25 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { readdir, remove } from 'fs-extra' | 2 | import { readdir, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' | ||
5 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' | 4 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' |
6 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
7 | import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' | 6 | import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' |
8 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' | 7 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' |
9 | import { generateVideoMiniature } from '@server/lib/thumbnail' | 8 | import { generateVideoMiniature } from '@server/lib/thumbnail' |
10 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' | 9 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding' |
10 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
11 | import { moveToNextState } from '@server/lib/video-state' | 11 | import { moveToNextState } from '@server/lib/video-state' |
12 | import { VideoModel } from '@server/models/video/video' | 12 | import { VideoModel } from '@server/models/video/video' |
13 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' | 13 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' |
14 | import { VideoFileModel } from '@server/models/video/video-file' | 14 | import { VideoFileModel } from '@server/models/video/video-file' |
15 | import { VideoLiveModel } from '@server/models/video/video-live' | 15 | import { VideoLiveModel } from '@server/models/video/video-live' |
16 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
16 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | 17 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' |
17 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 18 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
18 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' | 19 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' |
20 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' | ||
19 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' | 21 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' |
20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 22 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
21 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
22 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
23 | 23 | ||
24 | const lTags = loggerTagsFactory('live', 'job') | 24 | const lTags = loggerTagsFactory('live', 'job') |
25 | 25 | ||
@@ -224,6 +224,7 @@ async function assignReplayFilesToVideo (options: { | |||
224 | const probe = await ffprobePromise(concatenatedTsFilePath) | 224 | const probe = await ffprobePromise(concatenatedTsFilePath) |
225 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) | 225 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) |
226 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) | 226 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) |
227 | const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe) | ||
227 | 228 | ||
228 | try { | 229 | try { |
229 | await generateHlsPlaylistResolutionFromTS({ | 230 | await generateHlsPlaylistResolutionFromTS({ |
@@ -231,6 +232,7 @@ async function assignReplayFilesToVideo (options: { | |||
231 | inputFileMutexReleaser, | 232 | inputFileMutexReleaser, |
232 | concatenatedTsFilePath, | 233 | concatenatedTsFilePath, |
233 | resolution, | 234 | resolution, |
235 | fps, | ||
234 | isAAC: audioStream?.codec_name === 'aac' | 236 | isAAC: audioStream?.codec_name === 'aac' |
235 | }) | 237 | }) |
236 | } catch (err) { | 238 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 3e208d83d..991d11ef1 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts | |||
@@ -1,15 +1,16 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { move, remove } from 'fs-extra' | 2 | import { move, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' | 4 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' |
5 | import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' | 5 | import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' |
6 | import { CONFIG } from '@server/initializers/config' | 6 | import { CONFIG } from '@server/initializers/config' |
7 | import { VIDEO_FILTERS } from '@server/initializers/constants' | ||
7 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 8 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 9 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
10 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | 11 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 12 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' | 13 | import { buildFileMetadata, removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' |
12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' | ||
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | 14 | import { VideoPathManager } from '@server/lib/video-path-manager' |
14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' | 15 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' |
15 | import { UserModel } from '@server/models/user/user' | 16 | import { UserModel } from '@server/models/user/user' |
@@ -17,15 +18,8 @@ import { VideoModel } from '@server/models/video/video' | |||
17 | import { VideoFileModel } from '@server/models/video/video-file' | 18 | import { VideoFileModel } from '@server/models/video/video-file' |
18 | import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' | 19 | import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' |
19 | import { getLowercaseExtension, pick } from '@shared/core-utils' | 20 | import { getLowercaseExtension, pick } from '@shared/core-utils' |
20 | import { | 21 | import { buildUUID, getFileSize } from '@shared/extra-utils' |
21 | buildFileMetadata, | 22 | import { FFmpegEdition, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg' |
22 | buildUUID, | ||
23 | ffprobePromise, | ||
24 | getFileSize, | ||
25 | getVideoStreamDimensionsInfo, | ||
26 | getVideoStreamDuration, | ||
27 | getVideoStreamFPS | ||
28 | } from '@shared/extra-utils' | ||
29 | import { | 23 | import { |
30 | VideoStudioEditionPayload, | 24 | VideoStudioEditionPayload, |
31 | VideoStudioTask, | 25 | VideoStudioTask, |
@@ -36,7 +30,6 @@ import { | |||
36 | VideoStudioTaskWatermarkPayload | 30 | VideoStudioTaskWatermarkPayload |
37 | } from '@shared/models' | 31 | } from '@shared/models' |
38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 32 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
39 | import { JobQueue } from '../job-queue' | ||
40 | 33 | ||
41 | const lTagsBase = loggerTagsFactory('video-edition') | 34 | const lTagsBase = loggerTagsFactory('video-edition') |
42 | 35 | ||
@@ -102,9 +95,7 @@ async function processVideoStudioEdition (job: Job) { | |||
102 | 95 | ||
103 | const user = await UserModel.loadByVideoId(video.id) | 96 | const user = await UserModel.loadByVideoId(video.id) |
104 | 97 | ||
105 | await JobQueue.Instance.createJob( | 98 | await createOptimizeOrMergeAudioJobs({ video, videoFile: newFile, isNewVideo: false, user }) |
106 | await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | ||
107 | ) | ||
108 | } | 99 | } |
109 | 100 | ||
110 | // --------------------------------------------------------------------------- | 101 | // --------------------------------------------------------------------------- |
@@ -131,9 +122,9 @@ const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessor | |||
131 | } | 122 | } |
132 | 123 | ||
133 | async function processTask (options: TaskProcessorOptions) { | 124 | async function processTask (options: TaskProcessorOptions) { |
134 | const { video, task } = options | 125 | const { video, task, lTags } = options |
135 | 126 | ||
136 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) | 127 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags }) |
137 | 128 | ||
138 | const processor = taskProcessors[options.task.name] | 129 | const processor = taskProcessors[options.task.name] |
139 | if (!process) throw new Error('Unknown task ' + task.name) | 130 | if (!process) throw new Error('Unknown task ' + task.name) |
@@ -142,48 +133,53 @@ async function processTask (options: TaskProcessorOptions) { | |||
142 | } | 133 | } |
143 | 134 | ||
144 | function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { | 135 | function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { |
145 | const { task } = options | 136 | const { task, lTags } = options |
137 | |||
138 | logger.debug('Will add intro/outro to the video.', { options, ...lTags }) | ||
146 | 139 | ||
147 | return addIntroOutro({ | 140 | return buildFFmpegEdition().addIntroOutro({ |
148 | ...pick(options, [ 'inputPath', 'outputPath' ]), | 141 | ...pick(options, [ 'inputPath', 'outputPath' ]), |
149 | 142 | ||
150 | introOutroPath: task.options.file, | 143 | introOutroPath: task.options.file, |
151 | type: task.name === 'add-intro' | 144 | type: task.name === 'add-intro' |
152 | ? 'intro' | 145 | ? 'intro' |
153 | : 'outro', | 146 | : 'outro' |
154 | |||
155 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
156 | profile: CONFIG.TRANSCODING.PROFILE | ||
157 | }) | 147 | }) |
158 | } | 148 | } |
159 | 149 | ||
160 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { | 150 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { |
161 | const { task } = options | 151 | const { task, lTags } = options |
162 | 152 | ||
163 | return cutVideo({ | 153 | logger.debug('Will cut the video.', { options, ...lTags }) |
154 | |||
155 | return buildFFmpegEdition().cutVideo({ | ||
164 | ...pick(options, [ 'inputPath', 'outputPath' ]), | 156 | ...pick(options, [ 'inputPath', 'outputPath' ]), |
165 | 157 | ||
166 | start: task.options.start, | 158 | start: task.options.start, |
167 | end: task.options.end, | 159 | end: task.options.end |
168 | |||
169 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
170 | profile: CONFIG.TRANSCODING.PROFILE | ||
171 | }) | 160 | }) |
172 | } | 161 | } |
173 | 162 | ||
174 | function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { | 163 | function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { |
175 | const { task } = options | 164 | const { task, lTags } = options |
165 | |||
166 | logger.debug('Will add watermark to the video.', { options, ...lTags }) | ||
176 | 167 | ||
177 | return addWatermark({ | 168 | return buildFFmpegEdition().addWatermark({ |
178 | ...pick(options, [ 'inputPath', 'outputPath' ]), | 169 | ...pick(options, [ 'inputPath', 'outputPath' ]), |
179 | 170 | ||
180 | watermarkPath: task.options.file, | 171 | watermarkPath: task.options.file, |
181 | 172 | ||
182 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | 173 | videoFilters: { |
183 | profile: CONFIG.TRANSCODING.PROFILE | 174 | watermarkSizeRatio: VIDEO_FILTERS.WATERMARK.SIZE_RATIO, |
175 | horitonzalMarginRatio: VIDEO_FILTERS.WATERMARK.HORIZONTAL_MARGIN_RATIO, | ||
176 | verticalMarginRatio: VIDEO_FILTERS.WATERMARK.VERTICAL_MARGIN_RATIO | ||
177 | } | ||
184 | }) | 178 | }) |
185 | } | 179 | } |
186 | 180 | ||
181 | // --------------------------------------------------------------------------- | ||
182 | |||
187 | async function buildNewFile (video: MVideoId, path: string) { | 183 | async function buildNewFile (video: MVideoId, path: string) { |
188 | const videoFile = new VideoFileModel({ | 184 | const videoFile = new VideoFileModel({ |
189 | extname: getLowercaseExtension(path), | 185 | extname: getLowercaseExtension(path), |
@@ -223,3 +219,7 @@ async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStud | |||
223 | throw new Error('Quota exceeded for this user to edit the video') | 219 | throw new Error('Quota exceeded for this user to edit the video') |
224 | } | 220 | } |
225 | } | 221 | } |
222 | |||
223 | function buildFFmpegEdition () { | ||
224 | return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) | ||
225 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 3e6d23363..17b717275 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding' |
4 | import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding' |
5 | import { removeAllWebTorrentFiles } from '@server/lib/video-file' | ||
5 | import { VideoPathManager } from '@server/lib/video-path-manager' | 6 | import { VideoPathManager } from '@server/lib/video-path-manager' |
6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 7 | import { moveToFailedTranscodingState } from '@server/lib/video-state' |
7 | import { UserModel } from '@server/models/user/user' | 8 | import { UserModel } from '@server/models/user/user' |
8 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
9 | import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' | 10 | import { MUser, MUserId, MVideoFullLight } from '@server/types/models' |
10 | import { pick } from '@shared/core-utils' | ||
11 | import { | 11 | import { |
12 | HLSTranscodingPayload, | 12 | HLSTranscodingPayload, |
13 | MergeAudioTranscodingPayload, | 13 | MergeAudioTranscodingPayload, |
@@ -15,18 +15,8 @@ import { | |||
15 | OptimizeTranscodingPayload, | 15 | OptimizeTranscodingPayload, |
16 | VideoTranscodingPayload | 16 | VideoTranscodingPayload |
17 | } from '@shared/models' | 17 | } from '@shared/models' |
18 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
19 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg' | ||
20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
21 | import { CONFIG } from '../../../initializers/config' | ||
22 | import { VideoModel } from '../../../models/video/video' | 19 | import { VideoModel } from '../../../models/video/video' |
23 | import { | ||
24 | generateHlsPlaylistResolution, | ||
25 | mergeAudioVideofile, | ||
26 | optimizeOriginalVideofile, | ||
27 | transcodeNewWebTorrentResolution | ||
28 | } from '../../transcoding/transcoding' | ||
29 | import { JobQueue } from '../job-queue' | ||
30 | 20 | ||
31 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 21 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
32 | 22 | ||
@@ -84,260 +74,72 @@ export { | |||
84 | // Job handlers | 74 | // Job handlers |
85 | // --------------------------------------------------------------------------- | 75 | // --------------------------------------------------------------------------- |
86 | 76 | ||
87 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { | ||
88 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) | ||
89 | |||
90 | const videoFileInput = payload.copyCodecs | ||
91 | ? video.getWebTorrentFile(payload.resolution) | ||
92 | : video.getMaxQualityFile() | ||
93 | |||
94 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | ||
95 | |||
96 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
97 | |||
98 | try { | ||
99 | await videoFileInput.getVideo().reload() | ||
100 | |||
101 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { | ||
102 | return generateHlsPlaylistResolution({ | ||
103 | video, | ||
104 | videoInputPath, | ||
105 | inputFileMutexReleaser, | ||
106 | resolution: payload.resolution, | ||
107 | copyCodecs: payload.copyCodecs, | ||
108 | job | ||
109 | }) | ||
110 | }) | ||
111 | } finally { | ||
112 | inputFileMutexReleaser() | ||
113 | } | ||
114 | |||
115 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | ||
116 | |||
117 | await onHlsPlaylistGeneration(video, user, payload) | ||
118 | } | ||
119 | |||
120 | async function handleNewWebTorrentResolutionJob ( | ||
121 | job: Job, | ||
122 | payload: NewWebTorrentResolutionTranscodingPayload, | ||
123 | video: MVideoFullLight, | ||
124 | user: MUserId | ||
125 | ) { | ||
126 | logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) | ||
127 | |||
128 | await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job }) | ||
129 | |||
130 | logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | ||
131 | |||
132 | await onNewWebTorrentFileResolution(video, user, payload) | ||
133 | } | ||
134 | |||
135 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 77 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
136 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) | 78 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) |
137 | 79 | ||
138 | await mergeAudioVideofile({ video, resolution: payload.resolution, job }) | 80 | await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job }) |
139 | 81 | ||
140 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 82 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
141 | 83 | ||
142 | await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) | 84 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
143 | } | 85 | } |
144 | 86 | ||
145 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 87 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
146 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) | 88 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) |
147 | 89 | ||
148 | const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) | 90 | await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job }) |
149 | 91 | ||
150 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 92 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
151 | 93 | ||
152 | await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) | 94 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
153 | } | 95 | } |
154 | 96 | ||
155 | // --------------------------------------------------------------------------- | 97 | async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) { |
156 | 98 | logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) | |
157 | async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { | ||
158 | if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { | ||
159 | // Remove webtorrent files if not enabled | ||
160 | for (const file of video.VideoFiles) { | ||
161 | await video.removeWebTorrentFile(file) | ||
162 | await file.destroy() | ||
163 | } | ||
164 | |||
165 | video.VideoFiles = [] | ||
166 | |||
167 | // Create HLS new resolution jobs | ||
168 | await createLowerResolutionsJobs({ | ||
169 | video, | ||
170 | user, | ||
171 | videoFileResolution: payload.resolution, | ||
172 | hasAudio: payload.hasAudio, | ||
173 | isNewVideo: payload.isNewVideo ?? true, | ||
174 | type: 'hls' | ||
175 | }) | ||
176 | } | ||
177 | |||
178 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
179 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) | ||
180 | } | ||
181 | 99 | ||
182 | async function onVideoFirstWebTorrentTranscoding ( | 100 | await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job }) |
183 | videoArg: MVideoWithFile, | ||
184 | payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, | ||
185 | transcodeType: TranscodeVODOptionsType, | ||
186 | user: MUserId | ||
187 | ) { | ||
188 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) | ||
189 | 101 | ||
190 | try { | 102 | logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
191 | // Maybe the video changed in database, refresh it | ||
192 | const videoDatabase = await VideoModel.loadFull(videoArg.uuid) | ||
193 | // Video does not exist anymore | ||
194 | if (!videoDatabase) return undefined | ||
195 | |||
196 | const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile() | ||
197 | |||
198 | // Generate HLS version of the original file | ||
199 | const originalFileHLSPayload = { | ||
200 | ...payload, | ||
201 | |||
202 | hasAudio: !!audioStream, | ||
203 | resolution: videoDatabase.getMaxQualityFile().resolution, | ||
204 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | ||
205 | copyCodecs: transcodeType !== 'quick-transcode', | ||
206 | isMaxQuality: true | ||
207 | } | ||
208 | const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) | ||
209 | const hasNewResolutions = await createLowerResolutionsJobs({ | ||
210 | video: videoDatabase, | ||
211 | user, | ||
212 | videoFileResolution: resolution, | ||
213 | hasAudio: !!audioStream, | ||
214 | type: 'webtorrent', | ||
215 | isNewVideo: payload.isNewVideo ?? true | ||
216 | }) | ||
217 | |||
218 | await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') | ||
219 | 103 | ||
220 | // Move to next state if there are no other resolutions to generate | 104 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
221 | if (!hasHls && !hasNewResolutions) { | ||
222 | await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) | ||
223 | } | ||
224 | } finally { | ||
225 | mutexReleaser() | ||
226 | } | ||
227 | } | 105 | } |
228 | 106 | ||
229 | async function onNewWebTorrentFileResolution ( | 107 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { |
230 | video: MVideo, | 108 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) |
231 | user: MUserId, | ||
232 | payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload | ||
233 | ) { | ||
234 | if (payload.createHLSIfNeeded) { | ||
235 | await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) | ||
236 | } | ||
237 | |||
238 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
239 | 109 | ||
240 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) | 110 | const videoFileInput = payload.copyCodecs |
241 | } | 111 | ? video.getWebTorrentFile(payload.resolution) |
112 | : video.getMaxQualityFile() | ||
242 | 113 | ||
243 | // --------------------------------------------------------------------------- | 114 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() |
244 | 115 | ||
245 | async function createHlsJobIfEnabled (user: MUserId, payload: { | 116 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) |
246 | videoUUID: string | ||
247 | resolution: number | ||
248 | hasAudio: boolean | ||
249 | copyCodecs: boolean | ||
250 | isMaxQuality: boolean | ||
251 | isNewVideo?: boolean | ||
252 | }) { | ||
253 | if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false | ||
254 | |||
255 | const jobOptions = { | ||
256 | priority: await getTranscodingJobPriority(user) | ||
257 | } | ||
258 | 117 | ||
259 | const hlsTranscodingPayload: HLSTranscodingPayload = { | 118 | try { |
260 | type: 'new-resolution-to-hls', | 119 | await videoFileInput.getVideo().reload() |
261 | autoDeleteWebTorrentIfNeeded: true, | ||
262 | 120 | ||
263 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) | 121 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { |
122 | return generateHlsPlaylistResolution({ | ||
123 | video, | ||
124 | videoInputPath, | ||
125 | inputFileMutexReleaser, | ||
126 | resolution: payload.resolution, | ||
127 | fps: payload.fps, | ||
128 | copyCodecs: payload.copyCodecs, | ||
129 | job | ||
130 | }) | ||
131 | }) | ||
132 | } finally { | ||
133 | inputFileMutexReleaser() | ||
264 | } | 134 | } |
265 | 135 | ||
266 | await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) | 136 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
267 | |||
268 | return true | ||
269 | } | ||
270 | |||
271 | async function createLowerResolutionsJobs (options: { | ||
272 | video: MVideoFullLight | ||
273 | user: MUserId | ||
274 | videoFileResolution: number | ||
275 | hasAudio: boolean | ||
276 | isNewVideo: boolean | ||
277 | type: 'hls' | 'webtorrent' | ||
278 | }) { | ||
279 | const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options | ||
280 | |||
281 | // Create transcoding jobs if there are enabled resolutions | ||
282 | const resolutionsEnabled = await Hooks.wrapObject( | ||
283 | computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), | ||
284 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
285 | options | ||
286 | ) | ||
287 | |||
288 | const resolutionCreated: string[] = [] | ||
289 | |||
290 | for (const resolution of resolutionsEnabled) { | ||
291 | let dataInput: VideoTranscodingPayload | ||
292 | |||
293 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { | ||
294 | // WebTorrent will create subsequent HLS job | ||
295 | dataInput = { | ||
296 | type: 'new-resolution-to-webtorrent', | ||
297 | videoUUID: video.uuid, | ||
298 | resolution, | ||
299 | hasAudio, | ||
300 | createHLSIfNeeded: true, | ||
301 | isNewVideo | ||
302 | } | ||
303 | |||
304 | resolutionCreated.push('webtorrent-' + resolution) | ||
305 | } | ||
306 | |||
307 | if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { | ||
308 | dataInput = { | ||
309 | type: 'new-resolution-to-hls', | ||
310 | videoUUID: video.uuid, | ||
311 | resolution, | ||
312 | hasAudio, | ||
313 | copyCodecs: false, | ||
314 | isMaxQuality: false, | ||
315 | autoDeleteWebTorrentIfNeeded: true, | ||
316 | isNewVideo | ||
317 | } | ||
318 | |||
319 | resolutionCreated.push('hls-' + resolution) | ||
320 | } | ||
321 | |||
322 | if (!dataInput) continue | ||
323 | |||
324 | const jobOptions = { | ||
325 | priority: await getTranscodingJobPriority(user) | ||
326 | } | ||
327 | |||
328 | await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) | ||
329 | } | ||
330 | 137 | ||
331 | if (resolutionCreated.length === 0) { | 138 | if (payload.deleteWebTorrentFiles === true) { |
332 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) | 139 | logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid)) |
333 | 140 | ||
334 | return false | 141 | await removeAllWebTorrentFiles(video) |
335 | } | 142 | } |
336 | 143 | ||
337 | logger.info( | 144 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
338 | 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, | ||
339 | { resolutionCreated, ...lTags(video.uuid) } | ||
340 | ) | ||
341 | |||
342 | return true | ||
343 | } | 145 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index cc6be0bd8..21bf0f226 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -31,6 +31,7 @@ import { | |||
31 | MoveObjectStoragePayload, | 31 | MoveObjectStoragePayload, |
32 | NotifyPayload, | 32 | NotifyPayload, |
33 | RefreshPayload, | 33 | RefreshPayload, |
34 | TranscodingJobBuilderPayload, | ||
34 | VideoChannelImportPayload, | 35 | VideoChannelImportPayload, |
35 | VideoFileImportPayload, | 36 | VideoFileImportPayload, |
36 | VideoImportPayload, | 37 | VideoImportPayload, |
@@ -56,6 +57,7 @@ import { processFederateVideo } from './handlers/federate-video' | |||
56 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 57 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
57 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 58 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
58 | import { processNotify } from './handlers/notify' | 59 | import { processNotify } from './handlers/notify' |
60 | import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' | ||
59 | import { processVideoChannelImport } from './handlers/video-channel-import' | 61 | import { processVideoChannelImport } from './handlers/video-channel-import' |
60 | import { processVideoFileImport } from './handlers/video-file-import' | 62 | import { processVideoFileImport } from './handlers/video-file-import' |
61 | import { processVideoImport } from './handlers/video-import' | 63 | import { processVideoImport } from './handlers/video-import' |
@@ -69,11 +71,12 @@ export type CreateJobArgument = | |||
69 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | 71 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
70 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 72 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
71 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | 73 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | |
72 | { type: 'activitypub-http-cleaner', payload: {} } | | 74 | { type: 'activitypub-cleaner', payload: {} } | |
73 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | | 75 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | |
74 | { type: 'video-file-import', payload: VideoFileImportPayload } | | 76 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
75 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | | 77 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | |
76 | { type: 'email', payload: EmailPayload } | | 78 | { type: 'email', payload: EmailPayload } | |
79 | { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | | ||
77 | { type: 'video-import', payload: VideoImportPayload } | | 80 | { type: 'video-import', payload: VideoImportPayload } | |
78 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 81 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
79 | { type: 'videos-views-stats', payload: {} } | | 82 | { type: 'videos-views-stats', payload: {} } | |
@@ -96,28 +99,29 @@ export type CreateJobOptions = { | |||
96 | } | 99 | } |
97 | 100 | ||
98 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | 101 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { |
99 | 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, | ||
100 | 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, | ||
101 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
102 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
103 | 'activitypub-cleaner': processActivityPubCleaner, | 102 | 'activitypub-cleaner': processActivityPubCleaner, |
104 | 'activitypub-follow': processActivityPubFollow, | 103 | 'activitypub-follow': processActivityPubFollow, |
105 | 'video-file-import': processVideoFileImport, | 104 | 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, |
106 | 'video-transcoding': processVideoTranscoding, | 105 | 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, |
106 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
107 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
108 | 'activitypub-refresher': refreshAPObject, | ||
109 | 'actor-keys': processActorKeys, | ||
110 | 'after-video-channel-import': processAfterVideoChannelImport, | ||
107 | 'email': processEmail, | 111 | 'email': processEmail, |
112 | 'federate-video': processFederateVideo, | ||
113 | 'transcoding-job-builder': processTranscodingJobBuilder, | ||
114 | 'manage-video-torrent': processManageVideoTorrent, | ||
115 | 'move-to-object-storage': processMoveToObjectStorage, | ||
116 | 'notify': processNotify, | ||
117 | 'video-channel-import': processVideoChannelImport, | ||
118 | 'video-file-import': processVideoFileImport, | ||
108 | 'video-import': processVideoImport, | 119 | 'video-import': processVideoImport, |
109 | 'videos-views-stats': processVideosViewsStats, | ||
110 | 'activitypub-refresher': refreshAPObject, | ||
111 | 'video-live-ending': processVideoLiveEnding, | 120 | 'video-live-ending': processVideoLiveEnding, |
112 | 'actor-keys': processActorKeys, | ||
113 | 'video-redundancy': processVideoRedundancy, | 121 | 'video-redundancy': processVideoRedundancy, |
114 | 'move-to-object-storage': processMoveToObjectStorage, | ||
115 | 'manage-video-torrent': processManageVideoTorrent, | ||
116 | 'video-studio-edition': processVideoStudioEdition, | 122 | 'video-studio-edition': processVideoStudioEdition, |
117 | 'video-channel-import': processVideoChannelImport, | 123 | 'video-transcoding': processVideoTranscoding, |
118 | 'after-video-channel-import': processAfterVideoChannelImport, | 124 | 'videos-views-stats': processVideosViewsStats |
119 | 'notify': processNotify, | ||
120 | 'federate-video': processFederateVideo | ||
121 | } | 125 | } |
122 | 126 | ||
123 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | 127 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
@@ -125,28 +129,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } | |||
125 | } | 129 | } |
126 | 130 | ||
127 | const jobTypes: JobType[] = [ | 131 | const jobTypes: JobType[] = [ |
132 | 'activitypub-cleaner', | ||
128 | 'activitypub-follow', | 133 | 'activitypub-follow', |
129 | 'activitypub-http-broadcast', | ||
130 | 'activitypub-http-broadcast-parallel', | 134 | 'activitypub-http-broadcast-parallel', |
135 | 'activitypub-http-broadcast', | ||
131 | 'activitypub-http-fetcher', | 136 | 'activitypub-http-fetcher', |
132 | 'activitypub-http-unicast', | 137 | 'activitypub-http-unicast', |
133 | 'activitypub-cleaner', | 138 | 'activitypub-refresher', |
139 | 'actor-keys', | ||
140 | 'after-video-channel-import', | ||
134 | 'email', | 141 | 'email', |
135 | 'video-transcoding', | 142 | 'federate-video', |
143 | 'transcoding-job-builder', | ||
144 | 'manage-video-torrent', | ||
145 | 'move-to-object-storage', | ||
146 | 'notify', | ||
147 | 'video-channel-import', | ||
136 | 'video-file-import', | 148 | 'video-file-import', |
137 | 'video-import', | 149 | 'video-import', |
138 | 'videos-views-stats', | ||
139 | 'activitypub-refresher', | ||
140 | 'video-redundancy', | ||
141 | 'actor-keys', | ||
142 | 'video-live-ending', | 150 | 'video-live-ending', |
143 | 'move-to-object-storage', | 151 | 'video-redundancy', |
144 | 'manage-video-torrent', | ||
145 | 'video-studio-edition', | 152 | 'video-studio-edition', |
146 | 'video-channel-import', | 153 | 'video-transcoding', |
147 | 'after-video-channel-import', | 154 | 'videos-views-stats' |
148 | 'notify', | ||
149 | 'federate-video' | ||
150 | ] | 155 | ] |
151 | 156 | ||
152 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | 157 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 05274955d..aa32a9d52 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra' | |||
2 | import { createServer, Server } from 'net' | 2 | import { createServer, Server } from 'net' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | 4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' |
5 | import { | ||
6 | computeResolutionsToTranscode, | ||
7 | ffprobePromise, | ||
8 | getLiveSegmentTime, | ||
9 | getVideoStreamBitrate, | ||
10 | getVideoStreamDimensionsInfo, | ||
11 | getVideoStreamFPS, | ||
12 | hasAudioStream | ||
13 | } from '@server/helpers/ffmpeg' | ||
14 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 5 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
15 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 6 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
16 | import { VIDEO_LIVE } from '@server/initializers/constants' | 7 | import { VIDEO_LIVE } from '@server/initializers/constants' |
8 | import { sequelizeTypescript } from '@server/initializers/database' | ||
17 | import { UserModel } from '@server/models/user/user' | 9 | import { UserModel } from '@server/models/user/user' |
18 | import { VideoModel } from '@server/models/video/video' | 10 | import { VideoModel } from '@server/models/video/video' |
19 | import { VideoLiveModel } from '@server/models/video/video-live' | 11 | import { VideoLiveModel } from '@server/models/video/video-live' |
12 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
20 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | 13 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' |
21 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 14 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
22 | import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' | 15 | import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' |
23 | import { pick, wait } from '@shared/core-utils' | 16 | import { pick, wait } from '@shared/core-utils' |
17 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg' | ||
24 | import { LiveVideoError, VideoState } from '@shared/models' | 18 | import { LiveVideoError, VideoState } from '@shared/models' |
25 | import { federateVideoIfNeeded } from '../activitypub/videos' | 19 | import { federateVideoIfNeeded } from '../activitypub/videos' |
26 | import { JobQueue } from '../job-queue' | 20 | import { JobQueue } from '../job-queue' |
27 | import { getLiveReplayBaseDirectory } from '../paths' | 21 | import { getLiveReplayBaseDirectory } from '../paths' |
28 | import { PeerTubeSocket } from '../peertube-socket' | 22 | import { PeerTubeSocket } from '../peertube-socket' |
29 | import { Hooks } from '../plugins/hooks' | 23 | import { Hooks } from '../plugins/hooks' |
24 | import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions' | ||
30 | import { LiveQuotaStore } from './live-quota-store' | 25 | import { LiveQuotaStore } from './live-quota-store' |
31 | import { cleanupAndDestroyPermanentLive } from './live-utils' | 26 | import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils' |
32 | import { MuxingSession } from './shared' | 27 | import { MuxingSession } from './shared' |
33 | import { sequelizeTypescript } from '@server/initializers/database' | 28 | import { RunnerJobModel } from '@server/models/runner/runner-job' |
34 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
35 | 29 | ||
36 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') | 30 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') |
37 | const context = require('node-media-server/src/node_core_ctx') | 31 | const context = require('node-media-server/src/node_core_ctx') |
@@ -57,7 +51,7 @@ class LiveManager { | |||
57 | private static instance: LiveManager | 51 | private static instance: LiveManager |
58 | 52 | ||
59 | private readonly muxingSessions = new Map<string, MuxingSession>() | 53 | private readonly muxingSessions = new Map<string, MuxingSession>() |
60 | private readonly videoSessions = new Map<number, string>() | 54 | private readonly videoSessions = new Map<string, string>() |
61 | 55 | ||
62 | private rtmpServer: Server | 56 | private rtmpServer: Server |
63 | private rtmpsServer: ServerTLS | 57 | private rtmpsServer: ServerTLS |
@@ -177,14 +171,19 @@ class LiveManager { | |||
177 | return !!this.rtmpServer | 171 | return !!this.rtmpServer |
178 | } | 172 | } |
179 | 173 | ||
180 | stopSessionOf (videoId: number, error: LiveVideoError | null) { | 174 | stopSessionOf (videoUUID: string, error: LiveVideoError | null) { |
181 | const sessionId = this.videoSessions.get(videoId) | 175 | const sessionId = this.videoSessions.get(videoUUID) |
182 | if (!sessionId) return | 176 | if (!sessionId) { |
177 | logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID)) | ||
178 | return | ||
179 | } | ||
183 | 180 | ||
184 | this.saveEndingSession(videoId, error) | 181 | logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) |
185 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | ||
186 | 182 | ||
187 | this.videoSessions.delete(videoId) | 183 | this.saveEndingSession(videoUUID, error) |
184 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) })) | ||
185 | |||
186 | this.videoSessions.delete(videoUUID) | ||
188 | this.abortSession(sessionId) | 187 | this.abortSession(sessionId) |
189 | } | 188 | } |
190 | 189 | ||
@@ -221,6 +220,11 @@ class LiveManager { | |||
221 | return this.abortSession(sessionId) | 220 | return this.abortSession(sessionId) |
222 | } | 221 | } |
223 | 222 | ||
223 | if (this.videoSessions.has(video.uuid)) { | ||
224 | logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid)) | ||
225 | return this.abortSession(sessionId) | ||
226 | } | ||
227 | |||
224 | // Cleanup old potential live (could happen with a permanent live) | 228 | // Cleanup old potential live (could happen with a permanent live) |
225 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | 229 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) |
226 | if (oldStreamingPlaylist) { | 230 | if (oldStreamingPlaylist) { |
@@ -229,7 +233,7 @@ class LiveManager { | |||
229 | await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) | 233 | await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) |
230 | } | 234 | } |
231 | 235 | ||
232 | this.videoSessions.set(video.id, sessionId) | 236 | this.videoSessions.set(video.uuid, sessionId) |
233 | 237 | ||
234 | const now = Date.now() | 238 | const now = Date.now() |
235 | const probe = await ffprobePromise(inputUrl) | 239 | const probe = await ffprobePromise(inputUrl) |
@@ -253,7 +257,7 @@ class LiveManager { | |||
253 | ) | 257 | ) |
254 | 258 | ||
255 | logger.info( | 259 | logger.info( |
256 | 'Will mux/transcode live video of original resolution %d.', resolution, | 260 | 'Handling live video of original resolution %d.', resolution, |
257 | { allResolutions, ...lTags(sessionId, video.uuid) } | 261 | { allResolutions, ...lTags(sessionId, video.uuid) } |
258 | ) | 262 | ) |
259 | 263 | ||
@@ -301,44 +305,44 @@ class LiveManager { | |||
301 | 305 | ||
302 | muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) | 306 | muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) |
303 | 307 | ||
304 | muxingSession.on('bad-socket-health', ({ videoId }) => { | 308 | muxingSession.on('bad-socket-health', ({ videoUUID }) => { |
305 | logger.error( | 309 | logger.error( |
306 | 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + | 310 | 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + |
307 | ' Stopping session of video %s.', videoUUID, | 311 | ' Stopping session of video %s.', videoUUID, |
308 | localLTags | 312 | localLTags |
309 | ) | 313 | ) |
310 | 314 | ||
311 | this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) | 315 | this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH) |
312 | }) | 316 | }) |
313 | 317 | ||
314 | muxingSession.on('duration-exceeded', ({ videoId }) => { | 318 | muxingSession.on('duration-exceeded', ({ videoUUID }) => { |
315 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) | 319 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) |
316 | 320 | ||
317 | this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) | 321 | this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED) |
318 | }) | 322 | }) |
319 | 323 | ||
320 | muxingSession.on('quota-exceeded', ({ videoId }) => { | 324 | muxingSession.on('quota-exceeded', ({ videoUUID }) => { |
321 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) | 325 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) |
322 | 326 | ||
323 | this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) | 327 | this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED) |
324 | }) | 328 | }) |
325 | 329 | ||
326 | muxingSession.on('ffmpeg-error', ({ videoId }) => { | 330 | muxingSession.on('transcoding-error', ({ videoUUID }) => { |
327 | this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) | 331 | this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR) |
328 | }) | 332 | }) |
329 | 333 | ||
330 | muxingSession.on('ffmpeg-end', ({ videoId }) => { | 334 | muxingSession.on('transcoding-end', ({ videoUUID }) => { |
331 | this.onMuxingFFmpegEnd(videoId, sessionId) | 335 | this.onMuxingFFmpegEnd(videoUUID, sessionId) |
332 | }) | 336 | }) |
333 | 337 | ||
334 | muxingSession.on('after-cleanup', ({ videoId }) => { | 338 | muxingSession.on('after-cleanup', ({ videoUUID }) => { |
335 | this.muxingSessions.delete(sessionId) | 339 | this.muxingSessions.delete(sessionId) |
336 | 340 | ||
337 | LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) | 341 | LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) |
338 | 342 | ||
339 | muxingSession.destroy() | 343 | muxingSession.destroy() |
340 | 344 | ||
341 | return this.onAfterMuxingCleanup({ videoId, liveSession }) | 345 | return this.onAfterMuxingCleanup({ videoUUID, liveSession }) |
342 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) | 346 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) |
343 | }) | 347 | }) |
344 | 348 | ||
@@ -379,22 +383,24 @@ class LiveManager { | |||
379 | } | 383 | } |
380 | } | 384 | } |
381 | 385 | ||
382 | private onMuxingFFmpegEnd (videoId: number, sessionId: string) { | 386 | private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) { |
383 | this.videoSessions.delete(videoId) | 387 | this.videoSessions.delete(videoUUID) |
384 | 388 | ||
385 | this.saveEndingSession(videoId, null) | 389 | this.saveEndingSession(videoUUID, null) |
386 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | 390 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) |
387 | } | 391 | } |
388 | 392 | ||
389 | private async onAfterMuxingCleanup (options: { | 393 | private async onAfterMuxingCleanup (options: { |
390 | videoId: number | string | 394 | videoUUID: string |
391 | liveSession?: MVideoLiveSession | 395 | liveSession?: MVideoLiveSession |
392 | cleanupNow?: boolean // Default false | 396 | cleanupNow?: boolean // Default false |
393 | }) { | 397 | }) { |
394 | const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options | 398 | const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options |
399 | |||
400 | logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID)) | ||
395 | 401 | ||
396 | try { | 402 | try { |
397 | const fullVideo = await VideoModel.loadFull(videoId) | 403 | const fullVideo = await VideoModel.loadFull(videoUUID) |
398 | if (!fullVideo) return | 404 | if (!fullVideo) return |
399 | 405 | ||
400 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) | 406 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) |
@@ -437,15 +443,17 @@ class LiveManager { | |||
437 | 443 | ||
438 | await federateVideoIfNeeded(fullVideo, false) | 444 | await federateVideoIfNeeded(fullVideo, false) |
439 | } catch (err) { | 445 | } catch (err) { |
440 | logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) | 446 | logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) }) |
441 | } | 447 | } |
442 | } | 448 | } |
443 | 449 | ||
444 | private async handleBrokenLives () { | 450 | private async handleBrokenLives () { |
451 | await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' }) | ||
452 | |||
445 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | 453 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() |
446 | 454 | ||
447 | for (const uuid of videoUUIDs) { | 455 | for (const uuid of videoUUIDs) { |
448 | await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) | 456 | await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true }) |
449 | } | 457 | } |
450 | } | 458 | } |
451 | 459 | ||
@@ -494,8 +502,8 @@ class LiveManager { | |||
494 | }) | 502 | }) |
495 | } | 503 | } |
496 | 504 | ||
497 | private async saveEndingSession (videoId: number, error: LiveVideoError | null) { | 505 | private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) { |
498 | const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) | 506 | const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID) |
499 | if (!liveSession) return | 507 | if (!liveSession) return |
500 | 508 | ||
501 | liveSession.endDate = new Date() | 509 | liveSession.endDate = new Date() |
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts index 4d03754a9..251301141 100644 --- a/server/lib/live/live-segment-sha-store.ts +++ b/server/lib/live/live-segment-sha-store.ts | |||
@@ -52,7 +52,10 @@ class LiveSegmentShaStore { | |||
52 | logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) | 52 | logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) |
53 | 53 | ||
54 | if (!this.segmentsSha256.has(segmentName)) { | 54 | if (!this.segmentsSha256.has(segmentName)) { |
55 | logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID)) | 55 | logger.warn( |
56 | 'Unknown segment in live segment hash store for video %s and segment %s.', | ||
57 | this.videoUUID, segmentPath, lTags(this.videoUUID) | ||
58 | ) | ||
56 | return | 59 | return |
57 | } | 60 | } |
58 | 61 | ||
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index c0dec9829..3fb3ce1ce 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts | |||
@@ -1,8 +1,9 @@ | |||
1 | import { pathExists, readdir, remove } from 'fs-extra' | 1 | import { pathExists, readdir, remove } from 'fs-extra' |
2 | import { basename, join } from 'path' | 2 | import { basename, join } from 'path' |
3 | import { logger } from '@server/helpers/logger' | 3 | import { logger } from '@server/helpers/logger' |
4 | import { VIDEO_LIVE } from '@server/initializers/constants' | ||
4 | import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' | 5 | import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' |
5 | import { VideoStorage } from '@shared/models' | 6 | import { LiveVideoLatencyMode, VideoStorage } from '@shared/models' |
6 | import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage' | 7 | import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage' |
7 | import { getLiveDirectory } from '../paths' | 8 | import { getLiveDirectory } from '../paths' |
8 | 9 | ||
@@ -37,10 +38,19 @@ async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreaming | |||
37 | await cleanupTMPLiveFilesFromFilesystem(video) | 38 | await cleanupTMPLiveFilesFromFilesystem(video) |
38 | } | 39 | } |
39 | 40 | ||
41 | function getLiveSegmentTime (latencyMode: LiveVideoLatencyMode) { | ||
42 | if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) { | ||
43 | return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY | ||
44 | } | ||
45 | |||
46 | return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY | ||
47 | } | ||
48 | |||
40 | export { | 49 | export { |
41 | cleanupAndDestroyPermanentLive, | 50 | cleanupAndDestroyPermanentLive, |
42 | cleanupUnsavedNormalLive, | 51 | cleanupUnsavedNormalLive, |
43 | cleanupTMPLiveFiles, | 52 | cleanupTMPLiveFiles, |
53 | getLiveSegmentTime, | ||
44 | buildConcatenatedName | 54 | buildConcatenatedName |
45 | } | 55 | } |
46 | 56 | ||
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 2727fc4a7..f3f8fc886 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { mapSeries } from 'bluebird' | 1 | import { mapSeries } from 'bluebird' |
2 | import { FSWatcher, watch } from 'chokidar' | 2 | import { FSWatcher, watch } from 'chokidar' |
3 | import { FfmpegCommand } from 'fluent-ffmpeg' | 3 | import { EventEmitter } from 'events' |
4 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' | 4 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' |
5 | import PQueue from 'p-queue' | 5 | import PQueue from 'p-queue' |
6 | import { basename, join } from 'path' | 6 | import { basename, join } from 'path' |
7 | import { EventEmitter } from 'stream' | 7 | import { computeOutputFPS } from '@server/helpers/ffmpeg' |
8 | import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' | ||
9 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' | 8 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' |
10 | import { CONFIG } from '@server/initializers/config' | 9 | import { CONFIG } from '@server/initializers/config' |
11 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' | 10 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' |
@@ -20,24 +19,24 @@ import { | |||
20 | getLiveDirectory, | 19 | getLiveDirectory, |
21 | getLiveReplayBaseDirectory | 20 | getLiveReplayBaseDirectory |
22 | } from '../../paths' | 21 | } from '../../paths' |
23 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' | ||
24 | import { isAbleToUploadVideo } from '../../user' | 22 | import { isAbleToUploadVideo } from '../../user' |
25 | import { LiveQuotaStore } from '../live-quota-store' | 23 | import { LiveQuotaStore } from '../live-quota-store' |
26 | import { LiveSegmentShaStore } from '../live-segment-sha-store' | 24 | import { LiveSegmentShaStore } from '../live-segment-sha-store' |
27 | import { buildConcatenatedName } from '../live-utils' | 25 | import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils' |
26 | import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper' | ||
28 | 27 | ||
29 | import memoizee = require('memoizee') | 28 | import memoizee = require('memoizee') |
30 | interface MuxingSessionEvents { | 29 | interface MuxingSessionEvents { |
31 | 'live-ready': (options: { videoId: number }) => void | 30 | 'live-ready': (options: { videoUUID: string }) => void |
32 | 31 | ||
33 | 'bad-socket-health': (options: { videoId: number }) => void | 32 | 'bad-socket-health': (options: { videoUUID: string }) => void |
34 | 'duration-exceeded': (options: { videoId: number }) => void | 33 | 'duration-exceeded': (options: { videoUUID: string }) => void |
35 | 'quota-exceeded': (options: { videoId: number }) => void | 34 | 'quota-exceeded': (options: { videoUUID: string }) => void |
36 | 35 | ||
37 | 'ffmpeg-end': (options: { videoId: number }) => void | 36 | 'transcoding-end': (options: { videoUUID: string }) => void |
38 | 'ffmpeg-error': (options: { videoId: number }) => void | 37 | 'transcoding-error': (options: { videoUUID: string }) => void |
39 | 38 | ||
40 | 'after-cleanup': (options: { videoId: number }) => void | 39 | 'after-cleanup': (options: { videoUUID: string }) => void |
41 | } | 40 | } |
42 | 41 | ||
43 | declare interface MuxingSession { | 42 | declare interface MuxingSession { |
@@ -52,7 +51,7 @@ declare interface MuxingSession { | |||
52 | 51 | ||
53 | class MuxingSession extends EventEmitter { | 52 | class MuxingSession extends EventEmitter { |
54 | 53 | ||
55 | private ffmpegCommand: FfmpegCommand | 54 | private transcodingWrapper: AbstractTranscodingWrapper |
56 | 55 | ||
57 | private readonly context: any | 56 | private readonly context: any |
58 | private readonly user: MUserId | 57 | private readonly user: MUserId |
@@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter { | |||
67 | 66 | ||
68 | private readonly hasAudio: boolean | 67 | private readonly hasAudio: boolean |
69 | 68 | ||
70 | private readonly videoId: number | ||
71 | private readonly videoUUID: string | 69 | private readonly videoUUID: string |
72 | private readonly saveReplay: boolean | 70 | private readonly saveReplay: boolean |
73 | 71 | ||
@@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter { | |||
126 | 124 | ||
127 | this.allResolutions = options.allResolutions | 125 | this.allResolutions = options.allResolutions |
128 | 126 | ||
129 | this.videoId = this.videoLive.Video.id | ||
130 | this.videoUUID = this.videoLive.Video.uuid | 127 | this.videoUUID = this.videoLive.Video.uuid |
131 | 128 | ||
132 | this.saveReplay = this.videoLive.saveReplay | 129 | this.saveReplay = this.videoLive.saveReplay |
@@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter { | |||
145 | 142 | ||
146 | await this.prepareDirectories() | 143 | await this.prepareDirectories() |
147 | 144 | ||
148 | this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED | 145 | this.transcodingWrapper = this.buildTranscodingWrapper() |
149 | ? await getLiveTranscodingCommand({ | ||
150 | inputUrl: this.inputUrl, | ||
151 | 146 | ||
152 | outPath: this.outDirectory, | 147 | this.transcodingWrapper.on('end', () => this.onTranscodedEnded()) |
153 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | 148 | this.transcodingWrapper.on('error', () => this.onTranscodingError()) |
154 | 149 | ||
155 | latencyMode: this.videoLive.latencyMode, | 150 | await this.transcodingWrapper.run() |
156 | |||
157 | resolutions: this.allResolutions, | ||
158 | fps: this.fps, | ||
159 | bitrate: this.bitrate, | ||
160 | ratio: this.ratio, | ||
161 | |||
162 | hasAudio: this.hasAudio, | ||
163 | |||
164 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
165 | profile: CONFIG.LIVE.TRANSCODING.PROFILE | ||
166 | }) | ||
167 | : getLiveMuxingCommand({ | ||
168 | inputUrl: this.inputUrl, | ||
169 | outPath: this.outDirectory, | ||
170 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
171 | latencyMode: this.videoLive.latencyMode | ||
172 | }) | ||
173 | |||
174 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | ||
175 | 151 | ||
176 | this.watchMasterFile() | 152 | this.watchMasterFile() |
177 | this.watchTSFiles() | 153 | this.watchTSFiles() |
178 | this.watchM3U8File() | 154 | this.watchM3U8File() |
179 | |||
180 | let ffmpegShellCommand: string | ||
181 | this.ffmpegCommand.on('start', cmdline => { | ||
182 | ffmpegShellCommand = cmdline | ||
183 | |||
184 | logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) | ||
185 | }) | ||
186 | |||
187 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
188 | this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) | ||
189 | }) | ||
190 | |||
191 | this.ffmpegCommand.on('end', () => { | ||
192 | this.emit('ffmpeg-end', ({ videoId: this.videoId })) | ||
193 | |||
194 | this.onFFmpegEnded() | ||
195 | }) | ||
196 | |||
197 | this.ffmpegCommand.run() | ||
198 | } | 155 | } |
199 | 156 | ||
200 | abort () { | 157 | abort () { |
201 | if (!this.ffmpegCommand) return | 158 | if (!this.transcodingWrapper) return |
202 | 159 | ||
203 | this.aborted = true | 160 | this.aborted = true |
204 | this.ffmpegCommand.kill('SIGINT') | 161 | this.transcodingWrapper.abort() |
205 | } | 162 | } |
206 | 163 | ||
207 | destroy () { | 164 | destroy () { |
@@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter { | |||
210 | this.hasClientSocketInBadHealthWithCache.clear() | 167 | this.hasClientSocketInBadHealthWithCache.clear() |
211 | } | 168 | } |
212 | 169 | ||
213 | private onFFmpegError (options: { | ||
214 | err: any | ||
215 | stdout: string | ||
216 | stderr: string | ||
217 | ffmpegShellCommand: string | ||
218 | }) { | ||
219 | const { err, stdout, stderr, ffmpegShellCommand } = options | ||
220 | |||
221 | this.onFFmpegEnded() | ||
222 | |||
223 | // Don't care that we killed the ffmpeg process | ||
224 | if (err?.message?.includes('Exiting normally')) return | ||
225 | |||
226 | logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) | ||
227 | |||
228 | this.emit('ffmpeg-error', ({ videoId: this.videoId })) | ||
229 | } | ||
230 | |||
231 | private onFFmpegEnded () { | ||
232 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) | ||
233 | |||
234 | setTimeout(() => { | ||
235 | // Wait latest segments generation, and close watchers | ||
236 | |||
237 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) | ||
238 | .then(() => { | ||
239 | // Process remaining segments hash | ||
240 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | ||
241 | this.processSegments(this.segmentsToProcessPerPlaylist[key]) | ||
242 | } | ||
243 | }) | ||
244 | .catch(err => { | ||
245 | logger.error( | ||
246 | 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, | ||
247 | { err, ...this.lTags() } | ||
248 | ) | ||
249 | }) | ||
250 | |||
251 | this.emit('after-cleanup', { videoId: this.videoId }) | ||
252 | }, 1000) | ||
253 | } | ||
254 | |||
255 | private watchMasterFile () { | 170 | private watchMasterFile () { |
256 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) | 171 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) |
257 | 172 | ||
@@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter { | |||
272 | 187 | ||
273 | this.masterPlaylistCreated = true | 188 | this.masterPlaylistCreated = true |
274 | 189 | ||
190 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) | ||
191 | |||
275 | this.masterWatcher.close() | 192 | this.masterWatcher.close() |
276 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) | 193 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) |
277 | }) | 194 | }) |
@@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter { | |||
318 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] | 235 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] |
319 | 236 | ||
320 | if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { | 237 | if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { |
321 | this.emit('bad-socket-health', { videoId: this.videoId }) | 238 | this.emit('bad-socket-health', { videoUUID: this.videoUUID }) |
322 | return | 239 | return |
323 | } | 240 | } |
324 | 241 | ||
325 | // Duration constraint check | 242 | // Duration constraint check |
326 | if (this.isDurationConstraintValid(startStreamDateTime) !== true) { | 243 | if (this.isDurationConstraintValid(startStreamDateTime) !== true) { |
327 | this.emit('duration-exceeded', { videoId: this.videoId }) | 244 | this.emit('duration-exceeded', { videoUUID: this.videoUUID }) |
328 | return | 245 | return |
329 | } | 246 | } |
330 | 247 | ||
331 | // Check user quota if the user enabled replay saving | 248 | // Check user quota if the user enabled replay saving |
332 | if (await this.isQuotaExceeded(segmentPath) === true) { | 249 | if (await this.isQuotaExceeded(segmentPath) === true) { |
333 | this.emit('quota-exceeded', { videoId: this.videoId }) | 250 | this.emit('quota-exceeded', { videoUUID: this.videoUUID }) |
334 | } | 251 | } |
335 | } | 252 | } |
336 | 253 | ||
@@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter { | |||
438 | if (this.masterPlaylistCreated && !this.liveReady) { | 355 | if (this.masterPlaylistCreated && !this.liveReady) { |
439 | this.liveReady = true | 356 | this.liveReady = true |
440 | 357 | ||
441 | this.emit('live-ready', { videoId: this.videoId }) | 358 | this.emit('live-ready', { videoUUID: this.videoUUID }) |
442 | } | 359 | } |
443 | } | 360 | } |
444 | 361 | ||
362 | private onTranscodingError () { | ||
363 | this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) | ||
364 | } | ||
365 | |||
366 | private onTranscodedEnded () { | ||
367 | this.emit('transcoding-end', ({ videoUUID: this.videoUUID })) | ||
368 | |||
369 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) | ||
370 | |||
371 | setTimeout(() => { | ||
372 | // Wait latest segments generation, and close watchers | ||
373 | |||
374 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) | ||
375 | .then(() => { | ||
376 | // Process remaining segments hash | ||
377 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | ||
378 | this.processSegments(this.segmentsToProcessPerPlaylist[key]) | ||
379 | } | ||
380 | }) | ||
381 | .catch(err => { | ||
382 | logger.error( | ||
383 | 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, | ||
384 | { err, ...this.lTags() } | ||
385 | ) | ||
386 | }) | ||
387 | |||
388 | this.emit('after-cleanup', { videoUUID: this.videoUUID }) | ||
389 | }, 1000) | ||
390 | } | ||
391 | |||
445 | private hasClientSocketInBadHealth (sessionId: string) { | 392 | private hasClientSocketInBadHealth (sessionId: string) { |
446 | const rtmpSession = this.context.sessions.get(sessionId) | 393 | const rtmpSession = this.context.sessions.get(sessionId) |
447 | 394 | ||
@@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter { | |||
503 | sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED | 450 | sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED |
504 | }) | 451 | }) |
505 | } | 452 | } |
453 | |||
454 | private buildTranscodingWrapper () { | ||
455 | const options = { | ||
456 | streamingPlaylist: this.streamingPlaylist, | ||
457 | videoLive: this.videoLive, | ||
458 | |||
459 | lTags: this.lTags, | ||
460 | |||
461 | inputUrl: this.inputUrl, | ||
462 | |||
463 | toTranscode: this.allResolutions.map(resolution => ({ | ||
464 | resolution, | ||
465 | fps: computeOutputFPS({ inputFPS: this.fps, resolution }) | ||
466 | })), | ||
467 | |||
468 | fps: this.fps, | ||
469 | bitrate: this.bitrate, | ||
470 | ratio: this.ratio, | ||
471 | hasAudio: this.hasAudio, | ||
472 | |||
473 | segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, | ||
474 | segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode), | ||
475 | |||
476 | outDirectory: this.outDirectory | ||
477 | } | ||
478 | |||
479 | return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED | ||
480 | ? new RemoteTranscodingWrapper(options) | ||
481 | : new FFmpegTranscodingWrapper(options) | ||
482 | } | ||
506 | } | 483 | } |
507 | 484 | ||
508 | // --------------------------------------------------------------------------- | 485 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts new file mode 100644 index 000000000..226ba4573 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts | |||
@@ -0,0 +1,101 @@ | |||
1 | import EventEmitter from 'events' | ||
2 | import { LoggerTagsFn } from '@server/helpers/logger' | ||
3 | import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models' | ||
4 | import { LiveVideoError } from '@shared/models' | ||
5 | |||
6 | interface TranscodingWrapperEvents { | ||
7 | 'end': () => void | ||
8 | |||
9 | 'error': (options: { err: Error }) => void | ||
10 | } | ||
11 | |||
12 | declare interface AbstractTranscodingWrapper { | ||
13 | on<U extends keyof TranscodingWrapperEvents>( | ||
14 | event: U, listener: TranscodingWrapperEvents[U] | ||
15 | ): this | ||
16 | |||
17 | emit<U extends keyof TranscodingWrapperEvents>( | ||
18 | event: U, ...args: Parameters<TranscodingWrapperEvents[U]> | ||
19 | ): boolean | ||
20 | } | ||
21 | |||
22 | interface AbstractTranscodingWrapperOptions { | ||
23 | streamingPlaylist: MStreamingPlaylistVideo | ||
24 | videoLive: MVideoLiveVideo | ||
25 | |||
26 | lTags: LoggerTagsFn | ||
27 | |||
28 | inputUrl: string | ||
29 | fps: number | ||
30 | toTranscode: { | ||
31 | resolution: number | ||
32 | fps: number | ||
33 | }[] | ||
34 | |||
35 | bitrate: number | ||
36 | ratio: number | ||
37 | hasAudio: boolean | ||
38 | |||
39 | segmentListSize: number | ||
40 | segmentDuration: number | ||
41 | |||
42 | outDirectory: string | ||
43 | } | ||
44 | |||
45 | abstract class AbstractTranscodingWrapper extends EventEmitter { | ||
46 | protected readonly videoLive: MVideoLiveVideo | ||
47 | |||
48 | protected readonly toTranscode: { | ||
49 | resolution: number | ||
50 | fps: number | ||
51 | }[] | ||
52 | |||
53 | protected readonly inputUrl: string | ||
54 | protected readonly fps: number | ||
55 | protected readonly bitrate: number | ||
56 | protected readonly ratio: number | ||
57 | protected readonly hasAudio: boolean | ||
58 | |||
59 | protected readonly segmentListSize: number | ||
60 | protected readonly segmentDuration: number | ||
61 | |||
62 | protected readonly videoUUID: string | ||
63 | |||
64 | protected readonly outDirectory: string | ||
65 | |||
66 | protected readonly lTags: LoggerTagsFn | ||
67 | |||
68 | protected readonly streamingPlaylist: MStreamingPlaylistVideo | ||
69 | |||
70 | constructor (options: AbstractTranscodingWrapperOptions) { | ||
71 | super() | ||
72 | |||
73 | this.lTags = options.lTags | ||
74 | |||
75 | this.videoLive = options.videoLive | ||
76 | this.videoUUID = options.videoLive.Video.uuid | ||
77 | this.streamingPlaylist = options.streamingPlaylist | ||
78 | |||
79 | this.inputUrl = options.inputUrl | ||
80 | this.fps = options.fps | ||
81 | this.toTranscode = options.toTranscode | ||
82 | |||
83 | this.bitrate = options.bitrate | ||
84 | this.ratio = options.ratio | ||
85 | this.hasAudio = options.hasAudio | ||
86 | |||
87 | this.segmentListSize = options.segmentListSize | ||
88 | this.segmentDuration = options.segmentDuration | ||
89 | |||
90 | this.outDirectory = options.outDirectory | ||
91 | } | ||
92 | |||
93 | abstract run (): Promise<void> | ||
94 | |||
95 | abstract abort (error?: LiveVideoError): void | ||
96 | } | ||
97 | |||
98 | export { | ||
99 | AbstractTranscodingWrapper, | ||
100 | AbstractTranscodingWrapperOptions | ||
101 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts new file mode 100644 index 000000000..1f4c12bd4 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts | |||
@@ -0,0 +1,95 @@ | |||
1 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
2 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { CONFIG } from '@server/initializers/config' | ||
5 | import { VIDEO_LIVE } from '@server/initializers/constants' | ||
6 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | ||
7 | import { FFmpegLive } from '@shared/ffmpeg' | ||
8 | import { getLiveSegmentTime } from '../../live-utils' | ||
9 | import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' | ||
10 | |||
11 | export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { | ||
12 | private ffmpegCommand: FfmpegCommand | ||
13 | private ended = false | ||
14 | |||
15 | async run () { | ||
16 | this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED | ||
17 | ? await this.buildFFmpegLive().getLiveTranscodingCommand({ | ||
18 | inputUrl: this.inputUrl, | ||
19 | |||
20 | outPath: this.outDirectory, | ||
21 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
22 | |||
23 | segmentListSize: this.segmentListSize, | ||
24 | segmentDuration: this.segmentDuration, | ||
25 | |||
26 | toTranscode: this.toTranscode, | ||
27 | |||
28 | bitrate: this.bitrate, | ||
29 | ratio: this.ratio, | ||
30 | |||
31 | hasAudio: this.hasAudio | ||
32 | }) | ||
33 | : this.buildFFmpegLive().getLiveMuxingCommand({ | ||
34 | inputUrl: this.inputUrl, | ||
35 | outPath: this.outDirectory, | ||
36 | |||
37 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
38 | |||
39 | segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, | ||
40 | segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode) | ||
41 | }) | ||
42 | |||
43 | logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | ||
44 | |||
45 | this.ffmpegCommand.run() | ||
46 | |||
47 | let ffmpegShellCommand: string | ||
48 | this.ffmpegCommand.on('start', cmdline => { | ||
49 | ffmpegShellCommand = cmdline | ||
50 | |||
51 | logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) | ||
52 | }) | ||
53 | |||
54 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
55 | this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) | ||
56 | }) | ||
57 | |||
58 | this.ffmpegCommand.on('end', () => { | ||
59 | this.onFFmpegEnded() | ||
60 | }) | ||
61 | |||
62 | this.ffmpegCommand.run() | ||
63 | } | ||
64 | |||
65 | abort () { | ||
66 | // Nothing to do, ffmpeg will automatically exit | ||
67 | } | ||
68 | |||
69 | private onFFmpegError (options: { | ||
70 | err: any | ||
71 | stdout: string | ||
72 | stderr: string | ||
73 | ffmpegShellCommand: string | ||
74 | }) { | ||
75 | const { err, stdout, stderr, ffmpegShellCommand } = options | ||
76 | |||
77 | // Don't care that we killed the ffmpeg process | ||
78 | if (err?.message?.includes('Exiting normally')) return | ||
79 | |||
80 | logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) | ||
81 | |||
82 | this.emit('error', { err }) | ||
83 | } | ||
84 | |||
85 | private onFFmpegEnded () { | ||
86 | if (this.ended) return | ||
87 | |||
88 | this.ended = true | ||
89 | this.emit('end') | ||
90 | } | ||
91 | |||
92 | private buildFFmpegLive () { | ||
93 | return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) | ||
94 | } | ||
95 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts new file mode 100644 index 000000000..ae28fa1ca --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './abstract-transcoding-wrapper' | ||
2 | export * from './ffmpeg-transcoding-wrapper' | ||
3 | export * from './remote-transcoding-wrapper' | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts new file mode 100644 index 000000000..345eaf442 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts | |||
@@ -0,0 +1,20 @@ | |||
1 | import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners' | ||
2 | import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' | ||
3 | |||
4 | export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper { | ||
5 | async run () { | ||
6 | await new LiveRTMPHLSTranscodingJobHandler().create({ | ||
7 | rtmpUrl: this.inputUrl, | ||
8 | toTranscode: this.toTranscode, | ||
9 | video: this.videoLive.Video, | ||
10 | outputDirectory: this.outDirectory, | ||
11 | playlist: this.streamingPlaylist, | ||
12 | segmentListSize: this.segmentListSize, | ||
13 | segmentDuration: this.segmentDuration | ||
14 | }) | ||
15 | } | ||
16 | |||
17 | abort () { | ||
18 | this.emit('end') | ||
19 | } | ||
20 | } | ||
diff --git a/server/lib/object-storage/index.ts b/server/lib/object-storage/index.ts index 8b413a40e..6525f8dfb 100644 --- a/server/lib/object-storage/index.ts +++ b/server/lib/object-storage/index.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | export * from './keys' | 1 | export * from './keys' |
2 | export * from './proxy' | ||
2 | export * from './urls' | 3 | export * from './urls' |
3 | export * from './videos' | 4 | export * from './videos' |
diff --git a/server/lib/object-storage/proxy.ts b/server/lib/object-storage/proxy.ts new file mode 100644 index 000000000..c782a8a25 --- /dev/null +++ b/server/lib/object-storage/proxy.ts | |||
@@ -0,0 +1,97 @@ | |||
1 | import express from 'express' | ||
2 | import { PassThrough, pipeline } from 'stream' | ||
3 | import { GetObjectCommandOutput } from '@aws-sdk/client-s3' | ||
4 | import { buildReinjectVideoFileTokenQuery } from '@server/controllers/shared/m3u8-playlist' | ||
5 | import { logger } from '@server/helpers/logger' | ||
6 | import { StreamReplacer } from '@server/helpers/stream-replacer' | ||
7 | import { MStreamingPlaylist, MVideo } from '@server/types/models' | ||
8 | import { HttpStatusCode } from '@shared/models' | ||
9 | import { injectQueryToPlaylistUrls } from '../hls' | ||
10 | import { getHLSFileReadStream, getWebTorrentFileReadStream } from './videos' | ||
11 | |||
12 | export async function proxifyWebTorrentFile (options: { | ||
13 | req: express.Request | ||
14 | res: express.Response | ||
15 | filename: string | ||
16 | }) { | ||
17 | const { req, res, filename } = options | ||
18 | |||
19 | logger.debug('Proxifying WebTorrent file %s from object storage.', filename) | ||
20 | |||
21 | try { | ||
22 | const { response: s3Response, stream } = await getWebTorrentFileReadStream({ | ||
23 | filename, | ||
24 | rangeHeader: req.header('range') | ||
25 | }) | ||
26 | |||
27 | setS3Headers(res, s3Response) | ||
28 | |||
29 | return stream.pipe(res) | ||
30 | } catch (err) { | ||
31 | return handleObjectStorageFailure(res, err) | ||
32 | } | ||
33 | } | ||
34 | |||
35 | export async function proxifyHLS (options: { | ||
36 | req: express.Request | ||
37 | res: express.Response | ||
38 | playlist: MStreamingPlaylist | ||
39 | video: MVideo | ||
40 | filename: string | ||
41 | reinjectVideoFileToken: boolean | ||
42 | }) { | ||
43 | const { req, res, playlist, video, filename, reinjectVideoFileToken } = options | ||
44 | |||
45 | logger.debug('Proxifying HLS file %s from object storage.', filename) | ||
46 | |||
47 | try { | ||
48 | const { response: s3Response, stream } = await getHLSFileReadStream({ | ||
49 | playlist: playlist.withVideo(video), | ||
50 | filename, | ||
51 | rangeHeader: req.header('range') | ||
52 | }) | ||
53 | |||
54 | setS3Headers(res, s3Response) | ||
55 | |||
56 | const streamReplacer = reinjectVideoFileToken | ||
57 | ? new StreamReplacer(line => injectQueryToPlaylistUrls(line, buildReinjectVideoFileTokenQuery(req, filename.endsWith('master.m3u8')))) | ||
58 | : new PassThrough() | ||
59 | |||
60 | return pipeline( | ||
61 | stream, | ||
62 | streamReplacer, | ||
63 | res, | ||
64 | err => { | ||
65 | if (!err) return | ||
66 | |||
67 | handleObjectStorageFailure(res, err) | ||
68 | } | ||
69 | ) | ||
70 | } catch (err) { | ||
71 | return handleObjectStorageFailure(res, err) | ||
72 | } | ||
73 | } | ||
74 | |||
75 | // --------------------------------------------------------------------------- | ||
76 | // Private | ||
77 | // --------------------------------------------------------------------------- | ||
78 | |||
79 | function handleObjectStorageFailure (res: express.Response, err: Error) { | ||
80 | if (err.name === 'NoSuchKey') { | ||
81 | logger.debug('Could not find key in object storage to proxify private HLS video file.', { err }) | ||
82 | return res.sendStatus(HttpStatusCode.NOT_FOUND_404) | ||
83 | } | ||
84 | |||
85 | return res.fail({ | ||
86 | status: HttpStatusCode.INTERNAL_SERVER_ERROR_500, | ||
87 | message: err.message, | ||
88 | type: err.name | ||
89 | }) | ||
90 | } | ||
91 | |||
92 | function setS3Headers (res: express.Response, s3Response: GetObjectCommandOutput) { | ||
93 | if (s3Response.$metadata.httpStatusCode === HttpStatusCode.PARTIAL_CONTENT_206) { | ||
94 | res.setHeader('Content-Range', s3Response.ContentRange) | ||
95 | res.status(HttpStatusCode.PARTIAL_CONTENT_206) | ||
96 | } | ||
97 | } | ||
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 0398ca61d..ded7e9743 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts | |||
@@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http' | |||
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | 2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' |
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | 3 | import { isIdValid } from '@server/helpers/custom-validators/misc' |
4 | import { MVideo, MVideoImmutable } from '@server/types/models' | 4 | import { MVideo, MVideoImmutable } from '@server/types/models' |
5 | import { MRunner } from '@server/types/models/runners' | ||
5 | import { UserNotificationModelForApi } from '@server/types/models/user' | 6 | import { UserNotificationModelForApi } from '@server/types/models/user' |
6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | 7 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' |
7 | import { logger } from '../helpers/logger' | 8 | import { logger } from '../helpers/logger' |
8 | import { authenticateSocket } from '../middlewares' | 9 | import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' |
10 | import { Debounce } from '@server/helpers/debounce' | ||
9 | 11 | ||
10 | class PeerTubeSocket { | 12 | class PeerTubeSocket { |
11 | 13 | ||
@@ -13,6 +15,7 @@ class PeerTubeSocket { | |||
13 | 15 | ||
14 | private userNotificationSockets: { [ userId: number ]: Socket[] } = {} | 16 | private userNotificationSockets: { [ userId: number ]: Socket[] } = {} |
15 | private liveVideosNamespace: Namespace | 17 | private liveVideosNamespace: Namespace |
18 | private readonly runnerSockets = new Set<Socket>() | ||
16 | 19 | ||
17 | private constructor () {} | 20 | private constructor () {} |
18 | 21 | ||
@@ -24,7 +27,7 @@ class PeerTubeSocket { | |||
24 | .on('connection', socket => { | 27 | .on('connection', socket => { |
25 | const userId = socket.handshake.auth.user.id | 28 | const userId = socket.handshake.auth.user.id |
26 | 29 | ||
27 | logger.debug('User %d connected on the notification system.', userId) | 30 | logger.debug('User %d connected to the notification system.', userId) |
28 | 31 | ||
29 | if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] | 32 | if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] |
30 | 33 | ||
@@ -53,6 +56,22 @@ class PeerTubeSocket { | |||
53 | socket.leave(videoId) | 56 | socket.leave(videoId) |
54 | }) | 57 | }) |
55 | }) | 58 | }) |
59 | |||
60 | io.of('/runners') | ||
61 | .use(authenticateRunnerSocket) | ||
62 | .on('connection', socket => { | ||
63 | const runner: MRunner = socket.handshake.auth.runner | ||
64 | |||
65 | logger.debug(`New runner "${runner.name}" connected to the notification system.`) | ||
66 | |||
67 | this.runnerSockets.add(socket) | ||
68 | |||
69 | socket.on('disconnect', () => { | ||
70 | logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) | ||
71 | |||
72 | this.runnerSockets.delete(socket) | ||
73 | }) | ||
74 | }) | ||
56 | } | 75 | } |
57 | 76 | ||
58 | sendNotification (userId: number, notification: UserNotificationModelForApi) { | 77 | sendNotification (userId: number, notification: UserNotificationModelForApi) { |
@@ -89,6 +108,15 @@ class PeerTubeSocket { | |||
89 | .emit(type, data) | 108 | .emit(type, data) |
90 | } | 109 | } |
91 | 110 | ||
111 | @Debounce({ timeoutMS: 1000 }) | ||
112 | sendAvailableJobsPingToRunners () { | ||
113 | logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) | ||
114 | |||
115 | for (const runners of this.runnerSockets) { | ||
116 | runners.emit('available-jobs') | ||
117 | } | ||
118 | } | ||
119 | |||
92 | static get Instance () { | 120 | static get Instance () { |
93 | return this.instance || (this.instance = new this()) | 121 | return this.instance || (this.instance = new this()) |
94 | } | 122 | } |
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts index 66383af46..92ef87cca 100644 --- a/server/lib/plugins/plugin-helpers-builder.ts +++ b/server/lib/plugins/plugin-helpers-builder.ts | |||
@@ -1,7 +1,6 @@ | |||
1 | import express from 'express' | 1 | import express from 'express' |
2 | import { Server } from 'http' | 2 | import { Server } from 'http' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { ffprobePromise } from '@server/helpers/ffmpeg/ffprobe-utils' | ||
5 | import { buildLogger } from '@server/helpers/logger' | 4 | import { buildLogger } from '@server/helpers/logger' |
6 | import { CONFIG } from '@server/initializers/config' | 5 | import { CONFIG } from '@server/initializers/config' |
7 | import { WEBSERVER } from '@server/initializers/constants' | 6 | import { WEBSERVER } from '@server/initializers/constants' |
@@ -16,6 +15,7 @@ import { VideoModel } from '@server/models/video/video' | |||
16 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' | 15 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' |
17 | import { MPlugin, MVideo, UserNotificationModelForApi } from '@server/types/models' | 16 | import { MPlugin, MVideo, UserNotificationModelForApi } from '@server/types/models' |
18 | import { PeerTubeHelpers } from '@server/types/plugins' | 17 | import { PeerTubeHelpers } from '@server/types/plugins' |
18 | import { ffprobePromise } from '@shared/ffmpeg' | ||
19 | import { VideoBlacklistCreate, VideoStorage } from '@shared/models' | 19 | import { VideoBlacklistCreate, VideoStorage } from '@shared/models' |
20 | import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist' | 20 | import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist' |
21 | import { PeerTubeSocket } from '../peertube-socket' | 21 | import { PeerTubeSocket } from '../peertube-socket' |
diff --git a/server/lib/runners/index.ts b/server/lib/runners/index.ts new file mode 100644 index 000000000..a737c7b59 --- /dev/null +++ b/server/lib/runners/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './job-handlers' | ||
2 | export * from './runner' | ||
3 | export * from './runner-urls' | ||
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts new file mode 100644 index 000000000..73fc14574 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts | |||
@@ -0,0 +1,271 @@ | |||
1 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
3 | import { RUNNER_JOBS } from '@server/initializers/constants' | ||
4 | import { sequelizeTypescript } from '@server/initializers/database' | ||
5 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | ||
6 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
7 | import { setAsUpdated } from '@server/models/shared' | ||
8 | import { MRunnerJob } from '@server/types/models/runners' | ||
9 | import { pick } from '@shared/core-utils' | ||
10 | import { | ||
11 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
12 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
13 | RunnerJobState, | ||
14 | RunnerJobSuccessPayload, | ||
15 | RunnerJobType, | ||
16 | RunnerJobUpdatePayload, | ||
17 | RunnerJobVODAudioMergeTranscodingPayload, | ||
18 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | ||
19 | RunnerJobVODHLSTranscodingPayload, | ||
20 | RunnerJobVODHLSTranscodingPrivatePayload, | ||
21 | RunnerJobVODWebVideoTranscodingPayload, | ||
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | ||
23 | } from '@shared/models' | ||
24 | |||
25 | type CreateRunnerJobArg = | ||
26 | { | ||
27 | type: Extract<RunnerJobType, 'vod-web-video-transcoding'> | ||
28 | payload: RunnerJobVODWebVideoTranscodingPayload | ||
29 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | ||
30 | } | | ||
31 | { | ||
32 | type: Extract<RunnerJobType, 'vod-hls-transcoding'> | ||
33 | payload: RunnerJobVODHLSTranscodingPayload | ||
34 | privatePayload: RunnerJobVODHLSTranscodingPrivatePayload | ||
35 | } | | ||
36 | { | ||
37 | type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'> | ||
38 | payload: RunnerJobVODAudioMergeTranscodingPayload | ||
39 | privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload | ||
40 | } | | ||
41 | { | ||
42 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | ||
43 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | ||
44 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
45 | } | ||
46 | |||
47 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | ||
48 | |||
49 | protected readonly lTags = loggerTagsFactory('runner') | ||
50 | |||
51 | // --------------------------------------------------------------------------- | ||
52 | |||
53 | abstract create (options: C): Promise<MRunnerJob> | ||
54 | |||
55 | protected async createRunnerJob (options: CreateRunnerJobArg & { | ||
56 | jobUUID: string | ||
57 | priority: number | ||
58 | dependsOnRunnerJob?: MRunnerJob | ||
59 | }): Promise<MRunnerJob> { | ||
60 | const { priority, dependsOnRunnerJob } = options | ||
61 | |||
62 | const runnerJob = new RunnerJobModel({ | ||
63 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | ||
64 | |||
65 | uuid: options.jobUUID, | ||
66 | |||
67 | state: dependsOnRunnerJob | ||
68 | ? RunnerJobState.WAITING_FOR_PARENT_JOB | ||
69 | : RunnerJobState.PENDING, | ||
70 | |||
71 | dependsOnRunnerJobId: dependsOnRunnerJob?.id, | ||
72 | |||
73 | priority | ||
74 | }) | ||
75 | |||
76 | const job = await sequelizeTypescript.transaction(async transaction => { | ||
77 | return runnerJob.save({ transaction }) | ||
78 | }) | ||
79 | |||
80 | if (runnerJob.state === RunnerJobState.PENDING) { | ||
81 | PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
82 | } | ||
83 | |||
84 | return job | ||
85 | } | ||
86 | |||
87 | // --------------------------------------------------------------------------- | ||
88 | |||
89 | protected abstract specificUpdate (options: { | ||
90 | runnerJob: MRunnerJob | ||
91 | updatePayload?: U | ||
92 | }): Promise<void> | void | ||
93 | |||
94 | async update (options: { | ||
95 | runnerJob: MRunnerJob | ||
96 | progress?: number | ||
97 | updatePayload?: U | ||
98 | }) { | ||
99 | const { runnerJob, progress } = options | ||
100 | |||
101 | await this.specificUpdate(options) | ||
102 | |||
103 | if (progress) runnerJob.progress = progress | ||
104 | |||
105 | await retryTransactionWrapper(() => { | ||
106 | return sequelizeTypescript.transaction(async transaction => { | ||
107 | if (runnerJob.changed()) { | ||
108 | return runnerJob.save({ transaction }) | ||
109 | } | ||
110 | |||
111 | // Don't update the job too often | ||
112 | if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { | ||
113 | await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) | ||
114 | } | ||
115 | }) | ||
116 | }) | ||
117 | } | ||
118 | |||
119 | // --------------------------------------------------------------------------- | ||
120 | |||
121 | async complete (options: { | ||
122 | runnerJob: MRunnerJob | ||
123 | resultPayload: S | ||
124 | }) { | ||
125 | const { runnerJob } = options | ||
126 | |||
127 | try { | ||
128 | await this.specificComplete(options) | ||
129 | |||
130 | runnerJob.state = RunnerJobState.COMPLETED | ||
131 | } catch (err) { | ||
132 | logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
133 | |||
134 | runnerJob.state = RunnerJobState.ERRORED | ||
135 | runnerJob.error = err.message | ||
136 | } | ||
137 | |||
138 | runnerJob.progress = null | ||
139 | runnerJob.finishedAt = new Date() | ||
140 | |||
141 | await retryTransactionWrapper(() => { | ||
142 | return sequelizeTypescript.transaction(async transaction => { | ||
143 | await runnerJob.save({ transaction }) | ||
144 | }) | ||
145 | }) | ||
146 | |||
147 | const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) | ||
148 | |||
149 | if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
150 | } | ||
151 | |||
152 | protected abstract specificComplete (options: { | ||
153 | runnerJob: MRunnerJob | ||
154 | resultPayload: S | ||
155 | }): Promise<void> | void | ||
156 | |||
157 | // --------------------------------------------------------------------------- | ||
158 | |||
159 | async cancel (options: { | ||
160 | runnerJob: MRunnerJob | ||
161 | fromParent?: boolean | ||
162 | }) { | ||
163 | const { runnerJob, fromParent } = options | ||
164 | |||
165 | await this.specificCancel(options) | ||
166 | |||
167 | const cancelState = fromParent | ||
168 | ? RunnerJobState.PARENT_CANCELLED | ||
169 | : RunnerJobState.CANCELLED | ||
170 | |||
171 | runnerJob.setToErrorOrCancel(cancelState) | ||
172 | |||
173 | await retryTransactionWrapper(() => { | ||
174 | return sequelizeTypescript.transaction(async transaction => { | ||
175 | await runnerJob.save({ transaction }) | ||
176 | }) | ||
177 | }) | ||
178 | |||
179 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
180 | for (const child of children) { | ||
181 | logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) | ||
182 | |||
183 | await this.cancel({ runnerJob: child, fromParent: true }) | ||
184 | } | ||
185 | } | ||
186 | |||
187 | protected abstract specificCancel (options: { | ||
188 | runnerJob: MRunnerJob | ||
189 | }): Promise<void> | void | ||
190 | |||
191 | // --------------------------------------------------------------------------- | ||
192 | |||
193 | protected abstract isAbortSupported (): boolean | ||
194 | |||
195 | async abort (options: { | ||
196 | runnerJob: MRunnerJob | ||
197 | }) { | ||
198 | const { runnerJob } = options | ||
199 | |||
200 | if (this.isAbortSupported() !== true) { | ||
201 | return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) | ||
202 | } | ||
203 | |||
204 | await this.specificAbort(options) | ||
205 | |||
206 | runnerJob.resetToPending() | ||
207 | |||
208 | await retryTransactionWrapper(() => { | ||
209 | return sequelizeTypescript.transaction(async transaction => { | ||
210 | await runnerJob.save({ transaction }) | ||
211 | }) | ||
212 | }) | ||
213 | } | ||
214 | |||
215 | protected setAbortState (runnerJob: MRunnerJob) { | ||
216 | runnerJob.resetToPending() | ||
217 | } | ||
218 | |||
219 | protected abstract specificAbort (options: { | ||
220 | runnerJob: MRunnerJob | ||
221 | }): Promise<void> | void | ||
222 | |||
223 | // --------------------------------------------------------------------------- | ||
224 | |||
225 | async error (options: { | ||
226 | runnerJob: MRunnerJob | ||
227 | message: string | ||
228 | fromParent?: boolean | ||
229 | }) { | ||
230 | const { runnerJob, message, fromParent } = options | ||
231 | |||
232 | const errorState = fromParent | ||
233 | ? RunnerJobState.PARENT_ERRORED | ||
234 | : RunnerJobState.ERRORED | ||
235 | |||
236 | const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES | ||
237 | ? RunnerJobState.PENDING | ||
238 | : errorState | ||
239 | |||
240 | await this.specificError({ ...options, nextState }) | ||
241 | |||
242 | if (nextState === errorState) { | ||
243 | runnerJob.setToErrorOrCancel(nextState) | ||
244 | runnerJob.error = message | ||
245 | } else { | ||
246 | runnerJob.resetToPending() | ||
247 | } | ||
248 | |||
249 | await retryTransactionWrapper(() => { | ||
250 | return sequelizeTypescript.transaction(async transaction => { | ||
251 | await runnerJob.save({ transaction }) | ||
252 | }) | ||
253 | }) | ||
254 | |||
255 | if (runnerJob.state === errorState) { | ||
256 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
257 | |||
258 | for (const child of children) { | ||
259 | logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) | ||
260 | |||
261 | await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) | ||
262 | } | ||
263 | } | ||
264 | } | ||
265 | |||
266 | protected abstract specificError (options: { | ||
267 | runnerJob: MRunnerJob | ||
268 | message: string | ||
269 | nextState: RunnerJobState | ||
270 | }): Promise<void> | void | ||
271 | } | ||
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts new file mode 100644 index 000000000..517645848 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts | |||
@@ -0,0 +1,71 @@ | |||
1 | |||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | ||
5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
6 | import { MRunnerJob } from '@server/types/models/runners' | ||
7 | import { | ||
8 | LiveRTMPHLSTranscodingUpdatePayload, | ||
9 | RunnerJobSuccessPayload, | ||
10 | RunnerJobUpdatePayload, | ||
11 | RunnerJobVODPrivatePayload | ||
12 | } from '@shared/models' | ||
13 | import { AbstractJobHandler } from './abstract-job-handler' | ||
14 | import { loadTranscodingRunnerVideo } from './shared' | ||
15 | |||
16 | // eslint-disable-next-line max-len | ||
17 | export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> { | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | protected isAbortSupported () { | ||
22 | return true | ||
23 | } | ||
24 | |||
25 | protected specificUpdate (_options: { | ||
26 | runnerJob: MRunnerJob | ||
27 | updatePayload?: LiveRTMPHLSTranscodingUpdatePayload | ||
28 | }) { | ||
29 | // empty | ||
30 | } | ||
31 | |||
32 | protected specificAbort (_options: { | ||
33 | runnerJob: MRunnerJob | ||
34 | }) { | ||
35 | // empty | ||
36 | } | ||
37 | |||
38 | protected async specificError (options: { | ||
39 | runnerJob: MRunnerJob | ||
40 | }) { | ||
41 | const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) | ||
42 | if (!video) return | ||
43 | |||
44 | await moveToFailedTranscodingState(video) | ||
45 | |||
46 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
47 | } | ||
48 | |||
49 | protected async specificCancel (options: { | ||
50 | runnerJob: MRunnerJob | ||
51 | }) { | ||
52 | const { runnerJob } = options | ||
53 | |||
54 | const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) | ||
55 | if (!video) return | ||
56 | |||
57 | const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
58 | |||
59 | logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid)) | ||
60 | |||
61 | if (pending === 0) { | ||
62 | logger.info( | ||
63 | `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`, | ||
64 | this.lTags(video.uuid) | ||
65 | ) | ||
66 | |||
67 | const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload | ||
68 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo }) | ||
69 | } | ||
70 | } | ||
71 | } | ||
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts new file mode 100644 index 000000000..0fca72b9a --- /dev/null +++ b/server/lib/runners/job-handlers/index.ts | |||
@@ -0,0 +1,6 @@ | |||
1 | export * from './abstract-job-handler' | ||
2 | export * from './live-rtmp-hls-transcoding-job-handler' | ||
3 | export * from './vod-audio-merge-transcoding-job-handler' | ||
4 | export * from './vod-hls-transcoding-job-handler' | ||
5 | export * from './vod-web-video-transcoding-job-handler' | ||
6 | export * from './runner-job-handlers' | ||
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..c3d0e427d --- /dev/null +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts | |||
@@ -0,0 +1,170 @@ | |||
1 | import { move, remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { JOB_PRIORITY } from '@server/initializers/constants' | ||
5 | import { LiveManager } from '@server/lib/live' | ||
6 | import { MStreamingPlaylist, MVideo } from '@server/types/models' | ||
7 | import { MRunnerJob } from '@server/types/models/runners' | ||
8 | import { buildUUID } from '@shared/extra-utils' | ||
9 | import { | ||
10 | LiveRTMPHLSTranscodingSuccess, | ||
11 | LiveRTMPHLSTranscodingUpdatePayload, | ||
12 | LiveVideoError, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
14 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
15 | RunnerJobState | ||
16 | } from '@shared/models' | ||
17 | import { AbstractJobHandler } from './abstract-job-handler' | ||
18 | |||
19 | type CreateOptions = { | ||
20 | video: MVideo | ||
21 | playlist: MStreamingPlaylist | ||
22 | |||
23 | rtmpUrl: string | ||
24 | |||
25 | toTranscode: { | ||
26 | resolution: number | ||
27 | fps: number | ||
28 | }[] | ||
29 | |||
30 | segmentListSize: number | ||
31 | segmentDuration: number | ||
32 | |||
33 | outputDirectory: string | ||
34 | } | ||
35 | |||
36 | // eslint-disable-next-line max-len | ||
37 | export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> { | ||
38 | |||
39 | async create (options: CreateOptions) { | ||
40 | const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options | ||
41 | |||
42 | const jobUUID = buildUUID() | ||
43 | const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { | ||
44 | input: { | ||
45 | rtmpUrl | ||
46 | }, | ||
47 | output: { | ||
48 | toTranscode, | ||
49 | segmentListSize, | ||
50 | segmentDuration | ||
51 | } | ||
52 | } | ||
53 | |||
54 | const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { | ||
55 | videoUUID: video.uuid, | ||
56 | masterPlaylistName: playlist.playlistFilename, | ||
57 | outputDirectory | ||
58 | } | ||
59 | |||
60 | const job = await this.createRunnerJob({ | ||
61 | type: 'live-rtmp-hls-transcoding', | ||
62 | jobUUID, | ||
63 | payload, | ||
64 | privatePayload, | ||
65 | priority: JOB_PRIORITY.TRANSCODING | ||
66 | }) | ||
67 | |||
68 | return job | ||
69 | } | ||
70 | |||
71 | // --------------------------------------------------------------------------- | ||
72 | |||
73 | async specificUpdate (options: { | ||
74 | runnerJob: MRunnerJob | ||
75 | updatePayload: LiveRTMPHLSTranscodingUpdatePayload | ||
76 | }) { | ||
77 | const { runnerJob, updatePayload } = options | ||
78 | |||
79 | const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
80 | const outputDirectory = privatePayload.outputDirectory | ||
81 | const videoUUID = privatePayload.videoUUID | ||
82 | |||
83 | if (updatePayload.type === 'add-chunk') { | ||
84 | await move( | ||
85 | updatePayload.videoChunkFile as string, | ||
86 | join(outputDirectory, updatePayload.videoChunkFilename), | ||
87 | { overwrite: true } | ||
88 | ) | ||
89 | } else if (updatePayload.type === 'remove-chunk') { | ||
90 | await remove(join(outputDirectory, updatePayload.videoChunkFilename)) | ||
91 | } | ||
92 | |||
93 | if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) { | ||
94 | await move( | ||
95 | updatePayload.resolutionPlaylistFile as string, | ||
96 | join(outputDirectory, updatePayload.resolutionPlaylistFilename), | ||
97 | { overwrite: true } | ||
98 | ) | ||
99 | } | ||
100 | |||
101 | if (updatePayload.masterPlaylistFile) { | ||
102 | await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true }) | ||
103 | } | ||
104 | |||
105 | logger.info( | ||
106 | 'Runner live RTMP to HLS job %s for %s updated.', | ||
107 | runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) } | ||
108 | ) | ||
109 | } | ||
110 | |||
111 | // --------------------------------------------------------------------------- | ||
112 | |||
113 | protected specificComplete (options: { | ||
114 | runnerJob: MRunnerJob | ||
115 | }) { | ||
116 | return this.stopLive({ | ||
117 | runnerJob: options.runnerJob, | ||
118 | type: 'ended' | ||
119 | }) | ||
120 | } | ||
121 | |||
122 | // --------------------------------------------------------------------------- | ||
123 | |||
124 | protected isAbortSupported () { | ||
125 | return false | ||
126 | } | ||
127 | |||
128 | protected specificAbort () { | ||
129 | throw new Error('Not implemented') | ||
130 | } | ||
131 | |||
132 | protected specificError (options: { | ||
133 | runnerJob: MRunnerJob | ||
134 | nextState: RunnerJobState | ||
135 | }) { | ||
136 | return this.stopLive({ | ||
137 | runnerJob: options.runnerJob, | ||
138 | type: 'errored' | ||
139 | }) | ||
140 | } | ||
141 | |||
142 | protected specificCancel (options: { | ||
143 | runnerJob: MRunnerJob | ||
144 | }) { | ||
145 | return this.stopLive({ | ||
146 | runnerJob: options.runnerJob, | ||
147 | type: 'cancelled' | ||
148 | }) | ||
149 | } | ||
150 | |||
151 | private stopLive (options: { | ||
152 | runnerJob: MRunnerJob | ||
153 | type: 'ended' | 'errored' | 'cancelled' | ||
154 | }) { | ||
155 | const { runnerJob, type } = options | ||
156 | |||
157 | const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
158 | const videoUUID = privatePayload.videoUUID | ||
159 | |||
160 | const errorType = { | ||
161 | ended: null, | ||
162 | errored: LiveVideoError.RUNNER_JOB_ERROR, | ||
163 | cancelled: LiveVideoError.RUNNER_JOB_CANCEL | ||
164 | } | ||
165 | |||
166 | LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type]) | ||
167 | |||
168 | logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID)) | ||
169 | } | ||
170 | } | ||
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts new file mode 100644 index 000000000..7bad1bc77 --- /dev/null +++ b/server/lib/runners/job-handlers/runner-job-handlers.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import { MRunnerJob } from '@server/types/models/runners' | ||
2 | import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' | ||
3 | import { AbstractJobHandler } from './abstract-job-handler' | ||
4 | import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' | ||
5 | import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' | ||
6 | import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' | ||
7 | import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' | ||
8 | |||
9 | const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, RunnerJobUpdatePayload, RunnerJobSuccessPayload>> = { | ||
10 | 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, | ||
11 | 'vod-hls-transcoding': VODHLSTranscodingJobHandler, | ||
12 | 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, | ||
13 | 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler | ||
14 | } | ||
15 | |||
16 | export function getRunnerJobHandlerClass (job: MRunnerJob) { | ||
17 | return processors[job.type] | ||
18 | } | ||
diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts new file mode 100644 index 000000000..348273ae2 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './vod-helpers' | |||
diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts new file mode 100644 index 000000000..93ae89ff8 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/vod-helpers.ts | |||
@@ -0,0 +1,44 @@ | |||
1 | import { move } from 'fs-extra' | ||
2 | import { dirname, join } from 'path' | ||
3 | import { logger, LoggerTagsFn } from '@server/helpers/logger' | ||
4 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' | ||
5 | import { onWebTorrentVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding' | ||
6 | import { buildNewFile } from '@server/lib/video-file' | ||
7 | import { VideoModel } from '@server/models/video/video' | ||
8 | import { MVideoFullLight } from '@server/types/models' | ||
9 | import { MRunnerJob } from '@server/types/models/runners' | ||
10 | import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models' | ||
11 | |||
12 | export async function onVODWebVideoOrAudioMergeTranscodingJob (options: { | ||
13 | video: MVideoFullLight | ||
14 | videoFilePath: string | ||
15 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload | ||
16 | }) { | ||
17 | const { video, videoFilePath, privatePayload } = options | ||
18 | |||
19 | const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' }) | ||
20 | videoFile.videoId = video.id | ||
21 | |||
22 | const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) | ||
23 | await move(videoFilePath, newVideoFilePath) | ||
24 | |||
25 | await onWebTorrentVideoFileTranscoding({ | ||
26 | video, | ||
27 | videoFile, | ||
28 | videoOutputPath: newVideoFilePath | ||
29 | }) | ||
30 | |||
31 | await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) | ||
32 | } | ||
33 | |||
34 | export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) { | ||
35 | const videoUUID = runnerJob.privatePayload.videoUUID | ||
36 | |||
37 | const video = await VideoModel.loadFull(videoUUID) | ||
38 | if (!video) { | ||
39 | logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID)) | ||
40 | return undefined | ||
41 | } | ||
42 | |||
43 | return video | ||
44 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts new file mode 100644 index 000000000..a7b33f87e --- /dev/null +++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts | |||
@@ -0,0 +1,97 @@ | |||
1 | import { pick } from 'lodash' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { MRunnerJob } from '@server/types/models/runners' | ||
6 | import { buildUUID } from '@shared/extra-utils' | ||
7 | import { getVideoStreamDuration } from '@shared/ffmpeg' | ||
8 | import { | ||
9 | RunnerJobUpdatePayload, | ||
10 | RunnerJobVODAudioMergeTranscodingPayload, | ||
11 | RunnerJobVODWebVideoTranscodingPrivatePayload, | ||
12 | VODAudioMergeTranscodingSuccess | ||
13 | } from '@shared/models' | ||
14 | import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls' | ||
15 | import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' | ||
16 | import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' | ||
17 | |||
18 | type CreateOptions = { | ||
19 | video: MVideo | ||
20 | isNewVideo: boolean | ||
21 | resolution: number | ||
22 | fps: number | ||
23 | priority: number | ||
24 | dependsOnRunnerJob?: MRunnerJob | ||
25 | } | ||
26 | |||
27 | // eslint-disable-next-line max-len | ||
28 | export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODAudioMergeTranscodingSuccess> { | ||
29 | |||
30 | async create (options: CreateOptions) { | ||
31 | const { video, resolution, fps, priority, dependsOnRunnerJob } = options | ||
32 | |||
33 | const jobUUID = buildUUID() | ||
34 | const payload: RunnerJobVODAudioMergeTranscodingPayload = { | ||
35 | input: { | ||
36 | audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid), | ||
37 | previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid) | ||
38 | }, | ||
39 | output: { | ||
40 | resolution, | ||
41 | fps | ||
42 | } | ||
43 | } | ||
44 | |||
45 | const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { | ||
46 | ...pick(options, [ 'isNewVideo' ]), | ||
47 | |||
48 | videoUUID: video.uuid | ||
49 | } | ||
50 | |||
51 | const job = await this.createRunnerJob({ | ||
52 | type: 'vod-audio-merge-transcoding', | ||
53 | jobUUID, | ||
54 | payload, | ||
55 | privatePayload, | ||
56 | priority, | ||
57 | dependsOnRunnerJob | ||
58 | }) | ||
59 | |||
60 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') | ||
61 | |||
62 | return job | ||
63 | } | ||
64 | |||
65 | // --------------------------------------------------------------------------- | ||
66 | |||
67 | async specificComplete (options: { | ||
68 | runnerJob: MRunnerJob | ||
69 | resultPayload: VODAudioMergeTranscodingSuccess | ||
70 | }) { | ||
71 | const { runnerJob, resultPayload } = options | ||
72 | const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload | ||
73 | |||
74 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
75 | if (!video) return | ||
76 | |||
77 | const videoFilePath = resultPayload.videoFile as string | ||
78 | |||
79 | // ffmpeg generated a new video file, so update the video duration | ||
80 | // See https://trac.ffmpeg.org/ticket/5456 | ||
81 | video.duration = await getVideoStreamDuration(videoFilePath) | ||
82 | await video.save() | ||
83 | |||
84 | // We can remove the old audio file | ||
85 | const oldAudioFile = video.VideoFiles[0] | ||
86 | await video.removeWebTorrentFile(oldAudioFile) | ||
87 | await oldAudioFile.destroy() | ||
88 | video.VideoFiles = [] | ||
89 | |||
90 | await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) | ||
91 | |||
92 | logger.info( | ||
93 | 'Runner VOD audio merge transcoding job %s for %s ended.', | ||
94 | runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) | ||
95 | ) | ||
96 | } | ||
97 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..02566b9d5 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts | |||
@@ -0,0 +1,114 @@ | |||
1 | import { move } from 'fs-extra' | ||
2 | import { dirname, join } from 'path' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { renameVideoFileInPlaylist } from '@server/lib/hls' | ||
5 | import { getHlsResolutionPlaylistFilename } from '@server/lib/paths' | ||
6 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' | ||
7 | import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding' | ||
8 | import { buildNewFile, removeAllWebTorrentFiles } from '@server/lib/video-file' | ||
9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
10 | import { MVideo } from '@server/types/models' | ||
11 | import { MRunnerJob } from '@server/types/models/runners' | ||
12 | import { pick } from '@shared/core-utils' | ||
13 | import { buildUUID } from '@shared/extra-utils' | ||
14 | import { | ||
15 | RunnerJobUpdatePayload, | ||
16 | RunnerJobVODHLSTranscodingPayload, | ||
17 | RunnerJobVODHLSTranscodingPrivatePayload, | ||
18 | VODHLSTranscodingSuccess | ||
19 | } from '@shared/models' | ||
20 | import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' | ||
21 | import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' | ||
22 | import { loadTranscodingRunnerVideo } from './shared' | ||
23 | |||
24 | type CreateOptions = { | ||
25 | video: MVideo | ||
26 | isNewVideo: boolean | ||
27 | deleteWebVideoFiles: boolean | ||
28 | resolution: number | ||
29 | fps: number | ||
30 | priority: number | ||
31 | dependsOnRunnerJob?: MRunnerJob | ||
32 | } | ||
33 | |||
34 | // eslint-disable-next-line max-len | ||
35 | export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODHLSTranscodingSuccess> { | ||
36 | |||
37 | async create (options: CreateOptions) { | ||
38 | const { video, resolution, fps, dependsOnRunnerJob, priority } = options | ||
39 | |||
40 | const jobUUID = buildUUID() | ||
41 | |||
42 | const payload: RunnerJobVODHLSTranscodingPayload = { | ||
43 | input: { | ||
44 | videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) | ||
45 | }, | ||
46 | output: { | ||
47 | resolution, | ||
48 | fps | ||
49 | } | ||
50 | } | ||
51 | |||
52 | const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = { | ||
53 | ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]), | ||
54 | |||
55 | videoUUID: video.uuid | ||
56 | } | ||
57 | |||
58 | const job = await this.createRunnerJob({ | ||
59 | type: 'vod-hls-transcoding', | ||
60 | jobUUID, | ||
61 | payload, | ||
62 | privatePayload, | ||
63 | priority, | ||
64 | dependsOnRunnerJob | ||
65 | }) | ||
66 | |||
67 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') | ||
68 | |||
69 | return job | ||
70 | } | ||
71 | |||
72 | // --------------------------------------------------------------------------- | ||
73 | |||
74 | async specificComplete (options: { | ||
75 | runnerJob: MRunnerJob | ||
76 | resultPayload: VODHLSTranscodingSuccess | ||
77 | }) { | ||
78 | const { runnerJob, resultPayload } = options | ||
79 | const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload | ||
80 | |||
81 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
82 | if (!video) return | ||
83 | |||
84 | const videoFilePath = resultPayload.videoFile as string | ||
85 | const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string | ||
86 | |||
87 | const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' }) | ||
88 | const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) | ||
89 | await move(videoFilePath, newVideoFilePath) | ||
90 | |||
91 | const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename) | ||
92 | const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename) | ||
93 | await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath) | ||
94 | |||
95 | await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename) | ||
96 | |||
97 | await onHLSVideoFileTranscoding({ | ||
98 | video, | ||
99 | videoFile, | ||
100 | m3u8OutputPath: newResolutionPlaylistFilePath, | ||
101 | videoOutputPath: newVideoFilePath | ||
102 | }) | ||
103 | |||
104 | await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) | ||
105 | |||
106 | if (privatePayload.deleteWebVideoFiles === true) { | ||
107 | logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid)) | ||
108 | |||
109 | await removeAllWebTorrentFiles(video) | ||
110 | } | ||
111 | |||
112 | logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid)) | ||
113 | } | ||
114 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts new file mode 100644 index 000000000..57761a7a1 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts | |||
@@ -0,0 +1,84 @@ | |||
1 | import { pick } from 'lodash' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { MRunnerJob } from '@server/types/models/runners' | ||
6 | import { buildUUID } from '@shared/extra-utils' | ||
7 | import { | ||
8 | RunnerJobUpdatePayload, | ||
9 | RunnerJobVODWebVideoTranscodingPayload, | ||
10 | RunnerJobVODWebVideoTranscodingPrivatePayload, | ||
11 | VODWebVideoTranscodingSuccess | ||
12 | } from '@shared/models' | ||
13 | import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' | ||
14 | import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' | ||
15 | import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' | ||
16 | |||
17 | type CreateOptions = { | ||
18 | video: MVideo | ||
19 | isNewVideo: boolean | ||
20 | resolution: number | ||
21 | fps: number | ||
22 | priority: number | ||
23 | dependsOnRunnerJob?: MRunnerJob | ||
24 | } | ||
25 | |||
26 | // eslint-disable-next-line max-len | ||
27 | export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODWebVideoTranscodingSuccess> { | ||
28 | |||
29 | async create (options: CreateOptions) { | ||
30 | const { video, resolution, fps, priority, dependsOnRunnerJob } = options | ||
31 | |||
32 | const jobUUID = buildUUID() | ||
33 | const payload: RunnerJobVODWebVideoTranscodingPayload = { | ||
34 | input: { | ||
35 | videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) | ||
36 | }, | ||
37 | output: { | ||
38 | resolution, | ||
39 | fps | ||
40 | } | ||
41 | } | ||
42 | |||
43 | const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { | ||
44 | ...pick(options, [ 'isNewVideo' ]), | ||
45 | |||
46 | videoUUID: video.uuid | ||
47 | } | ||
48 | |||
49 | const job = await this.createRunnerJob({ | ||
50 | type: 'vod-web-video-transcoding', | ||
51 | jobUUID, | ||
52 | payload, | ||
53 | privatePayload, | ||
54 | dependsOnRunnerJob, | ||
55 | priority | ||
56 | }) | ||
57 | |||
58 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') | ||
59 | |||
60 | return job | ||
61 | } | ||
62 | |||
63 | // --------------------------------------------------------------------------- | ||
64 | |||
65 | async specificComplete (options: { | ||
66 | runnerJob: MRunnerJob | ||
67 | resultPayload: VODWebVideoTranscodingSuccess | ||
68 | }) { | ||
69 | const { runnerJob, resultPayload } = options | ||
70 | const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload | ||
71 | |||
72 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
73 | if (!video) return | ||
74 | |||
75 | const videoFilePath = resultPayload.videoFile as string | ||
76 | |||
77 | await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) | ||
78 | |||
79 | logger.info( | ||
80 | 'Runner VOD web video transcoding job %s for %s ended.', | ||
81 | runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) | ||
82 | ) | ||
83 | } | ||
84 | } | ||
diff --git a/server/lib/runners/runner-urls.ts b/server/lib/runners/runner-urls.ts new file mode 100644 index 000000000..329fb1170 --- /dev/null +++ b/server/lib/runners/runner-urls.ts | |||
@@ -0,0 +1,9 @@ | |||
1 | import { WEBSERVER } from '@server/initializers/constants' | ||
2 | |||
3 | export function generateRunnerTranscodingVideoInputFileUrl (jobUUID: string, videoUUID: string) { | ||
4 | return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/max-quality' | ||
5 | } | ||
6 | |||
7 | export function generateRunnerTranscodingVideoPreviewFileUrl (jobUUID: string, videoUUID: string) { | ||
8 | return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/previews/max-quality' | ||
9 | } | ||
diff --git a/server/lib/runners/runner.ts b/server/lib/runners/runner.ts new file mode 100644 index 000000000..74c814ba1 --- /dev/null +++ b/server/lib/runners/runner.ts | |||
@@ -0,0 +1,36 @@ | |||
1 | import express from 'express' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
4 | import { sequelizeTypescript } from '@server/initializers/database' | ||
5 | import { MRunner } from '@server/types/models/runners' | ||
6 | |||
7 | const lTags = loggerTagsFactory('runner') | ||
8 | |||
9 | const updatingRunner = new Set<number>() | ||
10 | |||
11 | function updateLastRunnerContact (req: express.Request, runner: MRunner) { | ||
12 | const now = new Date() | ||
13 | |||
14 | // Don't update last runner contact too often | ||
15 | if (now.getTime() - runner.lastContact.getTime() < 2000) return | ||
16 | if (updatingRunner.has(runner.id)) return | ||
17 | |||
18 | updatingRunner.add(runner.id) | ||
19 | |||
20 | runner.lastContact = now | ||
21 | runner.ip = req.ip | ||
22 | |||
23 | logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name)) | ||
24 | |||
25 | retryTransactionWrapper(() => { | ||
26 | return sequelizeTypescript.transaction(async transaction => { | ||
27 | return runner.save({ transaction }) | ||
28 | }) | ||
29 | }) | ||
30 | .catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) })) | ||
31 | .finally(() => updatingRunner.delete(runner.id)) | ||
32 | } | ||
33 | |||
34 | export { | ||
35 | updateLastRunnerContact | ||
36 | } | ||
diff --git a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts new file mode 100644 index 000000000..f7a26d2bc --- /dev/null +++ b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts | |||
@@ -0,0 +1,42 @@ | |||
1 | import { CONFIG } from '@server/initializers/config' | ||
2 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
3 | import { logger, loggerTagsFactory } from '../../helpers/logger' | ||
4 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
5 | import { getRunnerJobHandlerClass } from '../runners' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('runner') | ||
9 | |||
10 | export class RunnerJobWatchDogScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.RUNNER_JOB_WATCH_DOG | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const vodStalledJobs = await RunnerJobModel.listStalledJobs({ | ||
22 | staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.VOD, | ||
23 | types: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ] | ||
24 | }) | ||
25 | |||
26 | const liveStalledJobs = await RunnerJobModel.listStalledJobs({ | ||
27 | staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.LIVE, | ||
28 | types: [ 'live-rtmp-hls-transcoding' ] | ||
29 | }) | ||
30 | |||
31 | for (const stalled of [ ...vodStalledJobs, ...liveStalledJobs ]) { | ||
32 | logger.info('Abort stalled runner job %s (%s)', stalled.uuid, stalled.type, lTags(stalled.uuid, stalled.type)) | ||
33 | |||
34 | const Handler = getRunnerJobHandlerClass(stalled) | ||
35 | await new Handler().abort({ runnerJob: stalled }) | ||
36 | } | ||
37 | } | ||
38 | |||
39 | static get Instance () { | ||
40 | return this.instance || (this.instance = new this()) | ||
41 | } | ||
42 | } | ||
diff --git a/server/lib/server-config-manager.ts b/server/lib/server-config-manager.ts index e87e2854f..ba7916363 100644 --- a/server/lib/server-config-manager.ts +++ b/server/lib/server-config-manager.ts | |||
@@ -126,11 +126,14 @@ class ServerConfigManager { | |||
126 | serverVersion: PEERTUBE_VERSION, | 126 | serverVersion: PEERTUBE_VERSION, |
127 | serverCommit: this.serverCommit, | 127 | serverCommit: this.serverCommit, |
128 | transcoding: { | 128 | transcoding: { |
129 | remoteRunners: { | ||
130 | enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED | ||
131 | }, | ||
129 | hls: { | 132 | hls: { |
130 | enabled: CONFIG.TRANSCODING.HLS.ENABLED | 133 | enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.HLS.ENABLED |
131 | }, | 134 | }, |
132 | webtorrent: { | 135 | webtorrent: { |
133 | enabled: CONFIG.TRANSCODING.WEBTORRENT.ENABLED | 136 | enabled: CONFIG.TRANSCODING.ENABLED && CONFIG.TRANSCODING.WEBTORRENT.ENABLED |
134 | }, | 137 | }, |
135 | enabledResolutions: this.getEnabledResolutions('vod'), | 138 | enabledResolutions: this.getEnabledResolutions('vod'), |
136 | profile: CONFIG.TRANSCODING.PROFILE, | 139 | profile: CONFIG.TRANSCODING.PROFILE, |
@@ -150,6 +153,9 @@ class ServerConfigManager { | |||
150 | 153 | ||
151 | transcoding: { | 154 | transcoding: { |
152 | enabled: CONFIG.LIVE.TRANSCODING.ENABLED, | 155 | enabled: CONFIG.LIVE.TRANSCODING.ENABLED, |
156 | remoteRunners: { | ||
157 | enabled: CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED | ||
158 | }, | ||
153 | enabledResolutions: this.getEnabledResolutions('live'), | 159 | enabledResolutions: this.getEnabledResolutions('live'), |
154 | profile: CONFIG.LIVE.TRANSCODING.PROFILE, | 160 | profile: CONFIG.LIVE.TRANSCODING.PROFILE, |
155 | availableProfiles: VideoTranscodingProfilesManager.Instance.getAvailableProfiles('live') | 161 | availableProfiles: VideoTranscodingProfilesManager.Instance.getAvailableProfiles('live') |
diff --git a/server/lib/transcoding/create-transcoding-job.ts b/server/lib/transcoding/create-transcoding-job.ts new file mode 100644 index 000000000..46831a912 --- /dev/null +++ b/server/lib/transcoding/create-transcoding-job.ts | |||
@@ -0,0 +1,36 @@ | |||
1 | import { CONFIG } from '@server/initializers/config' | ||
2 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' | ||
3 | import { TranscodingJobQueueBuilder, TranscodingRunnerJobBuilder } from './shared' | ||
4 | |||
5 | export function createOptimizeOrMergeAudioJobs (options: { | ||
6 | video: MVideoFullLight | ||
7 | videoFile: MVideoFile | ||
8 | isNewVideo: boolean | ||
9 | user: MUserId | ||
10 | }) { | ||
11 | return getJobBuilder().createOptimizeOrMergeAudioJobs(options) | ||
12 | } | ||
13 | |||
14 | // --------------------------------------------------------------------------- | ||
15 | |||
16 | export function createTranscodingJobs (options: { | ||
17 | transcodingType: 'hls' | 'webtorrent' | ||
18 | video: MVideoFullLight | ||
19 | resolutions: number[] | ||
20 | isNewVideo: boolean | ||
21 | user: MUserId | ||
22 | }) { | ||
23 | return getJobBuilder().createTranscodingJobs(options) | ||
24 | } | ||
25 | |||
26 | // --------------------------------------------------------------------------- | ||
27 | // Private | ||
28 | // --------------------------------------------------------------------------- | ||
29 | |||
30 | function getJobBuilder () { | ||
31 | if (CONFIG.TRANSCODING.REMOTE_RUNNERS.ENABLED === true) { | ||
32 | return new TranscodingRunnerJobBuilder() | ||
33 | } | ||
34 | |||
35 | return new TranscodingJobQueueBuilder() | ||
36 | } | ||
diff --git a/server/lib/transcoding/default-transcoding-profiles.ts b/server/lib/transcoding/default-transcoding-profiles.ts index f47718819..5251784ac 100644 --- a/server/lib/transcoding/default-transcoding-profiles.ts +++ b/server/lib/transcoding/default-transcoding-profiles.ts | |||
@@ -1,15 +1,9 @@ | |||
1 | 1 | ||
2 | import { logger } from '@server/helpers/logger' | 2 | import { logger } from '@server/helpers/logger' |
3 | import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils' | 3 | import { getAverageBitrate, getMinLimitBitrate } from '@shared/core-utils' |
4 | import { AvailableEncoders, EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '../../../shared/models/videos' | 4 | import { buildStreamSuffix, FFmpegCommandWrapper, ffprobePromise, getAudioStream, getMaxAudioBitrate } from '@shared/ffmpeg' |
5 | import { | 5 | import { AvailableEncoders, EncoderOptionsBuilder, EncoderOptionsBuilderParams, VideoResolution } from '@shared/models' |
6 | buildStreamSuffix, | 6 | import { canDoQuickAudioTranscode } from './transcoding-quick-transcode' |
7 | canDoQuickAudioTranscode, | ||
8 | ffprobePromise, | ||
9 | getAudioStream, | ||
10 | getMaxAudioBitrate, | ||
11 | resetSupportedEncoders | ||
12 | } from '../../helpers/ffmpeg' | ||
13 | 7 | ||
14 | /** | 8 | /** |
15 | * | 9 | * |
@@ -184,14 +178,14 @@ class VideoTranscodingProfilesManager { | |||
184 | addEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { | 178 | addEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { |
185 | this.encodersPriorities[type][streamType].push({ name: encoder, priority }) | 179 | this.encodersPriorities[type][streamType].push({ name: encoder, priority }) |
186 | 180 | ||
187 | resetSupportedEncoders() | 181 | FFmpegCommandWrapper.resetSupportedEncoders() |
188 | } | 182 | } |
189 | 183 | ||
190 | removeEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { | 184 | removeEncoderPriority (type: 'vod' | 'live', streamType: 'audio' | 'video', encoder: string, priority: number) { |
191 | this.encodersPriorities[type][streamType] = this.encodersPriorities[type][streamType] | 185 | this.encodersPriorities[type][streamType] = this.encodersPriorities[type][streamType] |
192 | .filter(o => o.name !== encoder && o.priority !== priority) | 186 | .filter(o => o.name !== encoder && o.priority !== priority) |
193 | 187 | ||
194 | resetSupportedEncoders() | 188 | FFmpegCommandWrapper.resetSupportedEncoders() |
195 | } | 189 | } |
196 | 190 | ||
197 | private getEncodersByPriority (type: 'vod' | 'live', streamType: 'audio' | 'video') { | 191 | private getEncodersByPriority (type: 'vod' | 'live', streamType: 'audio' | 'video') { |
diff --git a/server/lib/transcoding/ended-transcoding.ts b/server/lib/transcoding/ended-transcoding.ts new file mode 100644 index 000000000..d31674ede --- /dev/null +++ b/server/lib/transcoding/ended-transcoding.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
2 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
3 | import { MVideo } from '@server/types/models' | ||
4 | import { moveToNextState } from '../video-state' | ||
5 | |||
6 | export async function onTranscodingEnded (options: { | ||
7 | video: MVideo | ||
8 | isNewVideo: boolean | ||
9 | moveVideoToNextState: boolean | ||
10 | }) { | ||
11 | const { video, isNewVideo, moveVideoToNextState } = options | ||
12 | |||
13 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
14 | |||
15 | if (moveVideoToNextState) { | ||
16 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo }) | ||
17 | } | ||
18 | } | ||
diff --git a/server/lib/transcoding/hls-transcoding.ts b/server/lib/transcoding/hls-transcoding.ts new file mode 100644 index 000000000..cffa859c7 --- /dev/null +++ b/server/lib/transcoding/hls-transcoding.ts | |||
@@ -0,0 +1,181 @@ | |||
1 | import { MutexInterface } from 'async-mutex' | ||
2 | import { Job } from 'bullmq' | ||
3 | import { ensureDir, move, stat } from 'fs-extra' | ||
4 | import { basename, extname as extnameUtil, join } from 'path' | ||
5 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
6 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | ||
7 | import { sequelizeTypescript } from '@server/initializers/database' | ||
8 | import { MVideo, MVideoFile } from '@server/types/models' | ||
9 | import { pick } from '@shared/core-utils' | ||
10 | import { getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg' | ||
11 | import { VideoResolution } from '@shared/models' | ||
12 | import { CONFIG } from '../../initializers/config' | ||
13 | import { VideoFileModel } from '../../models/video/video-file' | ||
14 | import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist' | ||
15 | import { updatePlaylistAfterFileChange } from '../hls' | ||
16 | import { generateHLSVideoFilename, getHlsResolutionPlaylistFilename } from '../paths' | ||
17 | import { buildFileMetadata } from '../video-file' | ||
18 | import { VideoPathManager } from '../video-path-manager' | ||
19 | import { buildFFmpegVOD } from './shared' | ||
20 | |||
21 | // Concat TS segments from a live video to a fragmented mp4 HLS playlist | ||
22 | export async function generateHlsPlaylistResolutionFromTS (options: { | ||
23 | video: MVideo | ||
24 | concatenatedTsFilePath: string | ||
25 | resolution: VideoResolution | ||
26 | fps: number | ||
27 | isAAC: boolean | ||
28 | inputFileMutexReleaser: MutexInterface.Releaser | ||
29 | }) { | ||
30 | return generateHlsPlaylistCommon({ | ||
31 | type: 'hls-from-ts' as 'hls-from-ts', | ||
32 | inputPath: options.concatenatedTsFilePath, | ||
33 | |||
34 | ...pick(options, [ 'video', 'resolution', 'fps', 'inputFileMutexReleaser', 'isAAC' ]) | ||
35 | }) | ||
36 | } | ||
37 | |||
38 | // Generate an HLS playlist from an input file, and update the master playlist | ||
39 | export function generateHlsPlaylistResolution (options: { | ||
40 | video: MVideo | ||
41 | videoInputPath: string | ||
42 | resolution: VideoResolution | ||
43 | fps: number | ||
44 | copyCodecs: boolean | ||
45 | inputFileMutexReleaser: MutexInterface.Releaser | ||
46 | job?: Job | ||
47 | }) { | ||
48 | return generateHlsPlaylistCommon({ | ||
49 | type: 'hls' as 'hls', | ||
50 | inputPath: options.videoInputPath, | ||
51 | |||
52 | ...pick(options, [ 'video', 'resolution', 'fps', 'copyCodecs', 'inputFileMutexReleaser', 'job' ]) | ||
53 | }) | ||
54 | } | ||
55 | |||
56 | export async function onHLSVideoFileTranscoding (options: { | ||
57 | video: MVideo | ||
58 | videoFile: MVideoFile | ||
59 | videoOutputPath: string | ||
60 | m3u8OutputPath: string | ||
61 | }) { | ||
62 | const { video, videoFile, videoOutputPath, m3u8OutputPath } = options | ||
63 | |||
64 | // Create or update the playlist | ||
65 | const playlist = await retryTransactionWrapper(() => { | ||
66 | return sequelizeTypescript.transaction(async transaction => { | ||
67 | return VideoStreamingPlaylistModel.loadOrGenerate(video, transaction) | ||
68 | }) | ||
69 | }) | ||
70 | videoFile.videoStreamingPlaylistId = playlist.id | ||
71 | |||
72 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
73 | |||
74 | try { | ||
75 | // VOD transcoding is a long task, refresh video attributes | ||
76 | await video.reload() | ||
77 | |||
78 | const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, videoFile) | ||
79 | await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video)) | ||
80 | |||
81 | // Move playlist file | ||
82 | const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, basename(m3u8OutputPath)) | ||
83 | await move(m3u8OutputPath, resolutionPlaylistPath, { overwrite: true }) | ||
84 | // Move video file | ||
85 | await move(videoOutputPath, videoFilePath, { overwrite: true }) | ||
86 | |||
87 | // Update video duration if it was not set (in case of a live for example) | ||
88 | if (!video.duration) { | ||
89 | video.duration = await getVideoStreamDuration(videoFilePath) | ||
90 | await video.save() | ||
91 | } | ||
92 | |||
93 | const stats = await stat(videoFilePath) | ||
94 | |||
95 | videoFile.size = stats.size | ||
96 | videoFile.fps = await getVideoStreamFPS(videoFilePath) | ||
97 | videoFile.metadata = await buildFileMetadata(videoFilePath) | ||
98 | |||
99 | await createTorrentAndSetInfoHash(playlist, videoFile) | ||
100 | |||
101 | const oldFile = await VideoFileModel.loadHLSFile({ | ||
102 | playlistId: playlist.id, | ||
103 | fps: videoFile.fps, | ||
104 | resolution: videoFile.resolution | ||
105 | }) | ||
106 | |||
107 | if (oldFile) { | ||
108 | await video.removeStreamingPlaylistVideoFile(playlist, oldFile) | ||
109 | await oldFile.destroy() | ||
110 | } | ||
111 | |||
112 | const savedVideoFile = await VideoFileModel.customUpsert(videoFile, 'streaming-playlist', undefined) | ||
113 | |||
114 | await updatePlaylistAfterFileChange(video, playlist) | ||
115 | |||
116 | return { resolutionPlaylistPath, videoFile: savedVideoFile } | ||
117 | } finally { | ||
118 | mutexReleaser() | ||
119 | } | ||
120 | } | ||
121 | |||
122 | // --------------------------------------------------------------------------- | ||
123 | |||
124 | async function generateHlsPlaylistCommon (options: { | ||
125 | type: 'hls' | 'hls-from-ts' | ||
126 | video: MVideo | ||
127 | inputPath: string | ||
128 | |||
129 | resolution: VideoResolution | ||
130 | fps: number | ||
131 | |||
132 | inputFileMutexReleaser: MutexInterface.Releaser | ||
133 | |||
134 | copyCodecs?: boolean | ||
135 | isAAC?: boolean | ||
136 | |||
137 | job?: Job | ||
138 | }) { | ||
139 | const { type, video, inputPath, resolution, fps, copyCodecs, isAAC, job, inputFileMutexReleaser } = options | ||
140 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
141 | |||
142 | const videoTranscodedBasePath = join(transcodeDirectory, type) | ||
143 | await ensureDir(videoTranscodedBasePath) | ||
144 | |||
145 | const videoFilename = generateHLSVideoFilename(resolution) | ||
146 | const videoOutputPath = join(videoTranscodedBasePath, videoFilename) | ||
147 | |||
148 | const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFilename) | ||
149 | const m3u8OutputPath = join(videoTranscodedBasePath, resolutionPlaylistFilename) | ||
150 | |||
151 | const transcodeOptions = { | ||
152 | type, | ||
153 | |||
154 | inputPath, | ||
155 | outputPath: m3u8OutputPath, | ||
156 | |||
157 | resolution, | ||
158 | fps, | ||
159 | copyCodecs, | ||
160 | |||
161 | isAAC, | ||
162 | |||
163 | inputFileMutexReleaser, | ||
164 | |||
165 | hlsPlaylist: { | ||
166 | videoFilename | ||
167 | } | ||
168 | } | ||
169 | |||
170 | await buildFFmpegVOD(job).transcode(transcodeOptions) | ||
171 | |||
172 | const newVideoFile = new VideoFileModel({ | ||
173 | resolution, | ||
174 | extname: extnameUtil(videoFilename), | ||
175 | size: 0, | ||
176 | filename: videoFilename, | ||
177 | fps: -1 | ||
178 | }) | ||
179 | |||
180 | await onHLSVideoFileTranscoding({ video, videoFile: newVideoFile, videoOutputPath, m3u8OutputPath }) | ||
181 | } | ||
diff --git a/server/lib/transcoding/shared/ffmpeg-builder.ts b/server/lib/transcoding/shared/ffmpeg-builder.ts new file mode 100644 index 000000000..441445ec4 --- /dev/null +++ b/server/lib/transcoding/shared/ffmpeg-builder.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { FFmpegVOD } from '@shared/ffmpeg' | ||
5 | import { VideoTranscodingProfilesManager } from '../default-transcoding-profiles' | ||
6 | |||
7 | export function buildFFmpegVOD (job?: Job) { | ||
8 | return new FFmpegVOD({ | ||
9 | ...getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()), | ||
10 | |||
11 | updateJobProgress: progress => { | ||
12 | if (!job) return | ||
13 | |||
14 | job.updateProgress(progress) | ||
15 | .catch(err => logger.error('Cannot update ffmpeg job progress', { err })) | ||
16 | } | ||
17 | }) | ||
18 | } | ||
diff --git a/server/lib/transcoding/shared/index.ts b/server/lib/transcoding/shared/index.ts new file mode 100644 index 000000000..f0b45bcbb --- /dev/null +++ b/server/lib/transcoding/shared/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './job-builders' | ||
2 | export * from './ffmpeg-builder' | ||
diff --git a/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts new file mode 100644 index 000000000..f1e9efdcf --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/abstract-job-builder.ts | |||
@@ -0,0 +1,38 @@ | |||
1 | |||
2 | import { JOB_PRIORITY } from '@server/initializers/constants' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' | ||
5 | |||
6 | export abstract class AbstractJobBuilder { | ||
7 | |||
8 | abstract createOptimizeOrMergeAudioJobs (options: { | ||
9 | video: MVideoFullLight | ||
10 | videoFile: MVideoFile | ||
11 | isNewVideo: boolean | ||
12 | user: MUserId | ||
13 | }): Promise<any> | ||
14 | |||
15 | abstract createTranscodingJobs (options: { | ||
16 | transcodingType: 'hls' | 'webtorrent' | ||
17 | video: MVideoFullLight | ||
18 | resolutions: number[] | ||
19 | isNewVideo: boolean | ||
20 | user: MUserId | null | ||
21 | }): Promise<any> | ||
22 | |||
23 | protected async getTranscodingJobPriority (options: { | ||
24 | user: MUserId | ||
25 | fallback: number | ||
26 | }) { | ||
27 | const { user, fallback } = options | ||
28 | |||
29 | if (!user) return fallback | ||
30 | |||
31 | const now = new Date() | ||
32 | const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) | ||
33 | |||
34 | const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) | ||
35 | |||
36 | return JOB_PRIORITY.TRANSCODING + videoUploadedByUser | ||
37 | } | ||
38 | } | ||
diff --git a/server/lib/transcoding/shared/job-builders/index.ts b/server/lib/transcoding/shared/job-builders/index.ts new file mode 100644 index 000000000..9b1c82adf --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './transcoding-job-queue-builder' | ||
2 | export * from './transcoding-runner-job-builder' | ||
diff --git a/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts new file mode 100644 index 000000000..7c892718b --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts | |||
@@ -0,0 +1,308 @@ | |||
1 | import Bluebird from 'bluebird' | ||
2 | import { computeOutputFPS } from '@server/helpers/ffmpeg' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { CONFIG } from '@server/initializers/config' | ||
5 | import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' | ||
6 | import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' | ||
7 | import { Hooks } from '@server/lib/plugins/hooks' | ||
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
10 | import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' | ||
11 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' | ||
12 | import { | ||
13 | HLSTranscodingPayload, | ||
14 | MergeAudioTranscodingPayload, | ||
15 | NewWebTorrentResolutionTranscodingPayload, | ||
16 | OptimizeTranscodingPayload, | ||
17 | VideoTranscodingPayload | ||
18 | } from '@shared/models' | ||
19 | import { canDoQuickTranscode } from '../../transcoding-quick-transcode' | ||
20 | import { computeResolutionsToTranscode } from '../../transcoding-resolutions' | ||
21 | import { AbstractJobBuilder } from './abstract-job-builder' | ||
22 | |||
23 | export class TranscodingJobQueueBuilder extends AbstractJobBuilder { | ||
24 | |||
25 | async createOptimizeOrMergeAudioJobs (options: { | ||
26 | video: MVideoFullLight | ||
27 | videoFile: MVideoFile | ||
28 | isNewVideo: boolean | ||
29 | user: MUserId | ||
30 | }) { | ||
31 | const { video, videoFile, isNewVideo, user } = options | ||
32 | |||
33 | let mergeOrOptimizePayload: MergeAudioTranscodingPayload | OptimizeTranscodingPayload | ||
34 | let nextTranscodingSequentialJobPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] | ||
35 | |||
36 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
37 | |||
38 | try { | ||
39 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { | ||
40 | const probe = await ffprobePromise(videoFilePath) | ||
41 | |||
42 | const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) | ||
43 | const hasAudio = await hasAudioStream(videoFilePath, probe) | ||
44 | const quickTranscode = await canDoQuickTranscode(videoFilePath, probe) | ||
45 | const inputFPS = videoFile.isAudio() | ||
46 | ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value | ||
47 | : await getVideoStreamFPS(videoFilePath, probe) | ||
48 | |||
49 | const maxResolution = await isAudioFile(videoFilePath, probe) | ||
50 | ? DEFAULT_AUDIO_RESOLUTION | ||
51 | : resolution | ||
52 | |||
53 | if (CONFIG.TRANSCODING.HLS.ENABLED === true) { | ||
54 | nextTranscodingSequentialJobPayloads.push([ | ||
55 | this.buildHLSJobPayload({ | ||
56 | deleteWebTorrentFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, | ||
57 | |||
58 | // We had some issues with a web video quick transcoded while producing a HLS version of it | ||
59 | copyCodecs: !quickTranscode, | ||
60 | |||
61 | resolution: maxResolution, | ||
62 | fps: computeOutputFPS({ inputFPS, resolution: maxResolution }), | ||
63 | videoUUID: video.uuid, | ||
64 | isNewVideo | ||
65 | }) | ||
66 | ]) | ||
67 | } | ||
68 | |||
69 | const lowerResolutionJobPayloads = await this.buildLowerResolutionJobPayloads({ | ||
70 | video, | ||
71 | inputVideoResolution: maxResolution, | ||
72 | inputVideoFPS: inputFPS, | ||
73 | hasAudio, | ||
74 | isNewVideo | ||
75 | }) | ||
76 | |||
77 | nextTranscodingSequentialJobPayloads = [ ...nextTranscodingSequentialJobPayloads, ...lowerResolutionJobPayloads ] | ||
78 | |||
79 | mergeOrOptimizePayload = videoFile.isAudio() | ||
80 | ? this.buildMergeAudioPayload({ videoUUID: video.uuid, isNewVideo }) | ||
81 | : this.buildOptimizePayload({ videoUUID: video.uuid, isNewVideo, quickTranscode }) | ||
82 | }) | ||
83 | } finally { | ||
84 | mutexReleaser() | ||
85 | } | ||
86 | |||
87 | const nextTranscodingSequentialJobs = await Bluebird.mapSeries(nextTranscodingSequentialJobPayloads, payloads => { | ||
88 | return Bluebird.mapSeries(payloads, payload => { | ||
89 | return this.buildTranscodingJob({ payload, user }) | ||
90 | }) | ||
91 | }) | ||
92 | |||
93 | const transcodingJobBuilderJob: CreateJobArgument = { | ||
94 | type: 'transcoding-job-builder', | ||
95 | payload: { | ||
96 | videoUUID: video.uuid, | ||
97 | sequentialJobs: nextTranscodingSequentialJobs | ||
98 | } | ||
99 | } | ||
100 | |||
101 | const mergeOrOptimizeJob = await this.buildTranscodingJob({ payload: mergeOrOptimizePayload, user }) | ||
102 | |||
103 | return JobQueue.Instance.createSequentialJobFlow(...[ mergeOrOptimizeJob, transcodingJobBuilderJob ]) | ||
104 | } | ||
105 | |||
106 | // --------------------------------------------------------------------------- | ||
107 | |||
108 | async createTranscodingJobs (options: { | ||
109 | transcodingType: 'hls' | 'webtorrent' | ||
110 | video: MVideoFullLight | ||
111 | resolutions: number[] | ||
112 | isNewVideo: boolean | ||
113 | user: MUserId | null | ||
114 | }) { | ||
115 | const { video, transcodingType, resolutions, isNewVideo } = options | ||
116 | |||
117 | const maxResolution = Math.max(...resolutions) | ||
118 | const childrenResolutions = resolutions.filter(r => r !== maxResolution) | ||
119 | |||
120 | logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) | ||
121 | |||
122 | const { fps: inputFPS } = await video.probeMaxQualityFile() | ||
123 | |||
124 | const children = childrenResolutions.map(resolution => { | ||
125 | const fps = computeOutputFPS({ inputFPS, resolution }) | ||
126 | |||
127 | if (transcodingType === 'hls') { | ||
128 | return this.buildHLSJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) | ||
129 | } | ||
130 | |||
131 | if (transcodingType === 'webtorrent') { | ||
132 | return this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution, fps, isNewVideo }) | ||
133 | } | ||
134 | |||
135 | throw new Error('Unknown transcoding type') | ||
136 | }) | ||
137 | |||
138 | const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) | ||
139 | |||
140 | const parent = transcodingType === 'hls' | ||
141 | ? this.buildHLSJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) | ||
142 | : this.buildWebTorrentJobPayload({ videoUUID: video.uuid, resolution: maxResolution, fps, isNewVideo }) | ||
143 | |||
144 | // Process the last resolution after the other ones to prevent concurrency issue | ||
145 | // Because low resolutions use the biggest one as ffmpeg input | ||
146 | await this.createTranscodingJobsWithChildren({ videoUUID: video.uuid, parent, children, user: null }) | ||
147 | } | ||
148 | |||
149 | // --------------------------------------------------------------------------- | ||
150 | |||
151 | private async createTranscodingJobsWithChildren (options: { | ||
152 | videoUUID: string | ||
153 | parent: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload) | ||
154 | children: (HLSTranscodingPayload | NewWebTorrentResolutionTranscodingPayload)[] | ||
155 | user: MUserId | null | ||
156 | }) { | ||
157 | const { videoUUID, parent, children, user } = options | ||
158 | |||
159 | const parentJob = await this.buildTranscodingJob({ payload: parent, user }) | ||
160 | const childrenJobs = await Bluebird.mapSeries(children, c => this.buildTranscodingJob({ payload: c, user })) | ||
161 | |||
162 | await JobQueue.Instance.createJobWithChildren(parentJob, childrenJobs) | ||
163 | |||
164 | await VideoJobInfoModel.increaseOrCreate(videoUUID, 'pendingTranscode', 1 + children.length) | ||
165 | } | ||
166 | |||
167 | private async buildTranscodingJob (options: { | ||
168 | payload: VideoTranscodingPayload | ||
169 | user: MUserId | null // null means we don't want priority | ||
170 | }) { | ||
171 | const { user, payload } = options | ||
172 | |||
173 | return { | ||
174 | type: 'video-transcoding' as 'video-transcoding', | ||
175 | priority: await this.getTranscodingJobPriority({ user, fallback: undefined }), | ||
176 | payload | ||
177 | } | ||
178 | } | ||
179 | |||
180 | private async buildLowerResolutionJobPayloads (options: { | ||
181 | video: MVideoWithFileThumbnail | ||
182 | inputVideoResolution: number | ||
183 | inputVideoFPS: number | ||
184 | hasAudio: boolean | ||
185 | isNewVideo: boolean | ||
186 | }) { | ||
187 | const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio } = options | ||
188 | |||
189 | // Create transcoding jobs if there are enabled resolutions | ||
190 | const resolutionsEnabled = await Hooks.wrapObject( | ||
191 | computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), | ||
192 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
193 | options | ||
194 | ) | ||
195 | |||
196 | const sequentialPayloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[][] = [] | ||
197 | |||
198 | for (const resolution of resolutionsEnabled) { | ||
199 | const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) | ||
200 | |||
201 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { | ||
202 | const payloads: (NewWebTorrentResolutionTranscodingPayload | HLSTranscodingPayload)[] = [ | ||
203 | this.buildWebTorrentJobPayload({ | ||
204 | videoUUID: video.uuid, | ||
205 | resolution, | ||
206 | fps, | ||
207 | isNewVideo | ||
208 | }) | ||
209 | ] | ||
210 | |||
211 | // Create a subsequent job to create HLS resolution that will just copy web video codecs | ||
212 | if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
213 | payloads.push( | ||
214 | this.buildHLSJobPayload({ | ||
215 | videoUUID: video.uuid, | ||
216 | resolution, | ||
217 | fps, | ||
218 | isNewVideo, | ||
219 | copyCodecs: true | ||
220 | }) | ||
221 | ) | ||
222 | } | ||
223 | |||
224 | sequentialPayloads.push(payloads) | ||
225 | } else if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
226 | sequentialPayloads.push([ | ||
227 | this.buildHLSJobPayload({ | ||
228 | videoUUID: video.uuid, | ||
229 | resolution, | ||
230 | fps, | ||
231 | copyCodecs: false, | ||
232 | isNewVideo | ||
233 | }) | ||
234 | ]) | ||
235 | } | ||
236 | } | ||
237 | |||
238 | return sequentialPayloads | ||
239 | } | ||
240 | |||
241 | private buildHLSJobPayload (options: { | ||
242 | videoUUID: string | ||
243 | resolution: number | ||
244 | fps: number | ||
245 | isNewVideo: boolean | ||
246 | deleteWebTorrentFiles?: boolean // default false | ||
247 | copyCodecs?: boolean // default false | ||
248 | }): HLSTranscodingPayload { | ||
249 | const { videoUUID, resolution, fps, isNewVideo, deleteWebTorrentFiles = false, copyCodecs = false } = options | ||
250 | |||
251 | return { | ||
252 | type: 'new-resolution-to-hls', | ||
253 | videoUUID, | ||
254 | resolution, | ||
255 | fps, | ||
256 | copyCodecs, | ||
257 | isNewVideo, | ||
258 | deleteWebTorrentFiles | ||
259 | } | ||
260 | } | ||
261 | |||
262 | private buildWebTorrentJobPayload (options: { | ||
263 | videoUUID: string | ||
264 | resolution: number | ||
265 | fps: number | ||
266 | isNewVideo: boolean | ||
267 | }): NewWebTorrentResolutionTranscodingPayload { | ||
268 | const { videoUUID, resolution, fps, isNewVideo } = options | ||
269 | |||
270 | return { | ||
271 | type: 'new-resolution-to-webtorrent', | ||
272 | videoUUID, | ||
273 | isNewVideo, | ||
274 | resolution, | ||
275 | fps | ||
276 | } | ||
277 | } | ||
278 | |||
279 | private buildMergeAudioPayload (options: { | ||
280 | videoUUID: string | ||
281 | isNewVideo: boolean | ||
282 | }): MergeAudioTranscodingPayload { | ||
283 | const { videoUUID, isNewVideo } = options | ||
284 | |||
285 | return { | ||
286 | type: 'merge-audio-to-webtorrent', | ||
287 | resolution: DEFAULT_AUDIO_RESOLUTION, | ||
288 | fps: VIDEO_TRANSCODING_FPS.AUDIO_MERGE, | ||
289 | videoUUID, | ||
290 | isNewVideo | ||
291 | } | ||
292 | } | ||
293 | |||
294 | private buildOptimizePayload (options: { | ||
295 | videoUUID: string | ||
296 | quickTranscode: boolean | ||
297 | isNewVideo: boolean | ||
298 | }): OptimizeTranscodingPayload { | ||
299 | const { videoUUID, quickTranscode, isNewVideo } = options | ||
300 | |||
301 | return { | ||
302 | type: 'optimize-to-webtorrent', | ||
303 | videoUUID, | ||
304 | isNewVideo, | ||
305 | quickTranscode | ||
306 | } | ||
307 | } | ||
308 | } | ||
diff --git a/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts new file mode 100644 index 000000000..c7a63d2e2 --- /dev/null +++ b/server/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts | |||
@@ -0,0 +1,189 @@ | |||
1 | import { computeOutputFPS } from '@server/helpers/ffmpeg' | ||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
3 | import { CONFIG } from '@server/initializers/config' | ||
4 | import { DEFAULT_AUDIO_RESOLUTION, VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' | ||
5 | import { Hooks } from '@server/lib/plugins/hooks' | ||
6 | import { VODAudioMergeTranscodingJobHandler, VODHLSTranscodingJobHandler, VODWebVideoTranscodingJobHandler } from '@server/lib/runners' | ||
7 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
8 | import { MUserId, MVideoFile, MVideoFullLight, MVideoWithFileThumbnail } from '@server/types/models' | ||
9 | import { MRunnerJob } from '@server/types/models/runners' | ||
10 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream, isAudioFile } from '@shared/ffmpeg' | ||
11 | import { computeResolutionsToTranscode } from '../../transcoding-resolutions' | ||
12 | import { AbstractJobBuilder } from './abstract-job-builder' | ||
13 | |||
14 | /** | ||
15 | * | ||
16 | * Class to build transcoding job in the local job queue | ||
17 | * | ||
18 | */ | ||
19 | |||
20 | const lTags = loggerTagsFactory('transcoding') | ||
21 | |||
22 | export class TranscodingRunnerJobBuilder extends AbstractJobBuilder { | ||
23 | |||
24 | async createOptimizeOrMergeAudioJobs (options: { | ||
25 | video: MVideoFullLight | ||
26 | videoFile: MVideoFile | ||
27 | isNewVideo: boolean | ||
28 | user: MUserId | ||
29 | }) { | ||
30 | const { video, videoFile, isNewVideo, user } = options | ||
31 | |||
32 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
33 | |||
34 | try { | ||
35 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFile.withVideoOrPlaylist(video), async videoFilePath => { | ||
36 | const probe = await ffprobePromise(videoFilePath) | ||
37 | |||
38 | const { resolution } = await getVideoStreamDimensionsInfo(videoFilePath, probe) | ||
39 | const hasAudio = await hasAudioStream(videoFilePath, probe) | ||
40 | const inputFPS = videoFile.isAudio() | ||
41 | ? VIDEO_TRANSCODING_FPS.AUDIO_MERGE // The first transcoding job will transcode to this FPS value | ||
42 | : await getVideoStreamFPS(videoFilePath, probe) | ||
43 | |||
44 | const maxResolution = await isAudioFile(videoFilePath, probe) | ||
45 | ? DEFAULT_AUDIO_RESOLUTION | ||
46 | : resolution | ||
47 | |||
48 | const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) | ||
49 | const priority = await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
50 | |||
51 | const mainRunnerJob = videoFile.isAudio() | ||
52 | ? await new VODAudioMergeTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority }) | ||
53 | : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps, isNewVideo, priority }) | ||
54 | |||
55 | if (CONFIG.TRANSCODING.HLS.ENABLED === true) { | ||
56 | await new VODHLSTranscodingJobHandler().create({ | ||
57 | video, | ||
58 | deleteWebVideoFiles: CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false, | ||
59 | resolution: maxResolution, | ||
60 | fps, | ||
61 | isNewVideo, | ||
62 | dependsOnRunnerJob: mainRunnerJob, | ||
63 | priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
64 | }) | ||
65 | } | ||
66 | |||
67 | await this.buildLowerResolutionJobPayloads({ | ||
68 | video, | ||
69 | inputVideoResolution: maxResolution, | ||
70 | inputVideoFPS: inputFPS, | ||
71 | hasAudio, | ||
72 | isNewVideo, | ||
73 | mainRunnerJob, | ||
74 | user | ||
75 | }) | ||
76 | }) | ||
77 | } finally { | ||
78 | mutexReleaser() | ||
79 | } | ||
80 | } | ||
81 | |||
82 | // --------------------------------------------------------------------------- | ||
83 | |||
84 | async createTranscodingJobs (options: { | ||
85 | transcodingType: 'hls' | 'webtorrent' | ||
86 | video: MVideoFullLight | ||
87 | resolutions: number[] | ||
88 | isNewVideo: boolean | ||
89 | user: MUserId | null | ||
90 | }) { | ||
91 | const { video, transcodingType, resolutions, isNewVideo, user } = options | ||
92 | |||
93 | const maxResolution = Math.max(...resolutions) | ||
94 | const { fps: inputFPS } = await video.probeMaxQualityFile() | ||
95 | const maxFPS = computeOutputFPS({ inputFPS, resolution: maxResolution }) | ||
96 | const priority = await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
97 | |||
98 | const childrenResolutions = resolutions.filter(r => r !== maxResolution) | ||
99 | |||
100 | logger.info('Manually creating transcoding jobs for %s.', transcodingType, { childrenResolutions, maxResolution }) | ||
101 | |||
102 | // Process the last resolution before the other ones to prevent concurrency issue | ||
103 | // Because low resolutions use the biggest one as ffmpeg input | ||
104 | const mainJob = transcodingType === 'hls' | ||
105 | // eslint-disable-next-line max-len | ||
106 | ? await new VODHLSTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, deleteWebVideoFiles: false, priority }) | ||
107 | : await new VODWebVideoTranscodingJobHandler().create({ video, resolution: maxResolution, fps: maxFPS, isNewVideo, priority }) | ||
108 | |||
109 | for (const resolution of childrenResolutions) { | ||
110 | const dependsOnRunnerJob = mainJob | ||
111 | const fps = computeOutputFPS({ inputFPS, resolution: maxResolution }) | ||
112 | |||
113 | if (transcodingType === 'hls') { | ||
114 | await new VODHLSTranscodingJobHandler().create({ | ||
115 | video, | ||
116 | resolution, | ||
117 | fps, | ||
118 | isNewVideo, | ||
119 | deleteWebVideoFiles: false, | ||
120 | dependsOnRunnerJob, | ||
121 | priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
122 | }) | ||
123 | continue | ||
124 | } | ||
125 | |||
126 | if (transcodingType === 'webtorrent') { | ||
127 | await new VODWebVideoTranscodingJobHandler().create({ | ||
128 | video, | ||
129 | resolution, | ||
130 | fps, | ||
131 | isNewVideo, | ||
132 | dependsOnRunnerJob, | ||
133 | priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
134 | }) | ||
135 | continue | ||
136 | } | ||
137 | |||
138 | throw new Error('Unknown transcoding type') | ||
139 | } | ||
140 | } | ||
141 | |||
142 | private async buildLowerResolutionJobPayloads (options: { | ||
143 | mainRunnerJob: MRunnerJob | ||
144 | video: MVideoWithFileThumbnail | ||
145 | inputVideoResolution: number | ||
146 | inputVideoFPS: number | ||
147 | hasAudio: boolean | ||
148 | isNewVideo: boolean | ||
149 | user: MUserId | ||
150 | }) { | ||
151 | const { video, inputVideoResolution, inputVideoFPS, isNewVideo, hasAudio, mainRunnerJob, user } = options | ||
152 | |||
153 | // Create transcoding jobs if there are enabled resolutions | ||
154 | const resolutionsEnabled = await Hooks.wrapObject( | ||
155 | computeResolutionsToTranscode({ input: inputVideoResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), | ||
156 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
157 | options | ||
158 | ) | ||
159 | |||
160 | logger.debug('Lower resolutions build for %s.', video.uuid, { resolutionsEnabled, ...lTags(video.uuid) }) | ||
161 | |||
162 | for (const resolution of resolutionsEnabled) { | ||
163 | const fps = computeOutputFPS({ inputFPS: inputVideoFPS, resolution }) | ||
164 | |||
165 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { | ||
166 | await new VODWebVideoTranscodingJobHandler().create({ | ||
167 | video, | ||
168 | resolution, | ||
169 | fps, | ||
170 | isNewVideo, | ||
171 | dependsOnRunnerJob: mainRunnerJob, | ||
172 | priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
173 | }) | ||
174 | } | ||
175 | |||
176 | if (CONFIG.TRANSCODING.HLS.ENABLED) { | ||
177 | await new VODHLSTranscodingJobHandler().create({ | ||
178 | video, | ||
179 | resolution, | ||
180 | fps, | ||
181 | isNewVideo, | ||
182 | deleteWebVideoFiles: false, | ||
183 | dependsOnRunnerJob: mainRunnerJob, | ||
184 | priority: await this.getTranscodingJobPriority({ user, fallback: 0 }) | ||
185 | }) | ||
186 | } | ||
187 | } | ||
188 | } | ||
189 | } | ||
diff --git a/server/lib/transcoding/transcoding-quick-transcode.ts b/server/lib/transcoding/transcoding-quick-transcode.ts new file mode 100644 index 000000000..b7f921890 --- /dev/null +++ b/server/lib/transcoding/transcoding-quick-transcode.ts | |||
@@ -0,0 +1,61 @@ | |||
1 | import { FfprobeData } from 'fluent-ffmpeg' | ||
2 | import { CONFIG } from '@server/initializers/config' | ||
3 | import { VIDEO_TRANSCODING_FPS } from '@server/initializers/constants' | ||
4 | import { getMaxBitrate } from '@shared/core-utils' | ||
5 | import { | ||
6 | ffprobePromise, | ||
7 | getAudioStream, | ||
8 | getMaxAudioBitrate, | ||
9 | getVideoStream, | ||
10 | getVideoStreamBitrate, | ||
11 | getVideoStreamDimensionsInfo, | ||
12 | getVideoStreamFPS | ||
13 | } from '@shared/ffmpeg' | ||
14 | |||
15 | export async function canDoQuickTranscode (path: string, existingProbe?: FfprobeData): Promise<boolean> { | ||
16 | if (CONFIG.TRANSCODING.PROFILE !== 'default') return false | ||
17 | |||
18 | const probe = existingProbe || await ffprobePromise(path) | ||
19 | |||
20 | return await canDoQuickVideoTranscode(path, probe) && | ||
21 | await canDoQuickAudioTranscode(path, probe) | ||
22 | } | ||
23 | |||
24 | export async function canDoQuickAudioTranscode (path: string, probe?: FfprobeData): Promise<boolean> { | ||
25 | const parsedAudio = await getAudioStream(path, probe) | ||
26 | |||
27 | if (!parsedAudio.audioStream) return true | ||
28 | |||
29 | if (parsedAudio.audioStream['codec_name'] !== 'aac') return false | ||
30 | |||
31 | const audioBitrate = parsedAudio.bitrate | ||
32 | if (!audioBitrate) return false | ||
33 | |||
34 | const maxAudioBitrate = getMaxAudioBitrate('aac', audioBitrate) | ||
35 | if (maxAudioBitrate !== -1 && audioBitrate > maxAudioBitrate) return false | ||
36 | |||
37 | const channelLayout = parsedAudio.audioStream['channel_layout'] | ||
38 | // Causes playback issues with Chrome | ||
39 | if (!channelLayout || channelLayout === 'unknown' || channelLayout === 'quad') return false | ||
40 | |||
41 | return true | ||
42 | } | ||
43 | |||
44 | export async function canDoQuickVideoTranscode (path: string, probe?: FfprobeData): Promise<boolean> { | ||
45 | const videoStream = await getVideoStream(path, probe) | ||
46 | const fps = await getVideoStreamFPS(path, probe) | ||
47 | const bitRate = await getVideoStreamBitrate(path, probe) | ||
48 | const resolutionData = await getVideoStreamDimensionsInfo(path, probe) | ||
49 | |||
50 | // If ffprobe did not manage to guess the bitrate | ||
51 | if (!bitRate) return false | ||
52 | |||
53 | // check video params | ||
54 | if (!videoStream) return false | ||
55 | if (videoStream['codec_name'] !== 'h264') return false | ||
56 | if (videoStream['pix_fmt'] !== 'yuv420p') return false | ||
57 | if (fps < VIDEO_TRANSCODING_FPS.MIN || fps > VIDEO_TRANSCODING_FPS.MAX) return false | ||
58 | if (bitRate > getMaxBitrate({ ...resolutionData, fps })) return false | ||
59 | |||
60 | return true | ||
61 | } | ||
diff --git a/server/lib/transcoding/transcoding-resolutions.ts b/server/lib/transcoding/transcoding-resolutions.ts new file mode 100644 index 000000000..91f4d18d8 --- /dev/null +++ b/server/lib/transcoding/transcoding-resolutions.ts | |||
@@ -0,0 +1,52 @@ | |||
1 | import { CONFIG } from '@server/initializers/config' | ||
2 | import { toEven } from '@shared/core-utils' | ||
3 | import { VideoResolution } from '@shared/models' | ||
4 | |||
5 | export function computeResolutionsToTranscode (options: { | ||
6 | input: number | ||
7 | type: 'vod' | 'live' | ||
8 | includeInput: boolean | ||
9 | strictLower: boolean | ||
10 | hasAudio: boolean | ||
11 | }) { | ||
12 | const { input, type, includeInput, strictLower, hasAudio } = options | ||
13 | |||
14 | const configResolutions = type === 'vod' | ||
15 | ? CONFIG.TRANSCODING.RESOLUTIONS | ||
16 | : CONFIG.LIVE.TRANSCODING.RESOLUTIONS | ||
17 | |||
18 | const resolutionsEnabled = new Set<number>() | ||
19 | |||
20 | // Put in the order we want to proceed jobs | ||
21 | const availableResolutions: VideoResolution[] = [ | ||
22 | VideoResolution.H_NOVIDEO, | ||
23 | VideoResolution.H_480P, | ||
24 | VideoResolution.H_360P, | ||
25 | VideoResolution.H_720P, | ||
26 | VideoResolution.H_240P, | ||
27 | VideoResolution.H_144P, | ||
28 | VideoResolution.H_1080P, | ||
29 | VideoResolution.H_1440P, | ||
30 | VideoResolution.H_4K | ||
31 | ] | ||
32 | |||
33 | for (const resolution of availableResolutions) { | ||
34 | // Resolution not enabled | ||
35 | if (configResolutions[resolution + 'p'] !== true) continue | ||
36 | // Too big resolution for input file | ||
37 | if (input < resolution) continue | ||
38 | // We only want lower resolutions than input file | ||
39 | if (strictLower && input === resolution) continue | ||
40 | // Audio resolutio but no audio in the video | ||
41 | if (resolution === VideoResolution.H_NOVIDEO && !hasAudio) continue | ||
42 | |||
43 | resolutionsEnabled.add(resolution) | ||
44 | } | ||
45 | |||
46 | if (includeInput) { | ||
47 | // Always use an even resolution to avoid issues with ffmpeg | ||
48 | resolutionsEnabled.add(toEven(input)) | ||
49 | } | ||
50 | |||
51 | return Array.from(resolutionsEnabled) | ||
52 | } | ||
diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts deleted file mode 100644 index c7b61e9ba..000000000 --- a/server/lib/transcoding/transcoding.ts +++ /dev/null | |||
@@ -1,465 +0,0 @@ | |||
1 | import { MutexInterface } from 'async-mutex' | ||
2 | import { Job } from 'bullmq' | ||
3 | import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' | ||
4 | import { basename, extname as extnameUtil, join } from 'path' | ||
5 | import { toEven } from '@server/helpers/core-utils' | ||
6 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
7 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | ||
8 | import { sequelizeTypescript } from '@server/initializers/database' | ||
9 | import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' | ||
10 | import { pick } from '@shared/core-utils' | ||
11 | import { VideoResolution, VideoStorage } from '../../../shared/models/videos' | ||
12 | import { | ||
13 | buildFileMetadata, | ||
14 | canDoQuickTranscode, | ||
15 | computeResolutionsToTranscode, | ||
16 | ffprobePromise, | ||
17 | getVideoStreamDuration, | ||
18 | getVideoStreamFPS, | ||
19 | transcodeVOD, | ||
20 | TranscodeVODOptions, | ||
21 | TranscodeVODOptionsType | ||
22 | } from '../../helpers/ffmpeg' | ||
23 | import { CONFIG } from '../../initializers/config' | ||
24 | import { VideoFileModel } from '../../models/video/video-file' | ||
25 | import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist' | ||
26 | import { updatePlaylistAfterFileChange } from '../hls' | ||
27 | import { generateHLSVideoFilename, generateWebTorrentVideoFilename, getHlsResolutionPlaylistFilename } from '../paths' | ||
28 | import { VideoPathManager } from '../video-path-manager' | ||
29 | import { VideoTranscodingProfilesManager } from './default-transcoding-profiles' | ||
30 | |||
31 | /** | ||
32 | * | ||
33 | * Functions that run transcoding functions, update the database, cleanup files, create torrent files... | ||
34 | * Mainly called by the job queue | ||
35 | * | ||
36 | */ | ||
37 | |||
38 | // Optimize the original video file and replace it. The resolution is not changed. | ||
39 | async function optimizeOriginalVideofile (options: { | ||
40 | video: MVideoFullLight | ||
41 | inputVideoFile: MVideoFile | ||
42 | job: Job | ||
43 | }) { | ||
44 | const { video, inputVideoFile, job } = options | ||
45 | |||
46 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
47 | const newExtname = '.mp4' | ||
48 | |||
49 | // Will be released by our transcodeVOD function once ffmpeg is ran | ||
50 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
51 | |||
52 | try { | ||
53 | await video.reload() | ||
54 | |||
55 | const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) | ||
56 | |||
57 | const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => { | ||
58 | const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) | ||
59 | |||
60 | const transcodeType: TranscodeVODOptionsType = await canDoQuickTranscode(videoInputPath) | ||
61 | ? 'quick-transcode' | ||
62 | : 'video' | ||
63 | |||
64 | const resolution = buildOriginalFileResolution(inputVideoFile.resolution) | ||
65 | |||
66 | const transcodeOptions: TranscodeVODOptions = { | ||
67 | type: transcodeType, | ||
68 | |||
69 | inputPath: videoInputPath, | ||
70 | outputPath: videoTranscodedPath, | ||
71 | |||
72 | inputFileMutexReleaser, | ||
73 | |||
74 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
75 | profile: CONFIG.TRANSCODING.PROFILE, | ||
76 | |||
77 | resolution, | ||
78 | |||
79 | job | ||
80 | } | ||
81 | |||
82 | // Could be very long! | ||
83 | await transcodeVOD(transcodeOptions) | ||
84 | |||
85 | // Important to do this before getVideoFilename() to take in account the new filename | ||
86 | inputVideoFile.resolution = resolution | ||
87 | inputVideoFile.extname = newExtname | ||
88 | inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname) | ||
89 | inputVideoFile.storage = VideoStorage.FILE_SYSTEM | ||
90 | |||
91 | const { videoFile } = await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile) | ||
92 | await remove(videoInputPath) | ||
93 | |||
94 | return { transcodeType, videoFile } | ||
95 | }) | ||
96 | |||
97 | return result | ||
98 | } finally { | ||
99 | inputFileMutexReleaser() | ||
100 | } | ||
101 | } | ||
102 | |||
103 | // Transcode the original video file to a lower resolution compatible with WebTorrent | ||
104 | async function transcodeNewWebTorrentResolution (options: { | ||
105 | video: MVideoFullLight | ||
106 | resolution: VideoResolution | ||
107 | job: Job | ||
108 | }) { | ||
109 | const { video, resolution, job } = options | ||
110 | |||
111 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
112 | const newExtname = '.mp4' | ||
113 | |||
114 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
115 | |||
116 | try { | ||
117 | await video.reload() | ||
118 | |||
119 | const file = video.getMaxQualityFile().withVideoOrPlaylist(video) | ||
120 | |||
121 | const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => { | ||
122 | const newVideoFile = new VideoFileModel({ | ||
123 | resolution, | ||
124 | extname: newExtname, | ||
125 | filename: generateWebTorrentVideoFilename(resolution, newExtname), | ||
126 | size: 0, | ||
127 | videoId: video.id | ||
128 | }) | ||
129 | |||
130 | const videoTranscodedPath = join(transcodeDirectory, newVideoFile.filename) | ||
131 | |||
132 | const transcodeOptions = resolution === VideoResolution.H_NOVIDEO | ||
133 | ? { | ||
134 | type: 'only-audio' as 'only-audio', | ||
135 | |||
136 | inputPath: videoInputPath, | ||
137 | outputPath: videoTranscodedPath, | ||
138 | |||
139 | inputFileMutexReleaser, | ||
140 | |||
141 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
142 | profile: CONFIG.TRANSCODING.PROFILE, | ||
143 | |||
144 | resolution, | ||
145 | |||
146 | job | ||
147 | } | ||
148 | : { | ||
149 | type: 'video' as 'video', | ||
150 | inputPath: videoInputPath, | ||
151 | outputPath: videoTranscodedPath, | ||
152 | |||
153 | inputFileMutexReleaser, | ||
154 | |||
155 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
156 | profile: CONFIG.TRANSCODING.PROFILE, | ||
157 | |||
158 | resolution, | ||
159 | |||
160 | job | ||
161 | } | ||
162 | |||
163 | await transcodeVOD(transcodeOptions) | ||
164 | |||
165 | return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, newVideoFile) | ||
166 | }) | ||
167 | |||
168 | return result | ||
169 | } finally { | ||
170 | inputFileMutexReleaser() | ||
171 | } | ||
172 | } | ||
173 | |||
174 | // Merge an image with an audio file to create a video | ||
175 | async function mergeAudioVideofile (options: { | ||
176 | video: MVideoFullLight | ||
177 | resolution: VideoResolution | ||
178 | job: Job | ||
179 | }) { | ||
180 | const { video, resolution, job } = options | ||
181 | |||
182 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
183 | const newExtname = '.mp4' | ||
184 | |||
185 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
186 | |||
187 | try { | ||
188 | await video.reload() | ||
189 | |||
190 | const inputVideoFile = video.getMinQualityFile() | ||
191 | |||
192 | const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) | ||
193 | |||
194 | const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => { | ||
195 | const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) | ||
196 | |||
197 | // If the user updates the video preview during transcoding | ||
198 | const previewPath = video.getPreview().getPath() | ||
199 | const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath)) | ||
200 | await copyFile(previewPath, tmpPreviewPath) | ||
201 | |||
202 | const transcodeOptions = { | ||
203 | type: 'merge-audio' as 'merge-audio', | ||
204 | |||
205 | inputPath: tmpPreviewPath, | ||
206 | outputPath: videoTranscodedPath, | ||
207 | |||
208 | inputFileMutexReleaser, | ||
209 | |||
210 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
211 | profile: CONFIG.TRANSCODING.PROFILE, | ||
212 | |||
213 | audioPath: audioInputPath, | ||
214 | resolution, | ||
215 | |||
216 | job | ||
217 | } | ||
218 | |||
219 | try { | ||
220 | await transcodeVOD(transcodeOptions) | ||
221 | |||
222 | await remove(audioInputPath) | ||
223 | await remove(tmpPreviewPath) | ||
224 | } catch (err) { | ||
225 | await remove(tmpPreviewPath) | ||
226 | throw err | ||
227 | } | ||
228 | |||
229 | // Important to do this before getVideoFilename() to take in account the new file extension | ||
230 | inputVideoFile.extname = newExtname | ||
231 | inputVideoFile.resolution = resolution | ||
232 | inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname) | ||
233 | |||
234 | // ffmpeg generated a new video file, so update the video duration | ||
235 | // See https://trac.ffmpeg.org/ticket/5456 | ||
236 | video.duration = await getVideoStreamDuration(videoTranscodedPath) | ||
237 | await video.save() | ||
238 | |||
239 | return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile) | ||
240 | }) | ||
241 | |||
242 | return result | ||
243 | } finally { | ||
244 | inputFileMutexReleaser() | ||
245 | } | ||
246 | } | ||
247 | |||
248 | // Concat TS segments from a live video to a fragmented mp4 HLS playlist | ||
249 | async function generateHlsPlaylistResolutionFromTS (options: { | ||
250 | video: MVideo | ||
251 | concatenatedTsFilePath: string | ||
252 | resolution: VideoResolution | ||
253 | isAAC: boolean | ||
254 | inputFileMutexReleaser: MutexInterface.Releaser | ||
255 | }) { | ||
256 | return generateHlsPlaylistCommon({ | ||
257 | type: 'hls-from-ts' as 'hls-from-ts', | ||
258 | inputPath: options.concatenatedTsFilePath, | ||
259 | |||
260 | ...pick(options, [ 'video', 'resolution', 'inputFileMutexReleaser', 'isAAC' ]) | ||
261 | }) | ||
262 | } | ||
263 | |||
264 | // Generate an HLS playlist from an input file, and update the master playlist | ||
265 | function generateHlsPlaylistResolution (options: { | ||
266 | video: MVideo | ||
267 | videoInputPath: string | ||
268 | resolution: VideoResolution | ||
269 | copyCodecs: boolean | ||
270 | inputFileMutexReleaser: MutexInterface.Releaser | ||
271 | job?: Job | ||
272 | }) { | ||
273 | return generateHlsPlaylistCommon({ | ||
274 | type: 'hls' as 'hls', | ||
275 | inputPath: options.videoInputPath, | ||
276 | |||
277 | ...pick(options, [ 'video', 'resolution', 'copyCodecs', 'inputFileMutexReleaser', 'job' ]) | ||
278 | }) | ||
279 | } | ||
280 | |||
281 | // --------------------------------------------------------------------------- | ||
282 | |||
283 | export { | ||
284 | generateHlsPlaylistResolution, | ||
285 | generateHlsPlaylistResolutionFromTS, | ||
286 | optimizeOriginalVideofile, | ||
287 | transcodeNewWebTorrentResolution, | ||
288 | mergeAudioVideofile | ||
289 | } | ||
290 | |||
291 | // --------------------------------------------------------------------------- | ||
292 | |||
293 | async function onWebTorrentVideoFileTranscoding ( | ||
294 | video: MVideoFullLight, | ||
295 | videoFile: MVideoFile, | ||
296 | transcodingPath: string, | ||
297 | newVideoFile: MVideoFile | ||
298 | ) { | ||
299 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
300 | |||
301 | try { | ||
302 | await video.reload() | ||
303 | |||
304 | const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile) | ||
305 | |||
306 | const stats = await stat(transcodingPath) | ||
307 | |||
308 | const probe = await ffprobePromise(transcodingPath) | ||
309 | const fps = await getVideoStreamFPS(transcodingPath, probe) | ||
310 | const metadata = await buildFileMetadata(transcodingPath, probe) | ||
311 | |||
312 | await move(transcodingPath, outputPath, { overwrite: true }) | ||
313 | |||
314 | videoFile.size = stats.size | ||
315 | videoFile.fps = fps | ||
316 | videoFile.metadata = metadata | ||
317 | |||
318 | await createTorrentAndSetInfoHash(video, videoFile) | ||
319 | |||
320 | const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution }) | ||
321 | if (oldFile) await video.removeWebTorrentFile(oldFile) | ||
322 | |||
323 | await VideoFileModel.customUpsert(videoFile, 'video', undefined) | ||
324 | video.VideoFiles = await video.$get('VideoFiles') | ||
325 | |||
326 | return { video, videoFile } | ||
327 | } finally { | ||
328 | mutexReleaser() | ||
329 | } | ||
330 | } | ||
331 | |||
332 | async function generateHlsPlaylistCommon (options: { | ||
333 | type: 'hls' | 'hls-from-ts' | ||
334 | video: MVideo | ||
335 | inputPath: string | ||
336 | resolution: VideoResolution | ||
337 | |||
338 | inputFileMutexReleaser: MutexInterface.Releaser | ||
339 | |||
340 | copyCodecs?: boolean | ||
341 | isAAC?: boolean | ||
342 | |||
343 | job?: Job | ||
344 | }) { | ||
345 | const { type, video, inputPath, resolution, copyCodecs, isAAC, job, inputFileMutexReleaser } = options | ||
346 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
347 | |||
348 | const videoTranscodedBasePath = join(transcodeDirectory, type) | ||
349 | await ensureDir(videoTranscodedBasePath) | ||
350 | |||
351 | const videoFilename = generateHLSVideoFilename(resolution) | ||
352 | const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFilename) | ||
353 | const resolutionPlaylistFileTranscodePath = join(videoTranscodedBasePath, resolutionPlaylistFilename) | ||
354 | |||
355 | const transcodeOptions = { | ||
356 | type, | ||
357 | |||
358 | inputPath, | ||
359 | outputPath: resolutionPlaylistFileTranscodePath, | ||
360 | |||
361 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
362 | profile: CONFIG.TRANSCODING.PROFILE, | ||
363 | |||
364 | resolution, | ||
365 | copyCodecs, | ||
366 | |||
367 | isAAC, | ||
368 | |||
369 | inputFileMutexReleaser, | ||
370 | |||
371 | hlsPlaylist: { | ||
372 | videoFilename | ||
373 | }, | ||
374 | |||
375 | job | ||
376 | } | ||
377 | |||
378 | await transcodeVOD(transcodeOptions) | ||
379 | |||
380 | // Create or update the playlist | ||
381 | const playlist = await retryTransactionWrapper(() => { | ||
382 | return sequelizeTypescript.transaction(async transaction => { | ||
383 | return VideoStreamingPlaylistModel.loadOrGenerate(video, transaction) | ||
384 | }) | ||
385 | }) | ||
386 | |||
387 | const newVideoFile = new VideoFileModel({ | ||
388 | resolution, | ||
389 | extname: extnameUtil(videoFilename), | ||
390 | size: 0, | ||
391 | filename: videoFilename, | ||
392 | fps: -1, | ||
393 | videoStreamingPlaylistId: playlist.id | ||
394 | }) | ||
395 | |||
396 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
397 | |||
398 | try { | ||
399 | // VOD transcoding is a long task, refresh video attributes | ||
400 | await video.reload() | ||
401 | |||
402 | const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, newVideoFile) | ||
403 | await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video)) | ||
404 | |||
405 | // Move playlist file | ||
406 | const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, resolutionPlaylistFilename) | ||
407 | await move(resolutionPlaylistFileTranscodePath, resolutionPlaylistPath, { overwrite: true }) | ||
408 | // Move video file | ||
409 | await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true }) | ||
410 | |||
411 | // Update video duration if it was not set (in case of a live for example) | ||
412 | if (!video.duration) { | ||
413 | video.duration = await getVideoStreamDuration(videoFilePath) | ||
414 | await video.save() | ||
415 | } | ||
416 | |||
417 | const stats = await stat(videoFilePath) | ||
418 | |||
419 | newVideoFile.size = stats.size | ||
420 | newVideoFile.fps = await getVideoStreamFPS(videoFilePath) | ||
421 | newVideoFile.metadata = await buildFileMetadata(videoFilePath) | ||
422 | |||
423 | await createTorrentAndSetInfoHash(playlist, newVideoFile) | ||
424 | |||
425 | const oldFile = await VideoFileModel.loadHLSFile({ | ||
426 | playlistId: playlist.id, | ||
427 | fps: newVideoFile.fps, | ||
428 | resolution: newVideoFile.resolution | ||
429 | }) | ||
430 | |||
431 | if (oldFile) { | ||
432 | await video.removeStreamingPlaylistVideoFile(playlist, oldFile) | ||
433 | await oldFile.destroy() | ||
434 | } | ||
435 | |||
436 | const savedVideoFile = await VideoFileModel.customUpsert(newVideoFile, 'streaming-playlist', undefined) | ||
437 | |||
438 | await updatePlaylistAfterFileChange(video, playlist) | ||
439 | |||
440 | return { resolutionPlaylistPath, videoFile: savedVideoFile } | ||
441 | } finally { | ||
442 | mutexReleaser() | ||
443 | } | ||
444 | } | ||
445 | |||
446 | function buildOriginalFileResolution (inputResolution: number) { | ||
447 | if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) { | ||
448 | return toEven(inputResolution) | ||
449 | } | ||
450 | |||
451 | const resolutions = computeResolutionsToTranscode({ | ||
452 | input: inputResolution, | ||
453 | type: 'vod', | ||
454 | includeInput: false, | ||
455 | strictLower: false, | ||
456 | // We don't really care about the audio resolution in this context | ||
457 | hasAudio: true | ||
458 | }) | ||
459 | |||
460 | if (resolutions.length === 0) { | ||
461 | return toEven(inputResolution) | ||
462 | } | ||
463 | |||
464 | return Math.max(...resolutions) | ||
465 | } | ||
diff --git a/server/lib/transcoding/web-transcoding.ts b/server/lib/transcoding/web-transcoding.ts new file mode 100644 index 000000000..d43d03b2a --- /dev/null +++ b/server/lib/transcoding/web-transcoding.ts | |||
@@ -0,0 +1,273 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { copyFile, move, remove, stat } from 'fs-extra' | ||
3 | import { basename, join } from 'path' | ||
4 | import { computeOutputFPS } from '@server/helpers/ffmpeg' | ||
5 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | ||
6 | import { MVideoFile, MVideoFullLight } from '@server/types/models' | ||
7 | import { toEven } from '@shared/core-utils' | ||
8 | import { ffprobePromise, getVideoStreamDuration, getVideoStreamFPS, TranscodeVODOptionsType } from '@shared/ffmpeg' | ||
9 | import { VideoResolution, VideoStorage } from '@shared/models' | ||
10 | import { CONFIG } from '../../initializers/config' | ||
11 | import { VideoFileModel } from '../../models/video/video-file' | ||
12 | import { generateWebTorrentVideoFilename } from '../paths' | ||
13 | import { buildFileMetadata } from '../video-file' | ||
14 | import { VideoPathManager } from '../video-path-manager' | ||
15 | import { buildFFmpegVOD } from './shared' | ||
16 | import { computeResolutionsToTranscode } from './transcoding-resolutions' | ||
17 | |||
18 | // Optimize the original video file and replace it. The resolution is not changed. | ||
19 | export async function optimizeOriginalVideofile (options: { | ||
20 | video: MVideoFullLight | ||
21 | inputVideoFile: MVideoFile | ||
22 | quickTranscode: boolean | ||
23 | job: Job | ||
24 | }) { | ||
25 | const { video, inputVideoFile, quickTranscode, job } = options | ||
26 | |||
27 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
28 | const newExtname = '.mp4' | ||
29 | |||
30 | // Will be released by our transcodeVOD function once ffmpeg is ran | ||
31 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
32 | |||
33 | try { | ||
34 | await video.reload() | ||
35 | |||
36 | const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) | ||
37 | |||
38 | const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => { | ||
39 | const videoOutputPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) | ||
40 | |||
41 | const transcodeType: TranscodeVODOptionsType = quickTranscode | ||
42 | ? 'quick-transcode' | ||
43 | : 'video' | ||
44 | |||
45 | const resolution = buildOriginalFileResolution(inputVideoFile.resolution) | ||
46 | const fps = computeOutputFPS({ inputFPS: inputVideoFile.fps, resolution }) | ||
47 | |||
48 | // Could be very long! | ||
49 | await buildFFmpegVOD(job).transcode({ | ||
50 | type: transcodeType, | ||
51 | |||
52 | inputPath: videoInputPath, | ||
53 | outputPath: videoOutputPath, | ||
54 | |||
55 | inputFileMutexReleaser, | ||
56 | |||
57 | resolution, | ||
58 | fps | ||
59 | }) | ||
60 | |||
61 | // Important to do this before getVideoFilename() to take in account the new filename | ||
62 | inputVideoFile.resolution = resolution | ||
63 | inputVideoFile.extname = newExtname | ||
64 | inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname) | ||
65 | inputVideoFile.storage = VideoStorage.FILE_SYSTEM | ||
66 | |||
67 | const { videoFile } = await onWebTorrentVideoFileTranscoding({ | ||
68 | video, | ||
69 | videoFile: inputVideoFile, | ||
70 | videoOutputPath | ||
71 | }) | ||
72 | |||
73 | await remove(videoInputPath) | ||
74 | |||
75 | return { transcodeType, videoFile } | ||
76 | }) | ||
77 | |||
78 | return result | ||
79 | } finally { | ||
80 | inputFileMutexReleaser() | ||
81 | } | ||
82 | } | ||
83 | |||
84 | // Transcode the original video file to a lower resolution compatible with WebTorrent | ||
85 | export async function transcodeNewWebTorrentResolution (options: { | ||
86 | video: MVideoFullLight | ||
87 | resolution: VideoResolution | ||
88 | fps: number | ||
89 | job: Job | ||
90 | }) { | ||
91 | const { video, resolution, fps, job } = options | ||
92 | |||
93 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
94 | const newExtname = '.mp4' | ||
95 | |||
96 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
97 | |||
98 | try { | ||
99 | await video.reload() | ||
100 | |||
101 | const file = video.getMaxQualityFile().withVideoOrPlaylist(video) | ||
102 | |||
103 | const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => { | ||
104 | const newVideoFile = new VideoFileModel({ | ||
105 | resolution, | ||
106 | extname: newExtname, | ||
107 | filename: generateWebTorrentVideoFilename(resolution, newExtname), | ||
108 | size: 0, | ||
109 | videoId: video.id | ||
110 | }) | ||
111 | |||
112 | const videoOutputPath = join(transcodeDirectory, newVideoFile.filename) | ||
113 | |||
114 | const transcodeOptions = { | ||
115 | type: 'video' as 'video', | ||
116 | |||
117 | inputPath: videoInputPath, | ||
118 | outputPath: videoOutputPath, | ||
119 | |||
120 | inputFileMutexReleaser, | ||
121 | |||
122 | resolution, | ||
123 | fps | ||
124 | } | ||
125 | |||
126 | await buildFFmpegVOD(job).transcode(transcodeOptions) | ||
127 | |||
128 | return onWebTorrentVideoFileTranscoding({ video, videoFile: newVideoFile, videoOutputPath }) | ||
129 | }) | ||
130 | |||
131 | return result | ||
132 | } finally { | ||
133 | inputFileMutexReleaser() | ||
134 | } | ||
135 | } | ||
136 | |||
137 | // Merge an image with an audio file to create a video | ||
138 | export async function mergeAudioVideofile (options: { | ||
139 | video: MVideoFullLight | ||
140 | resolution: VideoResolution | ||
141 | fps: number | ||
142 | job: Job | ||
143 | }) { | ||
144 | const { video, resolution, fps, job } = options | ||
145 | |||
146 | const transcodeDirectory = CONFIG.STORAGE.TMP_DIR | ||
147 | const newExtname = '.mp4' | ||
148 | |||
149 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
150 | |||
151 | try { | ||
152 | await video.reload() | ||
153 | |||
154 | const inputVideoFile = video.getMinQualityFile() | ||
155 | |||
156 | const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video) | ||
157 | |||
158 | const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => { | ||
159 | const videoOutputPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) | ||
160 | |||
161 | // If the user updates the video preview during transcoding | ||
162 | const previewPath = video.getPreview().getPath() | ||
163 | const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath)) | ||
164 | await copyFile(previewPath, tmpPreviewPath) | ||
165 | |||
166 | const transcodeOptions = { | ||
167 | type: 'merge-audio' as 'merge-audio', | ||
168 | |||
169 | inputPath: tmpPreviewPath, | ||
170 | outputPath: videoOutputPath, | ||
171 | |||
172 | inputFileMutexReleaser, | ||
173 | |||
174 | audioPath: audioInputPath, | ||
175 | resolution, | ||
176 | fps | ||
177 | } | ||
178 | |||
179 | try { | ||
180 | await buildFFmpegVOD(job).transcode(transcodeOptions) | ||
181 | |||
182 | await remove(audioInputPath) | ||
183 | await remove(tmpPreviewPath) | ||
184 | } catch (err) { | ||
185 | await remove(tmpPreviewPath) | ||
186 | throw err | ||
187 | } | ||
188 | |||
189 | // Important to do this before getVideoFilename() to take in account the new file extension | ||
190 | inputVideoFile.extname = newExtname | ||
191 | inputVideoFile.resolution = resolution | ||
192 | inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname) | ||
193 | |||
194 | // ffmpeg generated a new video file, so update the video duration | ||
195 | // See https://trac.ffmpeg.org/ticket/5456 | ||
196 | video.duration = await getVideoStreamDuration(videoOutputPath) | ||
197 | await video.save() | ||
198 | |||
199 | return onWebTorrentVideoFileTranscoding({ | ||
200 | video, | ||
201 | videoFile: inputVideoFile, | ||
202 | videoOutputPath | ||
203 | }) | ||
204 | }) | ||
205 | |||
206 | return result | ||
207 | } finally { | ||
208 | inputFileMutexReleaser() | ||
209 | } | ||
210 | } | ||
211 | |||
212 | export async function onWebTorrentVideoFileTranscoding (options: { | ||
213 | video: MVideoFullLight | ||
214 | videoFile: MVideoFile | ||
215 | videoOutputPath: string | ||
216 | }) { | ||
217 | const { video, videoFile, videoOutputPath } = options | ||
218 | |||
219 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
220 | |||
221 | try { | ||
222 | await video.reload() | ||
223 | |||
224 | const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, videoFile) | ||
225 | |||
226 | const stats = await stat(videoOutputPath) | ||
227 | |||
228 | const probe = await ffprobePromise(videoOutputPath) | ||
229 | const fps = await getVideoStreamFPS(videoOutputPath, probe) | ||
230 | const metadata = await buildFileMetadata(videoOutputPath, probe) | ||
231 | |||
232 | await move(videoOutputPath, outputPath, { overwrite: true }) | ||
233 | |||
234 | videoFile.size = stats.size | ||
235 | videoFile.fps = fps | ||
236 | videoFile.metadata = metadata | ||
237 | |||
238 | await createTorrentAndSetInfoHash(video, videoFile) | ||
239 | |||
240 | const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution }) | ||
241 | if (oldFile) await video.removeWebTorrentFile(oldFile) | ||
242 | |||
243 | await VideoFileModel.customUpsert(videoFile, 'video', undefined) | ||
244 | video.VideoFiles = await video.$get('VideoFiles') | ||
245 | |||
246 | return { video, videoFile } | ||
247 | } finally { | ||
248 | mutexReleaser() | ||
249 | } | ||
250 | } | ||
251 | |||
252 | // --------------------------------------------------------------------------- | ||
253 | |||
254 | function buildOriginalFileResolution (inputResolution: number) { | ||
255 | if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) { | ||
256 | return toEven(inputResolution) | ||
257 | } | ||
258 | |||
259 | const resolutions = computeResolutionsToTranscode({ | ||
260 | input: inputResolution, | ||
261 | type: 'vod', | ||
262 | includeInput: false, | ||
263 | strictLower: false, | ||
264 | // We don't really care about the audio resolution in this context | ||
265 | hasAudio: true | ||
266 | }) | ||
267 | |||
268 | if (resolutions.length === 0) { | ||
269 | return toEven(inputResolution) | ||
270 | } | ||
271 | |||
272 | return Math.max(...resolutions) | ||
273 | } | ||
diff --git a/server/lib/uploadx.ts b/server/lib/uploadx.ts index 58040cb6d..c7e0eb414 100644 --- a/server/lib/uploadx.ts +++ b/server/lib/uploadx.ts | |||
@@ -3,6 +3,7 @@ import { buildLogger } from '@server/helpers/logger' | |||
3 | import { getResumableUploadPath } from '@server/helpers/upload' | 3 | import { getResumableUploadPath } from '@server/helpers/upload' |
4 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
5 | import { LogLevel, Uploadx } from '@uploadx/core' | 5 | import { LogLevel, Uploadx } from '@uploadx/core' |
6 | import { extname } from 'path' | ||
6 | 7 | ||
7 | const logger = buildLogger('uploadx') | 8 | const logger = buildLogger('uploadx') |
8 | 9 | ||
@@ -26,7 +27,9 @@ const uploadx = new Uploadx({ | |||
26 | if (!res.locals.oauth) return undefined | 27 | if (!res.locals.oauth) return undefined |
27 | 28 | ||
28 | return res.locals.oauth.token.user.id + '' | 29 | return res.locals.oauth.token.user.id + '' |
29 | } | 30 | }, |
31 | |||
32 | filename: file => `${file.userId}-${file.id}${extname(file.metadata.filename)}` | ||
30 | }) | 33 | }) |
31 | 34 | ||
32 | export { | 35 | export { |
diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index fd5837a3a..cb1ea834c 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts | |||
@@ -81,7 +81,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video | |||
81 | } | 81 | } |
82 | 82 | ||
83 | if (videoInstance.isLive) { | 83 | if (videoInstance.isLive) { |
84 | LiveManager.Instance.stopSessionOf(videoInstance.id, LiveVideoError.BLACKLISTED) | 84 | LiveManager.Instance.stopSessionOf(videoInstance.uuid, LiveVideoError.BLACKLISTED) |
85 | } | 85 | } |
86 | 86 | ||
87 | Notifier.Instance.notifyOnVideoBlacklist(blacklist) | 87 | Notifier.Instance.notifyOnVideoBlacklist(blacklist) |
diff --git a/server/lib/video-file.ts b/server/lib/video-file.ts index 2ab7190f1..8fcc3c253 100644 --- a/server/lib/video-file.ts +++ b/server/lib/video-file.ts | |||
@@ -1,6 +1,44 @@ | |||
1 | import { FfprobeData } from 'fluent-ffmpeg' | ||
1 | import { logger } from '@server/helpers/logger' | 2 | import { logger } from '@server/helpers/logger' |
3 | import { VideoFileModel } from '@server/models/video/video-file' | ||
2 | import { MVideoWithAllFiles } from '@server/types/models' | 4 | import { MVideoWithAllFiles } from '@server/types/models' |
5 | import { getLowercaseExtension } from '@shared/core-utils' | ||
6 | import { getFileSize } from '@shared/extra-utils' | ||
7 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' | ||
8 | import { VideoFileMetadata, VideoResolution } from '@shared/models' | ||
3 | import { lTags } from './object-storage/shared' | 9 | import { lTags } from './object-storage/shared' |
10 | import { generateHLSVideoFilename, generateWebTorrentVideoFilename } from './paths' | ||
11 | |||
12 | async function buildNewFile (options: { | ||
13 | path: string | ||
14 | mode: 'web-video' | 'hls' | ||
15 | }) { | ||
16 | const { path, mode } = options | ||
17 | |||
18 | const probe = await ffprobePromise(path) | ||
19 | const size = await getFileSize(path) | ||
20 | |||
21 | const videoFile = new VideoFileModel({ | ||
22 | extname: getLowercaseExtension(path), | ||
23 | size, | ||
24 | metadata: await buildFileMetadata(path, probe) | ||
25 | }) | ||
26 | |||
27 | if (await isAudioFile(path, probe)) { | ||
28 | videoFile.resolution = VideoResolution.H_NOVIDEO | ||
29 | } else { | ||
30 | videoFile.fps = await getVideoStreamFPS(path, probe) | ||
31 | videoFile.resolution = (await getVideoStreamDimensionsInfo(path, probe)).resolution | ||
32 | } | ||
33 | |||
34 | videoFile.filename = mode === 'web-video' | ||
35 | ? generateWebTorrentVideoFilename(videoFile.resolution, videoFile.extname) | ||
36 | : generateHLSVideoFilename(videoFile.resolution) | ||
37 | |||
38 | return videoFile | ||
39 | } | ||
40 | |||
41 | // --------------------------------------------------------------------------- | ||
4 | 42 | ||
5 | async function removeHLSPlaylist (video: MVideoWithAllFiles) { | 43 | async function removeHLSPlaylist (video: MVideoWithAllFiles) { |
6 | const hls = video.getHLSPlaylist() | 44 | const hls = video.getHLSPlaylist() |
@@ -61,9 +99,23 @@ async function removeWebTorrentFile (video: MVideoWithAllFiles, fileToDeleteId: | |||
61 | return video | 99 | return video |
62 | } | 100 | } |
63 | 101 | ||
102 | // --------------------------------------------------------------------------- | ||
103 | |||
104 | async function buildFileMetadata (path: string, existingProbe?: FfprobeData) { | ||
105 | const metadata = existingProbe || await ffprobePromise(path) | ||
106 | |||
107 | return new VideoFileMetadata(metadata) | ||
108 | } | ||
109 | |||
110 | // --------------------------------------------------------------------------- | ||
111 | |||
64 | export { | 112 | export { |
113 | buildNewFile, | ||
114 | |||
65 | removeHLSPlaylist, | 115 | removeHLSPlaylist, |
66 | removeHLSFile, | 116 | removeHLSFile, |
67 | removeAllWebTorrentFiles, | 117 | removeAllWebTorrentFiles, |
68 | removeWebTorrentFile | 118 | removeWebTorrentFile, |
119 | |||
120 | buildFileMetadata | ||
69 | } | 121 | } |
diff --git a/server/lib/video-studio.ts b/server/lib/video-studio.ts index cdacd35f2..b392bdb00 100644 --- a/server/lib/video-studio.ts +++ b/server/lib/video-studio.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import { MVideoFullLight } from '@server/types/models' | 1 | import { MVideoFullLight } from '@server/types/models' |
2 | import { getVideoStreamDuration } from '@shared/extra-utils' | 2 | import { getVideoStreamDuration } from '@shared/ffmpeg' |
3 | import { VideoStudioTask } from '@shared/models' | 3 | import { VideoStudioTask } from '@shared/models' |
4 | 4 | ||
5 | function buildTaskFileFieldname (indice: number, fieldName = 'file') { | 5 | function buildTaskFileFieldname (indice: number, fieldName = 'file') { |
diff --git a/server/lib/video.ts b/server/lib/video.ts index aacc41a7a..588dc553f 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts | |||
@@ -2,14 +2,14 @@ import { UploadFiles } from 'express' | |||
2 | import memoizee from 'memoizee' | 2 | import memoizee from 'memoizee' |
3 | import { Transaction } from 'sequelize/types' | 3 | import { Transaction } from 'sequelize/types' |
4 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
5 | import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' | 5 | import { MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' |
6 | import { TagModel } from '@server/models/video/tag' | 6 | import { TagModel } from '@server/models/video/tag' |
7 | import { VideoModel } from '@server/models/video/video' | 7 | import { VideoModel } from '@server/models/video/video' |
8 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 8 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
9 | import { FilteredModelAttributes } from '@server/types' | 9 | import { FilteredModelAttributes } from '@server/types' |
10 | import { MThumbnail, MUserId, MVideoFile, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' | 10 | import { MThumbnail, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' |
11 | import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' | 11 | import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState } from '@shared/models' |
12 | import { CreateJobArgument, CreateJobOptions, JobQueue } from './job-queue/job-queue' | 12 | import { CreateJobArgument, JobQueue } from './job-queue/job-queue' |
13 | import { updateVideoMiniatureFromExisting } from './thumbnail' | 13 | import { updateVideoMiniatureFromExisting } from './thumbnail' |
14 | import { moveFilesIfPrivacyChanged } from './video-privacy' | 14 | import { moveFilesIfPrivacyChanged } from './video-privacy' |
15 | 15 | ||
@@ -87,58 +87,6 @@ async function setVideoTags (options: { | |||
87 | 87 | ||
88 | // --------------------------------------------------------------------------- | 88 | // --------------------------------------------------------------------------- |
89 | 89 | ||
90 | async function buildOptimizeOrMergeAudioJob (options: { | ||
91 | video: MVideoUUID | ||
92 | videoFile: MVideoFile | ||
93 | user: MUserId | ||
94 | isNewVideo?: boolean // Default true | ||
95 | }) { | ||
96 | const { video, videoFile, user, isNewVideo } = options | ||
97 | |||
98 | let payload: VideoTranscodingPayload | ||
99 | |||
100 | if (videoFile.isAudio()) { | ||
101 | payload = { | ||
102 | type: 'merge-audio-to-webtorrent', | ||
103 | resolution: DEFAULT_AUDIO_RESOLUTION, | ||
104 | videoUUID: video.uuid, | ||
105 | createHLSIfNeeded: true, | ||
106 | isNewVideo | ||
107 | } | ||
108 | } else { | ||
109 | payload = { | ||
110 | type: 'optimize-to-webtorrent', | ||
111 | videoUUID: video.uuid, | ||
112 | isNewVideo | ||
113 | } | ||
114 | } | ||
115 | |||
116 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | ||
117 | |||
118 | return { | ||
119 | type: 'video-transcoding' as 'video-transcoding', | ||
120 | priority: await getTranscodingJobPriority(user), | ||
121 | payload | ||
122 | } | ||
123 | } | ||
124 | |||
125 | async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { | ||
126 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | ||
127 | |||
128 | return { type: 'video-transcoding' as 'video-transcoding', payload, ...options } | ||
129 | } | ||
130 | |||
131 | async function getTranscodingJobPriority (user: MUserId) { | ||
132 | const now = new Date() | ||
133 | const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) | ||
134 | |||
135 | const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) | ||
136 | |||
137 | return JOB_PRIORITY.TRANSCODING + videoUploadedByUser | ||
138 | } | ||
139 | |||
140 | // --------------------------------------------------------------------------- | ||
141 | |||
142 | async function buildMoveToObjectStorageJob (options: { | 90 | async function buildMoveToObjectStorageJob (options: { |
143 | video: MVideoUUID | 91 | video: MVideoUUID |
144 | previousVideoState: VideoState | 92 | previousVideoState: VideoState |
@@ -235,10 +183,7 @@ export { | |||
235 | buildLocalVideoFromReq, | 183 | buildLocalVideoFromReq, |
236 | buildVideoThumbnailsFromReq, | 184 | buildVideoThumbnailsFromReq, |
237 | setVideoTags, | 185 | setVideoTags, |
238 | buildOptimizeOrMergeAudioJob, | ||
239 | buildTranscodingJob, | ||
240 | buildMoveToObjectStorageJob, | 186 | buildMoveToObjectStorageJob, |
241 | getTranscodingJobPriority, | ||
242 | addVideoJobsAfterUpdate, | 187 | addVideoJobsAfterUpdate, |
243 | getCachedVideoDuration | 188 | getCachedVideoDuration |
244 | } | 189 | } |