]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/live-manager.ts
Merge branch 'release/3.1.0' into develop
[github/Chocobozzz/PeerTube.git] / server / lib / live-manager.ts
CommitLineData
c6c0fa6c 1
e772bdf1 2import * as Bluebird from 'bluebird'
c6c0fa6c
C
3import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg'
e772bdf1 5import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
00b87c57 6import { createServer, Server } from 'net'
937581b8 7import { basename, join } from 'path'
c655c9ef 8import { isTestInstance } from '@server/helpers/core-utils'
9252a33d 9import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
daf6e480 10import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
c6c0fa6c
C
11import { logger } from '@server/helpers/logger'
12import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
e4bf7856 13import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
fb719404 14import { UserModel } from '@server/models/account/user'
a5cf76af 15import { VideoModel } from '@server/models/video/video'
c6c0fa6c
C
16import { VideoFileModel } from '@server/models/video/video-file'
17import { VideoLiveModel } from '@server/models/video/video-live'
18import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
90a8bd30 19import { MStreamingPlaylist, MStreamingPlaylistVideo, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
c6c0fa6c 20import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
a5cf76af 21import { federateVideoIfNeeded } from './activitypub/videos'
c6c0fa6c 22import { buildSha256Segment } from './hls'
a5cf76af 23import { JobQueue } from './job-queue'
bb4ba6d9 24import { cleanupLive } from './job-queue/handlers/video-live-ending'
a5cf76af 25import { PeerTubeSocket } from './peertube-socket'
fb719404 26import { isAbleToUploadVideo } from './user'
c6c0fa6c 27import { getHLSDirectory } from './video-paths'
529b3752 28import { VideoTranscodingProfilesManager } from './video-transcoding-profiles'
c6c0fa6c 29
fb719404 30import memoizee = require('memoizee')
00b87c57 31const NodeRtmpSession = require('node-media-server/node_rtmp_session')
c6c0fa6c
C
32const context = require('node-media-server/node_core_ctx')
33const nodeMediaServerLogger = require('node-media-server/node_core_logger')
34
35// Disable node media server logs
36nodeMediaServerLogger.setLogType(0)
37
// Options object given to every NodeRtmpSession created by the RTMP server
const config = {
  // RTMP protocol settings: port from the instance configuration, the rest from constants
  rtmp: {
    port: CONFIG.LIVE.RTMP.PORT,
    chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
    gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
    ping: VIDEO_LIVE.RTMP.PING,
    ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
  },

  transcoding: { ffmpeg: 'ffmpeg' }
}
50
c6c0fa6c
C
51class LiveManager {
52
53 private static instance: LiveManager
54
55 private readonly transSessions = new Map<string, FfmpegCommand>()
a5cf76af 56 private readonly videoSessions = new Map<number, string>()
e4bf7856
C
57 // Values are Date().getTime()
58 private readonly watchersPerVideo = new Map<number, number[]>()
c6c0fa6c 59 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
fb719404
C
60 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
61
62 private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
63 return isAbleToUploadVideo(userId, 1000)
64 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
c6c0fa6c 65
00b87c57
C
66 private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => {
67 return this.hasClientSocketsInBadHealth(sessionId)
68 }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
69
70 private rtmpServer: Server
c6c0fa6c
C
71
72 private constructor () {
73 }
74
/**
 * Register the RTMP event listeners, the configuration change handler,
 * the cleanup of broken lives and the periodic live views update.
 *
 * This does not start the RTMP server itself: see run().
 */
init () {
  const events = this.getContext().nodeEvent

  events.on('postPublish', (sessionId: string, streamPath: string) => {
    logger.debug('RTMP received stream', { id: sessionId, streamPath })

    // Expected stream path once split on '/': [ <anything>, <base path>, <stream key> ]
    const splittedPath = streamPath.split('/')
    if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
      logger.warn('Live path is incorrect.', { streamPath })
      return this.abortSession(sessionId)
    }

    this.handleSession(sessionId, streamPath, splittedPath[2])
      .catch(err => logger.error('Cannot handle sessions.', { err }))
  })

  events.on('donePublish', sessionId => {
    logger.info('Live session ended.', { sessionId })
  })

  // Start or stop the RTMP server when the live feature is toggled in the configuration
  registerConfigChangedHandler(() => {
    if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
      this.run()
      return
    }

    if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) {
      this.stop()
    }
  })

  // Cleanup broken lives, that were terminated by a server restart for example
  this.handleBrokenLives()
    .catch(err => logger.error('Cannot handle broken lives.', { err }))

  // updateLiveViews() is async: catch its rejection so a failing update does not become an unhandled rejection
  setInterval(() => {
    this.updateLiveViews()
      .catch(err => logger.error('Cannot update live views.', { err }))
  }, VIEW_LIFETIME.LIVE)
}
111
/**
 * Create the TCP server that accepts RTMP connections and start listening.
 * Each incoming socket is wrapped in a NodeRtmpSession that is run immediately.
 */
run () {
  // NOTE(review): the logged port comes from the module-level `config` object while listen() reads CONFIG directly
  logger.info('Running RTMP server on port %d', config.rtmp.port)

  const server = createServer(socket => {
    new NodeRtmpSession(config, socket).run()
  })
  this.rtmpServer = server

  server.on('error', err => {
    logger.error('Cannot run RTMP server.', { err })
  })

  server.listen(CONFIG.LIVE.RTMP.PORT)
}
127
/**
 * Close the RTMP server (if it is running) and stop every RTMP session
 * registered in the node-media-server context.
 */
stop () {
  logger.info('Stopping RTMP server.')

  // Do not throw if stop() is called while the server is not running
  if (this.rtmpServer) {
    this.rtmpServer.close()
    this.rtmpServer = undefined
  }

  // Only stop the entries of the sessions container that are RTMP sessions
  this.getContext().sessions.forEach((session: any) => {
    if (session instanceof NodeRtmpSession) {
      session.stop()
    }
  })
}
141
e4bf7856
C
/** Tell whether the RTMP server has been created and not stopped since. */
isRunning () {
  return Boolean(this.rtmpServer)
}
145
c6c0fa6c
C
/**
 * Get the map (segment file name => sha256) of the live identified by videoUUID,
 * or undefined if no hash is registered for this video.
 */
getSegmentsSha256 (videoUUID: string) {
  const hashesOfVideo = this.segmentsSha256.get(videoUUID)

  return hashesOfVideo
}
149
a5cf76af
C
/**
 * Abort the session currently registered for this video, if any,
 * and forget the video <=> session association.
 */
stopSessionOf (videoId: number) {
  const registeredSessionId = this.videoSessions.get(videoId)

  if (registeredSessionId) {
    this.videoSessions.delete(videoId)
    this.abortSession(registeredSessionId)
  }
}
157
68e70a74
C
/**
 * Sum the sizes tracked for the lives of this user.
 * Returns 0 when the user has no tracked live.
 */
getLiveQuotaUsedByUser (userId: number) {
  const livesOfUser = this.livesPerUser.get(userId)
  if (!livesOfUser) return 0

  let total = 0
  for (const live of livesOfUser) {
    total += live.size
  }

  return total
}
164
e4bf7856
C
/**
 * Register a watcher (as a timestamp) for a video that has a running session.
 * Does nothing if no session is registered for this video.
 */
addViewTo (videoId: number) {
  if (!this.videoSessions.has(videoId)) return

  const viewedAt = new Date().getTime()
  const existingWatchers = this.watchersPerVideo.get(videoId)

  if (existingWatchers) {
    existingWatchers.push(viewedAt)
    return
  }

  // First watcher of this video
  this.watchersPerVideo.set(videoId, [ viewedAt ])
}
177
bb4ba6d9
C
/** Forget every segment hash registered for the live identified by videoUUID. */
cleanupShaSegments (videoUUID: string) {
  const hashesPerVideo = this.segmentsSha256

  hashesPerVideo.delete(videoUUID)
}
181
3851e732
C
/**
 * Append the content of a segment to its concatenated file in the replay directory.
 *
 * @param hlsVideoPath HLS directory of the video, that contains the replay directory
 * @param segmentPath path of the segment to append
 * @returns a promise that never rejects because of the copy: read/append errors are logged
 */
addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
  const segmentName = basename(segmentPath)
  const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, this.buildConcatenatedName(segmentName))

  return readFile(segmentPath)
    .then(data => appendFile(dest, data))
    .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err }))
}
190
/**
 * Build the name of the concatenated replay file of a segment or playlist.
 * The base name must start with digits followed by '-' or '.': these digits
 * are reused in the result ('3-000012.ts' => 'concat-3.ts').
 *
 * @throws Error if the base name does not start with such a number
 */
buildConcatenatedName (segmentOrPlaylistPath: string) {
  const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)

  // Fail with an explicit message instead of a TypeError on the null match
  if (!num) {
    throw new Error('Cannot build concatenated name of ' + segmentOrPlaylistPath + ': name does not start with a number')
  }

  return 'concat-' + num[1] + '.ts'
}
196
/**
 * Process segments one after the other: register their sha256 and,
 * if the live saves its replay, append them to the replay files.
 * Fire and forget: errors are logged.
 */
private processSegments (hlsVideoPath: string, videoUUID: string, videoLive: MVideoLive, segmentPaths: string[]) {
  const handleSegment = async (segmentPath: string) => {
    // Add sha hash of previous segments, because ffmpeg should have finished generating them
    await this.addSegmentSha(videoUUID, segmentPath)

    if (!videoLive.saveReplay) return

    await this.addSegmentToReplay(hlsVideoPath, segmentPath)
  }

  Bluebird.mapSeries(segmentPaths, segmentPath => handleSegment(segmentPath))
    .catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err }))
}
207
c6c0fa6c
C
/** Accessor of the node-media-server core context (provides `sessions` and `nodeEvent`). */
private getContext () {
  const nodeMediaServerContext = context

  return nodeMediaServerContext
}
211
/**
 * Stop and unregister the RTMP session with this id, then kill and unregister
 * the ffmpeg command attached to it. Each step is skipped if nothing is registered.
 */
private abortSession (id: string) {
  const sessions = this.getContext().sessions

  const rtmpSession = sessions.get(id)
  if (rtmpSession) {
    rtmpSession.stop()
    sessions.delete(id)
  }

  const ffmpegCommand = this.transSessions.get(id)
  if (ffmpegCommand) {
    ffmpegCommand.kill('SIGINT')
    this.transSessions.delete(id)
  }
}
225
/**
 * Handle a stream that has just been published on the RTMP server:
 * check the stream key and the video, cleanup files of a previous live,
 * create/update the HLS playlist and start the muxing/transcoding.
 *
 * @param sessionId id of the RTMP session in the node-media-server context
 * @param streamPath path of the published stream, appended to the local RTMP URL
 * @param streamKey key used to find the live video
 */
private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
  const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
  if (!videoLive) {
    logger.warn('Unknown live video with stream key %s.', streamKey)
    return this.abortSession(sessionId)
  }

  const video = videoLive.Video
  if (video.isBlacklisted()) {
    logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
    return this.abortSession(sessionId)
  }

  // Cleanup old potential live files (could happen with a permanent live)
  this.cleanupShaSegments(video.uuid)

  const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
  if (oldStreamingPlaylist) {
    await cleanupLive(video, oldStreamingPlaylist)
  }

  // The session may not be registered anymore after the awaits above: its video height is needed below
  const session = this.getContext().sessions.get(sessionId)
  if (!session) {
    logger.warn('Cannot find RTMP session %s to start the live of video %s.', sessionId, video.uuid)
    return
  }

  this.videoSessions.set(video.id, sessionId)

  const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)

  const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath

  const [ resolutionResult, fps ] = await Promise.all([
    getVideoFileResolution(rtmpUrl),
    getVideoFileFPS(rtmpUrl)
  ])

  const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
    ? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live')
    : []

  // The resolution of the incoming stream is always part of the generated resolutions
  const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])

  logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions })

  const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
    videoId: video.id,
    playlistUrl,
    segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive),
    p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions),
    p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,

    type: VideoStreamingPlaylistType.HLS
  }, { returning: true }) as [ MStreamingPlaylist, boolean ]

  return this.runMuxing({
    sessionId,
    videoLive,
    playlist: Object.assign(videoStreamingPlaylist, { Video: video }),
    rtmpUrl,
    fps,
    allResolutions
  })
}
286
/**
 * Start the ffmpeg muxing/transcoding of a live and watch the files it generates.
 *
 * - tracks the live in the lives of its user
 * - creates a video file per resolution
 * - on each new segment: processes the previous segment of the same playlist,
 *   then checks socket health, max duration and (if the replay is saved) user quota
 * - on master playlist creation: publishes the video
 * - on ffmpeg end/error: cleans internal state and schedules the end of the live
 */
private async runMuxing (options: {
  sessionId: string
  videoLive: MVideoLiveVideo
  playlist: MStreamingPlaylistVideo
  rtmpUrl: string
  fps: number
  allResolutions: number[]
}) {
  const { sessionId, videoLive, playlist, allResolutions, fps, rtmpUrl } = options
  const startStreamDateTime = new Date().getTime()

  const user = await UserModel.loadByLiveId(videoLive.id)
  if (!this.livesPerUser.has(user.id)) {
    this.livesPerUser.set(user.id, [])
  }

  // `size` is increased each time a segment is generated, when the replay is saved
  const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
  const livesOfUser = this.livesPerUser.get(user.id)
  livesOfUser.push(currentUserLive)

  for (const resolution of allResolutions) {
    const file = new VideoFileModel({
      resolution,
      size: -1,
      extname: '.ts',
      infoHash: null,
      fps,
      videoStreamingPlaylistId: playlist.id
    })

    // Fire and forget: a failure is only logged
    VideoFileModel.customUpsert(file, 'streaming-playlist', null)
      .catch(err => logger.error('Cannot create file for live streaming.', { err }))
  }

  const outPath = getHLSDirectory(videoLive.Video)
  await ensureDir(outPath)

  const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)

  if (videoLive.saveReplay === true) {
    await ensureDir(replayDirectory)
  }

  const videoUUID = videoLive.Video.uuid

  const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
    ? await getLiveTranscodingCommand({
      rtmpUrl,
      outPath,
      resolutions: allResolutions,
      fps,
      availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
      profile: CONFIG.LIVE.TRANSCODING.PROFILE
    })
    : getLiveMuxingCommand(rtmpUrl, outPath)

  logger.info('Running live muxing/transcoding for %s.', videoUUID)
  this.transSessions.set(sessionId, ffmpegExec)

  const tsWatcher = chokidar.watch(outPath + '/*.ts')

  // Latest segment seen per playlist: it is processed when the next segment of the same playlist appears
  const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
  // Segment names start with the playlist id: one or more digits followed by '-'
  const playlistIdMatcher = /^(\d+)-/

  const addHandler = (segmentPath: string) => {
    logger.debug('Live add handler of %s.', segmentPath)

    const playlistIdMatch = basename(segmentPath).match(playlistIdMatcher)

    if (playlistIdMatch) {
      const playlistId = playlistIdMatch[1]

      const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
      this.processSegments(outPath, videoUUID, videoLive, segmentsToProcess)

      segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
    } else {
      // Do not throw in the watcher handler: the constraints below still have to be checked
      logger.warn('Cannot find playlist id of live segment %s.', segmentPath)
    }

    if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
      logger.error(
        'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
        ' Stopping session of video %s.', videoUUID)

      this.stopSessionOf(videoLive.videoId)
      return
    }

    // Duration constraint check
    if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
      logger.info('Stopping session of %s: max duration exceeded.', videoUUID)

      this.stopSessionOf(videoLive.videoId)
      return
    }

    // Check user quota if the user enabled replay saving
    if (videoLive.saveReplay === true) {
      stat(segmentPath)
        .then(segmentStat => {
          currentUserLive.size += segmentStat.size
        })
        .then(() => this.isQuotaConstraintValid(user, videoLive))
        .then(quotaValid => {
          if (quotaValid !== true) {
            logger.info('Stopping session of %s: user quota exceeded.', videoUUID)

            this.stopSessionOf(videoLive.videoId)
          }
        })
        .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
    }
  }

  const deleteHandler = (segmentPath: string) => this.removeSegmentSha(videoUUID, segmentPath)

  tsWatcher.on('add', p => addHandler(p))
  tsWatcher.on('unlink', p => deleteHandler(p))

  // The creation of the master playlist is used as the signal to publish the video
  const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
  masterWatcher.on('add', async () => {
    try {
      const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)

      video.state = VideoState.PUBLISHED
      await video.save()
      videoLive.Video = video

      // Delay the federation and the notification by a number of segment durations
      setTimeout(() => {
        federateVideoIfNeeded(video, false)
          .catch(err => logger.error('Cannot federate live video %s.', video.url, { err }))

        PeerTubeSocket.Instance.sendVideoLiveNewState(video)
      }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)

    } catch (err) {
      logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err })
    } finally {
      masterWatcher.close()
        .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
    }
  })

  const onFFmpegEnded = () => {
    logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)

    this.transSessions.delete(sessionId)

    this.watchersPerVideo.delete(videoLive.videoId)
    this.videoSessions.delete(videoLive.videoId)

    const newLivesPerUser = this.livesPerUser.get(user.id)
                                .filter(o => o.liveId !== videoLive.id)
    this.livesPerUser.set(user.id, newLivesPerUser)

    setTimeout(() => {
      // Wait latest segments generation, and close watchers

      Promise.all([ tsWatcher.close(), masterWatcher.close() ])
        .then(() => {
          // Process remaining segments hash
          for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
            this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
          }
        })
        .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))

      this.onEndTransmuxing(videoLive.Video.id)
        .catch(err => logger.error('Error in closed transmuxing.', { err }))
    }, 1000)
  }

  ffmpegExec.on('error', (err, stdout, stderr) => {
    onFFmpegEnded()

    // Don't care that we killed the ffmpeg process
    if (err?.message?.includes('Exiting normally')) return

    logger.error('Live transcoding error.', { err, stdout, stderr })

    this.abortSession(sessionId)
  })

  ffmpegExec.on('end', () => onFFmpegEnded())

  ffmpegExec.run()
}
471
/**
 * Update the state of a video whose live has ended, then notify and federate it.
 *
 * A non permanent live gets a 'video-live-ending' job and the LIVE_ENDED state,
 * a permanent live goes back to WAITING_FOR_LIVE.
 * Nothing is done if the video cannot be loaded. Errors are logged, not thrown.
 *
 * @param cleanupNow create the ending job without delay instead of VIDEO_LIVE.CLEANUP_DELAY
 */
private async onEndTransmuxing (videoId: number, cleanupNow = false) {
  try {
    const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
    if (!fullVideo) return

    const live = await VideoLiveModel.loadByVideoId(videoId)

    if (live.permanentLive) {
      fullVideo.state = VideoState.WAITING_FOR_LIVE
    } else {
      const endingJob = {
        type: 'video-live-ending',
        payload: {
          videoId: fullVideo.id
        }
      }

      JobQueue.Instance.createJob(endingJob, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })

      fullVideo.state = VideoState.LIVE_ENDED
    }

    await fullVideo.save()

    PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)

    await federateVideoIfNeeded(fullVideo, false)
  } catch (err) {
    logger.error('Cannot save/federate new video state of live streaming of video id %d.', videoId, { err })
  }
}
501
210856a7
C
/**
 * Compute the sha256 of a segment and register it under the segment
 * file name, in the hashes of the live identified by videoUUID.
 */
private async addSegmentSha (videoUUID: string, segmentPath: string) {
  const segmentName = basename(segmentPath)
  logger.debug('Adding live sha segment %s.', segmentPath)

  const shaResult = await buildSha256Segment(segmentPath)

  // Create the hashes map of the video on its first segment
  let hashesOfVideo = this.segmentsSha256.get(videoUUID)
  if (!hashesOfVideo) {
    hashesOfVideo = new Map<string, string>()
    this.segmentsSha256.set(videoUUID, hashesOfVideo)
  }

  hashesOfVideo.set(segmentName, shaResult)
}
515
210856a7
C
/**
 * Remove the hash registered for a segment of the live identified by videoUUID.
 * Logs a warning if the video or the segment is unknown.
 */
private removeSegmentSha (videoUUID: string, segmentPath: string) {
  const segmentName = basename(segmentPath)

  logger.debug('Removing live sha segment %s.', segmentPath)

  const hashesOfVideo = this.segmentsSha256.get(videoUUID)
  if (!hashesOfVideo) {
    logger.warn('Unknown files map to remove sha for %s.', videoUUID)
    return
  }

  // Map.delete() returns false when the key was not registered
  const removed = hashesOfVideo.delete(segmentName)
  if (removed === false) {
    logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
  }
}
534
fb719404
C
/**
 * Check a stream started at streamingStartTime (timestamp in ms) has not exceeded
 * CONFIG.LIVE.MAX_DURATION. A negative max duration means there is no limit.
 */
private isDurationConstraintValid (streamingStartTime: number) {
  const maxDuration = CONFIG.LIVE.MAX_DURATION

  const hasNoLimit = maxDuration < 0
  if (hasNoLimit) return true

  const deadline = streamingStartTime + maxDuration

  return Date.now() <= deadline
}
545
00b87c57
C
/**
 * Check whether a player of this RTMP session has more data waiting to be written
 * in its socket than VIDEO_LIVE.MAX_SOCKET_WAITING_DATA.
 *
 * @returns true if at least one player socket is above the limit,
 * false otherwise (including when the session cannot be found)
 */
private hasClientSocketsInBadHealth (sessionId: string) {
  const rtmpSession = this.getContext().sessions.get(sessionId)

  if (!rtmpSession) {
    logger.warn('Cannot get session %s to check players socket health.', sessionId)
    return false
  }

  for (const playerSessionId of rtmpSession.players) {
    const playerSession = this.getContext().sessions.get(playerSessionId)

    if (!playerSession) {
      // Log the id: the session object is undefined here
      logger.error('Cannot get player session %s to check socket health.', playerSessionId)
      continue
    }

    if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
      return true
    }
  }

  return false
}
569
fb719404
C
/**
 * Check the user is still able to upload, using the memoized check.
 * Always valid when the live does not save its replay.
 */
private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
  const replayIsSaved = live.saveReplay === true

  if (replayIsSaved) {
    return this.isAbleToUploadVideoWithCache(user.id)
  }

  return true
}
575
e4bf7856
C
/**
 * For each video that has watchers: save the number of watchers as the views
 * of the video, federate and notify it, then only keep the watchers that are
 * more recent than VIEW_LIFETIME.LIVE.
 *
 * Does nothing if the RTMP server is not running.
 */
private async updateLiveViews () {
  if (!this.isRunning()) return

  if (!isTestInstance()) logger.info('Updating live video views.')

  for (const videoId of this.watchersPerVideo.keys()) {
    const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE

    const watchers = this.watchersPerVideo.get(videoId)

    const numWatchers = watchers.length

    const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
    if (!video) {
      // Do not let a video that cannot be loaded abort the update of the other videos
      logger.warn('Cannot find video %d to update its live views.', videoId)
      this.watchersPerVideo.delete(videoId)
      continue
    }

    video.views = numWatchers
    await video.save()

    await federateVideoIfNeeded(video, false)

    PeerTubeSocket.Instance.sendVideoViewsUpdate(video)

    // Only keep not expired watchers
    const newWatchers = watchers.filter(w => w > notBefore)
    this.watchersPerVideo.set(videoId, newWatchers)

    logger.debug('New live video views for %s is %d.', video.url, numWatchers)
  }
}
603
5c0904fc
C
/**
 * End, with an immediate cleanup, every live listed by
 * VideoModel.listPublishedLiveIds(). Lives are handled one after the other.
 */
private async handleBrokenLives () {
  const publishedLiveIds = await VideoModel.listPublishedLiveIds()

  for (const publishedLiveId of publishedLiveIds) {
    const cleanupNow = true

    await this.onEndTransmuxing(publishedLiveId, cleanupNow)
  }
}
611
c6c0fa6c
C
/** Singleton accessor: the instance is created on first access. */
static get Instance () {
  if (!this.instance) {
    this.instance = new this()
  }

  return this.instance
}
615}
616
617// ---------------------------------------------------------------------------
618
619export {
620 LiveManager
621}