aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live-manager.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/live-manager.ts')
-rw-r--r--server/lib/live-manager.ts89
1 files changed, 76 insertions, 13 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index 41176d197..3ff2434ff 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -2,24 +2,27 @@
2import { AsyncQueue, queue } from 'async' 2import { AsyncQueue, queue } from 'async'
3import * as chokidar from 'chokidar' 3import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { ensureDir } from 'fs-extra' 5import { ensureDir, stat } from 'fs-extra'
6import { basename } from 'path' 6import { basename } from 'path'
7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' 7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
8import { logger } from '@server/helpers/logger' 8import { logger } from '@server/helpers/logger'
9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
10import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
11import { UserModel } from '@server/models/account/user'
11import { VideoModel } from '@server/models/video/video' 12import { VideoModel } from '@server/models/video/video'
12import { VideoFileModel } from '@server/models/video/video-file' 13import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoLiveModel } from '@server/models/video/video-live' 14import { VideoLiveModel } from '@server/models/video/video-live'
14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 15import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
15import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models' 16import { MStreamingPlaylist, MUser, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
16import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 17import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
17import { federateVideoIfNeeded } from './activitypub/videos' 18import { federateVideoIfNeeded } from './activitypub/videos'
18import { buildSha256Segment } from './hls' 19import { buildSha256Segment } from './hls'
19import { JobQueue } from './job-queue' 20import { JobQueue } from './job-queue'
20import { PeerTubeSocket } from './peertube-socket' 21import { PeerTubeSocket } from './peertube-socket'
22import { isAbleToUploadVideo } from './user'
21import { getHLSDirectory } from './video-paths' 23import { getHLSDirectory } from './video-paths'
22 24
25import memoizee = require('memoizee')
23const NodeRtmpServer = require('node-media-server/node_rtmp_server') 26const NodeRtmpServer = require('node-media-server/node_rtmp_server')
24const context = require('node-media-server/node_core_ctx') 27const context = require('node-media-server/node_core_ctx')
25const nodeMediaServerLogger = require('node-media-server/node_core_logger') 28const nodeMediaServerLogger = require('node-media-server/node_core_logger')
@@ -53,6 +56,11 @@ class LiveManager {
53 private readonly transSessions = new Map<string, FfmpegCommand>() 56 private readonly transSessions = new Map<string, FfmpegCommand>()
54 private readonly videoSessions = new Map<number, string>() 57 private readonly videoSessions = new Map<number, string>()
55 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 58 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
59 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
60
61 private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
62 return isAbleToUploadVideo(userId, 1000)
63 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
56 64
57 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> 65 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
58 private rtmpServer: any 66 private rtmpServer: any
@@ -127,7 +135,7 @@ class LiveManager {
127 135
128 this.abortSession(sessionId) 136 this.abortSession(sessionId)
129 137
130 this.onEndTransmuxing(videoId) 138 this.onEndTransmuxing(videoId, true)
131 .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) 139 .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
132 } 140 }
133 141
@@ -196,8 +204,18 @@ class LiveManager {
196 originalResolution: number 204 originalResolution: number
197 }) { 205 }) {
198 const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options 206 const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options
207 const startStreamDateTime = new Date().getTime()
199 const allResolutions = resolutionsEnabled.concat([ originalResolution ]) 208 const allResolutions = resolutionsEnabled.concat([ originalResolution ])
200 209
210 const user = await UserModel.loadByLiveId(videoLive.id)
211 if (!this.livesPerUser.has(user.id)) {
212 this.livesPerUser.set(user.id, [])
213 }
214
215 const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
216 const livesOfUser = this.livesPerUser.get(user.id)
217 livesOfUser.push(currentUserLive)
218
201 for (let i = 0; i < allResolutions.length; i++) { 219 for (let i = 0; i < allResolutions.length; i++) {
202 const resolution = allResolutions[i] 220 const resolution = allResolutions[i]
203 221
@@ -216,26 +234,47 @@ class LiveManager {
216 const outPath = getHLSDirectory(videoLive.Video) 234 const outPath = getHLSDirectory(videoLive.Video)
217 await ensureDir(outPath) 235 await ensureDir(outPath)
218 236
237 const deleteSegments = videoLive.saveReplay === false
238
219 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath 239 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
220 const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED 240 const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
221 ? runLiveTranscoding(rtmpUrl, outPath, allResolutions) 241 ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments)
222 : runLiveMuxing(rtmpUrl, outPath) 242 : runLiveMuxing(rtmpUrl, outPath, deleteSegments)
223 243
224 logger.info('Running live muxing/transcoding.') 244 logger.info('Running live muxing/transcoding.')
225
226 this.transSessions.set(sessionId, ffmpegExec) 245 this.transSessions.set(sessionId, ffmpegExec)
227 246
228 const videoUUID = videoLive.Video.uuid 247 const videoUUID = videoLive.Video.uuid
229 const tsWatcher = chokidar.watch(outPath + '/*.ts') 248 const tsWatcher = chokidar.watch(outPath + '/*.ts')
230 249
231 const updateHandler = segmentPath => { 250 const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
232 this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) 251
252 const addHandler = segmentPath => {
253 updateSegment(segmentPath)
254
255 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
256 this.stopSessionOf(videoLive.videoId)
257 }
258
259 if (videoLive.saveReplay === true) {
260 stat(segmentPath)
261 .then(segmentStat => {
262 currentUserLive.size += segmentStat.size
263 })
264 .then(() => this.isQuotaConstraintValid(user, videoLive))
265 .then(quotaValid => {
266 if (quotaValid !== true) {
267 this.stopSessionOf(videoLive.videoId)
268 }
269 })
270 .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
271 }
233 } 272 }
234 273
235 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) 274 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
236 275
237 tsWatcher.on('add', p => updateHandler(p)) 276 tsWatcher.on('add', p => addHandler(p))
238 tsWatcher.on('change', p => updateHandler(p)) 277 tsWatcher.on('change', p => updateSegment(p))
239 tsWatcher.on('unlink', p => deleteHandler(p)) 278 tsWatcher.on('unlink', p => deleteHandler(p))
240 279
241 const masterWatcher = chokidar.watch(outPath + '/master.m3u8') 280 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
@@ -280,7 +319,14 @@ class LiveManager {
280 ffmpegExec.on('end', () => onFFmpegEnded()) 319 ffmpegExec.on('end', () => onFFmpegEnded())
281 } 320 }
282 321
283 private async onEndTransmuxing (videoId: number) { 322 getLiveQuotaUsedByUser (userId: number) {
323 const currentLives = this.livesPerUser.get(userId)
324 if (!currentLives) return 0
325
326 return currentLives.reduce((sum, obj) => sum + obj.size, 0)
327 }
328
329 private async onEndTransmuxing (videoId: number, cleanupNow = false) {
284 try { 330 try {
285 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 331 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
286 if (!fullVideo) return 332 if (!fullVideo) return
@@ -290,7 +336,7 @@ class LiveManager {
290 payload: { 336 payload: {
291 videoId: fullVideo.id 337 videoId: fullVideo.id
292 } 338 }
293 }, { delay: VIDEO_LIVE.CLEANUP_DELAY }) 339 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
294 340
295 // FIXME: use end 341 // FIXME: use end
296 fullVideo.state = VideoState.WAITING_FOR_LIVE 342 fullVideo.state = VideoState.WAITING_FOR_LIVE
@@ -337,6 +383,23 @@ class LiveManager {
337 filesMap.delete(segmentName) 383 filesMap.delete(segmentName)
338 } 384 }
339 385
386 private isDurationConstraintValid (streamingStartTime: number) {
387 const maxDuration = CONFIG.LIVE.MAX_DURATION
388 // No limit
389 if (maxDuration === null) return true
390
391 const now = new Date().getTime()
392 const max = streamingStartTime + maxDuration
393
394 return now <= max
395 }
396
397 private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
398 if (live.saveReplay !== true) return true
399
400 return this.isAbleToUploadVideoWithCache(user.id)
401 }
402
340 static get Instance () { 403 static get Instance () {
341 return this.instance || (this.instance = new this()) 404 return this.instance || (this.instance = new this())
342 } 405 }