aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/initializers/constants.ts4
-rw-r--r--server/lib/live-manager.ts64
2 files changed, 60 insertions, 8 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index a78a66d78..84a515857 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -644,6 +644,7 @@ const VIDEO_LIVE = {
644 SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist 644 SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist
645 REPLAY_DIRECTORY: 'replay', 645 REPLAY_DIRECTORY: 'replay',
646 EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4, 646 EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4,
647 MAX_SOCKET_WAITING_DATA: 1024 * 1000 * 100, // 100MB
647 RTMP: { 648 RTMP: {
648 CHUNK_SIZE: 60000, 649 CHUNK_SIZE: 60000,
649 GOP_CACHE: true, 650 GOP_CACHE: true,
@@ -656,7 +657,8 @@ const VIDEO_LIVE = {
656const MEMOIZE_TTL = { 657const MEMOIZE_TTL = {
657 OVERVIEWS_SAMPLE: 1000 * 3600 * 4, // 4 hours 658 OVERVIEWS_SAMPLE: 1000 * 3600 * 4, // 4 hours
658 INFO_HASH_EXISTS: 1000 * 3600 * 12, // 12 hours 659 INFO_HASH_EXISTS: 1000 * 3600 * 12, // 12 hours
659 LIVE_ABLE_TO_UPLOAD: 1000 * 60 // 1 minute 660 LIVE_ABLE_TO_UPLOAD: 1000 * 60, // 1 minute
661 LIVE_CHECK_SOCKET_HEALTH: 1000 * 60 // 1 minute
660} 662}
661 663
662const MEMOIZE_LENGTH = { 664const MEMOIZE_LENGTH = {
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index 7f5fdf899..d968f05da 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -3,6 +3,7 @@ import * as Bluebird from 'bluebird'
3import * as chokidar from 'chokidar' 3import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 5import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
6import { createServer, Server } from 'net'
6import { basename, join } from 'path' 7import { basename, join } from 'path'
7import { isTestInstance } from '@server/helpers/core-utils' 8import { isTestInstance } from '@server/helpers/core-utils'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' 9import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
@@ -27,8 +28,7 @@ import { getHLSDirectory } from './video-paths'
27import { availableEncoders } from './video-transcoding-profiles' 28import { availableEncoders } from './video-transcoding-profiles'
28 29
29import memoizee = require('memoizee') 30import memoizee = require('memoizee')
30 31const NodeRtmpSession = require('node-media-server/node_rtmp_session')
31const NodeRtmpServer = require('node-media-server/node_rtmp_server')
32const context = require('node-media-server/node_core_ctx') 32const context = require('node-media-server/node_core_ctx')
33const nodeMediaServerLogger = require('node-media-server/node_core_logger') 33const nodeMediaServerLogger = require('node-media-server/node_core_logger')
34 34
@@ -63,7 +63,11 @@ class LiveManager {
63 return isAbleToUploadVideo(userId, 1000) 63 return isAbleToUploadVideo(userId, 1000)
64 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) 64 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
65 65
66 private rtmpServer: any 66 private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => {
67 return this.hasClientSocketsInBadHealth(sessionId)
68 }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
69
70 private rtmpServer: Server
67 71
68 private constructor () { 72 private constructor () {
69 } 73 }
@@ -108,19 +112,31 @@ class LiveManager {
108 run () { 112 run () {
109 logger.info('Running RTMP server on port %d', config.rtmp.port) 113 logger.info('Running RTMP server on port %d', config.rtmp.port)
110 114
111 this.rtmpServer = new NodeRtmpServer(config) 115 this.rtmpServer = createServer(socket => {
112 this.rtmpServer.tcpServer.on('error', err => { 116 const session = new NodeRtmpSession(config, socket)
117
118 session.run()
119 })
120
121 this.rtmpServer.on('error', err => {
113 logger.error('Cannot run RTMP server.', { err }) 122 logger.error('Cannot run RTMP server.', { err })
114 }) 123 })
115 124
116 this.rtmpServer.run() 125 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
117 } 126 }
118 127
119 stop () { 128 stop () {
120 logger.info('Stopping RTMP server.') 129 logger.info('Stopping RTMP server.')
121 130
122 this.rtmpServer.stop() 131 this.rtmpServer.close()
123 this.rtmpServer = undefined 132 this.rtmpServer = undefined
133
134 // Sessions is an object
135 this.getContext().sessions.forEach((session: any) => {
136 if (session instanceof NodeRtmpSession) {
137 session.stop()
138 }
139 })
124 } 140 }
125 141
126 isRunning () { 142 isRunning () {
@@ -344,11 +360,21 @@ class LiveManager {
344 360
345 segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] 361 segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
346 362
363 if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
364 logger.error(
365 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
366 ' Stopping session of video %s.', videoUUID)
367
368 this.stopSessionOf(videoLive.videoId)
369 return
370 }
371
347 // Duration constraint check 372 // Duration constraint check
348 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 373 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
349 logger.info('Stopping session of %s: max duration exceeded.', videoUUID) 374 logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
350 375
351 this.stopSessionOf(videoLive.videoId) 376 this.stopSessionOf(videoLive.videoId)
377 return
352 } 378 }
353 379
354 // Check user quota if the user enabled replay saving 380 // Check user quota if the user enabled replay saving
@@ -517,6 +543,30 @@ class LiveManager {
517 return now <= max 543 return now <= max
518 } 544 }
519 545
546 private hasClientSocketsInBadHealth (sessionId: string) {
547 const rtmpSession = this.getContext().sessions.get(sessionId)
548
549 if (!rtmpSession) {
550 logger.warn('Cannot get session %s to check players socket health.', sessionId)
551 return
552 }
553
554 for (const playerSessionId of rtmpSession.players) {
555 const playerSession = this.getContext().sessions.get(playerSessionId)
556
557 if (!playerSession) {
558 logger.error('Cannot get player session %s to check socket health.', playerSession)
559 continue
560 }
561
562 if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
563 return true
564 }
565 }
566
567 return false
568 }
569
520 private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) { 570 private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
521 if (live.saveReplay !== true) return true 571 if (live.saveReplay !== true) return true
522 572