diff options
author | Chocobozzz <me@florianbigard.com> | 2021-11-09 10:11:20 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2021-11-09 15:00:31 +0100 |
commit | 51353d9a035fb6b81f903a8b5f391292841649fd (patch) | |
tree | 75acb6eea5e043bf2e15a6a5a92e9a3c5967b156 /server/lib/job-queue | |
parent | 221ee1adc916684d4881d2a9c4c01954dcde986e (diff) | |
download | PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.gz PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.tar.zst PeerTube-51353d9a035fb6b81f903a8b5f391292841649fd.zip |
Refactor video views
Introduce viewers attribute for live videos
Count views for live videos
Reduce delay to see the viewer update for lives
Add ability to configure video views buffer interval and view ip
expiration
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-views-stats.ts (renamed from server/lib/job-queue/handlers/video-views.ts) | 36 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 12 |
2 files changed, 19 insertions, 29 deletions
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts index 86d0a271f..caf5f6962 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { Redis } from '../../redis' | 1 | import { isTestInstance } from '../../../helpers/core-utils' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { VideoModel } from '../../../models/video/video' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-view' | 4 | import { VideoViewModel } from '../../../models/video/video-view' |
5 | import { isTestInstance } from '../../../helpers/core-utils' | 5 | import { Redis } from '../../redis' |
6 | import { federateVideoIfNeeded } from '../../activitypub/videos' | ||
7 | 6 | ||
8 | async function processVideosViews () { | 7 | async function processVideosViewsStats () { |
9 | const lastHour = new Date() | 8 | const lastHour = new Date() |
10 | 9 | ||
11 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour | 10 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour |
@@ -15,23 +14,23 @@ async function processVideosViews () { | |||
15 | const startDate = lastHour.setMinutes(0, 0, 0) | 14 | const startDate = lastHour.setMinutes(0, 0, 0) |
16 | const endDate = lastHour.setMinutes(59, 59, 999) | 15 | const endDate = lastHour.setMinutes(59, 59, 999) |
17 | 16 | ||
18 | const videoIds = await Redis.Instance.getVideosIdViewed(hour) | 17 | const videoIds = await Redis.Instance.listVideosViewedForStats(hour) |
19 | if (videoIds.length === 0) return | 18 | if (videoIds.length === 0) return |
20 | 19 | ||
21 | logger.info('Processing videos views in job for hour %d.', hour) | 20 | logger.info('Processing videos views stats in job for hour %d.', hour) |
22 | 21 | ||
23 | for (const videoId of videoIds) { | 22 | for (const videoId of videoIds) { |
24 | try { | 23 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 24 | const views = await Redis.Instance.getVideoViewsStats(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | 25 | await Redis.Instance.deleteVideoViewsStats(videoId, hour) |
27 | 26 | ||
28 | if (views) { | 27 | if (views) { |
29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 28 | logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) |
30 | 29 | ||
31 | try { | 30 | try { |
32 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | 31 | const video = await VideoModel.load(videoId) |
33 | if (!video) { | 32 | if (!video) { |
34 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) | 33 | logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) |
35 | continue | 34 | continue |
36 | } | 35 | } |
37 | 36 | ||
@@ -41,21 +40,12 @@ async function processVideosViews () { | |||
41 | views, | 40 | views, |
42 | videoId | 41 | videoId |
43 | }) | 42 | }) |
44 | |||
45 | if (video.isOwned()) { | ||
46 | // If this is a remote video, the origin instance will send us an update | ||
47 | await VideoModel.incrementViews(videoId, views) | ||
48 | |||
49 | // Send video update | ||
50 | video.views += views | ||
51 | await federateVideoIfNeeded(video, false) | ||
52 | } | ||
53 | } catch (err) { | 43 | } catch (err) { |
54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 44 | logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) |
55 | } | 45 | } |
56 | } | 46 | } |
57 | } catch (err) { | 47 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 48 | logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 49 | } |
60 | } | 50 | } |
61 | } | 51 | } |
@@ -63,5 +53,5 @@ async function processVideosViews () { | |||
63 | // --------------------------------------------------------------------------- | 53 | // --------------------------------------------------------------------------- |
64 | 54 | ||
65 | export { | 55 | export { |
66 | processVideosViews | 56 | processVideosViewsStats |
67 | } | 57 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0eab720d9..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import' | |||
36 | import { processVideoImport } from './handlers/video-import' | 36 | import { processVideoImport } from './handlers/video-import' |
37 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 37 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
38 | import { processVideoTranscoding } from './handlers/video-transcoding' | 38 | import { processVideoTranscoding } from './handlers/video-transcoding' |
39 | import { processVideosViews } from './handlers/video-views' | 39 | import { processVideosViewsStats } from './handlers/video-views-stats' |
40 | 40 | ||
41 | type CreateJobArgument = | 41 | type CreateJobArgument = |
42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 42 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -49,7 +49,7 @@ type CreateJobArgument = | |||
49 | { type: 'email', payload: EmailPayload } | | 49 | { type: 'email', payload: EmailPayload } | |
50 | { type: 'video-import', payload: VideoImportPayload } | | 50 | { type: 'video-import', payload: VideoImportPayload } | |
51 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 51 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
52 | { type: 'videos-views', payload: {} } | | 52 | { type: 'videos-views-stats', payload: {} } | |
53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | 53 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | |
54 | { type: 'actor-keys', payload: ActorKeysPayload } | | 54 | { type: 'actor-keys', payload: ActorKeysPayload } | |
55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | | 55 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
71 | 'video-transcoding': processVideoTranscoding, | 71 | 'video-transcoding': processVideoTranscoding, |
72 | 'email': processEmail, | 72 | 'email': processEmail, |
73 | 'video-import': processVideoImport, | 73 | 'video-import': processVideoImport, |
74 | 'videos-views': processVideosViews, | 74 | 'videos-views-stats': processVideosViewsStats, |
75 | 'activitypub-refresher': refreshAPObject, | 75 | 'activitypub-refresher': refreshAPObject, |
76 | 'video-live-ending': processVideoLiveEnding, | 76 | 'video-live-ending': processVideoLiveEnding, |
77 | 'actor-keys': processActorKeys, | 77 | 'actor-keys': processActorKeys, |
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [ | |||
89 | 'video-transcoding', | 89 | 'video-transcoding', |
90 | 'video-file-import', | 90 | 'video-file-import', |
91 | 'video-import', | 91 | 'video-import', |
92 | 'videos-views', | 92 | 'videos-views-stats', |
93 | 'activitypub-refresher', | 93 | 'activitypub-refresher', |
94 | 'video-redundancy', | 94 | 'video-redundancy', |
95 | 'actor-keys', | 95 | 'actor-keys', |
@@ -247,8 +247,8 @@ class JobQueue { | |||
247 | } | 247 | } |
248 | 248 | ||
249 | private addRepeatableJobs () { | 249 | private addRepeatableJobs () { |
250 | this.queues['videos-views'].add({}, { | 250 | this.queues['videos-views-stats'].add({}, { |
251 | repeat: REPEAT_JOBS['videos-views'] | 251 | repeat: REPEAT_JOBS['videos-views-stats'] |
252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 252 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
253 | 253 | ||
254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 254 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |