From df1db951c512a58110171d046ef367789df02733 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 5 Nov 2021 11:36:03 +0100 Subject: Support RTMPS --- server/lib/live/live-manager.ts | 92 +++++++++++++++++++++----------- server/lib/live/shared/muxing-session.ts | 12 ++--- 2 files changed, 68 insertions(+), 36 deletions(-) (limited to 'server/lib') diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index d7dc841d9..c75cc3bda 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,5 +1,7 @@ +import { readFile } from 'fs-extra' import { createServer, Server } from 'net' +import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { isTestInstance } from '@server/helpers/core-utils' import { computeResolutionsToTranscode, @@ -19,8 +21,8 @@ import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/ import { VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' -import { PeerTubeSocket } from '../peertube-socket' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' +import { PeerTubeSocket } from '../peertube-socket' import { LiveQuotaStore } from './live-quota-store' import { LiveSegmentShaStore } from './live-segment-sha-store' import { cleanupLive } from './live-utils' @@ -40,9 +42,6 @@ const config = { gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, ping: VIDEO_LIVE.RTMP.PING, ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT - }, - transcoding: { - ffmpeg: 'ffmpeg' } } @@ -58,6 +57,9 @@ class LiveManager { private readonly watchersPerVideo = new Map() private rtmpServer: Server + private rtmpsServer: ServerTLS + + private running = false private constructor () { } @@ -73,7 +75,9 @@ class LiveManager { return this.abortSession(sessionId) } - this.handleSession(sessionId, streamPath, splittedPath[2]) + const session = this.getContext().sessions.get(sessionId) + + this.handleSession(sessionId, session.inputOriginUrl + streamPath, splittedPath[2]) .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) }) @@ -82,12 +86,12 @@ class LiveManager { }) registerConfigChangedHandler(() => { - if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { - this.run() + if (!this.running && CONFIG.LIVE.ENABLED === true) { + this.run().catch(err => logger.error('Cannot run live server.', { err })) return } - if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) { + if (this.running && CONFIG.LIVE.ENABLED === false) { this.stop() } }) @@ -99,23 +103,53 @@ class LiveManager { setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } - run () { - logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) + async run () { + this.running = true - this.rtmpServer = createServer(socket => { - const session = new NodeRtmpSession(config, socket) + if (CONFIG.LIVE.RTMP.ENABLED) { + logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags()) - session.run() - }) + this.rtmpServer = createServer(socket => { + const session = new NodeRtmpSession(config, socket) - this.rtmpServer.on('error', err => { - logger.error('Cannot run RTMP server.', { err, ...lTags() }) - }) + session.inputOriginUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT + session.run() + }) + + this.rtmpServer.on('error', err => { + logger.error('Cannot run RTMP server.', { err, ...lTags() }) + }) + + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + } - this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + if (CONFIG.LIVE.RTMPS.ENABLED) { + logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags()) + + const [ key, cert ] = await Promise.all([ + readFile(CONFIG.LIVE.RTMPS.KEY_FILE), + readFile(CONFIG.LIVE.RTMPS.CERT_FILE) + ]) + const serverOptions = { key, cert } + + this.rtmpsServer = createServerTLS(serverOptions, socket => { + const session = new NodeRtmpSession(config, socket) + + session.inputOriginUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT + session.run() + }) + + this.rtmpsServer.on('error', err => { + logger.error('Cannot run RTMPS server.', { err, ...lTags() }) + }) + + this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT) + } } stop () { + this.running = false + logger.info('Stopping RTMP server.', lTags()) this.rtmpServer.close() @@ -174,7 +208,7 @@ class LiveManager { } } - private async handleSession (sessionId: string, streamPath: string, streamKey: string) { + private async handleSession (sessionId: string, inputUrl: string, streamKey: string) { const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) if (!videoLive) { logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) @@ -197,20 +231,18 @@ class LiveManager { this.videoSessions.set(video.id, sessionId) - const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath - const now = Date.now() - const probe = await ffprobePromise(rtmpUrl) + const probe = await ffprobePromise(inputUrl) const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([ - getVideoFileResolution(rtmpUrl, probe), - getVideoFileFPS(rtmpUrl, probe), - getVideoFileBitrate(rtmpUrl, probe) + getVideoFileResolution(inputUrl, probe), + getVideoFileFPS(inputUrl, probe), + getVideoFileBitrate(inputUrl, probe) ]) logger.info( '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)', - rtmpUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) + inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) ) const allResolutions = this.buildAllResolutionsToTranscode(resolution) @@ -226,7 +258,7 @@ class LiveManager { sessionId, videoLive, streamingPlaylist, - rtmpUrl, + inputUrl, fps, bitrate, ratio, @@ -238,13 +270,13 @@ class LiveManager { sessionId: string videoLive: MVideoLiveVideo streamingPlaylist: MStreamingPlaylistVideo - rtmpUrl: string + inputUrl: string fps: number bitrate: number ratio: number allResolutions: number[] }) { - const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, rtmpUrl } = options + const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, inputUrl } = options const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) @@ -257,7 +289,7 @@ class LiveManager { sessionId, videoLive, streamingPlaylist, - rtmpUrl, + inputUrl, bitrate, ratio, fps, diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index b52363af7..c71f4e25f 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -52,7 +52,7 @@ class MuxingSession extends EventEmitter { private readonly sessionId: string private readonly videoLive: MVideoLiveVideo private readonly streamingPlaylist: MStreamingPlaylistVideo - private readonly rtmpUrl: string + private readonly inputUrl: string private readonly fps: number private readonly allResolutions: number[] @@ -84,7 +84,7 @@ class MuxingSession extends EventEmitter { sessionId: string videoLive: MVideoLiveVideo streamingPlaylist: MStreamingPlaylistVideo - rtmpUrl: string + inputUrl: string fps: number bitrate: number ratio: number @@ -97,7 +97,7 @@ class MuxingSession extends EventEmitter { this.sessionId = options.sessionId this.videoLive = options.videoLive this.streamingPlaylist = options.streamingPlaylist - this.rtmpUrl = options.rtmpUrl + this.inputUrl = options.inputUrl this.fps = options.fps this.bitrate = options.bitrate @@ -120,7 +120,7 @@ class MuxingSession extends EventEmitter { this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED ? await getLiveTranscodingCommand({ - rtmpUrl: this.rtmpUrl, + inputUrl: this.inputUrl, outPath, masterPlaylistName: this.streamingPlaylist.playlistFilename, @@ -133,7 +133,7 @@ class MuxingSession extends EventEmitter { availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), profile: CONFIG.LIVE.TRANSCODING.PROFILE }) - : getLiveMuxingCommand(this.rtmpUrl, outPath, this.streamingPlaylist.playlistFilename) + : getLiveMuxingCommand(this.inputUrl, outPath, this.streamingPlaylist.playlistFilename) logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags) @@ -173,7 +173,7 @@ class MuxingSession extends EventEmitter { } private onFFmpegEnded (outPath: string) { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags) + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags) setTimeout(() => { // Wait latest segments generation, and close watchers -- cgit v1.2.3