2 import * as Bluebird from 'bluebird'
3 import * as chokidar from 'chokidar'
4 import { FfmpegCommand } from 'fluent-ffmpeg'
5 import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
6 import { createServer, Server } from 'net'
7 import { basename, join } from 'path'
8 import { isTestInstance } from '@server/helpers/core-utils'
9 import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
10 import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
11 import { logger } from '@server/helpers/logger'
12 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
13 import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
14 import { UserModel } from '@server/models/account/user'
15 import { VideoModel } from '@server/models/video/video'
16 import { VideoFileModel } from '@server/models/video/video-file'
17 import { VideoLiveModel } from '@server/models/video/video-live'
18 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
19 import { MStreamingPlaylist, MStreamingPlaylistVideo, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
20 import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
21 import { federateVideoIfNeeded } from './activitypub/videos'
22 import { buildSha256Segment } from './hls'
23 import { JobQueue } from './job-queue'
24 import { cleanupLive } from './job-queue/handlers/video-live-ending'
25 import { PeerTubeSocket } from './peertube-socket'
26 import { isAbleToUploadVideo } from './user'
27 import { getHLSDirectory } from './video-paths'
28 import { VideoTranscodingProfilesManager } from './video-transcoding-profiles'
30 import memoizee = require('memoizee')
31 const NodeRtmpSession = require('node-media-server/node_rtmp_session')
32 const context = require('node-media-server/node_core_ctx')
33 const nodeMediaServerLogger = require('node-media-server/node_core_logger')
// Module-level setup for the embedded RTMP stack.
35 // Disable node media server logs
36 nodeMediaServerLogger.setLogType(0)
// RTMP options: the port comes from the instance configuration, the other values from the VIDEO_LIVE constants.
// NOTE(review): the declaration these fields belong to is not visible here; `config.rtmp.port` is read elsewhere
// in this file, so they presumably sit under `config.rtmp` — confirm.
40 port: CONFIG.LIVE.RTMP.PORT,
41 chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
42 gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
43 ping: VIDEO_LIVE.RTMP.PING,
44 ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
// State held by the live manager.
// Lazily created single instance, handed out by the static `Instance` getter
53 private static instance: LiveManager
// RTMP session id -> ffmpeg command muxing/transcoding that session
55 private readonly transSessions = new Map<string, FfmpegCommand>()
// Video id -> RTMP session id currently publishing that video
56 private readonly videoSessions = new Map<number, string>()
// Video id -> one timestamp per view registered through addViewTo()
57 // Values are Date().getTime()
58 private readonly watchersPerVideo = new Map<number, number[]>()
// Video UUID -> (segment file name -> sha256 of the segment)
59 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
// User id -> lives of that user being muxed, with the size accumulated from their segments
60 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
// Memoized isAbleToUploadVideo(userId, 1000); entries are kept for MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD
62 private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
63 return isAbleToUploadVideo(userId, 1000)
64 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
// Memoized socket health check per session id; entries are kept for MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH
66 private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => {
67 return this.hasClientSocketsInBadHealth(sessionId)
68 }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
// TCP server accepting RTMP connections; set back to undefined when the server is stopped
70 private rtmpServer: Server
// Private constructor: instances are only created through the static `Instance` getter.
72 private constructor () {
// NOTE(review): the header of the method owning the wiring below is not visible here.
// Subscribe to the node-media-server event emitter exposed by the shared context.
76 const events = this.getContext().nodeEvent
77 events.on('postPublish', (sessionId: string, streamPath: string) => {
78 logger.debug('RTMP received stream', { id: sessionId, streamPath })
// The path must split on '/' into exactly 3 parts with VIDEO_LIVE.RTMP.BASE_PATH as the second one,
// otherwise the session is aborted.
80 const splittedPath = streamPath.split('/')
81 if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
82 logger.warn('Live path is incorrect.', { streamPath })
83 return this.abortSession(sessionId)
// The third path part is passed on as the stream key; failures are only logged.
86 this.handleSession(sessionId, streamPath, splittedPath[2])
87 .catch(err => logger.error('Cannot handle sessions.', { err }))
90 events.on('donePublish', sessionId => {
91 logger.info('Live session ended.', { sessionId })
// React to configuration changes.
// NOTE(review): the bodies of both branches are not visible; presumably they start and stop the RTMP server — confirm.
94 registerConfigChangedHandler(() => {
95 if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
100 if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) {
105 // Cleanup broken lives, that were terminated by a server restart for example
106 this.handleBrokenLives()
107 .catch(err => logger.error('Cannot handle broken lives.', { err }))
// Refresh live view counters every VIEW_LIFETIME.LIVE milliseconds.
// NOTE(review): the promise returned by updateLiveViews() is not handled here, so a rejection is unhandled.
109 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
// Start the RTMP server (method header not visible here).
113 logger.info('Running RTMP server on port %d', config.rtmp.port)
// Every incoming TCP connection gets its own node-media-server RTMP session built from `config`.
115 this.rtmpServer = createServer(socket => {
116 const session = new NodeRtmpSession(config, socket)
// Server errors are logged.
121 this.rtmpServer.on('error', err => {
122 logger.error('Cannot run RTMP server.', { err })
// Listen on the configured RTMP port.
125 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
// Stop the RTMP server (method header not visible here).
129 logger.info('Stopping RTMP server.')
// Close the TCP server and forget it, which makes `!!this.rtmpServer` false afterwards.
131 this.rtmpServer.close()
132 this.rtmpServer = undefined
// NOTE(review): `sessions` is used with forEach() here and with get()/delete() elsewhere in this file,
// which does not look like a plain object — confirm the comment below.
134 // Sessions is an object
135 this.getContext().sessions.forEach((session: any) => {
// Only sessions that are NodeRtmpSession instances are handled; what is done with them is not visible here.
136 if (session instanceof NodeRtmpSession) {
// True when an RTMP server object is currently held (method header not visible here).
143 return !!this.rtmpServer
// Return the segment hashes recorded for a video.
// Resolves to the map (segment file name -> sha256) stored under this UUID,
// or undefined when nothing is stored for it.
getSegmentsSha256 (videoUUID: string) {
  const hashesOfVideo = this.segmentsSha256.get(videoUUID)

  return hashesOfVideo
}
// Abort the RTMP session currently attached to a video, if there is one.
// Does nothing when no session is registered for this video id.
stopSessionOf (videoId: number) {
  const runningSessionId = this.videoSessions.get(videoId)

  if (runningSessionId) {
    // Unregister first, then tear the session down
    this.videoSessions.delete(videoId)
    this.abortSession(runningSessionId)
  }
}
// Sum the sizes of the lives registered for a user.
// Returns 0 when the user has no entry in livesPerUser.
getLiveQuotaUsedByUser (userId: number) {
  const livesOfUser = this.livesPerUser.get(userId)
  if (!livesOfUser) return 0

  let total = 0
  for (const live of livesOfUser) {
    total += live.size
  }

  return total
}
// Record a view for a video that is currently streaming.
// The view is stored as the current timestamp (Date().getTime()) in watchersPerVideo.
// Videos without a registered session are ignored.
addViewTo (videoId: number) {
  if (!this.videoSessions.has(videoId)) return

  const viewedAt = new Date().getTime()
  const knownWatchers = this.watchersPerVideo.get(videoId)

  if (knownWatchers) {
    knownWatchers.push(viewedAt)
    return
  }

  // First view of this video: create its list
  this.watchersPerVideo.set(videoId, [ viewedAt ])
}
// Forget every segment hash recorded for a video.
cleanupShaSegments (videoUUID: string) {
  const hashesPerVideo = this.segmentsSha256

  hashesPerVideo.delete(videoUUID)
}
// Append the content of a live segment to its concatenated file in the replay directory.
//
// hlsVideoPath: HLS directory of the video; the replay directory is VIDEO_LIVE.REPLAY_DIRECTORY inside it
// segmentPath: path of the segment to append
//
// Returns a promise that settles once the data is appended. Read/append failures are logged, not rethrown.
// buildConcatenatedName() runs before the promise chain, so an unparsable segment name throws synchronously.
addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
  const segmentName = basename(segmentPath)
  const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, this.buildConcatenatedName(segmentName))

  return readFile(segmentPath)
    .then(data => appendFile(dest, data))
    .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err }))
}
// Build the name of the concatenated replay file a segment or playlist belongs to.
//
// The base name must start with digits followed by '-' or '.'; those digits are reused,
// e.g. '0-000001.ts' and '0.m3u8' both give 'concat-0.ts'.
//
// Throws an Error when the base name does not start with such a number.
buildConcatenatedName (segmentOrPlaylistPath: string) {
  const fileName = basename(segmentOrPlaylistPath)
  const num = fileName.match(/^(\d+)(-|\.)/)

  // match() returns null when the pattern is not found: fail with an explicit message
  // instead of a TypeError on `num[1]`
  if (!num) {
    throw new Error('Cannot build concatenated name of ' + fileName + ': it does not start with a number followed by "-" or ".".')
  }

  return 'concat-' + num[1] + '.ts'
}
// Hash the given segments one after the other and, when the live saves its replay,
// append each of them to the replay directory.
// Fire and forget: nothing is returned and failures are only logged.
private processSegments (hlsVideoPath: string, videoUUID: string, videoLive: MVideoLive, segmentPaths: string[]) {
  const handleSegment = async (segment: string) => {
    // These are previous segments, ffmpeg should have finished generating them: their sha can be computed
    await this.addSegmentSha(videoUUID, segment)

    if (!videoLive.saveReplay) return

    await this.addSegmentToReplay(hlsVideoPath, segment)
  }

  Bluebird.mapSeries(segmentPaths, handleSegment)
    .catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err }))
}
// Accessor for the node-media-server context; callers read `sessions` and `nodeEvent` from its result.
// NOTE(review): the body is not visible here; presumably it returns the module-level `context` — confirm.
208 private getContext () {
// Tear down everything attached to an RTMP session id: the RTMP session itself and its ffmpeg command.
// NOTE(review): the guards around both halves are not visible here; presumably each half only runs
// when the corresponding entry exists — confirm.
212 private abortSession (id: string) {
213 const session = this.getContext().sessions.get(id)
// Remove the session from the node-media-server context
216 this.getContext().sessions.delete(id)
219 const transSession = this.transSessions.get(id)
// Send SIGINT to the ffmpeg command and forget it
221 transSession.kill('SIGINT')
222 this.transSessions.delete(id)
// Handle a newly published RTMP stream: resolve the live from its stream key, refuse it when needed,
// clean up leftovers of a previous stream, probe the input, upsert the HLS playlist and start muxing.
//
// sessionId: id of the RTMP session in the node-media-server context
// streamPath: RTMP path of the stream, appended to the local RTMP URL
// streamKey: key used to find the live video
226 private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
227 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
// NOTE(review): the condition guarding the next two lines is not visible here; presumably they run
// when no live matches the stream key — confirm.
229 logger.warn('Unknown live video with stream key %s.', streamKey)
230 return this.abortSession(sessionId)
233 const video = videoLive.Video
// Blacklisted videos are not allowed to stream
234 if (video.isBlacklisted()) {
235 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
236 return this.abortSession(sessionId)
239 // Cleanup old potential live files (could happen with a permanent live)
240 this.cleanupShaSegments(video.uuid)
242 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
243 if (oldStreamingPlaylist) {
244 await cleanupLive(video, oldStreamingPlaylist)
// From now on the video is considered as streaming by stopSessionOf() and addViewTo()
247 this.videoSessions.set(video.id, sessionId)
249 const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
// The stream is read back from the local RTMP server
251 const session = this.getContext().sessions.get(sessionId)
252 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
// Probe resolution and FPS of the incoming stream in parallel
254 const [ resolutionResult, fps ] = await Promise.all([
255 getVideoFileResolution(rtmpUrl),
256 getVideoFileFPS(rtmpUrl)
// Extra resolutions are only computed when live transcoding is enabled (the other branch is not visible here)
259 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
260 ? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live')
// The original resolution is always part of the list; it comes from the RTMP session (session.videoHeight),
// not from the probe result used above
263 const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])
265 logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions })
// Create or update the HLS playlist of the video (some fields of the upsert are not visible here)
267 const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
270 segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive),
271 p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions),
272 p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,
274 type: VideoStreamingPlaylistType.HLS
275 }, { returning: true }) as [ MStreamingPlaylist, boolean ]
// Hand over to runMuxing() with the playlist carrying its video
277 return this.runMuxing({
280 playlist: Object.assign(videoStreamingPlaylist, { Video: video }),
// Start the ffmpeg muxing/transcoding of a live and watch its output directory:
// - new .ts segments trigger hashing of the previous segment and the health/duration/quota checks
// - the appearance of master.m3u8 publishes the video
// - the end of ffmpeg triggers the cleanup of the session state
// Some fields of `options` are not visible here; the destructuring below lists the ones that are read.
287 private async runMuxing (options: {
289 videoLive: MVideoLiveVideo
290 playlist: MStreamingPlaylistVideo
293 allResolutions: number[]
295 const { sessionId, videoLive, playlist, allResolutions, fps, rtmpUrl } = options
// Reference time for the max duration check
296 const startStreamDateTime = new Date().getTime()
// Register this live in the per-user list used by getLiveQuotaUsedByUser()
// NOTE(review): `user` is dereferenced without a null check in the visible lines.
298 const user = await UserModel.loadByLiveId(videoLive.id)
299 if (!this.livesPerUser.has(user.id)) {
300 this.livesPerUser.set(user.id, [])
// `size` grows with every new segment when the replay is saved (see the add handler below)
303 const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
304 const livesOfUser = this.livesPerUser.get(user.id)
305 livesOfUser.push(currentUserLive)
// One video file row per resolution, attached to the streaming playlist.
// The upsert is not awaited: failures are only logged.
307 for (let i = 0; i < allResolutions.length; i++) {
308 const resolution = allResolutions[i]
310 const file = new VideoFileModel({
316 videoStreamingPlaylistId: playlist.id
319 VideoFileModel.customUpsert(file, 'streaming-playlist', null)
320 .catch(err => logger.error('Cannot create file for live streaming.', { err }))
// Output directory of the HLS files; the replay sub-directory is only created when the replay is saved
323 const outPath = getHLSDirectory(videoLive.Video)
324 await ensureDir(outPath)
326 const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
328 if (videoLive.saveReplay === true) {
329 await ensureDir(replayDirectory)
332 const videoUUID = videoLive.Video.uuid
// Transcoding command when live transcoding is enabled, plain muxing command otherwise
334 const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
335 ? await getLiveTranscodingCommand({
338 resolutions: allResolutions,
340 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
341 profile: CONFIG.LIVE.TRANSCODING.PROFILE
343 : getLiveMuxingCommand(rtmpUrl, outPath)
345 logger.info('Running live muxing/transcoding for %s.', videoUUID)
// Keep the command so abortSession() can kill it
346 this.transSessions.set(sessionId, ffmpegExec)
348 const tsWatcher = chokidar.watch(outPath + '/*.ts')
// Per playlist, the last segment seen: it is processed when the next segment of that playlist appears
350 const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
// NOTE(review): `[\d+]` is a character class matching a single digit or a literal '+', so a file name
// starting with two or more digits does not match and the `[0]` access below throws on null.
// `[0]` is also the whole match (digit plus '-'), not the capture group. `/^(\d+)-/` with `[1]`
// looks like the intent — confirm.
351 const playlistIdMatcher = /^([\d+])-/
353 const addHandler = segmentPath => {
354 logger.debug('Live add handler of %s.', segmentPath)
356 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
// Process the previous segment of this playlist, then remember the new one for the next round
358 const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
359 this.processSegments(outPath, videoUUID, videoLive, segmentsToProcess)
361 segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
// Stop the session when the (memoized) socket health check reports too much waiting data
363 if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
365 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
366 ' Stopping session of video %s.', videoUUID)
368 this.stopSessionOf(videoLive.videoId)
372 // Duration constraint check
373 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
374 logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
376 this.stopSessionOf(videoLive.videoId)
380 // Check user quota if the user enabled replay saving
381 if (videoLive.saveReplay === true) {
// The size of the new segment is added to the live, then the quota is checked
// (the start of this promise chain is not visible here)
383 .then(segmentStat => {
384 currentUserLive.size += segmentStat.size
386 .then(() => this.isQuotaConstraintValid(user, videoLive))
387 .then(quotaValid => {
388 if (quotaValid !== true) {
389 logger.info('Stopping session of %s: user quota exceeded.', videoUUID)
391 this.stopSessionOf(videoLive.videoId)
394 .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
// A removed segment no longer needs its hash
398 const deleteHandler = segmentPath => this.removeSegmentSha(videoUUID, segmentPath)
400 tsWatcher.on('add', p => addHandler(p))
401 tsWatcher.on('unlink', p => deleteHandler(p))
// The master playlist appearing is the signal used to publish the video
403 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
404 masterWatcher.on('add', async () => {
406 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
408 video.state = VideoState.PUBLISHED
410 videoLive.Video = video
// Federation and the socket notification run inside a callback delayed by
// SEGMENT_TIME_SECONDS * 1000 * EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION milliseconds
// (the call owning this callback is not visible here)
413 federateVideoIfNeeded(video, false)
414 .catch(err => logger.error('Cannot federate live video %s.', video.url, { err }))
416 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
417 }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
420 logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err })
// The master watcher is closed after its add handler ran
422 masterWatcher.close()
423 .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
// Cleanup executed when ffmpeg ends: forget the session state, close the watchers,
// process the segments still pending and mark the end of the transmuxing
427 const onFFmpegEnded = () => {
428 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
430 this.transSessions.delete(sessionId)
432 this.watchersPerVideo.delete(videoLive.videoId)
433 this.videoSessions.delete(videoLive.videoId)
// Remove this live from the lives of its user
435 const newLivesPerUser = this.livesPerUser.get(user.id)
436 .filter(o => o.liveId !== videoLive.id)
437 this.livesPerUser.set(user.id, newLivesPerUser)
440 // Wait latest segments generation, and close watchers
442 Promise.all([ tsWatcher.close(), masterWatcher.close() ])
444 // Process remaining segments hash
445 for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
446 this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
449 .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))
451 this.onEndTransmuxing(videoLive.Video.id)
452 .catch(err => logger.error('Error in closed transmuxing.', { err }))
// ffmpeg errors abort the session, except the one containing 'Exiting normally'
456 ffmpegExec.on('error', (err, stdout, stderr) => {
459 // Don't care that we killed the ffmpeg process
460 if (err?.message?.includes('Exiting normally')) return
462 logger.error('Live transcoding error.', { err, stdout, stderr })
464 this.abortSession(sessionId)
467 ffmpegExec.on('end', () => onFFmpegEnded())
// Update the state of a video whose stream ended, then save, notify and federate it.
//
// videoId: id of the video that was streaming
// cleanupNow: when true the 'video-live-ending' job is created without delay,
//             otherwise it is delayed by VIDEO_LIVE.CLEANUP_DELAY
//
// NOTE(review): the error log at the end suggests the body is wrapped in a try/catch that is not visible here.
472 private async onEndTransmuxing (videoId: number, cleanupNow = false) {
// Nothing to do when the video cannot be loaded
474 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
475 if (!fullVideo) return
// NOTE(review): `live` is dereferenced without a null check in the visible lines.
477 const live = await VideoLiveModel.loadByVideoId(videoId)
// A non permanent live gets an ending job and the LIVE_ENDED state.
// NOTE(review): the `else` line is not visible; WAITING_FOR_LIVE is presumably the permanent live branch — confirm.
479 if (!live.permanentLive) {
480 JobQueue.Instance.createJob({
481 type: 'video-live-ending',
483 videoId: fullVideo.id
485 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
487 fullVideo.state = VideoState.LIVE_ENDED
489 fullVideo.state = VideoState.WAITING_FOR_LIVE
// Persist the new state, push it to connected clients, then federate it
492 await fullVideo.save()
494 PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
496 await federateVideoIfNeeded(fullVideo, false)
498 logger.error('Cannot save/federate new video state of live streaming of video id %d.', videoId, { err })
// Compute the sha256 of a segment and store it under the video UUID, keyed by the segment file name.
// The per-video map is created on first use.
private async addSegmentSha (videoUUID: string, segmentPath: string) {
  const fileName = basename(segmentPath)
  logger.debug('Adding live sha segment %s.', segmentPath)

  const sha256 = await buildSha256Segment(segmentPath)

  // Look the map up after the hash is computed: it may have been created or removed in the meantime
  let hashesOfVideo = this.segmentsSha256.get(videoUUID)
  if (!hashesOfVideo) {
    hashesOfVideo = new Map()
    this.segmentsSha256.set(videoUUID, hashesOfVideo)
  }

  hashesOfVideo.set(fileName, sha256)
}
// Remove the stored sha256 of a segment.
// Only warns when the video has no hash map or when the segment is not part of it.
private removeSegmentSha (videoUUID: string, segmentPath: string) {
  const fileName = basename(segmentPath)

  logger.debug('Removing live sha segment %s.', segmentPath)

  const hashesOfVideo = this.segmentsSha256.get(videoUUID)
  if (!hashesOfVideo) {
    logger.warn('Unknown files map to remove sha for %s.', videoUUID)
    return
  }

  if (hashesOfVideo.has(fileName)) {
    hashesOfVideo.delete(fileName)
    return
  }

  logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
}
// Tell whether a stream started at `streamingStartTime` (a Date().getTime() value) is still
// within CONFIG.LIVE.MAX_DURATION.
535 private isDurationConstraintValid (streamingStartTime: number) {
536 const maxDuration = CONFIG.LIVE.MAX_DURATION
// A negative max duration always passes the check
538 if (maxDuration < 0) return true
// maxDuration is added directly to a millisecond timestamp
// NOTE(review): the final comparison between `now` and `max` is not visible here.
540 const now = new Date().getTime()
541 const max = streamingStartTime + maxDuration
// Tell whether one of the players reading an RTMP session has too much data waiting in its socket,
// i.e. more than VIDEO_LIVE.MAX_SOCKET_WAITING_DATA in `socket.writableLength`.
//
// sessionId: id of the publishing RTMP session
//
// Returns true as soon as one player socket is over the limit, false otherwise
// (including when the session itself cannot be found).
private hasClientSocketsInBadHealth (sessionId: string) {
  const rtmpSession = this.getContext().sessions.get(sessionId)

  if (!rtmpSession) {
    logger.warn('Cannot get session %s to check players socket health.', sessionId)
    return false
  }

  for (const playerSessionId of rtmpSession.players) {
    const playerSession = this.getContext().sessions.get(playerSessionId)

    if (!playerSession) {
      // Log the id we looked for: the session itself is undefined in this branch
      logger.error('Cannot get player session %s to check socket health.', playerSessionId)
      continue
    }

    if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
      return true
    }
  }

  return false
}
// Check the quota of the user owning a live.
// Lives that do not save their replay always pass; otherwise the memoized upload check decides.
private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
  const replayIsSaved = live.saveReplay === true

  return replayIsSaved
    ? this.isAbleToUploadVideoWithCache(user.id)
    : true
}
// Periodic task: set the views of every watched live video to its current number of watchers,
// federate and broadcast the update, then drop the watchers older than VIEW_LIFETIME.LIVE.
576 private async updateLiveViews () {
// Nothing to do while the RTMP server is not running
577 if (!this.isRunning()) return
579 if (!isTestInstance()) logger.info('Updating live video views.')
581 for (const videoId of this.watchersPerVideo.keys()) {
// Watchers registered before this timestamp are expired
582 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
584 const watchers = this.watchersPerVideo.get(videoId)
// The count is taken before the expired watchers are filtered out below
586 const numWatchers = watchers.length
// NOTE(review): `video` is dereferenced without a null check in the visible lines, so a video that
// cannot be loaded would make the whole task reject and skip the remaining videos — confirm.
588 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
589 video.views = numWatchers
// NOTE(review): no save of `video` is visible between the assignment above and the federation — confirm.
592 await federateVideoIfNeeded(video, false)
594 PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
596 // Only keep not expired watchers
597 const newWatchers = watchers.filter(w => w > notBefore)
598 this.watchersPerVideo.set(videoId, newWatchers)
600 logger.debug('New live video views for %s is %d.', video.url, numWatchers)
// End every live that is still listed as published, asking for an immediate cleanup.
// Lives are handled one at a time.
private async handleBrokenLives () {
  const publishedLiveIds = await VideoModel.listPublishedLiveIds()

  for (const videoId of publishedLiveIds) {
    const cleanupNow = true

    await this.onEndTransmuxing(videoId, cleanupNow)
  }
}
// Single shared instance, created on first access.
static get Instance () {
  if (!this.instance) {
    this.instance = new this()
  }

  return this.instance
}
617 // ---------------------------------------------------------------------------