aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-11-05 11:36:03 +0100
committerChocobozzz <me@florianbigard.com>2021-11-05 11:38:17 +0100
commitdf1db951c512a58110171d046ef367789df02733 (patch)
treea8894b4a4864d9e378923f011b4ca9d206e2ee0b /server/lib/live
parent8dd754c76735417305c4b68e2ada6f623e9d7650 (diff)
downloadPeerTube-df1db951c512a58110171d046ef367789df02733.tar.gz
PeerTube-df1db951c512a58110171d046ef367789df02733.tar.zst
PeerTube-df1db951c512a58110171d046ef367789df02733.zip
Support RTMPS
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/live-manager.ts92
-rw-r--r--server/lib/live/shared/muxing-session.ts12
2 files changed, 68 insertions, 36 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index d7dc841d9..c75cc3bda 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -1,5 +1,7 @@
1 1
2import { readFile } from 'fs-extra'
2import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
3import { isTestInstance } from '@server/helpers/core-utils' 5import { isTestInstance } from '@server/helpers/core-utils'
4import { 6import {
5 computeResolutionsToTranscode, 7 computeResolutionsToTranscode,
@@ -19,8 +21,8 @@ import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/
19import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 21import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
20import { federateVideoIfNeeded } from '../activitypub/videos' 22import { federateVideoIfNeeded } from '../activitypub/videos'
21import { JobQueue } from '../job-queue' 23import { JobQueue } from '../job-queue'
22import { PeerTubeSocket } from '../peertube-socket'
23import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' 24import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths'
25import { PeerTubeSocket } from '../peertube-socket'
24import { LiveQuotaStore } from './live-quota-store' 26import { LiveQuotaStore } from './live-quota-store'
25import { LiveSegmentShaStore } from './live-segment-sha-store' 27import { LiveSegmentShaStore } from './live-segment-sha-store'
26import { cleanupLive } from './live-utils' 28import { cleanupLive } from './live-utils'
@@ -40,9 +42,6 @@ const config = {
40 gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, 42 gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
41 ping: VIDEO_LIVE.RTMP.PING, 43 ping: VIDEO_LIVE.RTMP.PING,
42 ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT 44 ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
43 },
44 transcoding: {
45 ffmpeg: 'ffmpeg'
46 } 45 }
47} 46}
48 47
@@ -58,6 +57,9 @@ class LiveManager {
58 private readonly watchersPerVideo = new Map<number, number[]>() 57 private readonly watchersPerVideo = new Map<number, number[]>()
59 58
60 private rtmpServer: Server 59 private rtmpServer: Server
60 private rtmpsServer: ServerTLS
61
62 private running = false
61 63
62 private constructor () { 64 private constructor () {
63 } 65 }
@@ -73,7 +75,9 @@ class LiveManager {
73 return this.abortSession(sessionId) 75 return this.abortSession(sessionId)
74 } 76 }
75 77
76 this.handleSession(sessionId, streamPath, splittedPath[2]) 78 const session = this.getContext().sessions.get(sessionId)
79
80 this.handleSession(sessionId, session.inputOriginUrl + streamPath, splittedPath[2])
77 .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) 81 .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
78 }) 82 })
79 83
@@ -82,12 +86,12 @@ class LiveManager {
82 }) 86 })
83 87
84 registerConfigChangedHandler(() => { 88 registerConfigChangedHandler(() => {
85 if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { 89 if (!this.running && CONFIG.LIVE.ENABLED === true) {
86 this.run() 90 this.run().catch(err => logger.error('Cannot run live server.', { err }))
87 return 91 return
88 } 92 }
89 93
90 if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) { 94 if (this.running && CONFIG.LIVE.ENABLED === false) {
91 this.stop() 95 this.stop()
92 } 96 }
93 }) 97 })
@@ -99,23 +103,53 @@ class LiveManager {
99 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) 103 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
100 } 104 }
101 105
102 run () { 106 async run () {
103 logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) 107 this.running = true
104 108
105 this.rtmpServer = createServer(socket => { 109 if (CONFIG.LIVE.RTMP.ENABLED) {
106 const session = new NodeRtmpSession(config, socket) 110 logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags())
107 111
108 session.run() 112 this.rtmpServer = createServer(socket => {
109 }) 113 const session = new NodeRtmpSession(config, socket)
110 114
111 this.rtmpServer.on('error', err => { 115 session.inputOriginUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
112 logger.error('Cannot run RTMP server.', { err, ...lTags() }) 116 session.run()
113 }) 117 })
118
119 this.rtmpServer.on('error', err => {
120 logger.error('Cannot run RTMP server.', { err, ...lTags() })
121 })
122
123 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
124 }
114 125
115 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) 126 if (CONFIG.LIVE.RTMPS.ENABLED) {
127 logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags())
128
129 const [ key, cert ] = await Promise.all([
130 readFile(CONFIG.LIVE.RTMPS.KEY_FILE),
131 readFile(CONFIG.LIVE.RTMPS.CERT_FILE)
132 ])
133 const serverOptions = { key, cert }
134
135 this.rtmpsServer = createServerTLS(serverOptions, socket => {
136 const session = new NodeRtmpSession(config, socket)
137
138 session.inputOriginUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
139 session.run()
140 })
141
142 this.rtmpsServer.on('error', err => {
143 logger.error('Cannot run RTMPS server.', { err, ...lTags() })
144 })
145
146 this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT)
147 }
116 } 148 }
117 149
118 stop () { 150 stop () {
151 this.running = false
152
119 logger.info('Stopping RTMP server.', lTags()) 153 logger.info('Stopping RTMP server.', lTags())
120 154
121 this.rtmpServer.close() 155 this.rtmpServer.close()
@@ -174,7 +208,7 @@ class LiveManager {
174 } 208 }
175 } 209 }
176 210
177 private async handleSession (sessionId: string, streamPath: string, streamKey: string) { 211 private async handleSession (sessionId: string, inputUrl: string, streamKey: string) {
178 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) 212 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
179 if (!videoLive) { 213 if (!videoLive) {
180 logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) 214 logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
@@ -197,20 +231,18 @@ class LiveManager {
197 231
198 this.videoSessions.set(video.id, sessionId) 232 this.videoSessions.set(video.id, sessionId)
199 233
200 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
201
202 const now = Date.now() 234 const now = Date.now()
203 const probe = await ffprobePromise(rtmpUrl) 235 const probe = await ffprobePromise(inputUrl)
204 236
205 const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([ 237 const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([
206 getVideoFileResolution(rtmpUrl, probe), 238 getVideoFileResolution(inputUrl, probe),
207 getVideoFileFPS(rtmpUrl, probe), 239 getVideoFileFPS(inputUrl, probe),
208 getVideoFileBitrate(rtmpUrl, probe) 240 getVideoFileBitrate(inputUrl, probe)
209 ]) 241 ])
210 242
211 logger.info( 243 logger.info(
212 '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)', 244 '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
213 rtmpUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) 245 inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
214 ) 246 )
215 247
216 const allResolutions = this.buildAllResolutionsToTranscode(resolution) 248 const allResolutions = this.buildAllResolutionsToTranscode(resolution)
@@ -226,7 +258,7 @@ class LiveManager {
226 sessionId, 258 sessionId,
227 videoLive, 259 videoLive,
228 streamingPlaylist, 260 streamingPlaylist,
229 rtmpUrl, 261 inputUrl,
230 fps, 262 fps,
231 bitrate, 263 bitrate,
232 ratio, 264 ratio,
@@ -238,13 +270,13 @@ class LiveManager {
238 sessionId: string 270 sessionId: string
239 videoLive: MVideoLiveVideo 271 videoLive: MVideoLiveVideo
240 streamingPlaylist: MStreamingPlaylistVideo 272 streamingPlaylist: MStreamingPlaylistVideo
241 rtmpUrl: string 273 inputUrl: string
242 fps: number 274 fps: number
243 bitrate: number 275 bitrate: number
244 ratio: number 276 ratio: number
245 allResolutions: number[] 277 allResolutions: number[]
246 }) { 278 }) {
247 const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, rtmpUrl } = options 279 const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, inputUrl } = options
248 const videoUUID = videoLive.Video.uuid 280 const videoUUID = videoLive.Video.uuid
249 const localLTags = lTags(sessionId, videoUUID) 281 const localLTags = lTags(sessionId, videoUUID)
250 282
@@ -257,7 +289,7 @@ class LiveManager {
257 sessionId, 289 sessionId,
258 videoLive, 290 videoLive,
259 streamingPlaylist, 291 streamingPlaylist,
260 rtmpUrl, 292 inputUrl,
261 bitrate, 293 bitrate,
262 ratio, 294 ratio,
263 fps, 295 fps,
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index b52363af7..c71f4e25f 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -52,7 +52,7 @@ class MuxingSession extends EventEmitter {
52 private readonly sessionId: string 52 private readonly sessionId: string
53 private readonly videoLive: MVideoLiveVideo 53 private readonly videoLive: MVideoLiveVideo
54 private readonly streamingPlaylist: MStreamingPlaylistVideo 54 private readonly streamingPlaylist: MStreamingPlaylistVideo
55 private readonly rtmpUrl: string 55 private readonly inputUrl: string
56 private readonly fps: number 56 private readonly fps: number
57 private readonly allResolutions: number[] 57 private readonly allResolutions: number[]
58 58
@@ -84,7 +84,7 @@ class MuxingSession extends EventEmitter {
84 sessionId: string 84 sessionId: string
85 videoLive: MVideoLiveVideo 85 videoLive: MVideoLiveVideo
86 streamingPlaylist: MStreamingPlaylistVideo 86 streamingPlaylist: MStreamingPlaylistVideo
87 rtmpUrl: string 87 inputUrl: string
88 fps: number 88 fps: number
89 bitrate: number 89 bitrate: number
90 ratio: number 90 ratio: number
@@ -97,7 +97,7 @@ class MuxingSession extends EventEmitter {
97 this.sessionId = options.sessionId 97 this.sessionId = options.sessionId
98 this.videoLive = options.videoLive 98 this.videoLive = options.videoLive
99 this.streamingPlaylist = options.streamingPlaylist 99 this.streamingPlaylist = options.streamingPlaylist
100 this.rtmpUrl = options.rtmpUrl 100 this.inputUrl = options.inputUrl
101 this.fps = options.fps 101 this.fps = options.fps
102 102
103 this.bitrate = options.bitrate 103 this.bitrate = options.bitrate
@@ -120,7 +120,7 @@ class MuxingSession extends EventEmitter {
120 120
121 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED 121 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
122 ? await getLiveTranscodingCommand({ 122 ? await getLiveTranscodingCommand({
123 rtmpUrl: this.rtmpUrl, 123 inputUrl: this.inputUrl,
124 124
125 outPath, 125 outPath,
126 masterPlaylistName: this.streamingPlaylist.playlistFilename, 126 masterPlaylistName: this.streamingPlaylist.playlistFilename,
@@ -133,7 +133,7 @@ class MuxingSession extends EventEmitter {
133 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 133 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
134 profile: CONFIG.LIVE.TRANSCODING.PROFILE 134 profile: CONFIG.LIVE.TRANSCODING.PROFILE
135 }) 135 })
136 : getLiveMuxingCommand(this.rtmpUrl, outPath, this.streamingPlaylist.playlistFilename) 136 : getLiveMuxingCommand(this.inputUrl, outPath, this.streamingPlaylist.playlistFilename)
137 137
138 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags) 138 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags)
139 139
@@ -173,7 +173,7 @@ class MuxingSession extends EventEmitter {
173 } 173 }
174 174
175 private onFFmpegEnded (outPath: string) { 175 private onFFmpegEnded (outPath: string) {
176 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags) 176 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags)
177 177
178 setTimeout(() => { 178 setTimeout(() => {
179 // Wait latest segments generation, and close watchers 179 // Wait latest segments generation, and close watchers