]>
Commit | Line | Data |
---|---|---|
1 | import { Transaction } from 'sequelize/types' | |
2 | import { isTestOrDevInstance } from '@server/helpers/core-utils' | |
3 | import { GeoIP } from '@server/helpers/geo-ip' | |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
5 | import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' | |
6 | import { sequelizeTypescript } from '@server/initializers/database' | |
7 | import { sendCreateWatchAction } from '@server/lib/activitypub/send' | |
8 | import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' | |
9 | import { Redis } from '@server/lib/redis' | |
10 | import { VideoModel } from '@server/models/video/video' | |
11 | import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' | |
12 | import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' | |
13 | import { MVideo, MVideoImmutable } from '@server/types/models' | |
14 | import { VideoViewEvent } from '@shared/models' | |
15 | ||
16 | const lTags = loggerTagsFactory('views') | |
17 | ||
18 | type LocalViewerStats = { | |
19 | firstUpdated: number // Date.getTime() | |
20 | lastUpdated: number // Date.getTime() | |
21 | ||
22 | watchSections: { | |
23 | start: number | |
24 | end: number | |
25 | }[] | |
26 | ||
27 | watchTime: number | |
28 | ||
29 | country: string | |
30 | ||
31 | videoId: number | |
32 | } | |
33 | ||
34 | export class VideoViewerStats { | |
35 | private processingViewersStats = false | |
36 | ||
37 | constructor () { | |
38 | setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) | |
39 | } | |
40 | ||
41 | // --------------------------------------------------------------------------- | |
42 | ||
43 | async addLocalViewer (options: { | |
44 | video: MVideoImmutable | |
45 | currentTime: number | |
46 | ip: string | |
47 | viewEvent?: VideoViewEvent | |
48 | }) { | |
49 | const { video, ip, viewEvent, currentTime } = options | |
50 | ||
51 | logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) | |
52 | ||
53 | return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) | |
54 | } | |
55 | ||
56 | // --------------------------------------------------------------------------- | |
57 | ||
58 | async getWatchTime (videoId: number, ip: string) { | |
59 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) | |
60 | ||
61 | return stats?.watchTime || 0 | |
62 | } | |
63 | ||
64 | // --------------------------------------------------------------------------- | |
65 | ||
66 | private async updateLocalViewerStats (options: { | |
67 | video: MVideoImmutable | |
68 | ip: string | |
69 | currentTime: number | |
70 | viewEvent?: VideoViewEvent | |
71 | }) { | |
72 | const { video, ip, viewEvent, currentTime } = options | |
73 | const nowMs = new Date().getTime() | |
74 | ||
75 | let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) | |
76 | ||
77 | if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { | |
78 | logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) | |
79 | return | |
80 | } | |
81 | ||
82 | if (!stats) { | |
83 | const country = await GeoIP.Instance.safeCountryISOLookup(ip) | |
84 | ||
85 | stats = { | |
86 | firstUpdated: nowMs, | |
87 | lastUpdated: nowMs, | |
88 | ||
89 | watchSections: [], | |
90 | ||
91 | watchTime: 0, | |
92 | ||
93 | country, | |
94 | videoId: video.id | |
95 | } | |
96 | } | |
97 | ||
98 | stats.lastUpdated = nowMs | |
99 | ||
100 | if (viewEvent === 'seek' || stats.watchSections.length === 0) { | |
101 | stats.watchSections.push({ | |
102 | start: currentTime, | |
103 | end: currentTime | |
104 | }) | |
105 | } else { | |
106 | const lastSection = stats.watchSections[stats.watchSections.length - 1] | |
107 | lastSection.end = currentTime | |
108 | } | |
109 | ||
110 | stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) | |
111 | ||
112 | logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) | |
113 | ||
114 | await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) | |
115 | } | |
116 | ||
117 | async processViewerStats () { | |
118 | if (this.processingViewersStats) return | |
119 | this.processingViewersStats = true | |
120 | ||
121 | if (!isTestOrDevInstance()) logger.info('Processing viewer statistics.', lTags()) | |
122 | ||
123 | const now = new Date().getTime() | |
124 | ||
125 | try { | |
126 | const allKeys = await Redis.Instance.listLocalVideoViewerKeys() | |
127 | ||
128 | for (const key of allKeys) { | |
129 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) | |
130 | ||
131 | // Process expired stats | |
132 | if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { | |
133 | continue | |
134 | } | |
135 | ||
136 | try { | |
137 | await sequelizeTypescript.transaction(async t => { | |
138 | const video = await VideoModel.load(stats.videoId, t) | |
139 | if (!video) return | |
140 | ||
141 | const statsModel = await this.saveViewerStats(video, stats, t) | |
142 | ||
143 | if (video.remote) { | |
144 | await sendCreateWatchAction(statsModel, t) | |
145 | } | |
146 | }) | |
147 | ||
148 | await Redis.Instance.deleteLocalVideoViewersKeys(key) | |
149 | } catch (err) { | |
150 | logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) | |
151 | } | |
152 | } | |
153 | } catch (err) { | |
154 | logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) | |
155 | } | |
156 | ||
157 | this.processingViewersStats = false | |
158 | } | |
159 | ||
160 | private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { | |
161 | const statsModel = new LocalVideoViewerModel({ | |
162 | startDate: new Date(stats.firstUpdated), | |
163 | endDate: new Date(stats.lastUpdated), | |
164 | watchTime: stats.watchTime, | |
165 | country: stats.country, | |
166 | videoId: video.id | |
167 | }) | |
168 | ||
169 | statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) | |
170 | statsModel.Video = video as VideoModel | |
171 | ||
172 | await statsModel.save({ transaction }) | |
173 | ||
174 | statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ | |
175 | localVideoViewerId: statsModel.id, | |
176 | watchSections: stats.watchSections, | |
177 | transaction | |
178 | }) | |
179 | ||
180 | return statsModel | |
181 | } | |
182 | ||
183 | private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { | |
184 | return sections.reduce((p, current) => p + (current.end - current.start), 0) | |
185 | } | |
186 | } |