diff options
author | Chocobozzz <me@florianbigard.com> | 2021-01-27 10:51:03 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-01-27 12:05:10 +0100 |
commit | 00b87c5791ecd645bb78cbb9872d60e1f957bdfa (patch) | |
tree | 6110a249014391284f2849ee24f198a32aabf613 /server/lib | |
parent | a4a8cd39712c59119ad0be5cd584b158f06c5852 (diff) | |
download | PeerTube-00b87c5791ecd645bb78cbb9872d60e1f957bdfa.tar.gz PeerTube-00b87c5791ecd645bb78cbb9872d60e1f957bdfa.tar.zst PeerTube-00b87c5791ecd645bb78cbb9872d60e1f957bdfa.zip |
Fix live RAM usage when ffmpeg is too slow
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/live-manager.ts | 64 |
1 files changed, 57 insertions, 7 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index 7f5fdf899..d968f05da 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts | |||
@@ -3,6 +3,7 @@ import * as Bluebird from 'bluebird' | |||
3 | import * as chokidar from 'chokidar' | 3 | import * as chokidar from 'chokidar' |
4 | import { FfmpegCommand } from 'fluent-ffmpeg' | 4 | import { FfmpegCommand } from 'fluent-ffmpeg' |
5 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' | 5 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' |
6 | import { createServer, Server } from 'net' | ||
6 | import { basename, join } from 'path' | 7 | import { basename, join } from 'path' |
7 | import { isTestInstance } from '@server/helpers/core-utils' | 8 | import { isTestInstance } from '@server/helpers/core-utils' |
8 | import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' | 9 | import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' |
@@ -27,8 +28,7 @@ import { getHLSDirectory } from './video-paths' | |||
27 | import { availableEncoders } from './video-transcoding-profiles' | 28 | import { availableEncoders } from './video-transcoding-profiles' |
28 | 29 | ||
29 | import memoizee = require('memoizee') | 30 | import memoizee = require('memoizee') |
30 | 31 | const NodeRtmpSession = require('node-media-server/node_rtmp_session') | |
31 | const NodeRtmpServer = require('node-media-server/node_rtmp_server') | ||
32 | const context = require('node-media-server/node_core_ctx') | 32 | const context = require('node-media-server/node_core_ctx') |
33 | const nodeMediaServerLogger = require('node-media-server/node_core_logger') | 33 | const nodeMediaServerLogger = require('node-media-server/node_core_logger') |
34 | 34 | ||
@@ -63,7 +63,11 @@ class LiveManager { | |||
63 | return isAbleToUploadVideo(userId, 1000) | 63 | return isAbleToUploadVideo(userId, 1000) |
64 | }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) | 64 | }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) |
65 | 65 | ||
66 | private rtmpServer: any | 66 | private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => { |
67 | return this.hasClientSocketsInBadHealth(sessionId) | ||
68 | }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH }) | ||
69 | |||
70 | private rtmpServer: Server | ||
67 | 71 | ||
68 | private constructor () { | 72 | private constructor () { |
69 | } | 73 | } |
@@ -108,19 +112,31 @@ class LiveManager { | |||
108 | run () { | 112 | run () { |
109 | logger.info('Running RTMP server on port %d', config.rtmp.port) | 113 | logger.info('Running RTMP server on port %d', config.rtmp.port) |
110 | 114 | ||
111 | this.rtmpServer = new NodeRtmpServer(config) | 115 | this.rtmpServer = createServer(socket => { |
112 | this.rtmpServer.tcpServer.on('error', err => { | 116 | const session = new NodeRtmpSession(config, socket) |
117 | |||
118 | session.run() | ||
119 | }) | ||
120 | |||
121 | this.rtmpServer.on('error', err => { | ||
113 | logger.error('Cannot run RTMP server.', { err }) | 122 | logger.error('Cannot run RTMP server.', { err }) |
114 | }) | 123 | }) |
115 | 124 | ||
116 | this.rtmpServer.run() | 125 | this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) |
117 | } | 126 | } |
118 | 127 | ||
119 | stop () { | 128 | stop () { |
120 | logger.info('Stopping RTMP server.') | 129 | logger.info('Stopping RTMP server.') |
121 | 130 | ||
122 | this.rtmpServer.stop() | 131 | this.rtmpServer.close() |
123 | this.rtmpServer = undefined | 132 | this.rtmpServer = undefined |
133 | |||
134 | // Sessions is an object | ||
135 | this.getContext().sessions.forEach((session: any) => { | ||
136 | if (session instanceof NodeRtmpSession) { | ||
137 | session.stop() | ||
138 | } | ||
139 | }) | ||
124 | } | 140 | } |
125 | 141 | ||
126 | isRunning () { | 142 | isRunning () { |
@@ -344,11 +360,21 @@ class LiveManager { | |||
344 | 360 | ||
345 | segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] | 361 | segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] |
346 | 362 | ||
363 | if (this.hasClientSocketsInBadHealthWithCache(sessionId)) { | ||
364 | logger.error( | ||
365 | 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + | ||
366 | ' Stopping session of video %s.', videoUUID) | ||
367 | |||
368 | this.stopSessionOf(videoLive.videoId) | ||
369 | return | ||
370 | } | ||
371 | |||
347 | // Duration constraint check | 372 | // Duration constraint check |
348 | if (this.isDurationConstraintValid(startStreamDateTime) !== true) { | 373 | if (this.isDurationConstraintValid(startStreamDateTime) !== true) { |
349 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID) | 374 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID) |
350 | 375 | ||
351 | this.stopSessionOf(videoLive.videoId) | 376 | this.stopSessionOf(videoLive.videoId) |
377 | return | ||
352 | } | 378 | } |
353 | 379 | ||
354 | // Check user quota if the user enabled replay saving | 380 | // Check user quota if the user enabled replay saving |
@@ -517,6 +543,30 @@ class LiveManager { | |||
517 | return now <= max | 543 | return now <= max |
518 | } | 544 | } |
519 | 545 | ||
546 | private hasClientSocketsInBadHealth (sessionId: string) { | ||
547 | const rtmpSession = this.getContext().sessions.get(sessionId) | ||
548 | |||
549 | if (!rtmpSession) { | ||
550 | logger.warn('Cannot get session %s to check players socket health.', sessionId) | ||
551 | return | ||
552 | } | ||
553 | |||
554 | for (const playerSessionId of rtmpSession.players) { | ||
555 | const playerSession = this.getContext().sessions.get(playerSessionId) | ||
556 | |||
557 | if (!playerSession) { | ||
558 | logger.error('Cannot get player session %s to check socket health.', playerSession) | ||
559 | continue | ||
560 | } | ||
561 | |||
562 | if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) { | ||
563 | return true | ||
564 | } | ||
565 | } | ||
566 | |||
567 | return false | ||
568 | } | ||
569 | |||
520 | private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) { | 570 | private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) { |
521 | if (live.saveReplay !== true) return true | 571 | if (live.saveReplay !== true) return true |
522 | 572 | ||