aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-11-06 16:42:23 +0100
committerChocobozzz <chocobozzz@cpy.re>2020-11-09 15:33:04 +0100
commite4bf78561763cd84d22ebceb6f371cccf9a356d8 (patch)
tree7d8ea6ed53810d1dfcc2cfa5e3150da8e87e4645
parent529f037294d9917a62235f8162887a8edc04c32f (diff)
downloadPeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.tar.gz
PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.tar.zst
PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.zip
Handle views for live videos
-rw-r--r--server/controllers/api/videos/index.ts45
-rw-r--r--server/initializers/constants.ts11
-rw-r--r--server/lib/activitypub/process/process-view.ts19
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts2
-rw-r--r--server/lib/live-manager.ts50
-rw-r--r--server/lib/redis.ts10
-rw-r--r--server/tests/api/live/live.ts77
7 files changed, 190 insertions, 24 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index ff29e584b..3f96f142c 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -66,6 +66,7 @@ import { liveRouter } from './live'
66import { ownershipVideoRouter } from './ownership' 66import { ownershipVideoRouter } from './ownership'
67import { rateVideoRouter } from './rate' 67import { rateVideoRouter } from './rate'
68import { watchingRouter } from './watching' 68import { watchingRouter } from './watching'
69import { LiveManager } from '@server/lib/live-manager'
69 70
70const auditLogger = auditLoggerFactory('videos') 71const auditLogger = auditLoggerFactory('videos')
71const videosRouter = express.Router() 72const videosRouter = express.Router()
@@ -416,26 +417,46 @@ async function getVideo (req: express.Request, res: express.Response) {
416} 417}
417 418
418async function viewVideo (req: express.Request, res: express.Response) { 419async function viewVideo (req: express.Request, res: express.Response) {
419 const videoInstance = res.locals.onlyImmutableVideo 420 const immutableVideoAttrs = res.locals.onlyImmutableVideo
420 421
421 const ip = req.ip 422 const ip = req.ip
422 const exists = await Redis.Instance.doesVideoIPViewExist(ip, videoInstance.uuid) 423 const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid)
423 if (exists) { 424 if (exists) {
424 logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid) 425 logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
425 return res.status(204).end() 426 return res.sendStatus(204)
426 } 427 }
427 428
428 await Promise.all([ 429 const video = await VideoModel.load(immutableVideoAttrs.id)
429 Redis.Instance.addVideoView(videoInstance.id),
430 Redis.Instance.setIPVideoView(ip, videoInstance.uuid)
431 ])
432 430
433 const serverActor = await getServerActor() 431 const promises: Promise<any>[] = [
434 await sendView(serverActor, videoInstance, undefined) 432 Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
433 ]
435 434
436 Hooks.runAction('action:api.video.viewed', { video: videoInstance, ip }) 435 let federateView = true
437 436
438 return res.status(204).end() 437 // Increment our live manager
438 if (video.isLive && video.isOwned()) {
439 LiveManager.Instance.addViewTo(video.id)
440
441 // Views of our local live will be sent by our live manager
442 federateView = false
443 }
444
445 // Increment our video views cache counter
446 if (!video.isLive) {
447 promises.push(Redis.Instance.addVideoView(video.id))
448 }
449
450 if (federateView) {
451 const serverActor = await getServerActor()
452 promises.push(sendView(serverActor, video, undefined))
453 }
454
455 await Promise.all(promises)
456
457 Hooks.runAction('action:api.video.viewed', { video, ip })
458
459 return res.sendStatus(204)
439} 460}
440 461
441async function getVideoDescription (req: express.Request, res: express.Response) { 462async function getVideoDescription (req: express.Request, res: express.Response) {
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index e712f02a8..a93fe3c51 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -316,7 +316,11 @@ const CONSTRAINTS_FIELDS = {
316 } 316 }
317} 317}
318 318
319let VIDEO_VIEW_LIFETIME = 60000 * 60 // 1 hour 319let VIEW_LIFETIME = {
320 VIDEO: 60000 * 60, // 1 hour
321 LIVE: 60000 * 5 // 5 minutes
322}
323
320let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour 324let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
321 325
322const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = { 326const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = {
@@ -726,7 +730,8 @@ if (isTestInstance() === true) {
726 730
727 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 731 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
728 732
729 VIDEO_VIEW_LIFETIME = 1000 // 1 second 733 VIEW_LIFETIME.VIDEO = 1000 // 1 second
734 VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
730 CONTACT_FORM_LIFETIME = 1000 // 1 second 735 CONTACT_FORM_LIFETIME = 1000 // 1 second
731 736
732 JOB_ATTEMPTS['email'] = 1 737 JOB_ATTEMPTS['email'] = 1
@@ -838,7 +843,7 @@ export {
838 JOB_COMPLETED_LIFETIME, 843 JOB_COMPLETED_LIFETIME,
839 HTTP_SIGNATURE, 844 HTTP_SIGNATURE,
840 VIDEO_IMPORT_STATES, 845 VIDEO_IMPORT_STATES,
841 VIDEO_VIEW_LIFETIME, 846 VIEW_LIFETIME,
842 CONTACT_FORM_LIFETIME, 847 CONTACT_FORM_LIFETIME,
843 VIDEO_PLAYLIST_PRIVACIES, 848 VIDEO_PLAYLIST_PRIVACIES,
844 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME, 849 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME,
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts
index cc26180af..efceb21a2 100644
--- a/server/lib/activitypub/process/process-view.ts
+++ b/server/lib/activitypub/process/process-view.ts
@@ -4,6 +4,7 @@ import { Redis } from '../../redis'
4import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' 4import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
5import { APProcessorOptions } from '../../../types/activitypub-processor.model' 5import { APProcessorOptions } from '../../../types/activitypub-processor.model'
6import { MActorSignature } from '../../../types/models' 6import { MActorSignature } from '../../../types/models'
7import { LiveManager } from '@server/lib/live-manager'
7 8
8async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { 9async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
9 const { activity, byActor } = options 10 const { activity, byActor } = options
@@ -19,19 +20,27 @@ export {
19// --------------------------------------------------------------------------- 20// ---------------------------------------------------------------------------
20 21
21async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { 22async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) {
22 const videoObject = activity.type === 'View' ? activity.object : (activity.object as ViewObject).object 23 const videoObject = activity.type === 'View'
24 ? activity.object
25 : (activity.object as ViewObject).object
23 26
24 const options = { 27 const options = {
25 videoObject, 28 videoObject,
26 fetchType: 'only-immutable-attributes' as 'only-immutable-attributes', 29 fetchType: 'only-video' as 'only-video',
27 allowRefresh: false as false 30 allowRefresh: false as false
28 } 31 }
29 const { video } = await getOrCreateVideoAndAccountAndChannel(options) 32 const { video } = await getOrCreateVideoAndAccountAndChannel(options)
30 33
31 await Redis.Instance.addVideoView(video.id)
32
33 if (video.isOwned()) { 34 if (video.isOwned()) {
34 // Don't resend the activity to the sender 35 // Our live manager will increment the counter and send the view to followers
36 if (video.isLive) {
37 LiveManager.Instance.addViewTo(video.id)
38 return
39 }
40
41 await Redis.Instance.addVideoView(video.id)
42
43 // Forward the view but don't resend the activity to the sender
35 const exceptions = [ byActor ] 44 const exceptions = [ byActor ]
36 await forwardVideoRelatedActivity(activity, undefined, exceptions, video) 45 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
37 } 46 }
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 55c7a4ccb..599aabf80 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -83,6 +83,8 @@ async function saveLive (video: MVideo, live: MVideoLive) {
83 await live.destroy() 83 await live.destroy()
84 84
85 video.isLive = false 85 video.isLive = false
86 // Reinit views
87 video.views = 0
86 video.state = VideoState.TO_TRANSCODE 88 video.state = VideoState.TO_TRANSCODE
87 video.duration = duration 89 video.duration = duration
88 90
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index ef9377e43..4a1081a4f 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -13,7 +13,7 @@ import {
13} from '@server/helpers/ffmpeg-utils' 13} from '@server/helpers/ffmpeg-utils'
14import { logger } from '@server/helpers/logger' 14import { logger } from '@server/helpers/logger'
15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
16import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' 16import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
17import { UserModel } from '@server/models/account/user' 17import { UserModel } from '@server/models/account/user'
18import { VideoModel } from '@server/models/video/video' 18import { VideoModel } from '@server/models/video/video'
19import { VideoFileModel } from '@server/models/video/video-file' 19import { VideoFileModel } from '@server/models/video/video-file'
@@ -61,6 +61,8 @@ class LiveManager {
61 61
62 private readonly transSessions = new Map<string, FfmpegCommand>() 62 private readonly transSessions = new Map<string, FfmpegCommand>()
63 private readonly videoSessions = new Map<number, string>() 63 private readonly videoSessions = new Map<number, string>()
64 // Values are Date().getTime()
65 private readonly watchersPerVideo = new Map<number, number[]>()
64 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 66 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
65 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>() 67 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
66 68
@@ -115,6 +117,8 @@ class LiveManager {
115 this.stop() 117 this.stop()
116 } 118 }
117 }) 119 })
120
121 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
118 } 122 }
119 123
120 run () { 124 run () {
@@ -131,6 +135,10 @@ class LiveManager {
131 this.rtmpServer = undefined 135 this.rtmpServer = undefined
132 } 136 }
133 137
138 isRunning () {
139 return !!this.rtmpServer
140 }
141
134 getSegmentsSha256 (videoUUID: string) { 142 getSegmentsSha256 (videoUUID: string) {
135 return this.segmentsSha256.get(videoUUID) 143 return this.segmentsSha256.get(videoUUID)
136 } 144 }
@@ -150,6 +158,19 @@ class LiveManager {
150 return currentLives.reduce((sum, obj) => sum + obj.size, 0) 158 return currentLives.reduce((sum, obj) => sum + obj.size, 0)
151 } 159 }
152 160
161 addViewTo (videoId: number) {
162 if (this.videoSessions.has(videoId) === false) return
163
164 let watchers = this.watchersPerVideo.get(videoId)
165
166 if (!watchers) {
167 watchers = []
168 this.watchersPerVideo.set(videoId, watchers)
169 }
170
171 watchers.push(new Date().getTime())
172 }
173
153 private getContext () { 174 private getContext () {
154 return context 175 return context
155 } 176 }
@@ -331,6 +352,7 @@ class LiveManager {
331 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) 352 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
332 353
333 this.transSessions.delete(sessionId) 354 this.transSessions.delete(sessionId)
355 this.watchersPerVideo.delete(videoLive.videoId)
334 356
335 Promise.all([ tsWatcher.close(), masterWatcher.close() ]) 357 Promise.all([ tsWatcher.close(), masterWatcher.close() ])
336 .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) 358 .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
@@ -426,6 +448,32 @@ class LiveManager {
426 return this.isAbleToUploadVideoWithCache(user.id) 448 return this.isAbleToUploadVideoWithCache(user.id)
427 } 449 }
428 450
451 private async updateLiveViews () {
452 if (!this.isRunning()) return
453
454 logger.info('Updating live video views.')
455
456 for (const videoId of this.watchersPerVideo.keys()) {
457 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
458
459 const watchers = this.watchersPerVideo.get(videoId)
460
461 const numWatchers = watchers.length
462
463 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
464 video.views = numWatchers
465 await video.save()
466
467 await federateVideoIfNeeded(video, false)
468
469 // Only keep not expired watchers
470 const newWatchers = watchers.filter(w => w > notBefore)
471 this.watchersPerVideo.set(videoId, newWatchers)
472
473 logger.debug('New live video views for %s is %d.', video.url, numWatchers)
474 }
475 }
476
429 static get Instance () { 477 static get Instance () {
430 return this.instance || (this.instance = new this()) 478 return this.instance || (this.instance = new this())
431 } 479 }
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index a075eee2d..4325598b2 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -7,7 +7,7 @@ import {
7 USER_EMAIL_VERIFY_LIFETIME, 7 USER_EMAIL_VERIFY_LIFETIME,
8 USER_PASSWORD_RESET_LIFETIME, 8 USER_PASSWORD_RESET_LIFETIME,
9 USER_PASSWORD_CREATE_LIFETIME, 9 USER_PASSWORD_CREATE_LIFETIME,
10 VIDEO_VIEW_LIFETIME, 10 VIEW_LIFETIME,
11 WEBSERVER, 11 WEBSERVER,
12 TRACKER_RATE_LIMITS 12 TRACKER_RATE_LIMITS
13} from '../initializers/constants' 13} from '../initializers/constants'
@@ -118,8 +118,12 @@ class Redis {
118 118
119 /* ************ Views per IP ************ */ 119 /* ************ Views per IP ************ */
120 120
121 setIPVideoView (ip: string, videoUUID: string) { 121 setIPVideoView (ip: string, videoUUID: string, isLive: boolean) {
122 return this.setValue(this.generateViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) 122 const lifetime = isLive
123 ? VIEW_LIFETIME.LIVE
124 : VIEW_LIFETIME.VIDEO
125
126 return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime)
123 } 127 }
124 128
125 async doesVideoIPViewExist (ip: string, videoUUID: string) { 129 async doesVideoIPViewExist (ip: string, videoUUID: string) {
diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts
index b41b5fc2e..2198114b4 100644
--- a/server/tests/api/live/live.ts
+++ b/server/tests/api/live/live.ts
@@ -28,9 +28,12 @@ import {
28 testImage, 28 testImage,
29 updateCustomSubConfig, 29 updateCustomSubConfig,
30 updateLive, 30 updateLive,
31 viewVideo,
32 wait,
31 waitJobs, 33 waitJobs,
32 waitUntilLiveStarts 34 waitUntilLiveStarts
33} from '../../../../shared/extra-utils' 35} from '../../../../shared/extra-utils'
36import { FfmpegCommand } from 'fluent-ffmpeg'
34 37
35const expect = chai.expect 38const expect = chai.expect
36 39
@@ -419,6 +422,80 @@ describe('Test live', function () {
419 }) 422 })
420 }) 423 })
421 424
425 describe('Live views', function () {
426 let liveVideoId: string
427 let command: FfmpegCommand
428
429 async function countViews (expected: number) {
430 for (const server of servers) {
431 const res = await getVideo(server.url, liveVideoId)
432 const video: VideoDetails = res.body
433
434 expect(video.views).to.equal(expected)
435 }
436 }
437
438 before(async function () {
439 this.timeout(30000)
440
441 const liveAttributes = {
442 name: 'live video',
443 channelId: servers[0].videoChannel.id,
444 privacy: VideoPrivacy.PUBLIC
445 }
446
447 const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
448 liveVideoId = res.body.video.uuid
449
450 command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
451 await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId)
452 await waitJobs(servers)
453 })
454
455 it('Should display no views for a live', async function () {
456 await countViews(0)
457 })
458
459 it('Should view a live twice and display 1 view', async function () {
460 this.timeout(30000)
461
462 await viewVideo(servers[0].url, liveVideoId)
463 await viewVideo(servers[0].url, liveVideoId)
464
465 await wait(5000)
466
467 await waitJobs(servers)
468
469 await countViews(1)
470 })
471
472 it('Should wait 5 seconds and display 0 views', async function () {
473 this.timeout(30000)
474
475 await wait(5000)
476 await waitJobs(servers)
477
478 await countViews(0)
479 })
480
481 it('Should view a live on a remote and on local and display 2 views', async function () {
482 this.timeout(30000)
483
484 await viewVideo(servers[0].url, liveVideoId)
485 await viewVideo(servers[1].url, liveVideoId)
486 await viewVideo(servers[1].url, liveVideoId)
487
488 await wait(5000)
489 await waitJobs(servers)
490
491 await countViews(2)
492 })
493
494 after(async function () {
495 await stopFfmpeg(command)
496 })
497 })
498
422 describe('Live socket messages', function () { 499 describe('Live socket messages', function () {
423 500
424 async function createLiveWrapper () { 501 async function createLiveWrapper () {