aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/views/shared/video-viewer-stats.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-04-06 08:50:43 +0200
committerChocobozzz <chocobozzz@cpy.re>2022-04-15 09:49:35 +0200
commitac907dc7c158056e9b6a5cb58acd27df5c7c2670 (patch)
treef9d8bff22e0543a305c64e1a1808c0df6d512f1e /server/lib/views/shared/video-viewer-stats.ts
parentdfbcefc20dc64f0814b1f2e8e782a4ea1bd24db2 (diff)
downloadPeerTube-ac907dc7c158056e9b6a5cb58acd27df5c7c2670.tar.gz
PeerTube-ac907dc7c158056e9b6a5cb58acd27df5c7c2670.tar.zst
PeerTube-ac907dc7c158056e9b6a5cb58acd27df5c7c2670.zip
Improve viewer counter
More precise, avoid weird decrease, reuse an id to federate viewers
Diffstat (limited to 'server/lib/views/shared/video-viewer-stats.ts')
-rw-r--r--server/lib/views/shared/video-viewer-stats.ts185
1 files changed, 185 insertions, 0 deletions
diff --git a/server/lib/views/shared/video-viewer-stats.ts b/server/lib/views/shared/video-viewer-stats.ts
new file mode 100644
index 000000000..fd66fd5c7
--- /dev/null
+++ b/server/lib/views/shared/video-viewer-stats.ts
@@ -0,0 +1,185 @@
1import { Transaction } from 'sequelize/types'
2import { isTestInstance } from '@server/helpers/core-utils'
3import { GeoIP } from '@server/helpers/geo-ip'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants'
6import { sequelizeTypescript } from '@server/initializers/database'
7import { sendCreateWatchAction } from '@server/lib/activitypub/send'
8import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url'
9import { Redis } from '@server/lib/redis'
10import { VideoModel } from '@server/models/video/video'
11import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
12import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section'
13import { MVideo } from '@server/types/models'
14import { VideoViewEvent } from '@shared/models'
15
16const lTags = loggerTagsFactory('views')
17
18type LocalViewerStats = {
19 firstUpdated: number // Date.getTime()
20 lastUpdated: number // Date.getTime()
21
22 watchSections: {
23 start: number
24 end: number
25 }[]
26
27 watchTime: number
28
29 country: string
30
31 videoId: number
32}
33
34export class VideoViewerStats {
35 private processingViewersStats = false
36
37 constructor () {
38 setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
39 }
40
41 // ---------------------------------------------------------------------------
42
43 async addLocalViewer (options: {
44 video: MVideo
45 currentTime: number
46 ip: string
47 viewEvent?: VideoViewEvent
48 }) {
49 const { video, ip, viewEvent, currentTime } = options
50
51 logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
52
53 return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
54 }
55
56 // ---------------------------------------------------------------------------
57
58 async getWatchTime (videoId: number, ip: string) {
59 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
60
61 return stats?.watchTime || 0
62 }
63
64 // ---------------------------------------------------------------------------
65
66 private async updateLocalViewerStats (options: {
67 video: MVideo
68 ip: string
69 currentTime: number
70 viewEvent?: VideoViewEvent
71 }) {
72 const { video, ip, viewEvent, currentTime } = options
73 const nowMs = new Date().getTime()
74
75 let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id })
76
77 if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) {
78 logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) })
79 return
80 }
81
82 if (!stats) {
83 const country = await GeoIP.Instance.safeCountryISOLookup(ip)
84
85 stats = {
86 firstUpdated: nowMs,
87 lastUpdated: nowMs,
88
89 watchSections: [],
90
91 watchTime: 0,
92
93 country,
94 videoId: video.id
95 }
96 }
97
98 stats.lastUpdated = nowMs
99
100 if (viewEvent === 'seek' || stats.watchSections.length === 0) {
101 stats.watchSections.push({
102 start: currentTime,
103 end: currentTime
104 })
105 } else {
106 const lastSection = stats.watchSections[stats.watchSections.length - 1]
107 lastSection.end = currentTime
108 }
109
110 stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections)
111
112 logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
113
114 await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
115 }
116
117 async processViewerStats () {
118 if (this.processingViewersStats) return
119 this.processingViewersStats = true
120
121 if (!isTestInstance()) logger.info('Processing viewer statistics.', lTags())
122
123 const now = new Date().getTime()
124
125 try {
126 const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
127
128 for (const key of allKeys) {
129 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key })
130
131 // Process expired stats
132 if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) {
133 continue
134 }
135
136 try {
137 await sequelizeTypescript.transaction(async t => {
138 const video = await VideoModel.load(stats.videoId, t)
139
140 const statsModel = await this.saveViewerStats(video, stats, t)
141
142 if (video.remote) {
143 await sendCreateWatchAction(statsModel, t)
144 }
145 })
146
147 await Redis.Instance.deleteLocalVideoViewersKeys(key)
148 } catch (err) {
149 logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() })
150 }
151 }
152 } catch (err) {
153 logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
154 }
155
156 this.processingViewersStats = false
157 }
158
159 private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
160 const statsModel = new LocalVideoViewerModel({
161 startDate: new Date(stats.firstUpdated),
162 endDate: new Date(stats.lastUpdated),
163 watchTime: stats.watchTime,
164 country: stats.country,
165 videoId: video.id
166 })
167
168 statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel)
169 statsModel.Video = video as VideoModel
170
171 await statsModel.save({ transaction })
172
173 statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({
174 localVideoViewerId: statsModel.id,
175 watchSections: stats.watchSections,
176 transaction
177 })
178
179 return statsModel
180 }
181
182 private buildWatchTimeFromSections (sections: { start: number, end: number }[]) {
183 return sections.reduce((p, current) => p + (current.end - current.start), 0)
184 }
185}