'video-live-ending',
'video-redundancy',
'video-transcoding',
- 'videos-views',
+ 'videos-views-stats',
'move-to-object-storage'
]
return this.peertubeSocket.getLiveVideosObservable()
.subscribe(({ type, payload }) => {
if (type === 'state-change') return this.handleLiveStateChange(payload.state)
- if (type === 'views-change') return this.handleLiveViewsChange(payload.views)
+ if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers)
})
}
this.loadVideo(videoUUID)
}
- private handleLiveViewsChange (newViews: number) {
+ private handleLiveViewsChange (newViewers: number) {
if (!this.video) {
console.error('Cannot update video live views because video is no defined.')
return
console.log('Updating live views.')
- this.video.views = newViews
+ this.video.viewers = newViewers
}
private initHotkeys () {
url: string
views: number
+ // If live
+ viewers?: number
+
likes: number
dislikes: number
nsfw: boolean
this.url = hash.url
this.views = hash.views
+ this.viewers = hash.viewers
this.likes = hash.likes
this.dislikes = hash.dislikes
</ng-container>
<ng-container i18n *ngIf="video.isLive">
- {video.views, plural, =1 {1 viewer} other {{{ video.views | myNumberFormatter }} viewers}}
+ {video.viewers, plural, =1 {1 viewer} other {{{ video.viewers | myNumberFormatter }} viewers}}
</ng-container>
</span>
remote:
max_age: '30 days'
+ # PeerTube buffers local video views before updating and federating the video
+ local_buffer_update_interval: '30 minutes'
+
+ ip_view_expiration: '1 hour'
+
plugins:
# The website PeerTube will ask for available PeerTube plugins and themes
# This is an unmoderated plugin index, so only install plugins/themes you trust
remote:
max_age: '30 days'
+ # PeerTube buffers local video views before updating and federating the video
+ local_buffer_update_interval: '30 minutes'
+
+ ip_view_expiration: '1 hour'
+
plugins:
# The website PeerTube will ask for available PeerTube plugins and themes
# This is an unmoderated plugin index, so only install plugins/themes you trust
videos:
remote:
max_age: -1
+
+ local_buffer_update_interval: '5 seconds'
+ ip_view_expiration: '1 second'
import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler'
import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances'
import { RemoveDanglingResumableUploadsScheduler } from './server/lib/schedulers/remove-dangling-resumable-uploads-scheduler'
+import { VideoViewsBufferScheduler } from './server/lib/schedulers/video-views-buffer-scheduler'
import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto'
import { PeerTubeSocket } from './server/lib/peertube-socket'
import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls'
import { HttpStatusCode } from './shared/models/http/http-error-codes'
import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache'
import { ServerConfigManager } from '@server/lib/server-config-manager'
+import { VideoViews } from '@server/lib/video-views'
// ----------- Command line -----------
PeerTubeVersionCheckScheduler.Instance.enable()
AutoFollowIndexInstances.Instance.enable()
RemoveDanglingResumableUploadsScheduler.Instance.enable()
+ VideoViewsBufferScheduler.Instance.enable()
- // Redis initialization
Redis.Instance.init()
-
PeerTubeSocket.Instance.init(server)
+ VideoViews.Instance.init()
updateStreamingPlaylistsInfohashesIfNeeded()
.catch(err => logger.error('Cannot update streaming playlist infohashes.', { err }))
import toInt from 'validator/lib/toInt'
import { pickCommonVideoQuery } from '@server/helpers/query'
import { doJSONRequest } from '@server/helpers/requests'
-import { LiveManager } from '@server/lib/live'
+import { VideoViews } from '@server/lib/video-views'
import { openapiOperationDoc } from '@server/middlewares/doc'
import { getServerActor } from '@server/models/application/application'
import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils'
import { sendView } from '../../../lib/activitypub/send/send-view'
import { JobQueue } from '../../../lib/job-queue'
import { Hooks } from '../../../lib/plugins/hooks'
-import { Redis } from '../../../lib/redis'
import {
asyncMiddleware,
asyncRetryTransactionMiddleware,
)
videosRouter.post('/:id/views',
openapiOperationDoc({ operationId: 'addView' }),
- asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')),
+ asyncMiddleware(videosCustomGetValidator('only-video')),
asyncMiddleware(viewVideo)
)
}
async function viewVideo (req: express.Request, res: express.Response) {
- const immutableVideoAttrs = res.locals.onlyImmutableVideo
+ const video = res.locals.onlyVideo
const ip = req.ip
- const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid)
- if (exists) {
- logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
- return res.status(HttpStatusCode.NO_CONTENT_204).end()
- }
-
- const video = await VideoModel.load(immutableVideoAttrs.id)
-
- const promises: Promise<any>[] = [
- Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
- ]
-
- let federateView = true
-
- // Increment our live manager
- if (video.isLive && video.isOwned()) {
- LiveManager.Instance.addViewTo(video.id)
-
- // Views of our local live will be sent by our live manager
- federateView = false
- }
-
- // Increment our video views cache counter
- if (!video.isLive) {
- promises.push(Redis.Instance.addVideoView(video.id))
- }
+ const success = await VideoViews.Instance.processView({ video, ip })
- if (federateView) {
+ if (success) {
const serverActor = await getServerActor()
- promises.push(sendView(serverActor, video, undefined))
- }
-
- await Promise.all(promises)
+ await sendView(serverActor, video, undefined)
- Hooks.runAction('action:api.video.viewed', { video, ip })
+ Hooks.runAction('action:api.video.viewed', { video: video, ip })
+ }
return res.status(HttpStatusCode.NO_CONTENT_204).end()
}
'services.twitter.username', 'services.twitter.whitelisted',
'followers.instance.enabled', 'followers.instance.manual_approval',
'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces',
- 'history.videos.max_age', 'views.videos.remote.max_age',
+ 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration',
'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
'theme.default',
'remote_redundancy.videos.accept_from',
VIDEOS: {
REMOTE: {
MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age'))
- }
+ },
+ LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')),
+ IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration'))
}
},
PLUGINS: {
'video-import': 1,
'email': 5,
'actor-keys': 3,
- 'videos-views': 1,
+ 'videos-views-stats': 1,
'activitypub-refresher': 1,
'video-redundancy': 1,
'video-live-ending': 1,
'video-file-import': 1,
'email': 5,
'actor-keys': 1,
- 'videos-views': 1,
+ 'videos-views-stats': 1,
'activitypub-refresher': 1,
'video-redundancy': 1,
'video-live-ending': 10,
'video-import': 1000 * 3600 * 2, // 2 hours
'email': 60000 * 10, // 10 minutes
'actor-keys': 60000 * 20, // 20 minutes
- 'videos-views': undefined, // Unlimited
+ 'videos-views-stats': undefined, // Unlimited
'activitypub-refresher': 60000 * 10, // 10 minutes
'video-redundancy': 1000 * 3600 * 3, // 3 hours
'video-live-ending': 1000 * 60 * 10, // 10 minutes
'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
}
-const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
- 'videos-views': {
+const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
+ 'videos-views-stats': {
cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
},
'activitypub-cleaner': {
REMOVE_OLD_JOBS: 60000 * 60, // 1 hour
UPDATE_VIDEOS: 60000, // 1 minute
YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day
+ VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL,
CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day
AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day
}
const VIEW_LIFETIME = {
- VIDEO: 60000 * 60, // 1 hour
- LIVE: 60000 * 5 // 5 minutes
+ VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
+ VIEWER: 60000 * 5 // 5 minutes
}
let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
- REPEAT_JOBS['videos-views'] = { every: 5000 }
+ REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
- VIEW_LIFETIME.VIDEO = 1000 // 1 second
- VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
+ VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second
CONTACT_FORM_LIFETIME = 1000 // 1 second
JOB_ATTEMPTS['email'] = 1
-import { getOrCreateAPVideo } from '../videos'
-import { forwardVideoRelatedActivity } from '../send/utils'
-import { Redis } from '../../redis'
-import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
+import { VideoViews } from '@server/lib/video-views'
+import { ActivityView } from '../../../../shared/models/activitypub'
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
import { MActorSignature } from '../../../types/models'
-import { LiveManager } from '@server/lib/live/live-manager'
+import { forwardVideoRelatedActivity } from '../send/utils'
+import { getOrCreateAPVideo } from '../videos'
-async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
+async function processViewActivity (options: APProcessorOptions<ActivityView>) {
const { activity, byActor } = options
+
return processCreateView(activity, byActor)
}
// ---------------------------------------------------------------------------
-async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) {
- const videoObject = activity.type === 'View'
- ? activity.object
- : (activity.object as ViewObject).object
+async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
+ const videoObject = activity.object
const { video } = await getOrCreateAPVideo({
videoObject,
allowRefresh: false
})
- if (!video.isLive) {
- await Redis.Instance.addVideoView(video.id)
- }
+ const viewerExpires = activity.expires
+ ? new Date(activity.expires)
+ : undefined
- if (video.isOwned()) {
- // Our live manager will increment the counter and send the view to followers
- if (video.isLive) {
- LiveManager.Instance.addViewTo(video.id)
- return
- }
+ await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
+ if (video.isOwned()) {
// Forward the view but don't resend the activity to the sender
const exceptions = [ byActor ]
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
import { Transaction } from 'sequelize'
+import { VideoViews } from '@server/lib/video-views'
import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
import { logger } from '../../../helpers/logger'
id: url,
type: 'View' as 'View',
actor: byActor.url,
- object: video.url
+ object: video.url,
+ expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
},
audience
)
if (videoUpdated.isLive) {
PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
- PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
}
logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())
-import { Redis } from '../../redis'
+import { isTestInstance } from '../../../helpers/core-utils'
import { logger } from '../../../helpers/logger'
import { VideoModel } from '../../../models/video/video'
import { VideoViewModel } from '../../../models/video/video-view'
-import { isTestInstance } from '../../../helpers/core-utils'
-import { federateVideoIfNeeded } from '../../activitypub/videos'
+import { Redis } from '../../redis'
-async function processVideosViews () {
+async function processVideosViewsStats () {
const lastHour = new Date()
// In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
const startDate = lastHour.setMinutes(0, 0, 0)
const endDate = lastHour.setMinutes(59, 59, 999)
- const videoIds = await Redis.Instance.getVideosIdViewed(hour)
+ const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
if (videoIds.length === 0) return
- logger.info('Processing videos views in job for hour %d.', hour)
+ logger.info('Processing videos views stats in job for hour %d.', hour)
for (const videoId of videoIds) {
try {
- const views = await Redis.Instance.getVideoViews(videoId, hour)
- await Redis.Instance.deleteVideoViews(videoId, hour)
+ const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
+ await Redis.Instance.deleteVideoViewsStats(videoId, hour)
if (views) {
- logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
+ logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
try {
- const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+ const video = await VideoModel.load(videoId)
if (!video) {
- logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId)
+ logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
continue
}
views,
videoId
})
-
- if (video.isOwned()) {
- // If this is a remote video, the origin instance will send us an update
- await VideoModel.incrementViews(videoId, views)
-
- // Send video update
- video.views += views
- await federateVideoIfNeeded(video, false)
- }
} catch (err) {
- logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err })
+ logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
}
}
} catch (err) {
- logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err })
+ logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
}
}
}
// ---------------------------------------------------------------------------
export {
- processVideosViews
+ processVideosViewsStats
}
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
-import { processVideosViews } from './handlers/video-views'
+import { processVideosViewsStats } from './handlers/video-views-stats'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'email', payload: EmailPayload } |
{ type: 'video-import', payload: VideoImportPayload } |
{ type: 'activitypub-refresher', payload: RefreshPayload } |
- { type: 'videos-views', payload: {} } |
+ { type: 'videos-views-stats', payload: {} } |
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'actor-keys', payload: ActorKeysPayload } |
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
'video-transcoding': processVideoTranscoding,
'email': processEmail,
'video-import': processVideoImport,
- 'videos-views': processVideosViews,
+ 'videos-views-stats': processVideosViewsStats,
'activitypub-refresher': refreshAPObject,
'video-live-ending': processVideoLiveEnding,
'actor-keys': processActorKeys,
'video-transcoding',
'video-file-import',
'video-import',
- 'videos-views',
+ 'videos-views-stats',
'activitypub-refresher',
'video-redundancy',
'actor-keys',
}
private addRepeatableJobs () {
- this.queues['videos-views'].add({}, {
- repeat: REPEAT_JOBS['videos-views']
+ this.queues['videos-views-stats'].add({}, {
+ repeat: REPEAT_JOBS['videos-views-stats']
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
import { readFile } from 'fs-extra'
import { createServer, Server } from 'net'
import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
-import { isTestInstance } from '@server/helpers/core-utils'
import {
computeResolutionsToTranscode,
ffprobePromise,
} from '@server/helpers/ffprobe-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants'
+import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user'
import { VideoModel } from '@server/models/video/video'
import { VideoLiveModel } from '@server/models/video/video-live'
private readonly muxingSessions = new Map<string, MuxingSession>()
private readonly videoSessions = new Map<number, string>()
- // Values are Date().getTime()
- private readonly watchersPerVideo = new Map<number, number[]>()
private rtmpServer: Server
private rtmpsServer: ServerTLS
// Cleanup broken lives, that were terminated by a server restart for example
this.handleBrokenLives()
.catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
-
- setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
}
async run () {
this.abortSession(sessionId)
}
- addViewTo (videoId: number) {
- if (this.videoSessions.has(videoId) === false) return
-
- let watchers = this.watchersPerVideo.get(videoId)
-
- if (!watchers) {
- watchers = []
- this.watchersPerVideo.set(videoId, watchers)
- }
-
- watchers.push(new Date().getTime())
- }
-
private getContext () {
return context
}
}
private onMuxingFFmpegEnd (videoId: number) {
- this.watchersPerVideo.delete(videoId)
this.videoSessions.delete(videoId)
}
}
}
- private async updateLiveViews () {
- if (!this.isRunning()) return
-
- if (!isTestInstance()) logger.info('Updating live video views.', lTags())
-
- for (const videoId of this.watchersPerVideo.keys()) {
- const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
-
- const watchers = this.watchersPerVideo.get(videoId)
-
- const numWatchers = watchers.length
-
- const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
- video.views = numWatchers
- await video.save()
-
- await federateVideoIfNeeded(video, false)
-
- PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
-
- // Only keep not expired watchers
- const newWatchers = watchers.filter(w => w > notBefore)
- this.watchersPerVideo.set(videoId, newWatchers)
-
- logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
- }
- }
-
private async handleBrokenLives () {
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
import { Server as HTTPServer } from 'http'
import { Namespace, Server as SocketServer, Socket } from 'socket.io'
import { isIdValid } from '@server/helpers/custom-validators/misc'
-import { MVideo } from '@server/types/models'
+import { MVideo, MVideoImmutable } from '@server/types/models'
import { UserNotificationModelForApi } from '@server/types/models/user'
import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
import { logger } from '../helpers/logger'
.emit(type, data)
}
- sendVideoViewsUpdate (video: MVideo) {
- const data: LiveVideoEventPayload = { views: video.views }
+ sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
+ const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
const type: LiveVideoEventType = 'views-change'
- logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views })
+ logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
this.liveVideosNamespace
.in(video.id)
RESUMABLE_UPLOAD_SESSION_LIFETIME
} from '../initializers/constants'
import { CONFIG } from '../initializers/config'
+import { exists } from '@server/helpers/custom-validators/misc'
type CachedRoute = {
body: string
/* ************ Views per IP ************ */
- setIPVideoView (ip: string, videoUUID: string, isLive: boolean) {
- const lifetime = isLive
- ? VIEW_LIFETIME.LIVE
- : VIEW_LIFETIME.VIDEO
+ setIPVideoView (ip: string, videoUUID: string) {
+ return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
+ }
- return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime)
+ setIPVideoViewer (ip: string, videoUUID: string) {
+ return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
}
async doesVideoIPViewExist (ip: string, videoUUID: string) {
- return this.exists(this.generateViewKey(ip, videoUUID))
+ return this.exists(this.generateIPViewKey(ip, videoUUID))
+ }
+
+ async doesVideoIPViewerExist (ip: string, videoUUID: string) {
+ return this.exists(this.generateIPViewerKey(ip, videoUUID))
}
/* ************ Tracker IP block ************ */
return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
}
- /* ************ Video views ************ */
+ /* ************ Video views stats ************ */
- addVideoView (videoId: number) {
- const keyIncr = this.generateVideoViewKey(videoId)
- const keySet = this.generateVideosViewKey()
+ addVideoViewStats (videoId: number) {
+ const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
return Promise.all([
- this.addToSet(keySet, videoId.toString()),
- this.increment(keyIncr)
+ this.addToSet(setKey, videoId.toString()),
+ this.increment(videoKey)
])
}
- async getVideoViews (videoId: number, hour: number) {
- const key = this.generateVideoViewKey(videoId, hour)
+ async getVideoViewsStats (videoId: number, hour: number) {
+ const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
- const valueString = await this.getValue(key)
+ const valueString = await this.getValue(videoKey)
const valueInt = parseInt(valueString, 10)
if (isNaN(valueInt)) {
- logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
+ logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
return undefined
}
return valueInt
}
- async getVideosIdViewed (hour: number) {
- const key = this.generateVideosViewKey(hour)
+ async listVideosViewedForStats (hour: number) {
+ const { setKey } = this.generateVideoViewStatsKeys({ hour })
- const stringIds = await this.getSet(key)
+ const stringIds = await this.getSet(setKey)
return stringIds.map(s => parseInt(s, 10))
}
- deleteVideoViews (videoId: number, hour: number) {
- const keySet = this.generateVideosViewKey(hour)
- const keyIncr = this.generateVideoViewKey(videoId, hour)
+ deleteVideoViewsStats (videoId: number, hour: number) {
+ const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
+
+ return Promise.all([
+ this.deleteFromSet(setKey, videoId.toString()),
+ this.deleteKey(videoKey)
+ ])
+ }
+
+ /* ************ Local video views buffer ************ */
+
+ addLocalVideoView (videoId: number) {
+ const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
return Promise.all([
- this.deleteFromSet(keySet, videoId.toString()),
- this.deleteKey(keyIncr)
+ this.addToSet(setKey, videoId.toString()),
+ this.increment(videoKey)
+ ])
+ }
+
+ async getLocalVideoViews (videoId: number) {
+ const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
+
+ const valueString = await this.getValue(videoKey)
+ const valueInt = parseInt(valueString, 10)
+
+ if (isNaN(valueInt)) {
+ logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
+ return undefined
+ }
+
+ return valueInt
+ }
+
+ async listLocalVideosViewed () {
+ const { setKey } = this.generateLocalVideoViewsKeys()
+
+ const stringIds = await this.getSet(setKey)
+ return stringIds.map(s => parseInt(s, 10))
+ }
+
+ deleteLocalVideoViews (videoId: number) {
+ const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
+
+ return Promise.all([
+ this.deleteFromSet(setKey, videoId.toString()),
+ this.deleteKey(videoKey)
])
}
return req.method + '-' + req.originalUrl
}
- private generateVideosViewKey (hour?: number) {
- if (!hour) hour = new Date().getHours()
-
- return `videos-view-h${hour}`
+ private generateLocalVideoViewsKeys (videoId?: Number) {
+ return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
}
- private generateVideoViewKey (videoId: number, hour?: number) {
- if (hour === undefined || hour === null) hour = new Date().getHours()
+ private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
+ const hour = exists(options.hour)
+ ? options.hour
+ : new Date().getHours()
- return `video-view-${videoId}-h${hour}`
+ return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
}
private generateResetPasswordKey (userId: number) {
return 'verify-email-' + userId
}
- private generateViewKey (ip: string, videoUUID: string) {
+ private generateIPViewKey (ip: string, videoUUID: string) {
return `views-${videoUUID}-${ip}`
}
+ private generateIPViewerKey (ip: string, videoUUID: string) {
+ return `viewer-${videoUUID}-${ip}`
+ }
+
private generateTrackerBlockIPKey (ip: string) {
return `tracker-block-ip-${ip}`
}
--- /dev/null
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { VideoModel } from '@server/models/video/video'
+import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
+import { federateVideoIfNeeded } from '../activitypub/videos'
+import { Redis } from '../redis'
+import { AbstractScheduler } from './abstract-scheduler'
+
+const lTags = loggerTagsFactory('views')
+
+export class VideoViewsBufferScheduler extends AbstractScheduler {
+
+ private static instance: AbstractScheduler
+
+ protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
+
+ private constructor () {
+ super()
+ }
+
+ protected async internalExecute () {
+ const videoIds = await Redis.Instance.listLocalVideosViewed()
+ if (videoIds.length === 0) return
+
+ logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
+
+ for (const videoId of videoIds) {
+ try {
+ const views = await Redis.Instance.getLocalVideoViews(videoId)
+ await Redis.Instance.deleteLocalVideoViews(videoId)
+
+ const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+ if (!video) {
+ logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
+ continue
+ }
+
+ // If this is a remote video, the origin instance will send us an update
+ await VideoModel.incrementViews(videoId, views)
+
+ // Send video update
+ video.views += views
+ await federateVideoIfNeeded(video, false)
+ } catch (err) {
+ logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
+ }
+ }
+ }
+
+ static get Instance () {
+ return this.instance || (this.instance = new this())
+ }
+}
--- /dev/null
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { VIEW_LIFETIME } from '@server/initializers/constants'
+import { VideoModel } from '@server/models/video/video'
+import { MVideo } from '@server/types/models'
+import { PeerTubeSocket } from './peertube-socket'
+import { Redis } from './redis'
+
+const lTags = loggerTagsFactory('views')
+
+export class VideoViews {
+
+ // Values are Date().getTime()
+ private readonly viewersPerVideo = new Map<number, number[]>()
+
+ private static instance: VideoViews
+
+ private constructor () {
+ }
+
+ init () {
+ setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
+ }
+
+ async processView (options: {
+ video: MVideo
+ ip: string | null
+ viewerExpires?: Date
+ }) {
+ const { video, ip, viewerExpires } = options
+
+ logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
+
+ let success = await this.addView(video, ip)
+
+ if (video.isLive) {
+ const successViewer = await this.addViewer(video, ip, viewerExpires)
+ success ||= successViewer
+ }
+
+ return success
+ }
+
+ getViewers (video: MVideo) {
+ const viewers = this.viewersPerVideo.get(video.id)
+ if (!viewers) return 0
+
+ return viewers.length
+ }
+
+ buildViewerExpireTime () {
+ return new Date().getTime() + VIEW_LIFETIME.VIEWER
+ }
+
+ private async addView (video: MVideo, ip: string | null) {
+ const promises: Promise<any>[] = []
+
+ if (ip !== null) {
+ const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
+ if (viewExists) return false
+
+ promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
+ }
+
+ if (video.isOwned()) {
+ promises.push(Redis.Instance.addLocalVideoView(video.id))
+ }
+
+ promises.push(Redis.Instance.addVideoViewStats(video.id))
+
+ await Promise.all(promises)
+
+ return true
+ }
+
+ private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
+ if (ip !== null) {
+ const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
+ if (viewExists) return false
+
+ await Redis.Instance.setIPVideoViewer(ip, video.uuid)
+ }
+
+ let watchers = this.viewersPerVideo.get(video.id)
+
+ if (!watchers) {
+ watchers = []
+ this.viewersPerVideo.set(video.id, watchers)
+ }
+
+ const expiration = viewerExpires
+ ? viewerExpires.getTime()
+ : this.buildViewerExpireTime()
+
+ watchers.push(expiration)
+ await this.notifyClients(video.id, watchers.length)
+
+ return true
+ }
+
+ private async cleanViewers () {
+ logger.info('Cleaning video viewers.', lTags())
+
+ for (const videoId of this.viewersPerVideo.keys()) {
+ const notBefore = new Date().getTime()
+
+ const viewers = this.viewersPerVideo.get(videoId)
+
+ // Only keep not expired viewers
+ const newViewers = viewers.filter(w => w > notBefore)
+
+ if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
+ else this.viewersPerVideo.set(videoId, newViewers)
+
+ await this.notifyClients(videoId, newViewers.length)
+ }
+ }
+
+ private async notifyClients (videoId: string | number, viewersLength: number) {
+ const video = await VideoModel.loadImmutableAttributes(videoId)
+ if (!video) return
+
+ PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
+
+ logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
+ }
+
+ static get Instance () {
+ return this.instance || (this.instance = new this())
+ }
+}
import { uuidToShort } from '@server/helpers/uuid'
import { generateMagnetUri } from '@server/helpers/webtorrent'
import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls'
+import { VideoViews } from '@server/lib/video-views'
import { VideosCommonQueryAfterSanitize } from '@shared/models'
import { VideoFile } from '@shared/models/videos/video-file.model'
import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects'
pluginData: (video as any).pluginData
}
+ if (video.isLive) {
+ videoObject.viewers = VideoViews.Instance.getViewers(video)
+ }
+
const add = options.additionalAttributes
if (add?.state === true) {
videoObject.state = {
const expect = chai.expect
-describe('Test live', function () {
+describe('Live views', function () {
let servers: PeerTubeServer[] = []
before(async function () {
await doubleFollow(servers[0], servers[1])
})
- describe('Live views', function () {
- let liveVideoId: string
- let command: FfmpegCommand
+ let liveVideoId: string
+ let command: FfmpegCommand
- async function countViews (expected: number) {
- for (const server of servers) {
- const video = await server.videos.get({ id: liveVideoId })
- expect(video.views).to.equal(expected)
- }
+ async function countViewers (expectedViewers: number) {
+ for (const server of servers) {
+ const video = await server.videos.get({ id: liveVideoId })
+ expect(video.viewers).to.equal(expectedViewers)
}
+ }
- before(async function () {
- this.timeout(30000)
+ async function countViews (expectedViews: number) {
+ for (const server of servers) {
+ const video = await server.videos.get({ id: liveVideoId })
+ expect(video.views).to.equal(expectedViews)
+ }
+ }
- const liveAttributes = {
- name: 'live video',
- channelId: servers[0].store.channel.id,
- privacy: VideoPrivacy.PUBLIC
- }
+ before(async function () {
+ this.timeout(30000)
- const live = await servers[0].live.create({ fields: liveAttributes })
- liveVideoId = live.uuid
+ const liveAttributes = {
+ name: 'live video',
+ channelId: servers[0].store.channel.id,
+ privacy: VideoPrivacy.PUBLIC
+ }
- command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
- await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
- await waitJobs(servers)
- })
+ const live = await servers[0].live.create({ fields: liveAttributes })
+ liveVideoId = live.uuid
- it('Should display no views for a live', async function () {
- await countViews(0)
- })
+ command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
+ await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
+ await waitJobs(servers)
+ })
- it('Should view a live twice and display 1 view', async function () {
- this.timeout(30000)
+ it('Should display no views and viewers for a live', async function () {
+ await countViews(0)
+ await countViewers(0)
+ })
- await servers[0].videos.view({ id: liveVideoId })
- await servers[0].videos.view({ id: liveVideoId })
+ it('Should view a live twice and display 1 view/viewer', async function () {
+ this.timeout(30000)
- await wait(7000)
+ await servers[0].videos.view({ id: liveVideoId })
+ await servers[0].videos.view({ id: liveVideoId })
- await waitJobs(servers)
+ await waitJobs(servers)
+ await countViewers(1)
- await countViews(1)
- })
+ await wait(7000)
+ await countViews(1)
+ })
- it('Should wait and display 0 views', async function () {
- this.timeout(30000)
+ it('Should wait and display 0 viewers while still have 1 view', async function () {
+ this.timeout(30000)
- await wait(12000)
- await waitJobs(servers)
+ await wait(12000)
+ await waitJobs(servers)
- await countViews(0)
- })
+ await countViews(1)
+ await countViewers(0)
+ })
- it('Should view a live on a remote and on local and display 2 views', async function () {
- this.timeout(30000)
+ it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () {
+ this.timeout(30000)
- await servers[0].videos.view({ id: liveVideoId })
- await servers[1].videos.view({ id: liveVideoId })
- await servers[1].videos.view({ id: liveVideoId })
+ await servers[0].videos.view({ id: liveVideoId })
+ await servers[1].videos.view({ id: liveVideoId })
+ await servers[1].videos.view({ id: liveVideoId })
+ await waitJobs(servers)
- await wait(7000)
- await waitJobs(servers)
+ await countViewers(2)
- await countViews(2)
- })
+ await wait(7000)
+ await waitJobs(servers)
- after(async function () {
- await stopFfmpeg(command)
- })
+ await countViews(3)
})
after(async function () {
+ await stopFfmpeg(command)
await cleanupTests(servers)
})
})
let job = body.data[0]
// Skip repeat jobs
- if (job.type === 'videos-views') job = body.data[1]
+ if (job.type === 'videos-views-stats') job = body.data[1]
expect(job.state).to.equal('completed')
expect(job.type.startsWith('activitypub-')).to.be.true
-import { JobState } from '../../models'
+import { JobState, JobType } from '../../models'
import { wait } from '../miscs'
import { PeerTubeServer } from './server'
const states: JobState[] = [ 'waiting', 'active' ]
if (!skipDelayed) states.push('delayed')
- const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]
+ const repeatableJobs: JobType[] = [ 'videos-views-stats', 'activitypub-cleaner' ]
let pendingRequests: boolean
function tasksBuilder () {
import { APObject } from './objects/object.model'
import { PlaylistObject } from './objects/playlist-object'
import { VideoCommentObject } from './objects/video-comment-object'
-import { ViewObject } from './objects/view-object'
export type Activity =
ActivityCreate |
export interface ActivityCreate extends BaseActivity {
type: 'Create'
- object: VideoObject | AbuseObject | ViewObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject
+ object: VideoObject | AbuseObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject
}
export interface ActivityUpdate extends BaseActivity {
type: 'View'
actor: string
object: APObject
+ expires: string
}
export interface ActivityDislike extends BaseActivity {
| 'video-transcoding'
| 'email'
| 'video-import'
- | 'videos-views'
+ | 'videos-views-stats'
| 'activitypub-refresher'
| 'video-redundancy'
| 'video-live-ending'
export interface LiveVideoEventPayload {
state?: VideoState
+
+ // FIXME: deprecated in 4.0 in favour of viewers
views?: number
+
+ viewers?: number
}
url: string
views: number
+ // If live
+ viewers?: number
+
likes: number
dislikes: number
nsfw: boolean
- video-transcoding
- video-file-import
- video-import
- - videos-views
+ - videos-views-stats
- activitypub-refresher
- video-redundancy
- video-live-ending
- $ref: '#/components/schemas/Video'
- type: object
properties:
+ viewers:
+ type: integer
+ description: If the video is a live, you have the amount of current viewers
descriptionPath:
type: string
example: /api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/description
- video-transcoding
- email
- video-import
- - videos-views
+ - videos-views-stats
- activitypub-refresher
- video-redundancy
data: