From ac907dc7c158056e9b6a5cb58acd27df5c7c2670 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 6 Apr 2022 08:50:43 +0200 Subject: Improve viewer counter More precise, avoid weird decrease, reuse an id to federate viewers --- server/controllers/api/server/debug.ts | 2 +- server/controllers/api/videos/view.ts | 10 +- server/initializers/constants.ts | 2 +- server/lib/activitypub/process/process-view.ts | 2 +- server/lib/activitypub/send/send-view.ts | 5 +- server/lib/activitypub/url.ts | 4 +- server/lib/redis.ts | 12 - server/lib/views/shared/index.ts | 3 +- server/lib/views/shared/video-viewer-counters.ts | 176 +++++++++++++ server/lib/views/shared/video-viewer-stats.ts | 185 ++++++++++++++ server/lib/views/shared/video-viewers.ts | 277 --------------------- server/lib/views/shared/video-views.ts | 7 + server/lib/views/video-views-manager.ts | 29 ++- server/tests/api/check-params/views.ts | 2 +- server/tests/api/views/video-views-counter.ts | 9 +- .../tests/api/views/video-views-overall-stats.ts | 6 +- .../tests/api/views/video-views-retention-stats.ts | 2 +- .../tests/api/views/video-views-timeserie-stats.ts | 4 +- 18 files changed, 408 insertions(+), 329 deletions(-) create mode 100644 server/lib/views/shared/video-viewer-counters.ts create mode 100644 server/lib/views/shared/video-viewer-stats.ts delete mode 100644 server/lib/views/shared/video-viewers.ts diff --git a/server/controllers/api/server/debug.ts b/server/controllers/api/server/debug.ts index 6b6ff027c..e09510dc3 100644 --- a/server/controllers/api/server/debug.ts +++ b/server/controllers/api/server/debug.ts @@ -43,7 +43,7 @@ async function runCommand (req: express.Request, res: express.Response) { const processors: { [id in SendDebugCommand['command']]: () => Promise } = { 'remove-dandling-resumable-uploads': () => RemoveDanglingResumableUploadsScheduler.Instance.execute(), 'process-video-views-buffer': () => VideoViewsBufferScheduler.Instance.execute(), - 'process-video-viewers': () => VideoViewsManager.Instance.processViewers() + 'process-video-viewers': () => VideoViewsManager.Instance.processViewerStats() } await processors[body.command]() diff --git a/server/controllers/api/videos/view.ts b/server/controllers/api/videos/view.ts index e28cf371a..db1091f2d 100644 --- a/server/controllers/api/videos/view.ts +++ b/server/controllers/api/videos/view.ts @@ -1,8 +1,6 @@ import express from 'express' -import { sendView } from '@server/lib/activitypub/send/send-view' import { Hooks } from '@server/lib/plugins/hooks' import { VideoViewsManager } from '@server/lib/views/video-views-manager' -import { getServerActor } from '@server/models/application/application' import { MVideoId } from '@server/types/models' import { HttpStatusCode, VideoView } from '@shared/models' import { asyncMiddleware, methodsValidator, openapiOperationDoc, optionalAuthenticate, videoViewValidator } from '../../../middlewares' @@ -33,7 +31,7 @@ async function viewVideo (req: express.Request, res: express.Response) { const body = req.body as VideoView const ip = req.ip - const { successView, successViewer } = await VideoViewsManager.Instance.processLocalView({ + const { successView } = await VideoViewsManager.Instance.processLocalView({ video, ip, currentTime: body.currentTime, @@ -41,15 +39,9 @@ async function viewVideo (req: express.Request, res: express.Response) { }) if (successView) { - await sendView({ byActor: await getServerActor(), video, type: 'view' }) - Hooks.runAction('action:api.video.viewed', { video: video, ip, req, res }) } - if (successViewer) { - await sendView({ byActor: await getServerActor(), video, type: 'viewer' }) - } - await updateUserHistoryIfNeeded(body, video, res) return res.status(HttpStatusCode.NO_CONTENT_204).end() diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 9afbc5aea..a4d8d8fe7 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -367,7 +367,7 @@ const CONSTRAINTS_FIELDS = { const VIEW_LIFETIME = { VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION, - VIEWER_COUNTER: 60000 * 5, // 5 minutes + VIEWER_COUNTER: 60000 * 1, // 1 minute VIEWER_STATS: 60000 * 60 // 1 hour } diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index bad079843..e49506d82 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts @@ -32,7 +32,7 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu ? new Date(activity.expires) : undefined - await VideoViewsManager.Instance.processRemoteView({ video, viewerExpires }) + await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires }) if (video.isOwned()) { // Forward the view but don't resend the activity to the sender diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 1088bf258..25a20ec6d 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts @@ -13,14 +13,15 @@ async function sendView (options: { byActor: MActorLight type: ViewType video: MVideoImmutable + viewerIdentifier: string transaction?: Transaction }) { - const { byActor, type, video, transaction } = options + const { byActor, type, video, viewerIdentifier, transaction } = options logger.info('Creating job to send %s of %s.', type, video.url) const activityBuilder = (audience: ActivityAudience) => { - const url = getLocalVideoViewActivityPubUrl(byActor, video) + const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier) return buildViewActivity({ url, byActor, video, audience, type }) } diff --git a/server/lib/activitypub/url.ts b/server/lib/activitypub/url.ts index 8443fef4c..2f68f7a17 100644 --- a/server/lib/activitypub/url.ts +++ b/server/lib/activitypub/url.ts @@ -56,8 +56,8 @@ function getLocalAbuseActivityPubUrl (abuse: MAbuseId) { return WEBSERVER.URL + '/admin/abuses/' + abuse.id } -function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId) { - return byActor.url + '/views/videos/' + video.id + '/' + new Date().toISOString() +function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId, viewerIdentifier: string) { + return byActor.url + '/views/videos/' + video.id + '/' + viewerIdentifier } function getLocalVideoViewerActivityPubUrl (stats: MLocalVideoViewer) { diff --git a/server/lib/redis.ts b/server/lib/redis.ts index f9cea57cd..d052de786 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -145,18 +145,10 @@ class Redis { return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW) } - setIPVideoViewer (ip: string, videoUUID: string) { - return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER_COUNTER) - } - async doesVideoIPViewExist (ip: string, videoUUID: string) { return this.exists(this.generateIPViewKey(ip, videoUUID)) } - async doesVideoIPViewerExist (ip: string, videoUUID: string) { - return this.exists(this.generateIPViewerKey(ip, videoUUID)) - } - /* ************ Tracker IP block ************ */ setTrackerBlockIP (ip: string) { @@ -361,10 +353,6 @@ class Redis { return `views-${videoUUID}-${ip}` } - private generateIPViewerKey (ip: string, videoUUID: string) { - return `viewer-${videoUUID}-${ip}` - } - private generateTrackerBlockIPKey (ip: string) { return `tracker-block-ip-${ip}` } diff --git a/server/lib/views/shared/index.ts b/server/lib/views/shared/index.ts index dd510f4e2..139471183 100644 --- a/server/lib/views/shared/index.ts +++ b/server/lib/views/shared/index.ts @@ -1,2 +1,3 @@ -export * from './video-viewers' +export * from './video-viewer-counters' +export * from './video-viewer-stats' export * from './video-views' diff --git a/server/lib/views/shared/video-viewer-counters.ts b/server/lib/views/shared/video-viewer-counters.ts new file mode 100644 index 000000000..941b62ed7 --- /dev/null +++ b/server/lib/views/shared/video-viewer-counters.ts @@ -0,0 +1,176 @@ + +import { isTestInstance } from '@server/helpers/core-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { VIEW_LIFETIME } from '@server/initializers/constants' +import { sendView } from '@server/lib/activitypub/send/send-view' +import { PeerTubeSocket } from '@server/lib/peertube-socket' +import { getServerActor } from '@server/models/application/application' +import { VideoModel } from '@server/models/video/video' +import { MVideo } from '@server/types/models' +import { buildUUID, sha256 } from '@shared/extra-utils' + +const lTags = loggerTagsFactory('views') + +type Viewer = { + expires: number + id: string + lastFederation?: number +} + +export class VideoViewerCounters { + + // expires is new Date().getTime() + private readonly viewersPerVideo = new Map() + private readonly idToViewer = new Map() + + private readonly salt = buildUUID() + + private processingViewerCounters = false + + constructor () { + setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER) + } + + // --------------------------------------------------------------------------- + + async addLocalViewer (options: { + video: MVideo + ip: string + }) { + const { video, ip } = options + + logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) }) + + const viewerId = this.generateViewerId(ip, video.uuid) + const viewer = this.idToViewer.get(viewerId) + + if (viewer) { + viewer.expires = this.buildViewerExpireTime() + await this.federateViewerIfNeeded(video, viewer) + + return false + } + + const newViewer = await this.addViewerToVideo({ viewerId, video }) + await this.federateViewerIfNeeded(video, newViewer) + + return true + } + + async addRemoteViewer (options: { + video: MVideo + viewerId: string + viewerExpires: Date + }) { + const { video, viewerExpires, viewerId } = options + + logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) + + await this.addViewerToVideo({ video, viewerExpires, viewerId }) + + return true + } + + // --------------------------------------------------------------------------- + + getViewers (video: MVideo) { + const viewers = this.viewersPerVideo.get(video.id) + if (!viewers) return 0 + + return viewers.length + } + + buildViewerExpireTime () { + return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER + } + + // --------------------------------------------------------------------------- + + private async addViewerToVideo (options: { + video: MVideo + viewerId: string + viewerExpires?: Date + }) { + const { video, viewerExpires, viewerId } = options + + let watchers = this.viewersPerVideo.get(video.id) + + if (!watchers) { + watchers = [] + this.viewersPerVideo.set(video.id, watchers) + } + + const expires = viewerExpires + ? viewerExpires.getTime() + : this.buildViewerExpireTime() + + const viewer = { id: viewerId, expires } + watchers.push(viewer) + + this.idToViewer.set(viewerId, viewer) + + await this.notifyClients(video.id, watchers.length) + + return viewer + } + + private async cleanViewerCounters () { + if (this.processingViewerCounters) return + this.processingViewerCounters = true + + if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) + + try { + for (const videoId of this.viewersPerVideo.keys()) { + const notBefore = new Date().getTime() + + const viewers = this.viewersPerVideo.get(videoId) + + // Only keep not expired viewers + const newViewers: Viewer[] = [] + + // Filter new viewers + for (const viewer of viewers) { + if (viewer.expires > notBefore) { + newViewers.push(viewer) + } else { + this.idToViewer.delete(viewer.id) + } + } + + if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) + else this.viewersPerVideo.set(videoId, newViewers) + + await this.notifyClients(videoId, newViewers.length) + } + } catch (err) { + logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) + } + + this.processingViewerCounters = false + } + + private async notifyClients (videoId: string | number, viewersLength: number) { + const video = await VideoModel.loadImmutableAttributes(videoId) + if (!video) return + + PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) + + logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) + } + + private generateViewerId (ip: string, videoUUID: string) { + return sha256(this.salt + '-' + ip + '-' + videoUUID) + } + + private async federateViewerIfNeeded (video: MVideo, viewer: Viewer) { + // Federate the viewer if it's been a "long" time we did not + const now = new Date().getTime() + const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER / 2) + + if (viewer.lastFederation && viewer.lastFederation > federationLimit) return + + await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id }) + viewer.lastFederation = now + } +} diff --git a/server/lib/views/shared/video-viewer-stats.ts b/server/lib/views/shared/video-viewer-stats.ts new file mode 100644 index 000000000..fd66fd5c7 --- /dev/null +++ b/server/lib/views/shared/video-viewer-stats.ts @@ -0,0 +1,185 @@ +import { Transaction } from 'sequelize/types' +import { isTestInstance } from '@server/helpers/core-utils' +import { GeoIP } from '@server/helpers/geo-ip' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { sendCreateWatchAction } from '@server/lib/activitypub/send' +import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' +import { Redis } from '@server/lib/redis' +import { VideoModel } from '@server/models/video/video' +import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' +import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' +import { MVideo } from '@server/types/models' +import { VideoViewEvent } from '@shared/models' + +const lTags = loggerTagsFactory('views') + +type LocalViewerStats = { + firstUpdated: number // Date.getTime() + lastUpdated: number // Date.getTime() + + watchSections: { + start: number + end: number + }[] + + watchTime: number + + country: string + + videoId: number +} + +export class VideoViewerStats { + private processingViewersStats = false + + constructor () { + setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) + } + + // --------------------------------------------------------------------------- + + async addLocalViewer (options: { + video: MVideo + currentTime: number + ip: string + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + + logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) + + return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) + } + + // --------------------------------------------------------------------------- + + async getWatchTime (videoId: number, ip: string) { + const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) + + return stats?.watchTime || 0 + } + + // --------------------------------------------------------------------------- + + private async updateLocalViewerStats (options: { + video: MVideo + ip: string + currentTime: number + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + const nowMs = new Date().getTime() + + let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) + + if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { + logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) + return + } + + if (!stats) { + const country = await GeoIP.Instance.safeCountryISOLookup(ip) + + stats = { + firstUpdated: nowMs, + lastUpdated: nowMs, + + watchSections: [], + + watchTime: 0, + + country, + videoId: video.id + } + } + + stats.lastUpdated = nowMs + + if (viewEvent === 'seek' || stats.watchSections.length === 0) { + stats.watchSections.push({ + start: currentTime, + end: currentTime + }) + } else { + const lastSection = stats.watchSections[stats.watchSections.length - 1] + lastSection.end = currentTime + } + + stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) + + logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) + + await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) + } + + async processViewerStats () { + if (this.processingViewersStats) return + this.processingViewersStats = true + + if (!isTestInstance()) logger.info('Processing viewer statistics.', lTags()) + + const now = new Date().getTime() + + try { + const allKeys = await Redis.Instance.listLocalVideoViewerKeys() + + for (const key of allKeys) { + const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) + + // Process expired stats + if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { + continue + } + + try { + await sequelizeTypescript.transaction(async t => { + const video = await VideoModel.load(stats.videoId, t) + + const statsModel = await this.saveViewerStats(video, stats, t) + + if (video.remote) { + await sendCreateWatchAction(statsModel, t) + } + }) + + await Redis.Instance.deleteLocalVideoViewersKeys(key) + } catch (err) { + logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) + } + } + } catch (err) { + logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) + } + + this.processingViewersStats = false + } + + private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { + const statsModel = new LocalVideoViewerModel({ + startDate: new Date(stats.firstUpdated), + endDate: new Date(stats.lastUpdated), + watchTime: stats.watchTime, + country: stats.country, + videoId: video.id + }) + + statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) + statsModel.Video = video as VideoModel + + await statsModel.save({ transaction }) + + statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ + localVideoViewerId: statsModel.id, + watchSections: stats.watchSections, + transaction + }) + + return statsModel + } + + private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { + return sections.reduce((p, current) => p + (current.end - current.start), 0) + } +} diff --git a/server/lib/views/shared/video-viewers.ts b/server/lib/views/shared/video-viewers.ts deleted file mode 100644 index 4dad1f0e8..000000000 --- a/server/lib/views/shared/video-viewers.ts +++ /dev/null @@ -1,277 +0,0 @@ -import { Transaction } from 'sequelize/types' -import { isTestInstance } from '@server/helpers/core-utils' -import { GeoIP } from '@server/helpers/geo-ip' -import { logger, loggerTagsFactory } from '@server/helpers/logger' -import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' -import { sequelizeTypescript } from '@server/initializers/database' -import { sendCreateWatchAction } from '@server/lib/activitypub/send' -import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' -import { PeerTubeSocket } from '@server/lib/peertube-socket' -import { Redis } from '@server/lib/redis' -import { VideoModel } from '@server/models/video/video' -import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' -import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' -import { MVideo } from '@server/types/models' -import { VideoViewEvent } from '@shared/models' - -const lTags = loggerTagsFactory('views') - -type LocalViewerStats = { - firstUpdated: number // Date.getTime() - lastUpdated: number // Date.getTime() - - watchSections: { - start: number - end: number - }[] - - watchTime: number - - country: string - - videoId: number -} - -export class VideoViewers { - - // Values are Date().getTime() - private readonly viewersPerVideo = new Map() - - private processingViewerCounters = false - private processingViewerStats = false - - constructor () { - setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER) - - setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) - } - - // --------------------------------------------------------------------------- - - getViewers (video: MVideo) { - const viewers = this.viewersPerVideo.get(video.id) - if (!viewers) return 0 - - return viewers.length - } - - buildViewerExpireTime () { - return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER - } - - async getWatchTime (videoId: number, ip: string) { - const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) - - return stats?.watchTime || 0 - } - - async addLocalViewer (options: { - video: MVideo - currentTime: number - ip: string - viewEvent?: VideoViewEvent - }) { - const { video, ip, viewEvent, currentTime } = options - - logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) - - await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) - - const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) - if (viewExists) return false - - await Redis.Instance.setIPVideoViewer(ip, video.uuid) - - return this.addViewerToVideo({ video }) - } - - async addRemoteViewer (options: { - video: MVideo - viewerExpires: Date - }) { - const { video, viewerExpires } = options - - logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) - - return this.addViewerToVideo({ video, viewerExpires }) - } - - private async addViewerToVideo (options: { - video: MVideo - viewerExpires?: Date - }) { - const { video, viewerExpires } = options - - let watchers = this.viewersPerVideo.get(video.id) - - if (!watchers) { - watchers = [] - this.viewersPerVideo.set(video.id, watchers) - } - - const expiration = viewerExpires - ? viewerExpires.getTime() - : this.buildViewerExpireTime() - - watchers.push(expiration) - await this.notifyClients(video.id, watchers.length) - - return true - } - - private async updateLocalViewerStats (options: { - video: MVideo - ip: string - currentTime: number - viewEvent?: VideoViewEvent - }) { - const { video, ip, viewEvent, currentTime } = options - const nowMs = new Date().getTime() - - let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) - - if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { - logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) - return - } - - if (!stats) { - const country = await GeoIP.Instance.safeCountryISOLookup(ip) - - stats = { - firstUpdated: nowMs, - lastUpdated: nowMs, - - watchSections: [], - - watchTime: 0, - - country, - videoId: video.id - } - } - - stats.lastUpdated = nowMs - - if (viewEvent === 'seek' || stats.watchSections.length === 0) { - stats.watchSections.push({ - start: currentTime, - end: currentTime - }) - } else { - const lastSection = stats.watchSections[stats.watchSections.length - 1] - lastSection.end = currentTime - } - - stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) - - logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) - - await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) - } - - private async cleanViewerCounters () { - if (this.processingViewerCounters) return - this.processingViewerCounters = true - - if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) - - try { - for (const videoId of this.viewersPerVideo.keys()) { - const notBefore = new Date().getTime() - - const viewers = this.viewersPerVideo.get(videoId) - - // Only keep not expired viewers - const newViewers = viewers.filter(w => w > notBefore) - - if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) - else this.viewersPerVideo.set(videoId, newViewers) - - await this.notifyClients(videoId, newViewers.length) - } - } catch (err) { - logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) - } - - this.processingViewerCounters = false - } - - private async notifyClients (videoId: string | number, viewersLength: number) { - const video = await VideoModel.loadImmutableAttributes(videoId) - if (!video) return - - PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) - - logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) - } - - async processViewerStats () { - if (this.processingViewerStats) return - this.processingViewerStats = true - - if (!isTestInstance()) logger.info('Processing viewer statistics.', lTags()) - - const now = new Date().getTime() - - try { - const allKeys = await Redis.Instance.listLocalVideoViewerKeys() - - for (const key of allKeys) { - const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) - - // Process expired stats - if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { - continue - } - - try { - await sequelizeTypescript.transaction(async t => { - const video = await VideoModel.load(stats.videoId, t) - - const statsModel = await this.saveViewerStats(video, stats, t) - - if (video.remote) { - await sendCreateWatchAction(statsModel, t) - } - }) - - await Redis.Instance.deleteLocalVideoViewersKeys(key) - } catch (err) { - logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) - } - } - } catch (err) { - logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) - } - - this.processingViewerStats = false - } - - private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { - const statsModel = new LocalVideoViewerModel({ - startDate: new Date(stats.firstUpdated), - endDate: new Date(stats.lastUpdated), - watchTime: stats.watchTime, - country: stats.country, - videoId: video.id - }) - - statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) - statsModel.Video = video as VideoModel - - await statsModel.save({ transaction }) - - statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ - localVideoViewerId: statsModel.id, - watchSections: stats.watchSections, - transaction - }) - - return statsModel - } - - private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { - return sections.reduce((p, current) => p + (current.end - current.start), 0) - } -} diff --git a/server/lib/views/shared/video-views.ts b/server/lib/views/shared/video-views.ts index 19250f993..275f7a014 100644 --- a/server/lib/views/shared/video-views.ts +++ b/server/lib/views/shared/video-views.ts @@ -1,5 +1,8 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { sendView } from '@server/lib/activitypub/send/send-view' +import { getServerActor } from '@server/models/application/application' import { MVideo } from '@server/types/models' +import { buildUUID } from '@shared/extra-utils' import { Redis } from '../../redis' const lTags = loggerTagsFactory('views') @@ -24,6 +27,8 @@ export class VideoViews { await this.addView(video) + await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() }) + return true } @@ -39,6 +44,8 @@ export class VideoViews { return true } + // --------------------------------------------------------------------------- + private async addView (video: MVideo) { const promises: Promise[] = [] diff --git a/server/lib/views/video-views-manager.ts b/server/lib/views/video-views-manager.ts index 9382fb482..ea3b35c6c 100644 --- a/server/lib/views/video-views-manager.ts +++ b/server/lib/views/video-views-manager.ts @@ -1,7 +1,7 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger' import { MVideo } from '@server/types/models' import { VideoViewEvent } from '@shared/models' -import { VideoViewers, VideoViews } from './shared' +import { VideoViewerCounters, VideoViewerStats, VideoViews } from './shared' /** * If processing a local view: @@ -27,14 +27,16 @@ export class VideoViewsManager { private static instance: VideoViewsManager - private videoViewers: VideoViewers + private videoViewerStats: VideoViewerStats + private videoViewerCounters: VideoViewerCounters private videoViews: VideoViews private constructor () { } init () { - this.videoViewers = new VideoViewers() + this.videoViewerStats = new VideoViewerStats() + this.videoViewerCounters = new VideoViewerCounters() this.videoViews = new VideoViews() } @@ -48,10 +50,12 @@ export class VideoViewsManager { logger.debug('Processing local view for %s and ip %s.', video.url, ip, lTags()) - const successViewer = await this.videoViewers.addLocalViewer({ video, ip, viewEvent, currentTime }) + await this.videoViewerStats.addLocalViewer({ video, ip, viewEvent, currentTime }) + + const successViewer = await this.videoViewerCounters.addLocalViewer({ video, ip }) // Do it after added local viewer to fetch updated information - const watchTime = await this.videoViewers.getWatchTime(video.id, ip) + const watchTime = await this.videoViewerStats.getWatchTime(video.id, ip) const successView = await this.videoViews.addLocalView({ video, watchTime, ip }) @@ -60,26 +64,27 @@ export class VideoViewsManager { async processRemoteView (options: { video: MVideo + viewerId: string | null viewerExpires?: Date }) { - const { video, viewerExpires } = options + const { video, viewerId, viewerExpires } = options - logger.debug('Processing remote view for %s.', video.url, { viewerExpires, ...lTags() }) + logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() }) - if (viewerExpires) await this.videoViewers.addRemoteViewer({ video, viewerExpires }) + if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires }) else await this.videoViews.addRemoteView({ video }) } getViewers (video: MVideo) { - return this.videoViewers.getViewers(video) + return this.videoViewerCounters.getViewers(video) } buildViewerExpireTime () { - return this.videoViewers.buildViewerExpireTime() + return this.videoViewerCounters.buildViewerExpireTime() } - processViewers () { - return this.videoViewers.processViewerStats() + processViewerStats () { + return this.videoViewerStats.processViewerStats() } static get Instance () { diff --git a/server/tests/api/check-params/views.ts b/server/tests/api/check-params/views.ts index 185b04af1..ca4752345 100644 --- a/server/tests/api/check-params/views.ts +++ b/server/tests/api/check-params/views.ts @@ -19,7 +19,7 @@ describe('Test videos views', function () { let userAccessToken: string before(async function () { - this.timeout(30000) + this.timeout(120000) servers = await createMultipleServers(2) await setAccessTokensToServers(servers) diff --git a/server/tests/api/views/video-views-counter.ts b/server/tests/api/views/video-views-counter.ts index b68aaa350..b8969d52d 100644 --- a/server/tests/api/views/video-views-counter.ts +++ b/server/tests/api/views/video-views-counter.ts @@ -57,10 +57,11 @@ describe('Test video views/viewers counters', function () { }) it('Should not view again this video with the same IP', async function () { - await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] }) + await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] }) + await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] }) await processViewsBuffer(servers) - await checkCounter('views', videoUUID, 1) + await checkCounter('views', videoUUID, 2) }) it('Should view the video from server 2 and send the event', async function () { @@ -68,7 +69,7 @@ describe('Test video views/viewers counters', function () { await waitJobs(servers) await processViewsBuffer(servers) - await checkCounter('views', videoUUID, 2) + await checkCounter('views', videoUUID, 3) }) }) @@ -78,7 +79,7 @@ describe('Test video views/viewers counters', function () { let command: FfmpegCommand before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true })) }) diff --git a/server/tests/api/views/video-views-overall-stats.ts b/server/tests/api/views/video-views-overall-stats.ts index 22761d6ec..d8de73cbc 100644 --- a/server/tests/api/views/video-views-overall-stats.ts +++ b/server/tests/api/views/video-views-overall-stats.ts @@ -21,7 +21,7 @@ describe('Test views overall stats', function () { let vodVideoId: string before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true })) }) @@ -74,7 +74,7 @@ describe('Test views overall stats', function () { let command: FfmpegCommand before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true })) }) @@ -189,7 +189,7 @@ describe('Test views overall stats', function () { let videoUUID: string before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId: videoUUID } = await prepareViewsVideos({ servers, live: true, vod: true })) }) diff --git a/server/tests/api/views/video-views-retention-stats.ts b/server/tests/api/views/video-views-retention-stats.ts index 98be7bfdb..a27141d68 100644 --- a/server/tests/api/views/video-views-retention-stats.ts +++ b/server/tests/api/views/video-views-retention-stats.ts @@ -20,7 +20,7 @@ describe('Test views retention stats', function () { let vodVideoId: string before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true })) }) diff --git a/server/tests/api/views/video-views-timeserie-stats.ts b/server/tests/api/views/video-views-timeserie-stats.ts index 98c041cdf..858edeff7 100644 --- a/server/tests/api/views/video-views-timeserie-stats.ts +++ b/server/tests/api/views/video-views-timeserie-stats.ts @@ -24,7 +24,7 @@ describe('Test views timeserie stats', function () { let vodVideoId: string before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true })) }) @@ -63,7 +63,7 @@ describe('Test views timeserie stats', function () { } before(async function () { - this.timeout(60000); + this.timeout(120000); ({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true })) }) -- cgit v1.2.3