From 6b6168606bc86430f6b7821c9d5f1c80d0425ebf Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 29 Aug 2018 16:26:25 +0200 Subject: [PATCH] Bufferize videos views in redis --- .github/ISSUE_TEMPLATE.md | 4 +- server/controllers/api/videos/index.ts | 8 +- server/initializers/constants.ts | 20 ++++- server/initializers/database.ts | 4 +- .../lib/activitypub/process/process-create.ts | 4 +- server/lib/job-queue/handlers/video-views.ts | 40 +++++++++ server/lib/job-queue/job-queue.ts | 20 ++++- server/lib/redis.ts | 88 ++++++++++++++++++- server/models/video/video-views.ts | 41 +++++++++ server/models/video/video.ts | 9 ++ server/tests/api/server/reverse-proxy.ts | 33 ++++++- server/tests/api/server/stats.ts | 3 + server/tests/api/videos/multiple-servers.ts | 10 ++- server/tests/api/videos/single-server.ts | 5 +- server/tests/utils/server/jobs.ts | 8 +- shared/models/server/job.model.ts | 3 +- 16 files changed, 274 insertions(+), 26 deletions(-) create mode 100644 server/lib/job-queue/handlers/video-views.ts create mode 100644 server/models/video/video-views.ts diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md index 6b34c2cf7..8edb00b9c 100644 --- a/.github/ISSUE_TEMPLATE.md +++ b/.github/ISSUE_TEMPLATE.md @@ -11,6 +11,6 @@ * **What do you see instead?** -* **Browser console log if useful (Gist/Pastebin...):** -* **Server log if useful (Gist/Pastebin...):** +* **Browser console log if useful:** +* **Server log if useful (journalctl or /var/www/peertube/storage/logs):** diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index a86cf4f99..be803490b 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -380,14 +380,16 @@ async function viewVideo (req: express.Request, res: express.Response) { const videoInstance = res.locals.video const ip = req.ip - const exists = await Redis.Instance.isViewExists(ip, videoInstance.uuid) + const exists = await Redis.Instance.isVideoIPViewExists(ip, videoInstance.uuid) if (exists) { logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid) return res.status(204).end() } - await videoInstance.increment('views') - await Redis.Instance.setView(ip, videoInstance.uuid) + await Promise.all([ + Redis.Instance.addVideoView(videoInstance.id), + Redis.Instance.setIPVideoView(ip, videoInstance.uuid) + ]) const serverAccount = await getServerActor() diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 7f1b25654..2d9a2e670 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -3,11 +3,12 @@ import { dirname, join } from 'path' import { JobType, VideoRateType, VideoState } from '../../shared/models' import { ActivityPubActorType } from '../../shared/models/activitypub' import { FollowState } from '../../shared/models/actors' -import { VideoPrivacy, VideoAbuseState, VideoImportState } from '../../shared/models/videos' +import { VideoAbuseState, VideoImportState, VideoPrivacy } from '../../shared/models/videos' // Do not use barrels, remain constants as independent as possible import { buildPath, isTestInstance, root, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type' import { invert } from 'lodash' +import { CronRepeatOptions, EveryRepeatOptions } from 'bull' // Use a variable to reload the configuration if we need let config: IConfig = require('config') @@ -90,7 +91,8 @@ const JOB_ATTEMPTS: { [ id in JobType ]: number } = { 'video-file-import': 1, 'video-file': 1, 'video-import': 1, - 'email': 5 + 'email': 5, + 'videos-views': 1 } const JOB_CONCURRENCY: { [ id in JobType ]: number } = { 'activitypub-http-broadcast': 1, @@ -100,7 +102,8 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = { 'video-file-import': 1, 'video-file': 1, 'video-import': 1, - 'email': 5 + 'email': 5, + 'videos-views': 1 } const JOB_TTL: { [ id in JobType ]: number } = { 'activitypub-http-broadcast': 60000 * 10, // 10 minutes @@ -110,8 +113,15 @@ const JOB_TTL: { [ id in JobType ]: number } = { 'video-file-import': 1000 * 3600, // 1 hour 'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long 'video-import': 1000 * 3600 * 5, // 5 hours - 'email': 60000 * 10 // 10 minutes + 'email': 60000 * 10, // 10 minutes + 'videos-views': undefined // Unlimited } +const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { + 'videos-views': { + cron: '1 * * * *' // At 1 minutes past the hour + } +} + const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) const JOB_REQUEST_TIMEOUT = 3000 // 3 seconds @@ -591,6 +601,7 @@ if (isTestInstance() === true) { SCHEDULER_INTERVALS_MS.badActorFollow = 10000 SCHEDULER_INTERVALS_MS.removeOldJobs = 10000 SCHEDULER_INTERVALS_MS.updateVideos = 5000 + REPEAT_JOBS['videos-views'] = { every: 5000 } VIDEO_VIEW_LIFETIME = 1000 // 1 second @@ -652,6 +663,7 @@ export { USER_PASSWORD_RESET_LIFETIME, IMAGE_MIMETYPE_EXT, SCHEDULER_INTERVALS_MS, + REPEAT_JOBS, STATIC_DOWNLOAD_PATHS, RATES_LIMIT, VIDEO_EXT_MIMETYPE, diff --git a/server/initializers/database.ts b/server/initializers/database.ts index 0be752363..78bc8101c 100644 --- a/server/initializers/database.ts +++ b/server/initializers/database.ts @@ -25,6 +25,7 @@ import { CONFIG } from './constants' import { ScheduleVideoUpdateModel } from '../models/video/schedule-video-update' import { VideoCaptionModel } from '../models/video/video-caption' import { VideoImportModel } from '../models/video/video-import' +import { VideoViewModel } from '../models/video/video-views' require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string @@ -83,7 +84,8 @@ async function initDatabaseModels (silent: boolean) { VideoModel, VideoCommentModel, ScheduleVideoUpdateModel, - VideoImportModel + VideoImportModel, + VideoViewModel ]) // Check extensions exist in the database diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index 75f07d131..16f426e23 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts @@ -7,11 +7,11 @@ import { sequelizeTypescript } from '../../../initializers' import { AccountVideoRateModel } from '../../../models/account/account-video-rate' import { ActorModel } from '../../../models/activitypub/actor' import { VideoAbuseModel } from '../../../models/video/video-abuse' -import { VideoCommentModel } from '../../../models/video/video-comment' import { getOrCreateActorAndServerAndModel } from '../actor' import { addVideoComment, resolveThread } from '../video-comments' import { getOrCreateVideoAndAccountAndChannel } from '../videos' import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' +import { Redis } from '../../redis' async function processCreateActivity (activity: ActivityCreate) { const activityObject = activity.object @@ -88,7 +88,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) const actor = await ActorModel.loadByUrl(view.actor) if (!actor) throw new Error('Unknown actor ' + view.actor) - await video.increment('views') + await Redis.Instance.addVideoView(video.id) if (video.isOwned()) { // Don't resend the activity to the sender diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts new file mode 100644 index 000000000..875d8ab88 --- /dev/null +++ b/server/lib/job-queue/handlers/video-views.ts @@ -0,0 +1,40 @@ +import { Redis } from '../../redis' +import { logger } from '../../../helpers/logger' +import { VideoModel } from '../../../models/video/video' +import { VideoViewModel } from '../../../models/video/video-views' + +async function processVideosViewsViews () { + const hour = new Date().getHours() + const startDate = new Date().setMinutes(0, 0, 0) + const endDate = new Date().setMinutes(59, 59, 999) + + const videoIds = await Redis.Instance.getVideosIdViewed(hour) + if (videoIds.length === 0) return + + logger.info('Processing videos views in job for hour %d.', hour) + + for (const videoId of videoIds) { + const views = await Redis.Instance.getVideoViews(videoId, hour) + if (isNaN(views)) { + logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour) + } else { + logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) + + await VideoModel.incrementViews(videoId, views) + await VideoViewModel.create({ + startDate, + endDate, + views, + videoId + }) + } + + await Redis.Instance.deleteVideoViews(videoId, hour) + } +} + +// --------------------------------------------------------------------------- + +export { + processVideosViewsViews +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ddb357db5..0696ba43c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -2,7 +2,7 @@ import * as Bull from 'bull' import { JobState, JobType } from '../../../shared/models' import { logger } from '../../helpers/logger' import { Redis } from '../redis' -import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' +import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' @@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email' import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' import { processVideoImport, VideoImportPayload } from './handlers/video-import' +import { processVideosViewsViews } from './handlers/video-views' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -19,7 +20,8 @@ type CreateJobArgument = { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-file', payload: VideoFilePayload } | { type: 'email', payload: EmailPayload } | - { type: 'video-import', payload: VideoImportPayload } + { type: 'video-import', payload: VideoImportPayload } | + { type: 'videos-views', payload: {} } const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, @@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'video-file-import': processVideoFileImport, 'video-file': processVideoFile, 'email': processEmail, - 'video-import': processVideoImport + 'video-import': processVideoImport, + 'videos-views': processVideosViewsViews } const jobTypes: JobType[] = [ @@ -40,7 +43,8 @@ const jobTypes: JobType[] = [ 'email', 'video-file', 'video-file-import', - 'video-import' + 'video-import', + 'videos-views' ] class JobQueue { @@ -85,6 +89,8 @@ class JobQueue { this.queues[handlerName] = queue } + + this.addRepeatableJobs() } terminate () { @@ -163,6 +169,12 @@ class JobQueue { } } + private addRepeatableJobs () { + this.queues['videos-views'].add({}, { + repeat: REPEAT_JOBS['videos-views'] + }) + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 941f7d557..0b4b41e4e 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -60,11 +60,11 @@ class Redis { return this.getValue(this.generateResetPasswordKey(userId)) } - setView (ip: string, videoUUID: string) { + setIPVideoView (ip: string, videoUUID: string) { return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) } - async isViewExists (ip: string, videoUUID: string) { + async isVideoIPViewExists (ip: string, videoUUID: string) { return this.exists(this.buildViewKey(ip, videoUUID)) } @@ -85,6 +85,52 @@ class Redis { return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) } + addVideoView (videoId: number) { + const keyIncr = this.generateVideoViewKey(videoId) + const keySet = this.generateVideosViewKey() + + return Promise.all([ + this.addToSet(keySet, videoId.toString()), + this.increment(keyIncr) + ]) + } + + async getVideoViews (videoId: number, hour: number) { + const key = this.generateVideoViewKey(videoId, hour) + + const valueString = await this.getValue(key) + return parseInt(valueString, 10) + } + + async getVideosIdViewed (hour: number) { + const key = this.generateVideosViewKey(hour) + + const stringIds = await this.getSet(key) + return stringIds.map(s => parseInt(s, 10)) + } + + deleteVideoViews (videoId: number, hour: number) { + const keySet = this.generateVideosViewKey(hour) + const keyIncr = this.generateVideoViewKey(videoId, hour) + + return Promise.all([ + this.deleteFromSet(keySet, videoId.toString()), + this.deleteKey(keyIncr) + ]) + } + + generateVideosViewKey (hour?: number) { + if (!hour) hour = new Date().getHours() + + return `videos-view-h${hour}` + } + + generateVideoViewKey (videoId: number, hour?: number) { + if (!hour) hour = new Date().getHours() + + return `video-view-${videoId}-h${hour}` + } + generateResetPasswordKey (userId: number) { return 'reset-password-' + userId } @@ -107,6 +153,34 @@ class Redis { }) } + private getSet (key: string) { + return new Promise((res, rej) => { + this.client.smembers(this.prefix + key, (err, value) => { + if (err) return rej(err) + + return res(value) + }) + }) + } + + private addToSet (key: string, value: string) { + return new Promise((res, rej) => { + this.client.sadd(this.prefix + key, value, err => err ? rej(err) : res()) + }) + } + + private deleteFromSet (key: string, value: string) { + return new Promise((res, rej) => { + this.client.srem(this.prefix + key, value, err => err ? rej(err) : res()) + }) + } + + private deleteKey (key: string) { + return new Promise((res, rej) => { + this.client.del(this.prefix + key, err => err ? rej(err) : res()) + }) + } + private setValue (key: string, value: string, expirationMilliseconds: number) { return new Promise((res, rej) => { this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => { @@ -145,6 +219,16 @@ class Redis { }) } + private increment (key: string) { + return new Promise((res, rej) => { + this.client.incr(this.prefix + key, (err, value) => { + if (err) return rej(err) + + return res(value) + }) + }) + } + private exists (key: string) { return new Promise((res, rej) => { this.client.exists(this.prefix + key, (err, existsNumber) => { diff --git a/server/models/video/video-views.ts b/server/models/video/video-views.ts new file mode 100644 index 000000000..90ce671fd --- /dev/null +++ b/server/models/video/video-views.ts @@ -0,0 +1,41 @@ +import { AllowNull, BelongsTo, Column, CreatedAt, ForeignKey, Model, Table } from 'sequelize-typescript' +import { VideoModel } from './video' +import * as Sequelize from 'sequelize' + +@Table({ + tableName: 'videoView', + indexes: [ + { + fields: [ 'videoId' ] + } + ] +}) +export class VideoViewModel extends Model { + @CreatedAt + createdAt: Date + + @AllowNull(false) + @Column(Sequelize.DATE) + startDate: Date + + @AllowNull(false) + @Column(Sequelize.DATE) + endDate: Date + + @AllowNull(false) + @Column + views: number + + @ForeignKey(() => VideoModel) + @Column + videoId: number + + @BelongsTo(() => VideoModel, { + foreignKey: { + allowNull: false + }, + onDelete: 'CASCADE' + }) + Video: VideoModel + +} diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 6271db1b3..3410833c8 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -1074,6 +1074,15 @@ export class VideoModel extends Model { } } + static incrementViews (id: number, views: number) { + return VideoModel.increment('views', { + by: views, + where: { + id + } + }) + } + private static buildActorWhereWithFilter (filter?: VideoFilter) { if (filter && filter === 'local') { return { diff --git a/server/tests/api/server/reverse-proxy.ts b/server/tests/api/server/reverse-proxy.ts index 908b4a68c..e2c2a293e 100644 --- a/server/tests/api/server/reverse-proxy.ts +++ b/server/tests/api/server/reverse-proxy.ts @@ -4,7 +4,18 @@ import 'mocha' import * as chai from 'chai' import { About } from '../../../../shared/models/server/about.model' import { CustomConfig } from '../../../../shared/models/server/custom-config.model' -import { deleteCustomConfig, getAbout, getVideo, killallServers, login, reRunServer, uploadVideo, userLogin, viewVideo } from '../../utils' +import { + deleteCustomConfig, + getAbout, + getVideo, + killallServers, + login, + reRunServer, + uploadVideo, + userLogin, + viewVideo, + wait +} from '../../utils' const expect = chai.expect import { @@ -30,33 +41,53 @@ describe('Test application behind a reverse proxy', function () { }) it('Should view a video only once with the same IP by default', async function () { + this.timeout(20000) + await viewVideo(server.url, videoId) await viewVideo(server.url, videoId) + // Wait the repeatable job + await wait(8000) + const { body } = await getVideo(server.url, videoId) expect(body.views).to.equal(1) }) it('Should view a video 2 times with the X-Forwarded-For header set', async function () { + this.timeout(20000) + await viewVideo(server.url, videoId, 204, '0.0.0.1,127.0.0.1') await viewVideo(server.url, videoId, 204, '0.0.0.2,127.0.0.1') + // Wait the repeatable job + await wait(8000) + const { body } = await getVideo(server.url, videoId) expect(body.views).to.equal(3) }) it('Should view a video only once with the same client IP in the X-Forwarded-For header', async function () { + this.timeout(20000) + await viewVideo(server.url, videoId, 204, '0.0.0.4,0.0.0.3,::ffff:127.0.0.1') await viewVideo(server.url, videoId, 204, '0.0.0.5,0.0.0.3,127.0.0.1') + // Wait the repeatable job + await wait(8000) + const { body } = await getVideo(server.url, videoId) expect(body.views).to.equal(4) }) it('Should view a video two times with a different client IP in the X-Forwarded-For header', async function () { + this.timeout(20000) + await viewVideo(server.url, videoId, 204, '0.0.0.8,0.0.0.6,127.0.0.1') await viewVideo(server.url, videoId, 204, '0.0.0.8,0.0.0.7,127.0.0.1') + // Wait the repeatable job + await wait(8000) + const { body } = await getVideo(server.url, videoId) expect(body.views).to.equal(6) }) diff --git a/server/tests/api/server/stats.ts b/server/tests/api/server/stats.ts index e75089a14..fc9b88805 100644 --- a/server/tests/api/server/stats.ts +++ b/server/tests/api/server/stats.ts @@ -46,6 +46,9 @@ describe('Test stats', function () { await viewVideo(servers[0].url, videoUUID) + // Wait the video views repeatable job + await wait(8000) + await follow(servers[2].url, [ servers[0].url ], servers[2].accessToken) await waitJobs(servers) }) diff --git a/server/tests/api/videos/multiple-servers.ts b/server/tests/api/videos/multiple-servers.ts index c551ccc59..4553ee855 100644 --- a/server/tests/api/videos/multiple-servers.ts +++ b/server/tests/api/videos/multiple-servers.ts @@ -492,7 +492,7 @@ describe('Test multiple servers', function () { }) it('Should view multiple videos on owned servers', async function () { - this.timeout(15000) + this.timeout(30000) const tasks: Promise[] = [] await viewVideo(servers[2].url, localVideosServer3[0]) @@ -511,6 +511,9 @@ describe('Test multiple servers', function () { await waitJobs(servers) + // Wait the repeatable job + await wait(6000) + for (const server of servers) { const res = await getVideosList(server.url) @@ -524,7 +527,7 @@ describe('Test multiple servers', function () { }) it('Should view multiple videos on each servers', async function () { - this.timeout(15000) + this.timeout(30000) const tasks: Promise[] = [] tasks.push(viewVideo(servers[0].url, remoteVideosServer1[0])) @@ -542,6 +545,9 @@ describe('Test multiple servers', function () { await waitJobs(servers) + // Wait the repeatable job + await wait(8000) + let baseVideos = null for (const server of servers) { diff --git a/server/tests/api/videos/single-server.ts b/server/tests/api/videos/single-server.ts index a757ad9da..89408fec6 100644 --- a/server/tests/api/videos/single-server.ts +++ b/server/tests/api/videos/single-server.ts @@ -196,7 +196,7 @@ describe('Test a single server', function () { }) it('Should have the views updated', async function () { - this.timeout(10000) + this.timeout(20000) await viewVideo(server.url, videoId) await viewVideo(server.url, videoId) @@ -212,6 +212,9 @@ describe('Test a single server', function () { await viewVideo(server.url, videoId) await viewVideo(server.url, videoId) + // Wait the repeatable job + await wait(8000) + const res = await getVideo(server.url, videoId) const video = res.body diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts index c9cb8d3a3..4c02cace5 100644 --- a/server/tests/utils/server/jobs.ts +++ b/server/tests/utils/server/jobs.ts @@ -1,5 +1,5 @@ import * as request from 'supertest' -import { JobState } from '../../../../shared/models' +import { Job, JobState } from '../../../../shared/models' import { ServerInfo, wait } from '../index' function getJobsList (url: string, accessToken: string, state: JobState) { @@ -44,8 +44,10 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { for (const server of servers) { for (const state of states) { const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt') - .then(res => { - if (res.body.total > 0) pendingRequests = true + .then(res => res.body.data) + .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views')) + .then(jobs => { + if (jobs.length !== 0) pendingRequests = true }) tasks.push(p) } diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 2565479f6..4046297c4 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -7,7 +7,8 @@ export type JobType = 'activitypub-http-unicast' | 'video-file-import' | 'video-file' | 'email' | - 'video-import' + 'video-import' | + 'videos-views' export interface Job { id: number -- 2.41.0