aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/controllers/api/videos/index.ts44
-rw-r--r--server/initializers/checker-before-init.ts2
-rw-r--r--server/initializers/config.ts4
-rw-r--r--server/initializers/constants.ts20
-rw-r--r--server/lib/activitypub/process/process-view.ts32
-rw-r--r--server/lib/activitypub/send/send-view.ts4
-rw-r--r--server/lib/activitypub/videos/updater.ts1
-rw-r--r--server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts)36
-rw-r--r--server/lib/job-queue/job-queue.ts12
-rw-r--r--server/lib/live/live-manager.ts49
-rw-r--r--server/lib/peertube-socket.ts8
-rw-r--r--server/lib/redis.ts112
-rw-r--r--server/lib/schedulers/video-views-buffer-scheduler.ts52
-rw-r--r--server/lib/video-views.ts130
-rw-r--r--server/models/video/formatter/video-format-utils.ts5
-rw-r--r--server/tests/api/live/live-views.ts109
-rw-r--r--server/tests/api/server/jobs.ts2
17 files changed, 388 insertions, 234 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index 821161c64..72b382595 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -2,7 +2,7 @@ import express from 'express'
2import toInt from 'validator/lib/toInt' 2import toInt from 'validator/lib/toInt'
3import { pickCommonVideoQuery } from '@server/helpers/query' 3import { pickCommonVideoQuery } from '@server/helpers/query'
4import { doJSONRequest } from '@server/helpers/requests' 4import { doJSONRequest } from '@server/helpers/requests'
5import { LiveManager } from '@server/lib/live' 5import { VideoViews } from '@server/lib/video-views'
6import { openapiOperationDoc } from '@server/middlewares/doc' 6import { openapiOperationDoc } from '@server/middlewares/doc'
7import { getServerActor } from '@server/models/application/application' 7import { getServerActor } from '@server/models/application/application'
8import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' 8import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils'
@@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
17import { sendView } from '../../../lib/activitypub/send/send-view' 17import { sendView } from '../../../lib/activitypub/send/send-view'
18import { JobQueue } from '../../../lib/job-queue' 18import { JobQueue } from '../../../lib/job-queue'
19import { Hooks } from '../../../lib/plugins/hooks' 19import { Hooks } from '../../../lib/plugins/hooks'
20import { Redis } from '../../../lib/redis'
21import { 20import {
22 asyncMiddleware, 21 asyncMiddleware,
23 asyncRetryTransactionMiddleware, 22 asyncRetryTransactionMiddleware,
@@ -107,7 +106,7 @@ videosRouter.get('/:id',
107) 106)
108videosRouter.post('/:id/views', 107videosRouter.post('/:id/views',
109 openapiOperationDoc({ operationId: 'addView' }), 108 openapiOperationDoc({ operationId: 'addView' }),
110 asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')), 109 asyncMiddleware(videosCustomGetValidator('only-video')),
111 asyncMiddleware(viewVideo) 110 asyncMiddleware(viewVideo)
112) 111)
113 112
@@ -153,44 +152,17 @@ function getVideo (_req: express.Request, res: express.Response) {
153} 152}
154 153
155async function viewVideo (req: express.Request, res: express.Response) { 154async function viewVideo (req: express.Request, res: express.Response) {
156 const immutableVideoAttrs = res.locals.onlyImmutableVideo 155 const video = res.locals.onlyVideo
157 156
158 const ip = req.ip 157 const ip = req.ip
159 const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) 158 const success = await VideoViews.Instance.processView({ video, ip })
160 if (exists) {
161 logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
162 return res.status(HttpStatusCode.NO_CONTENT_204).end()
163 }
164
165 const video = await VideoModel.load(immutableVideoAttrs.id)
166
167 const promises: Promise<any>[] = [
168 Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
169 ]
170
171 let federateView = true
172
173 // Increment our live manager
174 if (video.isLive && video.isOwned()) {
175 LiveManager.Instance.addViewTo(video.id)
176
177 // Views of our local live will be sent by our live manager
178 federateView = false
179 }
180
181 // Increment our video views cache counter
182 if (!video.isLive) {
183 promises.push(Redis.Instance.addVideoView(video.id))
184 }
185 159
186 if (federateView) { 160 if (success) {
187 const serverActor = await getServerActor() 161 const serverActor = await getServerActor()
188 promises.push(sendView(serverActor, video, undefined)) 162 await sendView(serverActor, video, undefined)
189 }
190
191 await Promise.all(promises)
192 163
193 Hooks.runAction('action:api.video.viewed', { video, ip }) 164 Hooks.runAction('action:api.video.viewed', { video: video, ip })
165 }
194 166
195 return res.status(HttpStatusCode.NO_CONTENT_204).end() 167 return res.status(HttpStatusCode.NO_CONTENT_204).end()
196} 168}
diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts
index 1015c5e45..51c396548 100644
--- a/server/initializers/checker-before-init.ts
+++ b/server/initializers/checker-before-init.ts
@@ -38,7 +38,7 @@ function checkMissedConfig () {
38 'services.twitter.username', 'services.twitter.whitelisted', 38 'services.twitter.username', 'services.twitter.whitelisted',
39 'followers.instance.enabled', 'followers.instance.manual_approval', 39 'followers.instance.enabled', 'followers.instance.manual_approval',
40 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', 40 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces',
41 'history.videos.max_age', 'views.videos.remote.max_age', 41 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration',
42 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', 42 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
43 'theme.default', 43 'theme.default',
44 'remote_redundancy.videos.accept_from', 44 'remote_redundancy.videos.accept_from',
diff --git a/server/initializers/config.ts b/server/initializers/config.ts
index 1288768d8..dadda2a77 100644
--- a/server/initializers/config.ts
+++ b/server/initializers/config.ts
@@ -182,7 +182,9 @@ const CONFIG = {
182 VIDEOS: { 182 VIDEOS: {
183 REMOTE: { 183 REMOTE: {
184 MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) 184 MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age'))
185 } 185 },
186 LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')),
187 IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration'))
186 } 188 }
187 }, 189 },
188 PLUGINS: { 190 PLUGINS: {
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 845576667..b65741bbd 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
148 'video-import': 1, 148 'video-import': 1,
149 'email': 5, 149 'email': 5,
150 'actor-keys': 3, 150 'actor-keys': 3,
151 'videos-views': 1, 151 'videos-views-stats': 1,
152 'activitypub-refresher': 1, 152 'activitypub-refresher': 1,
153 'video-redundancy': 1, 153 'video-redundancy': 1,
154 'video-live-ending': 1, 154 'video-live-ending': 1,
@@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
164 'video-file-import': 1, 164 'video-file-import': 1,
165 'email': 5, 165 'email': 5,
166 'actor-keys': 1, 166 'actor-keys': 1,
167 'videos-views': 1, 167 'videos-views-stats': 1,
168 'activitypub-refresher': 1, 168 'activitypub-refresher': 1,
169 'video-redundancy': 1, 169 'video-redundancy': 1,
170 'video-live-ending': 10, 170 'video-live-ending': 10,
@@ -181,14 +181,14 @@ const JOB_TTL: { [id in JobType]: number } = {
181 'video-import': 1000 * 3600 * 2, // 2 hours 181 'video-import': 1000 * 3600 * 2, // 2 hours
182 'email': 60000 * 10, // 10 minutes 182 'email': 60000 * 10, // 10 minutes
183 'actor-keys': 60000 * 20, // 20 minutes 183 'actor-keys': 60000 * 20, // 20 minutes
184 'videos-views': undefined, // Unlimited 184 'videos-views-stats': undefined, // Unlimited
185 'activitypub-refresher': 60000 * 10, // 10 minutes 185 'activitypub-refresher': 60000 * 10, // 10 minutes
186 'video-redundancy': 1000 * 3600 * 3, // 3 hours 186 'video-redundancy': 1000 * 3600 * 3, // 3 hours
187 'video-live-ending': 1000 * 60 * 10, // 10 minutes 187 'video-live-ending': 1000 * 60 * 10, // 10 minutes
188 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours 188 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
189} 189}
190const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 190const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
191 'videos-views': { 191 'videos-views-stats': {
192 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour 192 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
193 }, 193 },
194 'activitypub-cleaner': { 194 'activitypub-cleaner': {
@@ -211,6 +211,7 @@ const SCHEDULER_INTERVALS_MS = {
211 REMOVE_OLD_JOBS: 60000 * 60, // 1 hour 211 REMOVE_OLD_JOBS: 60000 * 60, // 1 hour
212 UPDATE_VIDEOS: 60000, // 1 minute 212 UPDATE_VIDEOS: 60000, // 1 minute
213 YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day 213 YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day
214 VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL,
214 CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, 215 CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
215 CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day 216 CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day
216 AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day 217 AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day
@@ -343,8 +344,8 @@ const CONSTRAINTS_FIELDS = {
343} 344}
344 345
345const VIEW_LIFETIME = { 346const VIEW_LIFETIME = {
346 VIDEO: 60000 * 60, // 1 hour 347 VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
347 LIVE: 60000 * 5 // 5 minutes 348 VIEWER: 60000 * 5 // 5 minutes
348} 349}
349 350
350let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour 351let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
@@ -789,13 +790,12 @@ if (isTestInstance() === true) {
789 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 790 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
790 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 791 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
791 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 792 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
792 REPEAT_JOBS['videos-views'] = { every: 5000 } 793 REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
793 REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } 794 REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
794 795
795 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 796 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
796 797
797 VIEW_LIFETIME.VIDEO = 1000 // 1 second 798 VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second
798 VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
799 CONTACT_FORM_LIFETIME = 1000 // 1 second 799 CONTACT_FORM_LIFETIME = 1000 // 1 second
800 800
801 JOB_ATTEMPTS['email'] = 1 801 JOB_ATTEMPTS['email'] = 1
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts
index 5593ee257..720385f9b 100644
--- a/server/lib/activitypub/process/process-view.ts
+++ b/server/lib/activitypub/process/process-view.ts
@@ -1,13 +1,13 @@
1import { getOrCreateAPVideo } from '../videos' 1import { VideoViews } from '@server/lib/video-views'
2import { forwardVideoRelatedActivity } from '../send/utils' 2import { ActivityView } from '../../../../shared/models/activitypub'
3import { Redis } from '../../redis'
4import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
5import { APProcessorOptions } from '../../../types/activitypub-processor.model' 3import { APProcessorOptions } from '../../../types/activitypub-processor.model'
6import { MActorSignature } from '../../../types/models' 4import { MActorSignature } from '../../../types/models'
7import { LiveManager } from '@server/lib/live/live-manager' 5import { forwardVideoRelatedActivity } from '../send/utils'
6import { getOrCreateAPVideo } from '../videos'
8 7
9async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { 8async function processViewActivity (options: APProcessorOptions<ActivityView>) {
10 const { activity, byActor } = options 9 const { activity, byActor } = options
10
11 return processCreateView(activity, byActor) 11 return processCreateView(activity, byActor)
12} 12}
13 13
@@ -19,10 +19,8 @@ export {
19 19
20// --------------------------------------------------------------------------- 20// ---------------------------------------------------------------------------
21 21
22async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { 22async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
23 const videoObject = activity.type === 'View' 23 const videoObject = activity.object
24 ? activity.object
25 : (activity.object as ViewObject).object
26 24
27 const { video } = await getOrCreateAPVideo({ 25 const { video } = await getOrCreateAPVideo({
28 videoObject, 26 videoObject,
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
30 allowRefresh: false 28 allowRefresh: false
31 }) 29 })
32 30
33 if (!video.isLive) { 31 const viewerExpires = activity.expires
34 await Redis.Instance.addVideoView(video.id) 32 ? new Date(activity.expires)
35 } 33 : undefined
36 34
37 if (video.isOwned()) { 35 await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
38 // Our live manager will increment the counter and send the view to followers
39 if (video.isLive) {
40 LiveManager.Instance.addViewTo(video.id)
41 return
42 }
43 36
37 if (video.isOwned()) {
44 // Forward the view but don't resend the activity to the sender 38 // Forward the view but don't resend the activity to the sender
45 const exceptions = [ byActor ] 39 const exceptions = [ byActor ]
46 await forwardVideoRelatedActivity(activity, undefined, exceptions, video) 40 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts
index 153e94295..b12583e26 100644
--- a/server/lib/activitypub/send/send-view.ts
+++ b/server/lib/activitypub/send/send-view.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { VideoViews } from '@server/lib/video-views'
2import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' 3import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
3import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' 4import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
4import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU
27 id: url, 28 id: url,
28 type: 'View' as 'View', 29 type: 'View' as 'View',
29 actor: byActor.url, 30 actor: byActor.url,
30 object: video.url 31 object: video.url,
32 expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
31 }, 33 },
32 audience 34 audience
33 ) 35 )
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts
index 157569414..f786bb196 100644
--- a/server/lib/activitypub/videos/updater.ts
+++ b/server/lib/activitypub/videos/updater.ts
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
81 81
82 if (videoUpdated.isLive) { 82 if (videoUpdated.isLive) {
83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) 83 PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
84 PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
85 } 84 }
86 85
87 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) 86 logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts
index 86d0a271f..caf5f6962 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views-stats.ts
@@ -1,11 +1,10 @@
1import { Redis } from '../../redis' 1import { isTestInstance } from '../../../helpers/core-utils'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video' 3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-view' 4import { VideoViewModel } from '../../../models/video/video-view'
5import { isTestInstance } from '../../../helpers/core-utils' 5import { Redis } from '../../redis'
6import { federateVideoIfNeeded } from '../../activitypub/videos'
7 6
8async function processVideosViews () { 7async function processVideosViewsStats () {
9 const lastHour = new Date() 8 const lastHour = new Date()
10 9
11 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour 10 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@@ -15,23 +14,23 @@ async function processVideosViews () {
15 const startDate = lastHour.setMinutes(0, 0, 0) 14 const startDate = lastHour.setMinutes(0, 0, 0)
16 const endDate = lastHour.setMinutes(59, 59, 999) 15 const endDate = lastHour.setMinutes(59, 59, 999)
17 16
18 const videoIds = await Redis.Instance.getVideosIdViewed(hour) 17 const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
19 if (videoIds.length === 0) return 18 if (videoIds.length === 0) return
20 19
21 logger.info('Processing videos views in job for hour %d.', hour) 20 logger.info('Processing videos views stats in job for hour %d.', hour)
22 21
23 for (const videoId of videoIds) { 22 for (const videoId of videoIds) {
24 try { 23 try {
25 const views = await Redis.Instance.getVideoViews(videoId, hour) 24 const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
26 await Redis.Instance.deleteVideoViews(videoId, hour) 25 await Redis.Instance.deleteVideoViewsStats(videoId, hour)
27 26
28 if (views) { 27 if (views) {
29 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 28 logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
30 29
31 try { 30 try {
32 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 31 const video = await VideoModel.load(videoId)
33 if (!video) { 32 if (!video) {
34 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) 33 logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
35 continue 34 continue
36 } 35 }
37 36
@@ -41,21 +40,12 @@ async function processVideosViews () {
41 views, 40 views,
42 videoId 41 videoId
43 }) 42 })
44
45 if (video.isOwned()) {
46 // If this is a remote video, the origin instance will send us an update
47 await VideoModel.incrementViews(videoId, views)
48
49 // Send video update
50 video.views += views
51 await federateVideoIfNeeded(video, false)
52 }
53 } catch (err) { 43 } catch (err) {
54 logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) 44 logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
55 } 45 }
56 } 46 }
57 } catch (err) { 47 } catch (err) {
58 logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) 48 logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
59 } 49 }
60 } 50 }
61} 51}
@@ -63,5 +53,5 @@ async function processVideosViews () {
63// --------------------------------------------------------------------------- 53// ---------------------------------------------------------------------------
64 54
65export { 55export {
66 processVideosViews 56 processVideosViewsStats
67} 57}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0eab720d9..4c1597b33 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import'
36import { processVideoImport } from './handlers/video-import' 36import { processVideoImport } from './handlers/video-import'
37import { processVideoLiveEnding } from './handlers/video-live-ending' 37import { processVideoLiveEnding } from './handlers/video-live-ending'
38import { processVideoTranscoding } from './handlers/video-transcoding' 38import { processVideoTranscoding } from './handlers/video-transcoding'
39import { processVideosViews } from './handlers/video-views' 39import { processVideosViewsStats } from './handlers/video-views-stats'
40 40
41type CreateJobArgument = 41type CreateJobArgument =
42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 42 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -49,7 +49,7 @@ type CreateJobArgument =
49 { type: 'email', payload: EmailPayload } | 49 { type: 'email', payload: EmailPayload } |
50 { type: 'video-import', payload: VideoImportPayload } | 50 { type: 'video-import', payload: VideoImportPayload } |
51 { type: 'activitypub-refresher', payload: RefreshPayload } | 51 { type: 'activitypub-refresher', payload: RefreshPayload } |
52 { type: 'videos-views', payload: {} } | 52 { type: 'videos-views-stats', payload: {} } |
53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } | 53 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
54 { type: 'actor-keys', payload: ActorKeysPayload } | 54 { type: 'actor-keys', payload: ActorKeysPayload } |
55 { type: 'video-redundancy', payload: VideoRedundancyPayload } | 55 { type: 'video-redundancy', payload: VideoRedundancyPayload } |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
71 'video-transcoding': processVideoTranscoding, 71 'video-transcoding': processVideoTranscoding,
72 'email': processEmail, 72 'email': processEmail,
73 'video-import': processVideoImport, 73 'video-import': processVideoImport,
74 'videos-views': processVideosViews, 74 'videos-views-stats': processVideosViewsStats,
75 'activitypub-refresher': refreshAPObject, 75 'activitypub-refresher': refreshAPObject,
76 'video-live-ending': processVideoLiveEnding, 76 'video-live-ending': processVideoLiveEnding,
77 'actor-keys': processActorKeys, 77 'actor-keys': processActorKeys,
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [
89 'video-transcoding', 89 'video-transcoding',
90 'video-file-import', 90 'video-file-import',
91 'video-import', 91 'video-import',
92 'videos-views', 92 'videos-views-stats',
93 'activitypub-refresher', 93 'activitypub-refresher',
94 'video-redundancy', 94 'video-redundancy',
95 'actor-keys', 95 'actor-keys',
@@ -247,8 +247,8 @@ class JobQueue {
247 } 247 }
248 248
249 private addRepeatableJobs () { 249 private addRepeatableJobs () {
250 this.queues['videos-views'].add({}, { 250 this.queues['videos-views-stats'].add({}, {
251 repeat: REPEAT_JOBS['videos-views'] 251 repeat: REPEAT_JOBS['videos-views-stats']
252 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 252 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
253 253
254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 254 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 1b7b9dd4d..2562edb75 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,7 +2,6 @@
2import { readFile } from 'fs-extra' 2import { readFile } from 'fs-extra'
3import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { isTestInstance } from '@server/helpers/core-utils'
6import { 5import {
7 computeResolutionsToTranscode, 6 computeResolutionsToTranscode,
8 ffprobePromise, 7 ffprobePromise,
@@ -12,7 +11,7 @@ import {
12} from '@server/helpers/ffprobe-utils' 11} from '@server/helpers/ffprobe-utils'
13import { logger, loggerTagsFactory } from '@server/helpers/logger' 12import { logger, loggerTagsFactory } from '@server/helpers/logger'
14import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 13import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
15import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' 14import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
16import { UserModel } from '@server/models/user/user' 15import { UserModel } from '@server/models/user/user'
17import { VideoModel } from '@server/models/video/video' 16import { VideoModel } from '@server/models/video/video'
18import { VideoLiveModel } from '@server/models/video/video-live' 17import { VideoLiveModel } from '@server/models/video/video-live'
@@ -53,8 +52,6 @@ class LiveManager {
53 52
54 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
55 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<number, string>()
56 // Values are Date().getTime()
57 private readonly watchersPerVideo = new Map<number, number[]>()
58 55
59 private rtmpServer: Server 56 private rtmpServer: Server
60 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -99,8 +96,6 @@ class LiveManager {
99 // Cleanup broken lives, that were terminated by a server restart for example 96 // Cleanup broken lives, that were terminated by a server restart for example
100 this.handleBrokenLives() 97 this.handleBrokenLives()
101 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) 98 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
102
103 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
104 } 99 }
105 100
106 async run () { 101 async run () {
@@ -184,19 +179,6 @@ class LiveManager {
184 this.abortSession(sessionId) 179 this.abortSession(sessionId)
185 } 180 }
186 181
187 addViewTo (videoId: number) {
188 if (this.videoSessions.has(videoId) === false) return
189
190 let watchers = this.watchersPerVideo.get(videoId)
191
192 if (!watchers) {
193 watchers = []
194 this.watchersPerVideo.set(videoId, watchers)
195 }
196
197 watchers.push(new Date().getTime())
198 }
199
200 private getContext () { 182 private getContext () {
201 return context 183 return context
202 } 184 }
@@ -377,7 +359,6 @@ class LiveManager {
377 } 359 }
378 360
379 private onMuxingFFmpegEnd (videoId: number) { 361 private onMuxingFFmpegEnd (videoId: number) {
380 this.watchersPerVideo.delete(videoId)
381 this.videoSessions.delete(videoId) 362 this.videoSessions.delete(videoId)
382 } 363 }
383 364
@@ -411,34 +392,6 @@ class LiveManager {
411 } 392 }
412 } 393 }
413 394
414 private async updateLiveViews () {
415 if (!this.isRunning()) return
416
417 if (!isTestInstance()) logger.info('Updating live video views.', lTags())
418
419 for (const videoId of this.watchersPerVideo.keys()) {
420 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
421
422 const watchers = this.watchersPerVideo.get(videoId)
423
424 const numWatchers = watchers.length
425
426 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
427 video.views = numWatchers
428 await video.save()
429
430 await federateVideoIfNeeded(video, false)
431
432 PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
433
434 // Only keep not expired watchers
435 const newWatchers = watchers.filter(w => w > notBefore)
436 this.watchersPerVideo.set(videoId, newWatchers)
437
438 logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
439 }
440 }
441
442 private async handleBrokenLives () { 395 private async handleBrokenLives () {
443 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 396 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
444 397
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 901435dea..0398ca61d 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -1,7 +1,7 @@
1import { Server as HTTPServer } from 'http' 1import { Server as HTTPServer } from 'http'
2import { Namespace, Server as SocketServer, Socket } from 'socket.io' 2import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3import { isIdValid } from '@server/helpers/custom-validators/misc' 3import { isIdValid } from '@server/helpers/custom-validators/misc'
4import { MVideo } from '@server/types/models' 4import { MVideo, MVideoImmutable } from '@server/types/models'
5import { UserNotificationModelForApi } from '@server/types/models/user' 5import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' 6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger' 7import { logger } from '../helpers/logger'
@@ -78,11 +78,11 @@ class PeerTubeSocket {
78 .emit(type, data) 78 .emit(type, data)
79 } 79 }
80 80
81 sendVideoViewsUpdate (video: MVideo) { 81 sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
82 const data: LiveVideoEventPayload = { views: video.views } 82 const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
83 const type: LiveVideoEventType = 'views-change' 83 const type: LiveVideoEventType = 'views-change'
84 84
85 logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) 85 logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
86 86
87 this.liveVideosNamespace 87 this.liveVideosNamespace
88 .in(video.id) 88 .in(video.id)
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 46617b07e..76b7868e8 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -13,6 +13,7 @@ import {
13 RESUMABLE_UPLOAD_SESSION_LIFETIME 13 RESUMABLE_UPLOAD_SESSION_LIFETIME
14} from '../initializers/constants' 14} from '../initializers/constants'
15import { CONFIG } from '../initializers/config' 15import { CONFIG } from '../initializers/config'
16import { exists } from '@server/helpers/custom-validators/misc'
16 17
17type CachedRoute = { 18type CachedRoute = {
18 body: string 19 body: string
@@ -119,16 +120,20 @@ class Redis {
119 120
120 /* ************ Views per IP ************ */ 121 /* ************ Views per IP ************ */
121 122
122 setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { 123 setIPVideoView (ip: string, videoUUID: string) {
123 const lifetime = isLive 124 return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
124 ? VIEW_LIFETIME.LIVE 125 }
125 : VIEW_LIFETIME.VIDEO
126 126
127 return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) 127 setIPVideoViewer (ip: string, videoUUID: string) {
128 return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
128 } 129 }
129 130
130 async doesVideoIPViewExist (ip: string, videoUUID: string) { 131 async doesVideoIPViewExist (ip: string, videoUUID: string) {
131 return this.exists(this.generateViewKey(ip, videoUUID)) 132 return this.exists(this.generateIPViewKey(ip, videoUUID))
133 }
134
135 async doesVideoIPViewerExist (ip: string, videoUUID: string) {
136 return this.exists(this.generateIPViewerKey(ip, videoUUID))
132 } 137 }
133 138
134 /* ************ Tracker IP block ************ */ 139 /* ************ Tracker IP block ************ */
@@ -160,46 +165,85 @@ class Redis {
160 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) 165 return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
161 } 166 }
162 167
163 /* ************ Video views ************ */ 168 /* ************ Video views stats ************ */
164 169
165 addVideoView (videoId: number) { 170 addVideoViewStats (videoId: number) {
166 const keyIncr = this.generateVideoViewKey(videoId) 171 const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
167 const keySet = this.generateVideosViewKey()
168 172
169 return Promise.all([ 173 return Promise.all([
170 this.addToSet(keySet, videoId.toString()), 174 this.addToSet(setKey, videoId.toString()),
171 this.increment(keyIncr) 175 this.increment(videoKey)
172 ]) 176 ])
173 } 177 }
174 178
175 async getVideoViews (videoId: number, hour: number) { 179 async getVideoViewsStats (videoId: number, hour: number) {
176 const key = this.generateVideoViewKey(videoId, hour) 180 const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
177 181
178 const valueString = await this.getValue(key) 182 const valueString = await this.getValue(videoKey)
179 const valueInt = parseInt(valueString, 10) 183 const valueInt = parseInt(valueString, 10)
180 184
181 if (isNaN(valueInt)) { 185 if (isNaN(valueInt)) {
182 logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) 186 logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
183 return undefined 187 return undefined
184 } 188 }
185 189
186 return valueInt 190 return valueInt
187 } 191 }
188 192
189 async getVideosIdViewed (hour: number) { 193 async listVideosViewedForStats (hour: number) {
190 const key = this.generateVideosViewKey(hour) 194 const { setKey } = this.generateVideoViewStatsKeys({ hour })
191 195
192 const stringIds = await this.getSet(key) 196 const stringIds = await this.getSet(setKey)
193 return stringIds.map(s => parseInt(s, 10)) 197 return stringIds.map(s => parseInt(s, 10))
194 } 198 }
195 199
196 deleteVideoViews (videoId: number, hour: number) { 200 deleteVideoViewsStats (videoId: number, hour: number) {
197 const keySet = this.generateVideosViewKey(hour) 201 const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
198 const keyIncr = this.generateVideoViewKey(videoId, hour) 202
203 return Promise.all([
204 this.deleteFromSet(setKey, videoId.toString()),
205 this.deleteKey(videoKey)
206 ])
207 }
208
209 /* ************ Local video views buffer ************ */
210
211 addLocalVideoView (videoId: number) {
212 const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
199 213
200 return Promise.all([ 214 return Promise.all([
201 this.deleteFromSet(keySet, videoId.toString()), 215 this.addToSet(setKey, videoId.toString()),
202 this.deleteKey(keyIncr) 216 this.increment(videoKey)
217 ])
218 }
219
220 async getLocalVideoViews (videoId: number) {
221 const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
222
223 const valueString = await this.getValue(videoKey)
224 const valueInt = parseInt(valueString, 10)
225
226 if (isNaN(valueInt)) {
227 logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
228 return undefined
229 }
230
231 return valueInt
232 }
233
234 async listLocalVideosViewed () {
235 const { setKey } = this.generateLocalVideoViewsKeys()
236
237 const stringIds = await this.getSet(setKey)
238 return stringIds.map(s => parseInt(s, 10))
239 }
240
241 deleteLocalVideoViews (videoId: number) {
242 const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
243
244 return Promise.all([
245 this.deleteFromSet(setKey, videoId.toString()),
246 this.deleteKey(videoKey)
203 ]) 247 ])
204 } 248 }
205 249
@@ -233,16 +277,16 @@ class Redis {
233 return req.method + '-' + req.originalUrl 277 return req.method + '-' + req.originalUrl
234 } 278 }
235 279
236 private generateVideosViewKey (hour?: number) { 280 private generateLocalVideoViewsKeys (videoId?: Number) {
237 if (!hour) hour = new Date().getHours() 281 return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
238
239 return `videos-view-h${hour}`
240 } 282 }
241 283
242 private generateVideoViewKey (videoId: number, hour?: number) { 284 private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
243 if (hour === undefined || hour === null) hour = new Date().getHours() 285 const hour = exists(options.hour)
286 ? options.hour
287 : new Date().getHours()
244 288
245 return `video-view-${videoId}-h${hour}` 289 return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
246 } 290 }
247 291
248 private generateResetPasswordKey (userId: number) { 292 private generateResetPasswordKey (userId: number) {
@@ -253,10 +297,14 @@ class Redis {
253 return 'verify-email-' + userId 297 return 'verify-email-' + userId
254 } 298 }
255 299
256 private generateViewKey (ip: string, videoUUID: string) { 300 private generateIPViewKey (ip: string, videoUUID: string) {
257 return `views-${videoUUID}-${ip}` 301 return `views-${videoUUID}-${ip}`
258 } 302 }
259 303
304 private generateIPViewerKey (ip: string, videoUUID: string) {
305 return `viewer-${videoUUID}-${ip}`
306 }
307
260 private generateTrackerBlockIPKey (ip: string) { 308 private generateTrackerBlockIPKey (ip: string) {
261 return `tracker-block-ip-${ip}` 309 return `tracker-block-ip-${ip}`
262 } 310 }
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts
new file mode 100644
index 000000000..c0e72c461
--- /dev/null
+++ b/server/lib/schedulers/video-views-buffer-scheduler.ts
@@ -0,0 +1,52 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VideoModel } from '@server/models/video/video'
3import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { federateVideoIfNeeded } from '../activitypub/videos'
5import { Redis } from '../redis'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViewsBufferScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const videoIds = await Redis.Instance.listLocalVideosViewed()
22 if (videoIds.length === 0) return
23
24 logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
25
26 for (const videoId of videoIds) {
27 try {
28 const views = await Redis.Instance.getLocalVideoViews(videoId)
29 await Redis.Instance.deleteLocalVideoViews(videoId)
30
31 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
32 if (!video) {
33 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
34 continue
35 }
36
37 // If this is a remote video, the origin instance will send us an update
38 await VideoModel.incrementViews(videoId, views)
39
40 // Send video update
41 video.views += views
42 await federateVideoIfNeeded(video, false)
43 } catch (err) {
44 logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
45 }
46 }
47 }
48
49 static get Instance () {
50 return this.instance || (this.instance = new this())
51 }
52}
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts
new file mode 100644
index 000000000..220b509c2
--- /dev/null
+++ b/server/lib/video-views.ts
@@ -0,0 +1,130 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VIEW_LIFETIME } from '@server/initializers/constants'
3import { VideoModel } from '@server/models/video/video'
4import { MVideo } from '@server/types/models'
5import { PeerTubeSocket } from './peertube-socket'
6import { Redis } from './redis'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViews {
11
12 // Values are Date().getTime()
13 private readonly viewersPerVideo = new Map<number, number[]>()
14
15 private static instance: VideoViews
16
17 private constructor () {
18 }
19
20 init () {
21 setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
22 }
23
24 async processView (options: {
25 video: MVideo
26 ip: string | null
27 viewerExpires?: Date
28 }) {
29 const { video, ip, viewerExpires } = options
30
31 logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
32
33 let success = await this.addView(video, ip)
34
35 if (video.isLive) {
36 const successViewer = await this.addViewer(video, ip, viewerExpires)
37 success ||= successViewer
38 }
39
40 return success
41 }
42
43 getViewers (video: MVideo) {
44 const viewers = this.viewersPerVideo.get(video.id)
45 if (!viewers) return 0
46
47 return viewers.length
48 }
49
50 buildViewerExpireTime () {
51 return new Date().getTime() + VIEW_LIFETIME.VIEWER
52 }
53
54 private async addView (video: MVideo, ip: string | null) {
55 const promises: Promise<any>[] = []
56
57 if (ip !== null) {
58 const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
59 if (viewExists) return false
60
61 promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
62 }
63
64 if (video.isOwned()) {
65 promises.push(Redis.Instance.addLocalVideoView(video.id))
66 }
67
68 promises.push(Redis.Instance.addVideoViewStats(video.id))
69
70 await Promise.all(promises)
71
72 return true
73 }
74
75 private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
76 if (ip !== null) {
77 const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
78 if (viewExists) return false
79
80 await Redis.Instance.setIPVideoViewer(ip, video.uuid)
81 }
82
83 let watchers = this.viewersPerVideo.get(video.id)
84
85 if (!watchers) {
86 watchers = []
87 this.viewersPerVideo.set(video.id, watchers)
88 }
89
90 const expiration = viewerExpires
91 ? viewerExpires.getTime()
92 : this.buildViewerExpireTime()
93
94 watchers.push(expiration)
95 await this.notifyClients(video.id, watchers.length)
96
97 return true
98 }
99
100 private async cleanViewers () {
101 logger.info('Cleaning video viewers.', lTags())
102
103 for (const videoId of this.viewersPerVideo.keys()) {
104 const notBefore = new Date().getTime()
105
106 const viewers = this.viewersPerVideo.get(videoId)
107
108 // Only keep not expired viewers
109 const newViewers = viewers.filter(w => w > notBefore)
110
111 if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
112 else this.viewersPerVideo.set(videoId, newViewers)
113
114 await this.notifyClients(videoId, newViewers.length)
115 }
116 }
117
118 private async notifyClients (videoId: string | number, viewersLength: number) {
119 const video = await VideoModel.loadImmutableAttributes(videoId)
120 if (!video) return
121
122 PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
123
124 logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
125 }
126
127 static get Instance () {
128 return this.instance || (this.instance = new this())
129 }
130}
diff --git a/server/models/video/formatter/video-format-utils.ts b/server/models/video/formatter/video-format-utils.ts
index ba49e41ae..461e296df 100644
--- a/server/models/video/formatter/video-format-utils.ts
+++ b/server/models/video/formatter/video-format-utils.ts
@@ -1,6 +1,7 @@
1import { uuidToShort } from '@server/helpers/uuid' 1import { uuidToShort } from '@server/helpers/uuid'
2import { generateMagnetUri } from '@server/helpers/webtorrent' 2import { generateMagnetUri } from '@server/helpers/webtorrent'
3import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' 3import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls'
4import { VideoViews } from '@server/lib/video-views'
4import { VideosCommonQueryAfterSanitize } from '@shared/models' 5import { VideosCommonQueryAfterSanitize } from '@shared/models'
5import { VideoFile } from '@shared/models/videos/video-file.model' 6import { VideoFile } from '@shared/models/videos/video-file.model'
6import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' 7import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects'
@@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm
121 pluginData: (video as any).pluginData 122 pluginData: (video as any).pluginData
122 } 123 }
123 124
125 if (video.isLive) {
126 videoObject.viewers = VideoViews.Instance.getViewers(video)
127 }
128
124 const add = options.additionalAttributes 129 const add = options.additionalAttributes
125 if (add?.state === true) { 130 if (add?.state === true) {
126 videoObject.state = { 131 videoObject.state = {
diff --git a/server/tests/api/live/live-views.ts b/server/tests/api/live/live-views.ts
index 5e3a79c64..9186af8e7 100644
--- a/server/tests/api/live/live-views.ts
+++ b/server/tests/api/live/live-views.ts
@@ -19,7 +19,7 @@ import {
19 19
20const expect = chai.expect 20const expect = chai.expect
21 21
22describe('Test live', function () { 22describe('Live views', function () {
23 let servers: PeerTubeServer[] = [] 23 let servers: PeerTubeServer[] = []
24 24
25 before(async function () { 25 before(async function () {
@@ -47,79 +47,86 @@ describe('Test live', function () {
47 await doubleFollow(servers[0], servers[1]) 47 await doubleFollow(servers[0], servers[1])
48 }) 48 })
49 49
50 describe('Live views', function () { 50 let liveVideoId: string
51 let liveVideoId: string 51 let command: FfmpegCommand
52 let command: FfmpegCommand
53 52
54 async function countViews (expected: number) { 53 async function countViewers (expectedViewers: number) {
55 for (const server of servers) { 54 for (const server of servers) {
56 const video = await server.videos.get({ id: liveVideoId }) 55 const video = await server.videos.get({ id: liveVideoId })
57 expect(video.views).to.equal(expected) 56 expect(video.viewers).to.equal(expectedViewers)
58 }
59 } 57 }
58 }
60 59
61 before(async function () { 60 async function countViews (expectedViews: number) {
62 this.timeout(30000) 61 for (const server of servers) {
62 const video = await server.videos.get({ id: liveVideoId })
63 expect(video.views).to.equal(expectedViews)
64 }
65 }
63 66
64 const liveAttributes = { 67 before(async function () {
65 name: 'live video', 68 this.timeout(30000)
66 channelId: servers[0].store.channel.id,
67 privacy: VideoPrivacy.PUBLIC
68 }
69 69
70 const live = await servers[0].live.create({ fields: liveAttributes }) 70 const liveAttributes = {
71 liveVideoId = live.uuid 71 name: 'live video',
72 channelId: servers[0].store.channel.id,
73 privacy: VideoPrivacy.PUBLIC
74 }
72 75
73 command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) 76 const live = await servers[0].live.create({ fields: liveAttributes })
74 await waitUntilLivePublishedOnAllServers(servers, liveVideoId) 77 liveVideoId = live.uuid
75 await waitJobs(servers)
76 })
77 78
78 it('Should display no views for a live', async function () { 79 command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
79 await countViews(0) 80 await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
80 }) 81 await waitJobs(servers)
82 })
81 83
82 it('Should view a live twice and display 1 view', async function () { 84 it('Should display no views and viewers for a live', async function () {
83 this.timeout(30000) 85 await countViews(0)
86 await countViewers(0)
87 })
84 88
85 await servers[0].videos.view({ id: liveVideoId }) 89 it('Should view a live twice and display 1 view/viewer', async function () {
86 await servers[0].videos.view({ id: liveVideoId }) 90 this.timeout(30000)
87 91
88 await wait(7000) 92 await servers[0].videos.view({ id: liveVideoId })
93 await servers[0].videos.view({ id: liveVideoId })
89 94
90 await waitJobs(servers) 95 await waitJobs(servers)
96 await countViewers(1)
91 97
92 await countViews(1) 98 await wait(7000)
93 }) 99 await countViews(1)
100 })
94 101
95 it('Should wait and display 0 views', async function () { 102 it('Should wait and display 0 viewers while still have 1 view', async function () {
96 this.timeout(30000) 103 this.timeout(30000)
97 104
98 await wait(12000) 105 await wait(12000)
99 await waitJobs(servers) 106 await waitJobs(servers)
100 107
101 await countViews(0) 108 await countViews(1)
102 }) 109 await countViewers(0)
110 })
103 111
104 it('Should view a live on a remote and on local and display 2 views', async function () { 112 it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () {
105 this.timeout(30000) 113 this.timeout(30000)
106 114
107 await servers[0].videos.view({ id: liveVideoId }) 115 await servers[0].videos.view({ id: liveVideoId })
108 await servers[1].videos.view({ id: liveVideoId }) 116 await servers[1].videos.view({ id: liveVideoId })
109 await servers[1].videos.view({ id: liveVideoId }) 117 await servers[1].videos.view({ id: liveVideoId })
118 await waitJobs(servers)
110 119
111 await wait(7000) 120 await countViewers(2)
112 await waitJobs(servers)
113 121
114 await countViews(2) 122 await wait(7000)
115 }) 123 await waitJobs(servers)
116 124
117 after(async function () { 125 await countViews(3)
118 await stopFfmpeg(command)
119 })
120 }) 126 })
121 127
122 after(async function () { 128 after(async function () {
129 await stopFfmpeg(command)
123 await cleanupTests(servers) 130 await cleanupTests(servers)
124 }) 131 })
125}) 132})
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index 8c4e01226..5d946f5e8 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -56,7 +56,7 @@ describe('Test jobs', function () {
56 56
57 let job = body.data[0] 57 let job = body.data[0]
58 // Skip repeat jobs 58 // Skip repeat jobs
59 if (job.type === 'videos-views') job = body.data[1] 59 if (job.type === 'videos-views-stats') job = body.data[1]
60 60
61 expect(job.state).to.equal('completed') 61 expect(job.state).to.equal('completed')
62 expect(job.type.startsWith('activitypub-')).to.be.true 62 expect(job.type.startsWith('activitypub-')).to.be.true