From e4bf78561763cd84d22ebceb6f371cccf9a356d8 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 6 Nov 2020 16:42:23 +0100 Subject: Handle views for live videos --- server/lib/activitypub/process/process-view.ts | 19 +++++--- server/lib/job-queue/handlers/video-live-ending.ts | 2 + server/lib/live-manager.ts | 50 +++++++++++++++++++++- server/lib/redis.ts | 10 +++-- 4 files changed, 72 insertions(+), 9 deletions(-) (limited to 'server/lib') diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index cc26180af..efceb21a2 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts @@ -4,6 +4,7 @@ import { Redis } from '../../redis' import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { MActorSignature } from '../../../types/models' +import { LiveManager } from '@server/lib/live-manager' async function processViewActivity (options: APProcessorOptions) { const { activity, byActor } = options @@ -19,19 +20,27 @@ export { // --------------------------------------------------------------------------- async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { - const videoObject = activity.type === 'View' ? activity.object : (activity.object as ViewObject).object + const videoObject = activity.type === 'View' + ? activity.object + : (activity.object as ViewObject).object const options = { videoObject, - fetchType: 'only-immutable-attributes' as 'only-immutable-attributes', + fetchType: 'only-video' as 'only-video', allowRefresh: false as false } const { video } = await getOrCreateVideoAndAccountAndChannel(options) - await Redis.Instance.addVideoView(video.id) - if (video.isOwned()) { - // Don't resend the activity to the sender + // Our live manager will increment the counter and send the view to followers + if (video.isLive) { + LiveManager.Instance.addViewTo(video.id) + return + } + + await Redis.Instance.addVideoView(video.id) + + // Forward the view but don't resend the activity to the sender const exceptions = [ byActor ] await forwardVideoRelatedActivity(activity, undefined, exceptions, video) } diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 55c7a4ccb..599aabf80 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -83,6 +83,8 @@ async function saveLive (video: MVideo, live: MVideoLive) { await live.destroy() video.isLive = false + // Reinit views + video.views = 0 video.state = VideoState.TO_TRANSCODE video.duration = duration diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index ef9377e43..4a1081a4f 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -13,7 +13,7 @@ import { } from '@server/helpers/ffmpeg-utils' import { logger } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' +import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' import { UserModel } from '@server/models/account/user' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' @@ -61,6 +61,8 @@ class LiveManager { private readonly transSessions = new Map() private readonly videoSessions = new Map() + // Values are Date().getTime() + private readonly watchersPerVideo = new Map() private readonly segmentsSha256 = new Map>() private readonly livesPerUser = new Map() @@ -115,6 +117,8 @@ class LiveManager { this.stop() } }) + + setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } run () { @@ -131,6 +135,10 @@ class LiveManager { this.rtmpServer = undefined } + isRunning () { + return !!this.rtmpServer + } + getSegmentsSha256 (videoUUID: string) { return this.segmentsSha256.get(videoUUID) } @@ -150,6 +158,19 @@ class LiveManager { return currentLives.reduce((sum, obj) => sum + obj.size, 0) } + addViewTo (videoId: number) { + if (this.videoSessions.has(videoId) === false) return + + let watchers = this.watchersPerVideo.get(videoId) + + if (!watchers) { + watchers = [] + this.watchersPerVideo.set(videoId, watchers) + } + + watchers.push(new Date().getTime()) + } + private getContext () { return context } @@ -331,6 +352,7 @@ class LiveManager { logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) this.transSessions.delete(sessionId) + this.watchersPerVideo.delete(videoLive.videoId) Promise.all([ tsWatcher.close(), masterWatcher.close() ]) .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) @@ -426,6 +448,32 @@ class LiveManager { return this.isAbleToUploadVideoWithCache(user.id) } + private async updateLiveViews () { + if (!this.isRunning()) return + + logger.info('Updating live video views.') + + for (const videoId of this.watchersPerVideo.keys()) { + const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE + + const watchers = this.watchersPerVideo.get(videoId) + + const numWatchers = watchers.length + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + video.views = numWatchers + await video.save() + + await federateVideoIfNeeded(video, false) + + // Only keep not expired watchers + const newWatchers = watchers.filter(w => w > notBefore) + this.watchersPerVideo.set(videoId, newWatchers) + + logger.debug('New live video views for %s is %d.', video.url, numWatchers) + } + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/redis.ts b/server/lib/redis.ts index a075eee2d..4325598b2 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -7,7 +7,7 @@ import { USER_EMAIL_VERIFY_LIFETIME, USER_PASSWORD_RESET_LIFETIME, USER_PASSWORD_CREATE_LIFETIME, - VIDEO_VIEW_LIFETIME, + VIEW_LIFETIME, WEBSERVER, TRACKER_RATE_LIMITS } from '../initializers/constants' @@ -118,8 +118,12 @@ class Redis { /* ************ Views per IP ************ */ - setIPVideoView (ip: string, videoUUID: string) { - return this.setValue(this.generateViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) + setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { + const lifetime = isLive + ? VIEW_LIFETIME.LIVE + : VIEW_LIFETIME.VIDEO + + return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) } async doesVideoIPViewExist (ip: string, videoUUID: string) { -- cgit v1.2.3