aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/views
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-07-31 14:34:36 +0200
committerChocobozzz <me@florianbigard.com>2023-08-11 15:02:33 +0200
commit3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch)
treee4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/views
parent04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff)
downloadPeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge conflicts, but it's a major step forward: * Server can be faster at startup because imports() are async and we can easily lazy import big modules * Angular doesn't seem to support ES import (with .js extension), so we had to correctly organize peertube into a monorepo: * Use yarn workspace feature * Use typescript reference projects for dependencies * Shared projects have been moved into "packages", each one is now a node module (with a dedicated package.json/tsconfig.json) * server/tools have been moved into apps/ and is now a dedicated app bundled and published on NPM so users don't have to build peertube cli tools manually * server/tests have been moved into packages/ so we don't compile them every time we want to run the server * Use isolatedModule option: * Had to move from const enum to const (https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums) * Had to explictely specify "type" imports when used in decorators * Prefer tsx (that uses esbuild under the hood) instead of ts-node to load typescript files (tests with mocha or scripts): * To reduce test complexity as esbuild doesn't support decorator metadata, we only test server files that do not import server models * We still build tests files into js files for a faster CI * Remove unmaintained peertube CLI import script * Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/views')
-rw-r--r--server/lib/views/shared/index.ts3
-rw-r--r--server/lib/views/shared/video-viewer-counters.ts198
-rw-r--r--server/lib/views/shared/video-viewer-stats.ts196
-rw-r--r--server/lib/views/shared/video-views.ts70
-rw-r--r--server/lib/views/video-views-manager.ts100
5 files changed, 0 insertions, 567 deletions
diff --git a/server/lib/views/shared/index.ts b/server/lib/views/shared/index.ts
deleted file mode 100644
index 139471183..000000000
--- a/server/lib/views/shared/index.ts
+++ /dev/null
@@ -1,3 +0,0 @@
1export * from './video-viewer-counters'
2export * from './video-viewer-stats'
3export * from './video-views'
diff --git a/server/lib/views/shared/video-viewer-counters.ts b/server/lib/views/shared/video-viewer-counters.ts
deleted file mode 100644
index f5b83130e..000000000
--- a/server/lib/views/shared/video-viewer-counters.ts
+++ /dev/null
@@ -1,198 +0,0 @@
1import { isTestOrDevInstance } from '@server/helpers/core-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { VIEW_LIFETIME } from '@server/initializers/constants'
4import { sendView } from '@server/lib/activitypub/send/send-view'
5import { PeerTubeSocket } from '@server/lib/peertube-socket'
6import { getServerActor } from '@server/models/application/application'
7import { VideoModel } from '@server/models/video/video'
8import { MVideo, MVideoImmutable } from '@server/types/models'
9import { buildUUID, sha256 } from '@shared/extra-utils'
10
11const lTags = loggerTagsFactory('views')
12
13export type ViewerScope = 'local' | 'remote'
14export type VideoScope = 'local' | 'remote'
15
16type Viewer = {
17 expires: number
18 id: string
19 viewerScope: ViewerScope
20 videoScope: VideoScope
21 lastFederation?: number
22}
23
24export class VideoViewerCounters {
25
26 // expires is new Date().getTime()
27 private readonly viewersPerVideo = new Map<number, Viewer[]>()
28 private readonly idToViewer = new Map<string, Viewer>()
29
30 private readonly salt = buildUUID()
31
32 private processingViewerCounters = false
33
34 constructor () {
35 setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
36 }
37
38 // ---------------------------------------------------------------------------
39
40 async addLocalViewer (options: {
41 video: MVideoImmutable
42 ip: string
43 }) {
44 const { video, ip } = options
45
46 logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) })
47
48 const viewerId = this.generateViewerId(ip, video.uuid)
49 const viewer = this.idToViewer.get(viewerId)
50
51 if (viewer) {
52 viewer.expires = this.buildViewerExpireTime()
53 await this.federateViewerIfNeeded(video, viewer)
54
55 return false
56 }
57
58 const newViewer = await this.addViewerToVideo({ viewerId, video, viewerScope: 'local' })
59 await this.federateViewerIfNeeded(video, newViewer)
60
61 return true
62 }
63
64 async addRemoteViewer (options: {
65 video: MVideo
66 viewerId: string
67 viewerExpires: Date
68 }) {
69 const { video, viewerExpires, viewerId } = options
70
71 logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
72
73 await this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote' })
74
75 return true
76 }
77
78 // ---------------------------------------------------------------------------
79
80 getTotalViewers (options: {
81 viewerScope: ViewerScope
82 videoScope: VideoScope
83 }) {
84 let total = 0
85
86 for (const viewers of this.viewersPerVideo.values()) {
87 total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope).length
88 }
89
90 return total
91 }
92
93 getViewers (video: MVideo) {
94 const viewers = this.viewersPerVideo.get(video.id)
95 if (!viewers) return 0
96
97 return viewers.length
98 }
99
100 buildViewerExpireTime () {
101 return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER
102 }
103
104 // ---------------------------------------------------------------------------
105
106 private async addViewerToVideo (options: {
107 video: MVideoImmutable
108 viewerId: string
109 viewerScope: ViewerScope
110 viewerExpires?: Date
111 }) {
112 const { video, viewerExpires, viewerId, viewerScope } = options
113
114 let watchers = this.viewersPerVideo.get(video.id)
115
116 if (!watchers) {
117 watchers = []
118 this.viewersPerVideo.set(video.id, watchers)
119 }
120
121 const expires = viewerExpires
122 ? viewerExpires.getTime()
123 : this.buildViewerExpireTime()
124
125 const videoScope: VideoScope = video.remote
126 ? 'remote'
127 : 'local'
128
129 const viewer = { id: viewerId, expires, videoScope, viewerScope }
130 watchers.push(viewer)
131
132 this.idToViewer.set(viewerId, viewer)
133
134 await this.notifyClients(video.id, watchers.length)
135
136 return viewer
137 }
138
139 private async cleanViewerCounters () {
140 if (this.processingViewerCounters) return
141 this.processingViewerCounters = true
142
143 if (!isTestOrDevInstance()) logger.info('Cleaning video viewers.', lTags())
144
145 try {
146 for (const videoId of this.viewersPerVideo.keys()) {
147 const notBefore = new Date().getTime()
148
149 const viewers = this.viewersPerVideo.get(videoId)
150
151 // Only keep not expired viewers
152 const newViewers: Viewer[] = []
153
154 // Filter new viewers
155 for (const viewer of viewers) {
156 if (viewer.expires > notBefore) {
157 newViewers.push(viewer)
158 } else {
159 this.idToViewer.delete(viewer.id)
160 }
161 }
162
163 if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
164 else this.viewersPerVideo.set(videoId, newViewers)
165
166 await this.notifyClients(videoId, newViewers.length)
167 }
168 } catch (err) {
169 logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
170 }
171
172 this.processingViewerCounters = false
173 }
174
175 private async notifyClients (videoId: string | number, viewersLength: number) {
176 const video = await VideoModel.loadImmutableAttributes(videoId)
177 if (!video) return
178
179 PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
180
181 logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
182 }
183
184 private generateViewerId (ip: string, videoUUID: string) {
185 return sha256(this.salt + '-' + ip + '-' + videoUUID)
186 }
187
188 private async federateViewerIfNeeded (video: MVideoImmutable, viewer: Viewer) {
189 // Federate the viewer if it's been a "long" time we did not
190 const now = new Date().getTime()
191 const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75)
192
193 if (viewer.lastFederation && viewer.lastFederation > federationLimit) return
194
195 await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
196 viewer.lastFederation = now
197 }
198}
diff --git a/server/lib/views/shared/video-viewer-stats.ts b/server/lib/views/shared/video-viewer-stats.ts
deleted file mode 100644
index ebd963e59..000000000
--- a/server/lib/views/shared/video-viewer-stats.ts
+++ /dev/null
@@ -1,196 +0,0 @@
1import { Transaction } from 'sequelize/types'
2import { isTestOrDevInstance } from '@server/helpers/core-utils'
3import { GeoIP } from '@server/helpers/geo-ip'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants'
6import { sequelizeTypescript } from '@server/initializers/database'
7import { sendCreateWatchAction } from '@server/lib/activitypub/send'
8import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url'
9import { Redis } from '@server/lib/redis'
10import { VideoModel } from '@server/models/video/video'
11import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
12import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section'
13import { MVideo, MVideoImmutable } from '@server/types/models'
14import { VideoViewEvent } from '@shared/models'
15
16const lTags = loggerTagsFactory('views')
17
18type LocalViewerStats = {
19 firstUpdated: number // Date.getTime()
20 lastUpdated: number // Date.getTime()
21
22 watchSections: {
23 start: number
24 end: number
25 }[]
26
27 watchTime: number
28
29 country: string
30
31 videoId: number
32}
33
34export class VideoViewerStats {
35 private processingViewersStats = false
36
37 constructor () {
38 setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
39 }
40
41 // ---------------------------------------------------------------------------
42
43 async addLocalViewer (options: {
44 video: MVideoImmutable
45 currentTime: number
46 ip: string
47 viewEvent?: VideoViewEvent
48 }) {
49 const { video, ip, viewEvent, currentTime } = options
50
51 logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
52
53 return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
54 }
55
56 // ---------------------------------------------------------------------------
57
58 async getWatchTime (videoId: number, ip: string) {
59 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
60
61 return stats?.watchTime || 0
62 }
63
64 // ---------------------------------------------------------------------------
65
66 private async updateLocalViewerStats (options: {
67 video: MVideoImmutable
68 ip: string
69 currentTime: number
70 viewEvent?: VideoViewEvent
71 }) {
72 const { video, ip, viewEvent, currentTime } = options
73 const nowMs = new Date().getTime()
74
75 let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id })
76
77 if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) {
78 logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) })
79 return
80 }
81
82 if (!stats) {
83 const country = await GeoIP.Instance.safeCountryISOLookup(ip)
84
85 stats = {
86 firstUpdated: nowMs,
87 lastUpdated: nowMs,
88
89 watchSections: [],
90
91 watchTime: 0,
92
93 country,
94 videoId: video.id
95 }
96 }
97
98 stats.lastUpdated = nowMs
99
100 if (viewEvent === 'seek' || stats.watchSections.length === 0) {
101 stats.watchSections.push({
102 start: currentTime,
103 end: currentTime
104 })
105 } else {
106 const lastSection = stats.watchSections[stats.watchSections.length - 1]
107
108 if (lastSection.start > currentTime) {
109 logger.debug('Invalid end watch section %d. Last start record was at %d. Starting a new section.', currentTime, lastSection.start)
110
111 stats.watchSections.push({
112 start: currentTime,
113 end: currentTime
114 })
115 } else {
116 lastSection.end = currentTime
117 }
118 }
119
120 stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections)
121
122 logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
123
124 await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
125 }
126
127 async processViewerStats () {
128 if (this.processingViewersStats) return
129 this.processingViewersStats = true
130
131 if (!isTestOrDevInstance()) logger.info('Processing viewer statistics.', lTags())
132
133 const now = new Date().getTime()
134
135 try {
136 const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
137
138 for (const key of allKeys) {
139 const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key })
140
141 // Process expired stats
142 if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) {
143 continue
144 }
145
146 try {
147 await sequelizeTypescript.transaction(async t => {
148 const video = await VideoModel.load(stats.videoId, t)
149 if (!video) return
150
151 const statsModel = await this.saveViewerStats(video, stats, t)
152
153 if (video.remote) {
154 await sendCreateWatchAction(statsModel, t)
155 }
156 })
157
158 await Redis.Instance.deleteLocalVideoViewersKeys(key)
159 } catch (err) {
160 logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() })
161 }
162 }
163 } catch (err) {
164 logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
165 }
166
167 this.processingViewersStats = false
168 }
169
170 private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
171 const statsModel = new LocalVideoViewerModel({
172 startDate: new Date(stats.firstUpdated),
173 endDate: new Date(stats.lastUpdated),
174 watchTime: stats.watchTime,
175 country: stats.country,
176 videoId: video.id
177 })
178
179 statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel)
180 statsModel.Video = video as VideoModel
181
182 await statsModel.save({ transaction })
183
184 statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({
185 localVideoViewerId: statsModel.id,
186 watchSections: stats.watchSections,
187 transaction
188 })
189
190 return statsModel
191 }
192
193 private buildWatchTimeFromSections (sections: { start: number, end: number }[]) {
194 return sections.reduce((p, current) => p + (current.end - current.start), 0)
195 }
196}
diff --git a/server/lib/views/shared/video-views.ts b/server/lib/views/shared/video-views.ts
deleted file mode 100644
index e563287e1..000000000
--- a/server/lib/views/shared/video-views.ts
+++ /dev/null
@@ -1,70 +0,0 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { sendView } from '@server/lib/activitypub/send/send-view'
3import { getCachedVideoDuration } from '@server/lib/video'
4import { getServerActor } from '@server/models/application/application'
5import { MVideo, MVideoImmutable } from '@server/types/models'
6import { buildUUID } from '@shared/extra-utils'
7import { Redis } from '../../redis'
8
9const lTags = loggerTagsFactory('views')
10
11export class VideoViews {
12
13 async addLocalView (options: {
14 video: MVideoImmutable
15 ip: string
16 watchTime: number
17 }) {
18 const { video, ip, watchTime } = options
19
20 logger.debug('Adding local view to video %s.', video.uuid, { watchTime, ...lTags(video.uuid) })
21
22 if (!await this.hasEnoughWatchTime(video, watchTime)) return false
23
24 const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
25 if (viewExists) return false
26
27 await Redis.Instance.setIPVideoView(ip, video.uuid)
28
29 await this.addView(video)
30
31 await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() })
32
33 return true
34 }
35
36 async addRemoteView (options: {
37 video: MVideo
38 }) {
39 const { video } = options
40
41 logger.debug('Adding remote view to video %s.', video.uuid, { ...lTags(video.uuid) })
42
43 await this.addView(video)
44
45 return true
46 }
47
48 // ---------------------------------------------------------------------------
49
50 private async addView (video: MVideoImmutable) {
51 const promises: Promise<any>[] = []
52
53 if (video.isOwned()) {
54 promises.push(Redis.Instance.addLocalVideoView(video.id))
55 }
56
57 promises.push(Redis.Instance.addVideoViewStats(video.id))
58
59 await Promise.all(promises)
60 }
61
62 private async hasEnoughWatchTime (video: MVideoImmutable, watchTime: number) {
63 const { duration, isLive } = await getCachedVideoDuration(video.id)
64
65 if (isLive || duration >= 30) return watchTime >= 30
66
67 // Check more than 50% of the video is watched
68 return duration / watchTime < 2
69 }
70}
diff --git a/server/lib/views/video-views-manager.ts b/server/lib/views/video-views-manager.ts
deleted file mode 100644
index c088dad5e..000000000
--- a/server/lib/views/video-views-manager.ts
+++ /dev/null
@@ -1,100 +0,0 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { MVideo, MVideoImmutable } from '@server/types/models'
3import { VideoViewEvent } from '@shared/models'
4import { VideoScope, VideoViewerCounters, VideoViewerStats, VideoViews, ViewerScope } from './shared'
5
6/**
7 * If processing a local view:
8 * - We update viewer information (segments watched, watch time etc)
9 * - We add +1 to video viewers counter if this is a new viewer
10 * - We add +1 to video views counter if this is a new view and if the user watched enough seconds
11 * - We send AP message to notify about this viewer and this view
12 * - We update last video time for the user if authenticated
13 *
14 * If processing a remote view:
15 * - We add +1 to video viewers counter
16 * - We add +1 to video views counter
17 *
18 * A viewer is a someone that watched one or multiple sections of a video
19 * A viewer that watched only a few seconds of a video may not increment the video views counter
20 * Viewers statistics are sent to origin instance using the `WatchAction` ActivityPub object
21 *
22 */
23
24const lTags = loggerTagsFactory('views')
25
26export class VideoViewsManager {
27
28 private static instance: VideoViewsManager
29
30 private videoViewerStats: VideoViewerStats
31 private videoViewerCounters: VideoViewerCounters
32 private videoViews: VideoViews
33
34 private constructor () {
35 }
36
37 init () {
38 this.videoViewerStats = new VideoViewerStats()
39 this.videoViewerCounters = new VideoViewerCounters()
40 this.videoViews = new VideoViews()
41 }
42
43 async processLocalView (options: {
44 video: MVideoImmutable
45 currentTime: number
46 ip: string | null
47 viewEvent?: VideoViewEvent
48 }) {
49 const { video, ip, viewEvent, currentTime } = options
50
51 logger.debug('Processing local view for %s and ip %s.', video.url, ip, lTags())
52
53 await this.videoViewerStats.addLocalViewer({ video, ip, viewEvent, currentTime })
54
55 const successViewer = await this.videoViewerCounters.addLocalViewer({ video, ip })
56
57 // Do it after added local viewer to fetch updated information
58 const watchTime = await this.videoViewerStats.getWatchTime(video.id, ip)
59
60 const successView = await this.videoViews.addLocalView({ video, watchTime, ip })
61
62 return { successView, successViewer }
63 }
64
65 async processRemoteView (options: {
66 video: MVideo
67 viewerId: string | null
68 viewerExpires?: Date
69 }) {
70 const { video, viewerId, viewerExpires } = options
71
72 logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() })
73
74 if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires })
75 else await this.videoViews.addRemoteView({ video })
76 }
77
78 getViewers (video: MVideo) {
79 return this.videoViewerCounters.getViewers(video)
80 }
81
82 getTotalViewers (options: {
83 viewerScope: ViewerScope
84 videoScope: VideoScope
85 }) {
86 return this.videoViewerCounters.getTotalViewers(options)
87 }
88
89 buildViewerExpireTime () {
90 return this.videoViewerCounters.buildViewerExpireTime()
91 }
92
93 processViewerStats () {
94 return this.videoViewerStats.processViewerStats()
95 }
96
97 static get Instance () {
98 return this.instance || (this.instance = new this())
99 }
100}