diff options
author | Chocobozzz <me@florianbigard.com> | 2020-11-06 16:42:23 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2020-11-09 15:33:04 +0100 |
commit | e4bf78561763cd84d22ebceb6f371cccf9a356d8 (patch) | |
tree | 7d8ea6ed53810d1dfcc2cfa5e3150da8e87e4645 | |
parent | 529f037294d9917a62235f8162887a8edc04c32f (diff) | |
download | PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.tar.gz PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.tar.zst PeerTube-e4bf78561763cd84d22ebceb6f371cccf9a356d8.zip |
Handle views for live videos
-rw-r--r-- | server/controllers/api/videos/index.ts | 45 | ||||
-rw-r--r-- | server/initializers/constants.ts | 11 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-view.ts | 19 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-live-ending.ts | 2 | ||||
-rw-r--r-- | server/lib/live-manager.ts | 50 | ||||
-rw-r--r-- | server/lib/redis.ts | 10 | ||||
-rw-r--r-- | server/tests/api/live/live.ts | 77 |
7 files changed, 190 insertions, 24 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index ff29e584b..3f96f142c 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts | |||
@@ -66,6 +66,7 @@ import { liveRouter } from './live' | |||
66 | import { ownershipVideoRouter } from './ownership' | 66 | import { ownershipVideoRouter } from './ownership' |
67 | import { rateVideoRouter } from './rate' | 67 | import { rateVideoRouter } from './rate' |
68 | import { watchingRouter } from './watching' | 68 | import { watchingRouter } from './watching' |
69 | import { LiveManager } from '@server/lib/live-manager' | ||
69 | 70 | ||
70 | const auditLogger = auditLoggerFactory('videos') | 71 | const auditLogger = auditLoggerFactory('videos') |
71 | const videosRouter = express.Router() | 72 | const videosRouter = express.Router() |
@@ -416,26 +417,46 @@ async function getVideo (req: express.Request, res: express.Response) { | |||
416 | } | 417 | } |
417 | 418 | ||
418 | async function viewVideo (req: express.Request, res: express.Response) { | 419 | async function viewVideo (req: express.Request, res: express.Response) { |
419 | const videoInstance = res.locals.onlyImmutableVideo | 420 | const immutableVideoAttrs = res.locals.onlyImmutableVideo |
420 | 421 | ||
421 | const ip = req.ip | 422 | const ip = req.ip |
422 | const exists = await Redis.Instance.doesVideoIPViewExist(ip, videoInstance.uuid) | 423 | const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) |
423 | if (exists) { | 424 | if (exists) { |
424 | logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid) | 425 | logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid) |
425 | return res.status(204).end() | 426 | return res.sendStatus(204) |
426 | } | 427 | } |
427 | 428 | ||
428 | await Promise.all([ | 429 | const video = await VideoModel.load(immutableVideoAttrs.id) |
429 | Redis.Instance.addVideoView(videoInstance.id), | ||
430 | Redis.Instance.setIPVideoView(ip, videoInstance.uuid) | ||
431 | ]) | ||
432 | 430 | ||
433 | const serverActor = await getServerActor() | 431 | const promises: Promise<any>[] = [ |
434 | await sendView(serverActor, videoInstance, undefined) | 432 | Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive) |
433 | ] | ||
435 | 434 | ||
436 | Hooks.runAction('action:api.video.viewed', { video: videoInstance, ip }) | 435 | let federateView = true |
437 | 436 | ||
438 | return res.status(204).end() | 437 | // Increment our live manager |
438 | if (video.isLive && video.isOwned()) { | ||
439 | LiveManager.Instance.addViewTo(video.id) | ||
440 | |||
441 | // Views of our local live will be sent by our live manager | ||
442 | federateView = false | ||
443 | } | ||
444 | |||
445 | // Increment our video views cache counter | ||
446 | if (!video.isLive) { | ||
447 | promises.push(Redis.Instance.addVideoView(video.id)) | ||
448 | } | ||
449 | |||
450 | if (federateView) { | ||
451 | const serverActor = await getServerActor() | ||
452 | promises.push(sendView(serverActor, video, undefined)) | ||
453 | } | ||
454 | |||
455 | await Promise.all(promises) | ||
456 | |||
457 | Hooks.runAction('action:api.video.viewed', { video, ip }) | ||
458 | |||
459 | return res.sendStatus(204) | ||
439 | } | 460 | } |
440 | 461 | ||
441 | async function getVideoDescription (req: express.Request, res: express.Response) { | 462 | async function getVideoDescription (req: express.Request, res: express.Response) { |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index e712f02a8..a93fe3c51 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -316,7 +316,11 @@ const CONSTRAINTS_FIELDS = { | |||
316 | } | 316 | } |
317 | } | 317 | } |
318 | 318 | ||
319 | let VIDEO_VIEW_LIFETIME = 60000 * 60 // 1 hour | 319 | let VIEW_LIFETIME = { |
320 | VIDEO: 60000 * 60, // 1 hour | ||
321 | LIVE: 60000 * 5 // 5 minutes | ||
322 | } | ||
323 | |||
320 | let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour | 324 | let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour |
321 | 325 | ||
322 | const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = { | 326 | const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = { |
@@ -726,7 +730,8 @@ if (isTestInstance() === true) { | |||
726 | 730 | ||
727 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 | 731 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 |
728 | 732 | ||
729 | VIDEO_VIEW_LIFETIME = 1000 // 1 second | 733 | VIEW_LIFETIME.VIDEO = 1000 // 1 second |
734 | VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second | ||
730 | CONTACT_FORM_LIFETIME = 1000 // 1 second | 735 | CONTACT_FORM_LIFETIME = 1000 // 1 second |
731 | 736 | ||
732 | JOB_ATTEMPTS['email'] = 1 | 737 | JOB_ATTEMPTS['email'] = 1 |
@@ -838,7 +843,7 @@ export { | |||
838 | JOB_COMPLETED_LIFETIME, | 843 | JOB_COMPLETED_LIFETIME, |
839 | HTTP_SIGNATURE, | 844 | HTTP_SIGNATURE, |
840 | VIDEO_IMPORT_STATES, | 845 | VIDEO_IMPORT_STATES, |
841 | VIDEO_VIEW_LIFETIME, | 846 | VIEW_LIFETIME, |
842 | CONTACT_FORM_LIFETIME, | 847 | CONTACT_FORM_LIFETIME, |
843 | VIDEO_PLAYLIST_PRIVACIES, | 848 | VIDEO_PLAYLIST_PRIVACIES, |
844 | PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME, | 849 | PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME, |
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index cc26180af..efceb21a2 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts | |||
@@ -4,6 +4,7 @@ import { Redis } from '../../redis' | |||
4 | import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' | 4 | import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' |
5 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' | 5 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' |
6 | import { MActorSignature } from '../../../types/models' | 6 | import { MActorSignature } from '../../../types/models' |
7 | import { LiveManager } from '@server/lib/live-manager' | ||
7 | 8 | ||
8 | async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { | 9 | async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { |
9 | const { activity, byActor } = options | 10 | const { activity, byActor } = options |
@@ -19,19 +20,27 @@ export { | |||
19 | // --------------------------------------------------------------------------- | 20 | // --------------------------------------------------------------------------- |
20 | 21 | ||
21 | async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { | 22 | async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { |
22 | const videoObject = activity.type === 'View' ? activity.object : (activity.object as ViewObject).object | 23 | const videoObject = activity.type === 'View' |
24 | ? activity.object | ||
25 | : (activity.object as ViewObject).object | ||
23 | 26 | ||
24 | const options = { | 27 | const options = { |
25 | videoObject, | 28 | videoObject, |
26 | fetchType: 'only-immutable-attributes' as 'only-immutable-attributes', | 29 | fetchType: 'only-video' as 'only-video', |
27 | allowRefresh: false as false | 30 | allowRefresh: false as false |
28 | } | 31 | } |
29 | const { video } = await getOrCreateVideoAndAccountAndChannel(options) | 32 | const { video } = await getOrCreateVideoAndAccountAndChannel(options) |
30 | 33 | ||
31 | await Redis.Instance.addVideoView(video.id) | ||
32 | |||
33 | if (video.isOwned()) { | 34 | if (video.isOwned()) { |
34 | // Don't resend the activity to the sender | 35 | // Our live manager will increment the counter and send the view to followers |
36 | if (video.isLive) { | ||
37 | LiveManager.Instance.addViewTo(video.id) | ||
38 | return | ||
39 | } | ||
40 | |||
41 | await Redis.Instance.addVideoView(video.id) | ||
42 | |||
43 | // Forward the view but don't resend the activity to the sender | ||
35 | const exceptions = [ byActor ] | 44 | const exceptions = [ byActor ] |
36 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) | 45 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
37 | } | 46 | } |
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 55c7a4ccb..599aabf80 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -83,6 +83,8 @@ async function saveLive (video: MVideo, live: MVideoLive) { | |||
83 | await live.destroy() | 83 | await live.destroy() |
84 | 84 | ||
85 | video.isLive = false | 85 | video.isLive = false |
86 | // Reinit views | ||
87 | video.views = 0 | ||
86 | video.state = VideoState.TO_TRANSCODE | 88 | video.state = VideoState.TO_TRANSCODE |
87 | video.duration = duration | 89 | video.duration = duration |
88 | 90 | ||
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index ef9377e43..4a1081a4f 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts | |||
@@ -13,7 +13,7 @@ import { | |||
13 | } from '@server/helpers/ffmpeg-utils' | 13 | } from '@server/helpers/ffmpeg-utils' |
14 | import { logger } from '@server/helpers/logger' | 14 | import { logger } from '@server/helpers/logger' |
15 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 15 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
16 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' | 16 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' |
17 | import { UserModel } from '@server/models/account/user' | 17 | import { UserModel } from '@server/models/account/user' |
18 | import { VideoModel } from '@server/models/video/video' | 18 | import { VideoModel } from '@server/models/video/video' |
19 | import { VideoFileModel } from '@server/models/video/video-file' | 19 | import { VideoFileModel } from '@server/models/video/video-file' |
@@ -61,6 +61,8 @@ class LiveManager { | |||
61 | 61 | ||
62 | private readonly transSessions = new Map<string, FfmpegCommand>() | 62 | private readonly transSessions = new Map<string, FfmpegCommand>() |
63 | private readonly videoSessions = new Map<number, string>() | 63 | private readonly videoSessions = new Map<number, string>() |
64 | // Values are Date().getTime() | ||
65 | private readonly watchersPerVideo = new Map<number, number[]>() | ||
64 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() | 66 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() |
65 | private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>() | 67 | private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>() |
66 | 68 | ||
@@ -115,6 +117,8 @@ class LiveManager { | |||
115 | this.stop() | 117 | this.stop() |
116 | } | 118 | } |
117 | }) | 119 | }) |
120 | |||
121 | setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) | ||
118 | } | 122 | } |
119 | 123 | ||
120 | run () { | 124 | run () { |
@@ -131,6 +135,10 @@ class LiveManager { | |||
131 | this.rtmpServer = undefined | 135 | this.rtmpServer = undefined |
132 | } | 136 | } |
133 | 137 | ||
138 | isRunning () { | ||
139 | return !!this.rtmpServer | ||
140 | } | ||
141 | |||
134 | getSegmentsSha256 (videoUUID: string) { | 142 | getSegmentsSha256 (videoUUID: string) { |
135 | return this.segmentsSha256.get(videoUUID) | 143 | return this.segmentsSha256.get(videoUUID) |
136 | } | 144 | } |
@@ -150,6 +158,19 @@ class LiveManager { | |||
150 | return currentLives.reduce((sum, obj) => sum + obj.size, 0) | 158 | return currentLives.reduce((sum, obj) => sum + obj.size, 0) |
151 | } | 159 | } |
152 | 160 | ||
161 | addViewTo (videoId: number) { | ||
162 | if (this.videoSessions.has(videoId) === false) return | ||
163 | |||
164 | let watchers = this.watchersPerVideo.get(videoId) | ||
165 | |||
166 | if (!watchers) { | ||
167 | watchers = [] | ||
168 | this.watchersPerVideo.set(videoId, watchers) | ||
169 | } | ||
170 | |||
171 | watchers.push(new Date().getTime()) | ||
172 | } | ||
173 | |||
153 | private getContext () { | 174 | private getContext () { |
154 | return context | 175 | return context |
155 | } | 176 | } |
@@ -331,6 +352,7 @@ class LiveManager { | |||
331 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) | 352 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) |
332 | 353 | ||
333 | this.transSessions.delete(sessionId) | 354 | this.transSessions.delete(sessionId) |
355 | this.watchersPerVideo.delete(videoLive.videoId) | ||
334 | 356 | ||
335 | Promise.all([ tsWatcher.close(), masterWatcher.close() ]) | 357 | Promise.all([ tsWatcher.close(), masterWatcher.close() ]) |
336 | .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) | 358 | .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) |
@@ -426,6 +448,32 @@ class LiveManager { | |||
426 | return this.isAbleToUploadVideoWithCache(user.id) | 448 | return this.isAbleToUploadVideoWithCache(user.id) |
427 | } | 449 | } |
428 | 450 | ||
451 | private async updateLiveViews () { | ||
452 | if (!this.isRunning()) return | ||
453 | |||
454 | logger.info('Updating live video views.') | ||
455 | |||
456 | for (const videoId of this.watchersPerVideo.keys()) { | ||
457 | const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE | ||
458 | |||
459 | const watchers = this.watchersPerVideo.get(videoId) | ||
460 | |||
461 | const numWatchers = watchers.length | ||
462 | |||
463 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
464 | video.views = numWatchers | ||
465 | await video.save() | ||
466 | |||
467 | await federateVideoIfNeeded(video, false) | ||
468 | |||
469 | // Only keep not expired watchers | ||
470 | const newWatchers = watchers.filter(w => w > notBefore) | ||
471 | this.watchersPerVideo.set(videoId, newWatchers) | ||
472 | |||
473 | logger.debug('New live video views for %s is %d.', video.url, numWatchers) | ||
474 | } | ||
475 | } | ||
476 | |||
429 | static get Instance () { | 477 | static get Instance () { |
430 | return this.instance || (this.instance = new this()) | 478 | return this.instance || (this.instance = new this()) |
431 | } | 479 | } |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index a075eee2d..4325598b2 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -7,7 +7,7 @@ import { | |||
7 | USER_EMAIL_VERIFY_LIFETIME, | 7 | USER_EMAIL_VERIFY_LIFETIME, |
8 | USER_PASSWORD_RESET_LIFETIME, | 8 | USER_PASSWORD_RESET_LIFETIME, |
9 | USER_PASSWORD_CREATE_LIFETIME, | 9 | USER_PASSWORD_CREATE_LIFETIME, |
10 | VIDEO_VIEW_LIFETIME, | 10 | VIEW_LIFETIME, |
11 | WEBSERVER, | 11 | WEBSERVER, |
12 | TRACKER_RATE_LIMITS | 12 | TRACKER_RATE_LIMITS |
13 | } from '../initializers/constants' | 13 | } from '../initializers/constants' |
@@ -118,8 +118,12 @@ class Redis { | |||
118 | 118 | ||
119 | /* ************ Views per IP ************ */ | 119 | /* ************ Views per IP ************ */ |
120 | 120 | ||
121 | setIPVideoView (ip: string, videoUUID: string) { | 121 | setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { |
122 | return this.setValue(this.generateViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) | 122 | const lifetime = isLive |
123 | ? VIEW_LIFETIME.LIVE | ||
124 | : VIEW_LIFETIME.VIDEO | ||
125 | |||
126 | return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) | ||
123 | } | 127 | } |
124 | 128 | ||
125 | async doesVideoIPViewExist (ip: string, videoUUID: string) { | 129 | async doesVideoIPViewExist (ip: string, videoUUID: string) { |
diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index b41b5fc2e..2198114b4 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts | |||
@@ -28,9 +28,12 @@ import { | |||
28 | testImage, | 28 | testImage, |
29 | updateCustomSubConfig, | 29 | updateCustomSubConfig, |
30 | updateLive, | 30 | updateLive, |
31 | viewVideo, | ||
32 | wait, | ||
31 | waitJobs, | 33 | waitJobs, |
32 | waitUntilLiveStarts | 34 | waitUntilLiveStarts |
33 | } from '../../../../shared/extra-utils' | 35 | } from '../../../../shared/extra-utils' |
36 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
34 | 37 | ||
35 | const expect = chai.expect | 38 | const expect = chai.expect |
36 | 39 | ||
@@ -419,6 +422,80 @@ describe('Test live', function () { | |||
419 | }) | 422 | }) |
420 | }) | 423 | }) |
421 | 424 | ||
425 | describe('Live views', function () { | ||
426 | let liveVideoId: string | ||
427 | let command: FfmpegCommand | ||
428 | |||
429 | async function countViews (expected: number) { | ||
430 | for (const server of servers) { | ||
431 | const res = await getVideo(server.url, liveVideoId) | ||
432 | const video: VideoDetails = res.body | ||
433 | |||
434 | expect(video.views).to.equal(expected) | ||
435 | } | ||
436 | } | ||
437 | |||
438 | before(async function () { | ||
439 | this.timeout(30000) | ||
440 | |||
441 | const liveAttributes = { | ||
442 | name: 'live video', | ||
443 | channelId: servers[0].videoChannel.id, | ||
444 | privacy: VideoPrivacy.PUBLIC | ||
445 | } | ||
446 | |||
447 | const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) | ||
448 | liveVideoId = res.body.video.uuid | ||
449 | |||
450 | command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) | ||
451 | await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId) | ||
452 | await waitJobs(servers) | ||
453 | }) | ||
454 | |||
455 | it('Should display no views for a live', async function () { | ||
456 | await countViews(0) | ||
457 | }) | ||
458 | |||
459 | it('Should view a live twice and display 1 view', async function () { | ||
460 | this.timeout(30000) | ||
461 | |||
462 | await viewVideo(servers[0].url, liveVideoId) | ||
463 | await viewVideo(servers[0].url, liveVideoId) | ||
464 | |||
465 | await wait(5000) | ||
466 | |||
467 | await waitJobs(servers) | ||
468 | |||
469 | await countViews(1) | ||
470 | }) | ||
471 | |||
472 | it('Should wait 5 seconds and display 0 views', async function () { | ||
473 | this.timeout(30000) | ||
474 | |||
475 | await wait(5000) | ||
476 | await waitJobs(servers) | ||
477 | |||
478 | await countViews(0) | ||
479 | }) | ||
480 | |||
481 | it('Should view a live on a remote and on local and display 2 views', async function () { | ||
482 | this.timeout(30000) | ||
483 | |||
484 | await viewVideo(servers[0].url, liveVideoId) | ||
485 | await viewVideo(servers[1].url, liveVideoId) | ||
486 | await viewVideo(servers[1].url, liveVideoId) | ||
487 | |||
488 | await wait(5000) | ||
489 | await waitJobs(servers) | ||
490 | |||
491 | await countViews(2) | ||
492 | }) | ||
493 | |||
494 | after(async function () { | ||
495 | await stopFfmpeg(command) | ||
496 | }) | ||
497 | }) | ||
498 | |||
422 | describe('Live socket messages', function () { | 499 | describe('Live socket messages', function () { |
423 | 500 | ||
424 | async function createLiveWrapper () { | 501 | async function createLiveWrapper () { |