aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-08-29 16:26:25 +0200
committerChocobozzz <me@florianbigard.com>2018-08-30 15:03:18 +0200
commit6b6168606bc86430f6b7821c9d5f1c80d0425ebf (patch)
tree9aea6cf0875c9fee30c373eb4924b12d47d1e23c /server/lib
parent2d9fea161fd4fc73994fc77951bafdccdc2071fd (diff)
downloadPeerTube-6b6168606bc86430f6b7821c9d5f1c80d0425ebf.tar.gz
PeerTube-6b6168606bc86430f6b7821c9d5f1c80d0425ebf.tar.zst
PeerTube-6b6168606bc86430f6b7821c9d5f1c80d0425ebf.zip
Bufferize videos views in redis
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/process/process-create.ts4
-rw-r--r--server/lib/job-queue/handlers/video-views.ts40
-rw-r--r--server/lib/job-queue/job-queue.ts20
-rw-r--r--server/lib/redis.ts88
4 files changed, 144 insertions, 8 deletions
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts
index 75f07d131..16f426e23 100644
--- a/server/lib/activitypub/process/process-create.ts
+++ b/server/lib/activitypub/process/process-create.ts
@@ -7,11 +7,11 @@ import { sequelizeTypescript } from '../../../initializers'
7import { AccountVideoRateModel } from '../../../models/account/account-video-rate' 7import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
8import { ActorModel } from '../../../models/activitypub/actor' 8import { ActorModel } from '../../../models/activitypub/actor'
9import { VideoAbuseModel } from '../../../models/video/video-abuse' 9import { VideoAbuseModel } from '../../../models/video/video-abuse'
10import { VideoCommentModel } from '../../../models/video/video-comment'
11import { getOrCreateActorAndServerAndModel } from '../actor' 10import { getOrCreateActorAndServerAndModel } from '../actor'
12import { addVideoComment, resolveThread } from '../video-comments' 11import { addVideoComment, resolveThread } from '../video-comments'
13import { getOrCreateVideoAndAccountAndChannel } from '../videos' 12import { getOrCreateVideoAndAccountAndChannel } from '../videos'
14import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' 13import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils'
14import { Redis } from '../../redis'
15 15
16async function processCreateActivity (activity: ActivityCreate) { 16async function processCreateActivity (activity: ActivityCreate) {
17 const activityObject = activity.object 17 const activityObject = activity.object
@@ -88,7 +88,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate)
88 const actor = await ActorModel.loadByUrl(view.actor) 88 const actor = await ActorModel.loadByUrl(view.actor)
89 if (!actor) throw new Error('Unknown actor ' + view.actor) 89 if (!actor) throw new Error('Unknown actor ' + view.actor)
90 90
91 await video.increment('views') 91 await Redis.Instance.addVideoView(video.id)
92 92
93 if (video.isOwned()) { 93 if (video.isOwned()) {
94 // Don't resend the activity to the sender 94 // Don't resend the activity to the sender
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts
new file mode 100644
index 000000000..875d8ab88
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-views.ts
@@ -0,0 +1,40 @@
1import { Redis } from '../../redis'
2import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-views'
5
6async function processVideosViewsViews () {
7 const hour = new Date().getHours()
8 const startDate = new Date().setMinutes(0, 0, 0)
9 const endDate = new Date().setMinutes(59, 59, 999)
10
11 const videoIds = await Redis.Instance.getVideosIdViewed(hour)
12 if (videoIds.length === 0) return
13
14 logger.info('Processing videos views in job for hour %d.', hour)
15
16 for (const videoId of videoIds) {
17 const views = await Redis.Instance.getVideoViews(videoId, hour)
18 if (isNaN(views)) {
19 logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour)
20 } else {
21 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
22
23 await VideoModel.incrementViews(videoId, views)
24 await VideoViewModel.create({
25 startDate,
26 endDate,
27 views,
28 videoId
29 })
30 }
31
32 await Redis.Instance.deleteVideoViews(videoId, hour)
33 }
34}
35
36// ---------------------------------------------------------------------------
37
38export {
39 processVideosViewsViews
40}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index ddb357db5..0696ba43c 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -2,7 +2,7 @@ import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { Redis } from '../redis' 4import { Redis } from '../redis'
5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' 5import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
@@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' 10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import' 12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
13import { processVideosViewsViews } from './handlers/video-views'
13 14
14type CreateJobArgument = 15type CreateJobArgument =
15 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 16 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -19,7 +20,8 @@ type CreateJobArgument =
19 { type: 'video-file-import', payload: VideoFileImportPayload } | 20 { type: 'video-file-import', payload: VideoFileImportPayload } |
20 { type: 'video-file', payload: VideoFilePayload } | 21 { type: 'video-file', payload: VideoFilePayload } |
21 { type: 'email', payload: EmailPayload } | 22 { type: 'email', payload: EmailPayload } |
22 { type: 'video-import', payload: VideoImportPayload } 23 { type: 'video-import', payload: VideoImportPayload } |
24 { type: 'videos-views', payload: {} }
23 25
24const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 26const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
25 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 27 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
@@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
29 'video-file-import': processVideoFileImport, 31 'video-file-import': processVideoFileImport,
30 'video-file': processVideoFile, 32 'video-file': processVideoFile,
31 'email': processEmail, 33 'email': processEmail,
32 'video-import': processVideoImport 34 'video-import': processVideoImport,
35 'videos-views': processVideosViewsViews
33} 36}
34 37
35const jobTypes: JobType[] = [ 38const jobTypes: JobType[] = [
@@ -40,7 +43,8 @@ const jobTypes: JobType[] = [
40 'email', 43 'email',
41 'video-file', 44 'video-file',
42 'video-file-import', 45 'video-file-import',
43 'video-import' 46 'video-import',
47 'videos-views'
44] 48]
45 49
46class JobQueue { 50class JobQueue {
@@ -85,6 +89,8 @@ class JobQueue {
85 89
86 this.queues[handlerName] = queue 90 this.queues[handlerName] = queue
87 } 91 }
92
93 this.addRepeatableJobs()
88 } 94 }
89 95
90 terminate () { 96 terminate () {
@@ -163,6 +169,12 @@ class JobQueue {
163 } 169 }
164 } 170 }
165 171
172 private addRepeatableJobs () {
173 this.queues['videos-views'].add({}, {
174 repeat: REPEAT_JOBS['videos-views']
175 })
176 }
177
166 static get Instance () { 178 static get Instance () {
167 return this.instance || (this.instance = new this()) 179 return this.instance || (this.instance = new this())
168 } 180 }
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 941f7d557..0b4b41e4e 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -60,11 +60,11 @@ class Redis {
60 return this.getValue(this.generateResetPasswordKey(userId)) 60 return this.getValue(this.generateResetPasswordKey(userId))
61 } 61 }
62 62
63 setView (ip: string, videoUUID: string) { 63 setIPVideoView (ip: string, videoUUID: string) {
64 return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) 64 return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME)
65 } 65 }
66 66
67 async isViewExists (ip: string, videoUUID: string) { 67 async isVideoIPViewExists (ip: string, videoUUID: string) {
68 return this.exists(this.buildViewKey(ip, videoUUID)) 68 return this.exists(this.buildViewKey(ip, videoUUID))
69 } 69 }
70 70
@@ -85,6 +85,52 @@ class Redis {
85 return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) 85 return this.setObject(this.buildCachedRouteKey(req), cached, lifetime)
86 } 86 }
87 87
88 addVideoView (videoId: number) {
89 const keyIncr = this.generateVideoViewKey(videoId)
90 const keySet = this.generateVideosViewKey()
91
92 return Promise.all([
93 this.addToSet(keySet, videoId.toString()),
94 this.increment(keyIncr)
95 ])
96 }
97
98 async getVideoViews (videoId: number, hour: number) {
99 const key = this.generateVideoViewKey(videoId, hour)
100
101 const valueString = await this.getValue(key)
102 return parseInt(valueString, 10)
103 }
104
105 async getVideosIdViewed (hour: number) {
106 const key = this.generateVideosViewKey(hour)
107
108 const stringIds = await this.getSet(key)
109 return stringIds.map(s => parseInt(s, 10))
110 }
111
112 deleteVideoViews (videoId: number, hour: number) {
113 const keySet = this.generateVideosViewKey(hour)
114 const keyIncr = this.generateVideoViewKey(videoId, hour)
115
116 return Promise.all([
117 this.deleteFromSet(keySet, videoId.toString()),
118 this.deleteKey(keyIncr)
119 ])
120 }
121
122 generateVideosViewKey (hour?: number) {
123 if (!hour) hour = new Date().getHours()
124
125 return `videos-view-h${hour}`
126 }
127
128 generateVideoViewKey (videoId: number, hour?: number) {
129 if (!hour) hour = new Date().getHours()
130
131 return `video-view-${videoId}-h${hour}`
132 }
133
88 generateResetPasswordKey (userId: number) { 134 generateResetPasswordKey (userId: number) {
89 return 'reset-password-' + userId 135 return 'reset-password-' + userId
90 } 136 }
@@ -107,6 +153,34 @@ class Redis {
107 }) 153 })
108 } 154 }
109 155
156 private getSet (key: string) {
157 return new Promise<string[]>((res, rej) => {
158 this.client.smembers(this.prefix + key, (err, value) => {
159 if (err) return rej(err)
160
161 return res(value)
162 })
163 })
164 }
165
166 private addToSet (key: string, value: string) {
167 return new Promise<string[]>((res, rej) => {
168 this.client.sadd(this.prefix + key, value, err => err ? rej(err) : res())
169 })
170 }
171
172 private deleteFromSet (key: string, value: string) {
173 return new Promise<void>((res, rej) => {
174 this.client.srem(this.prefix + key, value, err => err ? rej(err) : res())
175 })
176 }
177
178 private deleteKey (key: string) {
179 return new Promise<void>((res, rej) => {
180 this.client.del(this.prefix + key, err => err ? rej(err) : res())
181 })
182 }
183
110 private setValue (key: string, value: string, expirationMilliseconds: number) { 184 private setValue (key: string, value: string, expirationMilliseconds: number) {
111 return new Promise<void>((res, rej) => { 185 return new Promise<void>((res, rej) => {
112 this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => { 186 this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => {
@@ -145,6 +219,16 @@ class Redis {
145 }) 219 })
146 } 220 }
147 221
222 private increment (key: string) {
223 return new Promise<number>((res, rej) => {
224 this.client.incr(this.prefix + key, (err, value) => {
225 if (err) return rej(err)
226
227 return res(value)
228 })
229 })
230 }
231
148 private exists (key: string) { 232 private exists (key: string) {
149 return new Promise<boolean>((res, rej) => { 233 return new Promise<boolean>((res, rej) => {
150 this.client.exists(this.prefix + key, (err, existsNumber) => { 234 this.client.exists(this.prefix + key, (err, existsNumber) => {