aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-views.ts40
-rw-r--r--server/lib/job-queue/job-queue.ts20
2 files changed, 56 insertions, 4 deletions
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts
new file mode 100644
index 000000000..875d8ab88
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-views.ts
@@ -0,0 +1,40 @@
1import { Redis } from '../../redis'
2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-views'
5
6async function processVideosViewsViews () {
7 const hour = new Date().getHours()
8 const startDate = new Date().setMinutes(0, 0, 0)
9 const endDate = new Date().setMinutes(59, 59, 999)
10
11 const videoIds = await Redis.Instance.getVideosIdViewed(hour)
12 if (videoIds.length === 0) return
13
14 logger.info('Processing videos views in job for hour %d.', hour)
15
16 for (const videoId of videoIds) {
17 const views = await Redis.Instance.getVideoViews(videoId, hour)
18 if (isNaN(views)) {
19 logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour)
20 } else {
21 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
22
23 await VideoModel.incrementViews(videoId, views)
24 await VideoViewModel.create({
25 startDate,
26 endDate,
27 views,
28 videoId
29 })
30 }
31
32 await Redis.Instance.deleteVideoViews(videoId, hour)
33 }
34}
35
36// ---------------------------------------------------------------------------
37
38export {
39 processVideosViewsViews
40}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index ddb357db5..0696ba43c 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -2,7 +2,7 @@ import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { Redis } from '../redis' 4import { Redis } from '../redis'
5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' 5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
@@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' 10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import' 12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
13import { processVideosViewsViews } from './handlers/video-views'
13 14
14type CreateJobArgument = 15type CreateJobArgument =
15 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 16 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -19,7 +20,8 @@ type CreateJobArgument =
19 { type: 'video-file-import', payload: VideoFileImportPayload } | 20 { type: 'video-file-import', payload: VideoFileImportPayload } |
20 { type: 'video-file', payload: VideoFilePayload } | 21 { type: 'video-file', payload: VideoFilePayload } |
21 { type: 'email', payload: EmailPayload } | 22 { type: 'email', payload: EmailPayload } |
22 { type: 'video-import', payload: VideoImportPayload } 23 { type: 'video-import', payload: VideoImportPayload } |
24 { type: 'videos-views', payload: {} }
23 25
24const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 26const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
25 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 27 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
@@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
29 'video-file-import': processVideoFileImport, 31 'video-file-import': processVideoFileImport,
30 'video-file': processVideoFile, 32 'video-file': processVideoFile,
31 'email': processEmail, 33 'email': processEmail,
32 'video-import': processVideoImport 34 'video-import': processVideoImport,
35 'videos-views': processVideosViewsViews
33} 36}
34 37
35const jobTypes: JobType[] = [ 38const jobTypes: JobType[] = [
@@ -40,7 +43,8 @@ const jobTypes: JobType[] = [
40 'email', 43 'email',
41 'video-file', 44 'video-file',
42 'video-file-import', 45 'video-file-import',
43 'video-import' 46 'video-import',
47 'videos-views'
44] 48]
45 49
46class JobQueue { 50class JobQueue {
@@ -85,6 +89,8 @@ class JobQueue {
85 89
86 this.queues[handlerName] = queue 90 this.queues[handlerName] = queue
87 } 91 }
92
93 this.addRepeatableJobs()
88 } 94 }
89 95
90 terminate () { 96 terminate () {
@@ -163,6 +169,12 @@ class JobQueue {
163 } 169 }
164 } 170 }
165 171
172 private addRepeatableJobs () {
173 this.queues['videos-views'].add({}, {
174 repeat: REPEAT_JOBS['videos-views']
175 })
176 }
177
166 static get Instance () { 178 static get Instance () {
167 return this.instance || (this.instance = new this()) 179 return this.instance || (this.instance = new this())
168 } 180 }