* **What do you see instead?**
-* **Browser console log if useful (Gist/Pastebin...):**
-* **Server log if useful (Gist/Pastebin...):**
+* **Browser console log if useful:**
+* **Server log if useful (journalctl or /var/www/peertube/storage/logs):**
const videoInstance = res.locals.video
const ip = req.ip
- const exists = await Redis.Instance.isViewExists(ip, videoInstance.uuid)
+ const exists = await Redis.Instance.isVideoIPViewExists(ip, videoInstance.uuid)
if (exists) {
logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid)
return res.status(204).end()
}
- await videoInstance.increment('views')
- await Redis.Instance.setView(ip, videoInstance.uuid)
+ await Promise.all([
+ Redis.Instance.addVideoView(videoInstance.id),
+ Redis.Instance.setIPVideoView(ip, videoInstance.uuid)
+ ])
const serverAccount = await getServerActor()
import { JobType, VideoRateType, VideoState } from '../../shared/models'
import { ActivityPubActorType } from '../../shared/models/activitypub'
import { FollowState } from '../../shared/models/actors'
-import { VideoPrivacy, VideoAbuseState, VideoImportState } from '../../shared/models/videos'
+import { VideoAbuseState, VideoImportState, VideoPrivacy } from '../../shared/models/videos'
// Do not use barrels, remain constants as independent as possible
import { buildPath, isTestInstance, root, sanitizeHost, sanitizeUrl } from '../helpers/core-utils'
import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type'
import { invert } from 'lodash'
+import { CronRepeatOptions, EveryRepeatOptions } from 'bull'
// Use a variable to reload the configuration if we need
let config: IConfig = require('config')
'video-file-import': 1,
'video-file': 1,
'video-import': 1,
- 'email': 5
+ 'email': 5,
+ 'videos-views': 1
}
const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
'activitypub-http-broadcast': 1,
'video-file-import': 1,
'video-file': 1,
'video-import': 1,
- 'email': 5
+ 'email': 5,
+ 'videos-views': 1
}
const JOB_TTL: { [ id in JobType ]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
'video-file-import': 1000 * 3600, // 1 hour
'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long
'video-import': 1000 * 3600 * 5, // 5 hours
- 'email': 60000 * 10 // 10 minutes
+ 'email': 60000 * 10, // 10 minutes
+ 'videos-views': undefined // Unlimited
}
+const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
+ 'videos-views': {
+ cron: '1 * * * *' // At 1 minutes past the hour
+ }
+}
+
const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...)
const JOB_REQUEST_TIMEOUT = 3000 // 3 seconds
SCHEDULER_INTERVALS_MS.badActorFollow = 10000
SCHEDULER_INTERVALS_MS.removeOldJobs = 10000
SCHEDULER_INTERVALS_MS.updateVideos = 5000
+ REPEAT_JOBS['videos-views'] = { every: 5000 }
VIDEO_VIEW_LIFETIME = 1000 // 1 second
USER_PASSWORD_RESET_LIFETIME,
IMAGE_MIMETYPE_EXT,
SCHEDULER_INTERVALS_MS,
+ REPEAT_JOBS,
STATIC_DOWNLOAD_PATHS,
RATES_LIMIT,
VIDEO_EXT_MIMETYPE,
import { ScheduleVideoUpdateModel } from '../models/video/schedule-video-update'
import { VideoCaptionModel } from '../models/video/video-caption'
import { VideoImportModel } from '../models/video/video-import'
+import { VideoViewModel } from '../models/video/video-views'
require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string
VideoModel,
VideoCommentModel,
ScheduleVideoUpdateModel,
- VideoImportModel
+ VideoImportModel,
+ VideoViewModel
])
// Check extensions exist in the database
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
import { ActorModel } from '../../../models/activitypub/actor'
import { VideoAbuseModel } from '../../../models/video/video-abuse'
-import { VideoCommentModel } from '../../../models/video/video-comment'
import { getOrCreateActorAndServerAndModel } from '../actor'
import { addVideoComment, resolveThread } from '../video-comments'
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils'
+import { Redis } from '../../redis'
async function processCreateActivity (activity: ActivityCreate) {
const activityObject = activity.object
const actor = await ActorModel.loadByUrl(view.actor)
if (!actor) throw new Error('Unknown actor ' + view.actor)
- await video.increment('views')
+ await Redis.Instance.addVideoView(video.id)
if (video.isOwned()) {
// Don't resend the activity to the sender
--- /dev/null
+import { Redis } from '../../redis'
+import { logger } from '../../../helpers/logger'
+import { VideoModel } from '../../../models/video/video'
+import { VideoViewModel } from '../../../models/video/video-views'
+
+async function processVideosViewsViews () {
+ const hour = new Date().getHours()
+ const startDate = new Date().setMinutes(0, 0, 0)
+ const endDate = new Date().setMinutes(59, 59, 999)
+
+ const videoIds = await Redis.Instance.getVideosIdViewed(hour)
+ if (videoIds.length === 0) return
+
+ logger.info('Processing videos views in job for hour %d.', hour)
+
+ for (const videoId of videoIds) {
+ const views = await Redis.Instance.getVideoViews(videoId, hour)
+ if (isNaN(views)) {
+ logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour)
+ } else {
+ logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
+
+ await VideoModel.incrementViews(videoId, views)
+ await VideoViewModel.create({
+ startDate,
+ endDate,
+ views,
+ videoId
+ })
+ }
+
+ await Redis.Instance.deleteVideoViews(videoId, hour)
+ }
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ processVideosViewsViews
+}
import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { Redis } from '../redis'
-import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers'
+import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
import { processVideoImport, VideoImportPayload } from './handlers/video-import'
+import { processVideosViewsViews } from './handlers/video-views'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'video-file-import', payload: VideoFileImportPayload } |
{ type: 'video-file', payload: VideoFilePayload } |
{ type: 'email', payload: EmailPayload } |
- { type: 'video-import', payload: VideoImportPayload }
+ { type: 'video-import', payload: VideoImportPayload } |
+ { type: 'videos-views', payload: {} }
const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'video-file-import': processVideoFileImport,
'video-file': processVideoFile,
'email': processEmail,
- 'video-import': processVideoImport
+ 'video-import': processVideoImport,
+ 'videos-views': processVideosViewsViews
}
const jobTypes: JobType[] = [
'email',
'video-file',
'video-file-import',
- 'video-import'
+ 'video-import',
+ 'videos-views'
]
class JobQueue {
this.queues[handlerName] = queue
}
+
+ this.addRepeatableJobs()
}
terminate () {
}
}
+ private addRepeatableJobs () {
+ this.queues['videos-views'].add({}, {
+ repeat: REPEAT_JOBS['videos-views']
+ })
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}
return this.getValue(this.generateResetPasswordKey(userId))
}
- setView (ip: string, videoUUID: string) {
+ setIPVideoView (ip: string, videoUUID: string) {
return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME)
}
- async isViewExists (ip: string, videoUUID: string) {
+ async isVideoIPViewExists (ip: string, videoUUID: string) {
return this.exists(this.buildViewKey(ip, videoUUID))
}
return this.setObject(this.buildCachedRouteKey(req), cached, lifetime)
}
+ addVideoView (videoId: number) {
+ const keyIncr = this.generateVideoViewKey(videoId)
+ const keySet = this.generateVideosViewKey()
+
+ return Promise.all([
+ this.addToSet(keySet, videoId.toString()),
+ this.increment(keyIncr)
+ ])
+ }
+
+ async getVideoViews (videoId: number, hour: number) {
+ const key = this.generateVideoViewKey(videoId, hour)
+
+ const valueString = await this.getValue(key)
+ return parseInt(valueString, 10)
+ }
+
+ async getVideosIdViewed (hour: number) {
+ const key = this.generateVideosViewKey(hour)
+
+ const stringIds = await this.getSet(key)
+ return stringIds.map(s => parseInt(s, 10))
+ }
+
+ deleteVideoViews (videoId: number, hour: number) {
+ const keySet = this.generateVideosViewKey(hour)
+ const keyIncr = this.generateVideoViewKey(videoId, hour)
+
+ return Promise.all([
+ this.deleteFromSet(keySet, videoId.toString()),
+ this.deleteKey(keyIncr)
+ ])
+ }
+
+ generateVideosViewKey (hour?: number) {
+ if (!hour) hour = new Date().getHours()
+
+ return `videos-view-h${hour}`
+ }
+
+ generateVideoViewKey (videoId: number, hour?: number) {
+ if (!hour) hour = new Date().getHours()
+
+ return `video-view-${videoId}-h${hour}`
+ }
+
generateResetPasswordKey (userId: number) {
return 'reset-password-' + userId
}
})
}
+ private getSet (key: string) {
+ return new Promise<string[]>((res, rej) => {
+ this.client.smembers(this.prefix + key, (err, value) => {
+ if (err) return rej(err)
+
+ return res(value)
+ })
+ })
+ }
+
+ private addToSet (key: string, value: string) {
+ return new Promise<string[]>((res, rej) => {
+ this.client.sadd(this.prefix + key, value, err => err ? rej(err) : res())
+ })
+ }
+
+ private deleteFromSet (key: string, value: string) {
+ return new Promise<void>((res, rej) => {
+ this.client.srem(this.prefix + key, value, err => err ? rej(err) : res())
+ })
+ }
+
+ private deleteKey (key: string) {
+ return new Promise<void>((res, rej) => {
+ this.client.del(this.prefix + key, err => err ? rej(err) : res())
+ })
+ }
+
private setValue (key: string, value: string, expirationMilliseconds: number) {
return new Promise<void>((res, rej) => {
this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => {
})
}
+ private increment (key: string) {
+ return new Promise<number>((res, rej) => {
+ this.client.incr(this.prefix + key, (err, value) => {
+ if (err) return rej(err)
+
+ return res(value)
+ })
+ })
+ }
+
private exists (key: string) {
return new Promise<boolean>((res, rej) => {
this.client.exists(this.prefix + key, (err, existsNumber) => {
--- /dev/null
+import { AllowNull, BelongsTo, Column, CreatedAt, ForeignKey, Model, Table } from 'sequelize-typescript'
+import { VideoModel } from './video'
+import * as Sequelize from 'sequelize'
+
+@Table({
+ tableName: 'videoView',
+ indexes: [
+ {
+ fields: [ 'videoId' ]
+ }
+ ]
+})
+export class VideoViewModel extends Model<VideoViewModel> {
+ @CreatedAt
+ createdAt: Date
+
+ @AllowNull(false)
+ @Column(Sequelize.DATE)
+ startDate: Date
+
+ @AllowNull(false)
+ @Column(Sequelize.DATE)
+ endDate: Date
+
+ @AllowNull(false)
+ @Column
+ views: number
+
+ @ForeignKey(() => VideoModel)
+ @Column
+ videoId: number
+
+ @BelongsTo(() => VideoModel, {
+ foreignKey: {
+ allowNull: false
+ },
+ onDelete: 'CASCADE'
+ })
+ Video: VideoModel
+
+}
}
}
+ static incrementViews (id: number, views: number) {
+ return VideoModel.increment('views', {
+ by: views,
+ where: {
+ id
+ }
+ })
+ }
+
private static buildActorWhereWithFilter (filter?: VideoFilter) {
if (filter && filter === 'local') {
return {
import * as chai from 'chai'
import { About } from '../../../../shared/models/server/about.model'
import { CustomConfig } from '../../../../shared/models/server/custom-config.model'
-import { deleteCustomConfig, getAbout, getVideo, killallServers, login, reRunServer, uploadVideo, userLogin, viewVideo } from '../../utils'
+import {
+ deleteCustomConfig,
+ getAbout,
+ getVideo,
+ killallServers,
+ login,
+ reRunServer,
+ uploadVideo,
+ userLogin,
+ viewVideo,
+ wait
+} from '../../utils'
const expect = chai.expect
import {
})
it('Should view a video only once with the same IP by default', async function () {
+ this.timeout(20000)
+
await viewVideo(server.url, videoId)
await viewVideo(server.url, videoId)
+ // Wait the repeatable job
+ await wait(8000)
+
const { body } = await getVideo(server.url, videoId)
expect(body.views).to.equal(1)
})
it('Should view a video 2 times with the X-Forwarded-For header set', async function () {
+ this.timeout(20000)
+
await viewVideo(server.url, videoId, 204, '0.0.0.1,127.0.0.1')
await viewVideo(server.url, videoId, 204, '0.0.0.2,127.0.0.1')
+ // Wait the repeatable job
+ await wait(8000)
+
const { body } = await getVideo(server.url, videoId)
expect(body.views).to.equal(3)
})
it('Should view a video only once with the same client IP in the X-Forwarded-For header', async function () {
+ this.timeout(20000)
+
await viewVideo(server.url, videoId, 204, '0.0.0.4,0.0.0.3,::ffff:127.0.0.1')
await viewVideo(server.url, videoId, 204, '0.0.0.5,0.0.0.3,127.0.0.1')
+ // Wait the repeatable job
+ await wait(8000)
+
const { body } = await getVideo(server.url, videoId)
expect(body.views).to.equal(4)
})
it('Should view a video two times with a different client IP in the X-Forwarded-For header', async function () {
+ this.timeout(20000)
+
await viewVideo(server.url, videoId, 204, '0.0.0.8,0.0.0.6,127.0.0.1')
await viewVideo(server.url, videoId, 204, '0.0.0.8,0.0.0.7,127.0.0.1')
+ // Wait the repeatable job
+ await wait(8000)
+
const { body } = await getVideo(server.url, videoId)
expect(body.views).to.equal(6)
})
await viewVideo(servers[0].url, videoUUID)
+ // Wait the video views repeatable job
+ await wait(8000)
+
await follow(servers[2].url, [ servers[0].url ], servers[2].accessToken)
await waitJobs(servers)
})
})
it('Should view multiple videos on owned servers', async function () {
- this.timeout(15000)
+ this.timeout(30000)
const tasks: Promise<any>[] = []
await viewVideo(servers[2].url, localVideosServer3[0])
await waitJobs(servers)
+ // Wait the repeatable job
+ await wait(6000)
+
for (const server of servers) {
const res = await getVideosList(server.url)
})
it('Should view multiple videos on each servers', async function () {
- this.timeout(15000)
+ this.timeout(30000)
const tasks: Promise<any>[] = []
tasks.push(viewVideo(servers[0].url, remoteVideosServer1[0]))
await waitJobs(servers)
+ // Wait the repeatable job
+ await wait(8000)
+
let baseVideos = null
for (const server of servers) {
})
it('Should have the views updated', async function () {
- this.timeout(10000)
+ this.timeout(20000)
await viewVideo(server.url, videoId)
await viewVideo(server.url, videoId)
await viewVideo(server.url, videoId)
await viewVideo(server.url, videoId)
+ // Wait the repeatable job
+ await wait(8000)
+
const res = await getVideo(server.url, videoId)
const video = res.body
import * as request from 'supertest'
-import { JobState } from '../../../../shared/models'
+import { Job, JobState } from '../../../../shared/models'
import { ServerInfo, wait } from '../index'
function getJobsList (url: string, accessToken: string, state: JobState) {
for (const server of servers) {
for (const state of states) {
const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
- .then(res => {
- if (res.body.total > 0) pendingRequests = true
+ .then(res => res.body.data)
+ .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
+ .then(jobs => {
+ if (jobs.length !== 0) pendingRequests = true
})
tasks.push(p)
}
'video-file-import' |
'video-file' |
'email' |
- 'video-import'
+ 'video-import' |
+ 'videos-views'
export interface Job {
id: number