aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-11-09 10:11:20 +0100
committerChocobozzz <chocobozzz@cpy.re>2021-11-09 15:00:31 +0100
commit51353d9a035fb6b81f903a8b5f391292841649fd (patch)
tree75acb6eea5e043bf2e15a6a5a92e9a3c5967b156 /server/lib
parent221ee1adc916684d4881d2a9c4c01954dcde986e (diff)
downloadPeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.gz
PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.zst
PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.zip
Refactor video views
Introduce viewers attribute for live videos Count views for live videos Reduce delay to see the viewer update for lives Add ability to configure video views buffer interval and view ip expiration
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/process/process-view.ts32
-rw-r--r--server/lib/activitypub/send/send-view.ts4
-rw-r--r--server/lib/activitypub/videos/updater.ts1
-rw-r--r--server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts)36
-rw-r--r--server/lib/job-queue/job-queue.ts12
-rw-r--r--server/lib/live/live-manager.ts49
-rw-r--r--server/lib/peertube-socket.ts8
-rw-r--r--server/lib/redis.ts112
-rw-r--r--server/lib/schedulers/video-views-buffer-scheduler.ts52
-rw-r--r--server/lib/video-views.ts130
10 files changed, 302 insertions, 134 deletions
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts
index 5593ee257..720385f9b 100644
--- a/server/lib/activitypub/process/process-view.ts
+++ b/server/lib/activitypub/process/process-view.ts
@@ -1,13 +1,13 @@
1import { getOrCreateAPVideo } from '../videos' 1import { VideoViews } from '@server/lib/video-views'
2import { forwardVideoRelatedActivity } from '../send/utils' 2import { ActivityView } from '../../../../shared/models/activitypub'
3import { Redis } from '../../redis'
4import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
5import { APProcessorOptions } from '../../../types/activitypub-processor.model' 3import { APProcessorOptions } from '../../../types/activitypub-processor.model'
6import { MActorSignature } from '../../../types/models' 4import { MActorSignature } from '../../../types/models'
7import { LiveManager } from '@server/lib/live/live-manager' 5import { forwardVideoRelatedActivity } from '../send/utils'
6import { getOrCreateAPVideo } from '../videos'
8 7
9async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { 8async function processViewActivity (options: APProcessorOptions<ActivityView>) {
10 const { activity, byActor } = options 9 const { activity, byActor } = options
10
11 return processCreateView(activity, byActor) 11 return processCreateView(activity, byActor)
12} 12}
13 13
@@ -19,10 +19,8 @@ export {
19 19
20// --------------------------------------------------------------------------- 20// ---------------------------------------------------------------------------
21 21
22async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { 22async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
23 const videoObject = activity.type === 'View' 23 const videoObject = activity.object
24 ? activity.object
25 : (activity.object as ViewObject).object
26 24
27 const { video } = await getOrCreateAPVideo({ 25 const { video } = await getOrCreateAPVideo({
28 videoObject, 26 videoObject,
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
30 allowRefresh: false 28 allowRefresh: false
31 }) 29 })
32 30
33 if (!video.isLive) { 31 const viewerExpires = activity.expires
34 await Redis.Instance.addVideoView(video.id) 32 ? new Date(activity.expires)
35 } 33 : undefined
36 34
37 if (video.isOwned()) { 35 await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
38 // Our live manager will increment the counter and send the view to followers
39 if (video.isLive) {
40 LiveManager.Instance.addViewTo(video.id)
41 return
42 }
43 36
37 if (video.isOwned()) {
44 // Forward the view but don't resend the activity to the sender 38 // Forward the view but don't resend the activity to the sender
45 const exceptions = [ byActor ] 39 const exceptions = [ byActor ]
46 await forwardVideoRelatedActivity(activity, undefined, exceptions, video) 40 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts
index 153e94295..b12583e26 100644
--- a/server/lib/activitypub/send/send-view.ts
+++ b/server/lib/activitypub/send/send-view.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { VideoViews } from '@server/lib/video-views'
2import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' 3import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
3import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' 4import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
4import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU
27 id: url, 28 id: url,
28 type: 'View' as 'View', 29 type: 'View' as 'View',
29 actor: byActor.url, 30 actor: byActor.url,
30 object: video.url 31 object: video.url,
32 expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
31 }, 33 },
32 audience 34 audience
33 ) 35 )
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts
index 157569414..f786bb196 100644
--- a/server/lib/activitypub/videos/updater.ts
+++ b/server/lib/activitypub/videos/updater.ts
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
81 81
82 if (videoUpdated.isLive) { 82 if (videoUpdated.isLive) {
83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) 83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
84 PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
85 } 84 }
86 85
87 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) 86 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts
index 86d0a271f..caf5f6962 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views-stats.ts
@@ -1,11 +1,10 @@
1import { Redis } from '../../redis' 1import { isTestInstance } from '../../../helpers/core-utils'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video' 3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-view' 4import { VideoViewModel } from '../../../models/video/video-view'
5import { isTestInstance } from '../../../helpers/core-utils' 5import { Redis } from '../../redis'
6import { federateVideoIfNeeded } from '../../activitypub/videos'
7 6
8async function processVideosViews () { 7async function processVideosViewsStats () {
9 const lastHour = new Date() 8 const lastHour = new Date()
10 9
11 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour 10 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@@ -15,23 +14,23 @@ async function processVideosViews () {
15 const startDate = lastHour.setMinutes(0, 0, 0) 14 const startDate = lastHour.setMinutes(0, 0, 0)
16 const endDate = lastHour.setMinutes(59, 59, 999) 15 const endDate = lastHour.setMinutes(59, 59, 999)
17 16
18 const videoIds = await Redis.Instance.getVideosIdViewed(hour) 17 const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
19 if (videoIds.length === 0) return 18 if (videoIds.length === 0) return
20 19
21 logger.info('Processing videos views in job for hour %d.', hour) 20 logger.info('Processing videos views stats in job for hour %d.', hour)
22 21
23 for (const videoId of videoIds) { 22 for (const videoId of videoIds) {
24 try { 23 try {
25 const views = await Redis.Instance.getVideoViews(videoId, hour) 24 const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
26 await Redis.Instance.deleteVideoViews(videoId, hour) 25 await Redis.Instance.deleteVideoViewsStats(videoId, hour)
27 26
28 if (views) { 27 if (views) {
29 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 28 logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
30 29
31 try { 30 try {
32 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 31 const video = await VideoModel.load(videoId)
33 if (!video) { 32 if (!video) {
34 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) 33 logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
35 continue 34 continue
36 } 35 }
37 36
@@ -41,21 +40,12 @@ async function processVideosViews () {
41 views, 40 views,
42 videoId 41 videoId
43 }) 42 })
44
45 if (video.isOwned()) {
46 // If this is a remote video, the origin instance will send us an update
47 await VideoModel.incrementViews(videoId, views)
48
49 // Send video update
50 video.views += views
51 await federateVideoIfNeeded(video, false)
52 }
53 } catch (err) { 43 } catch (err) {
54 logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) 44 logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
55 } 45 }
56 } 46 }
57 } catch (err) { 47 } catch (err) {
58 logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) 48 logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
59 } 49 }
60 } 50 }
61} 51}
@@ -63,5 +53,5 @@ async function processVideosViews () {
63// --------------------------------------------------------------------------- 53// ---------------------------------------------------------------------------
64 54
65export { 55export {
66 processVideosViews 56 processVideosViewsStats
67} 57}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0eab720d9..4c1597b33 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import'
36import { processVideoImport } from './handlers/video-import' 36import { processVideoImport } from './handlers/video-import'
37import { processVideoLiveEnding } from './handlers/video-live-ending' 37import { processVideoLiveEnding } from './handlers/video-live-ending'
38import { processVideoTranscoding } from './handlers/video-transcoding' 38import { processVideoTranscoding } from './handlers/video-transcoding'
39import { processVideosViews } from './handlers/video-views' 39import { processVideosViewsStats } from './handlers/video-views-stats'
40 40
41type CreateJobArgument = 41type CreateJobArgument =
42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -49,7 +49,7 @@ type CreateJobArgument =
49 { type: 'email', payload: EmailPayload } | 49 { type: 'email', payload: EmailPayload } |
50 { type: 'video-import', payload: VideoImportPayload } | 50 { type: 'video-import', payload: VideoImportPayload } |
51 { type: 'activitypub-refresher', payload: RefreshPayload } | 51 { type: 'activitypub-refresher', payload: RefreshPayload } |
52 { type: 'videos-views', payload: {} } | 52 { type: 'videos-views-stats', payload: {} } |
53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } | 53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
54 { type: 'actor-keys', payload: ActorKeysPayload } | 54 { type: 'actor-keys', payload: ActorKeysPayload } |
55 { type: 'video-redundancy', payload: VideoRedundancyPayload } | 55 { type: 'video-redundancy', payload: VideoRedundancyPayload } |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
71 'video-transcoding': processVideoTranscoding, 71 'video-transcoding': processVideoTranscoding,
72 'email': processEmail, 72 'email': processEmail,
73 'video-import': processVideoImport, 73 'video-import': processVideoImport,
74 'videos-views': processVideosViews, 74 'videos-views-stats': processVideosViewsStats,
75 'activitypub-refresher': refreshAPObject, 75 'activitypub-refresher': refreshAPObject,
76 'video-live-ending': processVideoLiveEnding, 76 'video-live-ending': processVideoLiveEnding,
77 'actor-keys': processActorKeys, 77 'actor-keys': processActorKeys,
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [
89 'video-transcoding', 89 'video-transcoding',
90 'video-file-import', 90 'video-file-import',
91 'video-import', 91 'video-import',
92 'videos-views', 92 'videos-views-stats',
93 'activitypub-refresher', 93 'activitypub-refresher',
94 'video-redundancy', 94 'video-redundancy',
95 'actor-keys', 95 'actor-keys',
@@ -247,8 +247,8 @@ class JobQueue {
247 } 247 }
248 248
249 private addRepeatableJobs () { 249 private addRepeatableJobs () {
250 this.queues['videos-views'].add({}, { 250 this.queues['videos-views-stats'].add({}, {
251 repeat: REPEAT_JOBS['videos-views'] 251 repeat: REPEAT_JOBS['videos-views-stats']
252 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 252 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
253 253
254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 1b7b9dd4d..2562edb75 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,7 +2,6 @@
2import { readFile } from 'fs-extra' 2import { readFile } from 'fs-extra'
3import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { isTestInstance } from '@server/helpers/core-utils'
6import { 5import {
7 computeResolutionsToTranscode, 6 computeResolutionsToTranscode,
8 ffprobePromise, 7 ffprobePromise,
@@ -12,7 +11,7 @@ import {
12} from '@server/helpers/ffprobe-utils' 11} from '@server/helpers/ffprobe-utils'
13import { logger, loggerTagsFactory } from '@server/helpers/logger' 12import { logger, loggerTagsFactory } from '@server/helpers/logger'
14import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 13import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
15import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' 14import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
16import { UserModel } from '@server/models/user/user' 15import { UserModel } from '@server/models/user/user'
17import { VideoModel } from '@server/models/video/video' 16import { VideoModel } from '@server/models/video/video'
18import { VideoLiveModel } from '@server/models/video/video-live' 17import { VideoLiveModel } from '@server/models/video/video-live'
@@ -53,8 +52,6 @@ class LiveManager {
53 52
54 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
55 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<number, string>()
56 // Values are Date().getTime()
57 private readonly watchersPerVideo = new Map<number, number[]>()
58 55
59 private rtmpServer: Server 56 private rtmpServer: Server
60 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -99,8 +96,6 @@ class LiveManager {
99 // Cleanup broken lives, that were terminated by a server restart for example 96 // Cleanup broken lives, that were terminated by a server restart for example
100 this.handleBrokenLives() 97 this.handleBrokenLives()
101 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) 98 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
102
103 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
104 } 99 }
105 100
106 async run () { 101 async run () {
@@ -184,19 +179,6 @@ class LiveManager {
184 this.abortSession(sessionId) 179 this.abortSession(sessionId)
185 } 180 }
186 181
187 addViewTo (videoId: number) {
188 if (this.videoSessions.has(videoId) === false) return
189
190 let watchers = this.watchersPerVideo.get(videoId)
191
192 if (!watchers) {
193 watchers = []
194 this.watchersPerVideo.set(videoId, watchers)
195 }
196
197 watchers.push(new Date().getTime())
198 }
199
200 private getContext () { 182 private getContext () {
201 return context 183 return context
202 } 184 }
@@ -377,7 +359,6 @@ class LiveManager {
377 } 359 }
378 360
379 private onMuxingFFmpegEnd (videoId: number) { 361 private onMuxingFFmpegEnd (videoId: number) {
380 this.watchersPerVideo.delete(videoId)
381 this.videoSessions.delete(videoId) 362 this.videoSessions.delete(videoId)
382 } 363 }
383 364
@@ -411,34 +392,6 @@ class LiveManager {
411 } 392 }
412 } 393 }
413 394
414 private async updateLiveViews () {
415 if (!this.isRunning()) return
416
417 if (!isTestInstance()) logger.info('Updating live video views.', lTags())
418
419 for (const videoId of this.watchersPerVideo.keys()) {
420 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
421
422 const watchers = this.watchersPerVideo.get(videoId)
423
424 const numWatchers = watchers.length
425
426 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
427 video.views = numWatchers
428 await video.save()
429
430 await federateVideoIfNeeded(video, false)
431
432 PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
433
434 // Only keep not expired watchers
435 const newWatchers = watchers.filter(w => w > notBefore)
436 this.watchersPerVideo.set(videoId, newWatchers)
437
438 logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
439 }
440 }
441
442 private async handleBrokenLives () { 395 private async handleBrokenLives () {
443 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 396 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
444 397
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 901435dea..0398ca61d 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -1,7 +1,7 @@
1import { Server as HTTPServer } from 'http' 1import { Server as HTTPServer } from 'http'
2import { Namespace, Server as SocketServer, Socket } from 'socket.io' 2import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3import { isIdValid } from '@server/helpers/custom-validators/misc' 3import { isIdValid } from '@server/helpers/custom-validators/misc'
4import { MVideo } from '@server/types/models' 4import { MVideo, MVideoImmutable } from '@server/types/models'
5import { UserNotificationModelForApi } from '@server/types/models/user' 5import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' 6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger' 7import { logger } from '../helpers/logger'
@@ -78,11 +78,11 @@ class PeerTubeSocket {
78 .emit(type, data) 78 .emit(type, data)
79 } 79 }
80 80
81 sendVideoViewsUpdate (video: MVideo) { 81 sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
82 const data: LiveVideoEventPayload = { views: video.views } 82 const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
83 const type: LiveVideoEventType = 'views-change' 83 const type: LiveVideoEventType = 'views-change'
84 84
85 logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) 85 logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
86 86
87 this.liveVideosNamespace 87 this.liveVideosNamespace
88 .in(video.id) 88 .in(video.id)
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 46617b07e..76b7868e8 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -13,6 +13,7 @@ import {
13 RESUMABLE_UPLOAD_SESSION_LIFETIME 13 RESUMABLE_UPLOAD_SESSION_LIFETIME
14} from '../initializers/constants' 14} from '../initializers/constants'
15import { CONFIG } from '../initializers/config' 15import { CONFIG } from '../initializers/config'
16import { exists } from '@server/helpers/custom-validators/misc'
16 17
17type CachedRoute = { 18type CachedRoute = {
18 body: string 19 body: string
@@ -119,16 +120,20 @@ class Redis {
119 120
120 /* ************ Views per IP ************ */ 121 /* ************ Views per IP ************ */
121 122
122 setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { 123 setIPVideoView (ip: string, videoUUID: string) {
123 const lifetime = isLive 124 return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
124 ? VIEW_LIFETIME.LIVE 125 }
125 : VIEW_LIFETIME.VIDEO
126 126
127 return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) 127 setIPVideoViewer (ip: string, videoUUID: string) {
128 return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
128 } 129 }
129 130
130 async doesVideoIPViewExist (ip: string, videoUUID: string) { 131 async doesVideoIPViewExist (ip: string, videoUUID: string) {
131 return this.exists(this.generateViewKey(ip, videoUUID)) 132 return this.exists(this.generateIPViewKey(ip, videoUUID))
133 }
134
135 async doesVideoIPViewerExist (ip: string, videoUUID: string) {
136 return this.exists(this.generateIPViewerKey(ip, videoUUID))
132 } 137 }
133 138
134 /* ************ Tracker IP block ************ */ 139 /* ************ Tracker IP block ************ */
@@ -160,46 +165,85 @@ class Redis {
160 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) 165 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
161 } 166 }
162 167
163 /* ************ Video views ************ */ 168 /* ************ Video views stats ************ */
164 169
165 addVideoView (videoId: number) { 170 addVideoViewStats (videoId: number) {
166 const keyIncr = this.generateVideoViewKey(videoId) 171 const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
167 const keySet = this.generateVideosViewKey()
168 172
169 return Promise.all([ 173 return Promise.all([
170 this.addToSet(keySet, videoId.toString()), 174 this.addToSet(setKey, videoId.toString()),
171 this.increment(keyIncr) 175 this.increment(videoKey)
172 ]) 176 ])
173 } 177 }
174 178
175 async getVideoViews (videoId: number, hour: number) { 179 async getVideoViewsStats (videoId: number, hour: number) {
176 const key = this.generateVideoViewKey(videoId, hour) 180 const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
177 181
178 const valueString = await this.getValue(key) 182 const valueString = await this.getValue(videoKey)
179 const valueInt = parseInt(valueString, 10) 183 const valueInt = parseInt(valueString, 10)
180 184
181 if (isNaN(valueInt)) { 185 if (isNaN(valueInt)) {
182 logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) 186 logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
183 return undefined 187 return undefined
184 } 188 }
185 189
186 return valueInt 190 return valueInt
187 } 191 }
188 192
189 async getVideosIdViewed (hour: number) { 193 async listVideosViewedForStats (hour: number) {
190 const key = this.generateVideosViewKey(hour) 194 const { setKey } = this.generateVideoViewStatsKeys({ hour })
191 195
192 const stringIds = await this.getSet(key) 196 const stringIds = await this.getSet(setKey)
193 return stringIds.map(s => parseInt(s, 10)) 197 return stringIds.map(s => parseInt(s, 10))
194 } 198 }
195 199
196 deleteVideoViews (videoId: number, hour: number) { 200 deleteVideoViewsStats (videoId: number, hour: number) {
197 const keySet = this.generateVideosViewKey(hour) 201 const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
198 const keyIncr = this.generateVideoViewKey(videoId, hour) 202
203 return Promise.all([
204 this.deleteFromSet(setKey, videoId.toString()),
205 this.deleteKey(videoKey)
206 ])
207 }
208
209 /* ************ Local video views buffer ************ */
210
211 addLocalVideoView (videoId: number) {
212 const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
199 213
200 return Promise.all([ 214 return Promise.all([
201 this.deleteFromSet(keySet, videoId.toString()), 215 this.addToSet(setKey, videoId.toString()),
202 this.deleteKey(keyIncr) 216 this.increment(videoKey)
217 ])
218 }
219
220 async getLocalVideoViews (videoId: number) {
221 const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
222
223 const valueString = await this.getValue(videoKey)
224 const valueInt = parseInt(valueString, 10)
225
226 if (isNaN(valueInt)) {
227 logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
228 return undefined
229 }
230
231 return valueInt
232 }
233
234 async listLocalVideosViewed () {
235 const { setKey } = this.generateLocalVideoViewsKeys()
236
237 const stringIds = await this.getSet(setKey)
238 return stringIds.map(s => parseInt(s, 10))
239 }
240
241 deleteLocalVideoViews (videoId: number) {
242 const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
243
244 return Promise.all([
245 this.deleteFromSet(setKey, videoId.toString()),
246 this.deleteKey(videoKey)
203 ]) 247 ])
204 } 248 }
205 249
@@ -233,16 +277,16 @@ class Redis {
233 return req.method + '-' + req.originalUrl 277 return req.method + '-' + req.originalUrl
234 } 278 }
235 279
236 private generateVideosViewKey (hour?: number) { 280 private generateLocalVideoViewsKeys (videoId?: Number) {
237 if (!hour) hour = new Date().getHours() 281 return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
238
239 return `videos-view-h${hour}`
240 } 282 }
241 283
242 private generateVideoViewKey (videoId: number, hour?: number) { 284 private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
243 if (hour === undefined || hour === null) hour = new Date().getHours() 285 const hour = exists(options.hour)
286 ? options.hour
287 : new Date().getHours()
244 288
245 return `video-view-${videoId}-h${hour}` 289 return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
246 } 290 }
247 291
248 private generateResetPasswordKey (userId: number) { 292 private generateResetPasswordKey (userId: number) {
@@ -253,10 +297,14 @@ class Redis {
253 return 'verify-email-' + userId 297 return 'verify-email-' + userId
254 } 298 }
255 299
256 private generateViewKey (ip: string, videoUUID: string) { 300 private generateIPViewKey (ip: string, videoUUID: string) {
257 return `views-${videoUUID}-${ip}` 301 return `views-${videoUUID}-${ip}`
258 } 302 }
259 303
304 private generateIPViewerKey (ip: string, videoUUID: string) {
305 return `viewer-${videoUUID}-${ip}`
306 }
307
260 private generateTrackerBlockIPKey (ip: string) { 308 private generateTrackerBlockIPKey (ip: string) {
261 return `tracker-block-ip-${ip}` 309 return `tracker-block-ip-${ip}`
262 } 310 }
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts
new file mode 100644
index 000000000..c0e72c461
--- /dev/null
+++ b/server/lib/schedulers/video-views-buffer-scheduler.ts
@@ -0,0 +1,52 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VideoModel } from '@server/models/video/video'
3import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { federateVideoIfNeeded } from '../activitypub/videos'
5import { Redis } from '../redis'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViewsBufferScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const videoIds = await Redis.Instance.listLocalVideosViewed()
22 if (videoIds.length === 0) return
23
24 logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
25
26 for (const videoId of videoIds) {
27 try {
28 const views = await Redis.Instance.getLocalVideoViews(videoId)
29 await Redis.Instance.deleteLocalVideoViews(videoId)
30
31 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
32 if (!video) {
33 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
34 continue
35 }
36
37 // If this is a remote video, the origin instance will send us an update
38 await VideoModel.incrementViews(videoId, views)
39
40 // Send video update
41 video.views += views
42 await federateVideoIfNeeded(video, false)
43 } catch (err) {
44 logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
45 }
46 }
47 }
48
49 static get Instance () {
50 return this.instance || (this.instance = new this())
51 }
52}
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts
new file mode 100644
index 000000000..220b509c2
--- /dev/null
+++ b/server/lib/video-views.ts
@@ -0,0 +1,130 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VIEW_LIFETIME } from '@server/initializers/constants'
3import { VideoModel } from '@server/models/video/video'
4import { MVideo } from '@server/types/models'
5import { PeerTubeSocket } from './peertube-socket'
6import { Redis } from './redis'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViews {
11
12 // Values are Date().getTime()
13 private readonly viewersPerVideo = new Map<number, number[]>()
14
15 private static instance: VideoViews
16
17 private constructor () {
18 }
19
20 init () {
21 setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
22 }
23
24 async processView (options: {
25 video: MVideo
26 ip: string | null
27 viewerExpires?: Date
28 }) {
29 const { video, ip, viewerExpires } = options
30
31 logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
32
33 let success = await this.addView(video, ip)
34
35 if (video.isLive) {
36 const successViewer = await this.addViewer(video, ip, viewerExpires)
37 success ||= successViewer
38 }
39
40 return success
41 }
42
43 getViewers (video: MVideo) {
44 const viewers = this.viewersPerVideo.get(video.id)
45 if (!viewers) return 0
46
47 return viewers.length
48 }
49
50 buildViewerExpireTime () {
51 return new Date().getTime() + VIEW_LIFETIME.VIEWER
52 }
53
54 private async addView (video: MVideo, ip: string | null) {
55 const promises: Promise<any>[] = []
56
57 if (ip !== null) {
58 const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
59 if (viewExists) return false
60
61 promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
62 }
63
64 if (video.isOwned()) {
65 promises.push(Redis.Instance.addLocalVideoView(video.id))
66 }
67
68 promises.push(Redis.Instance.addVideoViewStats(video.id))
69
70 await Promise.all(promises)
71
72 return true
73 }
74
75 private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
76 if (ip !== null) {
77 const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
78 if (viewExists) return false
79
80 await Redis.Instance.setIPVideoViewer(ip, video.uuid)
81 }
82
83 let watchers = this.viewersPerVideo.get(video.id)
84
85 if (!watchers) {
86 watchers = []
87 this.viewersPerVideo.set(video.id, watchers)
88 }
89
90 const expiration = viewerExpires
91 ? viewerExpires.getTime()
92 : this.buildViewerExpireTime()
93
94 watchers.push(expiration)
95 await this.notifyClients(video.id, watchers.length)
96
97 return true
98 }
99
100 private async cleanViewers () {
101 logger.info('Cleaning video viewers.', lTags())
102
103 for (const videoId of this.viewersPerVideo.keys()) {
104 const notBefore = new Date().getTime()
105
106 const viewers = this.viewersPerVideo.get(videoId)
107
108 // Only keep not expired viewers
109 const newViewers = viewers.filter(w => w > notBefore)
110
111 if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
112 else this.viewersPerVideo.set(videoId, newViewers)
113
114 await this.notifyClients(videoId, newViewers.length)
115 }
116 }
117
118 private async notifyClients (videoId: string | number, viewersLength: number) {
119 const video = await VideoModel.loadImmutableAttributes(videoId)
120 if (!video) return
121
122 PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
123
124 logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
125 }
126
127 static get Instance () {
128 return this.instance || (this.instance = new this())
129 }
130}