From 6b6168606bc86430f6b7821c9d5f1c80d0425ebf Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 29 Aug 2018 16:26:25 +0200 Subject: Bufferize videos views in redis --- server/lib/job-queue/handlers/video-views.ts | 40 ++++++++++++++++++++++++++++ server/lib/job-queue/job-queue.ts | 20 +++++++++++--- 2 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 server/lib/job-queue/handlers/video-views.ts (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts new file mode 100644 index 000000000..875d8ab88 --- /dev/null +++ b/server/lib/job-queue/handlers/video-views.ts @@ -0,0 +1,40 @@ +import { Redis } from '../../redis' +import { logger } from '../../../helpers/logger' +import { VideoModel } from '../../../models/video/video' +import { VideoViewModel } from '../../../models/video/video-views' + +async function processVideosViewsViews () { + const hour = new Date().getHours() + const startDate = new Date().setMinutes(0, 0, 0) + const endDate = new Date().setMinutes(59, 59, 999) + + const videoIds = await Redis.Instance.getVideosIdViewed(hour) + if (videoIds.length === 0) return + + logger.info('Processing videos views in job for hour %d.', hour) + + for (const videoId of videoIds) { + const views = await Redis.Instance.getVideoViews(videoId, hour) + if (isNaN(views)) { + logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour) + } else { + logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) + + await VideoModel.incrementViews(videoId, views) + await VideoViewModel.create({ + startDate, + endDate, + views, + videoId + }) + } + + await Redis.Instance.deleteVideoViews(videoId, hour) + } +} + +// --------------------------------------------------------------------------- + +export { + processVideosViewsViews +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ddb357db5..0696ba43c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -2,7 +2,7 @@ import * as Bull from 'bull' import { JobState, JobType } from '../../../shared/models' import { logger } from '../../helpers/logger' import { Redis } from '../redis' -import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' +import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' @@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email' import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' import { processVideoImport, VideoImportPayload } from './handlers/video-import' +import { processVideosViewsViews } from './handlers/video-views' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -19,7 +20,8 @@ type CreateJobArgument = { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-file', payload: VideoFilePayload } | { type: 'email', payload: EmailPayload } | - { type: 'video-import', payload: VideoImportPayload } + { type: 'video-import', payload: VideoImportPayload } | + { type: 'videos-views', payload: {} } const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, @@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'video-file-import': processVideoFileImport, 'video-file': processVideoFile, 'email': processEmail, - 'video-import': processVideoImport + 'video-import': processVideoImport, + 'videos-views': processVideosViewsViews } const jobTypes: JobType[] = [ @@ -40,7 +43,8 @@ const jobTypes: JobType[] = [ 'email', 'video-file', 'video-file-import', - 'video-import' + 'video-import', + 'videos-views' ] class JobQueue { @@ -85,6 +89,8 @@ class JobQueue { this.queues[handlerName] = queue } + + this.addRepeatableJobs() } terminate () { @@ -163,6 +169,12 @@ class JobQueue { } } + private addRepeatableJobs () { + this.queues['videos-views'].add({}, { + repeat: REPEAT_JOBS['videos-views'] + }) + } + static get Instance () { return this.instance || (this.instance = new this()) } -- cgit v1.2.3