From 51353d9a035fb6b81f903a8b5f391292841649fd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 9 Nov 2021 10:11:20 +0100 Subject: Refactor video views Introduce viewers attribute for live videos Count views for live videos Reduce delay to see the viewer update for lives Add ability to configure video views buffer interval and view ip expiration --- server/lib/activitypub/process/process-view.ts | 32 +++-- server/lib/activitypub/send/send-view.ts | 4 +- server/lib/activitypub/videos/updater.ts | 1 - server/lib/job-queue/handlers/video-views-stats.ts | 57 +++++++++ server/lib/job-queue/handlers/video-views.ts | 67 ----------- server/lib/job-queue/job-queue.ts | 12 +- server/lib/live/live-manager.ts | 49 +------- server/lib/peertube-socket.ts | 8 +- server/lib/redis.ts | 112 +++++++++++++----- .../lib/schedulers/video-views-buffer-scheduler.ts | 52 +++++++++ server/lib/video-views.ts | 130 +++++++++++++++++++++ 11 files changed, 346 insertions(+), 178 deletions(-) create mode 100644 server/lib/job-queue/handlers/video-views-stats.ts delete mode 100644 server/lib/job-queue/handlers/video-views.ts create mode 100644 server/lib/schedulers/video-views-buffer-scheduler.ts create mode 100644 server/lib/video-views.ts (limited to 'server/lib') diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index 5593ee257..720385f9b 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts @@ -1,13 +1,13 @@ -import { getOrCreateAPVideo } from '../videos' -import { forwardVideoRelatedActivity } from '../send/utils' -import { Redis } from '../../redis' -import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' +import { VideoViews } from '@server/lib/video-views' +import { ActivityView } from '../../../../shared/models/activitypub' import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { MActorSignature } from '../../../types/models' -import { LiveManager } from '@server/lib/live/live-manager' +import { forwardVideoRelatedActivity } from '../send/utils' +import { getOrCreateAPVideo } from '../videos' -async function processViewActivity (options: APProcessorOptions) { +async function processViewActivity (options: APProcessorOptions) { const { activity, byActor } = options + return processCreateView(activity, byActor) } @@ -19,10 +19,8 @@ export { // --------------------------------------------------------------------------- -async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { - const videoObject = activity.type === 'View' - ? activity.object - : (activity.object as ViewObject).object +async function processCreateView (activity: ActivityView, byActor: MActorSignature) { + const videoObject = activity.object const { video } = await getOrCreateAPVideo({ videoObject, @@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct allowRefresh: false }) - if (!video.isLive) { - await Redis.Instance.addVideoView(video.id) - } + const viewerExpires = activity.expires + ? new Date(activity.expires) + : undefined - if (video.isOwned()) { - // Our live manager will increment the counter and send the view to followers - if (video.isLive) { - LiveManager.Instance.addViewTo(video.id) - return - } + await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) + if (video.isOwned()) { // Forward the view but don't resend the activity to the sender const exceptions = [ byActor ] await forwardVideoRelatedActivity(activity, undefined, exceptions, video) diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 153e94295..b12583e26 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts @@ -1,4 +1,5 @@ import { Transaction } from 'sequelize' +import { VideoViews } from '@server/lib/video-views' import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' import { logger } from '../../../helpers/logger' @@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU id: url, type: 'View' as 'View', actor: byActor.url, - object: video.url + object: video.url, + expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() }, audience ) diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts index 157569414..f786bb196 100644 --- a/server/lib/activitypub/videos/updater.ts +++ b/server/lib/activitypub/videos/updater.ts @@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder { if (videoUpdated.isLive) { PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) - PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated) } logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) diff --git a/server/lib/job-queue/handlers/video-views-stats.ts b/server/lib/job-queue/handlers/video-views-stats.ts new file mode 100644 index 000000000..caf5f6962 --- /dev/null +++ b/server/lib/job-queue/handlers/video-views-stats.ts @@ -0,0 +1,57 @@ +import { isTestInstance } from '../../../helpers/core-utils' +import { logger } from '../../../helpers/logger' +import { VideoModel } from '../../../models/video/video' +import { VideoViewModel } from '../../../models/video/video-view' +import { Redis } from '../../redis' + +async function processVideosViewsStats () { + const lastHour = new Date() + + // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour + if (!isTestInstance()) lastHour.setHours(lastHour.getHours() - 1) + + const hour = lastHour.getHours() + const startDate = lastHour.setMinutes(0, 0, 0) + const endDate = lastHour.setMinutes(59, 59, 999) + + const videoIds = await Redis.Instance.listVideosViewedForStats(hour) + if (videoIds.length === 0) return + + logger.info('Processing videos views stats in job for hour %d.', hour) + + for (const videoId of videoIds) { + try { + const views = await Redis.Instance.getVideoViewsStats(videoId, hour) + await Redis.Instance.deleteVideoViewsStats(videoId, hour) + + if (views) { + logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) + + try { + const video = await VideoModel.load(videoId) + if (!video) { + logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) + continue + } + + await VideoViewModel.create({ + startDate: new Date(startDate), + endDate: new Date(endDate), + views, + videoId + }) + } catch (err) { + logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) + } + } + } catch (err) { + logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) + } + } +} + +// --------------------------------------------------------------------------- + +export { + processVideosViewsStats +} diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts deleted file mode 100644 index 86d0a271f..000000000 --- a/server/lib/job-queue/handlers/video-views.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { Redis } from '../../redis' -import { logger } from '../../../helpers/logger' -import { VideoModel } from '../../../models/video/video' -import { VideoViewModel } from '../../../models/video/video-view' -import { isTestInstance } from '../../../helpers/core-utils' -import { federateVideoIfNeeded } from '../../activitypub/videos' - -async function processVideosViews () { - const lastHour = new Date() - - // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour - if (!isTestInstance()) lastHour.setHours(lastHour.getHours() - 1) - - const hour = lastHour.getHours() - const startDate = lastHour.setMinutes(0, 0, 0) - const endDate = lastHour.setMinutes(59, 59, 999) - - const videoIds = await Redis.Instance.getVideosIdViewed(hour) - if (videoIds.length === 0) return - - logger.info('Processing videos views in job for hour %d.', hour) - - for (const videoId of videoIds) { - try { - const views = await Redis.Instance.getVideoViews(videoId, hour) - await Redis.Instance.deleteVideoViews(videoId, hour) - - if (views) { - logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) - - try { - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - if (!video) { - logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) - continue - } - - await VideoViewModel.create({ - startDate: new Date(startDate), - endDate: new Date(endDate), - views, - videoId - }) - - if (video.isOwned()) { - // If this is a remote video, the origin instance will send us an update - await VideoModel.incrementViews(videoId, views) - - // Send video update - video.views += views - await federateVideoIfNeeded(video, false) - } - } catch (err) { - logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) - } - } - } catch (err) { - logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) - } - } -} - -// --------------------------------------------------------------------------- - -export { - processVideosViews -} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0eab720d9..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoTranscoding } from './handlers/video-transcoding' -import { processVideosViews } from './handlers/video-views' +import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -49,7 +49,7 @@ type CreateJobArgument = { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | - { type: 'videos-views', payload: {} } | + { type: 'videos-views-stats', payload: {} } | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'actor-keys', payload: ActorKeysPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } | @@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-transcoding': processVideoTranscoding, 'email': processEmail, 'video-import': processVideoImport, - 'videos-views': processVideosViews, + 'videos-views-stats': processVideosViewsStats, 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, 'actor-keys': processActorKeys, @@ -89,7 +89,7 @@ const jobTypes: JobType[] = [ 'video-transcoding', 'video-file-import', 'video-import', - 'videos-views', + 'videos-views-stats', 'activitypub-refresher', 'video-redundancy', 'actor-keys', @@ -247,8 +247,8 @@ class JobQueue { } private addRepeatableJobs () { - this.queues['videos-views'].add({}, { - repeat: REPEAT_JOBS['videos-views'] + this.queues['videos-views-stats'].add({}, { + repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1b7b9dd4d..2562edb75 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -2,7 +2,6 @@ import { readFile } from 'fs-extra' import { createServer, Server } from 'net' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' -import { isTestInstance } from '@server/helpers/core-utils' import { computeResolutionsToTranscode, ffprobePromise, @@ -12,7 +11,7 @@ import { } from '@server/helpers/ffprobe-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' +import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' @@ -53,8 +52,6 @@ class LiveManager { private readonly muxingSessions = new Map() private readonly videoSessions = new Map() - // Values are Date().getTime() - private readonly watchersPerVideo = new Map() private rtmpServer: Server private rtmpsServer: ServerTLS @@ -99,8 +96,6 @@ class LiveManager { // Cleanup broken lives, that were terminated by a server restart for example this.handleBrokenLives() .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) - - setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } async run () { @@ -184,19 +179,6 @@ class LiveManager { this.abortSession(sessionId) } - addViewTo (videoId: number) { - if (this.videoSessions.has(videoId) === false) return - - let watchers = this.watchersPerVideo.get(videoId) - - if (!watchers) { - watchers = [] - this.watchersPerVideo.set(videoId, watchers) - } - - watchers.push(new Date().getTime()) - } - private getContext () { return context } @@ -377,7 +359,6 @@ class LiveManager { } private onMuxingFFmpegEnd (videoId: number) { - this.watchersPerVideo.delete(videoId) this.videoSessions.delete(videoId) } @@ -411,34 +392,6 @@ class LiveManager { } } - private async updateLiveViews () { - if (!this.isRunning()) return - - if (!isTestInstance()) logger.info('Updating live video views.', lTags()) - - for (const videoId of this.watchersPerVideo.keys()) { - const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE - - const watchers = this.watchersPerVideo.get(videoId) - - const numWatchers = watchers.length - - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - video.views = numWatchers - await video.save() - - await federateVideoIfNeeded(video, false) - - PeerTubeSocket.Instance.sendVideoViewsUpdate(video) - - // Only keep not expired watchers - const newWatchers = watchers.filter(w => w > notBefore) - this.watchersPerVideo.set(videoId, newWatchers) - - logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) - } - } - private async handleBrokenLives () { const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 901435dea..0398ca61d 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -1,7 +1,7 @@ import { Server as HTTPServer } from 'http' import { Namespace, Server as SocketServer, Socket } from 'socket.io' import { isIdValid } from '@server/helpers/custom-validators/misc' -import { MVideo } from '@server/types/models' +import { MVideo, MVideoImmutable } from '@server/types/models' import { UserNotificationModelForApi } from '@server/types/models/user' import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { logger } from '../helpers/logger' @@ -78,11 +78,11 @@ class PeerTubeSocket { .emit(type, data) } - sendVideoViewsUpdate (video: MVideo) { - const data: LiveVideoEventPayload = { views: video.views } + sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { + const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } const type: LiveVideoEventType = 'views-change' - logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) + logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) this.liveVideosNamespace .in(video.id) diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 46617b07e..76b7868e8 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -13,6 +13,7 @@ import { RESUMABLE_UPLOAD_SESSION_LIFETIME } from '../initializers/constants' import { CONFIG } from '../initializers/config' +import { exists } from '@server/helpers/custom-validators/misc' type CachedRoute = { body: string @@ -119,16 +120,20 @@ class Redis { /* ************ Views per IP ************ */ - setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { - const lifetime = isLive - ? VIEW_LIFETIME.LIVE - : VIEW_LIFETIME.VIDEO + setIPVideoView (ip: string, videoUUID: string) { + return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW) + } - return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) + setIPVideoViewer (ip: string, videoUUID: string) { + return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER) } async doesVideoIPViewExist (ip: string, videoUUID: string) { - return this.exists(this.generateViewKey(ip, videoUUID)) + return this.exists(this.generateIPViewKey(ip, videoUUID)) + } + + async doesVideoIPViewerExist (ip: string, videoUUID: string) { + return this.exists(this.generateIPViewerKey(ip, videoUUID)) } /* ************ Tracker IP block ************ */ @@ -160,46 +165,85 @@ class Redis { return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) } - /* ************ Video views ************ */ + /* ************ Video views stats ************ */ - addVideoView (videoId: number) { - const keyIncr = this.generateVideoViewKey(videoId) - const keySet = this.generateVideosViewKey() + addVideoViewStats (videoId: number) { + const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId }) return Promise.all([ - this.addToSet(keySet, videoId.toString()), - this.increment(keyIncr) + this.addToSet(setKey, videoId.toString()), + this.increment(videoKey) ]) } - async getVideoViews (videoId: number, hour: number) { - const key = this.generateVideoViewKey(videoId, hour) + async getVideoViewsStats (videoId: number, hour: number) { + const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) - const valueString = await this.getValue(key) + const valueString = await this.getValue(videoKey) const valueInt = parseInt(valueString, 10) if (isNaN(valueInt)) { - logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) + logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) return undefined } return valueInt } - async getVideosIdViewed (hour: number) { - const key = this.generateVideosViewKey(hour) + async listVideosViewedForStats (hour: number) { + const { setKey } = this.generateVideoViewStatsKeys({ hour }) - const stringIds = await this.getSet(key) + const stringIds = await this.getSet(setKey) return stringIds.map(s => parseInt(s, 10)) } - deleteVideoViews (videoId: number, hour: number) { - const keySet = this.generateVideosViewKey(hour) - const keyIncr = this.generateVideoViewKey(videoId, hour) + deleteVideoViewsStats (videoId: number, hour: number) { + const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) + + return Promise.all([ + this.deleteFromSet(setKey, videoId.toString()), + this.deleteKey(videoKey) + ]) + } + + /* ************ Local video views buffer ************ */ + + addLocalVideoView (videoId: number) { + const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId) return Promise.all([ - this.deleteFromSet(keySet, videoId.toString()), - this.deleteKey(keyIncr) + this.addToSet(setKey, videoId.toString()), + this.increment(videoKey) + ]) + } + + async getLocalVideoViews (videoId: number) { + const { videoKey } = this.generateLocalVideoViewsKeys(videoId) + + const valueString = await this.getValue(videoKey) + const valueInt = parseInt(valueString, 10) + + if (isNaN(valueInt)) { + logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString) + return undefined + } + + return valueInt + } + + async listLocalVideosViewed () { + const { setKey } = this.generateLocalVideoViewsKeys() + + const stringIds = await this.getSet(setKey) + return stringIds.map(s => parseInt(s, 10)) + } + + deleteLocalVideoViews (videoId: number) { + const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId) + + return Promise.all([ + this.deleteFromSet(setKey, videoId.toString()), + this.deleteKey(videoKey) ]) } @@ -233,16 +277,16 @@ class Redis { return req.method + '-' + req.originalUrl } - private generateVideosViewKey (hour?: number) { - if (!hour) hour = new Date().getHours() - - return `videos-view-h${hour}` + private generateLocalVideoViewsKeys (videoId?: Number) { + return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } } - private generateVideoViewKey (videoId: number, hour?: number) { - if (hour === undefined || hour === null) hour = new Date().getHours() + private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { + const hour = exists(options.hour) + ? options.hour + : new Date().getHours() - return `video-view-${videoId}-h${hour}` + return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` } } private generateResetPasswordKey (userId: number) { @@ -253,10 +297,14 @@ class Redis { return 'verify-email-' + userId } - private generateViewKey (ip: string, videoUUID: string) { + private generateIPViewKey (ip: string, videoUUID: string) { return `views-${videoUUID}-${ip}` } + private generateIPViewerKey (ip: string, videoUUID: string) { + return `viewer-${videoUUID}-${ip}` + } + private generateTrackerBlockIPKey (ip: string) { return `tracker-block-ip-${ip}` } diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts new file mode 100644 index 000000000..c0e72c461 --- /dev/null +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts @@ -0,0 +1,52 @@ +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { VideoModel } from '@server/models/video/video' +import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { federateVideoIfNeeded } from '../activitypub/videos' +import { Redis } from '../redis' +import { AbstractScheduler } from './abstract-scheduler' + +const lTags = loggerTagsFactory('views') + +export class VideoViewsBufferScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE + + private constructor () { + super() + } + + protected async internalExecute () { + const videoIds = await Redis.Instance.listLocalVideosViewed() + if (videoIds.length === 0) return + + logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) + + for (const videoId of videoIds) { + try { + const views = await Redis.Instance.getLocalVideoViews(videoId) + await Redis.Instance.deleteLocalVideoViews(videoId) + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + if (!video) { + logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) + continue + } + + // If this is a remote video, the origin instance will send us an update + await VideoModel.incrementViews(videoId, views) + + // Send video update + video.views += views + await federateVideoIfNeeded(video, false) + } catch (err) { + logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) + } + } + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts new file mode 100644 index 000000000..220b509c2 --- /dev/null +++ b/server/lib/video-views.ts @@ -0,0 +1,130 @@ +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { VIEW_LIFETIME } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' +import { MVideo } from '@server/types/models' +import { PeerTubeSocket } from './peertube-socket' +import { Redis } from './redis' + +const lTags = loggerTagsFactory('views') + +export class VideoViews { + + // Values are Date().getTime() + private readonly viewersPerVideo = new Map() + + private static instance: VideoViews + + private constructor () { + } + + init () { + setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) + } + + async processView (options: { + video: MVideo + ip: string | null + viewerExpires?: Date + }) { + const { video, ip, viewerExpires } = options + + logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) + + let success = await this.addView(video, ip) + + if (video.isLive) { + const successViewer = await this.addViewer(video, ip, viewerExpires) + success ||= successViewer + } + + return success + } + + getViewers (video: MVideo) { + const viewers = this.viewersPerVideo.get(video.id) + if (!viewers) return 0 + + return viewers.length + } + + buildViewerExpireTime () { + return new Date().getTime() + VIEW_LIFETIME.VIEWER + } + + private async addView (video: MVideo, ip: string | null) { + const promises: Promise[] = [] + + if (ip !== null) { + const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) + if (viewExists) return false + + promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) + } + + if (video.isOwned()) { + promises.push(Redis.Instance.addLocalVideoView(video.id)) + } + + promises.push(Redis.Instance.addVideoViewStats(video.id)) + + await Promise.all(promises) + + return true + } + + private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { + if (ip !== null) { + const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) + if (viewExists) return false + + await Redis.Instance.setIPVideoViewer(ip, video.uuid) + } + + let watchers = this.viewersPerVideo.get(video.id) + + if (!watchers) { + watchers = [] + this.viewersPerVideo.set(video.id, watchers) + } + + const expiration = viewerExpires + ? viewerExpires.getTime() + : this.buildViewerExpireTime() + + watchers.push(expiration) + await this.notifyClients(video.id, watchers.length) + + return true + } + + private async cleanViewers () { + logger.info('Cleaning video viewers.', lTags()) + + for (const videoId of this.viewersPerVideo.keys()) { + const notBefore = new Date().getTime() + + const viewers = this.viewersPerVideo.get(videoId) + + // Only keep not expired viewers + const newViewers = viewers.filter(w => w > notBefore) + + if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) + else this.viewersPerVideo.set(videoId, newViewers) + + await this.notifyClients(videoId, newViewers.length) + } + } + + private async notifyClients (videoId: string | number, viewersLength: number) { + const video = await VideoModel.loadImmutableAttributes(videoId) + if (!video) return + + PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) + + logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} -- cgit v1.2.3