]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/live/live-manager.ts
Add Podcast RSS feeds (#5487)
[github/Chocobozzz/PeerTube.git] / server / lib / live / live-manager.ts
index aa32a9d522d292e0ccc02b6d34057333f7b7e48b..acb7af2745fc6d551a67011a2b0bf5f1284ee11c 100644 (file)
@@ -4,8 +4,9 @@ import { join } from 'path'
 import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { VIDEO_LIVE } from '@server/initializers/constants'
+import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
 import { sequelizeTypescript } from '@server/initializers/database'
+import { RunnerJobModel } from '@server/models/runner/runner-job'
 import { UserModel } from '@server/models/user/user'
 import { VideoModel } from '@server/models/video/video'
 import { VideoLiveModel } from '@server/models/video/video-live'
@@ -25,7 +26,6 @@ import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolu
 import { LiveQuotaStore } from './live-quota-store'
 import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils'
 import { MuxingSession } from './shared'
-import { RunnerJobModel } from '@server/models/runner/runner-job'
 
 const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
 const context = require('node-media-server/src/node_core_ctx')
@@ -73,13 +73,18 @@ class LiveManager {
       }
 
       const session = this.getContext().sessions.get(sessionId)
+      const inputLocalUrl = session.inputOriginLocalUrl + streamPath
+      const inputPublicUrl = session.inputOriginPublicUrl + streamPath
 
-      this.handleSession(sessionId, session.inputOriginUrl + streamPath, splittedPath[2])
+      this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] })
         .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
     })
 
     events.on('donePublish', sessionId => {
       logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
+
+      // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU)
+      setTimeout(() => this.abortSession(sessionId), 2000)
     })
 
     registerConfigChangedHandler(() => {
@@ -107,7 +112,8 @@ class LiveManager {
       this.rtmpServer = createServer(socket => {
         const session = new NodeRtmpSession(config, socket)
 
-        session.inputOriginUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
+        session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
+        session.inputOriginPublicUrl = WEBSERVER.RTMP_URL
         session.run()
       })
 
@@ -130,7 +136,8 @@ class LiveManager {
       this.rtmpsServer = createServerTLS(serverOptions, socket => {
         const session = new NodeRtmpSession(config, socket)
 
-        session.inputOriginUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
+        session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
+        session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL
         session.run()
       })
 
@@ -171,6 +178,10 @@ class LiveManager {
     return !!this.rtmpServer
   }
 
+  hasSession (sessionId: string) {
+    return this.getContext().sessions.has(sessionId)
+  }
+
   stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
     const sessionId = this.videoSessions.get(videoUUID)
     if (!sessionId) {
@@ -207,7 +218,14 @@ class LiveManager {
     }
   }
 
-  private async handleSession (sessionId: string, inputUrl: string, streamKey: string) {
+  private async handleSession (options: {
+    sessionId: string
+    inputLocalUrl: string
+    inputPublicUrl: string
+    streamKey: string
+  }) {
+    const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options
+
     const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
     if (!videoLive) {
       logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
@@ -236,18 +254,18 @@ class LiveManager {
     this.videoSessions.set(video.uuid, sessionId)
 
     const now = Date.now()
-    const probe = await ffprobePromise(inputUrl)
+    const probe = await ffprobePromise(inputLocalUrl)
 
     const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([
-      getVideoStreamDimensionsInfo(inputUrl, probe),
-      getVideoStreamFPS(inputUrl, probe),
-      getVideoStreamBitrate(inputUrl, probe),
-      hasAudioStream(inputUrl, probe)
+      getVideoStreamDimensionsInfo(inputLocalUrl, probe),
+      getVideoStreamFPS(inputLocalUrl, probe),
+      getVideoStreamBitrate(inputLocalUrl, probe),
+      hasAudioStream(inputLocalUrl, probe)
     ])
 
     logger.info(
       '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
-      inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
+      inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
     )
 
     const allResolutions = await Hooks.wrapObject(
@@ -265,7 +283,8 @@ class LiveManager {
       sessionId,
       videoLive,
 
-      inputUrl,
+      inputLocalUrl,
+      inputPublicUrl,
       fps,
       bitrate,
       ratio,
@@ -278,7 +297,9 @@ class LiveManager {
     sessionId: string
     videoLive: MVideoLiveVideoWithSetting
 
-    inputUrl: string
+    inputLocalUrl: string
+    inputPublicUrl: string
+
     fps: number
     bitrate: number
     ratio: number
@@ -292,7 +313,7 @@ class LiveManager {
     const liveSession = await this.saveStartingSession(videoLive)
 
     const user = await UserModel.loadByLiveId(videoLive.id)
-    LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)
+    LiveQuotaStore.Instance.addNewLive(user.id, sessionId)
 
     const muxingSession = new MuxingSession({
       context: this.getContext(),
@@ -300,7 +321,7 @@ class LiveManager {
       videoLive,
       user,
 
-      ...pick(options, [ 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
+      ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
     })
 
     muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
@@ -338,7 +359,7 @@ class LiveManager {
     muxingSession.on('after-cleanup', ({ videoUUID }) => {
       this.muxingSessions.delete(sessionId)
 
-      LiveQuotaStore.Instance.removeLive(user.id, videoLive.id)
+      LiveQuotaStore.Instance.removeLive(user.id, sessionId)
 
       muxingSession.destroy()
 
@@ -378,12 +399,17 @@ class LiveManager {
       }
 
       PeerTubeSocket.Instance.sendVideoLiveNewState(video)
+
+      Hooks.runAction('action:live.video.state.updated', { video })
     } catch (err) {
       logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
     }
   }
 
   private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
+    // Session already cleaned up
+    if (!this.videoSessions.has(videoUUID)) return
+
     this.videoSessions.delete(videoUUID)
 
     this.saveEndingSession(videoUUID, null)
@@ -442,6 +468,8 @@ class LiveManager {
       PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
 
       await federateVideoIfNeeded(fullVideo, false)
+
+      Hooks.runAction('action:live.video.state.updated', { video: fullVideo })
     } catch (err) {
       logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
     }