aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-11-09 10:11:20 +0100
committerChocobozzz <chocobozzz@cpy.re>2021-11-09 15:00:31 +0100
commit51353d9a035fb6b81f903a8b5f391292841649fd (patch)
tree75acb6eea5e043bf2e15a6a5a92e9a3c5967b156 /server
parent221ee1adc916684d4881d2a9c4c01954dcde986e (diff)
downloadPeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.gz
PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.zst
PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.zip
Refactor video views
Introduce viewers attribute for live videos Count views for live videos Reduce delay to see the viewer update for lives Add ability to configure video views buffer interval and view ip expiration
Diffstat (limited to 'server')
-rw-r--r--server/controllers/api/videos/index.ts44
-rw-r--r--server/initializers/checker-before-init.ts2
-rw-r--r--server/initializers/config.ts4
-rw-r--r--server/initializers/constants.ts20
-rw-r--r--server/lib/activitypub/process/process-view.ts32
-rw-r--r--server/lib/activitypub/send/send-view.ts4
-rw-r--r--server/lib/activitypub/videos/updater.ts1
-rw-r--r--server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts)36
-rw-r--r--server/lib/job-queue/job-queue.ts12
-rw-r--r--server/lib/live/live-manager.ts49
-rw-r--r--server/lib/peertube-socket.ts8
-rw-r--r--server/lib/redis.ts112
-rw-r--r--server/lib/schedulers/video-views-buffer-scheduler.ts52
-rw-r--r--server/lib/video-views.ts130
-rw-r--r--server/models/video/formatter/video-format-utils.ts5
-rw-r--r--server/tests/api/live/live-views.ts109
-rw-r--r--server/tests/api/server/jobs.ts2
17 files changed, 388 insertions, 234 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index 821161c64..72b382595 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -2,7 +2,7 @@ import express from 'express'
2import toInt from 'validator/lib/toInt' 2import toInt from 'validator/lib/toInt'
3import { pickCommonVideoQuery } from '@server/helpers/query' 3import { pickCommonVideoQuery } from '@server/helpers/query'
4import { doJSONRequest } from '@server/helpers/requests' 4import { doJSONRequest } from '@server/helpers/requests'
5import { LiveManager } from '@server/lib/live' 5import { VideoViews } from '@server/lib/video-views'
6import { openapiOperationDoc } from '@server/middlewares/doc' 6import { openapiOperationDoc } from '@server/middlewares/doc'
7import { getServerActor } from '@server/models/application/application' 7import { getServerActor } from '@server/models/application/application'
8import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' 8import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils'
@@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
17import { sendView } from '../../../lib/activitypub/send/send-view' 17import { sendView } from '../../../lib/activitypub/send/send-view'
18import { JobQueue } from '../../../lib/job-queue' 18import { JobQueue } from '../../../lib/job-queue'
19import { Hooks } from '../../../lib/plugins/hooks' 19import { Hooks } from '../../../lib/plugins/hooks'
20import { Redis } from '../../../lib/redis'
21import { 20import {
22 asyncMiddleware, 21 asyncMiddleware,
23 asyncRetryTransactionMiddleware, 22 asyncRetryTransactionMiddleware,
@@ -107,7 +106,7 @@ videosRouter.get('/:id',
107) 106)
108videosRouter.post('/:id/views', 107videosRouter.post('/:id/views',
109 openapiOperationDoc({ operationId: 'addView' }), 108 openapiOperationDoc({ operationId: 'addView' }),
110 asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')), 109 asyncMiddleware(videosCustomGetValidator('only-video')),
111 asyncMiddleware(viewVideo) 110 asyncMiddleware(viewVideo)
112) 111)
113 112
@@ -153,44 +152,17 @@ function getVideo (_req: express.Request, res: express.Response) {
153} 152}
154 153
155async function viewVideo (req: express.Request, res: express.Response) { 154async function viewVideo (req: express.Request, res: express.Response) {
156 const immutableVideoAttrs = res.locals.onlyImmutableVideo 155 const video = res.locals.onlyVideo
157 156
158 const ip = req.ip 157 const ip = req.ip
159 const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) 158 const success = await VideoViews.Instance.processView({ video, ip })
160 if (exists) {
161 logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
162 return res.status(HttpStatusCode.NO_CONTENT_204).end()
163 }
164
165 const video = await VideoModel.load(immutableVideoAttrs.id)
166
167 const promises: Promise<any>[] = [
168 Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
169 ]
170
171 let federateView = true
172
173 // Increment our live manager
174 if (video.isLive && video.isOwned()) {
175 LiveManager.Instance.addViewTo(video.id)
176
177 // Views of our local live will be sent by our live manager
178 federateView = false
179 }
180
181 // Increment our video views cache counter
182 if (!video.isLive) {
183 promises.push(Redis.Instance.addVideoView(video.id))
184 }
185 159
186 if (federateView) { 160 if (success) {
187 const serverActor = await getServerActor() 161 const serverActor = await getServerActor()
188 promises.push(sendView(serverActor, video, undefined)) 162 await sendView(serverActor, video, undefined)
189 }
190
191 await Promise.all(promises)
192 163
193 Hooks.runAction('action:api.video.viewed', { video, ip }) 164 Hooks.runAction('action:api.video.viewed', { video: video, ip })
165 }
194 166
195 return res.status(HttpStatusCode.NO_CONTENT_204).end() 167 return res.status(HttpStatusCode.NO_CONTENT_204).end()
196} 168}
diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts
index 1015c5e45..51c396548 100644
--- a/server/initializers/checker-before-init.ts
+++ b/server/initializers/checker-before-init.ts
@@ -38,7 +38,7 @@ function checkMissedConfig () {
38 'services.twitter.username', 'services.twitter.whitelisted', 38 'services.twitter.username', 'services.twitter.whitelisted',
39 'followers.instance.enabled', 'followers.instance.manual_approval', 39 'followers.instance.enabled', 'followers.instance.manual_approval',
40 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', 40 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces',
41 'history.videos.max_age', 'views.videos.remote.max_age', 41 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration',
42 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', 42 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
43 'theme.default', 43 'theme.default',
44 'remote_redundancy.videos.accept_from', 44 'remote_redundancy.videos.accept_from',
diff --git a/server/initializers/config.ts b/server/initializers/config.ts
index 1288768d8..dadda2a77 100644
--- a/server/initializers/config.ts
+++ b/server/initializers/config.ts
@@ -182,7 +182,9 @@ const CONFIG = {
182 VIDEOS: { 182 VIDEOS: {
183 REMOTE: { 183 REMOTE: {
184 MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) 184 MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age'))
185 } 185 },
186 LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')),
187 IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration'))
186 } 188 }
187 }, 189 },
188 PLUGINS: { 190 PLUGINS: {
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 845576667..b65741bbd 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
148 'video-import': 1, 148 'video-import': 1,
149 'email': 5, 149 'email': 5,
150 'actor-keys': 3, 150 'actor-keys': 3,
151 'videos-views': 1, 151 'videos-views-stats': 1,
152 'activitypub-refresher': 1, 152 'activitypub-refresher': 1,
153 'video-redundancy': 1, 153 'video-redundancy': 1,
154 'video-live-ending': 1, 154 'video-live-ending': 1,
@@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
164 'video-file-import': 1, 164 'video-file-import': 1,
165 'email': 5, 165 'email': 5,
166 'actor-keys': 1, 166 'actor-keys': 1,
167 'videos-views': 1, 167 'videos-views-stats': 1,
168 'activitypub-refresher': 1, 168 'activitypub-refresher': 1,
169 'video-redundancy': 1, 169 'video-redundancy': 1,
170 'video-live-ending': 10, 170 'video-live-ending': 10,
@@ -181,14 +181,14 @@ const JOB_TTL: { [id in JobType]: number } = {
181 'video-import': 1000 * 3600 * 2, // 2 hours 181 'video-import': 1000 * 3600 * 2, // 2 hours
182 'email': 60000 * 10, // 10 minutes 182 'email': 60000 * 10, // 10 minutes
183 'actor-keys': 60000 * 20, // 20 minutes 183 'actor-keys': 60000 * 20, // 20 minutes
184 'videos-views': undefined, // Unlimited 184 'videos-views-stats': undefined, // Unlimited
185 'activitypub-refresher': 60000 * 10, // 10 minutes 185 'activitypub-refresher': 60000 * 10, // 10 minutes
186 'video-redundancy': 1000 * 3600 * 3, // 3 hours 186 'video-redundancy': 1000 * 3600 * 3, // 3 hours
187 'video-live-ending': 1000 * 60 * 10, // 10 minutes 187 'video-live-ending': 1000 * 60 * 10, // 10 minutes
188 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours 188 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
189} 189}
190const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 190const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
191 'videos-views': { 191 'videos-views-stats': {
192 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour 192 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
193 }, 193 },
194 'activitypub-cleaner': { 194 'activitypub-cleaner': {
@@ -211,6 +211,7 @@ const SCHEDULER_INTERVALS_MS = {
211 REMOVE_OLD_JOBS: 60000 * 60, // 1 hour 211 REMOVE_OLD_JOBS: 60000 * 60, // 1 hour
212 UPDATE_VIDEOS: 60000, // 1 minute 212 UPDATE_VIDEOS: 60000, // 1 minute
213 YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day 213 YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day
214 VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL,
214 CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, 215 CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
215 CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day 216 CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day
216 AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day 217 AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day
@@ -343,8 +344,8 @@ const CONSTRAINTS_FIELDS = {
343} 344}
344 345
345const VIEW_LIFETIME = { 346const VIEW_LIFETIME = {
346 VIDEO: 60000 * 60, // 1 hour 347 VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
347 LIVE: 60000 * 5 // 5 minutes 348 VIEWER: 60000 * 5 // 5 minutes
348} 349}
349 350
350let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour 351let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
@@ -789,13 +790,12 @@ if (isTestInstance() === true) {
789 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 790 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
790 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 791 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
791 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 792 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
792 REPEAT_JOBS['videos-views'] = { every: 5000 } 793 REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
793 REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } 794 REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
794 795
795 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 796 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
796 797
797 VIEW_LIFETIME.VIDEO = 1000 // 1 second 798 VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second
798 VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
799 CONTACT_FORM_LIFETIME = 1000 // 1 second 799 CONTACT_FORM_LIFETIME = 1000 // 1 second
800 800
801 JOB_ATTEMPTS['email'] = 1 801 JOB_ATTEMPTS['email'] = 1
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts
index 5593ee257..720385f9b 100644
--- a/server/lib/activitypub/process/process-view.ts
+++ b/server/lib/activitypub/process/process-view.ts
@@ -1,13 +1,13 @@
1import { getOrCreateAPVideo } from '../videos' 1import { VideoViews } from '@server/lib/video-views'
2import { forwardVideoRelatedActivity } from '../send/utils' 2import { ActivityView } from '../../../../shared/models/activitypub'
3import { Redis } from '../../redis'
4import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
5import { APProcessorOptions } from '../../../types/activitypub-processor.model' 3import { APProcessorOptions } from '../../../types/activitypub-processor.model'
6import { MActorSignature } from '../../../types/models' 4import { MActorSignature } from '../../../types/models'
7import { LiveManager } from '@server/lib/live/live-manager' 5import { forwardVideoRelatedActivity } from '../send/utils'
6import { getOrCreateAPVideo } from '../videos'
8 7
9async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { 8async function processViewActivity (options: APProcessorOptions<ActivityView>) {
10 const { activity, byActor } = options 9 const { activity, byActor } = options
10
11 return processCreateView(activity, byActor) 11 return processCreateView(activity, byActor)
12} 12}
13 13
@@ -19,10 +19,8 @@ export {
19 19
20// --------------------------------------------------------------------------- 20// ---------------------------------------------------------------------------
21 21
22async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { 22async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
23 const videoObject = activity.type === 'View' 23 const videoObject = activity.object
24 ? activity.object
25 : (activity.object as ViewObject).object
26 24
27 const { video } = await getOrCreateAPVideo({ 25 const { video } = await getOrCreateAPVideo({
28 videoObject, 26 videoObject,
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
30 allowRefresh: false 28 allowRefresh: false
31 }) 29 })
32 30
33 if (!video.isLive) { 31 const viewerExpires = activity.expires
34 await Redis.Instance.addVideoView(video.id) 32 ? new Date(activity.expires)
35 } 33 : undefined
36 34
37 if (video.isOwned()) { 35 await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
38 // Our live manager will increment the counter and send the view to followers
39 if (video.isLive) {
40 LiveManager.Instance.addViewTo(video.id)
41 return
42 }
43 36
37 if (video.isOwned()) {
44 // Forward the view but don't resend the activity to the sender 38 // Forward the view but don't resend the activity to the sender
45 const exceptions = [ byActor ] 39 const exceptions = [ byActor ]
46 await forwardVideoRelatedActivity(activity, undefined, exceptions, video) 40 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts
index 153e94295..b12583e26 100644
--- a/server/lib/activitypub/send/send-view.ts
+++ b/server/lib/activitypub/send/send-view.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { VideoViews } from '@server/lib/video-views'
2import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' 3import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
3import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' 4import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
4import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU
27 id: url, 28 id: url,
28 type: 'View' as 'View', 29 type: 'View' as 'View',
29 actor: byActor.url, 30 actor: byActor.url,
30 object: video.url 31 object: video.url,
32 expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
31 }, 33 },
32 audience 34 audience
33 ) 35 )
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts
index 157569414..f786bb196 100644
--- a/server/lib/activitypub/videos/updater.ts
+++ b/server/lib/activitypub/videos/updater.ts
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
81 81
82 if (videoUpdated.isLive) { 82 if (videoUpdated.isLive) {
83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) 83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
84 PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
85 } 84 }
86 85
87 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) 86 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts
index 86d0a271f..caf5f6962 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views-stats.ts
@@ -1,11 +1,10 @@
1import { Redis } from '../../redis' 1import { isTestInstance } from '../../../helpers/core-utils'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video' 3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-view' 4import { VideoViewModel } from '../../../models/video/video-view'
5import { isTestInstance } from '../../../helpers/core-utils' 5import { Redis } from '../../redis'
6import { federateVideoIfNeeded } from '../../activitypub/videos'
7 6
8async function processVideosViews () { 7async function processVideosViewsStats () {
9 const lastHour = new Date() 8 const lastHour = new Date()
10 9
11 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour 10 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@@ -15,23 +14,23 @@ async function processVideosViews () {
15 const startDate = lastHour.setMinutes(0, 0, 0) 14 const startDate = lastHour.setMinutes(0, 0, 0)
16 const endDate = lastHour.setMinutes(59, 59, 999) 15 const endDate = lastHour.setMinutes(59, 59, 999)
17 16
18 const videoIds = await Redis.Instance.getVideosIdViewed(hour) 17 const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
19 if (videoIds.length === 0) return 18 if (videoIds.length === 0) return
20 19
21 logger.info('Processing videos views in job for hour %d.', hour) 20 logger.info('Processing videos views stats in job for hour %d.', hour)
22 21
23 for (const videoId of videoIds) { 22 for (const videoId of videoIds) {
24 try { 23 try {
25 const views = await Redis.Instance.getVideoViews(videoId, hour) 24 const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
26 await Redis.Instance.deleteVideoViews(videoId, hour) 25 await Redis.Instance.deleteVideoViewsStats(videoId, hour)
27 26
28 if (views) { 27 if (views) {
29 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 28 logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
30 29
31 try { 30 try {
32 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 31 const video = await VideoModel.load(videoId)
33 if (!video) { 32 if (!video) {
34 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) 33 logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
35 continue 34 continue
36 } 35 }
37 36
@@ -41,21 +40,12 @@ async function processVideosViews () {
41 views, 40 views,
42 videoId 41 videoId
43 }) 42 })
44
45 if (video.isOwned()) {
46 // If this is a remote video, the origin instance will send us an update
47 await VideoModel.incrementViews(videoId, views)
48
49 // Send video update
50 video.views += views
51 await federateVideoIfNeeded(video, false)
52 }
53 } catch (err) { 43 } catch (err) {
54 logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) 44 logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
55 } 45 }
56 } 46 }
57 } catch (err) { 47 } catch (err) {
58 logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) 48 logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
59 } 49 }
60 } 50 }
61} 51}
@@ -63,5 +53,5 @@ async function processVideosViews () {
63// --------------------------------------------------------------------------- 53// ---------------------------------------------------------------------------
64 54
65export { 55export {
66 processVideosViews 56 processVideosViewsStats
67} 57}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0eab720d9..4c1597b33 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import'
36import { processVideoImport } from './handlers/video-import' 36import { processVideoImport } from './handlers/video-import'
37import { processVideoLiveEnding } from './handlers/video-live-ending' 37import { processVideoLiveEnding } from './handlers/video-live-ending'
38import { processVideoTranscoding } from './handlers/video-transcoding' 38import { processVideoTranscoding } from './handlers/video-transcoding'
39import { processVideosViews } from './handlers/video-views' 39import { processVideosViewsStats } from './handlers/video-views-stats'
40 40
41type CreateJobArgument = 41type CreateJobArgument =
42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -49,7 +49,7 @@ type CreateJobArgument =
49 { type: 'email', payload: EmailPayload } | 49 { type: 'email', payload: EmailPayload } |
50 { type: 'video-import', payload: VideoImportPayload } | 50 { type: 'video-import', payload: VideoImportPayload } |
51 { type: 'activitypub-refresher', payload: RefreshPayload } | 51 { type: 'activitypub-refresher', payload: RefreshPayload } |
52 { type: 'videos-views', payload: {} } | 52 { type: 'videos-views-stats', payload: {} } |
53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } | 53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
54 { type: 'actor-keys', payload: ActorKeysPayload } | 54 { type: 'actor-keys', payload: ActorKeysPayload } |
55 { type: 'video-redundancy', payload: VideoRedundancyPayload } | 55 { type: 'video-redundancy', payload: VideoRedundancyPayload } |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
71 'video-transcoding': processVideoTranscoding, 71 'video-transcoding': processVideoTranscoding,
72 'email': processEmail, 72 'email': processEmail,
73 'video-import': processVideoImport, 73 'video-import': processVideoImport,
74 'videos-views': processVideosViews, 74 'videos-views-stats': processVideosViewsStats,
75 'activitypub-refresher': refreshAPObject, 75 'activitypub-refresher': refreshAPObject,
76 'video-live-ending': processVideoLiveEnding, 76 'video-live-ending': processVideoLiveEnding,
77 'actor-keys': processActorKeys, 77 'actor-keys': processActorKeys,
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [
89 'video-transcoding', 89 'video-transcoding',
90 'video-file-import', 90 'video-file-import',
91 'video-import', 91 'video-import',
92 'videos-views', 92 'videos-views-stats',
93 'activitypub-refresher', 93 'activitypub-refresher',
94 'video-redundancy', 94 'video-redundancy',
95 'actor-keys', 95 'actor-keys',
@@ -247,8 +247,8 @@ class JobQueue {
247 } 247 }
248 248
249 private addRepeatableJobs () { 249 private addRepeatableJobs () {
250 this.queues['videos-views'].add({}, { 250 this.queues['videos-views-stats'].add({}, {
251 repeat: REPEAT_JOBS['videos-views'] 251 repeat: REPEAT_JOBS['videos-views-stats']
252 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 252 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
253 253
254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 1b7b9dd4d..2562edb75 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,7 +2,6 @@
2import { readFile } from 'fs-extra' 2import { readFile } from 'fs-extra'
3import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { isTestInstance } from '@server/helpers/core-utils'
6import { 5import {
7 computeResolutionsToTranscode, 6 computeResolutionsToTranscode,
8 ffprobePromise, 7 ffprobePromise,
@@ -12,7 +11,7 @@ import {
12} from '@server/helpers/ffprobe-utils' 11} from '@server/helpers/ffprobe-utils'
13import { logger, loggerTagsFactory } from '@server/helpers/logger' 12import { logger, loggerTagsFactory } from '@server/helpers/logger'
14import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 13import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
15import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' 14import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
16import { UserModel } from '@server/models/user/user' 15import { UserModel } from '@server/models/user/user'
17import { VideoModel } from '@server/models/video/video' 16import { VideoModel } from '@server/models/video/video'
18import { VideoLiveModel } from '@server/models/video/video-live' 17import { VideoLiveModel } from '@server/models/video/video-live'
@@ -53,8 +52,6 @@ class LiveManager {
53 52
54 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
55 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<number, string>()
56 // Values are Date().getTime()
57 private readonly watchersPerVideo = new Map<number, number[]>()
58 55
59 private rtmpServer: Server 56 private rtmpServer: Server
60 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -99,8 +96,6 @@ class LiveManager {
99 // Cleanup broken lives, that were terminated by a server restart for example 96 // Cleanup broken lives, that were terminated by a server restart for example
100 this.handleBrokenLives() 97 this.handleBrokenLives()
101 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) 98 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
102
103 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
104 } 99 }
105 100
106 async run () { 101 async run () {
@@ -184,19 +179,6 @@ class LiveManager {
184 this.abortSession(sessionId) 179 this.abortSession(sessionId)
185 } 180 }
186 181
187 addViewTo (videoId: number) {
188 if (this.videoSessions.has(videoId) === false) return
189
190 let watchers = this.watchersPerVideo.get(videoId)
191
192 if (!watchers) {
193 watchers = []
194 this.watchersPerVideo.set(videoId, watchers)
195 }
196
197 watchers.push(new Date().getTime())
198 }
199
200 private getContext () { 182 private getContext () {
201 return context 183 return context
202 } 184 }
@@ -377,7 +359,6 @@ class LiveManager {
377 } 359 }
378 360
379 private onMuxingFFmpegEnd (videoId: number) { 361 private onMuxingFFmpegEnd (videoId: number) {
380 this.watchersPerVideo.delete(videoId)
381 this.videoSessions.delete(videoId) 362 this.videoSessions.delete(videoId)
382 } 363 }
383 364
@@ -411,34 +392,6 @@ class LiveManager {
411 } 392 }
412 } 393 }
413 394
414 private async updateLiveViews () {
415 if (!this.isRunning()) return
416
417 if (!isTestInstance()) logger.info('Updating live video views.', lTags())
418
419 for (const videoId of this.watchersPerVideo.keys()) {
420 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
421
422 const watchers = this.watchersPerVideo.get(videoId)
423
424 const numWatchers = watchers.length
425
426 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
427 video.views = numWatchers
428 await video.save()
429
430 await federateVideoIfNeeded(video, false)
431
432 PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
433
434 // Only keep not expired watchers
435 const newWatchers = watchers.filter(w => w > notBefore)
436 this.watchersPerVideo.set(videoId, newWatchers)
437
438 logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
439 }
440 }
441
442 private async handleBrokenLives () { 395 private async handleBrokenLives () {
443 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 396 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
444 397
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 901435dea..0398ca61d 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -1,7 +1,7 @@
1import { Server as HTTPServer } from 'http' 1import { Server as HTTPServer } from 'http'
2import { Namespace, Server as SocketServer, Socket } from 'socket.io' 2import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3import { isIdValid } from '@server/helpers/custom-validators/misc' 3import { isIdValid } from '@server/helpers/custom-validators/misc'
4import { MVideo } from '@server/types/models' 4import { MVideo, MVideoImmutable } from '@server/types/models'
5import { UserNotificationModelForApi } from '@server/types/models/user' 5import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' 6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger' 7import { logger } from '../helpers/logger'
@@ -78,11 +78,11 @@ class PeerTubeSocket {
78 .emit(type, data) 78 .emit(type, data)
79 } 79 }
80 80
81 sendVideoViewsUpdate (video: MVideo) { 81 sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
82 const data: LiveVideoEventPayload = { views: video.views } 82 const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
83 const type: LiveVideoEventType = 'views-change' 83 const type: LiveVideoEventType = 'views-change'
84 84
85 logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) 85 logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
86 86
87 this.liveVideosNamespace 87 this.liveVideosNamespace
88 .in(video.id) 88 .in(video.id)
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 46617b07e..76b7868e8 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -13,6 +13,7 @@ import {
13 RESUMABLE_UPLOAD_SESSION_LIFETIME 13 RESUMABLE_UPLOAD_SESSION_LIFETIME
14} from '../initializers/constants' 14} from '../initializers/constants'
15import { CONFIG } from '../initializers/config' 15import { CONFIG } from '../initializers/config'
16import { exists } from '@server/helpers/custom-validators/misc'
16 17
17type CachedRoute = { 18type CachedRoute = {
18 body: string 19 body: string
@@ -119,16 +120,20 @@ class Redis {
119 120
120 /* ************ Views per IP ************ */ 121 /* ************ Views per IP ************ */
121 122
122 setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { 123 setIPVideoView (ip: string, videoUUID: string) {
123 const lifetime = isLive 124 return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
124 ? VIEW_LIFETIME.LIVE 125 }
125 : VIEW_LIFETIME.VIDEO
126 126
127 return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) 127 setIPVideoViewer (ip: string, videoUUID: string) {
128 return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
128 } 129 }
129 130
130 async doesVideoIPViewExist (ip: string, videoUUID: string) { 131 async doesVideoIPViewExist (ip: string, videoUUID: string) {
131 return this.exists(this.generateViewKey(ip, videoUUID)) 132 return this.exists(this.generateIPViewKey(ip, videoUUID))
133 }
134
135 async doesVideoIPViewerExist (ip: string, videoUUID: string) {
136 return this.exists(this.generateIPViewerKey(ip, videoUUID))
132 } 137 }
133 138
134 /* ************ Tracker IP block ************ */ 139 /* ************ Tracker IP block ************ */
@@ -160,46 +165,85 @@ class Redis {
160 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) 165 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
161 } 166 }
162 167
163 /* ************ Video views ************ */ 168 /* ************ Video views stats ************ */
164 169
165 addVideoView (videoId: number) { 170 addVideoViewStats (videoId: number) {
166 const keyIncr = this.generateVideoViewKey(videoId) 171 const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
167 const keySet = this.generateVideosViewKey()
168 172
169 return Promise.all([ 173 return Promise.all([
170 this.addToSet(keySet, videoId.toString()), 174 this.addToSet(setKey, videoId.toString()),
171 this.increment(keyIncr) 175 this.increment(videoKey)
172 ]) 176 ])
173 } 177 }
174 178
175 async getVideoViews (videoId: number, hour: number) { 179 async getVideoViewsStats (videoId: number, hour: number) {
176 const key = this.generateVideoViewKey(videoId, hour) 180 const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
177 181
178 const valueString = await this.getValue(key) 182 const valueString = await this.getValue(videoKey)
179 const valueInt = parseInt(valueString, 10) 183 const valueInt = parseInt(valueString, 10)
180 184
181 if (isNaN(valueInt)) { 185 if (isNaN(valueInt)) {
182 logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) 186 logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
183 return undefined 187 return undefined
184 } 188 }
185 189
186 return valueInt 190 return valueInt
187 } 191 }
188 192
189 async getVideosIdViewed (hour: number) { 193 async listVideosViewedForStats (hour: number) {
190 const key = this.generateVideosViewKey(hour) 194 const { setKey } = this.generateVideoViewStatsKeys({ hour })
191 195
192 const stringIds = await this.getSet(key) 196 const stringIds = await this.getSet(setKey)
193 return stringIds.map(s => parseInt(s, 10)) 197 return stringIds.map(s => parseInt(s, 10))
194 } 198 }
195 199
196 deleteVideoViews (videoId: number, hour: number) { 200 deleteVideoViewsStats (videoId: number, hour: number) {
197 const keySet = this.generateVideosViewKey(hour) 201 const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
198 const keyIncr = this.generateVideoViewKey(videoId, hour) 202
203 return Promise.all([
204 this.deleteFromSet(setKey, videoId.toString()),
205 this.deleteKey(videoKey)
206 ])
207 }
208
209 /* ************ Local video views buffer ************ */
210
211 addLocalVideoView (videoId: number) {
212 const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
199 213
200 return Promise.all([ 214 return Promise.all([
201 this.deleteFromSet(keySet, videoId.toString()), 215 this.addToSet(setKey, videoId.toString()),
202 this.deleteKey(keyIncr) 216 this.increment(videoKey)
217 ])
218 }
219
220 async getLocalVideoViews (videoId: number) {
221 const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
222
223 const valueString = await this.getValue(videoKey)
224 const valueInt = parseInt(valueString, 10)
225
226 if (isNaN(valueInt)) {
227 logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
228 return undefined
229 }
230
231 return valueInt
232 }
233
234 async listLocalVideosViewed () {
235 const { setKey } = this.generateLocalVideoViewsKeys()
236
237 const stringIds = await this.getSet(setKey)
238 return stringIds.map(s => parseInt(s, 10))
239 }
240
241 deleteLocalVideoViews (videoId: number) {
242 const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
243
244 return Promise.all([
245 this.deleteFromSet(setKey, videoId.toString()),
246 this.deleteKey(videoKey)
203 ]) 247 ])
204 } 248 }
205 249
@@ -233,16 +277,16 @@ class Redis {
233 return req.method + '-' + req.originalUrl 277 return req.method + '-' + req.originalUrl
234 } 278 }
235 279
236 private generateVideosViewKey (hour?: number) { 280 private generateLocalVideoViewsKeys (videoId?: Number) {
237 if (!hour) hour = new Date().getHours() 281 return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
238
239 return `videos-view-h${hour}`
240 } 282 }
241 283
242 private generateVideoViewKey (videoId: number, hour?: number) { 284 private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
243 if (hour === undefined || hour === null) hour = new Date().getHours() 285 const hour = exists(options.hour)
286 ? options.hour
287 : new Date().getHours()
244 288
245 return `video-view-${videoId}-h${hour}` 289 return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
246 } 290 }
247 291
248 private generateResetPasswordKey (userId: number) { 292 private generateResetPasswordKey (userId: number) {
@@ -253,10 +297,14 @@ class Redis {
253 return 'verify-email-' + userId 297 return 'verify-email-' + userId
254 } 298 }
255 299
256 private generateViewKey (ip: string, videoUUID: string) { 300 private generateIPViewKey (ip: string, videoUUID: string) {
257 return `views-${videoUUID}-${ip}` 301 return `views-${videoUUID}-${ip}`
258 } 302 }
259 303
304 private generateIPViewerKey (ip: string, videoUUID: string) {
305 return `viewer-${videoUUID}-${ip}`
306 }
307
260 private generateTrackerBlockIPKey (ip: string) { 308 private generateTrackerBlockIPKey (ip: string) {
261 return `tracker-block-ip-${ip}` 309 return `tracker-block-ip-${ip}`
262 } 310 }
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts
new file mode 100644
index 000000000..c0e72c461
--- /dev/null
+++ b/server/lib/schedulers/video-views-buffer-scheduler.ts
@@ -0,0 +1,52 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VideoModel } from '@server/models/video/video'
3import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { federateVideoIfNeeded } from '../activitypub/videos'
5import { Redis } from '../redis'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViewsBufferScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const videoIds = await Redis.Instance.listLocalVideosViewed()
22 if (videoIds.length === 0) return
23
24 logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
25
26 for (const videoId of videoIds) {
27 try {
28 const views = await Redis.Instance.getLocalVideoViews(videoId)
29 await Redis.Instance.deleteLocalVideoViews(videoId)
30
31 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
32 if (!video) {
33 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
34 continue
35 }
36
37 // If this is a remote video, the origin instance will send us an update
38 await VideoModel.incrementViews(videoId, views)
39
40 // Send video update
41 video.views += views
42 await federateVideoIfNeeded(video, false)
43 } catch (err) {
44 logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
45 }
46 }
47 }
48
49 static get Instance () {
50 return this.instance || (this.instance = new this())
51 }
52}
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts
new file mode 100644
index 000000000..220b509c2
--- /dev/null
+++ b/server/lib/video-views.ts
@@ -0,0 +1,130 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VIEW_LIFETIME } from '@server/initializers/constants'
3import { VideoModel } from '@server/models/video/video'
4import { MVideo } from '@server/types/models'
5import { PeerTubeSocket } from './peertube-socket'
6import { Redis } from './redis'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViews {
11
12 // Values are Date().getTime()
13 private readonly viewersPerVideo = new Map<number, number[]>()
14
15 private static instance: VideoViews
16
17 private constructor () {
18 }
19
20 init () {
21 setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
22 }
23
24 async processView (options: {
25 video: MVideo
26 ip: string | null
27 viewerExpires?: Date
28 }) {
29 const { video, ip, viewerExpires } = options
30
31 logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
32
33 let success = await this.addView(video, ip)
34
35 if (video.isLive) {
36 const successViewer = await this.addViewer(video, ip, viewerExpires)
37 success ||= successViewer
38 }
39
40 return success
41 }
42
43 getViewers (video: MVideo) {
44 const viewers = this.viewersPerVideo.get(video.id)
45 if (!viewers) return 0
46
47 return viewers.length
48 }
49
50 buildViewerExpireTime () {
51 return new Date().getTime() + VIEW_LIFETIME.VIEWER
52 }
53
54 private async addView (video: MVideo, ip: string | null) {
55 const promises: Promise<any>[] = []
56
57 if (ip !== null) {
58 const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
59 if (viewExists) return false
60
61 promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
62 }
63
64 if (video.isOwned()) {
65 promises.push(Redis.Instance.addLocalVideoView(video.id))
66 }
67
68 promises.push(Redis.Instance.addVideoViewStats(video.id))
69
70 await Promise.all(promises)
71
72 return true
73 }
74
75 private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
76 if (ip !== null) {
77 const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
78 if (viewExists) return false
79
80 await Redis.Instance.setIPVideoViewer(ip, video.uuid)
81 }
82
83 let watchers = this.viewersPerVideo.get(video.id)
84
85 if (!watchers) {
86 watchers = []
87 this.viewersPerVideo.set(video.id, watchers)
88 }
89
90 const expiration = viewerExpires
91 ? viewerExpires.getTime()
92 : this.buildViewerExpireTime()
93
94 watchers.push(expiration)
95 await this.notifyClients(video.id, watchers.length)
96
97 return true
98 }
99
100 private async cleanViewers () {
101 logger.info('Cleaning video viewers.', lTags())
102
103 for (const videoId of this.viewersPerVideo.keys()) {
104 const notBefore = new Date().getTime()
105
106 const viewers = this.viewersPerVideo.get(videoId)
107
108 // Only keep not expired viewers
109 const newViewers = viewers.filter(w => w > notBefore)
110
111 if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
112 else this.viewersPerVideo.set(videoId, newViewers)
113
114 await this.notifyClients(videoId, newViewers.length)
115 }
116 }
117
118 private async notifyClients (videoId: string | number, viewersLength: number) {
119 const video = await VideoModel.loadImmutableAttributes(videoId)
120 if (!video) return
121
122 PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
123
124 logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
125 }
126
127 static get Instance () {
128 return this.instance || (this.instance = new this())
129 }
130}
diff --git a/server/models/video/formatter/video-format-utils.ts b/server/models/video/formatter/video-format-utils.ts
index ba49e41ae..461e296df 100644
--- a/server/models/video/formatter/video-format-utils.ts
+++ b/server/models/video/formatter/video-format-utils.ts
@@ -1,6 +1,7 @@
1import { uuidToShort } from '@server/helpers/uuid' 1import { uuidToShort } from '@server/helpers/uuid'
2import { generateMagnetUri } from '@server/helpers/webtorrent' 2import { generateMagnetUri } from '@server/helpers/webtorrent'
3import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' 3import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls'
4import { VideoViews } from '@server/lib/video-views'
4import { VideosCommonQueryAfterSanitize } from '@shared/models' 5import { VideosCommonQueryAfterSanitize } from '@shared/models'
5import { VideoFile } from '@shared/models/videos/video-file.model' 6import { VideoFile } from '@shared/models/videos/video-file.model'
6import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' 7import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects'
@@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm
121 pluginData: (video as any).pluginData 122 pluginData: (video as any).pluginData
122 } 123 }
123 124
125 if (video.isLive) {
126 videoObject.viewers = VideoViews.Instance.getViewers(video)
127 }
128
124 const add = options.additionalAttributes 129 const add = options.additionalAttributes
125 if (add?.state === true) { 130 if (add?.state === true) {
126 videoObject.state = { 131 videoObject.state = {
diff --git a/server/tests/api/live/live-views.ts b/server/tests/api/live/live-views.ts
index 5e3a79c64..9186af8e7 100644
--- a/server/tests/api/live/live-views.ts
+++ b/server/tests/api/live/live-views.ts
@@ -19,7 +19,7 @@ import {
19 19
20const expect = chai.expect 20const expect = chai.expect
21 21
22describe('Test live', function () { 22describe('Live views', function () {
23 let servers: PeerTubeServer[] = [] 23 let servers: PeerTubeServer[] = []
24 24
25 before(async function () { 25 before(async function () {
@@ -47,79 +47,86 @@ describe('Test live', function () {
47 await doubleFollow(servers[0], servers[1]) 47 await doubleFollow(servers[0], servers[1])
48 }) 48 })
49 49
50 describe('Live views', function () { 50 let liveVideoId: string
51 let liveVideoId: string 51 let command: FfmpegCommand
52 let command: FfmpegCommand
53 52
54 async function countViews (expected: number) { 53 async function countViewers (expectedViewers: number) {
55 for (const server of servers) { 54 for (const server of servers) {
56 const video = await server.videos.get({ id: liveVideoId }) 55 const video = await server.videos.get({ id: liveVideoId })
57 expect(video.views).to.equal(expected) 56 expect(video.viewers).to.equal(expectedViewers)
58 }
59 } 57 }
58 }
60 59
61 before(async function () { 60 async function countViews (expectedViews: number) {
62 this.timeout(30000) 61 for (const server of servers) {
62 const video = await server.videos.get({ id: liveVideoId })
63 expect(video.views).to.equal(expectedViews)
64 }
65 }
63 66
64 const liveAttributes = { 67 before(async function () {
65 name: 'live video', 68 this.timeout(30000)
66 channelId: servers[0].store.channel.id,
67 privacy: VideoPrivacy.PUBLIC
68 }
69 69
70 const live = await servers[0].live.create({ fields: liveAttributes }) 70 const liveAttributes = {
71 liveVideoId = live.uuid 71 name: 'live video',
72 channelId: servers[0].store.channel.id,
73 privacy: VideoPrivacy.PUBLIC
74 }
72 75
73 command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) 76 const live = await servers[0].live.create({ fields: liveAttributes })
74 await waitUntilLivePublishedOnAllServers(servers, liveVideoId) 77 liveVideoId = live.uuid
75 await waitJobs(servers)
76 })
77 78
78 it('Should display no views for a live', async function () { 79 command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
79 await countViews(0) 80 await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
80 }) 81 await waitJobs(servers)
82 })
81 83
82 it('Should view a live twice and display 1 view', async function () { 84 it('Should display no views and viewers for a live', async function () {
83 this.timeout(30000) 85 await countViews(0)
86 await countViewers(0)
87 })
84 88
85 await servers[0].videos.view({ id: liveVideoId }) 89 it('Should view a live twice and display 1 view/viewer', async function () {
86 await servers[0].videos.view({ id: liveVideoId }) 90 this.timeout(30000)
87 91
88 await wait(7000) 92 await servers[0].videos.view({ id: liveVideoId })
93 await servers[0].videos.view({ id: liveVideoId })
89 94
90 await waitJobs(servers) 95 await waitJobs(servers)
96 await countViewers(1)
91 97
92 await countViews(1) 98 await wait(7000)
93 }) 99 await countViews(1)
100 })
94 101
95 it('Should wait and display 0 views', async function () { 102 it('Should wait and display 0 viewers while still have 1 view', async function () {
96 this.timeout(30000) 103 this.timeout(30000)
97 104
98 await wait(12000) 105 await wait(12000)
99 await waitJobs(servers) 106 await waitJobs(servers)
100 107
101 await countViews(0) 108 await countViews(1)
102 }) 109 await countViewers(0)
110 })
103 111
104 it('Should view a live on a remote and on local and display 2 views', async function () { 112 it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () {
105 this.timeout(30000) 113 this.timeout(30000)
106 114
107 await servers[0].videos.view({ id: liveVideoId }) 115 await servers[0].videos.view({ id: liveVideoId })
108 await servers[1].videos.view({ id: liveVideoId }) 116 await servers[1].videos.view({ id: liveVideoId })
109 await servers[1].videos.view({ id: liveVideoId }) 117 await servers[1].videos.view({ id: liveVideoId })
118 await waitJobs(servers)
110 119
111 await wait(7000) 120 await countViewers(2)
112 await waitJobs(servers)
113 121
114 await countViews(2) 122 await wait(7000)
115 }) 123 await waitJobs(servers)
116 124
117 after(async function () { 125 await countViews(3)
118 await stopFfmpeg(command)
119 })
120 }) 126 })
121 127
122 after(async function () { 128 after(async function () {
129 await stopFfmpeg(command)
123 await cleanupTests(servers) 130 await cleanupTests(servers)
124 }) 131 })
125}) 132})
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index 8c4e01226..5d946f5e8 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -56,7 +56,7 @@ describe('Test jobs', function () {
56 56
57 let job = body.data[0] 57 let job = body.data[0]
58 // Skip repeat jobs 58 // Skip repeat jobs
59 if (job.type === 'videos-views') job = body.data[1] 59 if (job.type === 'videos-views-stats') job = body.data[1]
60 60
61 expect(job.state).to.equal('completed') 61 expect(job.state).to.equal('completed')
62 expect(job.type.startsWith('activitypub-')).to.be.true 62 expect(job.type.startsWith('activitypub-')).to.be.true