import { AsyncQueue, queue } from 'async'
import * as chokidar from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir } from 'fs-extra'
+import { ensureDir, stat } from 'fs-extra'
import { basename } from 'path'
import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
import { logger } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { UserModel } from '@server/models/account/user'
import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models'
+import { MStreamingPlaylist, MUser, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from './activitypub/videos'
import { buildSha256Segment } from './hls'
import { JobQueue } from './job-queue'
import { PeerTubeSocket } from './peertube-socket'
+import { isAbleToUploadVideo } from './user'
import { getHLSDirectory } from './video-paths'
+import memoizee = require('memoizee')
const NodeRtmpServer = require('node-media-server/node_rtmp_server')
const context = require('node-media-server/node_core_ctx')
const nodeMediaServerLogger = require('node-media-server/node_core_logger')
private readonly transSessions = new Map<string, FfmpegCommand>()
private readonly videoSessions = new Map<number, string>()
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
+ private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
+
+ private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
+ return isAbleToUploadVideo(userId, 1000)
+ }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
private rtmpServer: any
this.abortSession(sessionId)
- this.onEndTransmuxing(videoId)
+ this.onEndTransmuxing(videoId, true)
.catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
}
originalResolution: number
}) {
const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options
+ const startStreamDateTime = new Date().getTime()
const allResolutions = resolutionsEnabled.concat([ originalResolution ])
+ const user = await UserModel.loadByLiveId(videoLive.id)
+ if (!this.livesPerUser.has(user.id)) {
+ this.livesPerUser.set(user.id, [])
+ }
+
+ const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
+ const livesOfUser = this.livesPerUser.get(user.id)
+ livesOfUser.push(currentUserLive)
+
for (let i = 0; i < allResolutions.length; i++) {
const resolution = allResolutions[i]
const outPath = getHLSDirectory(videoLive.Video)
await ensureDir(outPath)
+ const deleteSegments = videoLive.saveReplay === false
+
const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
- ? runLiveTranscoding(rtmpUrl, outPath, allResolutions)
- : runLiveMuxing(rtmpUrl, outPath)
+ ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments)
+ : runLiveMuxing(rtmpUrl, outPath, deleteSegments)
logger.info('Running live muxing/transcoding.')
-
this.transSessions.set(sessionId, ffmpegExec)
const videoUUID = videoLive.Video.uuid
const tsWatcher = chokidar.watch(outPath + '/*.ts')
- const updateHandler = segmentPath => {
- this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+ const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+
+ const addHandler = segmentPath => {
+ updateSegment(segmentPath)
+
+ if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
+ this.stopSessionOf(videoLive.videoId)
+ }
+
+ if (videoLive.saveReplay === true) {
+ stat(segmentPath)
+ .then(segmentStat => {
+ currentUserLive.size += segmentStat.size
+ })
+ .then(() => this.isQuotaConstraintValid(user, videoLive))
+ .then(quotaValid => {
+ if (quotaValid !== true) {
+ this.stopSessionOf(videoLive.videoId)
+ }
+ })
+ .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
+ }
}
const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
- tsWatcher.on('add', p => updateHandler(p))
- tsWatcher.on('change', p => updateHandler(p))
+ tsWatcher.on('add', p => addHandler(p))
+ tsWatcher.on('change', p => updateSegment(p))
tsWatcher.on('unlink', p => deleteHandler(p))
const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
ffmpegExec.on('end', () => onFFmpegEnded())
}
- private async onEndTransmuxing (videoId: number) {
+ getLiveQuotaUsedByUser (userId: number) {
+ const currentLives = this.livesPerUser.get(userId)
+ if (!currentLives) return 0
+
+ return currentLives.reduce((sum, obj) => sum + obj.size, 0)
+ }
+
+ private async onEndTransmuxing (videoId: number, cleanupNow = false) {
try {
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
if (!fullVideo) return
payload: {
videoId: fullVideo.id
}
- }, { delay: VIDEO_LIVE.CLEANUP_DELAY })
+ }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
// FIXME: use end
fullVideo.state = VideoState.WAITING_FOR_LIVE
filesMap.delete(segmentName)
}
+ private isDurationConstraintValid (streamingStartTime: number) {
+ const maxDuration = CONFIG.LIVE.MAX_DURATION
+ // No limit
+ if (maxDuration === null) return true
+
+ const now = new Date().getTime()
+ const max = streamingStartTime + maxDuration
+
+ return now <= max
+ }
+
+ private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
+ if (live.saveReplay !== true) return true
+
+ return this.isAbleToUploadVideoWithCache(user.id)
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}