]>
Commit | Line | Data |
---|---|---|
b2111066 | 1 | import { Transaction } from 'sequelize/types' |
9452d4fd | 2 | import { isTestOrDevInstance } from '@server/helpers/core-utils' |
b2111066 C |
3 | import { GeoIP } from '@server/helpers/geo-ip' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
5 | import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' | |
6 | import { sequelizeTypescript } from '@server/initializers/database' | |
7 | import { sendCreateWatchAction } from '@server/lib/activitypub/send' | |
8 | import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' | |
b2111066 C |
9 | import { Redis } from '@server/lib/redis' |
10 | import { VideoModel } from '@server/models/video/video' | |
11 | import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' | |
12 | import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' | |
aa2ce188 | 13 | import { MVideo, MVideoImmutable } from '@server/types/models' |
b2111066 C |
14 | import { VideoViewEvent } from '@shared/models' |
15 | ||
16 | const lTags = loggerTagsFactory('views') | |
17 | ||
18 | type LocalViewerStats = { | |
19 | firstUpdated: number // Date.getTime() | |
20 | lastUpdated: number // Date.getTime() | |
21 | ||
22 | watchSections: { | |
23 | start: number | |
24 | end: number | |
25 | }[] | |
26 | ||
27 | watchTime: number | |
28 | ||
29 | country: string | |
30 | ||
31 | videoId: number | |
32 | } | |
33 | ||
ac907dc7 C |
34 | export class VideoViewerStats { |
35 | private processingViewersStats = false | |
b2111066 C |
36 | |
37 | constructor () { | |
b2111066 C |
38 | setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) |
39 | } | |
40 | ||
41 | // --------------------------------------------------------------------------- | |
42 | ||
b2111066 | 43 | async addLocalViewer (options: { |
aa2ce188 | 44 | video: MVideoImmutable |
b2111066 C |
45 | currentTime: number |
46 | ip: string | |
47 | viewEvent?: VideoViewEvent | |
48 | }) { | |
49 | const { video, ip, viewEvent, currentTime } = options | |
50 | ||
ac907dc7 | 51 | logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) |
b2111066 | 52 | |
ac907dc7 | 53 | return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) |
b2111066 C |
54 | } |
55 | ||
ac907dc7 | 56 | // --------------------------------------------------------------------------- |
b2111066 | 57 | |
ac907dc7 C |
58 | async getWatchTime (videoId: number, ip: string) { |
59 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) | |
b2111066 | 60 | |
ac907dc7 | 61 | return stats?.watchTime || 0 |
b2111066 C |
62 | } |
63 | ||
ac907dc7 | 64 | // --------------------------------------------------------------------------- |
b2111066 C |
65 | |
66 | private async updateLocalViewerStats (options: { | |
aa2ce188 | 67 | video: MVideoImmutable |
b2111066 C |
68 | ip: string |
69 | currentTime: number | |
70 | viewEvent?: VideoViewEvent | |
71 | }) { | |
72 | const { video, ip, viewEvent, currentTime } = options | |
73 | const nowMs = new Date().getTime() | |
74 | ||
75 | let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) | |
76 | ||
77 | if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { | |
78 | logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) | |
79 | return | |
80 | } | |
81 | ||
82 | if (!stats) { | |
83 | const country = await GeoIP.Instance.safeCountryISOLookup(ip) | |
84 | ||
85 | stats = { | |
86 | firstUpdated: nowMs, | |
87 | lastUpdated: nowMs, | |
88 | ||
89 | watchSections: [], | |
90 | ||
91 | watchTime: 0, | |
92 | ||
93 | country, | |
94 | videoId: video.id | |
95 | } | |
96 | } | |
97 | ||
98 | stats.lastUpdated = nowMs | |
99 | ||
100 | if (viewEvent === 'seek' || stats.watchSections.length === 0) { | |
101 | stats.watchSections.push({ | |
102 | start: currentTime, | |
103 | end: currentTime | |
104 | }) | |
105 | } else { | |
106 | const lastSection = stats.watchSections[stats.watchSections.length - 1] | |
a4940752 C |
107 | |
108 | if (lastSection.start > currentTime) { | |
7815dc45 C |
109 | logger.debug('Invalid end watch section %d. Last start record was at %d. Starting a new section.', currentTime, lastSection.start) |
110 | ||
111 | stats.watchSections.push({ | |
112 | start: currentTime, | |
113 | end: currentTime | |
114 | }) | |
a4940752 C |
115 | } else { |
116 | lastSection.end = currentTime | |
117 | } | |
b2111066 C |
118 | } |
119 | ||
120 | stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) | |
121 | ||
122 | logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) | |
123 | ||
124 | await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) | |
125 | } | |
126 | ||
b2111066 | 127 | async processViewerStats () { |
ac907dc7 C |
128 | if (this.processingViewersStats) return |
129 | this.processingViewersStats = true | |
b2111066 | 130 | |
9452d4fd | 131 | if (!isTestOrDevInstance()) logger.info('Processing viewer statistics.', lTags()) |
b2111066 C |
132 | |
133 | const now = new Date().getTime() | |
134 | ||
135 | try { | |
136 | const allKeys = await Redis.Instance.listLocalVideoViewerKeys() | |
137 | ||
138 | for (const key of allKeys) { | |
139 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) | |
140 | ||
dfbcefc2 | 141 | // Process expired stats |
b2111066 C |
142 | if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { |
143 | continue | |
144 | } | |
145 | ||
146 | try { | |
147 | await sequelizeTypescript.transaction(async t => { | |
148 | const video = await VideoModel.load(stats.videoId, t) | |
941d28cc | 149 | if (!video) return |
b2111066 C |
150 | |
151 | const statsModel = await this.saveViewerStats(video, stats, t) | |
152 | ||
153 | if (video.remote) { | |
154 | await sendCreateWatchAction(statsModel, t) | |
155 | } | |
156 | }) | |
157 | ||
158 | await Redis.Instance.deleteLocalVideoViewersKeys(key) | |
159 | } catch (err) { | |
160 | logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) | |
161 | } | |
162 | } | |
163 | } catch (err) { | |
164 | logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) | |
165 | } | |
166 | ||
ac907dc7 | 167 | this.processingViewersStats = false |
b2111066 C |
168 | } |
169 | ||
170 | private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { | |
171 | const statsModel = new LocalVideoViewerModel({ | |
172 | startDate: new Date(stats.firstUpdated), | |
173 | endDate: new Date(stats.lastUpdated), | |
174 | watchTime: stats.watchTime, | |
175 | country: stats.country, | |
176 | videoId: video.id | |
177 | }) | |
178 | ||
179 | statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) | |
180 | statsModel.Video = video as VideoModel | |
181 | ||
182 | await statsModel.save({ transaction }) | |
183 | ||
184 | statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ | |
185 | localVideoViewerId: statsModel.id, | |
186 | watchSections: stats.watchSections, | |
187 | transaction | |
188 | }) | |
189 | ||
190 | return statsModel | |
191 | } | |
192 | ||
193 | private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { | |
194 | return sections.reduce((p, current) => p + (current.end - current.start), 0) | |
195 | } | |
196 | } |