diff options
author | Chocobozzz <me@florianbigard.com> | 2018-08-29 16:26:25 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-08-30 15:03:18 +0200 |
commit | 6b6168606bc86430f6b7821c9d5f1c80d0425ebf (patch) | |
tree | 9aea6cf0875c9fee30c373eb4924b12d47d1e23c /server/lib/job-queue | |
parent | 2d9fea161fd4fc73994fc77951bafdccdc2071fd (diff) | |
download | PeerTube-6b6168606bc86430f6b7821c9d5f1c80d0425ebf.tar.gz PeerTube-6b6168606bc86430f6b7821c9d5f1c80d0425ebf.tar.zst PeerTube-6b6168606bc86430f6b7821c9d5f1c80d0425ebf.zip |
Bufferize videos views in redis
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-views.ts | 40 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 20 |
2 files changed, 56 insertions, 4 deletions
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts new file mode 100644 index 000000000..875d8ab88 --- /dev/null +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -0,0 +1,40 @@ | |||
1 | import { Redis } from '../../redis' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { VideoModel } from '../../../models/video/video' | ||
4 | import { VideoViewModel } from '../../../models/video/video-views' | ||
5 | |||
6 | async function processVideosViewsViews () { | ||
7 | const hour = new Date().getHours() | ||
8 | const startDate = new Date().setMinutes(0, 0, 0) | ||
9 | const endDate = new Date().setMinutes(59, 59, 999) | ||
10 | |||
11 | const videoIds = await Redis.Instance.getVideosIdViewed(hour) | ||
12 | if (videoIds.length === 0) return | ||
13 | |||
14 | logger.info('Processing videos views in job for hour %d.', hour) | ||
15 | |||
16 | for (const videoId of videoIds) { | ||
17 | const views = await Redis.Instance.getVideoViews(videoId, hour) | ||
18 | if (isNaN(views)) { | ||
19 | logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour) | ||
20 | } else { | ||
21 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | ||
22 | |||
23 | await VideoModel.incrementViews(videoId, views) | ||
24 | await VideoViewModel.create({ | ||
25 | startDate, | ||
26 | endDate, | ||
27 | views, | ||
28 | videoId | ||
29 | }) | ||
30 | } | ||
31 | |||
32 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
33 | } | ||
34 | } | ||
35 | |||
36 | // --------------------------------------------------------------------------- | ||
37 | |||
38 | export { | ||
39 | processVideosViewsViews | ||
40 | } | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ddb357db5..0696ba43c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -2,7 +2,7 @@ import * as Bull from 'bull' | |||
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { Redis } from '../redis' | 4 | import { Redis } from '../redis' |
5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' | 5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' |
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
@@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email' | |||
10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' | 10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' | 12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' |
13 | import { processVideosViewsViews } from './handlers/video-views' | ||
13 | 14 | ||
14 | type CreateJobArgument = | 15 | type CreateJobArgument = |
15 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 16 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -19,7 +20,8 @@ type CreateJobArgument = | |||
19 | { type: 'video-file-import', payload: VideoFileImportPayload } | | 20 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
20 | { type: 'video-file', payload: VideoFilePayload } | | 21 | { type: 'video-file', payload: VideoFilePayload } | |
21 | { type: 'email', payload: EmailPayload } | | 22 | { type: 'email', payload: EmailPayload } | |
22 | { type: 'video-import', payload: VideoImportPayload } | 23 | { type: 'video-import', payload: VideoImportPayload } | |
24 | { type: 'videos-views', payload: {} } | ||
23 | 25 | ||
24 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 26 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
25 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 27 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
@@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
29 | 'video-file-import': processVideoFileImport, | 31 | 'video-file-import': processVideoFileImport, |
30 | 'video-file': processVideoFile, | 32 | 'video-file': processVideoFile, |
31 | 'email': processEmail, | 33 | 'email': processEmail, |
32 | 'video-import': processVideoImport | 34 | 'video-import': processVideoImport, |
35 | 'videos-views': processVideosViewsViews | ||
33 | } | 36 | } |
34 | 37 | ||
35 | const jobTypes: JobType[] = [ | 38 | const jobTypes: JobType[] = [ |
@@ -40,7 +43,8 @@ const jobTypes: JobType[] = [ | |||
40 | 'email', | 43 | 'email', |
41 | 'video-file', | 44 | 'video-file', |
42 | 'video-file-import', | 45 | 'video-file-import', |
43 | 'video-import' | 46 | 'video-import', |
47 | 'videos-views' | ||
44 | ] | 48 | ] |
45 | 49 | ||
46 | class JobQueue { | 50 | class JobQueue { |
@@ -85,6 +89,8 @@ class JobQueue { | |||
85 | 89 | ||
86 | this.queues[handlerName] = queue | 90 | this.queues[handlerName] = queue |
87 | } | 91 | } |
92 | |||
93 | this.addRepeatableJobs() | ||
88 | } | 94 | } |
89 | 95 | ||
90 | terminate () { | 96 | terminate () { |
@@ -163,6 +169,12 @@ class JobQueue { | |||
163 | } | 169 | } |
164 | } | 170 | } |
165 | 171 | ||
172 | private addRepeatableJobs () { | ||
173 | this.queues['videos-views'].add({}, { | ||
174 | repeat: REPEAT_JOBS['videos-views'] | ||
175 | }) | ||
176 | } | ||
177 | |||
166 | static get Instance () { | 178 | static get Instance () { |
167 | return this.instance || (this.instance = new this()) | 179 | return this.instance || (this.instance = new this()) |
168 | } | 180 | } |