]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/live-manager.ts
Check live duration and size
[github/Chocobozzz/PeerTube.git] / server / lib / live-manager.ts
index 41176d19742b64b5e26a5cefb0323e638dddd963..3ff2434ff4b9e5507d25a41e24c7b55ecefc68c1 100644 (file)
@@ -2,24 +2,27 @@
 import { AsyncQueue, queue } from 'async'
 import * as chokidar from 'chokidar'
 import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir } from 'fs-extra'
+import { ensureDir, stat } from 'fs-extra'
 import { basename } from 'path'
 import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
 import { logger } from '@server/helpers/logger'
 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { UserModel } from '@server/models/account/user'
 import { VideoModel } from '@server/models/video/video'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { VideoLiveModel } from '@server/models/video/video-live'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models'
+import { MStreamingPlaylist, MUser, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
 import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
 import { federateVideoIfNeeded } from './activitypub/videos'
 import { buildSha256Segment } from './hls'
 import { JobQueue } from './job-queue'
 import { PeerTubeSocket } from './peertube-socket'
+import { isAbleToUploadVideo } from './user'
 import { getHLSDirectory } from './video-paths'
 
+import memoizee = require('memoizee')
 const NodeRtmpServer = require('node-media-server/node_rtmp_server')
 const context = require('node-media-server/node_core_ctx')
 const nodeMediaServerLogger = require('node-media-server/node_core_logger')
@@ -53,6 +56,11 @@ class LiveManager {
   private readonly transSessions = new Map<string, FfmpegCommand>()
   private readonly videoSessions = new Map<number, string>()
   private readonly segmentsSha256 = new Map<string, Map<string, string>>()
+  private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
+
+  private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
+    return isAbleToUploadVideo(userId, 1000)
+  }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
 
   private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
   private rtmpServer: any
@@ -127,7 +135,7 @@ class LiveManager {
 
     this.abortSession(sessionId)
 
-    this.onEndTransmuxing(videoId)
+    this.onEndTransmuxing(videoId, true)
       .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
   }
 
@@ -196,8 +204,18 @@ class LiveManager {
     originalResolution: number
   }) {
     const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options
+    const startStreamDateTime = new Date().getTime()
     const allResolutions = resolutionsEnabled.concat([ originalResolution ])
 
+    const user = await UserModel.loadByLiveId(videoLive.id)
+    if (!this.livesPerUser.has(user.id)) {
+      this.livesPerUser.set(user.id, [])
+    }
+
+    const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
+    const livesOfUser = this.livesPerUser.get(user.id)
+    livesOfUser.push(currentUserLive)
+
     for (let i = 0; i < allResolutions.length; i++) {
       const resolution = allResolutions[i]
 
@@ -216,26 +234,47 @@ class LiveManager {
     const outPath = getHLSDirectory(videoLive.Video)
     await ensureDir(outPath)
 
+    const deleteSegments = videoLive.saveReplay === false
+
     const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
     const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
-      ? runLiveTranscoding(rtmpUrl, outPath, allResolutions)
-      : runLiveMuxing(rtmpUrl, outPath)
+      ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments)
+      : runLiveMuxing(rtmpUrl, outPath, deleteSegments)
 
     logger.info('Running live muxing/transcoding.')
-
     this.transSessions.set(sessionId, ffmpegExec)
 
     const videoUUID = videoLive.Video.uuid
     const tsWatcher = chokidar.watch(outPath + '/*.ts')
 
-    const updateHandler = segmentPath => {
-      this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+    const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+
+    const addHandler = segmentPath => {
+      updateSegment(segmentPath)
+
+      if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
+        this.stopSessionOf(videoLive.videoId)
+      }
+
+      if (videoLive.saveReplay === true) {
+        stat(segmentPath)
+          .then(segmentStat => {
+            currentUserLive.size += segmentStat.size
+          })
+          .then(() => this.isQuotaConstraintValid(user, videoLive))
+          .then(quotaValid => {
+            if (quotaValid !== true) {
+              this.stopSessionOf(videoLive.videoId)
+            }
+          })
+          .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
+      }
     }
 
     const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
 
-    tsWatcher.on('add', p => updateHandler(p))
-    tsWatcher.on('change', p => updateHandler(p))
+    tsWatcher.on('add', p => addHandler(p))
+    tsWatcher.on('change', p => updateSegment(p))
     tsWatcher.on('unlink', p => deleteHandler(p))
 
     const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
@@ -280,7 +319,14 @@ class LiveManager {
     ffmpegExec.on('end', () => onFFmpegEnded())
   }
 
-  private async onEndTransmuxing (videoId: number) {
+  getLiveQuotaUsedByUser (userId: number) {
+    const currentLives = this.livesPerUser.get(userId)
+    if (!currentLives) return 0
+
+    return currentLives.reduce((sum, obj) => sum + obj.size, 0)
+  }
+
+  private async onEndTransmuxing (videoId: number, cleanupNow = false) {
     try {
       const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
       if (!fullVideo) return
@@ -290,7 +336,7 @@ class LiveManager {
         payload: {
           videoId: fullVideo.id
         }
-      }, { delay: VIDEO_LIVE.CLEANUP_DELAY })
+      }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
 
       // FIXME: use end
       fullVideo.state = VideoState.WAITING_FOR_LIVE
@@ -337,6 +383,23 @@ class LiveManager {
     filesMap.delete(segmentName)
   }
 
+  private isDurationConstraintValid (streamingStartTime: number) {
+    const maxDuration = CONFIG.LIVE.MAX_DURATION
+    // No limit
+    if (maxDuration === null) return true
+
+    const now = new Date().getTime()
+    const max = streamingStartTime + maxDuration
+
+    return now <= max
+  }
+
+  private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
+    if (live.saveReplay !== true) return true
+
+    return this.isAbleToUploadVideoWithCache(user.id)
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }