1 import { Transaction } from 'sequelize/types'
2 import { isTestOrDevInstance } from '@server/helpers/core-utils'
3 import { GeoIP } from '@server/helpers/geo-ip'
4 import { logger, loggerTagsFactory } from '@server/helpers/logger'
5 import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants'
6 import { sequelizeTypescript } from '@server/initializers/database'
7 import { sendCreateWatchAction } from '@server/lib/activitypub/send'
8 import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url'
9 import { Redis } from '@server/lib/redis'
10 import { VideoModel } from '@server/models/video/video'
11 import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
12 import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section'
13 import { MVideo, MVideoImmutable } from '@server/types/models'
14 import { VideoViewEvent } from '@shared/models'
// Logger tags helper created for the 'views' scope; its result is passed along as
// (or spread into) the metadata object of the logger calls in this file.
16 const lTags = loggerTagsFactory('views')
// Shape of the per-viewer statistics object that this file reads from and writes to Redis.
// NOTE(review): only the two timestamp members are visible in this excerpt; the code below
// also reads watchTime, watchSections, country and videoId from values of this type.
18 type LocalViewerStats = {
19 firstUpdated: number // Date.getTime()
20 lastUpdated: number // Date.getTime()
// Collects local viewer statistics in Redis and periodically flushes them to the database.
34 export class VideoViewerStats {
// Re-entrancy flag: processViewerStats() returns immediately while it is true.
35 private processingViewersStats = false
// Schedules processViewerStats() every VIEW_LIFETIME.VIEWER_STATS milliseconds.
// NOTE(review): the enclosing function header (presumably the constructor) is not visible here.
38 setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
41 // ---------------------------------------------------------------------------
// Records a viewer event for a video: logs it at debug level, then delegates to
// updateLocalViewerStats() with the same video, ip, viewEvent and currentTime.
// Returns whatever updateLocalViewerStats() resolves to.
// NOTE(review): the `ip` and `currentTime` option declarations are not visible in this excerpt.
43 async addLocalViewer (options: {
44 video: MVideoImmutable
47 viewEvent?: VideoViewEvent
49 const { video, ip, viewEvent, currentTime } = options
51 logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
53 return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
56 // ---------------------------------------------------------------------------
// Returns the watch time stored in Redis for the (ip, videoId) pair.
// Falls back to 0 when no stats entry exists or when the stored watchTime is falsy.
58 async getWatchTime (videoId: number, ip: string) {
59 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
61 return stats?.watchTime || 0
64 // ---------------------------------------------------------------------------
// Loads the stats stored in Redis for (ip, video.id), updates the watch sections and the
// derived watch time from the incoming event, then writes the stats back to Redis.
// NOTE(review): several lines of this method are missing from the excerpt (branch bodies,
// early returns, stats initialisation); comments below only describe the visible statements.
66 private async updateLocalViewerStats (options: {
67 video: MVideoImmutable
70 viewEvent?: VideoViewEvent
72 const { video, ip, viewEvent, currentTime } = options
// Timestamp in epoch milliseconds, stored below as stats.lastUpdated.
73 const nowMs = new Date().getTime()
75 let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id })
// Cap on the number of stored sections per viewer: once reached, a warning is logged.
// NOTE(review): presumably the method returns here without storing anything — confirm.
77 if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) {
78 logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) })
// Country derived from the viewer IP.
// NOTE(review): presumably only used when creating a fresh stats object — the consumer is not visible.
83 const country = await GeoIP.Instance.safeCountryISOLookup(ip)
98 stats.lastUpdated = nowMs
// A 'seek' event, or the absence of any section so far, opens a new watch section.
100 if (viewEvent === 'seek' || stats.watchSections.length === 0) {
101 stats.watchSections.push({
// Otherwise the most recent section is the one being updated.
106 const lastSection = stats.watchSections[stats.watchSections.length - 1]
// A current time before the section start is reported as invalid.
// NOTE(review): presumably the assignment below sits in the else branch — confirm.
108 if (lastSection.start > currentTime) {
109 logger.warn('Invalid end watch section %d. Last start record was at %d.', currentTime, lastSection.start)
111 lastSection.end = currentTime
// Watch time is recomputed from all sections rather than incremented.
115 stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections)
117 logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
119 await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
// Walks every local viewer key listed by Redis and, for each entry, saves the stats in the
// database, sends the watch action and deletes the Redis key.
// NOTE(review): the try/catch headers and some control-flow statements are missing from this
// excerpt; comments below only describe the visible statements.
122 async processViewerStats () {
// Skip this run if a previous one is still in progress; the flag is reset on the last line.
123 if (this.processingViewersStats) return
124 this.processingViewersStats = true
// The info log is suppressed on test/dev instances.
126 if (!isTestOrDevInstance()) logger.info('Processing viewer statistics.', lTags())
128 const now = new Date().getTime()
131 const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
133 for (const key of allKeys) {
134 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key })
136 // Process expired stats
// True when the entry was updated within the last VIEW_LIFETIME.VIEWER_STATS milliseconds.
// NOTE(review): presumably such entries are skipped (`continue`) — the branch body is not visible.
137 if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) {
// The video load, the stats save and the watch action all receive the same transaction `t`.
142 await sequelizeTypescript.transaction(async t => {
143 const video = await VideoModel.load(stats.videoId, t)
146 const statsModel = await this.saveViewerStats(video, stats, t)
149 await sendCreateWatchAction(statsModel, t)
153 await Redis.Instance.deleteLocalVideoViewersKeys(key)
// Failure for a single key is logged together with that key.
155 logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() })
// Failure of the scheduler run as a whole.
159 logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
162 this.processingViewersStats = false
// Builds a LocalVideoViewerModel from the Redis stats, saves it within the given transaction,
// then bulk-creates its watch sections.
// NOTE(review): the remaining constructor fields, the remaining bulkCreateSections options and
// the return statement are not visible in this excerpt.
165 private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
166 const statsModel = new LocalVideoViewerModel({
// Timestamps are converted to Date objects for the model.
167 startDate: new Date(stats.firstUpdated),
168 endDate: new Date(stats.lastUpdated),
169 watchTime: stats.watchTime,
170 country: stats.country,
// The ActivityPub URL is derived from the model instance before it is saved.
174 statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel)
175 statsModel.Video = video as VideoModel
177 await statsModel.save({ transaction })
// Sections are created after the save and reference statsModel.id.
179 statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({
180 localVideoViewerId: statsModel.id,
181 watchSections: stats.watchSections,
// Returns the sum of (end - start) over all given sections; 0 for an empty array.
// Overlapping sections are not merged, so overlapping ranges are counted once per section.
188 private buildWatchTimeFromSections (sections: { start: number, end: number }[]) {
189 return sections.reduce((p, current) => p + (current.end - current.start), 0)