aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/live
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/live-manager.ts94
-rw-r--r--server/lib/live/live-segment-sha-store.ts5
-rw-r--r--server/lib/live/live-utils.ts12
-rw-r--r--server/lib/live/shared/muxing-session.ts191
-rw-r--r--server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts101
-rw-r--r--server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts95
-rw-r--r--server/lib/live/shared/transcoding-wrapper/index.ts3
-rw-r--r--server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts20
8 files changed, 369 insertions, 152 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 05274955d..aa32a9d52 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra'
2import { createServer, Server } from 'net' 2import { createServer, Server } from 'net'
3import { join } from 'path' 3import { join } from 'path'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import {
6 computeResolutionsToTranscode,
7 ffprobePromise,
8 getLiveSegmentTime,
9 getVideoStreamBitrate,
10 getVideoStreamDimensionsInfo,
11 getVideoStreamFPS,
12 hasAudioStream
13} from '@server/helpers/ffmpeg'
14import { logger, loggerTagsFactory } from '@server/helpers/logger' 5import { logger, loggerTagsFactory } from '@server/helpers/logger'
15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 6import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
16import { VIDEO_LIVE } from '@server/initializers/constants' 7import { VIDEO_LIVE } from '@server/initializers/constants'
8import { sequelizeTypescript } from '@server/initializers/database'
17import { UserModel } from '@server/models/user/user' 9import { UserModel } from '@server/models/user/user'
18import { VideoModel } from '@server/models/video/video' 10import { VideoModel } from '@server/models/video/video'
19import { VideoLiveModel } from '@server/models/video/video-live' 11import { VideoLiveModel } from '@server/models/video/video-live'
12import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
20import { VideoLiveSessionModel } from '@server/models/video/video-live-session' 13import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
22import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' 15import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models'
23import { pick, wait } from '@shared/core-utils' 16import { pick, wait } from '@shared/core-utils'
17import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg'
24import { LiveVideoError, VideoState } from '@shared/models' 18import { LiveVideoError, VideoState } from '@shared/models'
25import { federateVideoIfNeeded } from '../activitypub/videos' 19import { federateVideoIfNeeded } from '../activitypub/videos'
26import { JobQueue } from '../job-queue' 20import { JobQueue } from '../job-queue'
27import { getLiveReplayBaseDirectory } from '../paths' 21import { getLiveReplayBaseDirectory } from '../paths'
28import { PeerTubeSocket } from '../peertube-socket' 22import { PeerTubeSocket } from '../peertube-socket'
29import { Hooks } from '../plugins/hooks' 23import { Hooks } from '../plugins/hooks'
24import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions'
30import { LiveQuotaStore } from './live-quota-store' 25import { LiveQuotaStore } from './live-quota-store'
31import { cleanupAndDestroyPermanentLive } from './live-utils' 26import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils'
32import { MuxingSession } from './shared' 27import { MuxingSession } from './shared'
33import { sequelizeTypescript } from '@server/initializers/database' 28import { RunnerJobModel } from '@server/models/runner/runner-job'
34import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
35 29
36const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') 30const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
37const context = require('node-media-server/src/node_core_ctx') 31const context = require('node-media-server/src/node_core_ctx')
@@ -57,7 +51,7 @@ class LiveManager {
57 private static instance: LiveManager 51 private static instance: LiveManager
58 52
59 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
60 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<string, string>()
61 55
62 private rtmpServer: Server 56 private rtmpServer: Server
63 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -177,14 +171,19 @@ class LiveManager {
177 return !!this.rtmpServer 171 return !!this.rtmpServer
178 } 172 }
179 173
180 stopSessionOf (videoId: number, error: LiveVideoError | null) { 174 stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
181 const sessionId = this.videoSessions.get(videoId) 175 const sessionId = this.videoSessions.get(videoUUID)
182 if (!sessionId) return 176 if (!sessionId) {
177 logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID))
178 return
179 }
183 180
184 this.saveEndingSession(videoId, error) 181 logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) })
185 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
186 182
187 this.videoSessions.delete(videoId) 183 this.saveEndingSession(videoUUID, error)
184 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) }))
185
186 this.videoSessions.delete(videoUUID)
188 this.abortSession(sessionId) 187 this.abortSession(sessionId)
189 } 188 }
190 189
@@ -221,6 +220,11 @@ class LiveManager {
221 return this.abortSession(sessionId) 220 return this.abortSession(sessionId)
222 } 221 }
223 222
223 if (this.videoSessions.has(video.uuid)) {
224 logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
225 return this.abortSession(sessionId)
226 }
227
224 // Cleanup old potential live (could happen with a permanent live) 228 // Cleanup old potential live (could happen with a permanent live)
225 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) 229 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
226 if (oldStreamingPlaylist) { 230 if (oldStreamingPlaylist) {
@@ -229,7 +233,7 @@ class LiveManager {
229 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) 233 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
230 } 234 }
231 235
232 this.videoSessions.set(video.id, sessionId) 236 this.videoSessions.set(video.uuid, sessionId)
233 237
234 const now = Date.now() 238 const now = Date.now()
235 const probe = await ffprobePromise(inputUrl) 239 const probe = await ffprobePromise(inputUrl)
@@ -253,7 +257,7 @@ class LiveManager {
253 ) 257 )
254 258
255 logger.info( 259 logger.info(
256 'Will mux/transcode live video of original resolution %d.', resolution, 260 'Handling live video of original resolution %d.', resolution,
257 { allResolutions, ...lTags(sessionId, video.uuid) } 261 { allResolutions, ...lTags(sessionId, video.uuid) }
258 ) 262 )
259 263
@@ -301,44 +305,44 @@ class LiveManager {
301 305
302 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) 306 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
303 307
304 muxingSession.on('bad-socket-health', ({ videoId }) => { 308 muxingSession.on('bad-socket-health', ({ videoUUID }) => {
305 logger.error( 309 logger.error(
306 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + 310 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
307 ' Stopping session of video %s.', videoUUID, 311 ' Stopping session of video %s.', videoUUID,
308 localLTags 312 localLTags
309 ) 313 )
310 314
311 this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) 315 this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH)
312 }) 316 })
313 317
314 muxingSession.on('duration-exceeded', ({ videoId }) => { 318 muxingSession.on('duration-exceeded', ({ videoUUID }) => {
315 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) 319 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
316 320
317 this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) 321 this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED)
318 }) 322 })
319 323
320 muxingSession.on('quota-exceeded', ({ videoId }) => { 324 muxingSession.on('quota-exceeded', ({ videoUUID }) => {
321 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) 325 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
322 326
323 this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) 327 this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED)
324 }) 328 })
325 329
326 muxingSession.on('ffmpeg-error', ({ videoId }) => { 330 muxingSession.on('transcoding-error', ({ videoUUID }) => {
327 this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) 331 this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR)
328 }) 332 })
329 333
330 muxingSession.on('ffmpeg-end', ({ videoId }) => { 334 muxingSession.on('transcoding-end', ({ videoUUID }) => {
331 this.onMuxingFFmpegEnd(videoId, sessionId) 335 this.onMuxingFFmpegEnd(videoUUID, sessionId)
332 }) 336 })
333 337
334 muxingSession.on('after-cleanup', ({ videoId }) => { 338 muxingSession.on('after-cleanup', ({ videoUUID }) => {
335 this.muxingSessions.delete(sessionId) 339 this.muxingSessions.delete(sessionId)
336 340
337 LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) 341 LiveQuotaStore.Instance.removeLive(user.id, videoLive.id)
338 342
339 muxingSession.destroy() 343 muxingSession.destroy()
340 344
341 return this.onAfterMuxingCleanup({ videoId, liveSession }) 345 return this.onAfterMuxingCleanup({ videoUUID, liveSession })
342 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 346 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
343 }) 347 })
344 348
@@ -379,22 +383,24 @@ class LiveManager {
379 } 383 }
380 } 384 }
381 385
382 private onMuxingFFmpegEnd (videoId: number, sessionId: string) { 386 private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
383 this.videoSessions.delete(videoId) 387 this.videoSessions.delete(videoUUID)
384 388
385 this.saveEndingSession(videoId, null) 389 this.saveEndingSession(videoUUID, null)
386 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) 390 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
387 } 391 }
388 392
389 private async onAfterMuxingCleanup (options: { 393 private async onAfterMuxingCleanup (options: {
390 videoId: number | string 394 videoUUID: string
391 liveSession?: MVideoLiveSession 395 liveSession?: MVideoLiveSession
392 cleanupNow?: boolean // Default false 396 cleanupNow?: boolean // Default false
393 }) { 397 }) {
394 const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options 398 const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options
399
400 logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))
395 401
396 try { 402 try {
397 const fullVideo = await VideoModel.loadFull(videoId) 403 const fullVideo = await VideoModel.loadFull(videoUUID)
398 if (!fullVideo) return 404 if (!fullVideo) return
399 405
400 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 406 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
@@ -437,15 +443,17 @@ class LiveManager {
437 443
438 await federateVideoIfNeeded(fullVideo, false) 444 await federateVideoIfNeeded(fullVideo, false)
439 } catch (err) { 445 } catch (err) {
440 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) 446 logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
441 } 447 }
442 } 448 }
443 449
444 private async handleBrokenLives () { 450 private async handleBrokenLives () {
451 await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' })
452
445 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 453 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
446 454
447 for (const uuid of videoUUIDs) { 455 for (const uuid of videoUUIDs) {
448 await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) 456 await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true })
449 } 457 }
450 } 458 }
451 459
@@ -494,8 +502,8 @@ class LiveManager {
494 }) 502 })
495 } 503 }
496 504
497 private async saveEndingSession (videoId: number, error: LiveVideoError | null) { 505 private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) {
498 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) 506 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID)
499 if (!liveSession) return 507 if (!liveSession) return
500 508
501 liveSession.endDate = new Date() 509 liveSession.endDate = new Date()
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts
index 4d03754a9..251301141 100644
--- a/server/lib/live/live-segment-sha-store.ts
+++ b/server/lib/live/live-segment-sha-store.ts
@@ -52,7 +52,10 @@ class LiveSegmentShaStore {
52 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) 52 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
53 53
54 if (!this.segmentsSha256.has(segmentName)) { 54 if (!this.segmentsSha256.has(segmentName)) {
55 logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID)) 55 logger.warn(
56 'Unknown segment in live segment hash store for video %s and segment %s.',
57 this.videoUUID, segmentPath, lTags(this.videoUUID)
58 )
56 return 59 return
57 } 60 }
58 61
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index c0dec9829..3fb3ce1ce 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -1,8 +1,9 @@
1import { pathExists, readdir, remove } from 'fs-extra' 1import { pathExists, readdir, remove } from 'fs-extra'
2import { basename, join } from 'path' 2import { basename, join } from 'path'
3import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
4import { VIDEO_LIVE } from '@server/initializers/constants'
4import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' 5import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
5import { VideoStorage } from '@shared/models' 6import { LiveVideoLatencyMode, VideoStorage } from '@shared/models'
6import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage' 7import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage'
7import { getLiveDirectory } from '../paths' 8import { getLiveDirectory } from '../paths'
8 9
@@ -37,10 +38,19 @@ async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreaming
37 await cleanupTMPLiveFilesFromFilesystem(video) 38 await cleanupTMPLiveFilesFromFilesystem(video)
38} 39}
39 40
41function getLiveSegmentTime (latencyMode: LiveVideoLatencyMode) {
42 if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) {
43 return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY
44 }
45
46 return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY
47}
48
40export { 49export {
41 cleanupAndDestroyPermanentLive, 50 cleanupAndDestroyPermanentLive,
42 cleanupUnsavedNormalLive, 51 cleanupUnsavedNormalLive,
43 cleanupTMPLiveFiles, 52 cleanupTMPLiveFiles,
53 getLiveSegmentTime,
44 buildConcatenatedName 54 buildConcatenatedName
45} 55}
46 56
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 2727fc4a7..f3f8fc886 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -1,11 +1,10 @@
1import { mapSeries } from 'bluebird' 1import { mapSeries } from 'bluebird'
2import { FSWatcher, watch } from 'chokidar' 2import { FSWatcher, watch } from 'chokidar'
3import { FfmpegCommand } from 'fluent-ffmpeg' 3import { EventEmitter } from 'events'
4import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 4import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
5import PQueue from 'p-queue' 5import PQueue from 'p-queue'
6import { basename, join } from 'path' 6import { basename, join } from 'path'
7import { EventEmitter } from 'stream' 7import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 9import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
@@ -20,24 +19,24 @@ import {
20 getLiveDirectory, 19 getLiveDirectory,
21 getLiveReplayBaseDirectory 20 getLiveReplayBaseDirectory
22} from '../../paths' 21} from '../../paths'
23import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
24import { isAbleToUploadVideo } from '../../user' 22import { isAbleToUploadVideo } from '../../user'
25import { LiveQuotaStore } from '../live-quota-store' 23import { LiveQuotaStore } from '../live-quota-store'
26import { LiveSegmentShaStore } from '../live-segment-sha-store' 24import { LiveSegmentShaStore } from '../live-segment-sha-store'
27import { buildConcatenatedName } from '../live-utils' 25import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils'
26import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper'
28 27
29import memoizee = require('memoizee') 28import memoizee = require('memoizee')
30interface MuxingSessionEvents { 29interface MuxingSessionEvents {
31 'live-ready': (options: { videoId: number }) => void 30 'live-ready': (options: { videoUUID: string }) => void
32 31
33 'bad-socket-health': (options: { videoId: number }) => void 32 'bad-socket-health': (options: { videoUUID: string }) => void
34 'duration-exceeded': (options: { videoId: number }) => void 33 'duration-exceeded': (options: { videoUUID: string }) => void
35 'quota-exceeded': (options: { videoId: number }) => void 34 'quota-exceeded': (options: { videoUUID: string }) => void
36 35
37 'ffmpeg-end': (options: { videoId: number }) => void 36 'transcoding-end': (options: { videoUUID: string }) => void
38 'ffmpeg-error': (options: { videoId: number }) => void 37 'transcoding-error': (options: { videoUUID: string }) => void
39 38
40 'after-cleanup': (options: { videoId: number }) => void 39 'after-cleanup': (options: { videoUUID: string }) => void
41} 40}
42 41
43declare interface MuxingSession { 42declare interface MuxingSession {
@@ -52,7 +51,7 @@ declare interface MuxingSession {
52 51
53class MuxingSession extends EventEmitter { 52class MuxingSession extends EventEmitter {
54 53
55 private ffmpegCommand: FfmpegCommand 54 private transcodingWrapper: AbstractTranscodingWrapper
56 55
57 private readonly context: any 56 private readonly context: any
58 private readonly user: MUserId 57 private readonly user: MUserId
@@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter {
67 66
68 private readonly hasAudio: boolean 67 private readonly hasAudio: boolean
69 68
70 private readonly videoId: number
71 private readonly videoUUID: string 69 private readonly videoUUID: string
72 private readonly saveReplay: boolean 70 private readonly saveReplay: boolean
73 71
@@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter {
126 124
127 this.allResolutions = options.allResolutions 125 this.allResolutions = options.allResolutions
128 126
129 this.videoId = this.videoLive.Video.id
130 this.videoUUID = this.videoLive.Video.uuid 127 this.videoUUID = this.videoLive.Video.uuid
131 128
132 this.saveReplay = this.videoLive.saveReplay 129 this.saveReplay = this.videoLive.saveReplay
@@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter {
145 142
146 await this.prepareDirectories() 143 await this.prepareDirectories()
147 144
148 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED 145 this.transcodingWrapper = this.buildTranscodingWrapper()
149 ? await getLiveTranscodingCommand({
150 inputUrl: this.inputUrl,
151 146
152 outPath: this.outDirectory, 147 this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
153 masterPlaylistName: this.streamingPlaylist.playlistFilename, 148 this.transcodingWrapper.on('error', () => this.onTranscodingError())
154 149
155 latencyMode: this.videoLive.latencyMode, 150 await this.transcodingWrapper.run()
156
157 resolutions: this.allResolutions,
158 fps: this.fps,
159 bitrate: this.bitrate,
160 ratio: this.ratio,
161
162 hasAudio: this.hasAudio,
163
164 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
165 profile: CONFIG.LIVE.TRANSCODING.PROFILE
166 })
167 : getLiveMuxingCommand({
168 inputUrl: this.inputUrl,
169 outPath: this.outDirectory,
170 masterPlaylistName: this.streamingPlaylist.playlistFilename,
171 latencyMode: this.videoLive.latencyMode
172 })
173
174 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
175 151
176 this.watchMasterFile() 152 this.watchMasterFile()
177 this.watchTSFiles() 153 this.watchTSFiles()
178 this.watchM3U8File() 154 this.watchM3U8File()
179
180 let ffmpegShellCommand: string
181 this.ffmpegCommand.on('start', cmdline => {
182 ffmpegShellCommand = cmdline
183
184 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
185 })
186
187 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
188 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
189 })
190
191 this.ffmpegCommand.on('end', () => {
192 this.emit('ffmpeg-end', ({ videoId: this.videoId }))
193
194 this.onFFmpegEnded()
195 })
196
197 this.ffmpegCommand.run()
198 } 155 }
199 156
200 abort () { 157 abort () {
201 if (!this.ffmpegCommand) return 158 if (!this.transcodingWrapper) return
202 159
203 this.aborted = true 160 this.aborted = true
204 this.ffmpegCommand.kill('SIGINT') 161 this.transcodingWrapper.abort()
205 } 162 }
206 163
207 destroy () { 164 destroy () {
@@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter {
210 this.hasClientSocketInBadHealthWithCache.clear() 167 this.hasClientSocketInBadHealthWithCache.clear()
211 } 168 }
212 169
213 private onFFmpegError (options: {
214 err: any
215 stdout: string
216 stderr: string
217 ffmpegShellCommand: string
218 }) {
219 const { err, stdout, stderr, ffmpegShellCommand } = options
220
221 this.onFFmpegEnded()
222
223 // Don't care that we killed the ffmpeg process
224 if (err?.message?.includes('Exiting normally')) return
225
226 logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
227
228 this.emit('ffmpeg-error', ({ videoId: this.videoId }))
229 }
230
231 private onFFmpegEnded () {
232 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
233
234 setTimeout(() => {
235 // Wait latest segments generation, and close watchers
236
237 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
238 .then(() => {
239 // Process remaining segments hash
240 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
241 this.processSegments(this.segmentsToProcessPerPlaylist[key])
242 }
243 })
244 .catch(err => {
245 logger.error(
246 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
247 { err, ...this.lTags() }
248 )
249 })
250
251 this.emit('after-cleanup', { videoId: this.videoId })
252 }, 1000)
253 }
254
255 private watchMasterFile () { 170 private watchMasterFile () {
256 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 171 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
257 172
@@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter {
272 187
273 this.masterPlaylistCreated = true 188 this.masterPlaylistCreated = true
274 189
190 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
191
275 this.masterWatcher.close() 192 this.masterWatcher.close()
276 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 193 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
277 }) 194 })
@@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter {
318 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] 235 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
319 236
320 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { 237 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
321 this.emit('bad-socket-health', { videoId: this.videoId }) 238 this.emit('bad-socket-health', { videoUUID: this.videoUUID })
322 return 239 return
323 } 240 }
324 241
325 // Duration constraint check 242 // Duration constraint check
326 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 243 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
327 this.emit('duration-exceeded', { videoId: this.videoId }) 244 this.emit('duration-exceeded', { videoUUID: this.videoUUID })
328 return 245 return
329 } 246 }
330 247
331 // Check user quota if the user enabled replay saving 248 // Check user quota if the user enabled replay saving
332 if (await this.isQuotaExceeded(segmentPath) === true) { 249 if (await this.isQuotaExceeded(segmentPath) === true) {
333 this.emit('quota-exceeded', { videoId: this.videoId }) 250 this.emit('quota-exceeded', { videoUUID: this.videoUUID })
334 } 251 }
335 } 252 }
336 253
@@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter {
438 if (this.masterPlaylistCreated && !this.liveReady) { 355 if (this.masterPlaylistCreated && !this.liveReady) {
439 this.liveReady = true 356 this.liveReady = true
440 357
441 this.emit('live-ready', { videoId: this.videoId }) 358 this.emit('live-ready', { videoUUID: this.videoUUID })
442 } 359 }
443 } 360 }
444 361
362 private onTranscodingError () {
363 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
364 }
365
366 private onTranscodedEnded () {
367 this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
368
369 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
370
371 setTimeout(() => {
372 // Wait latest segments generation, and close watchers
373
374 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
375 .then(() => {
376 // Process remaining segments hash
377 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
378 this.processSegments(this.segmentsToProcessPerPlaylist[key])
379 }
380 })
381 .catch(err => {
382 logger.error(
383 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
384 { err, ...this.lTags() }
385 )
386 })
387
388 this.emit('after-cleanup', { videoUUID: this.videoUUID })
389 }, 1000)
390 }
391
445 private hasClientSocketInBadHealth (sessionId: string) { 392 private hasClientSocketInBadHealth (sessionId: string) {
446 const rtmpSession = this.context.sessions.get(sessionId) 393 const rtmpSession = this.context.sessions.get(sessionId)
447 394
@@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter {
503 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED 450 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
504 }) 451 })
505 } 452 }
453
454 private buildTranscodingWrapper () {
455 const options = {
456 streamingPlaylist: this.streamingPlaylist,
457 videoLive: this.videoLive,
458
459 lTags: this.lTags,
460
461 inputUrl: this.inputUrl,
462
463 toTranscode: this.allResolutions.map(resolution => ({
464 resolution,
465 fps: computeOutputFPS({ inputFPS: this.fps, resolution })
466 })),
467
468 fps: this.fps,
469 bitrate: this.bitrate,
470 ratio: this.ratio,
471 hasAudio: this.hasAudio,
472
473 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
474 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
475
476 outDirectory: this.outDirectory
477 }
478
479 return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
480 ? new RemoteTranscodingWrapper(options)
481 : new FFmpegTranscodingWrapper(options)
482 }
506} 483}
507 484
508// --------------------------------------------------------------------------- 485// ---------------------------------------------------------------------------
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
new file mode 100644
index 000000000..226ba4573
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
@@ -0,0 +1,101 @@
1import EventEmitter from 'events'
2import { LoggerTagsFn } from '@server/helpers/logger'
3import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models'
4import { LiveVideoError } from '@shared/models'
5
6interface TranscodingWrapperEvents {
7 'end': () => void
8
9 'error': (options: { err: Error }) => void
10}
11
12declare interface AbstractTranscodingWrapper {
13 on<U extends keyof TranscodingWrapperEvents>(
14 event: U, listener: TranscodingWrapperEvents[U]
15 ): this
16
17 emit<U extends keyof TranscodingWrapperEvents>(
18 event: U, ...args: Parameters<TranscodingWrapperEvents[U]>
19 ): boolean
20}
21
22interface AbstractTranscodingWrapperOptions {
23 streamingPlaylist: MStreamingPlaylistVideo
24 videoLive: MVideoLiveVideo
25
26 lTags: LoggerTagsFn
27
28 inputUrl: string
29 fps: number
30 toTranscode: {
31 resolution: number
32 fps: number
33 }[]
34
35 bitrate: number
36 ratio: number
37 hasAudio: boolean
38
39 segmentListSize: number
40 segmentDuration: number
41
42 outDirectory: string
43}
44
45abstract class AbstractTranscodingWrapper extends EventEmitter {
46 protected readonly videoLive: MVideoLiveVideo
47
48 protected readonly toTranscode: {
49 resolution: number
50 fps: number
51 }[]
52
53 protected readonly inputUrl: string
54 protected readonly fps: number
55 protected readonly bitrate: number
56 protected readonly ratio: number
57 protected readonly hasAudio: boolean
58
59 protected readonly segmentListSize: number
60 protected readonly segmentDuration: number
61
62 protected readonly videoUUID: string
63
64 protected readonly outDirectory: string
65
66 protected readonly lTags: LoggerTagsFn
67
68 protected readonly streamingPlaylist: MStreamingPlaylistVideo
69
70 constructor (options: AbstractTranscodingWrapperOptions) {
71 super()
72
73 this.lTags = options.lTags
74
75 this.videoLive = options.videoLive
76 this.videoUUID = options.videoLive.Video.uuid
77 this.streamingPlaylist = options.streamingPlaylist
78
79 this.inputUrl = options.inputUrl
80 this.fps = options.fps
81 this.toTranscode = options.toTranscode
82
83 this.bitrate = options.bitrate
84 this.ratio = options.ratio
85 this.hasAudio = options.hasAudio
86
87 this.segmentListSize = options.segmentListSize
88 this.segmentDuration = options.segmentDuration
89
90 this.outDirectory = options.outDirectory
91 }
92
93 abstract run (): Promise<void>
94
95 abstract abort (error?: LiveVideoError): void
96}
97
98export {
99 AbstractTranscodingWrapper,
100 AbstractTranscodingWrapperOptions
101}
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
new file mode 100644
index 000000000..1f4c12bd4
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
@@ -0,0 +1,95 @@
1import { FfmpegCommand } from 'fluent-ffmpeg'
2import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
3import { logger } from '@server/helpers/logger'
4import { CONFIG } from '@server/initializers/config'
5import { VIDEO_LIVE } from '@server/initializers/constants'
6import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
7import { FFmpegLive } from '@shared/ffmpeg'
8import { getLiveSegmentTime } from '../../live-utils'
9import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
10
11export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
12 private ffmpegCommand: FfmpegCommand
13 private ended = false
14
15 async run () {
16 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
17 ? await this.buildFFmpegLive().getLiveTranscodingCommand({
18 inputUrl: this.inputUrl,
19
20 outPath: this.outDirectory,
21 masterPlaylistName: this.streamingPlaylist.playlistFilename,
22
23 segmentListSize: this.segmentListSize,
24 segmentDuration: this.segmentDuration,
25
26 toTranscode: this.toTranscode,
27
28 bitrate: this.bitrate,
29 ratio: this.ratio,
30
31 hasAudio: this.hasAudio
32 })
33 : this.buildFFmpegLive().getLiveMuxingCommand({
34 inputUrl: this.inputUrl,
35 outPath: this.outDirectory,
36
37 masterPlaylistName: this.streamingPlaylist.playlistFilename,
38
39 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
40 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode)
41 })
42
43 logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags())
44
45 this.ffmpegCommand.run()
46
47 let ffmpegShellCommand: string
48 this.ffmpegCommand.on('start', cmdline => {
49 ffmpegShellCommand = cmdline
50
51 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
52 })
53
54 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
55 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
56 })
57
58 this.ffmpegCommand.on('end', () => {
59 this.onFFmpegEnded()
60 })
61
62 this.ffmpegCommand.run()
63 }
64
65 abort () {
66 // Nothing to do, ffmpeg will automatically exit
67 }
68
69 private onFFmpegError (options: {
70 err: any
71 stdout: string
72 stderr: string
73 ffmpegShellCommand: string
74 }) {
75 const { err, stdout, stderr, ffmpegShellCommand } = options
76
77 // Don't care that we killed the ffmpeg process
78 if (err?.message?.includes('Exiting normally')) return
79
80 logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
81
82 this.emit('error', { err })
83 }
84
85 private onFFmpegEnded () {
86 if (this.ended) return
87
88 this.ended = true
89 this.emit('end')
90 }
91
92 private buildFFmpegLive () {
93 return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
94 }
95}
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts
new file mode 100644
index 000000000..ae28fa1ca
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/index.ts
@@ -0,0 +1,3 @@
1export * from './abstract-transcoding-wrapper'
2export * from './ffmpeg-transcoding-wrapper'
3export * from './remote-transcoding-wrapper'
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
new file mode 100644
index 000000000..345eaf442
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
@@ -0,0 +1,20 @@
1import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners'
2import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
3
4export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
5 async run () {
6 await new LiveRTMPHLSTranscodingJobHandler().create({
7 rtmpUrl: this.inputUrl,
8 toTranscode: this.toTranscode,
9 video: this.videoLive.Video,
10 outputDirectory: this.outDirectory,
11 playlist: this.streamingPlaylist,
12 segmentListSize: this.segmentListSize,
13 segmentDuration: this.segmentDuration
14 })
15 }
16
17 abort () {
18 this.emit('end')
19 }
20}