]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/live-manager.ts
Save
[github/Chocobozzz/PeerTube.git] / server / lib / live-manager.ts
index f602bfb6da36fd97684b0b45abc2c9ec1242cea8..3ff2434ff4b9e5507d25a41e24c7b55ecefc68c1 100644 (file)
@@ -2,20 +2,27 @@
 import { AsyncQueue, queue } from 'async'
 import * as chokidar from 'chokidar'
 import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir, readdir, remove } from 'fs-extra'
-import { basename, join } from 'path'
+import { ensureDir, stat } from 'fs-extra'
+import { basename } from 'path'
 import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
 import { logger } from '@server/helpers/logger'
 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { UserModel } from '@server/models/account/user'
+import { VideoModel } from '@server/models/video/video'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { VideoLiveModel } from '@server/models/video/video-live'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models'
+import { MStreamingPlaylist, MUser, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
 import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
+import { federateVideoIfNeeded } from './activitypub/videos'
 import { buildSha256Segment } from './hls'
+import { JobQueue } from './job-queue'
+import { PeerTubeSocket } from './peertube-socket'
+import { isAbleToUploadVideo } from './user'
 import { getHLSDirectory } from './video-paths'
 
+import memoizee = require('memoizee')
 const NodeRtmpServer = require('node-media-server/node_rtmp_server')
 const context = require('node-media-server/node_core_ctx')
 const nodeMediaServerLogger = require('node-media-server/node_core_logger')
@@ -47,7 +54,13 @@ class LiveManager {
   private static instance: LiveManager
 
   private readonly transSessions = new Map<string, FfmpegCommand>()
+  private readonly videoSessions = new Map<number, string>()
   private readonly segmentsSha256 = new Map<string, Map<string, string>>()
+  private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
+
+  private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
+    return isAbleToUploadVideo(userId, 1000)
+  }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
 
   private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
   private rtmpServer: any
@@ -56,7 +69,8 @@ class LiveManager {
   }
 
   init () {
-    this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => {
+    const events = this.getContext().nodeEvent
+    events.on('postPublish', (sessionId: string, streamPath: string) => {
       logger.debug('RTMP received stream', { id: sessionId, streamPath })
 
       const splittedPath = streamPath.split('/')
@@ -69,7 +83,7 @@ class LiveManager {
         .catch(err => logger.error('Cannot handle sessions.', { err }))
     })
 
-    this.getContext().nodeEvent.on('donePublish', sessionId => {
+    events.on('donePublish', sessionId => {
       this.abortSession(sessionId)
     })
 
@@ -115,6 +129,16 @@ class LiveManager {
     return this.segmentsSha256.get(videoUUID)
   }
 
+  stopSessionOf (videoId: number) {
+    const sessionId = this.videoSessions.get(videoId)
+    if (!sessionId) return
+
+    this.abortSession(sessionId)
+
+    this.onEndTransmuxing(videoId, true)
+      .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
+  }
+
   private getContext () {
     return context
   }
@@ -135,6 +159,13 @@ class LiveManager {
     }
 
     const video = videoLive.Video
+    if (video.isBlacklisted()) {
+      logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
+      return this.abortSession(sessionId)
+    }
+
+    this.videoSessions.set(video.id, sessionId)
+
     const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
 
     const session = this.getContext().sessions.get(sessionId)
@@ -154,11 +185,6 @@ class LiveManager {
       type: VideoStreamingPlaylistType.HLS
     }, { returning: true }) as [ MStreamingPlaylist, boolean ]
 
-    video.state = VideoState.PUBLISHED
-    await video.save()
-
-    // FIXME: federation?
-
     return this.runMuxing({
       sessionId,
       videoLive,
@@ -178,8 +204,18 @@ class LiveManager {
     originalResolution: number
   }) {
     const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options
+    const startStreamDateTime = new Date().getTime()
     const allResolutions = resolutionsEnabled.concat([ originalResolution ])
 
+    const user = await UserModel.loadByLiveId(videoLive.id)
+    if (!this.livesPerUser.has(user.id)) {
+      this.livesPerUser.set(user.id, [])
+    }
+
+    const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
+    const livesOfUser = this.livesPerUser.get(user.id)
+    livesOfUser.push(currentUserLive)
+
     for (let i = 0; i < allResolutions.length; i++) {
       const resolution = allResolutions[i]
 
@@ -198,20 +234,76 @@ class LiveManager {
     const outPath = getHLSDirectory(videoLive.Video)
     await ensureDir(outPath)
 
+    const deleteSegments = videoLive.saveReplay === false
+
     const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
     const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
-      ? runLiveTranscoding(rtmpUrl, outPath, allResolutions)
-      : runLiveMuxing(rtmpUrl, outPath)
+      ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments)
+      : runLiveMuxing(rtmpUrl, outPath, deleteSegments)
 
     logger.info('Running live muxing/transcoding.')
-
     this.transSessions.set(sessionId, ffmpegExec)
 
+    const videoUUID = videoLive.Video.uuid
+    const tsWatcher = chokidar.watch(outPath + '/*.ts')
+
+    const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+
+    const addHandler = segmentPath => {
+      updateSegment(segmentPath)
+
+      if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
+        this.stopSessionOf(videoLive.videoId)
+      }
+
+      if (videoLive.saveReplay === true) {
+        stat(segmentPath)
+          .then(segmentStat => {
+            currentUserLive.size += segmentStat.size
+          })
+          .then(() => this.isQuotaConstraintValid(user, videoLive))
+          .then(quotaValid => {
+            if (quotaValid !== true) {
+              this.stopSessionOf(videoLive.videoId)
+            }
+          })
+          .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
+      }
+    }
+
+    const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
+
+    tsWatcher.on('add', p => addHandler(p))
+    tsWatcher.on('change', p => updateSegment(p))
+    tsWatcher.on('unlink', p => deleteHandler(p))
+
+    const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
+    masterWatcher.on('add', async () => {
+      try {
+        const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
+
+        video.state = VideoState.PUBLISHED
+        await video.save()
+        videoLive.Video = video
+
+        await federateVideoIfNeeded(video, false)
+
+        PeerTubeSocket.Instance.sendVideoLiveNewState(video)
+      } catch (err) {
+        logger.error('Cannot federate video %d.', videoLive.videoId, { err })
+      } finally {
+        masterWatcher.close()
+          .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
+      }
+    })
+
     const onFFmpegEnded = () => {
-      watcher.close()
-        .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err }))
+      logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath)
+
+      Promise.all([ tsWatcher.close(), masterWatcher.close() ])
+        .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
 
-      this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath)
+      this.onEndTransmuxing(videoLive.Video.id)
         .catch(err => logger.error('Error in closed transmuxing.', { err }))
     }
 
@@ -225,44 +317,37 @@ class LiveManager {
     })
 
     ffmpegExec.on('end', () => onFFmpegEnded())
+  }
 
-    const videoUUID = videoLive.Video.uuid
-    const watcher = chokidar.watch(outPath + '/*.ts')
+  getLiveQuotaUsedByUser (userId: number) {
+    const currentLives = this.livesPerUser.get(userId)
+    if (!currentLives) return 0
 
-    const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
-    const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
-
-    watcher.on('add', p => updateHandler(p))
-    watcher.on('change', p => updateHandler(p))
-    watcher.on('unlink', p => deleteHandler(p))
+    return currentLives.reduce((sum, obj) => sum + obj.size, 0)
   }
 
-  private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) {
-    logger.info('RTMP transmuxing for %s ended.', streamPath)
-
-    const files = await readdir(outPath)
+  private async onEndTransmuxing (videoId: number, cleanupNow = false) {
+    try {
+      const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+      if (!fullVideo) return
 
-    for (const filename of files) {
-      if (
-        filename.endsWith('.ts') ||
-        filename.endsWith('.m3u8') ||
-        filename.endsWith('.mpd') ||
-        filename.endsWith('.m4s') ||
-        filename.endsWith('.tmp')
-      ) {
-        const p = join(outPath, filename)
+      JobQueue.Instance.createJob({
+        type: 'video-live-ending',
+        payload: {
+          videoId: fullVideo.id
+        }
+      }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
 
-        remove(p)
-          .catch(err => logger.error('Cannot remove %s.', p, { err }))
-      }
-    }
+      // FIXME: use end
+      fullVideo.state = VideoState.WAITING_FOR_LIVE
+      await fullVideo.save()
 
-    playlist.destroy()
-      .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
+      PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
 
-    video.state = VideoState.LIVE_ENDED
-    video.save()
-      .catch(err => logger.error('Cannot save new video state of live streaming.', { err }))
+      await federateVideoIfNeeded(fullVideo, false)
+    } catch (err) {
+      logger.error('Cannot save/federate new video state of live streaming.', { err })
+    }
   }
 
   private async addSegmentSha (options: SegmentSha256QueueParam) {
@@ -298,6 +383,23 @@ class LiveManager {
     filesMap.delete(segmentName)
   }
 
+  private isDurationConstraintValid (streamingStartTime: number) {
+    const maxDuration = CONFIG.LIVE.MAX_DURATION
+    // No limit
+    if (maxDuration === null) return true
+
+    const now = new Date().getTime()
+    const max = streamingStartTime + maxDuration
+
+    return now <= max
+  }
+
+  private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
+    if (live.saveReplay !== true) return true
+
+    return this.isAbleToUploadVideoWithCache(user.id)
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }