]>
Commit | Line | Data |
---|---|---|
1 | import { isTestInstance } from '@server/helpers/core-utils' | |
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | |
3 | import { VIEW_LIFETIME } from '@server/initializers/constants' | |
4 | import { sendView } from '@server/lib/activitypub/send/send-view' | |
5 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | |
6 | import { getServerActor } from '@server/models/application/application' | |
7 | import { VideoModel } from '@server/models/video/video' | |
8 | import { MVideo, MVideoImmutable } from '@server/types/models' | |
9 | import { buildUUID, sha256 } from '@shared/extra-utils' | |
10 | ||
// Logger tags helper for the 'views' scope: its result is passed as metadata to the log calls below,
// optionally with a video UUID as extra argument
const lTags = loggerTagsFactory('views')
12 | ||
/**
 * A single viewer currently counted for a video.
 */
interface Viewer {
  /** Millisecond timestamp (comparable to `new Date().getTime()`) after which the viewer is dropped */
  expires: number

  /** Identifier of the viewer, used as key of the id -> viewer index */
  id: string

  /** Millisecond timestamp of the last time this viewer was federated, unset if it never was */
  lastFederation?: number
}
18 | ||
/**
 * In-memory counter of the viewers currently watching each video.
 *
 * Viewers are registered with an expiration timestamp and periodically purged
 * once expired. Every change of a video's viewer count is pushed to clients
 * through PeerTubeSocket.
 */
export class VideoViewerCounters {

  // Viewers currently counted for each video, indexed by video id
  // Viewer.expires is a millisecond timestamp comparable to new Date().getTime()
  private readonly viewersPerVideo = new Map<number, Viewer[]>()

  // Viewer id -> viewer object (the same object that is stored in viewersPerVideo)
  private readonly idToViewer = new Map<string, Viewer>()

  // Per-instance salt mixed into the ids generated for local viewers
  private readonly salt = buildUUID()

  // Guards against overlapping runs of cleanViewerCounters()
  private processingViewerCounters = false

  constructor () {
    // Purge expired viewers every VIEW_LIFETIME.VIEWER_COUNTER milliseconds
    setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
  }

  // ---------------------------------------------------------------------------

  /**
   * Registers a viewer of `video` identified by its IP, or refreshes its
   * expiration if this IP is already counted for this video. The viewer is
   * federated when it was not federated recently.
   *
   * @returns `true` if a new viewer was added, `false` if an existing one was refreshed
   */
  async addLocalViewer (options: {
    video: MVideoImmutable
    ip: string
  }) {
    const { video, ip } = options

    logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) })

    const viewerId = this.generateViewerId(ip, video.uuid)
    const viewer = this.idToViewer.get(viewerId)

    if (viewer) {
      viewer.expires = this.buildViewerExpireTime()
      await this.federateViewerIfNeeded(video, viewer)

      return false
    }

    const newViewer = await this.addViewerToVideo({ viewerId, video })
    await this.federateViewerIfNeeded(video, newViewer)

    return true
  }

  /**
   * Registers a viewer announced by a remote source, using the provided
   * identifier and expiration date. If this identifier is already counted for
   * this video, its expiration is refreshed instead of counting it twice.
   *
   * @returns always `true`
   */
  async addRemoteViewer (options: {
    video: MVideo
    viewerId: string
    viewerExpires: Date
  }) {
    const { video, viewerExpires, viewerId } = options

    logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })

    await this.addViewerToVideo({ video, viewerExpires, viewerId })

    return true
  }

  // ---------------------------------------------------------------------------

  /**
   * @returns the number of viewers currently stored for `video` (0 if none).
   * Expired viewers are still counted until the next cleaning pass removes them.
   */
  getViewers (video: MVideo) {
    const viewers = this.viewersPerVideo.get(video.id)
    if (!viewers) return 0

    return viewers.length
  }

  /**
   * @returns the expiration timestamp (in milliseconds) of a viewer seen right now
   */
  buildViewerExpireTime () {
    return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER
  }

  // ---------------------------------------------------------------------------

  /**
   * Stores a viewer for `video` and notifies clients of the new count.
   *
   * If `viewerId` is already counted for this video, only its expiration is
   * updated: the count does not change, so clients are not notified.
   *
   * @returns the stored (new or refreshed) viewer
   */
  private async addViewerToVideo (options: {
    video: MVideoImmutable
    viewerId: string
    viewerExpires?: Date
  }): Promise<Viewer> {
    const { video, viewerExpires, viewerId } = options

    const expires = viewerExpires
      ? viewerExpires.getTime()
      : this.buildViewerExpireTime()

    let watchers = this.viewersPerVideo.get(video.id)

    // Do not count the same viewer twice for the same video
    // The index lookup is O(1), so the array is only scanned when the id is already known
    const knownViewer = this.idToViewer.get(viewerId)
    if (knownViewer && watchers && watchers.includes(knownViewer)) {
      knownViewer.expires = expires

      return knownViewer
    }

    if (!watchers) {
      watchers = []
      this.viewersPerVideo.set(video.id, watchers)
    }

    const viewer: Viewer = { id: viewerId, expires }
    watchers.push(viewer)

    this.idToViewer.set(viewerId, viewer)

    await this.notifyClients(video.id, watchers.length)

    return viewer
  }

  /**
   * Removes expired viewers of every video and notifies clients of the
   * resulting counts. Does nothing if a previous run is still in progress.
   * Errors are logged and never propagated to the caller.
   */
  private async cleanViewerCounters () {
    if (this.processingViewerCounters) return
    this.processingViewerCounters = true

    try {
      if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags())

      for (const [ videoId, viewers ] of this.viewersPerVideo) {
        const notBefore = new Date().getTime()

        // Only keep not expired viewers
        const newViewers: Viewer[] = []

        for (const viewer of viewers) {
          if (viewer.expires > notBefore) {
            newViewers.push(viewer)
            continue
          }

          // Only drop the index entry if it still points to this viewer object
          if (this.idToViewer.get(viewer.id) === viewer) this.idToViewer.delete(viewer.id)
        }

        if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
        else this.viewersPerVideo.set(videoId, newViewers)

        await this.notifyClients(videoId, newViewers.length)
      }
    } catch (err) {
      logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
    } finally {
      // Always release the guard, otherwise a single failure would disable cleaning forever
      this.processingViewerCounters = false
    }
  }

  /**
   * Loads the video and sends its current viewer count to clients.
   * Does nothing if the video cannot be loaded.
   */
  private async notifyClients (videoId: string | number, viewersLength: number) {
    const video = await VideoModel.loadImmutableAttributes(videoId)
    if (!video) return

    PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)

    logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
  }

  /**
   * @returns the id of a local viewer: a sha256 of the instance salt, the IP and the video UUID
   */
  private generateViewerId (ip: string, videoUUID: string) {
    return sha256(this.salt + '-' + ip + '-' + videoUUID)
  }

  /**
   * Sends a 'viewer' view for this viewer, unless one was already sent during
   * the last 75% of VIEW_LIFETIME.VIEWER_COUNTER.
   */
  private async federateViewerIfNeeded (video: MVideoImmutable, viewer: Viewer) {
    // Federate the viewer if it's been a "long" time we did not
    const now = new Date().getTime()
    const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75)

    if (viewer.lastFederation && viewer.lastFederation > federationLimit) return

    await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
    viewer.lastFederation = now
  }
}