const processors: { [id in SendDebugCommand['command']]: () => Promise<any> } = {
'remove-dandling-resumable-uploads': () => RemoveDanglingResumableUploadsScheduler.Instance.execute(),
'process-video-views-buffer': () => VideoViewsBufferScheduler.Instance.execute(),
- 'process-video-viewers': () => VideoViewsManager.Instance.processViewers()
+ 'process-video-viewers': () => VideoViewsManager.Instance.processViewerStats()
}
await processors[body.command]()
import express from 'express'
-import { sendView } from '@server/lib/activitypub/send/send-view'
import { Hooks } from '@server/lib/plugins/hooks'
import { VideoViewsManager } from '@server/lib/views/video-views-manager'
-import { getServerActor } from '@server/models/application/application'
import { MVideoId } from '@server/types/models'
import { HttpStatusCode, VideoView } from '@shared/models'
import { asyncMiddleware, methodsValidator, openapiOperationDoc, optionalAuthenticate, videoViewValidator } from '../../../middlewares'
const body = req.body as VideoView
const ip = req.ip
- const { successView, successViewer } = await VideoViewsManager.Instance.processLocalView({
+ const { successView } = await VideoViewsManager.Instance.processLocalView({
video,
ip,
currentTime: body.currentTime,
})
if (successView) {
- await sendView({ byActor: await getServerActor(), video, type: 'view' })
-
Hooks.runAction('action:api.video.viewed', { video: video, ip, req, res })
}
- if (successViewer) {
- await sendView({ byActor: await getServerActor(), video, type: 'viewer' })
- }
-
await updateUserHistoryIfNeeded(body, video, res)
return res.status(HttpStatusCode.NO_CONTENT_204).end()
const VIEW_LIFETIME = {
VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
- VIEWER_COUNTER: 60000 * 5, // 5 minutes
+ VIEWER_COUNTER: 60000 * 1, // 1 minute
VIEWER_STATS: 60000 * 60 // 1 hour
}
? new Date(activity.expires)
: undefined
- await VideoViewsManager.Instance.processRemoteView({ video, viewerExpires })
+ await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires })
if (video.isOwned()) {
// Forward the view but don't resend the activity to the sender
byActor: MActorLight
type: ViewType
video: MVideoImmutable
+ viewerIdentifier: string
transaction?: Transaction
}) {
- const { byActor, type, video, transaction } = options
+ const { byActor, type, video, viewerIdentifier, transaction } = options
logger.info('Creating job to send %s of %s.', type, video.url)
const activityBuilder = (audience: ActivityAudience) => {
- const url = getLocalVideoViewActivityPubUrl(byActor, video)
+ const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier)
return buildViewActivity({ url, byActor, video, audience, type })
}
return WEBSERVER.URL + '/admin/abuses/' + abuse.id
}
-function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId) {
- return byActor.url + '/views/videos/' + video.id + '/' + new Date().toISOString()
+function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId, viewerIdentifier: string) {
+ return byActor.url + '/views/videos/' + video.id + '/' + viewerIdentifier
}
function getLocalVideoViewerActivityPubUrl (stats: MLocalVideoViewer) {
return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
}
- setIPVideoViewer (ip: string, videoUUID: string) {
- return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER_COUNTER)
- }
-
async doesVideoIPViewExist (ip: string, videoUUID: string) {
return this.exists(this.generateIPViewKey(ip, videoUUID))
}
- async doesVideoIPViewerExist (ip: string, videoUUID: string) {
- return this.exists(this.generateIPViewerKey(ip, videoUUID))
- }
-
/* ************ Tracker IP block ************ */
setTrackerBlockIP (ip: string) {
return `views-${videoUUID}-${ip}`
}
- private generateIPViewerKey (ip: string, videoUUID: string) {
- return `viewer-${videoUUID}-${ip}`
- }
-
private generateTrackerBlockIPKey (ip: string) {
return `tracker-block-ip-${ip}`
}
-export * from './video-viewers'
+export * from './video-viewer-counters'
+export * from './video-viewer-stats'
export * from './video-views'
--- /dev/null
+
+import { isTestInstance } from '@server/helpers/core-utils'
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { VIEW_LIFETIME } from '@server/initializers/constants'
+import { sendView } from '@server/lib/activitypub/send/send-view'
+import { PeerTubeSocket } from '@server/lib/peertube-socket'
+import { getServerActor } from '@server/models/application/application'
+import { VideoModel } from '@server/models/video/video'
+import { MVideo } from '@server/types/models'
+import { buildUUID, sha256 } from '@shared/extra-utils'
+
+const lTags = loggerTagsFactory('views')
+
+type Viewer = {
+ expires: number
+ id: string
+ lastFederation?: number
+}
+
+export class VideoViewerCounters {
+
+ // expires is new Date().getTime()
+ private readonly viewersPerVideo = new Map<number, Viewer[]>()
+ private readonly idToViewer = new Map<string, Viewer>()
+
+ private readonly salt = buildUUID()
+
+ private processingViewerCounters = false
+
+ constructor () {
+ setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
+ }
+
+ // ---------------------------------------------------------------------------
+
+ async addLocalViewer (options: {
+ video: MVideo
+ ip: string
+ }) {
+ const { video, ip } = options
+
+ logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) })
+
+ const viewerId = this.generateViewerId(ip, video.uuid)
+ const viewer = this.idToViewer.get(viewerId)
+
+ if (viewer) {
+ viewer.expires = this.buildViewerExpireTime()
+ await this.federateViewerIfNeeded(video, viewer)
+
+ return false
+ }
+
+ const newViewer = await this.addViewerToVideo({ viewerId, video })
+ await this.federateViewerIfNeeded(video, newViewer)
+
+ return true
+ }
+
+ async addRemoteViewer (options: {
+ video: MVideo
+ viewerId: string
+ viewerExpires: Date
+ }) {
+ const { video, viewerExpires, viewerId } = options
+
+ logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
+
+ await this.addViewerToVideo({ video, viewerExpires, viewerId })
+
+ return true
+ }
+
+ // ---------------------------------------------------------------------------
+
+ getViewers (video: MVideo) {
+ const viewers = this.viewersPerVideo.get(video.id)
+ if (!viewers) return 0
+
+ return viewers.length
+ }
+
+ buildViewerExpireTime () {
+ return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER
+ }
+
+ // ---------------------------------------------------------------------------
+
+ private async addViewerToVideo (options: {
+ video: MVideo
+ viewerId: string
+ viewerExpires?: Date
+ }) {
+ const { video, viewerExpires, viewerId } = options
+
+ let watchers = this.viewersPerVideo.get(video.id)
+
+ if (!watchers) {
+ watchers = []
+ this.viewersPerVideo.set(video.id, watchers)
+ }
+
+ const expires = viewerExpires
+ ? viewerExpires.getTime()
+ : this.buildViewerExpireTime()
+
+ const viewer = { id: viewerId, expires }
+ watchers.push(viewer)
+
+ this.idToViewer.set(viewerId, viewer)
+
+ await this.notifyClients(video.id, watchers.length)
+
+ return viewer
+ }
+
+ private async cleanViewerCounters () {
+ if (this.processingViewerCounters) return
+ this.processingViewerCounters = true
+
+ if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags())
+
+ try {
+ for (const videoId of this.viewersPerVideo.keys()) {
+ const notBefore = new Date().getTime()
+
+ const viewers = this.viewersPerVideo.get(videoId)
+
+ // Only keep not expired viewers
+ const newViewers: Viewer[] = []
+
+ // Filter new viewers
+ for (const viewer of viewers) {
+ if (viewer.expires > notBefore) {
+ newViewers.push(viewer)
+ } else {
+ this.idToViewer.delete(viewer.id)
+ }
+ }
+
+ if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
+ else this.viewersPerVideo.set(videoId, newViewers)
+
+ await this.notifyClients(videoId, newViewers.length)
+ }
+ } catch (err) {
+ logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
+ }
+
+ this.processingViewerCounters = false
+ }
+
+ private async notifyClients (videoId: string | number, viewersLength: number) {
+ const video = await VideoModel.loadImmutableAttributes(videoId)
+ if (!video) return
+
+ PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
+
+ logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
+ }
+
+ private generateViewerId (ip: string, videoUUID: string) {
+ return sha256(this.salt + '-' + ip + '-' + videoUUID)
+ }
+
+ private async federateViewerIfNeeded (video: MVideo, viewer: Viewer) {
+ // Federate the viewer if it's been a "long" time we did not
+ const now = new Date().getTime()
+ const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER / 2)
+
+ if (viewer.lastFederation && viewer.lastFederation > federationLimit) return
+
+ await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
+ viewer.lastFederation = now
+ }
+}
import { sequelizeTypescript } from '@server/initializers/database'
import { sendCreateWatchAction } from '@server/lib/activitypub/send'
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url'
-import { PeerTubeSocket } from '@server/lib/peertube-socket'
import { Redis } from '@server/lib/redis'
import { VideoModel } from '@server/models/video/video'
import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
videoId: number
}
-export class VideoViewers {
-
- // Values are Date().getTime()
- private readonly viewersPerVideo = new Map<number, number[]>()
-
- private processingViewerCounters = false
- private processingViewerStats = false
+export class VideoViewerStats {
+ private processingViewersStats = false
constructor () {
- setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
-
setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
}
// ---------------------------------------------------------------------------
- getViewers (video: MVideo) {
- const viewers = this.viewersPerVideo.get(video.id)
- if (!viewers) return 0
-
- return viewers.length
- }
-
- buildViewerExpireTime () {
- return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER
- }
-
- async getWatchTime (videoId: number, ip: string) {
- const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
-
- return stats?.watchTime || 0
- }
-
async addLocalViewer (options: {
video: MVideo
currentTime: number
}) {
const { video, ip, viewEvent, currentTime } = options
- logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
-
- await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
-
- const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
- if (viewExists) return false
+ logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
- await Redis.Instance.setIPVideoViewer(ip, video.uuid)
-
- return this.addViewerToVideo({ video })
+ return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
}
- async addRemoteViewer (options: {
- video: MVideo
- viewerExpires: Date
- }) {
- const { video, viewerExpires } = options
+ // ---------------------------------------------------------------------------
- logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
+ async getWatchTime (videoId: number, ip: string) {
+ const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
- return this.addViewerToVideo({ video, viewerExpires })
+ return stats?.watchTime || 0
}
- private async addViewerToVideo (options: {
- video: MVideo
- viewerExpires?: Date
- }) {
- const { video, viewerExpires } = options
-
- let watchers = this.viewersPerVideo.get(video.id)
-
- if (!watchers) {
- watchers = []
- this.viewersPerVideo.set(video.id, watchers)
- }
-
- const expiration = viewerExpires
- ? viewerExpires.getTime()
- : this.buildViewerExpireTime()
-
- watchers.push(expiration)
- await this.notifyClients(video.id, watchers.length)
-
- return true
- }
+ // ---------------------------------------------------------------------------
private async updateLocalViewerStats (options: {
video: MVideo
await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
}
- private async cleanViewerCounters () {
- if (this.processingViewerCounters) return
- this.processingViewerCounters = true
-
- if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags())
-
- try {
- for (const videoId of this.viewersPerVideo.keys()) {
- const notBefore = new Date().getTime()
-
- const viewers = this.viewersPerVideo.get(videoId)
-
- // Only keep not expired viewers
- const newViewers = viewers.filter(w => w > notBefore)
-
- if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
- else this.viewersPerVideo.set(videoId, newViewers)
-
- await this.notifyClients(videoId, newViewers.length)
- }
- } catch (err) {
- logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
- }
-
- this.processingViewerCounters = false
- }
-
- private async notifyClients (videoId: string | number, viewersLength: number) {
- const video = await VideoModel.loadImmutableAttributes(videoId)
- if (!video) return
-
- PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
-
- logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
- }
-
async processViewerStats () {
- if (this.processingViewerStats) return
- this.processingViewerStats = true
+ if (this.processingViewersStats) return
+ this.processingViewersStats = true
if (!isTestInstance()) logger.info('Processing viewer statistics.', lTags())
logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
}
- this.processingViewerStats = false
+ this.processingViewersStats = false
}
private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { sendView } from '@server/lib/activitypub/send/send-view'
+import { getServerActor } from '@server/models/application/application'
import { MVideo } from '@server/types/models'
+import { buildUUID } from '@shared/extra-utils'
import { Redis } from '../../redis'
const lTags = loggerTagsFactory('views')
await this.addView(video)
+ await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() })
+
return true
}
return true
}
+ // ---------------------------------------------------------------------------
+
private async addView (video: MVideo) {
const promises: Promise<any>[] = []
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { MVideo } from '@server/types/models'
import { VideoViewEvent } from '@shared/models'
-import { VideoViewers, VideoViews } from './shared'
+import { VideoViewerCounters, VideoViewerStats, VideoViews } from './shared'
/**
* If processing a local view:
private static instance: VideoViewsManager
- private videoViewers: VideoViewers
+ private videoViewerStats: VideoViewerStats
+ private videoViewerCounters: VideoViewerCounters
private videoViews: VideoViews
private constructor () {
}
init () {
- this.videoViewers = new VideoViewers()
+ this.videoViewerStats = new VideoViewerStats()
+ this.videoViewerCounters = new VideoViewerCounters()
this.videoViews = new VideoViews()
}
logger.debug('Processing local view for %s and ip %s.', video.url, ip, lTags())
- const successViewer = await this.videoViewers.addLocalViewer({ video, ip, viewEvent, currentTime })
+ await this.videoViewerStats.addLocalViewer({ video, ip, viewEvent, currentTime })
+
+ const successViewer = await this.videoViewerCounters.addLocalViewer({ video, ip })
// Do it after added local viewer to fetch updated information
- const watchTime = await this.videoViewers.getWatchTime(video.id, ip)
+ const watchTime = await this.videoViewerStats.getWatchTime(video.id, ip)
const successView = await this.videoViews.addLocalView({ video, watchTime, ip })
async processRemoteView (options: {
video: MVideo
+ viewerId: string | null
viewerExpires?: Date
}) {
- const { video, viewerExpires } = options
+ const { video, viewerId, viewerExpires } = options
- logger.debug('Processing remote view for %s.', video.url, { viewerExpires, ...lTags() })
+ logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() })
- if (viewerExpires) await this.videoViewers.addRemoteViewer({ video, viewerExpires })
+ if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires })
else await this.videoViews.addRemoteView({ video })
}
getViewers (video: MVideo) {
- return this.videoViewers.getViewers(video)
+ return this.videoViewerCounters.getViewers(video)
}
buildViewerExpireTime () {
- return this.videoViewers.buildViewerExpireTime()
+ return this.videoViewerCounters.buildViewerExpireTime()
}
- processViewers () {
- return this.videoViewers.processViewerStats()
+ processViewerStats () {
+ return this.videoViewerStats.processViewerStats()
}
static get Instance () {
let userAccessToken: string
before(async function () {
- this.timeout(30000)
+ this.timeout(120000)
servers = await createMultipleServers(2)
await setAccessTokensToServers(servers)
})
it('Should not view again this video with the same IP', async function () {
- await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
+ await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
+ await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
- await checkCounter('views', videoUUID, 1)
+ await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
await waitJobs(servers)
await processViewsBuffer(servers)
- await checkCounter('views', videoUUID, 2)
+ await checkCounter('views', videoUUID, 3)
})
})
let command: FfmpegCommand
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
let vodVideoId: string
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true }))
})
let command: FfmpegCommand
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
let videoUUID: string
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId: videoUUID } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
let vodVideoId: string
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true }))
})
let vodVideoId: string
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true }))
})
}
before(async function () {
- this.timeout(60000);
+ this.timeout(120000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})