From b211106695bb82f6c32e53306081b5262c3d109d Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 24 Mar 2022 13:36:47 +0100 Subject: Support video views/viewers stats in server * Add "currentTime" and "event" body params to view endpoint * Merge watching and view endpoints * Introduce WatchAction AP activity * Add tables to store viewer information of local videos * Add endpoints to fetch video views/viewers stats of local videos * Refactor views/viewers handlers * Support "views" and "viewers" counters for both VOD and live videos --- server/lib/activitypub/activity.ts | 13 +- server/lib/activitypub/context.ts | 23 +- server/lib/activitypub/local-video-viewer.ts | 42 ++++ server/lib/activitypub/process/process-create.ts | 21 +- server/lib/activitypub/process/process-view.ts | 4 +- server/lib/activitypub/send/send-create.ts | 17 +- server/lib/activitypub/send/send-view.ts | 51 ++-- server/lib/activitypub/url.ts | 6 + .../videos/shared/object-to-model-attributes.ts | 4 +- server/lib/client-html.ts | 2 +- server/lib/job-queue/handlers/video-views-stats.ts | 2 +- server/lib/redis.ts | 68 ++++- server/lib/schedulers/geo-ip-update-scheduler.ts | 22 ++ .../lib/schedulers/remove-old-views-scheduler.ts | 6 +- .../lib/schedulers/video-views-buffer-scheduler.ts | 4 +- server/lib/video-views.ts | 131 ---------- server/lib/views/shared/index.ts | 2 + server/lib/views/shared/video-viewers.ts | 276 +++++++++++++++++++++ server/lib/views/shared/video-views.ts | 60 +++++ server/lib/views/video-views-manager.ts | 70 ++++++ 20 files changed, 658 insertions(+), 166 deletions(-) create mode 100644 server/lib/activitypub/local-video-viewer.ts create mode 100644 server/lib/schedulers/geo-ip-update-scheduler.ts delete mode 100644 server/lib/video-views.ts create mode 100644 server/lib/views/shared/index.ts create mode 100644 server/lib/views/shared/video-viewers.ts create mode 100644 server/lib/views/shared/video-views.ts create mode 100644 server/lib/views/video-views-manager.ts (limited to 'server/lib') diff --git a/server/lib/activitypub/activity.ts b/server/lib/activitypub/activity.ts index cccb7b1c1..e6cec1ba7 100644 --- a/server/lib/activitypub/activity.ts +++ b/server/lib/activitypub/activity.ts @@ -4,6 +4,17 @@ function getAPId (object: string | { id: string }) { return object.id } +function getActivityStreamDuration (duration: number) { + // https://www.w3.org/TR/activitystreams-vocabulary/#dfn-duration + return 'PT' + duration + 'S' +} + +function getDurationFromActivityStream (duration: string) { + return parseInt(duration.replace(/[^\d]+/, '')) +} + export { - getAPId + getAPId, + getActivityStreamDuration, + getDurationFromActivityStream } diff --git a/server/lib/activitypub/context.ts b/server/lib/activitypub/context.ts index 3bc40e2aa..b452cf9b3 100644 --- a/server/lib/activitypub/context.ts +++ b/server/lib/activitypub/context.ts @@ -15,7 +15,7 @@ export { type ContextValue = { [ id: string ]: (string | { '@type': string, '@id': string }) } -const contextStore = { +const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string })[] } = { Video: buildContext({ Hashtag: 'as:Hashtag', uuid: 'sc:identifier', @@ -109,7 +109,8 @@ const contextStore = { stopTimestamp: { '@type': 'sc:Number', '@id': 'pt:stopTimestamp' - } + }, + uuid: 'sc:identifier' }), CacheFile: buildContext({ @@ -128,6 +129,24 @@ const contextStore = { } }), + WatchAction: buildContext({ + WatchAction: 'sc:WatchAction', + startTimestamp: { + '@type': 'sc:Number', + '@id': 'pt:startTimestamp' + }, + stopTimestamp: { + '@type': 'sc:Number', + '@id': 'pt:stopTimestamp' + }, + watchSection: { + '@type': 'sc:Number', + '@id': 'pt:stopTimestamp' + }, + uuid: 'sc:identifier' + }), + + Collection: buildContext(), Follow: buildContext(), Reject: buildContext(), Accept: buildContext(), diff --git a/server/lib/activitypub/local-video-viewer.ts b/server/lib/activitypub/local-video-viewer.ts new file mode 100644 index 000000000..738083adc --- /dev/null +++ b/server/lib/activitypub/local-video-viewer.ts @@ -0,0 +1,42 @@ +import { Transaction } from 'sequelize' +import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' +import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' +import { MVideo } from '@server/types/models' +import { WatchActionObject } from '@shared/models' +import { getDurationFromActivityStream } from './activity' + +async function createOrUpdateLocalVideoViewer (watchAction: WatchActionObject, video: MVideo, t: Transaction) { + const stats = await LocalVideoViewerModel.loadByUrl(watchAction.id) + if (stats) await stats.destroy({ transaction: t }) + + const localVideoViewer = await LocalVideoViewerModel.create({ + url: watchAction.id, + uuid: watchAction.uuid, + + watchTime: getDurationFromActivityStream(watchAction.duration), + + startDate: new Date(watchAction.startTime), + endDate: new Date(watchAction.endTime), + + country: watchAction.location + ? watchAction.location.addressCountry + : null, + + videoId: video.id + }) + + await LocalVideoViewerWatchSectionModel.bulkCreateSections({ + localVideoViewerId: localVideoViewer.id, + + watchSections: watchAction.watchSections.map(s => ({ + start: s.startTimestamp, + end: s.endTimestamp + })) + }) +} + +// --------------------------------------------------------------------------- + +export { + createOrUpdateLocalVideoViewer +} diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index b5b1a0feb..3e7931bb2 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts @@ -1,6 +1,7 @@ import { isBlockedByServerOrAccount } from '@server/lib/blocklist' import { isRedundancyAccepted } from '@server/lib/redundancy' -import { ActivityCreate, CacheFileObject, PlaylistObject, VideoCommentObject, VideoObject } from '@shared/models' +import { VideoModel } from '@server/models/video/video' +import { ActivityCreate, CacheFileObject, PlaylistObject, VideoCommentObject, VideoObject, WatchActionObject } from '@shared/models' import { retryTransactionWrapper } from '../../../helpers/database-utils' import { logger } from '../../../helpers/logger' import { sequelizeTypescript } from '../../../initializers/database' @@ -8,6 +9,7 @@ import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { MActorSignature, MCommentOwnerVideo, MVideoAccountLightBlacklistAllFiles } from '../../../types/models' import { Notifier } from '../../notifier' import { createOrUpdateCacheFile } from '../cache-file' +import { createOrUpdateLocalVideoViewer } from '../local-video-viewer' import { createOrUpdateVideoPlaylist } from '../playlists' import { forwardVideoRelatedActivity } from '../send/shared/send-utils' import { resolveThread } from '../video-comments' @@ -32,6 +34,10 @@ async function processCreateActivity (options: APProcessorOptions { + return createOrUpdateLocalVideoViewer(watchAction, video, t) + }) +} + async function processCreateVideoComment (activity: ActivityCreate, byActor: MActorSignature, notify: boolean) { const commentObject = activity.object as VideoCommentObject const byAccount = byActor.Account diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index c59940164..bad079843 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts @@ -1,4 +1,4 @@ -import { VideoViews } from '@server/lib/video-views' +import { VideoViewsManager } from '@server/lib/views/video-views-manager' import { ActivityView } from '../../../../shared/models/activitypub' import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { MActorSignature } from '../../../types/models' @@ -32,7 +32,7 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu ? new Date(activity.expires) : undefined - await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) + await VideoViewsManager.Instance.processRemoteView({ video, viewerExpires }) if (video.isOwned()) { // Forward the view but don't resend the activity to the sender diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 5d8763495..7c3a6bdd0 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts @@ -6,6 +6,7 @@ import { VideoCommentModel } from '../../../models/video/video-comment' import { MActorLight, MCommentOwnerVideo, + MLocalVideoViewerWithWatchSections, MVideoAccountLight, MVideoAP, MVideoPlaylistFull, @@ -19,6 +20,7 @@ import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getVideoCommentAudience, + sendVideoActivityToOrigin, sendVideoRelatedActivity, unicastTo } from './shared' @@ -61,6 +63,18 @@ async function sendCreateCacheFile ( }) } +async function sendCreateWatchAction (stats: MLocalVideoViewerWithWatchSections, transaction: Transaction) { + logger.info('Creating job to send create watch action %s.', stats.url, lTags(stats.uuid)) + + const byActor = await getServerActor() + + const activityBuilder = (audience: ActivityAudience) => { + return buildCreateActivity(stats.url, byActor, stats.toActivityPubObject(), audience) + } + + return sendVideoActivityToOrigin(activityBuilder, { byActor, video: stats.Video, transaction, contextType: 'WatchAction' }) +} + async function sendCreateVideoPlaylist (playlist: MVideoPlaylistFull, transaction: Transaction) { if (playlist.privacy === VideoPlaylistPrivacy.PRIVATE) return undefined @@ -175,7 +189,8 @@ export { buildCreateActivity, sendCreateVideoComment, sendCreateVideoPlaylist, - sendCreateCacheFile + sendCreateCacheFile, + sendCreateWatchAction } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 1f97307b9..1088bf258 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts @@ -1,27 +1,49 @@ import { Transaction } from 'sequelize' -import { VideoViews } from '@server/lib/video-views' -import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' +import { VideoViewsManager } from '@server/lib/views/video-views-manager' +import { MActorAudience, MActorLight, MVideoImmutable, MVideoUrl } from '@server/types/models' import { ActivityAudience, ActivityView } from '@shared/models' import { logger } from '../../../helpers/logger' -import { ActorModel } from '../../../models/actor/actor' import { audiencify, getAudience } from '../audience' import { getLocalVideoViewActivityPubUrl } from '../url' import { sendVideoRelatedActivity } from './shared/send-utils' -async function sendView (byActor: ActorModel, video: MVideoImmutable, t: Transaction) { - logger.info('Creating job to send view of %s.', video.url) +type ViewType = 'view' | 'viewer' + +async function sendView (options: { + byActor: MActorLight + type: ViewType + video: MVideoImmutable + transaction?: Transaction +}) { + const { byActor, type, video, transaction } = options + + logger.info('Creating job to send %s of %s.', type, video.url) const activityBuilder = (audience: ActivityAudience) => { const url = getLocalVideoViewActivityPubUrl(byActor, video) - return buildViewActivity(url, byActor, video, audience) + return buildViewActivity({ url, byActor, video, audience, type }) } - return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction: t, contextType: 'View' }) + return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' }) } -function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoUrl, audience?: ActivityAudience): ActivityView { - if (!audience) audience = getAudience(byActor) +// --------------------------------------------------------------------------- + +export { + sendView +} + +// --------------------------------------------------------------------------- + +function buildViewActivity (options: { + url: string + byActor: MActorAudience + video: MVideoUrl + type: ViewType + audience?: ActivityAudience +}): ActivityView { + const { url, byActor, type, video, audience = getAudience(byActor) } = options return audiencify( { @@ -29,14 +51,11 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU type: 'View' as 'View', actor: byActor.url, object: video.url, - expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() + + expires: type === 'viewer' + ? new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString() + : undefined }, audience ) } - -// --------------------------------------------------------------------------- - -export { - sendView -} diff --git a/server/lib/activitypub/url.ts b/server/lib/activitypub/url.ts index 50be4fac9..8443fef4c 100644 --- a/server/lib/activitypub/url.ts +++ b/server/lib/activitypub/url.ts @@ -7,6 +7,7 @@ import { MActorId, MActorUrl, MCommentId, + MLocalVideoViewer, MVideoId, MVideoPlaylistElement, MVideoUrl, @@ -59,6 +60,10 @@ function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId) { return byActor.url + '/views/videos/' + video.id + '/' + new Date().toISOString() } +function getLocalVideoViewerActivityPubUrl (stats: MLocalVideoViewer) { + return WEBSERVER.URL + '/videos/local-viewer/' + stats.uuid +} + function getVideoLikeActivityPubUrlByLocalActor (byActor: MActorUrl, video: MVideoId) { return byActor.url + '/likes/' + video.id } @@ -167,6 +172,7 @@ export { getLocalVideoCommentsActivityPubUrl, getLocalVideoLikesActivityPubUrl, getLocalVideoDislikesActivityPubUrl, + getLocalVideoViewerActivityPubUrl, getAbuseTargetUrl, checkUrlsSameHost, diff --git a/server/lib/activitypub/videos/shared/object-to-model-attributes.ts b/server/lib/activitypub/videos/shared/object-to-model-attributes.ts index c97217669..f02b9cba6 100644 --- a/server/lib/activitypub/videos/shared/object-to-model-attributes.ts +++ b/server/lib/activitypub/videos/shared/object-to-model-attributes.ts @@ -24,6 +24,7 @@ import { VideoPrivacy, VideoStreamingPlaylistType } from '@shared/models' +import { getDurationFromActivityStream } from '../../activity' function getThumbnailFromIcons (videoObject: VideoObject) { let validIcons = videoObject.icon.filter(i => i.width > THUMBNAILS_SIZE.minWidth) @@ -170,7 +171,6 @@ function getVideoAttributesFromObject (videoChannel: MChannelId, videoObject: Vi ? VideoPrivacy.PUBLIC : VideoPrivacy.UNLISTED - const duration = videoObject.duration.replace(/[^\d]+/, '') const language = videoObject.language?.identifier const category = videoObject.category @@ -200,7 +200,7 @@ function getVideoAttributesFromObject (videoChannel: MChannelId, videoObject: Vi isLive: videoObject.isLiveBroadcast, state: videoObject.state, channelId: videoChannel.id, - duration: parseInt(duration, 10), + duration: getDurationFromActivityStream(videoObject.duration), createdAt: new Date(videoObject.published), publishedAt: new Date(videoObject.published), diff --git a/server/lib/client-html.ts b/server/lib/client-html.ts index a9c835fbf..337364ac9 100644 --- a/server/lib/client-html.ts +++ b/server/lib/client-html.ts @@ -23,11 +23,11 @@ import { WEBSERVER } from '../initializers/constants' import { AccountModel } from '../models/account/account' -import { getActivityStreamDuration } from '../models/video/formatter/video-format-utils' import { VideoModel } from '../models/video/video' import { VideoChannelModel } from '../models/video/video-channel' import { VideoPlaylistModel } from '../models/video/video-playlist' import { MAccountActor, MChannelActor } from '../types/models' +import { getActivityStreamDuration } from './activitypub/activity' import { getBiggestActorImage } from './actor-image' import { ServerConfigManager } from './server-config-manager' diff --git a/server/lib/job-queue/handlers/video-views-stats.ts b/server/lib/job-queue/handlers/video-views-stats.ts index caf5f6962..689a5a3b4 100644 --- a/server/lib/job-queue/handlers/video-views-stats.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts @@ -1,7 +1,7 @@ +import { VideoViewModel } from '@server/models/view/video-view' import { isTestInstance } from '../../../helpers/core-utils' import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' -import { VideoViewModel } from '../../../models/video/video-view' import { Redis } from '../../redis' async function processVideosViewsStats () { diff --git a/server/lib/redis.ts b/server/lib/redis.ts index c4c1fa443..b86aefa0e 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -249,6 +249,45 @@ class Redis { ]) } + /* ************ Video viewers stats ************ */ + + getLocalVideoViewer (options: { + key?: string + // Or + ip?: string + videoId?: number + }) { + if (options.key) return this.getObject(options.key) + + const { viewerKey } = this.generateLocalVideoViewerKeys(options.ip, options.videoId) + + return this.getObject(viewerKey) + } + + setLocalVideoViewer (ip: string, videoId: number, object: any) { + const { setKey, viewerKey } = this.generateLocalVideoViewerKeys(ip, videoId) + + return Promise.all([ + this.addToSet(setKey, viewerKey), + this.setObject(viewerKey, object) + ]) + } + + listLocalVideoViewerKeys () { + const { setKey } = this.generateLocalVideoViewerKeys() + + return this.getSet(setKey) + } + + deleteLocalVideoViewersKeys (key: string) { + const { setKey } = this.generateLocalVideoViewerKeys() + + return Promise.all([ + this.deleteFromSet(setKey, key), + this.deleteKey(key) + ]) + } + /* ************ Resumable uploads final responses ************ */ setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) { @@ -290,10 +329,18 @@ class Redis { /* ************ Keys generation ************ */ - private generateLocalVideoViewsKeys (videoId?: Number) { + private generateLocalVideoViewsKeys (videoId: number): { setKey: string, videoKey: string } + private generateLocalVideoViewsKeys (): { setKey: string } + private generateLocalVideoViewsKeys (videoId?: number) { return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } } + private generateLocalVideoViewerKeys (ip: string, videoId: number): { setKey: string, viewerKey: string } + private generateLocalVideoViewerKeys (): { setKey: string } + private generateLocalVideoViewerKeys (ip?: string, videoId?: number) { + return { setKey: `local-video-viewer-stats-keys`, viewerKey: `local-video-viewer-stats-${ip}-${videoId}` } + } + private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { const hour = exists(options.hour) ? options.hour @@ -352,8 +399,23 @@ class Redis { return this.client.del(this.prefix + key) } - private async setValue (key: string, value: string, expirationMilliseconds: number) { - const result = await this.client.set(this.prefix + key, value, { PX: expirationMilliseconds }) + private async getObject (key: string) { + const value = await this.getValue(key) + if (!value) return null + + return JSON.parse(value) + } + + private setObject (key: string, value: { [ id: string ]: number | string }) { + return this.setValue(key, JSON.stringify(value)) + } + + private async setValue (key: string, value: string, expirationMilliseconds?: number) { + const options = expirationMilliseconds + ? { PX: expirationMilliseconds } + : {} + + const result = await this.client.set(this.prefix + key, value, options) if (result !== 'OK') throw new Error('Redis set result is not OK.') } diff --git a/server/lib/schedulers/geo-ip-update-scheduler.ts b/server/lib/schedulers/geo-ip-update-scheduler.ts new file mode 100644 index 000000000..9dda6d76c --- /dev/null +++ b/server/lib/schedulers/geo-ip-update-scheduler.ts @@ -0,0 +1,22 @@ +import { GeoIP } from '@server/helpers/geo-ip' +import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { AbstractScheduler } from './abstract-scheduler' + +export class GeoIPUpdateScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.YOUTUBE_DL_UPDATE + + private constructor () { + super() + } + + protected internalExecute () { + return GeoIP.Instance.updateDatabase() + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/schedulers/remove-old-views-scheduler.ts b/server/lib/schedulers/remove-old-views-scheduler.ts index 64bef97fe..8bc53a045 100644 --- a/server/lib/schedulers/remove-old-views-scheduler.ts +++ b/server/lib/schedulers/remove-old-views-scheduler.ts @@ -1,8 +1,8 @@ +import { VideoViewModel } from '@server/models/view/video-view' import { logger } from '../../helpers/logger' -import { AbstractScheduler } from './abstract-scheduler' -import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { CONFIG } from '../../initializers/config' -import { VideoViewModel } from '../../models/video/video-view' +import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { AbstractScheduler } from './abstract-scheduler' export class RemoveOldViewsScheduler extends AbstractScheduler { diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts index c0e72c461..937764155 100644 --- a/server/lib/schedulers/video-views-buffer-scheduler.ts +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts @@ -21,8 +21,6 @@ export class VideoViewsBufferScheduler extends AbstractScheduler { const videoIds = await Redis.Instance.listLocalVideosViewed() if (videoIds.length === 0) return - logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) - for (const videoId of videoIds) { try { const views = await Redis.Instance.getLocalVideoViews(videoId) @@ -34,6 +32,8 @@ export class VideoViewsBufferScheduler extends AbstractScheduler { continue } + logger.info('Processing local video %s views buffer.', video.uuid, lTags(video.uuid)) + // If this is a remote video, the origin instance will send us an update await VideoModel.incrementViews(videoId, views) diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts deleted file mode 100644 index c024eb93c..000000000 --- a/server/lib/video-views.ts +++ /dev/null @@ -1,131 +0,0 @@ -import { isTestInstance } from '@server/helpers/core-utils' -import { logger, loggerTagsFactory } from '@server/helpers/logger' -import { VIEW_LIFETIME } from '@server/initializers/constants' -import { VideoModel } from '@server/models/video/video' -import { MVideo } from '@server/types/models' -import { PeerTubeSocket } from './peertube-socket' -import { Redis } from './redis' - -const lTags = loggerTagsFactory('views') - -export class VideoViews { - - // Values are Date().getTime() - private readonly viewersPerVideo = new Map() - - private static instance: VideoViews - - private constructor () { - } - - init () { - setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) - } - - async processView (options: { - video: MVideo - ip: string | null - viewerExpires?: Date - }) { - const { video, ip, viewerExpires } = options - - logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) - - let success = await this.addView(video, ip) - - if (video.isLive) { - const successViewer = await this.addViewer(video, ip, viewerExpires) - success ||= successViewer - } - - return success - } - - getViewers (video: MVideo) { - const viewers = this.viewersPerVideo.get(video.id) - if (!viewers) return 0 - - return viewers.length - } - - buildViewerExpireTime () { - return new Date().getTime() + VIEW_LIFETIME.VIEWER - } - - private async addView (video: MVideo, ip: string | null) { - const promises: Promise[] = [] - - if (ip !== null) { - const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) - if (viewExists) return false - - promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) - } - - if (video.isOwned()) { - promises.push(Redis.Instance.addLocalVideoView(video.id)) - } - - promises.push(Redis.Instance.addVideoViewStats(video.id)) - - await Promise.all(promises) - - return true - } - - private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { - if (ip !== null) { - const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) - if (viewExists) return false - - await Redis.Instance.setIPVideoViewer(ip, video.uuid) - } - - let watchers = this.viewersPerVideo.get(video.id) - - if (!watchers) { - watchers = [] - this.viewersPerVideo.set(video.id, watchers) - } - - const expiration = viewerExpires - ? viewerExpires.getTime() - : this.buildViewerExpireTime() - - watchers.push(expiration) - await this.notifyClients(video.id, watchers.length) - - return true - } - - private async cleanViewers () { - if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) - - for (const videoId of this.viewersPerVideo.keys()) { - const notBefore = new Date().getTime() - - const viewers = this.viewersPerVideo.get(videoId) - - // Only keep not expired viewers - const newViewers = viewers.filter(w => w > notBefore) - - if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) - else this.viewersPerVideo.set(videoId, newViewers) - - await this.notifyClients(videoId, newViewers.length) - } - } - - private async notifyClients (videoId: string | number, viewersLength: number) { - const video = await VideoModel.loadImmutableAttributes(videoId) - if (!video) return - - PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) - - logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) - } - - static get Instance () { - return this.instance || (this.instance = new this()) - } -} diff --git a/server/lib/views/shared/index.ts b/server/lib/views/shared/index.ts new file mode 100644 index 000000000..dd510f4e2 --- /dev/null +++ b/server/lib/views/shared/index.ts @@ -0,0 +1,2 @@ +export * from './video-viewers' +export * from './video-views' diff --git a/server/lib/views/shared/video-viewers.ts b/server/lib/views/shared/video-viewers.ts new file mode 100644 index 000000000..5c26f8982 --- /dev/null +++ b/server/lib/views/shared/video-viewers.ts @@ -0,0 +1,276 @@ +import { Transaction } from 'sequelize/types' +import { isTestInstance } from '@server/helpers/core-utils' +import { GeoIP } from '@server/helpers/geo-ip' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { sendCreateWatchAction } from '@server/lib/activitypub/send' +import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' +import { PeerTubeSocket } from '@server/lib/peertube-socket' +import { Redis } from '@server/lib/redis' +import { VideoModel } from '@server/models/video/video' +import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' +import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' +import { MVideo } from '@server/types/models' +import { VideoViewEvent } from '@shared/models' + +const lTags = loggerTagsFactory('views') + +type LocalViewerStats = { + firstUpdated: number // Date.getTime() + lastUpdated: number // Date.getTime() + + watchSections: { + start: number + end: number + }[] + + watchTime: number + + country: string + + videoId: number +} + +export class VideoViewers { + + // Values are Date().getTime() + private readonly viewersPerVideo = new Map() + + private processingViewerCounters = false + private processingViewerStats = false + + constructor () { + setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER) + + setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) + } + + // --------------------------------------------------------------------------- + + getViewers (video: MVideo) { + const viewers = this.viewersPerVideo.get(video.id) + if (!viewers) return 0 + + return viewers.length + } + + buildViewerExpireTime () { + return new Date().getTime() + VIEW_LIFETIME.VIEWER + } + + async getWatchTime (videoId: number, ip: string) { + const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) + + return stats?.watchTime || 0 + } + + async addLocalViewer (options: { + video: MVideo + currentTime: number + ip: string + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + + logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) + + await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) + + const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) + if (viewExists) return false + + await Redis.Instance.setIPVideoViewer(ip, video.uuid) + + return this.addViewerToVideo({ video }) + } + + async addRemoteViewer (options: { + video: MVideo + viewerExpires: Date + }) { + const { video, viewerExpires } = options + + logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) + + return this.addViewerToVideo({ video, viewerExpires }) + } + + private async addViewerToVideo (options: { + video: MVideo + viewerExpires?: Date + }) { + const { video, viewerExpires } = options + + let watchers = this.viewersPerVideo.get(video.id) + + if (!watchers) { + watchers = [] + this.viewersPerVideo.set(video.id, watchers) + } + + const expiration = viewerExpires + ? viewerExpires.getTime() + : this.buildViewerExpireTime() + + watchers.push(expiration) + await this.notifyClients(video.id, watchers.length) + + return true + } + + private async updateLocalViewerStats (options: { + video: MVideo + ip: string + currentTime: number + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + const nowMs = new Date().getTime() + + let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) + + if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { + logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) + return + } + + if (!stats) { + const country = await GeoIP.Instance.safeCountryISOLookup(ip) + + stats = { + firstUpdated: nowMs, + lastUpdated: nowMs, + + watchSections: [], + + watchTime: 0, + + country, + videoId: video.id + } + } + + stats.lastUpdated = nowMs + + if (viewEvent === 'seek' || stats.watchSections.length === 0) { + stats.watchSections.push({ + start: currentTime, + end: currentTime + }) + } else { + const lastSection = stats.watchSections[stats.watchSections.length - 1] + lastSection.end = currentTime + } + + stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) + + logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) + + await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) + } + + private async cleanViewerCounters () { + if (this.processingViewerCounters) return + this.processingViewerCounters = true + + if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) + + try { + for (const videoId of this.viewersPerVideo.keys()) { + const notBefore = new Date().getTime() + + const viewers = this.viewersPerVideo.get(videoId) + + // Only keep not expired viewers + const newViewers = viewers.filter(w => w > notBefore) + + if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) + else this.viewersPerVideo.set(videoId, newViewers) + + await this.notifyClients(videoId, newViewers.length) + } + } catch (err) { + logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) + } + + this.processingViewerCounters = false + } + + private async notifyClients (videoId: string | number, viewersLength: number) { + const video = await VideoModel.loadImmutableAttributes(videoId) + if (!video) return + + PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) + + logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) + } + + async processViewerStats () { + if (this.processingViewerStats) return + this.processingViewerStats = true + + if (!isTestInstance()) logger.info('Processing viewers.', lTags()) + + const now = new Date().getTime() + + try { + const allKeys = await Redis.Instance.listLocalVideoViewerKeys() + + for (const key of allKeys) { + const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) + + if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { + continue + } + + try { + await sequelizeTypescript.transaction(async t => { + const video = await VideoModel.load(stats.videoId, t) + + const statsModel = await this.saveViewerStats(video, stats, t) + + if (video.remote) { + await sendCreateWatchAction(statsModel, t) + } + }) + + await Redis.Instance.deleteLocalVideoViewersKeys(key) + } catch (err) { + logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) + } + } + } catch (err) { + logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) + } + + this.processingViewerStats = false + } + + private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { + const statsModel = new LocalVideoViewerModel({ + startDate: new Date(stats.firstUpdated), + endDate: new Date(stats.lastUpdated), + watchTime: stats.watchTime, + country: stats.country, + videoId: video.id + }) + + statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) + statsModel.Video = video as VideoModel + + await statsModel.save({ transaction }) + + statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ + localVideoViewerId: statsModel.id, + watchSections: stats.watchSections, + transaction + }) + + return statsModel + } + + private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { + return sections.reduce((p, current) => p + (current.end - current.start), 0) + } +} diff --git a/server/lib/views/shared/video-views.ts b/server/lib/views/shared/video-views.ts new file mode 100644 index 000000000..19250f993 --- /dev/null +++ b/server/lib/views/shared/video-views.ts @@ -0,0 +1,60 @@ +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MVideo } from '@server/types/models' +import { Redis } from '../../redis' + +const lTags = loggerTagsFactory('views') + +export class VideoViews { + + async addLocalView (options: { + video: MVideo + ip: string + watchTime: number + }) { + const { video, ip, watchTime } = options + + logger.debug('Adding local view to video %s.', video.uuid, { watchTime, ...lTags(video.uuid) }) + + if (!this.hasEnoughWatchTime(video, watchTime)) return false + + const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) + if (viewExists) return false + + await Redis.Instance.setIPVideoView(ip, video.uuid) + + await this.addView(video) + + return true + } + + async addRemoteView (options: { + video: MVideo + }) { + const { video } = options + + logger.debug('Adding remote view to video %s.', video.uuid, { ...lTags(video.uuid) }) + + await this.addView(video) + + return true + } + + private async addView (video: MVideo) { + const promises: Promise[] = [] + + if (video.isOwned()) { + promises.push(Redis.Instance.addLocalVideoView(video.id)) + } + + promises.push(Redis.Instance.addVideoViewStats(video.id)) + + await Promise.all(promises) + } + + private hasEnoughWatchTime (video: MVideo, watchTime: number) { + if (video.isLive || video.duration >= 30) return watchTime >= 30 + + // Check more than 50% of the video is watched + return video.duration / watchTime < 2 + } +} diff --git a/server/lib/views/video-views-manager.ts b/server/lib/views/video-views-manager.ts new file mode 100644 index 000000000..e07af1ca9 --- /dev/null +++ b/server/lib/views/video-views-manager.ts @@ -0,0 +1,70 @@ +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MVideo } from '@server/types/models' +import { VideoViewEvent } from '@shared/models' +import { VideoViewers, VideoViews } from './shared' + +const lTags = loggerTagsFactory('views') + +export class VideoViewsManager { + + private static instance: VideoViewsManager + + private videoViewers: VideoViewers + private videoViews: VideoViews + + private constructor () { + } + + init () { + this.videoViewers = new VideoViewers() + this.videoViews = new VideoViews() + } + + async processLocalView (options: { + video: MVideo + currentTime: number + ip: string | null + viewEvent?: VideoViewEvent + }) { + const { video, ip, viewEvent, currentTime } = options + + logger.debug('Processing local view for %s and ip %s.', video.url, ip, lTags()) + + const successViewer = await this.videoViewers.addLocalViewer({ video, ip, viewEvent, currentTime }) + + // Do it after added local viewer to fetch updated information + const watchTime = await this.videoViewers.getWatchTime(video.id, ip) + + const successView = await this.videoViews.addLocalView({ video, watchTime, ip }) + + return { successView, successViewer } + } + + async processRemoteView (options: { + video: MVideo + viewerExpires?: Date + }) { + const { video, viewerExpires } = options + + logger.debug('Processing remote view for %s.', video.url, { viewerExpires, ...lTags() }) + + if (viewerExpires) await this.videoViewers.addRemoteViewer({ video, viewerExpires }) + else await this.videoViews.addRemoteView({ video }) + } + + getViewers (video: MVideo) { + return this.videoViewers.getViewers(video) + } + + buildViewerExpireTime () { + return this.videoViewers.buildViewerExpireTime() + } + + processViewers () { + return this.videoViewers.processViewerStats() + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} -- cgit v1.2.3