aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/process/process-view.ts32
-rw-r--r--server/lib/activitypub/send/send-view.ts4
-rw-r--r--server/lib/activitypub/videos/updater.ts1
-rw-r--r--server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts)36
-rw-r--r--server/lib/job-queue/job-queue.ts12
-rw-r--r--server/lib/live/live-manager.ts49
-rw-r--r--server/lib/peertube-socket.ts8
-rw-r--r--server/lib/redis.ts112
-rw-r--r--server/lib/schedulers/video-views-buffer-scheduler.ts52
-rw-r--r--server/lib/video-views.ts130
10 files changed, 302 insertions, 134 deletions
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts
index 5593ee257..720385f9b 100644
--- a/server/lib/activitypub/process/process-view.ts
+++ b/server/lib/activitypub/process/process-view.ts
@@ -1,13 +1,13 @@
1import { getOrCreateAPVideo } from '../videos' 1import { VideoViews } from '@server/lib/video-views'
2import { forwardVideoRelatedActivity } from '../send/utils' 2import { ActivityView } from '../../../../shared/models/activitypub'
3import { Redis } from '../../redis'
4import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
5import { APProcessorOptions } from '../../../types/activitypub-processor.model' 3import { APProcessorOptions } from '../../../types/activitypub-processor.model'
6import { MActorSignature } from '../../../types/models' 4import { MActorSignature } from '../../../types/models'
7import { LiveManager } from '@server/lib/live/live-manager' 5import { forwardVideoRelatedActivity } from '../send/utils'
6import { getOrCreateAPVideo } from '../videos'
8 7
9async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { 8async function processViewActivity (options: APProcessorOptions<ActivityView>) {
10 const { activity, byActor } = options 9 const { activity, byActor } = options
10
11 return processCreateView(activity, byActor) 11 return processCreateView(activity, byActor)
12} 12}
13 13
@@ -19,10 +19,8 @@ export {
19 19
20// --------------------------------------------------------------------------- 20// ---------------------------------------------------------------------------
21 21
22async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { 22async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
23 const videoObject = activity.type === 'View' 23 const videoObject = activity.object
24 ? activity.object
25 : (activity.object as ViewObject).object
26 24
27 const { video } = await getOrCreateAPVideo({ 25 const { video } = await getOrCreateAPVideo({
28 videoObject, 26 videoObject,
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
30 allowRefresh: false 28 allowRefresh: false
31 }) 29 })
32 30
33 if (!video.isLive) { 31 const viewerExpires = activity.expires
34 await Redis.Instance.addVideoView(video.id) 32 ? new Date(activity.expires)
35 } 33 : undefined
36 34
37 if (video.isOwned()) { 35 await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
38 // Our live manager will increment the counter and send the view to followers
39 if (video.isLive) {
40 LiveManager.Instance.addViewTo(video.id)
41 return
42 }
43 36
37 if (video.isOwned()) {
44 // Forward the view but don't resend the activity to the sender 38 // Forward the view but don't resend the activity to the sender
45 const exceptions = [ byActor ] 39 const exceptions = [ byActor ]
46 await forwardVideoRelatedActivity(activity, undefined, exceptions, video) 40 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts
index 153e94295..b12583e26 100644
--- a/server/lib/activitypub/send/send-view.ts
+++ b/server/lib/activitypub/send/send-view.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { VideoViews } from '@server/lib/video-views'
2import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' 3import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
3import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' 4import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
4import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU
27 id: url, 28 id: url,
28 type: 'View' as 'View', 29 type: 'View' as 'View',
29 actor: byActor.url, 30 actor: byActor.url,
30 object: video.url 31 object: video.url,
32 expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
31 }, 33 },
32 audience 34 audience
33 ) 35 )
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts
index 157569414..f786bb196 100644
--- a/server/lib/activitypub/videos/updater.ts
+++ b/server/lib/activitypub/videos/updater.ts
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
81 81
82 if (videoUpdated.isLive) { 82 if (videoUpdated.isLive) {
83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) 83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
84 PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
85 } 84 }
86 85
87 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) 86 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts
index 86d0a271f..caf5f6962 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views-stats.ts
@@ -1,11 +1,10 @@
1import { Redis } from '../../redis' 1import { isTestInstance } from '../../../helpers/core-utils'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video' 3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-view' 4import { VideoViewModel } from '../../../models/video/video-view'
5import { isTestInstance } from '../../../helpers/core-utils' 5import { Redis } from '../../redis'
6import { federateVideoIfNeeded } from '../../activitypub/videos'
7 6
8async function processVideosViews () { 7async function processVideosViewsStats () {
9 const lastHour = new Date() 8 const lastHour = new Date()
10 9
11 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour 10 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@@ -15,23 +14,23 @@ async function processVideosViews () {
15 const startDate = lastHour.setMinutes(0, 0, 0) 14 const startDate = lastHour.setMinutes(0, 0, 0)
16 const endDate = lastHour.setMinutes(59, 59, 999) 15 const endDate = lastHour.setMinutes(59, 59, 999)
17 16
18 const videoIds = await Redis.Instance.getVideosIdViewed(hour) 17 const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
19 if (videoIds.length === 0) return 18 if (videoIds.length === 0) return
20 19
21 logger.info('Processing videos views in job for hour %d.', hour) 20 logger.info('Processing videos views stats in job for hour %d.', hour)
22 21
23 for (const videoId of videoIds) { 22 for (const videoId of videoIds) {
24 try { 23 try {
25 const views = await Redis.Instance.getVideoViews(videoId, hour) 24 const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
26 await Redis.Instance.deleteVideoViews(videoId, hour) 25 await Redis.Instance.deleteVideoViewsStats(videoId, hour)
27 26
28 if (views) { 27 if (views) {
29 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 28 logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
30 29
31 try { 30 try {
32 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 31 const video = await VideoModel.load(videoId)
33 if (!video) { 32 if (!video) {
34 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) 33 logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
35 continue 34 continue
36 } 35 }
37 36
@@ -41,21 +40,12 @@ async function processVideosViews () {
41 views, 40 views,
42 videoId 41 videoId
43 }) 42 })
44
45 if (video.isOwned()) {
46 // If this is a remote video, the origin instance will send us an update
47 await VideoModel.incrementViews(videoId, views)
48
49 // Send video update
50 video.views += views
51 await federateVideoIfNeeded(video, false)
52 }
53 } catch (err) { 43 } catch (err) {
54 logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) 44 logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
55 } 45 }
56 } 46 }
57 } catch (err) { 47 } catch (err) {
58 logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) 48 logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
59 } 49 }
60 } 50 }
61} 51}
@@ -63,5 +53,5 @@ async function processVideosViews () {
63// --------------------------------------------------------------------------- 53// ---------------------------------------------------------------------------
64 54
65export { 55export {
66 processVideosViews 56 processVideosViewsStats
67} 57}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0eab720d9..4c1597b33 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import'
36import { processVideoImport } from './handlers/video-import' 36import { processVideoImport } from './handlers/video-import'
37import { processVideoLiveEnding } from './handlers/video-live-ending' 37import { processVideoLiveEnding } from './handlers/video-live-ending'
38import { processVideoTranscoding } from './handlers/video-transcoding' 38import { processVideoTranscoding } from './handlers/video-transcoding'
39import { processVideosViews } from './handlers/video-views' 39import { processVideosViewsStats } from './handlers/video-views-stats'
40 40
41type CreateJobArgument = 41type CreateJobArgument =
42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -49,7 +49,7 @@ type CreateJobArgument =
49 { type: 'email', payload: EmailPayload } | 49 { type: 'email', payload: EmailPayload } |
50 { type: 'video-import', payload: VideoImportPayload } | 50 { type: 'video-import', payload: VideoImportPayload } |
51 { type: 'activitypub-refresher', payload: RefreshPayload } | 51 { type: 'activitypub-refresher', payload: RefreshPayload } |
52 { type: 'videos-views', payload: {} } | 52 { type: 'videos-views-stats', payload: {} } |
53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } | 53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
54 { type: 'actor-keys', payload: ActorKeysPayload } | 54 { type: 'actor-keys', payload: ActorKeysPayload } |
55 { type: 'video-redundancy', payload: VideoRedundancyPayload } | 55 { type: 'video-redundancy', payload: VideoRedundancyPayload } |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
71 'video-transcoding': processVideoTranscoding, 71 'video-transcoding': processVideoTranscoding,
72 'email': processEmail, 72 'email': processEmail,
73 'video-import': processVideoImport, 73 'video-import': processVideoImport,
74 'videos-views': processVideosViews, 74 'videos-views-stats': processVideosViewsStats,
75 'activitypub-refresher': refreshAPObject, 75 'activitypub-refresher': refreshAPObject,
76 'video-live-ending': processVideoLiveEnding, 76 'video-live-ending': processVideoLiveEnding,
77 'actor-keys': processActorKeys, 77 'actor-keys': processActorKeys,
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [
89 'video-transcoding', 89 'video-transcoding',
90 'video-file-import', 90 'video-file-import',
91 'video-import', 91 'video-import',
92 'videos-views', 92 'videos-views-stats',
93 'activitypub-refresher', 93 'activitypub-refresher',
94 'video-redundancy', 94 'video-redundancy',
95 'actor-keys', 95 'actor-keys',
@@ -247,8 +247,8 @@ class JobQueue {
247 } 247 }
248 248
249 private addRepeatableJobs () { 249 private addRepeatableJobs () {
250 this.queues['videos-views'].add({}, { 250 this.queues['videos-views-stats'].add({}, {
251 repeat: REPEAT_JOBS['videos-views'] 251 repeat: REPEAT_JOBS['videos-views-stats']
252 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 252 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
253 253
254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 1b7b9dd4d..2562edb75 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,7 +2,6 @@
2import { readFile } from 'fs-extra' 2import { readFile } from 'fs-extra'
3import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { isTestInstance } from '@server/helpers/core-utils'
6import { 5import {
7 computeResolutionsToTranscode, 6 computeResolutionsToTranscode,
8 ffprobePromise, 7 ffprobePromise,
@@ -12,7 +11,7 @@ import {
12} from '@server/helpers/ffprobe-utils' 11} from '@server/helpers/ffprobe-utils'
13import { logger, loggerTagsFactory } from '@server/helpers/logger' 12import { logger, loggerTagsFactory } from '@server/helpers/logger'
14import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 13import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
15import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' 14import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
16import { UserModel } from '@server/models/user/user' 15import { UserModel } from '@server/models/user/user'
17import { VideoModel } from '@server/models/video/video' 16import { VideoModel } from '@server/models/video/video'
18import { VideoLiveModel } from '@server/models/video/video-live' 17import { VideoLiveModel } from '@server/models/video/video-live'
@@ -53,8 +52,6 @@ class LiveManager {
53 52
54 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
55 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<number, string>()
56 // Values are Date().getTime()
57 private readonly watchersPerVideo = new Map<number, number[]>()
58 55
59 private rtmpServer: Server 56 private rtmpServer: Server
60 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -99,8 +96,6 @@ class LiveManager {
99 // Cleanup broken lives, that were terminated by a server restart for example 96 // Cleanup broken lives, that were terminated by a server restart for example
100 this.handleBrokenLives() 97 this.handleBrokenLives()
101 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) 98 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
102
103 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
104 } 99 }
105 100
106 async run () { 101 async run () {
@@ -184,19 +179,6 @@ class LiveManager {
184 this.abortSession(sessionId) 179 this.abortSession(sessionId)
185 } 180 }
186 181
187 addViewTo (videoId: number) {
188 if (this.videoSessions.has(videoId) === false) return
189
190 let watchers = this.watchersPerVideo.get(videoId)
191
192 if (!watchers) {
193 watchers = []
194 this.watchersPerVideo.set(videoId, watchers)
195 }
196
197 watchers.push(new Date().getTime())
198 }
199
200 private getContext () { 182 private getContext () {
201 return context 183 return context
202 } 184 }
@@ -377,7 +359,6 @@ class LiveManager {
377 } 359 }
378 360
379 private onMuxingFFmpegEnd (videoId: number) { 361 private onMuxingFFmpegEnd (videoId: number) {
380 this.watchersPerVideo.delete(videoId)
381 this.videoSessions.delete(videoId) 362 this.videoSessions.delete(videoId)
382 } 363 }
383 364
@@ -411,34 +392,6 @@ class LiveManager {
411 } 392 }
412 } 393 }
413 394
414 private async updateLiveViews () {
415 if (!this.isRunning()) return
416
417 if (!isTestInstance()) logger.info('Updating live video views.', lTags())
418
419 for (const videoId of this.watchersPerVideo.keys()) {
420 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
421
422 const watchers = this.watchersPerVideo.get(videoId)
423
424 const numWatchers = watchers.length
425
426 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
427 video.views = numWatchers
428 await video.save()
429
430 await federateVideoIfNeeded(video, false)
431
432 PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
433
434 // Only keep not expired watchers
435 const newWatchers = watchers.filter(w => w > notBefore)
436 this.watchersPerVideo.set(videoId, newWatchers)
437
438 logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
439 }
440 }
441
442 private async handleBrokenLives () { 395 private async handleBrokenLives () {
443 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 396 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
444 397
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 901435dea..0398ca61d 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -1,7 +1,7 @@
1import { Server as HTTPServer } from 'http' 1import { Server as HTTPServer } from 'http'
2import { Namespace, Server as SocketServer, Socket } from 'socket.io' 2import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3import { isIdValid } from '@server/helpers/custom-validators/misc' 3import { isIdValid } from '@server/helpers/custom-validators/misc'
4import { MVideo } from '@server/types/models' 4import { MVideo, MVideoImmutable } from '@server/types/models'
5import { UserNotificationModelForApi } from '@server/types/models/user' 5import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' 6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger' 7import { logger } from '../helpers/logger'
@@ -78,11 +78,11 @@ class PeerTubeSocket {
78 .emit(type, data) 78 .emit(type, data)
79 } 79 }
80 80
81 sendVideoViewsUpdate (video: MVideo) { 81 sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
82 const data: LiveVideoEventPayload = { views: video.views } 82 const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
83 const type: LiveVideoEventType = 'views-change' 83 const type: LiveVideoEventType = 'views-change'
84 84
85 logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) 85 logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
86 86
87 this.liveVideosNamespace 87 this.liveVideosNamespace
88 .in(video.id) 88 .in(video.id)
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 46617b07e..76b7868e8 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -13,6 +13,7 @@ import {
13 RESUMABLE_UPLOAD_SESSION_LIFETIME 13 RESUMABLE_UPLOAD_SESSION_LIFETIME
14} from '../initializers/constants' 14} from '../initializers/constants'
15import { CONFIG } from '../initializers/config' 15import { CONFIG } from '../initializers/config'
16import { exists } from '@server/helpers/custom-validators/misc'
16 17
17type CachedRoute = { 18type CachedRoute = {
18 body: string 19 body: string
@@ -119,16 +120,20 @@ class Redis {
119 120
120 /* ************ Views per IP ************ */ 121 /* ************ Views per IP ************ */
121 122
122 setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { 123 setIPVideoView (ip: string, videoUUID: string) {
123 const lifetime = isLive 124 return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
124 ? VIEW_LIFETIME.LIVE 125 }
125 : VIEW_LIFETIME.VIDEO
126 126
127 return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) 127 setIPVideoViewer (ip: string, videoUUID: string) {
128 return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
128 } 129 }
129 130
130 async doesVideoIPViewExist (ip: string, videoUUID: string) { 131 async doesVideoIPViewExist (ip: string, videoUUID: string) {
131 return this.exists(this.generateViewKey(ip, videoUUID)) 132 return this.exists(this.generateIPViewKey(ip, videoUUID))
133 }
134
135 async doesVideoIPViewerExist (ip: string, videoUUID: string) {
136 return this.exists(this.generateIPViewerKey(ip, videoUUID))
132 } 137 }
133 138
134 /* ************ Tracker IP block ************ */ 139 /* ************ Tracker IP block ************ */
@@ -160,46 +165,85 @@ class Redis {
160 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) 165 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
161 } 166 }
162 167
163 /* ************ Video views ************ */ 168 /* ************ Video views stats ************ */
164 169
165 addVideoView (videoId: number) { 170 addVideoViewStats (videoId: number) {
166 const keyIncr = this.generateVideoViewKey(videoId) 171 const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
167 const keySet = this.generateVideosViewKey()
168 172
169 return Promise.all([ 173 return Promise.all([
170 this.addToSet(keySet, videoId.toString()), 174 this.addToSet(setKey, videoId.toString()),
171 this.increment(keyIncr) 175 this.increment(videoKey)
172 ]) 176 ])
173 } 177 }
174 178
175 async getVideoViews (videoId: number, hour: number) { 179 async getVideoViewsStats (videoId: number, hour: number) {
176 const key = this.generateVideoViewKey(videoId, hour) 180 const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
177 181
178 const valueString = await this.getValue(key) 182 const valueString = await this.getValue(videoKey)
179 const valueInt = parseInt(valueString, 10) 183 const valueInt = parseInt(valueString, 10)
180 184
181 if (isNaN(valueInt)) { 185 if (isNaN(valueInt)) {
182 logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) 186 logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
183 return undefined 187 return undefined
184 } 188 }
185 189
186 return valueInt 190 return valueInt
187 } 191 }
188 192
189 async getVideosIdViewed (hour: number) { 193 async listVideosViewedForStats (hour: number) {
190 const key = this.generateVideosViewKey(hour) 194 const { setKey } = this.generateVideoViewStatsKeys({ hour })
191 195
192 const stringIds = await this.getSet(key) 196 const stringIds = await this.getSet(setKey)
193 return stringIds.map(s => parseInt(s, 10)) 197 return stringIds.map(s => parseInt(s, 10))
194 } 198 }
195 199
196 deleteVideoViews (videoId: number, hour: number) { 200 deleteVideoViewsStats (videoId: number, hour: number) {
197 const keySet = this.generateVideosViewKey(hour) 201 const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
198 const keyIncr = this.generateVideoViewKey(videoId, hour) 202
203 return Promise.all([
204 this.deleteFromSet(setKey, videoId.toString()),
205 this.deleteKey(videoKey)
206 ])
207 }
208
209 /* ************ Local video views buffer ************ */
210
211 addLocalVideoView (videoId: number) {
212 const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
199 213
200 return Promise.all([ 214 return Promise.all([
201 this.deleteFromSet(keySet, videoId.toString()), 215 this.addToSet(setKey, videoId.toString()),
202 this.deleteKey(keyIncr) 216 this.increment(videoKey)
217 ])
218 }
219
220 async getLocalVideoViews (videoId: number) {
221 const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
222
223 const valueString = await this.getValue(videoKey)
224 const valueInt = parseInt(valueString, 10)
225
226 if (isNaN(valueInt)) {
227 logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
228 return undefined
229 }
230
231 return valueInt
232 }
233
234 async listLocalVideosViewed () {
235 const { setKey } = this.generateLocalVideoViewsKeys()
236
237 const stringIds = await this.getSet(setKey)
238 return stringIds.map(s => parseInt(s, 10))
239 }
240
241 deleteLocalVideoViews (videoId: number) {
242 const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
243
244 return Promise.all([
245 this.deleteFromSet(setKey, videoId.toString()),
246 this.deleteKey(videoKey)
203 ]) 247 ])
204 } 248 }
205 249
@@ -233,16 +277,16 @@ class Redis {
233 return req.method + '-' + req.originalUrl 277 return req.method + '-' + req.originalUrl
234 } 278 }
235 279
236 private generateVideosViewKey (hour?: number) { 280 private generateLocalVideoViewsKeys (videoId?: Number) {
237 if (!hour) hour = new Date().getHours() 281 return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
238
239 return `videos-view-h${hour}`
240 } 282 }
241 283
242 private generateVideoViewKey (videoId: number, hour?: number) { 284 private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
243 if (hour === undefined || hour === null) hour = new Date().getHours() 285 const hour = exists(options.hour)
286 ? options.hour
287 : new Date().getHours()
244 288
245 return `video-view-${videoId}-h${hour}` 289 return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
246 } 290 }
247 291
248 private generateResetPasswordKey (userId: number) { 292 private generateResetPasswordKey (userId: number) {
@@ -253,10 +297,14 @@ class Redis {
253 return 'verify-email-' + userId 297 return 'verify-email-' + userId
254 } 298 }
255 299
256 private generateViewKey (ip: string, videoUUID: string) { 300 private generateIPViewKey (ip: string, videoUUID: string) {
257 return `views-${videoUUID}-${ip}` 301 return `views-${videoUUID}-${ip}`
258 } 302 }
259 303
304 private generateIPViewerKey (ip: string, videoUUID: string) {
305 return `viewer-${videoUUID}-${ip}`
306 }
307
260 private generateTrackerBlockIPKey (ip: string) { 308 private generateTrackerBlockIPKey (ip: string) {
261 return `tracker-block-ip-${ip}` 309 return `tracker-block-ip-${ip}`
262 } 310 }
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts
new file mode 100644
index 000000000..c0e72c461
--- /dev/null
+++ b/server/lib/schedulers/video-views-buffer-scheduler.ts
@@ -0,0 +1,52 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VideoModel } from '@server/models/video/video'
3import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { federateVideoIfNeeded } from '../activitypub/videos'
5import { Redis } from '../redis'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViewsBufferScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const videoIds = await Redis.Instance.listLocalVideosViewed()
22 if (videoIds.length === 0) return
23
24 logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
25
26 for (const videoId of videoIds) {
27 try {
28 const views = await Redis.Instance.getLocalVideoViews(videoId)
29 await Redis.Instance.deleteLocalVideoViews(videoId)
30
31 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
32 if (!video) {
33 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
34 continue
35 }
36
37 // If this is a remote video, the origin instance will send us an update
38 await VideoModel.incrementViews(videoId, views)
39
40 // Send video update
41 video.views += views
42 await federateVideoIfNeeded(video, false)
43 } catch (err) {
44 logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
45 }
46 }
47 }
48
49 static get Instance () {
50 return this.instance || (this.instance = new this())
51 }
52}
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts
new file mode 100644
index 000000000..220b509c2
--- /dev/null
+++ b/server/lib/video-views.ts
@@ -0,0 +1,130 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VIEW_LIFETIME } from '@server/initializers/constants'
3import { VideoModel } from '@server/models/video/video'
4import { MVideo } from '@server/types/models'
5import { PeerTubeSocket } from './peertube-socket'
6import { Redis } from './redis'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViews {
11
12 // Values are Date().getTime()
13 private readonly viewersPerVideo = new Map<number, number[]>()
14
15 private static instance: VideoViews
16
17 private constructor () {
18 }
19
20 init () {
21 setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
22 }
23
24 async processView (options: {
25 video: MVideo
26 ip: string | null
27 viewerExpires?: Date
28 }) {
29 const { video, ip, viewerExpires } = options
30
31 logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
32
33 let success = await this.addView(video, ip)
34
35 if (video.isLive) {
36 const successViewer = await this.addViewer(video, ip, viewerExpires)
37 success ||= successViewer
38 }
39
40 return success
41 }
42
43 getViewers (video: MVideo) {
44 const viewers = this.viewersPerVideo.get(video.id)
45 if (!viewers) return 0
46
47 return viewers.length
48 }
49
50 buildViewerExpireTime () {
51 return new Date().getTime() + VIEW_LIFETIME.VIEWER
52 }
53
54 private async addView (video: MVideo, ip: string | null) {
55 const promises: Promise<any>[] = []
56
57 if (ip !== null) {
58 const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
59 if (viewExists) return false
60
61 promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
62 }
63
64 if (video.isOwned()) {
65 promises.push(Redis.Instance.addLocalVideoView(video.id))
66 }
67
68 promises.push(Redis.Instance.addVideoViewStats(video.id))
69
70 await Promise.all(promises)
71
72 return true
73 }
74
75 private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
76 if (ip !== null) {
77 const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
78 if (viewExists) return false
79
80 await Redis.Instance.setIPVideoViewer(ip, video.uuid)
81 }
82
83 let watchers = this.viewersPerVideo.get(video.id)
84
85 if (!watchers) {
86 watchers = []
87 this.viewersPerVideo.set(video.id, watchers)
88 }
89
90 const expiration = viewerExpires
91 ? viewerExpires.getTime()
92 : this.buildViewerExpireTime()
93
94 watchers.push(expiration)
95 await this.notifyClients(video.id, watchers.length)
96
97 return true
98 }
99
100 private async cleanViewers () {
101 logger.info('Cleaning video viewers.', lTags())
102
103 for (const videoId of this.viewersPerVideo.keys()) {
104 const notBefore = new Date().getTime()
105
106 const viewers = this.viewersPerVideo.get(videoId)
107
108 // Only keep not expired viewers
109 const newViewers = viewers.filter(w => w > notBefore)
110
111 if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
112 else this.viewersPerVideo.set(videoId, newViewers)
113
114 await this.notifyClients(videoId, newViewers.length)
115 }
116 }
117
118 private async notifyClients (videoId: string | number, viewersLength: number) {
119 const video = await VideoModel.loadImmutableAttributes(videoId)
120 if (!video) return
121
122 PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
123
124 logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
125 }
126
127 static get Instance () {
128 return this.instance || (this.instance = new this())
129 }
130}