diff options
author | Chocobozzz <me@florianbigard.com> | 2021-11-09 10:11:20 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2021-11-09 15:00:31 +0100 |
commit | 51353d9a035fb6b81f903a8b5f391292841649fd (patch) | |
tree | 75acb6eea5e043bf2e15a6a5a92e9a3c5967b156 /server | |
parent | 221ee1adc916684d4881d2a9c4c01954dcde986e (diff) | |
download | PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.gz PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.zst PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.zip |
Refactor video views
Introduce viewers attribute for live videos
Count views for live videos
Reduce delay to see the viewer update for lives
Add ability to configure video views buffer interval and view ip
expiration
Diffstat (limited to 'server')
-rw-r--r-- | server/controllers/api/videos/index.ts | 44 | ||||
-rw-r--r-- | server/initializers/checker-before-init.ts | 2 | ||||
-rw-r--r-- | server/initializers/config.ts | 4 | ||||
-rw-r--r-- | server/initializers/constants.ts | 20 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-view.ts | 32 | ||||
-rw-r--r-- | server/lib/activitypub/send/send-view.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/videos/updater.ts | 1 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts) | 36 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 12 | ||||
-rw-r--r-- | server/lib/live/live-manager.ts | 49 | ||||
-rw-r--r-- | server/lib/peertube-socket.ts | 8 | ||||
-rw-r--r-- | server/lib/redis.ts | 112 | ||||
-rw-r--r-- | server/lib/schedulers/video-views-buffer-scheduler.ts | 52 | ||||
-rw-r--r-- | server/lib/video-views.ts | 130 | ||||
-rw-r--r-- | server/models/video/formatter/video-format-utils.ts | 5 | ||||
-rw-r--r-- | server/tests/api/live/live-views.ts | 109 | ||||
-rw-r--r-- | server/tests/api/server/jobs.ts | 2 |
17 files changed, 388 insertions, 234 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index 821161c64..72b382595 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts | |||
@@ -2,7 +2,7 @@ import express from 'express' | |||
2 | import toInt from 'validator/lib/toInt' | 2 | import toInt from 'validator/lib/toInt' |
3 | import { pickCommonVideoQuery } from '@server/helpers/query' | 3 | import { pickCommonVideoQuery } from '@server/helpers/query' |
4 | import { doJSONRequest } from '@server/helpers/requests' | 4 | import { doJSONRequest } from '@server/helpers/requests' |
5 | import { LiveManager } from '@server/lib/live' | 5 | import { VideoViews } from '@server/lib/video-views' |
6 | import { openapiOperationDoc } from '@server/middlewares/doc' | 6 | import { openapiOperationDoc } from '@server/middlewares/doc' |
7 | import { getServerActor } from '@server/models/application/application' | 7 | import { getServerActor } from '@server/models/application/application' |
8 | import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' | 8 | import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' |
@@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database' | |||
17 | import { sendView } from '../../../lib/activitypub/send/send-view' | 17 | import { sendView } from '../../../lib/activitypub/send/send-view' |
18 | import { JobQueue } from '../../../lib/job-queue' | 18 | import { JobQueue } from '../../../lib/job-queue' |
19 | import { Hooks } from '../../../lib/plugins/hooks' | 19 | import { Hooks } from '../../../lib/plugins/hooks' |
20 | import { Redis } from '../../../lib/redis' | ||
21 | import { | 20 | import { |
22 | asyncMiddleware, | 21 | asyncMiddleware, |
23 | asyncRetryTransactionMiddleware, | 22 | asyncRetryTransactionMiddleware, |
@@ -107,7 +106,7 @@ videosRouter.get('/:id', | |||
107 | ) | 106 | ) |
108 | videosRouter.post('/:id/views', | 107 | videosRouter.post('/:id/views', |
109 | openapiOperationDoc({ operationId: 'addView' }), | 108 | openapiOperationDoc({ operationId: 'addView' }), |
110 | asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')), | 109 | asyncMiddleware(videosCustomGetValidator('only-video')), |
111 | asyncMiddleware(viewVideo) | 110 | asyncMiddleware(viewVideo) |
112 | ) | 111 | ) |
113 | 112 | ||
@@ -153,44 +152,17 @@ function getVideo (_req: express.Request, res: express.Response) { | |||
153 | } | 152 | } |
154 | 153 | ||
155 | async function viewVideo (req: express.Request, res: express.Response) { | 154 | async function viewVideo (req: express.Request, res: express.Response) { |
156 | const immutableVideoAttrs = res.locals.onlyImmutableVideo | 155 | const video = res.locals.onlyVideo |
157 | 156 | ||
158 | const ip = req.ip | 157 | const ip = req.ip |
159 | const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) | 158 | const success = await VideoViews.Instance.processView({ video, ip }) |
160 | if (exists) { | ||
161 | logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid) | ||
162 | return res.status(HttpStatusCode.NO_CONTENT_204).end() | ||
163 | } | ||
164 | |||
165 | const video = await VideoModel.load(immutableVideoAttrs.id) | ||
166 | |||
167 | const promises: Promise<any>[] = [ | ||
168 | Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive) | ||
169 | ] | ||
170 | |||
171 | let federateView = true | ||
172 | |||
173 | // Increment our live manager | ||
174 | if (video.isLive && video.isOwned()) { | ||
175 | LiveManager.Instance.addViewTo(video.id) | ||
176 | |||
177 | // Views of our local live will be sent by our live manager | ||
178 | federateView = false | ||
179 | } | ||
180 | |||
181 | // Increment our video views cache counter | ||
182 | if (!video.isLive) { | ||
183 | promises.push(Redis.Instance.addVideoView(video.id)) | ||
184 | } | ||
185 | 159 | ||
186 | if (federateView) { | 160 | if (success) { |
187 | const serverActor = await getServerActor() | 161 | const serverActor = await getServerActor() |
188 | promises.push(sendView(serverActor, video, undefined)) | 162 | await sendView(serverActor, video, undefined) |
189 | } | ||
190 | |||
191 | await Promise.all(promises) | ||
192 | 163 | ||
193 | Hooks.runAction('action:api.video.viewed', { video, ip }) | 164 | Hooks.runAction('action:api.video.viewed', { video: video, ip }) |
165 | } | ||
194 | 166 | ||
195 | return res.status(HttpStatusCode.NO_CONTENT_204).end() | 167 | return res.status(HttpStatusCode.NO_CONTENT_204).end() |
196 | } | 168 | } |
diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts index 1015c5e45..51c396548 100644 --- a/server/initializers/checker-before-init.ts +++ b/server/initializers/checker-before-init.ts | |||
@@ -38,7 +38,7 @@ function checkMissedConfig () { | |||
38 | 'services.twitter.username', 'services.twitter.whitelisted', | 38 | 'services.twitter.username', 'services.twitter.whitelisted', |
39 | 'followers.instance.enabled', 'followers.instance.manual_approval', | 39 | 'followers.instance.enabled', 'followers.instance.manual_approval', |
40 | 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', | 40 | 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', |
41 | 'history.videos.max_age', 'views.videos.remote.max_age', | 41 | 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration', |
42 | 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', | 42 | 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', |
43 | 'theme.default', | 43 | 'theme.default', |
44 | 'remote_redundancy.videos.accept_from', | 44 | 'remote_redundancy.videos.accept_from', |
diff --git a/server/initializers/config.ts b/server/initializers/config.ts index 1288768d8..dadda2a77 100644 --- a/server/initializers/config.ts +++ b/server/initializers/config.ts | |||
@@ -182,7 +182,9 @@ const CONFIG = { | |||
182 | VIDEOS: { | 182 | VIDEOS: { |
183 | REMOTE: { | 183 | REMOTE: { |
184 | MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) | 184 | MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) |
185 | } | 185 | }, |
186 | LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')), | ||
187 | IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration')) | ||
186 | } | 188 | } |
187 | }, | 189 | }, |
188 | PLUGINS: { | 190 | PLUGINS: { |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 845576667..b65741bbd 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
148 | 'video-import': 1, | 148 | 'video-import': 1, |
149 | 'email': 5, | 149 | 'email': 5, |
150 | 'actor-keys': 3, | 150 | 'actor-keys': 3, |
151 | 'videos-views': 1, | 151 | 'videos-views-stats': 1, |
152 | 'activitypub-refresher': 1, | 152 | 'activitypub-refresher': 1, |
153 | 'video-redundancy': 1, | 153 | 'video-redundancy': 1, |
154 | 'video-live-ending': 1, | 154 | 'video-live-ending': 1, |
@@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im | |||
164 | 'video-file-import': 1, | 164 | 'video-file-import': 1, |
165 | 'email': 5, | 165 | 'email': 5, |
166 | 'actor-keys': 1, | 166 | 'actor-keys': 1, |
167 | 'videos-views': 1, | 167 | 'videos-views-stats': 1, |
168 | 'activitypub-refresher': 1, | 168 | 'activitypub-refresher': 1, |
169 | 'video-redundancy': 1, | 169 | 'video-redundancy': 1, |
170 | 'video-live-ending': 10, | 170 | 'video-live-ending': 10, |
@@ -181,14 +181,14 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
181 | 'video-import': 1000 * 3600 * 2, // 2 hours | 181 | 'video-import': 1000 * 3600 * 2, // 2 hours |
182 | 'email': 60000 * 10, // 10 minutes | 182 | 'email': 60000 * 10, // 10 minutes |
183 | 'actor-keys': 60000 * 20, // 20 minutes | 183 | 'actor-keys': 60000 * 20, // 20 minutes |
184 | 'videos-views': undefined, // Unlimited | 184 | 'videos-views-stats': undefined, // Unlimited |
185 | 'activitypub-refresher': 60000 * 10, // 10 minutes | 185 | 'activitypub-refresher': 60000 * 10, // 10 minutes |
186 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours | 186 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours |
187 | 'video-live-ending': 1000 * 60 * 10, // 10 minutes | 187 | 'video-live-ending': 1000 * 60 * 10, // 10 minutes |
188 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours | 188 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours |
189 | } | 189 | } |
190 | const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | 190 | const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = { |
191 | 'videos-views': { | 191 | 'videos-views-stats': { |
192 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour | 192 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour |
193 | }, | 193 | }, |
194 | 'activitypub-cleaner': { | 194 | 'activitypub-cleaner': { |
@@ -211,6 +211,7 @@ const SCHEDULER_INTERVALS_MS = { | |||
211 | REMOVE_OLD_JOBS: 60000 * 60, // 1 hour | 211 | REMOVE_OLD_JOBS: 60000 * 60, // 1 hour |
212 | UPDATE_VIDEOS: 60000, // 1 minute | 212 | UPDATE_VIDEOS: 60000, // 1 minute |
213 | YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day | 213 | YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day |
214 | VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL, | ||
214 | CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, | 215 | CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, |
215 | CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day | 216 | CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day |
216 | AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day | 217 | AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day |
@@ -343,8 +344,8 @@ const CONSTRAINTS_FIELDS = { | |||
343 | } | 344 | } |
344 | 345 | ||
345 | const VIEW_LIFETIME = { | 346 | const VIEW_LIFETIME = { |
346 | VIDEO: 60000 * 60, // 1 hour | 347 | VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION, |
347 | LIVE: 60000 * 5 // 5 minutes | 348 | VIEWER: 60000 * 5 // 5 minutes |
348 | } | 349 | } |
349 | 350 | ||
350 | let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour | 351 | let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour |
@@ -789,13 +790,12 @@ if (isTestInstance() === true) { | |||
789 | SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 | 790 | SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 |
790 | SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 | 791 | SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 |
791 | SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 | 792 | SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 |
792 | REPEAT_JOBS['videos-views'] = { every: 5000 } | 793 | REPEAT_JOBS['videos-views-stats'] = { every: 5000 } |
793 | REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } | 794 | REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } |
794 | 795 | ||
795 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 | 796 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 |
796 | 797 | ||
797 | VIEW_LIFETIME.VIDEO = 1000 // 1 second | 798 | VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second |
798 | VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second | ||
799 | CONTACT_FORM_LIFETIME = 1000 // 1 second | 799 | CONTACT_FORM_LIFETIME = 1000 // 1 second |
800 | 800 | ||
801 | JOB_ATTEMPTS['email'] = 1 | 801 | JOB_ATTEMPTS['email'] = 1 |
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index 5593ee257..720385f9b 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { getOrCreateAPVideo } from '../videos' | 1 | import { VideoViews } from '@server/lib/video-views' |
2 | import { forwardVideoRelatedActivity } from '../send/utils' | 2 | import { ActivityView } from '../../../../shared/models/activitypub' |
3 | import { Redis } from '../../redis' | ||
4 | import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' | ||
5 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' | 3 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' |
6 | import { MActorSignature } from '../../../types/models' | 4 | import { MActorSignature } from '../../../types/models' |
7 | import { LiveManager } from '@server/lib/live/live-manager' | 5 | import { forwardVideoRelatedActivity } from '../send/utils' |
6 | import { getOrCreateAPVideo } from '../videos' | ||
8 | 7 | ||
9 | async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { | 8 | async function processViewActivity (options: APProcessorOptions<ActivityView>) { |
10 | const { activity, byActor } = options | 9 | const { activity, byActor } = options |
10 | |||
11 | return processCreateView(activity, byActor) | 11 | return processCreateView(activity, byActor) |
12 | } | 12 | } |
13 | 13 | ||
@@ -19,10 +19,8 @@ export { | |||
19 | 19 | ||
20 | // --------------------------------------------------------------------------- | 20 | // --------------------------------------------------------------------------- |
21 | 21 | ||
22 | async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { | 22 | async function processCreateView (activity: ActivityView, byActor: MActorSignature) { |
23 | const videoObject = activity.type === 'View' | 23 | const videoObject = activity.object |
24 | ? activity.object | ||
25 | : (activity.object as ViewObject).object | ||
26 | 24 | ||
27 | const { video } = await getOrCreateAPVideo({ | 25 | const { video } = await getOrCreateAPVideo({ |
28 | videoObject, | 26 | videoObject, |
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct | |||
30 | allowRefresh: false | 28 | allowRefresh: false |
31 | }) | 29 | }) |
32 | 30 | ||
33 | if (!video.isLive) { | 31 | const viewerExpires = activity.expires |
34 | await Redis.Instance.addVideoView(video.id) | 32 | ? new Date(activity.expires) |
35 | } | 33 | : undefined |
36 | 34 | ||
37 | if (video.isOwned()) { | 35 | await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) |
38 | // Our live manager will increment the counter and send the view to followers | ||
39 | if (video.isLive) { | ||
40 | LiveManager.Instance.addViewTo(video.id) | ||
41 | return | ||
42 | } | ||
43 | 36 | ||
37 | if (video.isOwned()) { | ||
44 | // Forward the view but don't resend the activity to the sender | 38 | // Forward the view but don't resend the activity to the sender |
45 | const exceptions = [ byActor ] | 39 | const exceptions = [ byActor ] |
46 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) | 40 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 153e94295..b12583e26 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { VideoViews } from '@server/lib/video-views' | ||
2 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' | 3 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' |
3 | import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' | 4 | import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' |
4 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU | |||
27 | id: url, | 28 | id: url, |
28 | type: 'View' as 'View', | 29 | type: 'View' as 'View', |
29 | actor: byActor.url, | 30 | actor: byActor.url, |
30 | object: video.url | 31 | object: video.url, |
32 | expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() | ||
31 | }, | 33 | }, |
32 | audience | 34 | audience |
33 | ) | 35 | ) |
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts index 157569414..f786bb196 100644 --- a/server/lib/activitypub/videos/updater.ts +++ b/server/lib/activitypub/videos/updater.ts | |||
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder { | |||
81 | 81 | ||
82 | if (videoUpdated.isLive) { | 82 | if (videoUpdated.isLive) { |
83 | PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) | 83 | PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) |
84 | PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated) | ||
85 | } | 84 | } |
86 | 85 | ||
87 | logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) | 86 | logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts index 86d0a271f..caf5f6962 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { Redis } from '../../redis' | 1 | import { isTestInstance } from '../../../helpers/core-utils' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { VideoModel } from '../../../models/video/video' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-view' | 4 | import { VideoViewModel } from '../../../models/video/video-view' |
5 | import { isTestInstance } from '../../../helpers/core-utils' | 5 | import { Redis } from '../../redis' |
6 | import { federateVideoIfNeeded } from '../../activitypub/videos' | ||
7 | 6 | ||
8 | async function processVideosViews () { | 7 | async function processVideosViewsStats () { |
9 | const lastHour = new Date() | 8 | const lastHour = new Date() |
10 | 9 | ||
11 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour | 10 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour |
@@ -15,23 +14,23 @@ async function processVideosViews () { | |||
15 | const startDate = lastHour.setMinutes(0, 0, 0) | 14 | const startDate = lastHour.setMinutes(0, 0, 0) |
16 | const endDate = lastHour.setMinutes(59, 59, 999) | 15 | const endDate = lastHour.setMinutes(59, 59, 999) |
17 | 16 | ||
18 | const videoIds = await Redis.Instance.getVideosIdViewed(hour) | 17 | const videoIds = await Redis.Instance.listVideosViewedForStats(hour) |
19 | if (videoIds.length === 0) return | 18 | if (videoIds.length === 0) return |
20 | 19 | ||
21 | logger.info('Processing videos views in job for hour %d.', hour) | 20 | logger.info('Processing videos views stats in job for hour %d.', hour) |
22 | 21 | ||
23 | for (const videoId of videoIds) { | 22 | for (const videoId of videoIds) { |
24 | try { | 23 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 24 | const views = await Redis.Instance.getVideoViewsStats(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | 25 | await Redis.Instance.deleteVideoViewsStats(videoId, hour) |
27 | 26 | ||
28 | if (views) { | 27 | if (views) { |
29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 28 | logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) |
30 | 29 | ||
31 | try { | 30 | try { |
32 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | 31 | const video = await VideoModel.load(videoId) |
33 | if (!video) { | 32 | if (!video) { |
34 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) | 33 | logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) |
35 | continue | 34 | continue |
36 | } | 35 | } |
37 | 36 | ||
@@ -41,21 +40,12 @@ async function processVideosViews () { | |||
41 | views, | 40 | views, |
42 | videoId | 41 | videoId |
43 | }) | 42 | }) |
44 | |||
45 | if (video.isOwned()) { | ||
46 | // If this is a remote video, the origin instance will send us an update | ||
47 | await VideoModel.incrementViews(videoId, views) | ||
48 | |||
49 | // Send video update | ||
50 | video.views += views | ||
51 | await federateVideoIfNeeded(video, false) | ||
52 | } | ||
53 | } catch (err) { | 43 | } catch (err) { |
54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 44 | logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) |
55 | } | 45 | } |
56 | } | 46 | } |
57 | } catch (err) { | 47 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 48 | logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 49 | } |
60 | } | 50 | } |
61 | } | 51 | } |
@@ -63,5 +53,5 @@ async function processVideosViews () { | |||
63 | // --------------------------------------------------------------------------- | 53 | // --------------------------------------------------------------------------- |
64 | 54 | ||
65 | export { | 55 | export { |
66 | processVideosViews | 56 | processVideosViewsStats |
67 | } | 57 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0eab720d9..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import' | |||
36 | import { processVideoImport } from './handlers/video-import' | 36 | import { processVideoImport } from './handlers/video-import' |
37 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 37 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
38 | import { processVideoTranscoding } from './handlers/video-transcoding' | 38 | import { processVideoTranscoding } from './handlers/video-transcoding' |
39 | import { processVideosViews } from './handlers/video-views' | 39 | import { processVideosViewsStats } from './handlers/video-views-stats' |
40 | 40 | ||
41 | type CreateJobArgument = | 41 | type CreateJobArgument = |
42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -49,7 +49,7 @@ type CreateJobArgument = | |||
49 | { type: 'email', payload: EmailPayload } | | 49 | { type: 'email', payload: EmailPayload } | |
50 | { type: 'video-import', payload: VideoImportPayload } | | 50 | { type: 'video-import', payload: VideoImportPayload } | |
51 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 51 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
52 | { type: 'videos-views', payload: {} } | | 52 | { type: 'videos-views-stats', payload: {} } | |
53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | 53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | |
54 | { type: 'actor-keys', payload: ActorKeysPayload } | | 54 | { type: 'actor-keys', payload: ActorKeysPayload } | |
55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | | 55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
71 | 'video-transcoding': processVideoTranscoding, | 71 | 'video-transcoding': processVideoTranscoding, |
72 | 'email': processEmail, | 72 | 'email': processEmail, |
73 | 'video-import': processVideoImport, | 73 | 'video-import': processVideoImport, |
74 | 'videos-views': processVideosViews, | 74 | 'videos-views-stats': processVideosViewsStats, |
75 | 'activitypub-refresher': refreshAPObject, | 75 | 'activitypub-refresher': refreshAPObject, |
76 | 'video-live-ending': processVideoLiveEnding, | 76 | 'video-live-ending': processVideoLiveEnding, |
77 | 'actor-keys': processActorKeys, | 77 | 'actor-keys': processActorKeys, |
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [ | |||
89 | 'video-transcoding', | 89 | 'video-transcoding', |
90 | 'video-file-import', | 90 | 'video-file-import', |
91 | 'video-import', | 91 | 'video-import', |
92 | 'videos-views', | 92 | 'videos-views-stats', |
93 | 'activitypub-refresher', | 93 | 'activitypub-refresher', |
94 | 'video-redundancy', | 94 | 'video-redundancy', |
95 | 'actor-keys', | 95 | 'actor-keys', |
@@ -247,8 +247,8 @@ class JobQueue { | |||
247 | } | 247 | } |
248 | 248 | ||
249 | private addRepeatableJobs () { | 249 | private addRepeatableJobs () { |
250 | this.queues['videos-views'].add({}, { | 250 | this.queues['videos-views-stats'].add({}, { |
251 | repeat: REPEAT_JOBS['videos-views'] | 251 | repeat: REPEAT_JOBS['videos-views-stats'] |
252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
253 | 253 | ||
254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1b7b9dd4d..2562edb75 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -2,7 +2,6 @@ | |||
2 | import { readFile } from 'fs-extra' | 2 | import { readFile } from 'fs-extra' |
3 | import { createServer, Server } from 'net' | 3 | import { createServer, Server } from 'net' |
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | 4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' |
5 | import { isTestInstance } from '@server/helpers/core-utils' | ||
6 | import { | 5 | import { |
7 | computeResolutionsToTranscode, | 6 | computeResolutionsToTranscode, |
8 | ffprobePromise, | 7 | ffprobePromise, |
@@ -12,7 +11,7 @@ import { | |||
12 | } from '@server/helpers/ffprobe-utils' | 11 | } from '@server/helpers/ffprobe-utils' |
13 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 12 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
14 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 13 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
15 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' | 14 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' |
16 | import { UserModel } from '@server/models/user/user' | 15 | import { UserModel } from '@server/models/user/user' |
17 | import { VideoModel } from '@server/models/video/video' | 16 | import { VideoModel } from '@server/models/video/video' |
18 | import { VideoLiveModel } from '@server/models/video/video-live' | 17 | import { VideoLiveModel } from '@server/models/video/video-live' |
@@ -53,8 +52,6 @@ class LiveManager { | |||
53 | 52 | ||
54 | private readonly muxingSessions = new Map<string, MuxingSession>() | 53 | private readonly muxingSessions = new Map<string, MuxingSession>() |
55 | private readonly videoSessions = new Map<number, string>() | 54 | private readonly videoSessions = new Map<number, string>() |
56 | // Values are Date().getTime() | ||
57 | private readonly watchersPerVideo = new Map<number, number[]>() | ||
58 | 55 | ||
59 | private rtmpServer: Server | 56 | private rtmpServer: Server |
60 | private rtmpsServer: ServerTLS | 57 | private rtmpsServer: ServerTLS |
@@ -99,8 +96,6 @@ class LiveManager { | |||
99 | // Cleanup broken lives, that were terminated by a server restart for example | 96 | // Cleanup broken lives, that were terminated by a server restart for example |
100 | this.handleBrokenLives() | 97 | this.handleBrokenLives() |
101 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) | 98 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) |
102 | |||
103 | setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) | ||
104 | } | 99 | } |
105 | 100 | ||
106 | async run () { | 101 | async run () { |
@@ -184,19 +179,6 @@ class LiveManager { | |||
184 | this.abortSession(sessionId) | 179 | this.abortSession(sessionId) |
185 | } | 180 | } |
186 | 181 | ||
187 | addViewTo (videoId: number) { | ||
188 | if (this.videoSessions.has(videoId) === false) return | ||
189 | |||
190 | let watchers = this.watchersPerVideo.get(videoId) | ||
191 | |||
192 | if (!watchers) { | ||
193 | watchers = [] | ||
194 | this.watchersPerVideo.set(videoId, watchers) | ||
195 | } | ||
196 | |||
197 | watchers.push(new Date().getTime()) | ||
198 | } | ||
199 | |||
200 | private getContext () { | 182 | private getContext () { |
201 | return context | 183 | return context |
202 | } | 184 | } |
@@ -377,7 +359,6 @@ class LiveManager { | |||
377 | } | 359 | } |
378 | 360 | ||
379 | private onMuxingFFmpegEnd (videoId: number) { | 361 | private onMuxingFFmpegEnd (videoId: number) { |
380 | this.watchersPerVideo.delete(videoId) | ||
381 | this.videoSessions.delete(videoId) | 362 | this.videoSessions.delete(videoId) |
382 | } | 363 | } |
383 | 364 | ||
@@ -411,34 +392,6 @@ class LiveManager { | |||
411 | } | 392 | } |
412 | } | 393 | } |
413 | 394 | ||
414 | private async updateLiveViews () { | ||
415 | if (!this.isRunning()) return | ||
416 | |||
417 | if (!isTestInstance()) logger.info('Updating live video views.', lTags()) | ||
418 | |||
419 | for (const videoId of this.watchersPerVideo.keys()) { | ||
420 | const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE | ||
421 | |||
422 | const watchers = this.watchersPerVideo.get(videoId) | ||
423 | |||
424 | const numWatchers = watchers.length | ||
425 | |||
426 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
427 | video.views = numWatchers | ||
428 | await video.save() | ||
429 | |||
430 | await federateVideoIfNeeded(video, false) | ||
431 | |||
432 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video) | ||
433 | |||
434 | // Only keep not expired watchers | ||
435 | const newWatchers = watchers.filter(w => w > notBefore) | ||
436 | this.watchersPerVideo.set(videoId, newWatchers) | ||
437 | |||
438 | logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) | ||
439 | } | ||
440 | } | ||
441 | |||
442 | private async handleBrokenLives () { | 395 | private async handleBrokenLives () { |
443 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | 396 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() |
444 | 397 | ||
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 901435dea..0398ca61d 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Server as HTTPServer } from 'http' | 1 | import { Server as HTTPServer } from 'http' |
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | 2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' |
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | 3 | import { isIdValid } from '@server/helpers/custom-validators/misc' |
4 | import { MVideo } from '@server/types/models' | 4 | import { MVideo, MVideoImmutable } from '@server/types/models' |
5 | import { UserNotificationModelForApi } from '@server/types/models/user' | 5 | import { UserNotificationModelForApi } from '@server/types/models/user' |
6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | 6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' |
7 | import { logger } from '../helpers/logger' | 7 | import { logger } from '../helpers/logger' |
@@ -78,11 +78,11 @@ class PeerTubeSocket { | |||
78 | .emit(type, data) | 78 | .emit(type, data) |
79 | } | 79 | } |
80 | 80 | ||
81 | sendVideoViewsUpdate (video: MVideo) { | 81 | sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { |
82 | const data: LiveVideoEventPayload = { views: video.views } | 82 | const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } |
83 | const type: LiveVideoEventType = 'views-change' | 83 | const type: LiveVideoEventType = 'views-change' |
84 | 84 | ||
85 | logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) | 85 | logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) |
86 | 86 | ||
87 | this.liveVideosNamespace | 87 | this.liveVideosNamespace |
88 | .in(video.id) | 88 | .in(video.id) |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 46617b07e..76b7868e8 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -13,6 +13,7 @@ import { | |||
13 | RESUMABLE_UPLOAD_SESSION_LIFETIME | 13 | RESUMABLE_UPLOAD_SESSION_LIFETIME |
14 | } from '../initializers/constants' | 14 | } from '../initializers/constants' |
15 | import { CONFIG } from '../initializers/config' | 15 | import { CONFIG } from '../initializers/config' |
16 | import { exists } from '@server/helpers/custom-validators/misc' | ||
16 | 17 | ||
17 | type CachedRoute = { | 18 | type CachedRoute = { |
18 | body: string | 19 | body: string |
@@ -119,16 +120,20 @@ class Redis { | |||
119 | 120 | ||
120 | /* ************ Views per IP ************ */ | 121 | /* ************ Views per IP ************ */ |
121 | 122 | ||
122 | setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { | 123 | setIPVideoView (ip: string, videoUUID: string) { |
123 | const lifetime = isLive | 124 | return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW) |
124 | ? VIEW_LIFETIME.LIVE | 125 | } |
125 | : VIEW_LIFETIME.VIDEO | ||
126 | 126 | ||
127 | return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) | 127 | setIPVideoViewer (ip: string, videoUUID: string) { |
128 | return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER) | ||
128 | } | 129 | } |
129 | 130 | ||
130 | async doesVideoIPViewExist (ip: string, videoUUID: string) { | 131 | async doesVideoIPViewExist (ip: string, videoUUID: string) { |
131 | return this.exists(this.generateViewKey(ip, videoUUID)) | 132 | return this.exists(this.generateIPViewKey(ip, videoUUID)) |
133 | } | ||
134 | |||
135 | async doesVideoIPViewerExist (ip: string, videoUUID: string) { | ||
136 | return this.exists(this.generateIPViewerKey(ip, videoUUID)) | ||
132 | } | 137 | } |
133 | 138 | ||
134 | /* ************ Tracker IP block ************ */ | 139 | /* ************ Tracker IP block ************ */ |
@@ -160,46 +165,85 @@ class Redis { | |||
160 | return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) | 165 | return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) |
161 | } | 166 | } |
162 | 167 | ||
163 | /* ************ Video views ************ */ | 168 | /* ************ Video views stats ************ */ |
164 | 169 | ||
165 | addVideoView (videoId: number) { | 170 | addVideoViewStats (videoId: number) { |
166 | const keyIncr = this.generateVideoViewKey(videoId) | 171 | const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId }) |
167 | const keySet = this.generateVideosViewKey() | ||
168 | 172 | ||
169 | return Promise.all([ | 173 | return Promise.all([ |
170 | this.addToSet(keySet, videoId.toString()), | 174 | this.addToSet(setKey, videoId.toString()), |
171 | this.increment(keyIncr) | 175 | this.increment(videoKey) |
172 | ]) | 176 | ]) |
173 | } | 177 | } |
174 | 178 | ||
175 | async getVideoViews (videoId: number, hour: number) { | 179 | async getVideoViewsStats (videoId: number, hour: number) { |
176 | const key = this.generateVideoViewKey(videoId, hour) | 180 | const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) |
177 | 181 | ||
178 | const valueString = await this.getValue(key) | 182 | const valueString = await this.getValue(videoKey) |
179 | const valueInt = parseInt(valueString, 10) | 183 | const valueInt = parseInt(valueString, 10) |
180 | 184 | ||
181 | if (isNaN(valueInt)) { | 185 | if (isNaN(valueInt)) { |
182 | logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) | 186 | logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) |
183 | return undefined | 187 | return undefined |
184 | } | 188 | } |
185 | 189 | ||
186 | return valueInt | 190 | return valueInt |
187 | } | 191 | } |
188 | 192 | ||
189 | async getVideosIdViewed (hour: number) { | 193 | async listVideosViewedForStats (hour: number) { |
190 | const key = this.generateVideosViewKey(hour) | 194 | const { setKey } = this.generateVideoViewStatsKeys({ hour }) |
191 | 195 | ||
192 | const stringIds = await this.getSet(key) | 196 | const stringIds = await this.getSet(setKey) |
193 | return stringIds.map(s => parseInt(s, 10)) | 197 | return stringIds.map(s => parseInt(s, 10)) |
194 | } | 198 | } |
195 | 199 | ||
196 | deleteVideoViews (videoId: number, hour: number) { | 200 | deleteVideoViewsStats (videoId: number, hour: number) { |
197 | const keySet = this.generateVideosViewKey(hour) | 201 | const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) |
198 | const keyIncr = this.generateVideoViewKey(videoId, hour) | 202 | |
203 | return Promise.all([ | ||
204 | this.deleteFromSet(setKey, videoId.toString()), | ||
205 | this.deleteKey(videoKey) | ||
206 | ]) | ||
207 | } | ||
208 | |||
209 | /* ************ Local video views buffer ************ */ | ||
210 | |||
211 | addLocalVideoView (videoId: number) { | ||
212 | const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId) | ||
199 | 213 | ||
200 | return Promise.all([ | 214 | return Promise.all([ |
201 | this.deleteFromSet(keySet, videoId.toString()), | 215 | this.addToSet(setKey, videoId.toString()), |
202 | this.deleteKey(keyIncr) | 216 | this.increment(videoKey) |
217 | ]) | ||
218 | } | ||
219 | |||
220 | async getLocalVideoViews (videoId: number) { | ||
221 | const { videoKey } = this.generateLocalVideoViewsKeys(videoId) | ||
222 | |||
223 | const valueString = await this.getValue(videoKey) | ||
224 | const valueInt = parseInt(valueString, 10) | ||
225 | |||
226 | if (isNaN(valueInt)) { | ||
227 | logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString) | ||
228 | return undefined | ||
229 | } | ||
230 | |||
231 | return valueInt | ||
232 | } | ||
233 | |||
234 | async listLocalVideosViewed () { | ||
235 | const { setKey } = this.generateLocalVideoViewsKeys() | ||
236 | |||
237 | const stringIds = await this.getSet(setKey) | ||
238 | return stringIds.map(s => parseInt(s, 10)) | ||
239 | } | ||
240 | |||
241 | deleteLocalVideoViews (videoId: number) { | ||
242 | const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId) | ||
243 | |||
244 | return Promise.all([ | ||
245 | this.deleteFromSet(setKey, videoId.toString()), | ||
246 | this.deleteKey(videoKey) | ||
203 | ]) | 247 | ]) |
204 | } | 248 | } |
205 | 249 | ||
@@ -233,16 +277,16 @@ class Redis { | |||
233 | return req.method + '-' + req.originalUrl | 277 | return req.method + '-' + req.originalUrl |
234 | } | 278 | } |
235 | 279 | ||
236 | private generateVideosViewKey (hour?: number) { | 280 | private generateLocalVideoViewsKeys (videoId?: Number) { |
237 | if (!hour) hour = new Date().getHours() | 281 | return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } |
238 | |||
239 | return `videos-view-h${hour}` | ||
240 | } | 282 | } |
241 | 283 | ||
242 | private generateVideoViewKey (videoId: number, hour?: number) { | 284 | private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { |
243 | if (hour === undefined || hour === null) hour = new Date().getHours() | 285 | const hour = exists(options.hour) |
286 | ? options.hour | ||
287 | : new Date().getHours() | ||
244 | 288 | ||
245 | return `video-view-${videoId}-h${hour}` | 289 | return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` } |
246 | } | 290 | } |
247 | 291 | ||
248 | private generateResetPasswordKey (userId: number) { | 292 | private generateResetPasswordKey (userId: number) { |
@@ -253,10 +297,14 @@ class Redis { | |||
253 | return 'verify-email-' + userId | 297 | return 'verify-email-' + userId |
254 | } | 298 | } |
255 | 299 | ||
256 | private generateViewKey (ip: string, videoUUID: string) { | 300 | private generateIPViewKey (ip: string, videoUUID: string) { |
257 | return `views-${videoUUID}-${ip}` | 301 | return `views-${videoUUID}-${ip}` |
258 | } | 302 | } |
259 | 303 | ||
304 | private generateIPViewerKey (ip: string, videoUUID: string) { | ||
305 | return `viewer-${videoUUID}-${ip}` | ||
306 | } | ||
307 | |||
260 | private generateTrackerBlockIPKey (ip: string) { | 308 | private generateTrackerBlockIPKey (ip: string) { |
261 | return `tracker-block-ip-${ip}` | 309 | return `tracker-block-ip-${ip}` |
262 | } | 310 | } |
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts new file mode 100644 index 000000000..c0e72c461 --- /dev/null +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts | |||
@@ -0,0 +1,52 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VideoModel } from '@server/models/video/video' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { federateVideoIfNeeded } from '../activitypub/videos' | ||
5 | import { Redis } from '../redis' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViewsBufferScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const videoIds = await Redis.Instance.listLocalVideosViewed() | ||
22 | if (videoIds.length === 0) return | ||
23 | |||
24 | logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) | ||
25 | |||
26 | for (const videoId of videoIds) { | ||
27 | try { | ||
28 | const views = await Redis.Instance.getLocalVideoViews(videoId) | ||
29 | await Redis.Instance.deleteLocalVideoViews(videoId) | ||
30 | |||
31 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
32 | if (!video) { | ||
33 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) | ||
34 | continue | ||
35 | } | ||
36 | |||
37 | // If this is a remote video, the origin instance will send us an update | ||
38 | await VideoModel.incrementViews(videoId, views) | ||
39 | |||
40 | // Send video update | ||
41 | video.views += views | ||
42 | await federateVideoIfNeeded(video, false) | ||
43 | } catch (err) { | ||
44 | logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) | ||
45 | } | ||
46 | } | ||
47 | } | ||
48 | |||
49 | static get Instance () { | ||
50 | return this.instance || (this.instance = new this()) | ||
51 | } | ||
52 | } | ||
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts new file mode 100644 index 000000000..220b509c2 --- /dev/null +++ b/server/lib/video-views.ts | |||
@@ -0,0 +1,130 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VIEW_LIFETIME } from '@server/initializers/constants' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { PeerTubeSocket } from './peertube-socket' | ||
6 | import { Redis } from './redis' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViews { | ||
11 | |||
12 | // Values are Date().getTime() | ||
13 | private readonly viewersPerVideo = new Map<number, number[]>() | ||
14 | |||
15 | private static instance: VideoViews | ||
16 | |||
17 | private constructor () { | ||
18 | } | ||
19 | |||
20 | init () { | ||
21 | setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) | ||
22 | } | ||
23 | |||
24 | async processView (options: { | ||
25 | video: MVideo | ||
26 | ip: string | null | ||
27 | viewerExpires?: Date | ||
28 | }) { | ||
29 | const { video, ip, viewerExpires } = options | ||
30 | |||
31 | logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) | ||
32 | |||
33 | let success = await this.addView(video, ip) | ||
34 | |||
35 | if (video.isLive) { | ||
36 | const successViewer = await this.addViewer(video, ip, viewerExpires) | ||
37 | success ||= successViewer | ||
38 | } | ||
39 | |||
40 | return success | ||
41 | } | ||
42 | |||
43 | getViewers (video: MVideo) { | ||
44 | const viewers = this.viewersPerVideo.get(video.id) | ||
45 | if (!viewers) return 0 | ||
46 | |||
47 | return viewers.length | ||
48 | } | ||
49 | |||
50 | buildViewerExpireTime () { | ||
51 | return new Date().getTime() + VIEW_LIFETIME.VIEWER | ||
52 | } | ||
53 | |||
54 | private async addView (video: MVideo, ip: string | null) { | ||
55 | const promises: Promise<any>[] = [] | ||
56 | |||
57 | if (ip !== null) { | ||
58 | const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) | ||
59 | if (viewExists) return false | ||
60 | |||
61 | promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) | ||
62 | } | ||
63 | |||
64 | if (video.isOwned()) { | ||
65 | promises.push(Redis.Instance.addLocalVideoView(video.id)) | ||
66 | } | ||
67 | |||
68 | promises.push(Redis.Instance.addVideoViewStats(video.id)) | ||
69 | |||
70 | await Promise.all(promises) | ||
71 | |||
72 | return true | ||
73 | } | ||
74 | |||
75 | private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { | ||
76 | if (ip !== null) { | ||
77 | const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) | ||
78 | if (viewExists) return false | ||
79 | |||
80 | await Redis.Instance.setIPVideoViewer(ip, video.uuid) | ||
81 | } | ||
82 | |||
83 | let watchers = this.viewersPerVideo.get(video.id) | ||
84 | |||
85 | if (!watchers) { | ||
86 | watchers = [] | ||
87 | this.viewersPerVideo.set(video.id, watchers) | ||
88 | } | ||
89 | |||
90 | const expiration = viewerExpires | ||
91 | ? viewerExpires.getTime() | ||
92 | : this.buildViewerExpireTime() | ||
93 | |||
94 | watchers.push(expiration) | ||
95 | await this.notifyClients(video.id, watchers.length) | ||
96 | |||
97 | return true | ||
98 | } | ||
99 | |||
100 | private async cleanViewers () { | ||
101 | logger.info('Cleaning video viewers.', lTags()) | ||
102 | |||
103 | for (const videoId of this.viewersPerVideo.keys()) { | ||
104 | const notBefore = new Date().getTime() | ||
105 | |||
106 | const viewers = this.viewersPerVideo.get(videoId) | ||
107 | |||
108 | // Only keep not expired viewers | ||
109 | const newViewers = viewers.filter(w => w > notBefore) | ||
110 | |||
111 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | ||
112 | else this.viewersPerVideo.set(videoId, newViewers) | ||
113 | |||
114 | await this.notifyClients(videoId, newViewers.length) | ||
115 | } | ||
116 | } | ||
117 | |||
118 | private async notifyClients (videoId: string | number, viewersLength: number) { | ||
119 | const video = await VideoModel.loadImmutableAttributes(videoId) | ||
120 | if (!video) return | ||
121 | |||
122 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | ||
123 | |||
124 | logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) | ||
125 | } | ||
126 | |||
127 | static get Instance () { | ||
128 | return this.instance || (this.instance = new this()) | ||
129 | } | ||
130 | } | ||
diff --git a/server/models/video/formatter/video-format-utils.ts b/server/models/video/formatter/video-format-utils.ts index ba49e41ae..461e296df 100644 --- a/server/models/video/formatter/video-format-utils.ts +++ b/server/models/video/formatter/video-format-utils.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | import { uuidToShort } from '@server/helpers/uuid' | 1 | import { uuidToShort } from '@server/helpers/uuid' |
2 | import { generateMagnetUri } from '@server/helpers/webtorrent' | 2 | import { generateMagnetUri } from '@server/helpers/webtorrent' |
3 | import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' | 3 | import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' |
4 | import { VideoViews } from '@server/lib/video-views' | ||
4 | import { VideosCommonQueryAfterSanitize } from '@shared/models' | 5 | import { VideosCommonQueryAfterSanitize } from '@shared/models' |
5 | import { VideoFile } from '@shared/models/videos/video-file.model' | 6 | import { VideoFile } from '@shared/models/videos/video-file.model' |
6 | import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' | 7 | import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' |
@@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm | |||
121 | pluginData: (video as any).pluginData | 122 | pluginData: (video as any).pluginData |
122 | } | 123 | } |
123 | 124 | ||
125 | if (video.isLive) { | ||
126 | videoObject.viewers = VideoViews.Instance.getViewers(video) | ||
127 | } | ||
128 | |||
124 | const add = options.additionalAttributes | 129 | const add = options.additionalAttributes |
125 | if (add?.state === true) { | 130 | if (add?.state === true) { |
126 | videoObject.state = { | 131 | videoObject.state = { |
diff --git a/server/tests/api/live/live-views.ts b/server/tests/api/live/live-views.ts index 5e3a79c64..9186af8e7 100644 --- a/server/tests/api/live/live-views.ts +++ b/server/tests/api/live/live-views.ts | |||
@@ -19,7 +19,7 @@ import { | |||
19 | 19 | ||
20 | const expect = chai.expect | 20 | const expect = chai.expect |
21 | 21 | ||
22 | describe('Test live', function () { | 22 | describe('Live views', function () { |
23 | let servers: PeerTubeServer[] = [] | 23 | let servers: PeerTubeServer[] = [] |
24 | 24 | ||
25 | before(async function () { | 25 | before(async function () { |
@@ -47,79 +47,86 @@ describe('Test live', function () { | |||
47 | await doubleFollow(servers[0], servers[1]) | 47 | await doubleFollow(servers[0], servers[1]) |
48 | }) | 48 | }) |
49 | 49 | ||
50 | describe('Live views', function () { | 50 | let liveVideoId: string |
51 | let liveVideoId: string | 51 | let command: FfmpegCommand |
52 | let command: FfmpegCommand | ||
53 | 52 | ||
54 | async function countViews (expected: number) { | 53 | async function countViewers (expectedViewers: number) { |
55 | for (const server of servers) { | 54 | for (const server of servers) { |
56 | const video = await server.videos.get({ id: liveVideoId }) | 55 | const video = await server.videos.get({ id: liveVideoId }) |
57 | expect(video.views).to.equal(expected) | 56 | expect(video.viewers).to.equal(expectedViewers) |
58 | } | ||
59 | } | 57 | } |
58 | } | ||
60 | 59 | ||
61 | before(async function () { | 60 | async function countViews (expectedViews: number) { |
62 | this.timeout(30000) | 61 | for (const server of servers) { |
62 | const video = await server.videos.get({ id: liveVideoId }) | ||
63 | expect(video.views).to.equal(expectedViews) | ||
64 | } | ||
65 | } | ||
63 | 66 | ||
64 | const liveAttributes = { | 67 | before(async function () { |
65 | name: 'live video', | 68 | this.timeout(30000) |
66 | channelId: servers[0].store.channel.id, | ||
67 | privacy: VideoPrivacy.PUBLIC | ||
68 | } | ||
69 | 69 | ||
70 | const live = await servers[0].live.create({ fields: liveAttributes }) | 70 | const liveAttributes = { |
71 | liveVideoId = live.uuid | 71 | name: 'live video', |
72 | channelId: servers[0].store.channel.id, | ||
73 | privacy: VideoPrivacy.PUBLIC | ||
74 | } | ||
72 | 75 | ||
73 | command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) | 76 | const live = await servers[0].live.create({ fields: liveAttributes }) |
74 | await waitUntilLivePublishedOnAllServers(servers, liveVideoId) | 77 | liveVideoId = live.uuid |
75 | await waitJobs(servers) | ||
76 | }) | ||
77 | 78 | ||
78 | it('Should display no views for a live', async function () { | 79 | command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) |
79 | await countViews(0) | 80 | await waitUntilLivePublishedOnAllServers(servers, liveVideoId) |
80 | }) | 81 | await waitJobs(servers) |
82 | }) | ||
81 | 83 | ||
82 | it('Should view a live twice and display 1 view', async function () { | 84 | it('Should display no views and viewers for a live', async function () { |
83 | this.timeout(30000) | 85 | await countViews(0) |
86 | await countViewers(0) | ||
87 | }) | ||
84 | 88 | ||
85 | await servers[0].videos.view({ id: liveVideoId }) | 89 | it('Should view a live twice and display 1 view/viewer', async function () { |
86 | await servers[0].videos.view({ id: liveVideoId }) | 90 | this.timeout(30000) |
87 | 91 | ||
88 | await wait(7000) | 92 | await servers[0].videos.view({ id: liveVideoId }) |
93 | await servers[0].videos.view({ id: liveVideoId }) | ||
89 | 94 | ||
90 | await waitJobs(servers) | 95 | await waitJobs(servers) |
96 | await countViewers(1) | ||
91 | 97 | ||
92 | await countViews(1) | 98 | await wait(7000) |
93 | }) | 99 | await countViews(1) |
100 | }) | ||
94 | 101 | ||
95 | it('Should wait and display 0 views', async function () { | 102 | it('Should wait and display 0 viewers while still have 1 view', async function () { |
96 | this.timeout(30000) | 103 | this.timeout(30000) |
97 | 104 | ||
98 | await wait(12000) | 105 | await wait(12000) |
99 | await waitJobs(servers) | 106 | await waitJobs(servers) |
100 | 107 | ||
101 | await countViews(0) | 108 | await countViews(1) |
102 | }) | 109 | await countViewers(0) |
110 | }) | ||
103 | 111 | ||
104 | it('Should view a live on a remote and on local and display 2 views', async function () { | 112 | it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () { |
105 | this.timeout(30000) | 113 | this.timeout(30000) |
106 | 114 | ||
107 | await servers[0].videos.view({ id: liveVideoId }) | 115 | await servers[0].videos.view({ id: liveVideoId }) |
108 | await servers[1].videos.view({ id: liveVideoId }) | 116 | await servers[1].videos.view({ id: liveVideoId }) |
109 | await servers[1].videos.view({ id: liveVideoId }) | 117 | await servers[1].videos.view({ id: liveVideoId }) |
118 | await waitJobs(servers) | ||
110 | 119 | ||
111 | await wait(7000) | 120 | await countViewers(2) |
112 | await waitJobs(servers) | ||
113 | 121 | ||
114 | await countViews(2) | 122 | await wait(7000) |
115 | }) | 123 | await waitJobs(servers) |
116 | 124 | ||
117 | after(async function () { | 125 | await countViews(3) |
118 | await stopFfmpeg(command) | ||
119 | }) | ||
120 | }) | 126 | }) |
121 | 127 | ||
122 | after(async function () { | 128 | after(async function () { |
129 | await stopFfmpeg(command) | ||
123 | await cleanupTests(servers) | 130 | await cleanupTests(servers) |
124 | }) | 131 | }) |
125 | }) | 132 | }) |
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 8c4e01226..5d946f5e8 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts | |||
@@ -56,7 +56,7 @@ describe('Test jobs', function () { | |||
56 | 56 | ||
57 | let job = body.data[0] | 57 | let job = body.data[0] |
58 | // Skip repeat jobs | 58 | // Skip repeat jobs |
59 | if (job.type === 'videos-views') job = body.data[1] | 59 | if (job.type === 'videos-views-stats') job = body.data[1] |
60 | 60 | ||
61 | expect(job.state).to.equal('completed') | 61 | expect(job.state).to.equal('completed') |
62 | expect(job.type.startsWith('activitypub-')).to.be.true | 62 | expect(job.type.startsWith('activitypub-')).to.be.true |