aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live-manager.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-11-06 16:42:23 +0100
committerChocobozzz <chocobozzz@cpy.re>2020-11-09 15:33:04 +0100
commite4bf78561763cd84d22ebceb6f371cccf9a356d8 (patch)
tree7d8ea6ed53810d1dfcc2cfa5e3150da8e87e4645 /server/lib/live-manager.ts
parent529f037294d9917a62235f8162887a8edc04c32f (diff)
downloadPeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.tar.gz
PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.tar.zst
PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.zip
Handle views for live videos
Diffstat (limited to 'server/lib/live-manager.ts')
-rw-r--r--server/lib/live-manager.ts50
1 files changed, 49 insertions, 1 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index ef9377e43..4a1081a4f 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -13,7 +13,7 @@ import {
13} from '@server/helpers/ffmpeg-utils' 13} from '@server/helpers/ffmpeg-utils'
14import { logger } from '@server/helpers/logger' 14import { logger } from '@server/helpers/logger'
15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
16import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' 16import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
17import { UserModel } from '@server/models/account/user' 17import { UserModel } from '@server/models/account/user'
18import { VideoModel } from '@server/models/video/video' 18import { VideoModel } from '@server/models/video/video'
19import { VideoFileModel } from '@server/models/video/video-file' 19import { VideoFileModel } from '@server/models/video/video-file'
@@ -61,6 +61,8 @@ class LiveManager {
61 61
62 private readonly transSessions = new Map<string, FfmpegCommand>() 62 private readonly transSessions = new Map<string, FfmpegCommand>()
63 private readonly videoSessions = new Map<number, string>() 63 private readonly videoSessions = new Map<number, string>()
64 // Values are Date().getTime()
65 private readonly watchersPerVideo = new Map<number, number[]>()
64 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 66 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
65 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>() 67 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
66 68
@@ -115,6 +117,8 @@ class LiveManager {
115 this.stop() 117 this.stop()
116 } 118 }
117 }) 119 })
120
121 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
118 } 122 }
119 123
120 run () { 124 run () {
@@ -131,6 +135,10 @@ class LiveManager {
131 this.rtmpServer = undefined 135 this.rtmpServer = undefined
132 } 136 }
133 137
138 isRunning () {
139 return !!this.rtmpServer
140 }
141
134 getSegmentsSha256 (videoUUID: string) { 142 getSegmentsSha256 (videoUUID: string) {
135 return this.segmentsSha256.get(videoUUID) 143 return this.segmentsSha256.get(videoUUID)
136 } 144 }
@@ -150,6 +158,19 @@ class LiveManager {
150 return currentLives.reduce((sum, obj) => sum + obj.size, 0) 158 return currentLives.reduce((sum, obj) => sum + obj.size, 0)
151 } 159 }
152 160
161 addViewTo (videoId: number) {
162 if (this.videoSessions.has(videoId) === false) return
163
164 let watchers = this.watchersPerVideo.get(videoId)
165
166 if (!watchers) {
167 watchers = []
168 this.watchersPerVideo.set(videoId, watchers)
169 }
170
171 watchers.push(new Date().getTime())
172 }
173
153 private getContext () { 174 private getContext () {
154 return context 175 return context
155 } 176 }
@@ -331,6 +352,7 @@ class LiveManager {
331 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) 352 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
332 353
333 this.transSessions.delete(sessionId) 354 this.transSessions.delete(sessionId)
355 this.watchersPerVideo.delete(videoLive.videoId)
334 356
335 Promise.all([ tsWatcher.close(), masterWatcher.close() ]) 357 Promise.all([ tsWatcher.close(), masterWatcher.close() ])
336 .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) 358 .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
@@ -426,6 +448,32 @@ class LiveManager {
426 return this.isAbleToUploadVideoWithCache(user.id) 448 return this.isAbleToUploadVideoWithCache(user.id)
427 } 449 }
428 450
451 private async updateLiveViews () {
452 if (!this.isRunning()) return
453
454 logger.info('Updating live video views.')
455
456 for (const videoId of this.watchersPerVideo.keys()) {
457 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
458
459 const watchers = this.watchersPerVideo.get(videoId)
460
461 const numWatchers = watchers.length
462
463 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
464 video.views = numWatchers
465 await video.save()
466
467 await federateVideoIfNeeded(video, false)
468
469 // Only keep not expired watchers
470 const newWatchers = watchers.filter(w => w > notBefore)
471 this.watchersPerVideo.set(videoId, newWatchers)
472
473 logger.debug('New live video views for %s is %d.', video.url, numWatchers)
474 }
475 }
476
429 static get Instance () { 477 static get Instance () {
430 return this.instance || (this.instance = new this()) 478 return this.instance || (this.instance = new this())
431 } 479 }