import { AsyncQueue, queue } from 'async'
import * as chokidar from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir, readdir, remove } from 'fs-extra'
-import { basename, join } from 'path'
+import { ensureDir, stat } from 'fs-extra'
+import { basename } from 'path'
import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
import { logger } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { UserModel } from '@server/models/account/user'
+import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models'
+import { MStreamingPlaylist, MUser, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
+import { federateVideoIfNeeded } from './activitypub/videos'
import { buildSha256Segment } from './hls'
+import { JobQueue } from './job-queue'
+import { PeerTubeSocket } from './peertube-socket'
+import { isAbleToUploadVideo } from './user'
import { getHLSDirectory } from './video-paths'
+import memoizee = require('memoizee')
const NodeRtmpServer = require('node-media-server/node_rtmp_server')
const context = require('node-media-server/node_core_ctx')
const nodeMediaServerLogger = require('node-media-server/node_core_logger')
private static instance: LiveManager
private readonly transSessions = new Map<string, FfmpegCommand>()
+ private readonly videoSessions = new Map<number, string>()
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
+ private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
+
+ private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
+ return isAbleToUploadVideo(userId, 1000)
+ }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
private rtmpServer: any
}
init () {
- this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => {
+ const events = this.getContext().nodeEvent
+ events.on('postPublish', (sessionId: string, streamPath: string) => {
logger.debug('RTMP received stream', { id: sessionId, streamPath })
const splittedPath = streamPath.split('/')
.catch(err => logger.error('Cannot handle sessions.', { err }))
})
- this.getContext().nodeEvent.on('donePublish', sessionId => {
+ events.on('donePublish', sessionId => {
this.abortSession(sessionId)
})
return this.segmentsSha256.get(videoUUID)
}
+ stopSessionOf (videoId: number) {
+ const sessionId = this.videoSessions.get(videoId)
+ if (!sessionId) return
+
+ this.abortSession(sessionId)
+
+ this.onEndTransmuxing(videoId, true)
+ .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
+ }
+
private getContext () {
return context
}
}
const video = videoLive.Video
+ if (video.isBlacklisted()) {
+ logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
+ return this.abortSession(sessionId)
+ }
+
+ this.videoSessions.set(video.id, sessionId)
+
const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
const session = this.getContext().sessions.get(sessionId)
type: VideoStreamingPlaylistType.HLS
}, { returning: true }) as [ MStreamingPlaylist, boolean ]
- video.state = VideoState.PUBLISHED
- await video.save()
-
- // FIXME: federation?
-
return this.runMuxing({
sessionId,
videoLive,
originalResolution: number
}) {
const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options
+ const startStreamDateTime = new Date().getTime()
const allResolutions = resolutionsEnabled.concat([ originalResolution ])
+ const user = await UserModel.loadByLiveId(videoLive.id)
+ if (!this.livesPerUser.has(user.id)) {
+ this.livesPerUser.set(user.id, [])
+ }
+
+ const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
+ const livesOfUser = this.livesPerUser.get(user.id)
+ livesOfUser.push(currentUserLive)
+
for (let i = 0; i < allResolutions.length; i++) {
const resolution = allResolutions[i]
const outPath = getHLSDirectory(videoLive.Video)
await ensureDir(outPath)
+ const deleteSegments = videoLive.saveReplay === false
+
const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
- ? runLiveTranscoding(rtmpUrl, outPath, allResolutions)
- : runLiveMuxing(rtmpUrl, outPath)
+ ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments)
+ : runLiveMuxing(rtmpUrl, outPath, deleteSegments)
logger.info('Running live muxing/transcoding.')
-
this.transSessions.set(sessionId, ffmpegExec)
+ const videoUUID = videoLive.Video.uuid
+ const tsWatcher = chokidar.watch(outPath + '/*.ts')
+
+ const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+
+ const addHandler = segmentPath => {
+ updateSegment(segmentPath)
+
+ if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
+ this.stopSessionOf(videoLive.videoId)
+ }
+
+ if (videoLive.saveReplay === true) {
+ stat(segmentPath)
+ .then(segmentStat => {
+ currentUserLive.size += segmentStat.size
+ })
+ .then(() => this.isQuotaConstraintValid(user, videoLive))
+ .then(quotaValid => {
+ if (quotaValid !== true) {
+ this.stopSessionOf(videoLive.videoId)
+ }
+ })
+ .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
+ }
+ }
+
+ const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
+
+ tsWatcher.on('add', p => addHandler(p))
+ tsWatcher.on('change', p => updateSegment(p))
+ tsWatcher.on('unlink', p => deleteHandler(p))
+
+ const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
+ masterWatcher.on('add', async () => {
+ try {
+ const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
+
+ video.state = VideoState.PUBLISHED
+ await video.save()
+ videoLive.Video = video
+
+ await federateVideoIfNeeded(video, false)
+
+ PeerTubeSocket.Instance.sendVideoLiveNewState(video)
+ } catch (err) {
+ logger.error('Cannot federate video %d.', videoLive.videoId, { err })
+ } finally {
+ masterWatcher.close()
+ .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
+ }
+ })
+
const onFFmpegEnded = () => {
- watcher.close()
- .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err }))
+ logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath)
+
+ Promise.all([ tsWatcher.close(), masterWatcher.close() ])
+ .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
- this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath)
+ this.onEndTransmuxing(videoLive.Video.id)
.catch(err => logger.error('Error in closed transmuxing.', { err }))
}
})
ffmpegExec.on('end', () => onFFmpegEnded())
+ }
- const videoUUID = videoLive.Video.uuid
- const watcher = chokidar.watch(outPath + '/*.ts')
+ getLiveQuotaUsedByUser (userId: number) {
+ const currentLives = this.livesPerUser.get(userId)
+ if (!currentLives) return 0
- const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
- const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
-
- watcher.on('add', p => updateHandler(p))
- watcher.on('change', p => updateHandler(p))
- watcher.on('unlink', p => deleteHandler(p))
+ return currentLives.reduce((sum, obj) => sum + obj.size, 0)
}
- private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) {
- logger.info('RTMP transmuxing for %s ended.', streamPath)
-
- const files = await readdir(outPath)
+ private async onEndTransmuxing (videoId: number, cleanupNow = false) {
+ try {
+ const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+ if (!fullVideo) return
- for (const filename of files) {
- if (
- filename.endsWith('.ts') ||
- filename.endsWith('.m3u8') ||
- filename.endsWith('.mpd') ||
- filename.endsWith('.m4s') ||
- filename.endsWith('.tmp')
- ) {
- const p = join(outPath, filename)
+ JobQueue.Instance.createJob({
+ type: 'video-live-ending',
+ payload: {
+ videoId: fullVideo.id
+ }
+ }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
- remove(p)
- .catch(err => logger.error('Cannot remove %s.', p, { err }))
- }
- }
+ // FIXME: use end
+ fullVideo.state = VideoState.WAITING_FOR_LIVE
+ await fullVideo.save()
- playlist.destroy()
- .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
+ PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
- video.state = VideoState.LIVE_ENDED
- video.save()
- .catch(err => logger.error('Cannot save new video state of live streaming.', { err }))
+ await federateVideoIfNeeded(fullVideo, false)
+ } catch (err) {
+ logger.error('Cannot save/federate new video state of live streaming.', { err })
+ }
}
private async addSegmentSha (options: SegmentSha256QueueParam) {
filesMap.delete(segmentName)
}
+ private isDurationConstraintValid (streamingStartTime: number) {
+ const maxDuration = CONFIG.LIVE.MAX_DURATION
+ // No limit
+ if (maxDuration === null) return true
+
+ const now = new Date().getTime()
+ const max = streamingStartTime + maxDuration
+
+ return now <= max
+ }
+
+ private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
+ if (live.saveReplay !== true) return true
+
+ return this.isAbleToUploadVideoWithCache(user.id)
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}