diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/process/process-create.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views.ts | 40 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 20 | ||||
-rw-r--r-- | server/lib/redis.ts | 88 |
4 files changed, 144 insertions, 8 deletions
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index 75f07d131..16f426e23 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts | |||
@@ -7,11 +7,11 @@ import { sequelizeTypescript } from '../../../initializers' | |||
7 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | 7 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' |
8 | import { ActorModel } from '../../../models/activitypub/actor' | 8 | import { ActorModel } from '../../../models/activitypub/actor' |
9 | import { VideoAbuseModel } from '../../../models/video/video-abuse' | 9 | import { VideoAbuseModel } from '../../../models/video/video-abuse' |
10 | import { VideoCommentModel } from '../../../models/video/video-comment' | ||
11 | import { getOrCreateActorAndServerAndModel } from '../actor' | 10 | import { getOrCreateActorAndServerAndModel } from '../actor' |
12 | import { addVideoComment, resolveThread } from '../video-comments' | 11 | import { addVideoComment, resolveThread } from '../video-comments' |
13 | import { getOrCreateVideoAndAccountAndChannel } from '../videos' | 12 | import { getOrCreateVideoAndAccountAndChannel } from '../videos' |
14 | import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' | 13 | import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' |
14 | import { Redis } from '../../redis' | ||
15 | 15 | ||
16 | async function processCreateActivity (activity: ActivityCreate) { | 16 | async function processCreateActivity (activity: ActivityCreate) { |
17 | const activityObject = activity.object | 17 | const activityObject = activity.object |
@@ -88,7 +88,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) | |||
88 | const actor = await ActorModel.loadByUrl(view.actor) | 88 | const actor = await ActorModel.loadByUrl(view.actor) |
89 | if (!actor) throw new Error('Unknown actor ' + view.actor) | 89 | if (!actor) throw new Error('Unknown actor ' + view.actor) |
90 | 90 | ||
91 | await video.increment('views') | 91 | await Redis.Instance.addVideoView(video.id) |
92 | 92 | ||
93 | if (video.isOwned()) { | 93 | if (video.isOwned()) { |
94 | // Don't resend the activity to the sender | 94 | // Don't resend the activity to the sender |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts new file mode 100644 index 000000000..875d8ab88 --- /dev/null +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -0,0 +1,40 @@ | |||
1 | import { Redis } from '../../redis' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { VideoModel } from '../../../models/video/video' | ||
4 | import { VideoViewModel } from '../../../models/video/video-views' | ||
5 | |||
6 | async function processVideosViewsViews () { | ||
7 | const hour = new Date().getHours() | ||
8 | const startDate = new Date().setMinutes(0, 0, 0) | ||
9 | const endDate = new Date().setMinutes(59, 59, 999) | ||
10 | |||
11 | const videoIds = await Redis.Instance.getVideosIdViewed(hour) | ||
12 | if (videoIds.length === 0) return | ||
13 | |||
14 | logger.info('Processing videos views in job for hour %d.', hour) | ||
15 | |||
16 | for (const videoId of videoIds) { | ||
17 | const views = await Redis.Instance.getVideoViews(videoId, hour) | ||
18 | if (isNaN(views)) { | ||
19 | logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour) | ||
20 | } else { | ||
21 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | ||
22 | |||
23 | await VideoModel.incrementViews(videoId, views) | ||
24 | await VideoViewModel.create({ | ||
25 | startDate, | ||
26 | endDate, | ||
27 | views, | ||
28 | videoId | ||
29 | }) | ||
30 | } | ||
31 | |||
32 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
33 | } | ||
34 | } | ||
35 | |||
36 | // --------------------------------------------------------------------------- | ||
37 | |||
38 | export { | ||
39 | processVideosViewsViews | ||
40 | } | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ddb357db5..0696ba43c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -2,7 +2,7 @@ import * as Bull from 'bull' | |||
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { Redis } from '../redis' | 4 | import { Redis } from '../redis' |
5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' | 5 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' |
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
@@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email' | |||
10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' | 10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' | 12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' |
13 | import { processVideosViewsViews } from './handlers/video-views' | ||
13 | 14 | ||
14 | type CreateJobArgument = | 15 | type CreateJobArgument = |
15 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 16 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -19,7 +20,8 @@ type CreateJobArgument = | |||
19 | { type: 'video-file-import', payload: VideoFileImportPayload } | | 20 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
20 | { type: 'video-file', payload: VideoFilePayload } | | 21 | { type: 'video-file', payload: VideoFilePayload } | |
21 | { type: 'email', payload: EmailPayload } | | 22 | { type: 'email', payload: EmailPayload } | |
22 | { type: 'video-import', payload: VideoImportPayload } | 23 | { type: 'video-import', payload: VideoImportPayload } | |
24 | { type: 'videos-views', payload: {} } | ||
23 | 25 | ||
24 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 26 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
25 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 27 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
@@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
29 | 'video-file-import': processVideoFileImport, | 31 | 'video-file-import': processVideoFileImport, |
30 | 'video-file': processVideoFile, | 32 | 'video-file': processVideoFile, |
31 | 'email': processEmail, | 33 | 'email': processEmail, |
32 | 'video-import': processVideoImport | 34 | 'video-import': processVideoImport, |
35 | 'videos-views': processVideosViewsViews | ||
33 | } | 36 | } |
34 | 37 | ||
35 | const jobTypes: JobType[] = [ | 38 | const jobTypes: JobType[] = [ |
@@ -40,7 +43,8 @@ const jobTypes: JobType[] = [ | |||
40 | 'email', | 43 | 'email', |
41 | 'video-file', | 44 | 'video-file', |
42 | 'video-file-import', | 45 | 'video-file-import', |
43 | 'video-import' | 46 | 'video-import', |
47 | 'videos-views' | ||
44 | ] | 48 | ] |
45 | 49 | ||
46 | class JobQueue { | 50 | class JobQueue { |
@@ -85,6 +89,8 @@ class JobQueue { | |||
85 | 89 | ||
86 | this.queues[handlerName] = queue | 90 | this.queues[handlerName] = queue |
87 | } | 91 | } |
92 | |||
93 | this.addRepeatableJobs() | ||
88 | } | 94 | } |
89 | 95 | ||
90 | terminate () { | 96 | terminate () { |
@@ -163,6 +169,12 @@ class JobQueue { | |||
163 | } | 169 | } |
164 | } | 170 | } |
165 | 171 | ||
172 | private addRepeatableJobs () { | ||
173 | this.queues['videos-views'].add({}, { | ||
174 | repeat: REPEAT_JOBS['videos-views'] | ||
175 | }) | ||
176 | } | ||
177 | |||
166 | static get Instance () { | 178 | static get Instance () { |
167 | return this.instance || (this.instance = new this()) | 179 | return this.instance || (this.instance = new this()) |
168 | } | 180 | } |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 941f7d557..0b4b41e4e 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -60,11 +60,11 @@ class Redis { | |||
60 | return this.getValue(this.generateResetPasswordKey(userId)) | 60 | return this.getValue(this.generateResetPasswordKey(userId)) |
61 | } | 61 | } |
62 | 62 | ||
63 | setView (ip: string, videoUUID: string) { | 63 | setIPVideoView (ip: string, videoUUID: string) { |
64 | return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) | 64 | return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) |
65 | } | 65 | } |
66 | 66 | ||
67 | async isViewExists (ip: string, videoUUID: string) { | 67 | async isVideoIPViewExists (ip: string, videoUUID: string) { |
68 | return this.exists(this.buildViewKey(ip, videoUUID)) | 68 | return this.exists(this.buildViewKey(ip, videoUUID)) |
69 | } | 69 | } |
70 | 70 | ||
@@ -85,6 +85,52 @@ class Redis { | |||
85 | return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) | 85 | return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) |
86 | } | 86 | } |
87 | 87 | ||
88 | addVideoView (videoId: number) { | ||
89 | const keyIncr = this.generateVideoViewKey(videoId) | ||
90 | const keySet = this.generateVideosViewKey() | ||
91 | |||
92 | return Promise.all([ | ||
93 | this.addToSet(keySet, videoId.toString()), | ||
94 | this.increment(keyIncr) | ||
95 | ]) | ||
96 | } | ||
97 | |||
98 | async getVideoViews (videoId: number, hour: number) { | ||
99 | const key = this.generateVideoViewKey(videoId, hour) | ||
100 | |||
101 | const valueString = await this.getValue(key) | ||
102 | return parseInt(valueString, 10) | ||
103 | } | ||
104 | |||
105 | async getVideosIdViewed (hour: number) { | ||
106 | const key = this.generateVideosViewKey(hour) | ||
107 | |||
108 | const stringIds = await this.getSet(key) | ||
109 | return stringIds.map(s => parseInt(s, 10)) | ||
110 | } | ||
111 | |||
112 | deleteVideoViews (videoId: number, hour: number) { | ||
113 | const keySet = this.generateVideosViewKey(hour) | ||
114 | const keyIncr = this.generateVideoViewKey(videoId, hour) | ||
115 | |||
116 | return Promise.all([ | ||
117 | this.deleteFromSet(keySet, videoId.toString()), | ||
118 | this.deleteKey(keyIncr) | ||
119 | ]) | ||
120 | } | ||
121 | |||
122 | generateVideosViewKey (hour?: number) { | ||
123 | if (!hour) hour = new Date().getHours() | ||
124 | |||
125 | return `videos-view-h${hour}` | ||
126 | } | ||
127 | |||
128 | generateVideoViewKey (videoId: number, hour?: number) { | ||
129 | if (!hour) hour = new Date().getHours() | ||
130 | |||
131 | return `video-view-${videoId}-h${hour}` | ||
132 | } | ||
133 | |||
88 | generateResetPasswordKey (userId: number) { | 134 | generateResetPasswordKey (userId: number) { |
89 | return 'reset-password-' + userId | 135 | return 'reset-password-' + userId |
90 | } | 136 | } |
@@ -107,6 +153,34 @@ class Redis { | |||
107 | }) | 153 | }) |
108 | } | 154 | } |
109 | 155 | ||
156 | private getSet (key: string) { | ||
157 | return new Promise<string[]>((res, rej) => { | ||
158 | this.client.smembers(this.prefix + key, (err, value) => { | ||
159 | if (err) return rej(err) | ||
160 | |||
161 | return res(value) | ||
162 | }) | ||
163 | }) | ||
164 | } | ||
165 | |||
166 | private addToSet (key: string, value: string) { | ||
167 | return new Promise<string[]>((res, rej) => { | ||
168 | this.client.sadd(this.prefix + key, value, err => err ? rej(err) : res()) | ||
169 | }) | ||
170 | } | ||
171 | |||
172 | private deleteFromSet (key: string, value: string) { | ||
173 | return new Promise<void>((res, rej) => { | ||
174 | this.client.srem(this.prefix + key, value, err => err ? rej(err) : res()) | ||
175 | }) | ||
176 | } | ||
177 | |||
178 | private deleteKey (key: string) { | ||
179 | return new Promise<void>((res, rej) => { | ||
180 | this.client.del(this.prefix + key, err => err ? rej(err) : res()) | ||
181 | }) | ||
182 | } | ||
183 | |||
110 | private setValue (key: string, value: string, expirationMilliseconds: number) { | 184 | private setValue (key: string, value: string, expirationMilliseconds: number) { |
111 | return new Promise<void>((res, rej) => { | 185 | return new Promise<void>((res, rej) => { |
112 | this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => { | 186 | this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => { |
@@ -145,6 +219,16 @@ class Redis { | |||
145 | }) | 219 | }) |
146 | } | 220 | } |
147 | 221 | ||
222 | private increment (key: string) { | ||
223 | return new Promise<number>((res, rej) => { | ||
224 | this.client.incr(this.prefix + key, (err, value) => { | ||
225 | if (err) return rej(err) | ||
226 | |||
227 | return res(value) | ||
228 | }) | ||
229 | }) | ||
230 | } | ||
231 | |||
148 | private exists (key: string) { | 232 | private exists (key: string) { |
149 | return new Promise<boolean>((res, rej) => { | 233 | return new Promise<boolean>((res, rej) => { |
150 | this.client.exists(this.prefix + key, (err, existsNumber) => { | 234 | this.client.exists(this.prefix + key, (err, existsNumber) => { |