diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/process/process-view.ts | 32 | ||||
-rw-r--r-- | server/lib/activitypub/send/send-view.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/videos/updater.ts | 1 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts) | 36 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 12 | ||||
-rw-r--r-- | server/lib/live/live-manager.ts | 49 | ||||
-rw-r--r-- | server/lib/peertube-socket.ts | 8 | ||||
-rw-r--r-- | server/lib/redis.ts | 112 | ||||
-rw-r--r-- | server/lib/schedulers/video-views-buffer-scheduler.ts | 52 | ||||
-rw-r--r-- | server/lib/video-views.ts | 130 |
10 files changed, 302 insertions, 134 deletions
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index 5593ee257..720385f9b 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { getOrCreateAPVideo } from '../videos' | 1 | import { VideoViews } from '@server/lib/video-views' |
2 | import { forwardVideoRelatedActivity } from '../send/utils' | 2 | import { ActivityView } from '../../../../shared/models/activitypub' |
3 | import { Redis } from '../../redis' | ||
4 | import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' | ||
5 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' | 3 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' |
6 | import { MActorSignature } from '../../../types/models' | 4 | import { MActorSignature } from '../../../types/models' |
7 | import { LiveManager } from '@server/lib/live/live-manager' | 5 | import { forwardVideoRelatedActivity } from '../send/utils' |
6 | import { getOrCreateAPVideo } from '../videos' | ||
8 | 7 | ||
9 | async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) { | 8 | async function processViewActivity (options: APProcessorOptions<ActivityView>) { |
10 | const { activity, byActor } = options | 9 | const { activity, byActor } = options |
10 | |||
11 | return processCreateView(activity, byActor) | 11 | return processCreateView(activity, byActor) |
12 | } | 12 | } |
13 | 13 | ||
@@ -19,10 +19,8 @@ export { | |||
19 | 19 | ||
20 | // --------------------------------------------------------------------------- | 20 | // --------------------------------------------------------------------------- |
21 | 21 | ||
22 | async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { | 22 | async function processCreateView (activity: ActivityView, byActor: MActorSignature) { |
23 | const videoObject = activity.type === 'View' | 23 | const videoObject = activity.object |
24 | ? activity.object | ||
25 | : (activity.object as ViewObject).object | ||
26 | 24 | ||
27 | const { video } = await getOrCreateAPVideo({ | 25 | const { video } = await getOrCreateAPVideo({ |
28 | videoObject, | 26 | videoObject, |
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct | |||
30 | allowRefresh: false | 28 | allowRefresh: false |
31 | }) | 29 | }) |
32 | 30 | ||
33 | if (!video.isLive) { | 31 | const viewerExpires = activity.expires |
34 | await Redis.Instance.addVideoView(video.id) | 32 | ? new Date(activity.expires) |
35 | } | 33 | : undefined |
36 | 34 | ||
37 | if (video.isOwned()) { | 35 | await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) |
38 | // Our live manager will increment the counter and send the view to followers | ||
39 | if (video.isLive) { | ||
40 | LiveManager.Instance.addViewTo(video.id) | ||
41 | return | ||
42 | } | ||
43 | 36 | ||
37 | if (video.isOwned()) { | ||
44 | // Forward the view but don't resend the activity to the sender | 38 | // Forward the view but don't resend the activity to the sender |
45 | const exceptions = [ byActor ] | 39 | const exceptions = [ byActor ] |
46 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) | 40 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 153e94295..b12583e26 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { VideoViews } from '@server/lib/video-views' | ||
2 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' | 3 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' |
3 | import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' | 4 | import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' |
4 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU | |||
27 | id: url, | 28 | id: url, |
28 | type: 'View' as 'View', | 29 | type: 'View' as 'View', |
29 | actor: byActor.url, | 30 | actor: byActor.url, |
30 | object: video.url | 31 | object: video.url, |
32 | expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() | ||
31 | }, | 33 | }, |
32 | audience | 34 | audience |
33 | ) | 35 | ) |
diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts index 157569414..f786bb196 100644 --- a/server/lib/activitypub/videos/updater.ts +++ b/server/lib/activitypub/videos/updater.ts | |||
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder { | |||
81 | 81 | ||
82 | if (videoUpdated.isLive) { | 82 | if (videoUpdated.isLive) { |
83 | PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) | 83 | PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) |
84 | PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated) | ||
85 | } | 84 | } |
86 | 85 | ||
87 | logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) | 86 | logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts index 86d0a271f..caf5f6962 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { Redis } from '../../redis' | 1 | import { isTestInstance } from '../../../helpers/core-utils' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { VideoModel } from '../../../models/video/video' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-view' | 4 | import { VideoViewModel } from '../../../models/video/video-view' |
5 | import { isTestInstance } from '../../../helpers/core-utils' | 5 | import { Redis } from '../../redis' |
6 | import { federateVideoIfNeeded } from '../../activitypub/videos' | ||
7 | 6 | ||
8 | async function processVideosViews () { | 7 | async function processVideosViewsStats () { |
9 | const lastHour = new Date() | 8 | const lastHour = new Date() |
10 | 9 | ||
11 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour | 10 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour |
@@ -15,23 +14,23 @@ async function processVideosViews () { | |||
15 | const startDate = lastHour.setMinutes(0, 0, 0) | 14 | const startDate = lastHour.setMinutes(0, 0, 0) |
16 | const endDate = lastHour.setMinutes(59, 59, 999) | 15 | const endDate = lastHour.setMinutes(59, 59, 999) |
17 | 16 | ||
18 | const videoIds = await Redis.Instance.getVideosIdViewed(hour) | 17 | const videoIds = await Redis.Instance.listVideosViewedForStats(hour) |
19 | if (videoIds.length === 0) return | 18 | if (videoIds.length === 0) return |
20 | 19 | ||
21 | logger.info('Processing videos views in job for hour %d.', hour) | 20 | logger.info('Processing videos views stats in job for hour %d.', hour) |
22 | 21 | ||
23 | for (const videoId of videoIds) { | 22 | for (const videoId of videoIds) { |
24 | try { | 23 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 24 | const views = await Redis.Instance.getVideoViewsStats(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | 25 | await Redis.Instance.deleteVideoViewsStats(videoId, hour) |
27 | 26 | ||
28 | if (views) { | 27 | if (views) { |
29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 28 | logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) |
30 | 29 | ||
31 | try { | 30 | try { |
32 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | 31 | const video = await VideoModel.load(videoId) |
33 | if (!video) { | 32 | if (!video) { |
34 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) | 33 | logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) |
35 | continue | 34 | continue |
36 | } | 35 | } |
37 | 36 | ||
@@ -41,21 +40,12 @@ async function processVideosViews () { | |||
41 | views, | 40 | views, |
42 | videoId | 41 | videoId |
43 | }) | 42 | }) |
44 | |||
45 | if (video.isOwned()) { | ||
46 | // If this is a remote video, the origin instance will send us an update | ||
47 | await VideoModel.incrementViews(videoId, views) | ||
48 | |||
49 | // Send video update | ||
50 | video.views += views | ||
51 | await federateVideoIfNeeded(video, false) | ||
52 | } | ||
53 | } catch (err) { | 43 | } catch (err) { |
54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 44 | logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) |
55 | } | 45 | } |
56 | } | 46 | } |
57 | } catch (err) { | 47 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 48 | logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 49 | } |
60 | } | 50 | } |
61 | } | 51 | } |
@@ -63,5 +53,5 @@ async function processVideosViews () { | |||
63 | // --------------------------------------------------------------------------- | 53 | // --------------------------------------------------------------------------- |
64 | 54 | ||
65 | export { | 55 | export { |
66 | processVideosViews | 56 | processVideosViewsStats |
67 | } | 57 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0eab720d9..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import' | |||
36 | import { processVideoImport } from './handlers/video-import' | 36 | import { processVideoImport } from './handlers/video-import' |
37 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 37 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
38 | import { processVideoTranscoding } from './handlers/video-transcoding' | 38 | import { processVideoTranscoding } from './handlers/video-transcoding' |
39 | import { processVideosViews } from './handlers/video-views' | 39 | import { processVideosViewsStats } from './handlers/video-views-stats' |
40 | 40 | ||
41 | type CreateJobArgument = | 41 | type CreateJobArgument = |
42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -49,7 +49,7 @@ type CreateJobArgument = | |||
49 | { type: 'email', payload: EmailPayload } | | 49 | { type: 'email', payload: EmailPayload } | |
50 | { type: 'video-import', payload: VideoImportPayload } | | 50 | { type: 'video-import', payload: VideoImportPayload } | |
51 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 51 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
52 | { type: 'videos-views', payload: {} } | | 52 | { type: 'videos-views-stats', payload: {} } | |
53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | 53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | |
54 | { type: 'actor-keys', payload: ActorKeysPayload } | | 54 | { type: 'actor-keys', payload: ActorKeysPayload } | |
55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | | 55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
71 | 'video-transcoding': processVideoTranscoding, | 71 | 'video-transcoding': processVideoTranscoding, |
72 | 'email': processEmail, | 72 | 'email': processEmail, |
73 | 'video-import': processVideoImport, | 73 | 'video-import': processVideoImport, |
74 | 'videos-views': processVideosViews, | 74 | 'videos-views-stats': processVideosViewsStats, |
75 | 'activitypub-refresher': refreshAPObject, | 75 | 'activitypub-refresher': refreshAPObject, |
76 | 'video-live-ending': processVideoLiveEnding, | 76 | 'video-live-ending': processVideoLiveEnding, |
77 | 'actor-keys': processActorKeys, | 77 | 'actor-keys': processActorKeys, |
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [ | |||
89 | 'video-transcoding', | 89 | 'video-transcoding', |
90 | 'video-file-import', | 90 | 'video-file-import', |
91 | 'video-import', | 91 | 'video-import', |
92 | 'videos-views', | 92 | 'videos-views-stats', |
93 | 'activitypub-refresher', | 93 | 'activitypub-refresher', |
94 | 'video-redundancy', | 94 | 'video-redundancy', |
95 | 'actor-keys', | 95 | 'actor-keys', |
@@ -247,8 +247,8 @@ class JobQueue { | |||
247 | } | 247 | } |
248 | 248 | ||
249 | private addRepeatableJobs () { | 249 | private addRepeatableJobs () { |
250 | this.queues['videos-views'].add({}, { | 250 | this.queues['videos-views-stats'].add({}, { |
251 | repeat: REPEAT_JOBS['videos-views'] | 251 | repeat: REPEAT_JOBS['videos-views-stats'] |
252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
253 | 253 | ||
254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1b7b9dd4d..2562edb75 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -2,7 +2,6 @@ | |||
2 | import { readFile } from 'fs-extra' | 2 | import { readFile } from 'fs-extra' |
3 | import { createServer, Server } from 'net' | 3 | import { createServer, Server } from 'net' |
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | 4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' |
5 | import { isTestInstance } from '@server/helpers/core-utils' | ||
6 | import { | 5 | import { |
7 | computeResolutionsToTranscode, | 6 | computeResolutionsToTranscode, |
8 | ffprobePromise, | 7 | ffprobePromise, |
@@ -12,7 +11,7 @@ import { | |||
12 | } from '@server/helpers/ffprobe-utils' | 11 | } from '@server/helpers/ffprobe-utils' |
13 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 12 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
14 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 13 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
15 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' | 14 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' |
16 | import { UserModel } from '@server/models/user/user' | 15 | import { UserModel } from '@server/models/user/user' |
17 | import { VideoModel } from '@server/models/video/video' | 16 | import { VideoModel } from '@server/models/video/video' |
18 | import { VideoLiveModel } from '@server/models/video/video-live' | 17 | import { VideoLiveModel } from '@server/models/video/video-live' |
@@ -53,8 +52,6 @@ class LiveManager { | |||
53 | 52 | ||
54 | private readonly muxingSessions = new Map<string, MuxingSession>() | 53 | private readonly muxingSessions = new Map<string, MuxingSession>() |
55 | private readonly videoSessions = new Map<number, string>() | 54 | private readonly videoSessions = new Map<number, string>() |
56 | // Values are Date().getTime() | ||
57 | private readonly watchersPerVideo = new Map<number, number[]>() | ||
58 | 55 | ||
59 | private rtmpServer: Server | 56 | private rtmpServer: Server |
60 | private rtmpsServer: ServerTLS | 57 | private rtmpsServer: ServerTLS |
@@ -99,8 +96,6 @@ class LiveManager { | |||
99 | // Cleanup broken lives, that were terminated by a server restart for example | 96 | // Cleanup broken lives, that were terminated by a server restart for example |
100 | this.handleBrokenLives() | 97 | this.handleBrokenLives() |
101 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) | 98 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) |
102 | |||
103 | setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) | ||
104 | } | 99 | } |
105 | 100 | ||
106 | async run () { | 101 | async run () { |
@@ -184,19 +179,6 @@ class LiveManager { | |||
184 | this.abortSession(sessionId) | 179 | this.abortSession(sessionId) |
185 | } | 180 | } |
186 | 181 | ||
187 | addViewTo (videoId: number) { | ||
188 | if (this.videoSessions.has(videoId) === false) return | ||
189 | |||
190 | let watchers = this.watchersPerVideo.get(videoId) | ||
191 | |||
192 | if (!watchers) { | ||
193 | watchers = [] | ||
194 | this.watchersPerVideo.set(videoId, watchers) | ||
195 | } | ||
196 | |||
197 | watchers.push(new Date().getTime()) | ||
198 | } | ||
199 | |||
200 | private getContext () { | 182 | private getContext () { |
201 | return context | 183 | return context |
202 | } | 184 | } |
@@ -377,7 +359,6 @@ class LiveManager { | |||
377 | } | 359 | } |
378 | 360 | ||
379 | private onMuxingFFmpegEnd (videoId: number) { | 361 | private onMuxingFFmpegEnd (videoId: number) { |
380 | this.watchersPerVideo.delete(videoId) | ||
381 | this.videoSessions.delete(videoId) | 362 | this.videoSessions.delete(videoId) |
382 | } | 363 | } |
383 | 364 | ||
@@ -411,34 +392,6 @@ class LiveManager { | |||
411 | } | 392 | } |
412 | } | 393 | } |
413 | 394 | ||
414 | private async updateLiveViews () { | ||
415 | if (!this.isRunning()) return | ||
416 | |||
417 | if (!isTestInstance()) logger.info('Updating live video views.', lTags()) | ||
418 | |||
419 | for (const videoId of this.watchersPerVideo.keys()) { | ||
420 | const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE | ||
421 | |||
422 | const watchers = this.watchersPerVideo.get(videoId) | ||
423 | |||
424 | const numWatchers = watchers.length | ||
425 | |||
426 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
427 | video.views = numWatchers | ||
428 | await video.save() | ||
429 | |||
430 | await federateVideoIfNeeded(video, false) | ||
431 | |||
432 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video) | ||
433 | |||
434 | // Only keep not expired watchers | ||
435 | const newWatchers = watchers.filter(w => w > notBefore) | ||
436 | this.watchersPerVideo.set(videoId, newWatchers) | ||
437 | |||
438 | logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) | ||
439 | } | ||
440 | } | ||
441 | |||
442 | private async handleBrokenLives () { | 395 | private async handleBrokenLives () { |
443 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | 396 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() |
444 | 397 | ||
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 901435dea..0398ca61d 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Server as HTTPServer } from 'http' | 1 | import { Server as HTTPServer } from 'http' |
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | 2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' |
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | 3 | import { isIdValid } from '@server/helpers/custom-validators/misc' |
4 | import { MVideo } from '@server/types/models' | 4 | import { MVideo, MVideoImmutable } from '@server/types/models' |
5 | import { UserNotificationModelForApi } from '@server/types/models/user' | 5 | import { UserNotificationModelForApi } from '@server/types/models/user' |
6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | 6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' |
7 | import { logger } from '../helpers/logger' | 7 | import { logger } from '../helpers/logger' |
@@ -78,11 +78,11 @@ class PeerTubeSocket { | |||
78 | .emit(type, data) | 78 | .emit(type, data) |
79 | } | 79 | } |
80 | 80 | ||
81 | sendVideoViewsUpdate (video: MVideo) { | 81 | sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { |
82 | const data: LiveVideoEventPayload = { views: video.views } | 82 | const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } |
83 | const type: LiveVideoEventType = 'views-change' | 83 | const type: LiveVideoEventType = 'views-change' |
84 | 84 | ||
85 | logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) | 85 | logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) |
86 | 86 | ||
87 | this.liveVideosNamespace | 87 | this.liveVideosNamespace |
88 | .in(video.id) | 88 | .in(video.id) |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 46617b07e..76b7868e8 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -13,6 +13,7 @@ import { | |||
13 | RESUMABLE_UPLOAD_SESSION_LIFETIME | 13 | RESUMABLE_UPLOAD_SESSION_LIFETIME |
14 | } from '../initializers/constants' | 14 | } from '../initializers/constants' |
15 | import { CONFIG } from '../initializers/config' | 15 | import { CONFIG } from '../initializers/config' |
16 | import { exists } from '@server/helpers/custom-validators/misc' | ||
16 | 17 | ||
17 | type CachedRoute = { | 18 | type CachedRoute = { |
18 | body: string | 19 | body: string |
@@ -119,16 +120,20 @@ class Redis { | |||
119 | 120 | ||
120 | /* ************ Views per IP ************ */ | 121 | /* ************ Views per IP ************ */ |
121 | 122 | ||
122 | setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { | 123 | setIPVideoView (ip: string, videoUUID: string) { |
123 | const lifetime = isLive | 124 | return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW) |
124 | ? VIEW_LIFETIME.LIVE | 125 | } |
125 | : VIEW_LIFETIME.VIDEO | ||
126 | 126 | ||
127 | return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) | 127 | setIPVideoViewer (ip: string, videoUUID: string) { |
128 | return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER) | ||
128 | } | 129 | } |
129 | 130 | ||
130 | async doesVideoIPViewExist (ip: string, videoUUID: string) { | 131 | async doesVideoIPViewExist (ip: string, videoUUID: string) { |
131 | return this.exists(this.generateViewKey(ip, videoUUID)) | 132 | return this.exists(this.generateIPViewKey(ip, videoUUID)) |
133 | } | ||
134 | |||
135 | async doesVideoIPViewerExist (ip: string, videoUUID: string) { | ||
136 | return this.exists(this.generateIPViewerKey(ip, videoUUID)) | ||
132 | } | 137 | } |
133 | 138 | ||
134 | /* ************ Tracker IP block ************ */ | 139 | /* ************ Tracker IP block ************ */ |
@@ -160,46 +165,85 @@ class Redis { | |||
160 | return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) | 165 | return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) |
161 | } | 166 | } |
162 | 167 | ||
163 | /* ************ Video views ************ */ | 168 | /* ************ Video views stats ************ */ |
164 | 169 | ||
165 | addVideoView (videoId: number) { | 170 | addVideoViewStats (videoId: number) { |
166 | const keyIncr = this.generateVideoViewKey(videoId) | 171 | const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId }) |
167 | const keySet = this.generateVideosViewKey() | ||
168 | 172 | ||
169 | return Promise.all([ | 173 | return Promise.all([ |
170 | this.addToSet(keySet, videoId.toString()), | 174 | this.addToSet(setKey, videoId.toString()), |
171 | this.increment(keyIncr) | 175 | this.increment(videoKey) |
172 | ]) | 176 | ]) |
173 | } | 177 | } |
174 | 178 | ||
175 | async getVideoViews (videoId: number, hour: number) { | 179 | async getVideoViewsStats (videoId: number, hour: number) { |
176 | const key = this.generateVideoViewKey(videoId, hour) | 180 | const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) |
177 | 181 | ||
178 | const valueString = await this.getValue(key) | 182 | const valueString = await this.getValue(videoKey) |
179 | const valueInt = parseInt(valueString, 10) | 183 | const valueInt = parseInt(valueString, 10) |
180 | 184 | ||
181 | if (isNaN(valueInt)) { | 185 | if (isNaN(valueInt)) { |
182 | logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) | 186 | logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) |
183 | return undefined | 187 | return undefined |
184 | } | 188 | } |
185 | 189 | ||
186 | return valueInt | 190 | return valueInt |
187 | } | 191 | } |
188 | 192 | ||
189 | async getVideosIdViewed (hour: number) { | 193 | async listVideosViewedForStats (hour: number) { |
190 | const key = this.generateVideosViewKey(hour) | 194 | const { setKey } = this.generateVideoViewStatsKeys({ hour }) |
191 | 195 | ||
192 | const stringIds = await this.getSet(key) | 196 | const stringIds = await this.getSet(setKey) |
193 | return stringIds.map(s => parseInt(s, 10)) | 197 | return stringIds.map(s => parseInt(s, 10)) |
194 | } | 198 | } |
195 | 199 | ||
196 | deleteVideoViews (videoId: number, hour: number) { | 200 | deleteVideoViewsStats (videoId: number, hour: number) { |
197 | const keySet = this.generateVideosViewKey(hour) | 201 | const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) |
198 | const keyIncr = this.generateVideoViewKey(videoId, hour) | 202 | |
203 | return Promise.all([ | ||
204 | this.deleteFromSet(setKey, videoId.toString()), | ||
205 | this.deleteKey(videoKey) | ||
206 | ]) | ||
207 | } | ||
208 | |||
209 | /* ************ Local video views buffer ************ */ | ||
210 | |||
211 | addLocalVideoView (videoId: number) { | ||
212 | const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId) | ||
199 | 213 | ||
200 | return Promise.all([ | 214 | return Promise.all([ |
201 | this.deleteFromSet(keySet, videoId.toString()), | 215 | this.addToSet(setKey, videoId.toString()), |
202 | this.deleteKey(keyIncr) | 216 | this.increment(videoKey) |
217 | ]) | ||
218 | } | ||
219 | |||
220 | async getLocalVideoViews (videoId: number) { | ||
221 | const { videoKey } = this.generateLocalVideoViewsKeys(videoId) | ||
222 | |||
223 | const valueString = await this.getValue(videoKey) | ||
224 | const valueInt = parseInt(valueString, 10) | ||
225 | |||
226 | if (isNaN(valueInt)) { | ||
227 | logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString) | ||
228 | return undefined | ||
229 | } | ||
230 | |||
231 | return valueInt | ||
232 | } | ||
233 | |||
234 | async listLocalVideosViewed () { | ||
235 | const { setKey } = this.generateLocalVideoViewsKeys() | ||
236 | |||
237 | const stringIds = await this.getSet(setKey) | ||
238 | return stringIds.map(s => parseInt(s, 10)) | ||
239 | } | ||
240 | |||
241 | deleteLocalVideoViews (videoId: number) { | ||
242 | const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId) | ||
243 | |||
244 | return Promise.all([ | ||
245 | this.deleteFromSet(setKey, videoId.toString()), | ||
246 | this.deleteKey(videoKey) | ||
203 | ]) | 247 | ]) |
204 | } | 248 | } |
205 | 249 | ||
@@ -233,16 +277,16 @@ class Redis { | |||
233 | return req.method + '-' + req.originalUrl | 277 | return req.method + '-' + req.originalUrl |
234 | } | 278 | } |
235 | 279 | ||
236 | private generateVideosViewKey (hour?: number) { | 280 | private generateLocalVideoViewsKeys (videoId?: Number) { |
237 | if (!hour) hour = new Date().getHours() | 281 | return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } |
238 | |||
239 | return `videos-view-h${hour}` | ||
240 | } | 282 | } |
241 | 283 | ||
242 | private generateVideoViewKey (videoId: number, hour?: number) { | 284 | private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { |
243 | if (hour === undefined || hour === null) hour = new Date().getHours() | 285 | const hour = exists(options.hour) |
286 | ? options.hour | ||
287 | : new Date().getHours() | ||
244 | 288 | ||
245 | return `video-view-${videoId}-h${hour}` | 289 | return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` } |
246 | } | 290 | } |
247 | 291 | ||
248 | private generateResetPasswordKey (userId: number) { | 292 | private generateResetPasswordKey (userId: number) { |
@@ -253,10 +297,14 @@ class Redis { | |||
253 | return 'verify-email-' + userId | 297 | return 'verify-email-' + userId |
254 | } | 298 | } |
255 | 299 | ||
256 | private generateViewKey (ip: string, videoUUID: string) { | 300 | private generateIPViewKey (ip: string, videoUUID: string) { |
257 | return `views-${videoUUID}-${ip}` | 301 | return `views-${videoUUID}-${ip}` |
258 | } | 302 | } |
259 | 303 | ||
304 | private generateIPViewerKey (ip: string, videoUUID: string) { | ||
305 | return `viewer-${videoUUID}-${ip}` | ||
306 | } | ||
307 | |||
260 | private generateTrackerBlockIPKey (ip: string) { | 308 | private generateTrackerBlockIPKey (ip: string) { |
261 | return `tracker-block-ip-${ip}` | 309 | return `tracker-block-ip-${ip}` |
262 | } | 310 | } |
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts new file mode 100644 index 000000000..c0e72c461 --- /dev/null +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts | |||
@@ -0,0 +1,52 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VideoModel } from '@server/models/video/video' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { federateVideoIfNeeded } from '../activitypub/videos' | ||
5 | import { Redis } from '../redis' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViewsBufferScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const videoIds = await Redis.Instance.listLocalVideosViewed() | ||
22 | if (videoIds.length === 0) return | ||
23 | |||
24 | logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) | ||
25 | |||
26 | for (const videoId of videoIds) { | ||
27 | try { | ||
28 | const views = await Redis.Instance.getLocalVideoViews(videoId) | ||
29 | await Redis.Instance.deleteLocalVideoViews(videoId) | ||
30 | |||
31 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
32 | if (!video) { | ||
33 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) | ||
34 | continue | ||
35 | } | ||
36 | |||
37 | // If this is a remote video, the origin instance will send us an update | ||
38 | await VideoModel.incrementViews(videoId, views) | ||
39 | |||
40 | // Send video update | ||
41 | video.views += views | ||
42 | await federateVideoIfNeeded(video, false) | ||
43 | } catch (err) { | ||
44 | logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) | ||
45 | } | ||
46 | } | ||
47 | } | ||
48 | |||
49 | static get Instance () { | ||
50 | return this.instance || (this.instance = new this()) | ||
51 | } | ||
52 | } | ||
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts new file mode 100644 index 000000000..220b509c2 --- /dev/null +++ b/server/lib/video-views.ts | |||
@@ -0,0 +1,130 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VIEW_LIFETIME } from '@server/initializers/constants' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { PeerTubeSocket } from './peertube-socket' | ||
6 | import { Redis } from './redis' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViews { | ||
11 | |||
12 | // Values are Date().getTime() | ||
13 | private readonly viewersPerVideo = new Map<number, number[]>() | ||
14 | |||
15 | private static instance: VideoViews | ||
16 | |||
17 | private constructor () { | ||
18 | } | ||
19 | |||
20 | init () { | ||
21 | setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) | ||
22 | } | ||
23 | |||
24 | async processView (options: { | ||
25 | video: MVideo | ||
26 | ip: string | null | ||
27 | viewerExpires?: Date | ||
28 | }) { | ||
29 | const { video, ip, viewerExpires } = options | ||
30 | |||
31 | logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) | ||
32 | |||
33 | let success = await this.addView(video, ip) | ||
34 | |||
35 | if (video.isLive) { | ||
36 | const successViewer = await this.addViewer(video, ip, viewerExpires) | ||
37 | success ||= successViewer | ||
38 | } | ||
39 | |||
40 | return success | ||
41 | } | ||
42 | |||
43 | getViewers (video: MVideo) { | ||
44 | const viewers = this.viewersPerVideo.get(video.id) | ||
45 | if (!viewers) return 0 | ||
46 | |||
47 | return viewers.length | ||
48 | } | ||
49 | |||
50 | buildViewerExpireTime () { | ||
51 | return new Date().getTime() + VIEW_LIFETIME.VIEWER | ||
52 | } | ||
53 | |||
54 | private async addView (video: MVideo, ip: string | null) { | ||
55 | const promises: Promise<any>[] = [] | ||
56 | |||
57 | if (ip !== null) { | ||
58 | const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) | ||
59 | if (viewExists) return false | ||
60 | |||
61 | promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) | ||
62 | } | ||
63 | |||
64 | if (video.isOwned()) { | ||
65 | promises.push(Redis.Instance.addLocalVideoView(video.id)) | ||
66 | } | ||
67 | |||
68 | promises.push(Redis.Instance.addVideoViewStats(video.id)) | ||
69 | |||
70 | await Promise.all(promises) | ||
71 | |||
72 | return true | ||
73 | } | ||
74 | |||
75 | private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { | ||
76 | if (ip !== null) { | ||
77 | const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) | ||
78 | if (viewExists) return false | ||
79 | |||
80 | await Redis.Instance.setIPVideoViewer(ip, video.uuid) | ||
81 | } | ||
82 | |||
83 | let watchers = this.viewersPerVideo.get(video.id) | ||
84 | |||
85 | if (!watchers) { | ||
86 | watchers = [] | ||
87 | this.viewersPerVideo.set(video.id, watchers) | ||
88 | } | ||
89 | |||
90 | const expiration = viewerExpires | ||
91 | ? viewerExpires.getTime() | ||
92 | : this.buildViewerExpireTime() | ||
93 | |||
94 | watchers.push(expiration) | ||
95 | await this.notifyClients(video.id, watchers.length) | ||
96 | |||
97 | return true | ||
98 | } | ||
99 | |||
100 | private async cleanViewers () { | ||
101 | logger.info('Cleaning video viewers.', lTags()) | ||
102 | |||
103 | for (const videoId of this.viewersPerVideo.keys()) { | ||
104 | const notBefore = new Date().getTime() | ||
105 | |||
106 | const viewers = this.viewersPerVideo.get(videoId) | ||
107 | |||
108 | // Only keep not expired viewers | ||
109 | const newViewers = viewers.filter(w => w > notBefore) | ||
110 | |||
111 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | ||
112 | else this.viewersPerVideo.set(videoId, newViewers) | ||
113 | |||
114 | await this.notifyClients(videoId, newViewers.length) | ||
115 | } | ||
116 | } | ||
117 | |||
118 | private async notifyClients (videoId: string | number, viewersLength: number) { | ||
119 | const video = await VideoModel.loadImmutableAttributes(videoId) | ||
120 | if (!video) return | ||
121 | |||
122 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | ||
123 | |||
124 | logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) | ||
125 | } | ||
126 | |||
127 | static get Instance () { | ||
128 | return this.instance || (this.instance = new this()) | ||
129 | } | ||
130 | } | ||