]>
Commit | Line | Data |
---|---|---|
1 | import { isTestOrDevInstance } from '@server/helpers/core-utils' | |
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
3 | import { VIEW_LIFETIME } from '@server/initializers/constants' | |
4 | import { sendView } from '@server/lib/activitypub/send/send-view' | |
5 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | |
6 | import { getServerActor } from '@server/models/application/application' | |
7 | import { VideoModel } from '@server/models/video/video' | |
8 | import { MVideo, MVideoImmutable } from '@server/types/models' | |
9 | import { buildUUID, sha256 } from '@shared/extra-utils' | |
10 | ||
11 | const lTags = loggerTagsFactory('views') | |
12 | ||
13 | export type ViewerScope = 'local' | 'remote' | |
14 | export type VideoScope = 'local' | 'remote' | |
15 | ||
16 | type Viewer = { | |
17 | expires: number | |
18 | id: string | |
19 | viewerScope: ViewerScope | |
20 | videoScope: VideoScope | |
21 | lastFederation?: number | |
22 | } | |
23 | ||
24 | export class VideoViewerCounters { | |
25 | ||
26 | // expires is new Date().getTime() | |
27 | private readonly viewersPerVideo = new Map<number, Viewer[]>() | |
28 | private readonly idToViewer = new Map<string, Viewer>() | |
29 | ||
30 | private readonly salt = buildUUID() | |
31 | ||
32 | private processingViewerCounters = false | |
33 | ||
34 | constructor () { | |
35 | setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER) | |
36 | } | |
37 | ||
38 | // --------------------------------------------------------------------------- | |
39 | ||
40 | async addLocalViewer (options: { | |
41 | video: MVideoImmutable | |
42 | ip: string | |
43 | }) { | |
44 | const { video, ip } = options | |
45 | ||
46 | logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) }) | |
47 | ||
48 | const viewerId = this.generateViewerId(ip, video.uuid) | |
49 | const viewer = this.idToViewer.get(viewerId) | |
50 | ||
51 | if (viewer) { | |
52 | viewer.expires = this.buildViewerExpireTime() | |
53 | await this.federateViewerIfNeeded(video, viewer) | |
54 | ||
55 | return false | |
56 | } | |
57 | ||
58 | const newViewer = await this.addViewerToVideo({ viewerId, video, viewerScope: 'local' }) | |
59 | await this.federateViewerIfNeeded(video, newViewer) | |
60 | ||
61 | return true | |
62 | } | |
63 | ||
64 | async addRemoteViewer (options: { | |
65 | video: MVideo | |
66 | viewerId: string | |
67 | viewerExpires: Date | |
68 | }) { | |
69 | const { video, viewerExpires, viewerId } = options | |
70 | ||
71 | logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) | |
72 | ||
73 | await this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote' }) | |
74 | ||
75 | return true | |
76 | } | |
77 | ||
78 | // --------------------------------------------------------------------------- | |
79 | ||
80 | getTotalViewers (options: { | |
81 | viewerScope: ViewerScope | |
82 | videoScope: VideoScope | |
83 | }) { | |
84 | let total = 0 | |
85 | ||
86 | for (const viewers of this.viewersPerVideo.values()) { | |
87 | total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope).length | |
88 | } | |
89 | ||
90 | return total | |
91 | } | |
92 | ||
93 | getViewers (video: MVideo) { | |
94 | const viewers = this.viewersPerVideo.get(video.id) | |
95 | if (!viewers) return 0 | |
96 | ||
97 | return viewers.length | |
98 | } | |
99 | ||
100 | buildViewerExpireTime () { | |
101 | return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER | |
102 | } | |
103 | ||
104 | // --------------------------------------------------------------------------- | |
105 | ||
106 | private async addViewerToVideo (options: { | |
107 | video: MVideoImmutable | |
108 | viewerId: string | |
109 | viewerScope: ViewerScope | |
110 | viewerExpires?: Date | |
111 | }) { | |
112 | const { video, viewerExpires, viewerId, viewerScope } = options | |
113 | ||
114 | let watchers = this.viewersPerVideo.get(video.id) | |
115 | ||
116 | if (!watchers) { | |
117 | watchers = [] | |
118 | this.viewersPerVideo.set(video.id, watchers) | |
119 | } | |
120 | ||
121 | const expires = viewerExpires | |
122 | ? viewerExpires.getTime() | |
123 | : this.buildViewerExpireTime() | |
124 | ||
125 | const videoScope: VideoScope = video.remote | |
126 | ? 'remote' | |
127 | : 'local' | |
128 | ||
129 | const viewer = { id: viewerId, expires, videoScope, viewerScope } | |
130 | watchers.push(viewer) | |
131 | ||
132 | this.idToViewer.set(viewerId, viewer) | |
133 | ||
134 | await this.notifyClients(video.id, watchers.length) | |
135 | ||
136 | return viewer | |
137 | } | |
138 | ||
139 | private async cleanViewerCounters () { | |
140 | if (this.processingViewerCounters) return | |
141 | this.processingViewerCounters = true | |
142 | ||
143 | if (!isTestOrDevInstance()) logger.info('Cleaning video viewers.', lTags()) | |
144 | ||
145 | try { | |
146 | for (const videoId of this.viewersPerVideo.keys()) { | |
147 | const notBefore = new Date().getTime() | |
148 | ||
149 | const viewers = this.viewersPerVideo.get(videoId) | |
150 | ||
151 | // Only keep not expired viewers | |
152 | const newViewers: Viewer[] = [] | |
153 | ||
154 | // Filter new viewers | |
155 | for (const viewer of viewers) { | |
156 | if (viewer.expires > notBefore) { | |
157 | newViewers.push(viewer) | |
158 | } else { | |
159 | this.idToViewer.delete(viewer.id) | |
160 | } | |
161 | } | |
162 | ||
163 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | |
164 | else this.viewersPerVideo.set(videoId, newViewers) | |
165 | ||
166 | await this.notifyClients(videoId, newViewers.length) | |
167 | } | |
168 | } catch (err) { | |
169 | logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) | |
170 | } | |
171 | ||
172 | this.processingViewerCounters = false | |
173 | } | |
174 | ||
175 | private async notifyClients (videoId: string | number, viewersLength: number) { | |
176 | const video = await VideoModel.loadImmutableAttributes(videoId) | |
177 | if (!video) return | |
178 | ||
179 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | |
180 | ||
181 | logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) | |
182 | } | |
183 | ||
184 | private generateViewerId (ip: string, videoUUID: string) { | |
185 | return sha256(this.salt + '-' + ip + '-' + videoUUID) | |
186 | } | |
187 | ||
188 | private async federateViewerIfNeeded (video: MVideoImmutable, viewer: Viewer) { | |
189 | // Federate the viewer if it's been a "long" time we did not | |
190 | const now = new Date().getTime() | |
191 | const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75) | |
192 | ||
193 | if (viewer.lastFederation && viewer.lastFederation > federationLimit) return | |
194 | ||
195 | await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id }) | |
196 | viewer.lastFederation = now | |
197 | } | |
198 | } |