From ac907dc7c158056e9b6a5cb58acd27df5c7c2670 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 6 Apr 2022 08:50:43 +0200 Subject: Improve viewer counter More precise, avoid weird decrease, reuse an id to federate viewers --- server/lib/views/shared/video-viewer-stats.ts | 185 ++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 server/lib/views/shared/video-viewer-stats.ts (limited to 'server/lib/views/shared/video-viewer-stats.ts') diff --git a/server/lib/views/shared/video-viewer-stats.ts b/server/lib/views/shared/video-viewer-stats.ts new file mode 100644 index 000000000..fd66fd5c7 --- /dev/null +++ b/server/lib/views/shared/video-viewer-stats.ts @@ -0,0 +1,185 @@ +import { Transaction } from 'sequelize/types' +import { isTestInstance } from '@server/helpers/core-utils' +import { GeoIP } from '@server/helpers/geo-ip' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { sendCreateWatchAction } from '@server/lib/activitypub/send' +import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' +import { Redis } from '@server/lib/redis' +import { VideoModel } from '@server/models/video/video' +import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' +import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' +import { MVideo } from '@server/types/models' +import { VideoViewEvent } from '@shared/models' + +const lTags = loggerTagsFactory('views') + +type LocalViewerStats = { + firstUpdated: number // Date.getTime() + lastUpdated: number // Date.getTime() + + watchSections: { + start: number + end: number + }[] + + watchTime: number + + country: string + + videoId: number +} + +export class VideoViewerStats { + private processingViewersStats = false + + constructor () { + setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) + } + + // --------------------------------------------------------------------------- + + async addLocalViewer (options: { + video: MVideo + currentTime: number + ip: string + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + + logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) + + return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) + } + + // --------------------------------------------------------------------------- + + async getWatchTime (videoId: number, ip: string) { + const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) + + return stats?.watchTime || 0 + } + + // --------------------------------------------------------------------------- + + private async updateLocalViewerStats (options: { + video: MVideo + ip: string + currentTime: number + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + const nowMs = new Date().getTime() + + let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) + + if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { + logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) + return + } + + if (!stats) { + const country = await GeoIP.Instance.safeCountryISOLookup(ip) + + stats = { + firstUpdated: nowMs, + lastUpdated: nowMs, + + watchSections: [], + + watchTime: 0, + + country, + videoId: video.id + } + } + + stats.lastUpdated = nowMs + + if (viewEvent === 'seek' || stats.watchSections.length === 0) { + stats.watchSections.push({ + start: currentTime, + end: currentTime + }) + } else { + const lastSection = stats.watchSections[stats.watchSections.length - 1] + lastSection.end = currentTime + } + + stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) + + logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) + + await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) + } + + async processViewerStats () { + if (this.processingViewersStats) return + this.processingViewersStats = true + + if (!isTestInstance()) logger.info('Processing viewer statistics.', lTags()) + + const now = new Date().getTime() + + try { + const allKeys = await Redis.Instance.listLocalVideoViewerKeys() + + for (const key of allKeys) { + const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) + + // Process expired stats + if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { + continue + } + + try { + await sequelizeTypescript.transaction(async t => { + const video = await VideoModel.load(stats.videoId, t) + + const statsModel = await this.saveViewerStats(video, stats, t) + + if (video.remote) { + await sendCreateWatchAction(statsModel, t) + } + }) + + await Redis.Instance.deleteLocalVideoViewersKeys(key) + } catch (err) { + logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) + } + } + } catch (err) { + logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) + } + + this.processingViewersStats = false + } + + private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { + const statsModel = new LocalVideoViewerModel({ + startDate: new Date(stats.firstUpdated), + endDate: new Date(stats.lastUpdated), + watchTime: stats.watchTime, + country: stats.country, + videoId: video.id + }) + + statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) + statsModel.Video = video as VideoModel + + await statsModel.save({ transaction }) + + statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ + localVideoViewerId: statsModel.id, + watchSections: stats.watchSections, + transaction + }) + + return statsModel + } + + private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { + return sections.reduce((p, current) => p + (current.end - current.start), 0) + } +} -- cgit v1.2.3