aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/views/shared/video-viewers.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-03-24 13:36:47 +0100
committerChocobozzz <chocobozzz@cpy.re>2022-04-15 09:49:35 +0200
commitb211106695bb82f6c32e53306081b5262c3d109d (patch)
treefa187de1c33b0956665f5362e29af6b0f6d8bb57 /server/lib/views/shared/video-viewers.ts
parent69d48ee30c9d47cddf0c3c047dc99a99dcb6e894 (diff)
downloadPeerTube-b211106695bb82f6c32e53306081b5262c3d109d.tar.gz
PeerTube-b211106695bb82f6c32e53306081b5262c3d109d.tar.zst
PeerTube-b211106695bb82f6c32e53306081b5262c3d109d.zip
Support video views/viewers stats in server
* Add "currentTime" and "event" body params to view endpoint * Merge watching and view endpoints * Introduce WatchAction AP activity * Add tables to store viewer information of local videos * Add endpoints to fetch video views/viewers stats of local videos * Refactor views/viewers handlers * Support "views" and "viewers" counters for both VOD and live videos
Diffstat (limited to 'server/lib/views/shared/video-viewers.ts')
-rw-r--r--server/lib/views/shared/video-viewers.ts276
1 files changed, 276 insertions, 0 deletions
diff --git a/server/lib/views/shared/video-viewers.ts b/server/lib/views/shared/video-viewers.ts
new file mode 100644
index 000000000..5c26f8982
--- /dev/null
+++ b/server/lib/views/shared/video-viewers.ts
@@ -0,0 +1,276 @@
1import { Transaction } from 'sequelize/types'
2import { isTestInstance } from '@server/helpers/core-utils'
3import { GeoIP } from '@server/helpers/geo-ip'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants'
6import { sequelizeTypescript } from '@server/initializers/database'
7import { sendCreateWatchAction } from '@server/lib/activitypub/send'
8import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url'
9import { PeerTubeSocket } from '@server/lib/peertube-socket'
10import { Redis } from '@server/lib/redis'
11import { VideoModel } from '@server/models/video/video'
12import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
13import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section'
14import { MVideo } from '@server/types/models'
15import { VideoViewEvent } from '@shared/models'
16
17const lTags = loggerTagsFactory('views')
18
19type LocalViewerStats = {
20 firstUpdated: number // Date.getTime()
21 lastUpdated: number // Date.getTime()
22
23 watchSections: {
24 start: number
25 end: number
26 }[]
27
28 watchTime: number
29
30 country: string
31
32 videoId: number
33}
34
35export class VideoViewers {
36
37 // Values are Date().getTime()
38 private readonly viewersPerVideo = new Map<number, number[]>()
39
40 private processingViewerCounters = false
41 private processingViewerStats = false
42
43 constructor () {
44 setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER)
45
46 setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
47 }
48
49 // ---------------------------------------------------------------------------
50
51 getViewers (video: MVideo) {
52 const viewers = this.viewersPerVideo.get(video.id)
53 if (!viewers) return 0
54
55 return viewers.length
56 }
57
58 buildViewerExpireTime () {
59 return new Date().getTime() + VIEW_LIFETIME.VIEWER
60 }
61
62 async getWatchTime (videoId: number, ip: string) {
63 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
64
65 return stats?.watchTime || 0
66 }
67
68 async addLocalViewer (options: {
69 video: MVideo
70 currentTime: number
71 ip: string
72 viewEvent?: VideoViewEvent
73 }) {
74 const { video, ip, viewEvent, currentTime } = options
75
76 logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
77
78 await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
79
80 const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
81 if (viewExists) return false
82
83 await Redis.Instance.setIPVideoViewer(ip, video.uuid)
84
85 return this.addViewerToVideo({ video })
86 }
87
88 async addRemoteViewer (options: {
89 video: MVideo
90 viewerExpires: Date
91 }) {
92 const { video, viewerExpires } = options
93
94 logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
95
96 return this.addViewerToVideo({ video, viewerExpires })
97 }
98
99 private async addViewerToVideo (options: {
100 video: MVideo
101 viewerExpires?: Date
102 }) {
103 const { video, viewerExpires } = options
104
105 let watchers = this.viewersPerVideo.get(video.id)
106
107 if (!watchers) {
108 watchers = []
109 this.viewersPerVideo.set(video.id, watchers)
110 }
111
112 const expiration = viewerExpires
113 ? viewerExpires.getTime()
114 : this.buildViewerExpireTime()
115
116 watchers.push(expiration)
117 await this.notifyClients(video.id, watchers.length)
118
119 return true
120 }
121
122 private async updateLocalViewerStats (options: {
123 video: MVideo
124 ip: string
125 currentTime: number
126 viewEvent?: VideoViewEvent
127 }) {
128 const { video, ip, viewEvent, currentTime } = options
129 const nowMs = new Date().getTime()
130
131 let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id })
132
133 if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) {
134 logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) })
135 return
136 }
137
138 if (!stats) {
139 const country = await GeoIP.Instance.safeCountryISOLookup(ip)
140
141 stats = {
142 firstUpdated: nowMs,
143 lastUpdated: nowMs,
144
145 watchSections: [],
146
147 watchTime: 0,
148
149 country,
150 videoId: video.id
151 }
152 }
153
154 stats.lastUpdated = nowMs
155
156 if (viewEvent === 'seek' || stats.watchSections.length === 0) {
157 stats.watchSections.push({
158 start: currentTime,
159 end: currentTime
160 })
161 } else {
162 const lastSection = stats.watchSections[stats.watchSections.length - 1]
163 lastSection.end = currentTime
164 }
165
166 stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections)
167
168 logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
169
170 await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
171 }
172
173 private async cleanViewerCounters () {
174 if (this.processingViewerCounters) return
175 this.processingViewerCounters = true
176
177 if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags())
178
179 try {
180 for (const videoId of this.viewersPerVideo.keys()) {
181 const notBefore = new Date().getTime()
182
183 const viewers = this.viewersPerVideo.get(videoId)
184
185 // Only keep not expired viewers
186 const newViewers = viewers.filter(w => w > notBefore)
187
188 if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
189 else this.viewersPerVideo.set(videoId, newViewers)
190
191 await this.notifyClients(videoId, newViewers.length)
192 }
193 } catch (err) {
194 logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
195 }
196
197 this.processingViewerCounters = false
198 }
199
200 private async notifyClients (videoId: string | number, viewersLength: number) {
201 const video = await VideoModel.loadImmutableAttributes(videoId)
202 if (!video) return
203
204 PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
205
206 logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
207 }
208
209 async processViewerStats () {
210 if (this.processingViewerStats) return
211 this.processingViewerStats = true
212
213 if (!isTestInstance()) logger.info('Processing viewers.', lTags())
214
215 const now = new Date().getTime()
216
217 try {
218 const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
219
220 for (const key of allKeys) {
221 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key })
222
223 if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) {
224 continue
225 }
226
227 try {
228 await sequelizeTypescript.transaction(async t => {
229 const video = await VideoModel.load(stats.videoId, t)
230
231 const statsModel = await this.saveViewerStats(video, stats, t)
232
233 if (video.remote) {
234 await sendCreateWatchAction(statsModel, t)
235 }
236 })
237
238 await Redis.Instance.deleteLocalVideoViewersKeys(key)
239 } catch (err) {
240 logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() })
241 }
242 }
243 } catch (err) {
244 logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
245 }
246
247 this.processingViewerStats = false
248 }
249
250 private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
251 const statsModel = new LocalVideoViewerModel({
252 startDate: new Date(stats.firstUpdated),
253 endDate: new Date(stats.lastUpdated),
254 watchTime: stats.watchTime,
255 country: stats.country,
256 videoId: video.id
257 })
258
259 statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel)
260 statsModel.Video = video as VideoModel
261
262 await statsModel.save({ transaction })
263
264 statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({
265 localVideoViewerId: statsModel.id,
266 watchSections: stats.watchSections,
267 transaction
268 })
269
270 return statsModel
271 }
272
273 private buildWatchTimeFromSections (sections: { start: number, end: number }[]) {
274 return sections.reduce((p, current) => p + (current.end - current.start), 0)
275 }
276}