diff options
author | Chocobozzz <me@florianbigard.com> | 2022-03-24 13:36:47 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2022-04-15 09:49:35 +0200 |
commit | b211106695bb82f6c32e53306081b5262c3d109d (patch) | |
tree | fa187de1c33b0956665f5362e29af6b0f6d8bb57 /server/lib/views/shared/video-viewers.ts | |
parent | 69d48ee30c9d47cddf0c3c047dc99a99dcb6e894 (diff) | |
download | PeerTube-b211106695bb82f6c32e53306081b5262c3d109d.tar.gz PeerTube-b211106695bb82f6c32e53306081b5262c3d109d.tar.zst PeerTube-b211106695bb82f6c32e53306081b5262c3d109d.zip |
Support video views/viewers stats in server
* Add "currentTime" and "event" body params to view endpoint
* Merge watching and view endpoints
* Introduce WatchAction AP activity
* Add tables to store viewer information of local videos
* Add endpoints to fetch video views/viewers stats of local videos
* Refactor views/viewers handlers
* Support "views" and "viewers" counters for both VOD and live videos
Diffstat (limited to 'server/lib/views/shared/video-viewers.ts')
-rw-r--r-- | server/lib/views/shared/video-viewers.ts | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/server/lib/views/shared/video-viewers.ts b/server/lib/views/shared/video-viewers.ts new file mode 100644 index 000000000..5c26f8982 --- /dev/null +++ b/server/lib/views/shared/video-viewers.ts | |||
@@ -0,0 +1,276 @@ | |||
1 | import { Transaction } from 'sequelize/types' | ||
2 | import { isTestInstance } from '@server/helpers/core-utils' | ||
3 | import { GeoIP } from '@server/helpers/geo-ip' | ||
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
5 | import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' | ||
6 | import { sequelizeTypescript } from '@server/initializers/database' | ||
7 | import { sendCreateWatchAction } from '@server/lib/activitypub/send' | ||
8 | import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' | ||
9 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | ||
10 | import { Redis } from '@server/lib/redis' | ||
11 | import { VideoModel } from '@server/models/video/video' | ||
12 | import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' | ||
13 | import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' | ||
14 | import { MVideo } from '@server/types/models' | ||
15 | import { VideoViewEvent } from '@shared/models' | ||
16 | |||
17 | const lTags = loggerTagsFactory('views') | ||
18 | |||
19 | type LocalViewerStats = { | ||
20 | firstUpdated: number // Date.getTime() | ||
21 | lastUpdated: number // Date.getTime() | ||
22 | |||
23 | watchSections: { | ||
24 | start: number | ||
25 | end: number | ||
26 | }[] | ||
27 | |||
28 | watchTime: number | ||
29 | |||
30 | country: string | ||
31 | |||
32 | videoId: number | ||
33 | } | ||
34 | |||
35 | export class VideoViewers { | ||
36 | |||
37 | // Values are Date().getTime() | ||
38 | private readonly viewersPerVideo = new Map<number, number[]>() | ||
39 | |||
40 | private processingViewerCounters = false | ||
41 | private processingViewerStats = false | ||
42 | |||
43 | constructor () { | ||
44 | setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER) | ||
45 | |||
46 | setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) | ||
47 | } | ||
48 | |||
49 | // --------------------------------------------------------------------------- | ||
50 | |||
51 | getViewers (video: MVideo) { | ||
52 | const viewers = this.viewersPerVideo.get(video.id) | ||
53 | if (!viewers) return 0 | ||
54 | |||
55 | return viewers.length | ||
56 | } | ||
57 | |||
58 | buildViewerExpireTime () { | ||
59 | return new Date().getTime() + VIEW_LIFETIME.VIEWER | ||
60 | } | ||
61 | |||
62 | async getWatchTime (videoId: number, ip: string) { | ||
63 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) | ||
64 | |||
65 | return stats?.watchTime || 0 | ||
66 | } | ||
67 | |||
68 | async addLocalViewer (options: { | ||
69 | video: MVideo | ||
70 | currentTime: number | ||
71 | ip: string | ||
72 | viewEvent?: VideoViewEvent | ||
73 | }) { | ||
74 | const { video, ip, viewEvent, currentTime } = options | ||
75 | |||
76 | logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) | ||
77 | |||
78 | await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) | ||
79 | |||
80 | const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) | ||
81 | if (viewExists) return false | ||
82 | |||
83 | await Redis.Instance.setIPVideoViewer(ip, video.uuid) | ||
84 | |||
85 | return this.addViewerToVideo({ video }) | ||
86 | } | ||
87 | |||
88 | async addRemoteViewer (options: { | ||
89 | video: MVideo | ||
90 | viewerExpires: Date | ||
91 | }) { | ||
92 | const { video, viewerExpires } = options | ||
93 | |||
94 | logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) | ||
95 | |||
96 | return this.addViewerToVideo({ video, viewerExpires }) | ||
97 | } | ||
98 | |||
99 | private async addViewerToVideo (options: { | ||
100 | video: MVideo | ||
101 | viewerExpires?: Date | ||
102 | }) { | ||
103 | const { video, viewerExpires } = options | ||
104 | |||
105 | let watchers = this.viewersPerVideo.get(video.id) | ||
106 | |||
107 | if (!watchers) { | ||
108 | watchers = [] | ||
109 | this.viewersPerVideo.set(video.id, watchers) | ||
110 | } | ||
111 | |||
112 | const expiration = viewerExpires | ||
113 | ? viewerExpires.getTime() | ||
114 | : this.buildViewerExpireTime() | ||
115 | |||
116 | watchers.push(expiration) | ||
117 | await this.notifyClients(video.id, watchers.length) | ||
118 | |||
119 | return true | ||
120 | } | ||
121 | |||
122 | private async updateLocalViewerStats (options: { | ||
123 | video: MVideo | ||
124 | ip: string | ||
125 | currentTime: number | ||
126 | viewEvent?: VideoViewEvent | ||
127 | }) { | ||
128 | const { video, ip, viewEvent, currentTime } = options | ||
129 | const nowMs = new Date().getTime() | ||
130 | |||
131 | let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) | ||
132 | |||
133 | if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { | ||
134 | logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) | ||
135 | return | ||
136 | } | ||
137 | |||
138 | if (!stats) { | ||
139 | const country = await GeoIP.Instance.safeCountryISOLookup(ip) | ||
140 | |||
141 | stats = { | ||
142 | firstUpdated: nowMs, | ||
143 | lastUpdated: nowMs, | ||
144 | |||
145 | watchSections: [], | ||
146 | |||
147 | watchTime: 0, | ||
148 | |||
149 | country, | ||
150 | videoId: video.id | ||
151 | } | ||
152 | } | ||
153 | |||
154 | stats.lastUpdated = nowMs | ||
155 | |||
156 | if (viewEvent === 'seek' || stats.watchSections.length === 0) { | ||
157 | stats.watchSections.push({ | ||
158 | start: currentTime, | ||
159 | end: currentTime | ||
160 | }) | ||
161 | } else { | ||
162 | const lastSection = stats.watchSections[stats.watchSections.length - 1] | ||
163 | lastSection.end = currentTime | ||
164 | } | ||
165 | |||
166 | stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) | ||
167 | |||
168 | logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) | ||
169 | |||
170 | await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) | ||
171 | } | ||
172 | |||
173 | private async cleanViewerCounters () { | ||
174 | if (this.processingViewerCounters) return | ||
175 | this.processingViewerCounters = true | ||
176 | |||
177 | if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) | ||
178 | |||
179 | try { | ||
180 | for (const videoId of this.viewersPerVideo.keys()) { | ||
181 | const notBefore = new Date().getTime() | ||
182 | |||
183 | const viewers = this.viewersPerVideo.get(videoId) | ||
184 | |||
185 | // Only keep not expired viewers | ||
186 | const newViewers = viewers.filter(w => w > notBefore) | ||
187 | |||
188 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | ||
189 | else this.viewersPerVideo.set(videoId, newViewers) | ||
190 | |||
191 | await this.notifyClients(videoId, newViewers.length) | ||
192 | } | ||
193 | } catch (err) { | ||
194 | logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) | ||
195 | } | ||
196 | |||
197 | this.processingViewerCounters = false | ||
198 | } | ||
199 | |||
200 | private async notifyClients (videoId: string | number, viewersLength: number) { | ||
201 | const video = await VideoModel.loadImmutableAttributes(videoId) | ||
202 | if (!video) return | ||
203 | |||
204 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | ||
205 | |||
206 | logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) | ||
207 | } | ||
208 | |||
209 | async processViewerStats () { | ||
210 | if (this.processingViewerStats) return | ||
211 | this.processingViewerStats = true | ||
212 | |||
213 | if (!isTestInstance()) logger.info('Processing viewers.', lTags()) | ||
214 | |||
215 | const now = new Date().getTime() | ||
216 | |||
217 | try { | ||
218 | const allKeys = await Redis.Instance.listLocalVideoViewerKeys() | ||
219 | |||
220 | for (const key of allKeys) { | ||
221 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) | ||
222 | |||
223 | if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { | ||
224 | continue | ||
225 | } | ||
226 | |||
227 | try { | ||
228 | await sequelizeTypescript.transaction(async t => { | ||
229 | const video = await VideoModel.load(stats.videoId, t) | ||
230 | |||
231 | const statsModel = await this.saveViewerStats(video, stats, t) | ||
232 | |||
233 | if (video.remote) { | ||
234 | await sendCreateWatchAction(statsModel, t) | ||
235 | } | ||
236 | }) | ||
237 | |||
238 | await Redis.Instance.deleteLocalVideoViewersKeys(key) | ||
239 | } catch (err) { | ||
240 | logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) | ||
241 | } | ||
242 | } | ||
243 | } catch (err) { | ||
244 | logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) | ||
245 | } | ||
246 | |||
247 | this.processingViewerStats = false | ||
248 | } | ||
249 | |||
250 | private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { | ||
251 | const statsModel = new LocalVideoViewerModel({ | ||
252 | startDate: new Date(stats.firstUpdated), | ||
253 | endDate: new Date(stats.lastUpdated), | ||
254 | watchTime: stats.watchTime, | ||
255 | country: stats.country, | ||
256 | videoId: video.id | ||
257 | }) | ||
258 | |||
259 | statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) | ||
260 | statsModel.Video = video as VideoModel | ||
261 | |||
262 | await statsModel.save({ transaction }) | ||
263 | |||
264 | statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ | ||
265 | localVideoViewerId: statsModel.id, | ||
266 | watchSections: stats.watchSections, | ||
267 | transaction | ||
268 | }) | ||
269 | |||
270 | return statsModel | ||
271 | } | ||
272 | |||
273 | private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { | ||
274 | return sections.reduce((p, current) => p + (current.end - current.start), 0) | ||
275 | } | ||
276 | } | ||