]>
Commit | Line | Data |
---|---|---|
b2111066 | 1 | import { Transaction } from 'sequelize/types' |
9452d4fd | 2 | import { isTestOrDevInstance } from '@server/helpers/core-utils' |
b2111066 C |
3 | import { GeoIP } from '@server/helpers/geo-ip' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
5 | import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' | |
6 | import { sequelizeTypescript } from '@server/initializers/database' | |
7 | import { sendCreateWatchAction } from '@server/lib/activitypub/send' | |
8 | import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' | |
b2111066 C |
9 | import { Redis } from '@server/lib/redis' |
10 | import { VideoModel } from '@server/models/video/video' | |
11 | import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' | |
12 | import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' | |
aa2ce188 | 13 | import { MVideo, MVideoImmutable } from '@server/types/models' |
b2111066 C |
14 | import { VideoViewEvent } from '@shared/models' |
15 | ||
// Tags builder shared by every log call of this module (created from the 'views' label)
const lTags = loggerTagsFactory('views')
17 | ||
/**
 * Watching statistics accumulated for one viewer of one video.
 * This is the value read from and written to Redis by VideoViewerStats.
 */
type LocalViewerStats = {
  // Both timestamps are in milliseconds, as returned by Date.getTime()
  firstUpdated: number
  lastUpdated: number

  // Watched ranges, built from the currentTime values reported for the viewer
  watchSections: Array<{ start: number, end: number }>

  // Sum of (end - start) over all watch sections
  watchTime: number

  country: string

  videoId: number
}
33 | ||
/**
 * Tracks how local viewers watch videos.
 *
 * Live statistics are stored in Redis (one entry per ip + video) while they keep being updated.
 * A periodic task, scheduled every VIEW_LIFETIME.VIEWER_STATS, persists the entries
 * that have not been updated recently and removes them from Redis.
 */
export class VideoViewerStats {
  // Guard preventing two overlapping runs of processViewerStats()
  private processingViewersStats = false

  constructor () {
    setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
  }

  // ---------------------------------------------------------------------------

  /**
   * Records a view event of a local viewer.
   *
   * @param options.video video being watched
   * @param options.currentTime position reported for the viewer
   * @param options.ip viewer IP, used together with the video id to identify the Redis entry
   * @param options.viewEvent optional event; 'seek' starts a new watch section
   */
  async addLocalViewer (options: {
    video: MVideoImmutable
    currentTime: number
    ip: string
    viewEvent?: VideoViewEvent
  }): Promise<void> {
    const { video, ip, viewEvent, currentTime } = options

    logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })

    return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
  }

  // ---------------------------------------------------------------------------

  /**
   * @returns the watch time currently stored in Redis for this ip/video pair, or 0 if there is none
   */
  async getWatchTime (videoId: number, ip: string): Promise<number> {
    const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })

    return stats?.watchTime || 0
  }

  // ---------------------------------------------------------------------------

  /**
   * Creates or updates the Redis entry of a viewer with the reported position.
   *
   * A new watch section is opened on the first update and on every 'seek' event,
   * otherwise the end of the last section is moved to the reported position.
   */
  private async updateLocalViewerStats (options: {
    video: MVideoImmutable
    ip: string
    currentTime: number
    viewEvent?: VideoViewEvent
  }): Promise<void> {
    const { video, ip, viewEvent, currentTime } = options
    const nowMs = new Date().getTime()

    let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id })

    // Bound the size of the stored entry: once the limit is reached, further updates are dropped
    if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) {
      logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) })
      return
    }

    // First update of this viewer for this video
    if (!stats) {
      const country = await GeoIP.Instance.safeCountryISOLookup(ip)

      stats = {
        firstUpdated: nowMs,
        lastUpdated: nowMs,

        watchSections: [],

        watchTime: 0,

        country,
        videoId: video.id
      }
    }

    stats.lastUpdated = nowMs

    if (viewEvent === 'seek' || stats.watchSections.length === 0) {
      stats.watchSections.push({
        start: currentTime,
        end: currentTime
      })
    } else {
      const lastSection = stats.watchSections[stats.watchSections.length - 1]

      // A section cannot end before it started: keep the previous end in that case
      if (lastSection.start > currentTime) {
        logger.warn('Invalid end watch section %d. Last start record was at %d.', currentTime, lastSection.start, lTags(video.uuid))
      } else {
        lastSection.end = currentTime
      }
    }

    stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections)

    logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })

    await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
  }

  /**
   * Persists the viewer statistics that have not been updated for at least
   * VIEW_LIFETIME.VIEWER_STATS and removes their Redis keys.
   *
   * Does nothing if a previous run is still in progress. Errors are logged, never thrown:
   * a failure on one key does not prevent the other keys from being processed.
   */
  async processViewerStats (): Promise<void> {
    if (this.processingViewersStats) return
    this.processingViewersStats = true

    try {
      if (!isTestOrDevInstance()) logger.info('Processing viewer statistics.', lTags())

      const now = new Date().getTime()
      const allKeys = await Redis.Instance.listLocalVideoViewerKeys()

      for (const key of allKeys) {
        const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key })

        // The lookup can return nothing (the other callers of getLocalVideoViewer() guard against it too),
        // presumably when the entry vanished after the keys were listed: skip it instead of aborting the run
        if (!stats) continue

        // Only process expired stats: recently updated entries may still receive updates
        if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) continue

        try {
          await sequelizeTypescript.transaction(async t => {
            const video = await VideoModel.load(stats.videoId, t)
            // Video not found: nothing to save, the Redis key is still removed below
            if (!video) return

            const statsModel = await this.saveViewerStats(video, stats, t)

            if (video.remote) {
              await sendCreateWatchAction(statsModel, t)
            }
          })

          await Redis.Instance.deleteLocalVideoViewersKeys(key)
        } catch (err) {
          logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() })
        }
      }
    } catch (err) {
      logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
    } finally {
      // Always release the guard, otherwise every future run would be skipped
      this.processingViewersStats = false
    }
  }

  /**
   * Saves the viewer statistics and their watch sections using the provided transaction.
   *
   * @returns the saved model, with its Video and WatchSections associations set
   */
  private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
    const statsModel = new LocalVideoViewerModel({
      startDate: new Date(stats.firstUpdated),
      endDate: new Date(stats.lastUpdated),
      watchTime: stats.watchTime,
      country: stats.country,
      videoId: video.id
    })

    statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel)
    statsModel.Video = video as VideoModel

    await statsModel.save({ transaction })

    // The sections reference the viewer row, so they are created after the save above
    statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({
      localVideoViewerId: statsModel.id,
      watchSections: stats.watchSections,
      transaction
    })

    return statsModel
  }

  /**
   * @returns the sum of (end - start) over all sections; overlapping sections are counted each time
   */
  private buildWatchTimeFromSections (sections: { start: number, end: number }[]): number {
    return sections.reduce((total, section) => total + (section.end - section.start), 0)
  }
}