diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/controllers/api/videos/index.ts | 44 | ||||
-rw-r--r-- | server/initializers/checker-before-init.ts | 2 | ||||
-rw-r--r-- | server/initializers/config.ts | 4 | ||||
-rw-r--r-- | server/initializers/constants.ts | 20 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-view.ts | 32 | ||||
-rw-r--r-- | server/lib/activitypub/send/send-view.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/videos/updater.ts | 1 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts) | 36 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 12 | ||||
-rw-r--r-- | server/lib/live/live-manager.ts | 49 | ||||
-rw-r--r-- | server/lib/peertube-socket.ts | 8 | ||||
-rw-r--r-- | server/lib/redis.ts | 112 | ||||
-rw-r--r-- | server/lib/schedulers/video-views-buffer-scheduler.ts | 52 | ||||
-rw-r--r-- | server/lib/video-views.ts | 130 | ||||
-rw-r--r-- | server/models/video/formatter/video-format-utils.ts | 5 | ||||
-rw-r--r-- | server/tests/api/live/live-views.ts | 109 | ||||
-rw-r--r-- | server/tests/api/server/jobs.ts | 2 |
17 files changed, 388 insertions, 234 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index 821161c64..72b382595 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts | |||
@@ -2,7 +2,7 @@ import express from 'express' | |||
2 | import toInt from 'validator/lib/toInt' | 2 | import toInt from 'validator/lib/toInt' |
3 | import { pickCommonVideoQuery } from '@server/helpers/query' | 3 | import { pickCommonVideoQuery } from '@server/helpers/query' |
4 | import { doJSONRequest } from '@server/helpers/requests' | 4 | import { doJSONRequest } from '@server/helpers/requests' |
5 | import { LiveManager } from '@server/lib/live' | 5 | import { VideoViews } from '@server/lib/video-views' |
6 | import { openapiOperationDoc } from '@server/middlewares/doc' | 6 | import { openapiOperationDoc } from '@server/middlewares/doc' |
7 | import { getServerActor } from '@server/models/application/application' | 7 | import { getServerActor } from '@server/models/application/application' |
8 | import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' | 8 | import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' |
@@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database' | |||
17 | import { sendView } from '../../../lib/activitypub/send/send-view' | 17 | import { sendView } from '../../../lib/activitypub/send/send-view' |
18 | import { JobQueue } from '../../../lib/job-queue' | 18 | import { JobQueue } from '../../../lib/job-queue' |
19 | import { Hooks } from '../../../lib/plugins/hooks' | 19 | import { Hooks } from '../../../lib/plugins/hooks' |
20 | import { Redis } from '../../../lib/redis' | ||
21 | import { | 20 | import { |
22 | asyncMiddleware, | 21 | asyncMiddleware, |
23 | asyncRetryTransactionMiddleware, | 22 | asyncRetryTransactionMiddleware, |
@@ -107,7 +106,7 @@ videosRouter.get('/:id', | |||
107 | ) | 106 | ) |
108 | videosRouter.post('/:id/views', | 107 | videosRouter.post('/:id/views', |
109 | openapiOperationDoc({ operationId: 'addView' }), | 108 | openapiOperationDoc({ operationId: 'addView' }), |
110 | asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')), | 109 | asyncMiddleware(videosCustomGetValidator('only-video')), |
111 | asyncMiddleware(viewVideo) | 110 | asyncMiddleware(viewVideo) |
112 | ) | 111 | ) |
113 | 112 | ||
@@ -153,44 +152,17 @@ function getVideo (_req: express.Request, res: express.Response) { | |||
153 | } | 152 | } |
154 | 153 | ||
155 | async function viewVideo (req: express.Request, res: express.Response) { | 154 | async function viewVideo (req: express.Request, res: express.Response) { |
156 | const immutableVideoAttrs = res.locals.onlyImmutableVideo | 155 | const video = res.locals.onlyVideo |
157 | 156 | ||
158 | const ip = req.ip | 157 | const ip = req.ip |
159 | const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) | 158 | const success = await VideoViews.Instance.processView({ video, ip }) |
160 | if (exists) { | ||
161 | logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid) | ||
162 | return res.status(HttpStatusCode.NO_CONTENT_204).end() | ||
163 | } | ||
164 | |||
165 | const video = await VideoModel.load(immutableVideoAttrs.id) | ||
166 | |||
167 | const promises: Promise<any>[] = [ | ||
168 | Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive) | ||
169 | ] | ||
170 | |||
171 | let federateView = true | ||
172 | |||
173 | // Increment our live manager | ||
174 | if (video.isLive && video.isOwned()) { | ||
175 | LiveManager.Instance.addViewTo(video.id) | ||
176 | |||
177 | // Views of our local live will be sent by our live manager | ||
178 | federateView = false | ||
179 | } | ||
180 | |||
181 | // Increment our video views cache counter | ||
182 | if (!video.isLive) { | ||
183 | promises.push(Redis.Instance.addVideoView(video.id)) | ||
184 | } | ||
185 | 159 | ||
186 | if (federateView) { | 160 | if (success) { |
187 | const serverActor = await getServerActor() | 161 | const serverActor = await getServerActor() |
188 | promises.push(sendView(serverActor, video, undefined)) | 162 | await sendView(serverActor, video, undefined) |
189 | } | ||
190 | |||
191 | await Promise.all(promises) | ||
192 | 163 | ||
193 | Hooks.runAction('action:api.video.viewed', { video, ip }) | 164 | Hooks.runAction('action:api.video.viewed', { video: video, ip }) |
165 | } | ||
194 | 166 | ||
195 | return res.status(HttpStatusCode.NO_CONTENT_204).end() | 167 | return res.status(HttpStatusCode.NO_CONTENT_204).end() |
196 | } | 168 | } |
diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts index 1015c5e45..51c396548 100644 --- a/server/initializers/checker-before-init.ts +++ b/server/initializers/checker-before-init.ts | |||
@@ -38,7 +38,7 @@ function checkMissedConfig () { | |||
38 | 'services.twitter.username', 'services.twitter.whitelisted', | 38 | 'services.twitter.username', 'services.twitter.whitelisted', |
39 | 'followers.instance.enabled', 'followers.instance.manual_approval', | 39 | 'followers.instance.enabled', 'followers.instance.manual_approval', |
40 | 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', | 40 | 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', |
41 | 'history.videos.max_age', 'views.videos.remote.max_age', | 41 | 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration', |
42 | 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', | 42 | 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', |
43 | 'theme.default', | 43 | 'theme.default', |
44 | 'remote_redundancy.videos.accept_from', | 44 | 'remote_redundancy.videos.accept_from', |
diff --git a/server/initializers/config.ts b/server/initializers/config.ts index 1288768d8..dadda2a77 100644 --- a/server/initializers/config.ts +++ b/server/initializers/config.ts | |||
@@ -182,7 +182,9 @@ const CONFIG = { | |||
182 | VIDEOS: { | 182 | VIDEOS: { |
183 | REMOTE: { | 183 | REMOTE: { |
184 | MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) | 184 | MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) |
185 | } | 185 | }, |
186 | LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')), | ||
187 | IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration')) | ||
186 | } | 188 | } |
187 | }, | 189 | }, |
188 | PLUGINS: { | 190 | PLUGINS: { |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 845576667..b65741bbd 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
148 | 'video-import': 1, | 148 | 'video-import': 1, |
149 | 'email': 5, | 149 | 'email': 5, |
150 | 'actor-keys': 3, | 150 | 'actor-keys': 3, |
151 | 'videos-views': 1, | 151 | 'videos-views-stats': 1, |
152 | 'activitypub-refresher': 1, | 152 | 'activitypub-refresher': 1, |
153 | 'video-redundancy': 1, | 153 | 'video-redundancy': 1, |
154 | 'video-live-ending': 1, | 154 | 'video-live-ending': 1, |
@@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im | |||
164 | 'video-file-import': 1, | 164 | 'video-file-import': 1, |
165 | 'email': 5, | 165 | 'email': 5, |
166 | 'actor-keys': 1, | 166 | 'actor-keys': 1, |
167 | 'videos-views': 1, | 167 | 'videos-views-stats': 1, |
168 | 'activitypub-refresher': 1, | 168 | 'activitypub-refresher': 1, |
169 | 'video-redundancy': 1, | 169 | 'video-redundancy': 1, |
170 | 'video-live-ending': 10, | 170 | 'video-live-ending': 10, |
@@ -181,14 +181,14 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
181 | 'video-import': 1000 * 3600 * 2, // 2 hours | 181 | 'video-import': 1000 * 3600 * 2, // 2 hours |
182 | 'email': 60000 * 10, // 10 minutes | 182 | 'email': 60000 * 10, // 10 minutes |
183 | 'actor-keys': 60000 * 20, // 20 minutes | 183 | 'actor-keys': 60000 * 20, // 20 minutes |
184 | 'videos-views': undefined, // Unlimited | 184 | 'videos-views-stats': undefined, // Unlimited |
185 | 'activitypub-refresher': 60000 * 10, // 10 minutes | 185 | 'activitypub-refresher': 60000 * 10, // 10 minutes |
186 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours | 186 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours |
187 | 'video-live-ending': 1000 * 60 * 10, // 10 minutes | 187 | 'video-live-ending': 1000 * 60 * 10, // 10 minutes |
188 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours | 188 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours |
189 | } | 189 | } |
190 | const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | 190 | const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = { |
191 | 'videos-views': { | 191 | 'videos-views-stats': { |
192 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour | 192 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour |
193 | }, | 193 | }, |
194 | 'activitypub-cleaner': { | 194 | 'activitypub-cleaner': { |
@@ -211,6 +211,7 @@ const SCHEDULER_INTERVALS_MS = { | |||
211 | REMOVE_OLD_JOBS: 60000 * 60, // 1 hour | 211 | REMOVE_OLD_JOBS: 60000 * 60, // 1 hour |
212 | UPDATE_VIDEOS: 60000, // 1 minute | 212 | UPDATE_VIDEOS: 60000, // 1 minute |
213 | YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day | 213 | YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day |
214 | VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL, | ||
214 | CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, | 215 | CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, |
215 | CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day | 216 | CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day |
216 | AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day | 217 | AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day |
@@ -343,8 +344,8 @@ const CONSTRAINTS_FIELDS = { | |||
343 | } | 344 | } |
344 | 345 | ||
345 | const VIEW_LIFETIME = { | 346 | const VIEW_LIFETIME = { |
346 | VIDEO: 60000 * 60, // 1 hour | 347 | VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION, |
347 | LIVE: 60000 * 5 // 5 minutes | 348 | VIEWER: 60000 * 5 // 5 minutes |
348 | } | 349 | } |
349 | 350 | ||
350 | let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour | 351 | let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour |
@@ -789,13 +790,12 @@ if (isTestInstance() === true) { | |||
789 | SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 | 790 | SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 |
790 | SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 | 791 | SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 |
791 | SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 | 792 | SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 |
792 | REPEAT_JOBS['videos-views'] = { every: 5000 } | 793 | REPEAT_JOBS['videos-views-stats'] = { every: 5000 } |
793 | REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } | 794 | REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } |
794 | 795 | ||
795 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 | 796 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 |
796 | 797 | ||
797 | VIEW_LIFETIME.VIDEO = 1000 // 1 second | 798 | VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second |
798 | VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second | ||
799 | CONTACT_FORM_LIFETIME = 1000 // 1 second | 799 | CONTACT_FORM_LIFETIME = 1000 // 1 second |
800 | 800 | ||
801 | JOB_ATTEMPTS['email'] = 1 | 801 | JOB_ATTEMPTS['email'] = 1 |
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index 5593ee257..720385f9b 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { getOrCreateAPVideo } from '../videos' | 1 | import { VideoViews } from '@server/lib/video-views' |
2 | import { forwardVideoRelatedActivity } from '../send/utils' | 2 | import { ActivityView } from '../../../../shared/models/activitypub' |
3 | import { Redis } from '../../redis' | ||
4 | import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' | ||
5 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' | 3 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' |
6 | import { MActorSignature } from '../../../types/models' | 4 | import { MActorSignature } from '../../../types/models' |
7 | import { LiveManager } from '@server/lib/live/live-manager' | 5 | import { forwardVideoRelatedActivity } from '../send/utils' |
6 | import { getOrCreateAPVideo } from '../videos' | ||
8 | 7 | ||
9 | async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { | 8 | async function processViewActivity (options: APProcessorOptions<ActivityView>) { |
10 | const { activity, byActor } = options | 9 | const { activity, byActor } = options |
10 | |||
11 | return processCreateView(activity, byActor) | 11 | return processCreateView(activity, byActor) |
12 | } | 12 | } |
13 | 13 | ||
@@ -19,10 +19,8 @@ export { | |||
19 | 19 | ||
20 | // --------------------------------------------------------------------------- | 20 | // --------------------------------------------------------------------------- |
21 | 21 | ||
22 | async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { | 22 | async function processCreateView (activity: ActivityView, byActor: MActorSignature) { |
23 | const videoObject = activity.type === 'View' | 23 | const videoObject = activity.object |
24 | ? activity.object | ||
25 | : (activity.object as ViewObject).object | ||
26 | 24 | ||
27 | const { video } = await getOrCreateAPVideo({ | 25 | const { video } = await getOrCreateAPVideo({ |
28 | videoObject, | 26 | videoObject, |
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct | |||
30 | allowRefresh: false | 28 | allowRefresh: false |
31 | }) | 29 | }) |
32 | 30 | ||
33 | if (!video.isLive) { | 31 | const viewerExpires = activity.expires |
34 | await Redis.Instance.addVideoView(video.id) | 32 | ? new Date(activity.expires) |
35 | } | 33 | : undefined |
36 | 34 | ||
37 | if (video.isOwned()) { | 35 | await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) |
38 | // Our live manager will increment the counter and send the view to followers | ||
39 | if (video.isLive) { | ||
40 | LiveManager.Instance.addViewTo(video.id) | ||
41 | return | ||
42 | } | ||
43 | 36 | ||
37 | if (video.isOwned()) { | ||
44 | // Forward the view but don't resend the activity to the sender | 38 | // Forward the view but don't resend the activity to the sender |
45 | const exceptions = [ byActor ] | 39 | const exceptions = [ byActor ] |
46 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) | 40 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 153e94295..b12583e26 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { VideoViews } from '@server/lib/video-views' | ||
2 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' | 3 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' |
3 | import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' | 4 | import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' |
4 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU | |||
27 | id: url, | 28 | id: url, |
28 | type: 'View' as 'View', | 29 | type: 'View' as 'View', |
29 | actor: byActor.url, | 30 | actor: byActor.url, |
30 | object: video.url | 31 | object: video.url, |
32 | expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() | ||
31 | }, | 33 | }, |
32 | audience | 34 | audience |
33 | ) | 35 | ) |
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts index 157569414..f786bb196 100644 --- a/server/lib/activitypub/videos/updater.ts +++ b/server/lib/activitypub/videos/updater.ts | |||
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder { | |||
81 | 81 | ||
82 | if (videoUpdated.isLive) { | 82 | if (videoUpdated.isLive) { |
83 | PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) | 83 | PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) |
84 | PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated) | ||
85 | } | 84 | } |
86 | 85 | ||
87 | logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) | 86 | logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts index 86d0a271f..caf5f6962 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { Redis } from '../../redis' | 1 | import { isTestInstance } from '../../../helpers/core-utils' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { VideoModel } from '../../../models/video/video' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-view' | 4 | import { VideoViewModel } from '../../../models/video/video-view' |
5 | import { isTestInstance } from '../../../helpers/core-utils' | 5 | import { Redis } from '../../redis' |
6 | import { federateVideoIfNeeded } from '../../activitypub/videos' | ||
7 | 6 | ||
8 | async function processVideosViews () { | 7 | async function processVideosViewsStats () { |
9 | const lastHour = new Date() | 8 | const lastHour = new Date() |
10 | 9 | ||
11 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour | 10 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour |
@@ -15,23 +14,23 @@ async function processVideosViews () { | |||
15 | const startDate = lastHour.setMinutes(0, 0, 0) | 14 | const startDate = lastHour.setMinutes(0, 0, 0) |
16 | const endDate = lastHour.setMinutes(59, 59, 999) | 15 | const endDate = lastHour.setMinutes(59, 59, 999) |
17 | 16 | ||
18 | const videoIds = await Redis.Instance.getVideosIdViewed(hour) | 17 | const videoIds = await Redis.Instance.listVideosViewedForStats(hour) |
19 | if (videoIds.length === 0) return | 18 | if (videoIds.length === 0) return |
20 | 19 | ||
21 | logger.info('Processing videos views in job for hour %d.', hour) | 20 | logger.info('Processing videos views stats in job for hour %d.', hour) |
22 | 21 | ||
23 | for (const videoId of videoIds) { | 22 | for (const videoId of videoIds) { |
24 | try { | 23 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 24 | const views = await Redis.Instance.getVideoViewsStats(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | 25 | await Redis.Instance.deleteVideoViewsStats(videoId, hour) |
27 | 26 | ||
28 | if (views) { | 27 | if (views) { |
29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 28 | logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) |
30 | 29 | ||
31 | try { | 30 | try { |
32 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | 31 | const video = await VideoModel.load(videoId) |
33 | if (!video) { | 32 | if (!video) { |
34 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) | 33 | logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) |
35 | continue | 34 | continue |
36 | } | 35 | } |
37 | 36 | ||
@@ -41,21 +40,12 @@ async function processVideosViews () { | |||
41 | views, | 40 | views, |
42 | videoId | 41 | videoId |
43 | }) | 42 | }) |
44 | |||
45 | if (video.isOwned()) { | ||
46 | // If this is a remote video, the origin instance will send us an update | ||
47 | await VideoModel.incrementViews(videoId, views) | ||
48 | |||
49 | // Send video update | ||
50 | video.views += views | ||
51 | await federateVideoIfNeeded(video, false) | ||
52 | } | ||
53 | } catch (err) { | 43 | } catch (err) { |
54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 44 | logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) |
55 | } | 45 | } |
56 | } | 46 | } |
57 | } catch (err) { | 47 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 48 | logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 49 | } |
60 | } | 50 | } |
61 | } | 51 | } |
@@ -63,5 +53,5 @@ async function processVideosViews () { | |||
63 | // --------------------------------------------------------------------------- | 53 | // --------------------------------------------------------------------------- |
64 | 54 | ||
65 | export { | 55 | export { |
66 | processVideosViews | 56 | processVideosViewsStats |
67 | } | 57 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0eab720d9..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import' | |||
36 | import { processVideoImport } from './handlers/video-import' | 36 | import { processVideoImport } from './handlers/video-import' |
37 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 37 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
38 | import { processVideoTranscoding } from './handlers/video-transcoding' | 38 | import { processVideoTranscoding } from './handlers/video-transcoding' |
39 | import { processVideosViews } from './handlers/video-views' | 39 | import { processVideosViewsStats } from './handlers/video-views-stats' |
40 | 40 | ||
41 | type CreateJobArgument = | 41 | type CreateJobArgument = |
42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -49,7 +49,7 @@ type CreateJobArgument = | |||
49 | { type: 'email', payload: EmailPayload } | | 49 | { type: 'email', payload: EmailPayload } | |
50 | { type: 'video-import', payload: VideoImportPayload } | | 50 | { type: 'video-import', payload: VideoImportPayload } | |
51 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 51 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
52 | { type: 'videos-views', payload: {} } | | 52 | { type: 'videos-views-stats', payload: {} } | |
53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | 53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | |
54 | { type: 'actor-keys', payload: ActorKeysPayload } | | 54 | { type: 'actor-keys', payload: ActorKeysPayload } | |
55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | | 55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
71 | 'video-transcoding': processVideoTranscoding, | 71 | 'video-transcoding': processVideoTranscoding, |
72 | 'email': processEmail, | 72 | 'email': processEmail, |
73 | 'video-import': processVideoImport, | 73 | 'video-import': processVideoImport, |
74 | 'videos-views': processVideosViews, | 74 | 'videos-views-stats': processVideosViewsStats, |
75 | 'activitypub-refresher': refreshAPObject, | 75 | 'activitypub-refresher': refreshAPObject, |
76 | 'video-live-ending': processVideoLiveEnding, | 76 | 'video-live-ending': processVideoLiveEnding, |
77 | 'actor-keys': processActorKeys, | 77 | 'actor-keys': processActorKeys, |
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [ | |||
89 | 'video-transcoding', | 89 | 'video-transcoding', |
90 | 'video-file-import', | 90 | 'video-file-import', |
91 | 'video-import', | 91 | 'video-import', |
92 | 'videos-views', | 92 | 'videos-views-stats', |
93 | 'activitypub-refresher', | 93 | 'activitypub-refresher', |
94 | 'video-redundancy', | 94 | 'video-redundancy', |
95 | 'actor-keys', | 95 | 'actor-keys', |
@@ -247,8 +247,8 @@ class JobQueue { | |||
247 | } | 247 | } |
248 | 248 | ||
249 | private addRepeatableJobs () { | 249 | private addRepeatableJobs () { |
250 | this.queues['videos-views'].add({}, { | 250 | this.queues['videos-views-stats'].add({}, { |
251 | repeat: REPEAT_JOBS['videos-views'] | 251 | repeat: REPEAT_JOBS['videos-views-stats'] |
252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
253 | 253 | ||
254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1b7b9dd4d..2562edb75 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -2,7 +2,6 @@ | |||
2 | import { readFile } from 'fs-extra' | 2 | import { readFile } from 'fs-extra' |
3 | import { createServer, Server } from 'net' | 3 | import { createServer, Server } from 'net' |
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | 4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' |
5 | import { isTestInstance } from '@server/helpers/core-utils' | ||
6 | import { | 5 | import { |
7 | computeResolutionsToTranscode, | 6 | computeResolutionsToTranscode, |
8 | ffprobePromise, | 7 | ffprobePromise, |
@@ -12,7 +11,7 @@ import { | |||
12 | } from '@server/helpers/ffprobe-utils' | 11 | } from '@server/helpers/ffprobe-utils' |
13 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 12 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
14 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 13 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
15 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' | 14 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' |
16 | import { UserModel } from '@server/models/user/user' | 15 | import { UserModel } from '@server/models/user/user' |
17 | import { VideoModel } from '@server/models/video/video' | 16 | import { VideoModel } from '@server/models/video/video' |
18 | import { VideoLiveModel } from '@server/models/video/video-live' | 17 | import { VideoLiveModel } from '@server/models/video/video-live' |
@@ -53,8 +52,6 @@ class LiveManager { | |||
53 | 52 | ||
54 | private readonly muxingSessions = new Map<string, MuxingSession>() | 53 | private readonly muxingSessions = new Map<string, MuxingSession>() |
55 | private readonly videoSessions = new Map<number, string>() | 54 | private readonly videoSessions = new Map<number, string>() |
56 | // Values are Date().getTime() | ||
57 | private readonly watchersPerVideo = new Map<number, number[]>() | ||
58 | 55 | ||
59 | private rtmpServer: Server | 56 | private rtmpServer: Server |
60 | private rtmpsServer: ServerTLS | 57 | private rtmpsServer: ServerTLS |
@@ -99,8 +96,6 @@ class LiveManager { | |||
99 | // Cleanup broken lives, that were terminated by a server restart for example | 96 | // Cleanup broken lives, that were terminated by a server restart for example |
100 | this.handleBrokenLives() | 97 | this.handleBrokenLives() |
101 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) | 98 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) |
102 | |||
103 | setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) | ||
104 | } | 99 | } |
105 | 100 | ||
106 | async run () { | 101 | async run () { |
@@ -184,19 +179,6 @@ class LiveManager { | |||
184 | this.abortSession(sessionId) | 179 | this.abortSession(sessionId) |
185 | } | 180 | } |
186 | 181 | ||
187 | addViewTo (videoId: number) { | ||
188 | if (this.videoSessions.has(videoId) === false) return | ||
189 | |||
190 | let watchers = this.watchersPerVideo.get(videoId) | ||
191 | |||
192 | if (!watchers) { | ||
193 | watchers = [] | ||
194 | this.watchersPerVideo.set(videoId, watchers) | ||
195 | } | ||
196 | |||
197 | watchers.push(new Date().getTime()) | ||
198 | } | ||
199 | |||
200 | private getContext () { | 182 | private getContext () { |
201 | return context | 183 | return context |
202 | } | 184 | } |
@@ -377,7 +359,6 @@ class LiveManager { | |||
377 | } | 359 | } |
378 | 360 | ||
379 | private onMuxingFFmpegEnd (videoId: number) { | 361 | private onMuxingFFmpegEnd (videoId: number) { |
380 | this.watchersPerVideo.delete(videoId) | ||
381 | this.videoSessions.delete(videoId) | 362 | this.videoSessions.delete(videoId) |
382 | } | 363 | } |
383 | 364 | ||
@@ -411,34 +392,6 @@ class LiveManager { | |||
411 | } | 392 | } |
412 | } | 393 | } |
413 | 394 | ||
414 | private async updateLiveViews () { | ||
415 | if (!this.isRunning()) return | ||
416 | |||
417 | if (!isTestInstance()) logger.info('Updating live video views.', lTags()) | ||
418 | |||
419 | for (const videoId of this.watchersPerVideo.keys()) { | ||
420 | const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE | ||
421 | |||
422 | const watchers = this.watchersPerVideo.get(videoId) | ||
423 | |||
424 | const numWatchers = watchers.length | ||
425 | |||
426 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
427 | video.views = numWatchers | ||
428 | await video.save() | ||
429 | |||
430 | await federateVideoIfNeeded(video, false) | ||
431 | |||
432 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video) | ||
433 | |||
434 | // Only keep not expired watchers | ||
435 | const newWatchers = watchers.filter(w => w > notBefore) | ||
436 | this.watchersPerVideo.set(videoId, newWatchers) | ||
437 | |||
438 | logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) | ||
439 | } | ||
440 | } | ||
441 | |||
442 | private async handleBrokenLives () { | 395 | private async handleBrokenLives () { |
443 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | 396 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() |
444 | 397 | ||
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 901435dea..0398ca61d 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Server as HTTPServer } from 'http' | 1 | import { Server as HTTPServer } from 'http' |
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | 2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' |
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | 3 | import { isIdValid } from '@server/helpers/custom-validators/misc' |
4 | import { MVideo } from '@server/types/models' | 4 | import { MVideo, MVideoImmutable } from '@server/types/models' |
5 | import { UserNotificationModelForApi } from '@server/types/models/user' | 5 | import { UserNotificationModelForApi } from '@server/types/models/user' |
6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | 6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' |
7 | import { logger } from '../helpers/logger' | 7 | import { logger } from '../helpers/logger' |
@@ -78,11 +78,11 @@ class PeerTubeSocket { | |||
78 | .emit(type, data) | 78 | .emit(type, data) |
79 | } | 79 | } |
80 | 80 | ||
81 | sendVideoViewsUpdate (video: MVideo) { | 81 | sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { |
82 | const data: LiveVideoEventPayload = { views: video.views } | 82 | const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } |
83 | const type: LiveVideoEventType = 'views-change' | 83 | const type: LiveVideoEventType = 'views-change' |
84 | 84 | ||
85 | logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) | 85 | logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) |
86 | 86 | ||
87 | this.liveVideosNamespace | 87 | this.liveVideosNamespace |
88 | .in(video.id) | 88 | .in(video.id) |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 46617b07e..76b7868e8 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -13,6 +13,7 @@ import { | |||
13 | RESUMABLE_UPLOAD_SESSION_LIFETIME | 13 | RESUMABLE_UPLOAD_SESSION_LIFETIME |
14 | } from '../initializers/constants' | 14 | } from '../initializers/constants' |
15 | import { CONFIG } from '../initializers/config' | 15 | import { CONFIG } from '../initializers/config' |
16 | import { exists } from '@server/helpers/custom-validators/misc' | ||
16 | 17 | ||
17 | type CachedRoute = { | 18 | type CachedRoute = { |
18 | body: string | 19 | body: string |
@@ -119,16 +120,20 @@ class Redis { | |||
119 | 120 | ||
120 | /* ************ Views per IP ************ */ | 121 | /* ************ Views per IP ************ */ |
121 | 122 | ||
122 | setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { | 123 | setIPVideoView (ip: string, videoUUID: string) { |
123 | const lifetime = isLive | 124 | return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW) |
124 | ? VIEW_LIFETIME.LIVE | 125 | } |
125 | : VIEW_LIFETIME.VIDEO | ||
126 | 126 | ||
127 | return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) | 127 | setIPVideoViewer (ip: string, videoUUID: string) { |
128 | return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER) | ||
128 | } | 129 | } |
129 | 130 | ||
130 | async doesVideoIPViewExist (ip: string, videoUUID: string) { | 131 | async doesVideoIPViewExist (ip: string, videoUUID: string) { |
131 | return this.exists(this.generateViewKey(ip, videoUUID)) | 132 | return this.exists(this.generateIPViewKey(ip, videoUUID)) |
133 | } | ||
134 | |||
135 | async doesVideoIPViewerExist (ip: string, videoUUID: string) { | ||
136 | return this.exists(this.generateIPViewerKey(ip, videoUUID)) | ||
132 | } | 137 | } |
133 | 138 | ||
134 | /* ************ Tracker IP block ************ */ | 139 | /* ************ Tracker IP block ************ */ |
@@ -160,46 +165,85 @@ class Redis { | |||
160 | return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) | 165 | return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) |
161 | } | 166 | } |
162 | 167 | ||
163 | /* ************ Video views ************ */ | 168 | /* ************ Video views stats ************ */ |
164 | 169 | ||
165 | addVideoView (videoId: number) { | 170 | addVideoViewStats (videoId: number) { |
166 | const keyIncr = this.generateVideoViewKey(videoId) | 171 | const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId }) |
167 | const keySet = this.generateVideosViewKey() | ||
168 | 172 | ||
169 | return Promise.all([ | 173 | return Promise.all([ |
170 | this.addToSet(keySet, videoId.toString()), | 174 | this.addToSet(setKey, videoId.toString()), |
171 | this.increment(keyIncr) | 175 | this.increment(videoKey) |
172 | ]) | 176 | ]) |
173 | } | 177 | } |
174 | 178 | ||
175 | async getVideoViews (videoId: number, hour: number) { | 179 | async getVideoViewsStats (videoId: number, hour: number) { |
176 | const key = this.generateVideoViewKey(videoId, hour) | 180 | const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) |
177 | 181 | ||
178 | const valueString = await this.getValue(key) | 182 | const valueString = await this.getValue(videoKey) |
179 | const valueInt = parseInt(valueString, 10) | 183 | const valueInt = parseInt(valueString, 10) |
180 | 184 | ||
181 | if (isNaN(valueInt)) { | 185 | if (isNaN(valueInt)) { |
182 | logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) | 186 | logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) |
183 | return undefined | 187 | return undefined |
184 | } | 188 | } |
185 | 189 | ||
186 | return valueInt | 190 | return valueInt |
187 | } | 191 | } |
188 | 192 | ||
189 | async getVideosIdViewed (hour: number) { | 193 | async listVideosViewedForStats (hour: number) { |
190 | const key = this.generateVideosViewKey(hour) | 194 | const { setKey } = this.generateVideoViewStatsKeys({ hour }) |
191 | 195 | ||
192 | const stringIds = await this.getSet(key) | 196 | const stringIds = await this.getSet(setKey) |
193 | return stringIds.map(s => parseInt(s, 10)) | 197 | return stringIds.map(s => parseInt(s, 10)) |
194 | } | 198 | } |
195 | 199 | ||
196 | deleteVideoViews (videoId: number, hour: number) { | 200 | deleteVideoViewsStats (videoId: number, hour: number) { |
197 | const keySet = this.generateVideosViewKey(hour) | 201 | const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) |
198 | const keyIncr = this.generateVideoViewKey(videoId, hour) | 202 | |
203 | return Promise.all([ | ||
204 | this.deleteFromSet(setKey, videoId.toString()), | ||
205 | this.deleteKey(videoKey) | ||
206 | ]) | ||
207 | } | ||
208 | |||
209 | /* ************ Local video views buffer ************ */ | ||
210 | |||
211 | addLocalVideoView (videoId: number) { | ||
212 | const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId) | ||
199 | 213 | ||
200 | return Promise.all([ | 214 | return Promise.all([ |
201 | this.deleteFromSet(keySet, videoId.toString()), | 215 | this.addToSet(setKey, videoId.toString()), |
202 | this.deleteKey(keyIncr) | 216 | this.increment(videoKey) |
217 | ]) | ||
218 | } | ||
219 | |||
220 | async getLocalVideoViews (videoId: number) { | ||
221 | const { videoKey } = this.generateLocalVideoViewsKeys(videoId) | ||
222 | |||
223 | const valueString = await this.getValue(videoKey) | ||
224 | const valueInt = parseInt(valueString, 10) | ||
225 | |||
226 | if (isNaN(valueInt)) { | ||
227 | logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString) | ||
228 | return undefined | ||
229 | } | ||
230 | |||
231 | return valueInt | ||
232 | } | ||
233 | |||
234 | async listLocalVideosViewed () { | ||
235 | const { setKey } = this.generateLocalVideoViewsKeys() | ||
236 | |||
237 | const stringIds = await this.getSet(setKey) | ||
238 | return stringIds.map(s => parseInt(s, 10)) | ||
239 | } | ||
240 | |||
241 | deleteLocalVideoViews (videoId: number) { | ||
242 | const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId) | ||
243 | |||
244 | return Promise.all([ | ||
245 | this.deleteFromSet(setKey, videoId.toString()), | ||
246 | this.deleteKey(videoKey) | ||
203 | ]) | 247 | ]) |
204 | } | 248 | } |
205 | 249 | ||
@@ -233,16 +277,16 @@ class Redis { | |||
233 | return req.method + '-' + req.originalUrl | 277 | return req.method + '-' + req.originalUrl |
234 | } | 278 | } |
235 | 279 | ||
236 | private generateVideosViewKey (hour?: number) { | 280 | private generateLocalVideoViewsKeys (videoId?: Number) { |
237 | if (!hour) hour = new Date().getHours() | 281 | return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } |
238 | |||
239 | return `videos-view-h${hour}` | ||
240 | } | 282 | } |
241 | 283 | ||
242 | private generateVideoViewKey (videoId: number, hour?: number) { | 284 | private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { |
243 | if (hour === undefined || hour === null) hour = new Date().getHours() | 285 | const hour = exists(options.hour) |
286 | ? options.hour | ||
287 | : new Date().getHours() | ||
244 | 288 | ||
245 | return `video-view-${videoId}-h${hour}` | 289 | return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` } |
246 | } | 290 | } |
247 | 291 | ||
248 | private generateResetPasswordKey (userId: number) { | 292 | private generateResetPasswordKey (userId: number) { |
@@ -253,10 +297,14 @@ class Redis { | |||
253 | return 'verify-email-' + userId | 297 | return 'verify-email-' + userId |
254 | } | 298 | } |
255 | 299 | ||
256 | private generateViewKey (ip: string, videoUUID: string) { | 300 | private generateIPViewKey (ip: string, videoUUID: string) { |
257 | return `views-${videoUUID}-${ip}` | 301 | return `views-${videoUUID}-${ip}` |
258 | } | 302 | } |
259 | 303 | ||
304 | private generateIPViewerKey (ip: string, videoUUID: string) { | ||
305 | return `viewer-${videoUUID}-${ip}` | ||
306 | } | ||
307 | |||
260 | private generateTrackerBlockIPKey (ip: string) { | 308 | private generateTrackerBlockIPKey (ip: string) { |
261 | return `tracker-block-ip-${ip}` | 309 | return `tracker-block-ip-${ip}` |
262 | } | 310 | } |
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts new file mode 100644 index 000000000..c0e72c461 --- /dev/null +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts | |||
@@ -0,0 +1,52 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VideoModel } from '@server/models/video/video' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { federateVideoIfNeeded } from '../activitypub/videos' | ||
5 | import { Redis } from '../redis' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViewsBufferScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const videoIds = await Redis.Instance.listLocalVideosViewed() | ||
22 | if (videoIds.length === 0) return | ||
23 | |||
24 | logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) | ||
25 | |||
26 | for (const videoId of videoIds) { | ||
27 | try { | ||
28 | const views = await Redis.Instance.getLocalVideoViews(videoId) | ||
29 | await Redis.Instance.deleteLocalVideoViews(videoId) | ||
30 | |||
31 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
32 | if (!video) { | ||
33 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) | ||
34 | continue | ||
35 | } | ||
36 | |||
37 | // If this is a remote video, the origin instance will send us an update | ||
38 | await VideoModel.incrementViews(videoId, views) | ||
39 | |||
40 | // Send video update | ||
41 | video.views += views | ||
42 | await federateVideoIfNeeded(video, false) | ||
43 | } catch (err) { | ||
44 | logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) | ||
45 | } | ||
46 | } | ||
47 | } | ||
48 | |||
49 | static get Instance () { | ||
50 | return this.instance || (this.instance = new this()) | ||
51 | } | ||
52 | } | ||
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts new file mode 100644 index 000000000..220b509c2 --- /dev/null +++ b/server/lib/video-views.ts | |||
@@ -0,0 +1,130 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VIEW_LIFETIME } from '@server/initializers/constants' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { PeerTubeSocket } from './peertube-socket' | ||
6 | import { Redis } from './redis' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViews { | ||
11 | |||
12 | // Values are Date().getTime() | ||
13 | private readonly viewersPerVideo = new Map<number, number[]>() | ||
14 | |||
15 | private static instance: VideoViews | ||
16 | |||
17 | private constructor () { | ||
18 | } | ||
19 | |||
20 | init () { | ||
21 | setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) | ||
22 | } | ||
23 | |||
24 | async processView (options: { | ||
25 | video: MVideo | ||
26 | ip: string | null | ||
27 | viewerExpires?: Date | ||
28 | }) { | ||
29 | const { video, ip, viewerExpires } = options | ||
30 | |||
31 | logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) | ||
32 | |||
33 | let success = await this.addView(video, ip) | ||
34 | |||
35 | if (video.isLive) { | ||
36 | const successViewer = await this.addViewer(video, ip, viewerExpires) | ||
37 | success ||= successViewer | ||
38 | } | ||
39 | |||
40 | return success | ||
41 | } | ||
42 | |||
43 | getViewers (video: MVideo) { | ||
44 | const viewers = this.viewersPerVideo.get(video.id) | ||
45 | if (!viewers) return 0 | ||
46 | |||
47 | return viewers.length | ||
48 | } | ||
49 | |||
50 | buildViewerExpireTime () { | ||
51 | return new Date().getTime() + VIEW_LIFETIME.VIEWER | ||
52 | } | ||
53 | |||
54 | private async addView (video: MVideo, ip: string | null) { | ||
55 | const promises: Promise<any>[] = [] | ||
56 | |||
57 | if (ip !== null) { | ||
58 | const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) | ||
59 | if (viewExists) return false | ||
60 | |||
61 | promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) | ||
62 | } | ||
63 | |||
64 | if (video.isOwned()) { | ||
65 | promises.push(Redis.Instance.addLocalVideoView(video.id)) | ||
66 | } | ||
67 | |||
68 | promises.push(Redis.Instance.addVideoViewStats(video.id)) | ||
69 | |||
70 | await Promise.all(promises) | ||
71 | |||
72 | return true | ||
73 | } | ||
74 | |||
75 | private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { | ||
76 | if (ip !== null) { | ||
77 | const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) | ||
78 | if (viewExists) return false | ||
79 | |||
80 | await Redis.Instance.setIPVideoViewer(ip, video.uuid) | ||
81 | } | ||
82 | |||
83 | let watchers = this.viewersPerVideo.get(video.id) | ||
84 | |||
85 | if (!watchers) { | ||
86 | watchers = [] | ||
87 | this.viewersPerVideo.set(video.id, watchers) | ||
88 | } | ||
89 | |||
90 | const expiration = viewerExpires | ||
91 | ? viewerExpires.getTime() | ||
92 | : this.buildViewerExpireTime() | ||
93 | |||
94 | watchers.push(expiration) | ||
95 | await this.notifyClients(video.id, watchers.length) | ||
96 | |||
97 | return true | ||
98 | } | ||
99 | |||
100 | private async cleanViewers () { | ||
101 | logger.info('Cleaning video viewers.', lTags()) | ||
102 | |||
103 | for (const videoId of this.viewersPerVideo.keys()) { | ||
104 | const notBefore = new Date().getTime() | ||
105 | |||
106 | const viewers = this.viewersPerVideo.get(videoId) | ||
107 | |||
108 | // Only keep not expired viewers | ||
109 | const newViewers = viewers.filter(w => w > notBefore) | ||
110 | |||
111 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | ||
112 | else this.viewersPerVideo.set(videoId, newViewers) | ||
113 | |||
114 | await this.notifyClients(videoId, newViewers.length) | ||
115 | } | ||
116 | } | ||
117 | |||
118 | private async notifyClients (videoId: string | number, viewersLength: number) { | ||
119 | const video = await VideoModel.loadImmutableAttributes(videoId) | ||
120 | if (!video) return | ||
121 | |||
122 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | ||
123 | |||
124 | logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) | ||
125 | } | ||
126 | |||
127 | static get Instance () { | ||
128 | return this.instance || (this.instance = new this()) | ||
129 | } | ||
130 | } | ||
diff --git a/server/models/video/formatter/video-format-utils.ts b/server/models/video/formatter/video-format-utils.ts index ba49e41ae..461e296df 100644 --- a/server/models/video/formatter/video-format-utils.ts +++ b/server/models/video/formatter/video-format-utils.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | import { uuidToShort } from '@server/helpers/uuid' | 1 | import { uuidToShort } from '@server/helpers/uuid' |
2 | import { generateMagnetUri } from '@server/helpers/webtorrent' | 2 | import { generateMagnetUri } from '@server/helpers/webtorrent' |
3 | import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' | 3 | import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' |
4 | import { VideoViews } from '@server/lib/video-views' | ||
4 | import { VideosCommonQueryAfterSanitize } from '@shared/models' | 5 | import { VideosCommonQueryAfterSanitize } from '@shared/models' |
5 | import { VideoFile } from '@shared/models/videos/video-file.model' | 6 | import { VideoFile } from '@shared/models/videos/video-file.model' |
6 | import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' | 7 | import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' |
@@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm | |||
121 | pluginData: (video as any).pluginData | 122 | pluginData: (video as any).pluginData |
122 | } | 123 | } |
123 | 124 | ||
125 | if (video.isLive) { | ||
126 | videoObject.viewers = VideoViews.Instance.getViewers(video) | ||
127 | } | ||
128 | |||
124 | const add = options.additionalAttributes | 129 | const add = options.additionalAttributes |
125 | if (add?.state === true) { | 130 | if (add?.state === true) { |
126 | videoObject.state = { | 131 | videoObject.state = { |
diff --git a/server/tests/api/live/live-views.ts b/server/tests/api/live/live-views.ts index 5e3a79c64..9186af8e7 100644 --- a/server/tests/api/live/live-views.ts +++ b/server/tests/api/live/live-views.ts | |||
@@ -19,7 +19,7 @@ import { | |||
19 | 19 | ||
20 | const expect = chai.expect | 20 | const expect = chai.expect |
21 | 21 | ||
22 | describe('Test live', function () { | 22 | describe('Live views', function () { |
23 | let servers: PeerTubeServer[] = [] | 23 | let servers: PeerTubeServer[] = [] |
24 | 24 | ||
25 | before(async function () { | 25 | before(async function () { |
@@ -47,79 +47,86 @@ describe('Test live', function () { | |||
47 | await doubleFollow(servers[0], servers[1]) | 47 | await doubleFollow(servers[0], servers[1]) |
48 | }) | 48 | }) |
49 | 49 | ||
50 | describe('Live views', function () { | 50 | let liveVideoId: string |
51 | let liveVideoId: string | 51 | let command: FfmpegCommand |
52 | let command: FfmpegCommand | ||
53 | 52 | ||
54 | async function countViews (expected: number) { | 53 | async function countViewers (expectedViewers: number) { |
55 | for (const server of servers) { | 54 | for (const server of servers) { |
56 | const video = await server.videos.get({ id: liveVideoId }) | 55 | const video = await server.videos.get({ id: liveVideoId }) |
57 | expect(video.views).to.equal(expected) | 56 | expect(video.viewers).to.equal(expectedViewers) |
58 | } | ||
59 | } | 57 | } |
58 | } | ||
60 | 59 | ||
61 | before(async function () { | 60 | async function countViews (expectedViews: number) { |
62 | this.timeout(30000) | 61 | for (const server of servers) { |
62 | const video = await server.videos.get({ id: liveVideoId }) | ||
63 | expect(video.views).to.equal(expectedViews) | ||
64 | } | ||
65 | } | ||
63 | 66 | ||
64 | const liveAttributes = { | 67 | before(async function () { |
65 | name: 'live video', | 68 | this.timeout(30000) |
66 | channelId: servers[0].store.channel.id, | ||
67 | privacy: VideoPrivacy.PUBLIC | ||
68 | } | ||
69 | 69 | ||
70 | const live = await servers[0].live.create({ fields: liveAttributes }) | 70 | const liveAttributes = { |
71 | liveVideoId = live.uuid | 71 | name: 'live video', |
72 | channelId: servers[0].store.channel.id, | ||
73 | privacy: VideoPrivacy.PUBLIC | ||
74 | } | ||
72 | 75 | ||
73 | command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) | 76 | const live = await servers[0].live.create({ fields: liveAttributes }) |
74 | await waitUntilLivePublishedOnAllServers(servers, liveVideoId) | 77 | liveVideoId = live.uuid |
75 | await waitJobs(servers) | ||
76 | }) | ||
77 | 78 | ||
78 | it('Should display no views for a live', async function () { | 79 | command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) |
79 | await countViews(0) | 80 | await waitUntilLivePublishedOnAllServers(servers, liveVideoId) |
80 | }) | 81 | await waitJobs(servers) |
82 | }) | ||
81 | 83 | ||
82 | it('Should view a live twice and display 1 view', async function () { | 84 | it('Should display no views and viewers for a live', async function () { |
83 | this.timeout(30000) | 85 | await countViews(0) |
86 | await countViewers(0) | ||
87 | }) | ||
84 | 88 | ||
85 | await servers[0].videos.view({ id: liveVideoId }) | 89 | it('Should view a live twice and display 1 view/viewer', async function () { |
86 | await servers[0].videos.view({ id: liveVideoId }) | 90 | this.timeout(30000) |
87 | 91 | ||
88 | await wait(7000) | 92 | await servers[0].videos.view({ id: liveVideoId }) |
93 | await servers[0].videos.view({ id: liveVideoId }) | ||
89 | 94 | ||
90 | await waitJobs(servers) | 95 | await waitJobs(servers) |
96 | await countViewers(1) | ||
91 | 97 | ||
92 | await countViews(1) | 98 | await wait(7000) |
93 | }) | 99 | await countViews(1) |
100 | }) | ||
94 | 101 | ||
95 | it('Should wait and display 0 views', async function () { | 102 | it('Should wait and display 0 viewers while still have 1 view', async function () { |
96 | this.timeout(30000) | 103 | this.timeout(30000) |
97 | 104 | ||
98 | await wait(12000) | 105 | await wait(12000) |
99 | await waitJobs(servers) | 106 | await waitJobs(servers) |
100 | 107 | ||
101 | await countViews(0) | 108 | await countViews(1) |
102 | }) | 109 | await countViewers(0) |
110 | }) | ||
103 | 111 | ||
104 | it('Should view a live on a remote and on local and display 2 views', async function () { | 112 | it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () { |
105 | this.timeout(30000) | 113 | this.timeout(30000) |
106 | 114 | ||
107 | await servers[0].videos.view({ id: liveVideoId }) | 115 | await servers[0].videos.view({ id: liveVideoId }) |
108 | await servers[1].videos.view({ id: liveVideoId }) | 116 | await servers[1].videos.view({ id: liveVideoId }) |
109 | await servers[1].videos.view({ id: liveVideoId }) | 117 | await servers[1].videos.view({ id: liveVideoId }) |
118 | await waitJobs(servers) | ||
110 | 119 | ||
111 | await wait(7000) | 120 | await countViewers(2) |
112 | await waitJobs(servers) | ||
113 | 121 | ||
114 | await countViews(2) | 122 | await wait(7000) |
115 | }) | 123 | await waitJobs(servers) |
116 | 124 | ||
117 | after(async function () { | 125 | await countViews(3) |
118 | await stopFfmpeg(command) | ||
119 | }) | ||
120 | }) | 126 | }) |
121 | 127 | ||
122 | after(async function () { | 128 | after(async function () { |
129 | await stopFfmpeg(command) | ||
123 | await cleanupTests(servers) | 130 | await cleanupTests(servers) |
124 | }) | 131 | }) |
125 | }) | 132 | }) |
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 8c4e01226..5d946f5e8 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts | |||
@@ -56,7 +56,7 @@ describe('Test jobs', function () { | |||
56 | 56 | ||
57 | let job = body.data[0] | 57 | let job = body.data[0] |
58 | // Skip repeat jobs | 58 | // Skip repeat jobs |
59 | if (job.type === 'videos-views') job = body.data[1] | 59 | if (job.type === 'videos-views-stats') job = body.data[1] |
60 | 60 | ||
61 | expect(job.state).to.equal('completed') | 61 | expect(job.state).to.equal('completed') |
62 | expect(job.type.startsWith('activitypub-')).to.be.true | 62 | expect(job.type.startsWith('activitypub-')).to.be.true |