]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Refactor video views
authorChocobozzz <me@florianbigard.com>
Tue, 9 Nov 2021 09:11:20 +0000 (10:11 +0100)
committerChocobozzz <chocobozzz@cpy.re>
Tue, 9 Nov 2021 14:00:31 +0000 (15:00 +0100)
Introduce viewers attribute for live videos
Count views for live videos
Reduce delay to see the viewer update for lives
Add ability to configure video views buffer interval and view ip
expiration

31 files changed:
client/src/app/+admin/system/jobs/jobs.component.ts
client/src/app/+videos/+video-watch/video-watch.component.ts
client/src/app/shared/shared-main/video/video.model.ts
client/src/app/shared/shared-video/video-views-counter.component.html
config/default.yaml
config/production.yaml.example
config/test.yaml
server.ts
server/controllers/api/videos/index.ts
server/initializers/checker-before-init.ts
server/initializers/config.ts
server/initializers/constants.ts
server/lib/activitypub/process/process-view.ts
server/lib/activitypub/send/send-view.ts
server/lib/activitypub/videos/updater.ts
server/lib/job-queue/handlers/video-views-stats.ts [moved from server/lib/job-queue/handlers/video-views.ts with 52% similarity]
server/lib/job-queue/job-queue.ts
server/lib/live/live-manager.ts
server/lib/peertube-socket.ts
server/lib/redis.ts
server/lib/schedulers/video-views-buffer-scheduler.ts [new file with mode: 0644]
server/lib/video-views.ts [new file with mode: 0644]
server/models/video/formatter/video-format-utils.ts
server/tests/api/live/live-views.ts
server/tests/api/server/jobs.ts
shared/extra-utils/server/jobs.ts
shared/models/activitypub/activity.ts
shared/models/server/job.model.ts
shared/models/videos/live/live-video-event-payload.model.ts
shared/models/videos/video.model.ts
support/doc/api/openapi.yaml

index b12d7f80a5e7d29e723fdf4cc152ed9c8e0b1739..2cf1bff7ad592c7817b4c110a53dce5c68aead34 100644 (file)
@@ -36,7 +36,7 @@ export class JobsComponent extends RestTable implements OnInit {
     'video-live-ending',
     'video-redundancy',
     'video-transcoding',
-    'videos-views',
+    'videos-views-stats',
     'move-to-object-storage'
   ]
 
index 5ca9d5fa9d26b74ddb1710b4fa961b1c850531ed..fd61bcbf044f83317593fa12b15020c3096a860c 100644 (file)
@@ -658,7 +658,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
     return this.peertubeSocket.getLiveVideosObservable()
       .subscribe(({ type, payload }) => {
         if (type === 'state-change') return this.handleLiveStateChange(payload.state)
-        if (type === 'views-change') return this.handleLiveViewsChange(payload.views)
+        if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers)
       })
   }
 
@@ -677,7 +677,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
     this.loadVideo(videoUUID)
   }
 
-  private handleLiveViewsChange (newViews: number) {
+  private handleLiveViewsChange (newViewers: number) {
     if (!this.video) {
       console.error('Cannot update video live views because video is no defined.')
       return
@@ -685,7 +685,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
 
     console.log('Updating live views.')
 
-    this.video.views = newViews
+    this.video.viewers = newViewers
   }
 
   private initHotkeys () {
index b11316471fe0772d3e05edcbfebfdea712c294ca..472a8c8108b360b1ffdfa91b04229ce3b9630b52 100644 (file)
@@ -57,6 +57,9 @@ export class Video implements VideoServerModel {
   url: string
 
   views: number
+  // If live
+  viewers?: number
+
   likes: number
   dislikes: number
   nsfw: boolean
@@ -150,6 +153,7 @@ export class Video implements VideoServerModel {
     this.url = hash.url
 
     this.views = hash.views
+    this.viewers = hash.viewers
     this.likes = hash.likes
     this.dislikes = hash.dislikes
 
index a6679f74da64436a15c7153e8c6db3e421da83d1..b19c8b1375fa76c8b4f63ba6cf190f8b0ec3be85 100644 (file)
@@ -4,6 +4,6 @@
   </ng-container>
 
   <ng-container i18n *ngIf="video.isLive">
-    {video.views, plural, =1 {1 viewer} other {{{ video.views | myNumberFormatter }} viewers}}
+    {video.viewers, plural, =1 {1 viewer} other {{{ video.viewers | myNumberFormatter }} viewers}}
   </ng-container>
 </span>
index c30c29a6b8d2278a8ae179d3eef5bc25a67f5da6..ee7acb437d0f63ce95e39c42f530977bb14a2d1a 100644 (file)
@@ -232,6 +232,11 @@ views:
     remote:
       max_age: '30 days'
 
+    # PeerTube buffers local video views before updating and federating the video
+    local_buffer_update_interval: '30 minutes'
+
+    ip_view_expiration: '1 hour'
+
 plugins:
   # The website PeerTube will ask for available PeerTube plugins and themes
   # This is an unmoderated plugin index, so only install plugins/themes you trust
index 4dc5c281def176b07517542957be9673d8c3fa70..0175c7a122a823e174d6a09fb0065c1ab243cc18 100644 (file)
@@ -230,6 +230,11 @@ views:
     remote:
       max_age: '30 days'
 
+    # PeerTube buffers local video views before updating and federating the video
+    local_buffer_update_interval: '30 minutes'
+
+    ip_view_expiration: '1 hour'
+
 plugins:
   # The website PeerTube will ask for available PeerTube plugins and themes
   # This is an unmoderated plugin index, so only install plugins/themes you trust
index e9731d863ccf2a94dae1f2929bd6d2c16e7aa125..2e7f982d3ca01204da1cd58a3ab73b21174cff2c 100644 (file)
@@ -160,3 +160,6 @@ views:
   videos:
     remote:
       max_age: -1
+
+    local_buffer_update_interval: '5 seconds'
+    ip_view_expiration: '1 second'
index b8c1d12517e21666a1b00d314d6e20dbf34cade2..6a7dad0cdf00b6e0ddbd191890ea5c9d2a056b2c 100644 (file)
--- a/server.ts
+++ b/server.ts
@@ -117,6 +117,7 @@ import { VideosRedundancyScheduler } from './server/lib/schedulers/videos-redund
 import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler'
 import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances'
 import { RemoveDanglingResumableUploadsScheduler } from './server/lib/schedulers/remove-dangling-resumable-uploads-scheduler'
+import { VideoViewsBufferScheduler } from './server/lib/schedulers/video-views-buffer-scheduler'
 import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto'
 import { PeerTubeSocket } from './server/lib/peertube-socket'
 import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls'
@@ -128,6 +129,7 @@ import { LiveManager } from './server/lib/live'
 import { HttpStatusCode } from './shared/models/http/http-error-codes'
 import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache'
 import { ServerConfigManager } from '@server/lib/server-config-manager'
+import { VideoViews } from '@server/lib/video-views'
 
 // ----------- Command line -----------
 
@@ -296,11 +298,11 @@ async function startApplication () {
   PeerTubeVersionCheckScheduler.Instance.enable()
   AutoFollowIndexInstances.Instance.enable()
   RemoveDanglingResumableUploadsScheduler.Instance.enable()
+  VideoViewsBufferScheduler.Instance.enable()
 
-  // Redis initialization
   Redis.Instance.init()
-
   PeerTubeSocket.Instance.init(server)
+  VideoViews.Instance.init()
 
   updateStreamingPlaylistsInfohashesIfNeeded()
     .catch(err => logger.error('Cannot update streaming playlist infohashes.', { err }))
index 821161c646f0c9c47159320a66c556b8b07fe0ee..72b382595865356697030c6421d97977e6100e39 100644 (file)
@@ -2,7 +2,7 @@ import express from 'express'
 import toInt from 'validator/lib/toInt'
 import { pickCommonVideoQuery } from '@server/helpers/query'
 import { doJSONRequest } from '@server/helpers/requests'
-import { LiveManager } from '@server/lib/live'
+import { VideoViews } from '@server/lib/video-views'
 import { openapiOperationDoc } from '@server/middlewares/doc'
 import { getServerActor } from '@server/models/application/application'
 import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils'
@@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
 import { sendView } from '../../../lib/activitypub/send/send-view'
 import { JobQueue } from '../../../lib/job-queue'
 import { Hooks } from '../../../lib/plugins/hooks'
-import { Redis } from '../../../lib/redis'
 import {
   asyncMiddleware,
   asyncRetryTransactionMiddleware,
@@ -107,7 +106,7 @@ videosRouter.get('/:id',
 )
 videosRouter.post('/:id/views',
   openapiOperationDoc({ operationId: 'addView' }),
-  asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')),
+  asyncMiddleware(videosCustomGetValidator('only-video')),
   asyncMiddleware(viewVideo)
 )
 
@@ -153,44 +152,17 @@ function getVideo (_req: express.Request, res: express.Response) {
 }
 
 async function viewVideo (req: express.Request, res: express.Response) {
-  const immutableVideoAttrs = res.locals.onlyImmutableVideo
+  const video = res.locals.onlyVideo
 
   const ip = req.ip
-  const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid)
-  if (exists) {
-    logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
-    return res.status(HttpStatusCode.NO_CONTENT_204).end()
-  }
-
-  const video = await VideoModel.load(immutableVideoAttrs.id)
-
-  const promises: Promise<any>[] = [
-    Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
-  ]
-
-  let federateView = true
-
-  // Increment our live manager
-  if (video.isLive && video.isOwned()) {
-    LiveManager.Instance.addViewTo(video.id)
-
-    // Views of our local live will be sent by our live manager
-    federateView = false
-  }
-
-  // Increment our video views cache counter
-  if (!video.isLive) {
-    promises.push(Redis.Instance.addVideoView(video.id))
-  }
+  const success = await VideoViews.Instance.processView({ video, ip })
 
-  if (federateView) {
+  if (success) {
     const serverActor = await getServerActor()
-    promises.push(sendView(serverActor, video, undefined))
-  }
-
-  await Promise.all(promises)
+    await sendView(serverActor, video, undefined)
 
-  Hooks.runAction('action:api.video.viewed', { video, ip })
+    Hooks.runAction('action:api.video.viewed', { video: video, ip })
+  }
 
   return res.status(HttpStatusCode.NO_CONTENT_204).end()
 }
index 1015c5e451a8502bcfbf1fab7d6d63f7ffeacd72..51c39654839709f5efa7d86a77047775e1818b49 100644 (file)
@@ -38,7 +38,7 @@ function checkMissedConfig () {
     'services.twitter.username', 'services.twitter.whitelisted',
     'followers.instance.enabled', 'followers.instance.manual_approval',
     'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces',
-    'history.videos.max_age', 'views.videos.remote.max_age',
+    'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration',
     'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
     'theme.default',
     'remote_redundancy.videos.accept_from',
index 1288768d82b08768739bb9f3b5790cf5654e043c..dadda2a774d52e3dc3bf2d312cda612b118517fa 100644 (file)
@@ -182,7 +182,9 @@ const CONFIG = {
     VIDEOS: {
       REMOTE: {
         MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age'))
-      }
+      },
+      LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')),
+      IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration'))
     }
   },
   PLUGINS: {
index 845576667d8b70ca34a4a1d013c28458368a366d..b65741bbd29457de648a082c64e812245b77f4c9 100644 (file)
@@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'video-import': 1,
   'email': 5,
   'actor-keys': 3,
-  'videos-views': 1,
+  'videos-views-stats': 1,
   'activitypub-refresher': 1,
   'video-redundancy': 1,
   'video-live-ending': 1,
@@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
   'video-file-import': 1,
   'email': 5,
   'actor-keys': 1,
-  'videos-views': 1,
+  'videos-views-stats': 1,
   'activitypub-refresher': 1,
   'video-redundancy': 1,
   'video-live-ending': 10,
@@ -181,14 +181,14 @@ const JOB_TTL: { [id in JobType]: number } = {
   'video-import': 1000 * 3600 * 2, // 2 hours
   'email': 60000 * 10, // 10 minutes
   'actor-keys': 60000 * 20, // 20 minutes
-  'videos-views': undefined, // Unlimited
+  'videos-views-stats': undefined, // Unlimited
   'activitypub-refresher': 60000 * 10, // 10 minutes
   'video-redundancy': 1000 * 3600 * 3, // 3 hours
   'video-live-ending': 1000 * 60 * 10, // 10 minutes
   'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
 }
-const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
-  'videos-views': {
+const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
+  'videos-views-stats': {
     cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
   },
   'activitypub-cleaner': {
@@ -211,6 +211,7 @@ const SCHEDULER_INTERVALS_MS = {
   REMOVE_OLD_JOBS: 60000 * 60, // 1 hour
   UPDATE_VIDEOS: 60000, // 1 minute
   YOUTUBE_DL_UPDATE: 60000 * 60 * 24, // 1 day
+  VIDEO_VIEWS_BUFFER_UPDATE: CONFIG.VIEWS.VIDEOS.LOCAL_BUFFER_UPDATE_INTERVAL,
   CHECK_PLUGINS: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
   CHECK_PEERTUBE_VERSION: 60000 * 60 * 24, // 1 day
   AUTO_FOLLOW_INDEX_INSTANCES: 60000 * 60 * 24, // 1 day
@@ -343,8 +344,8 @@ const CONSTRAINTS_FIELDS = {
 }
 
 const VIEW_LIFETIME = {
-  VIDEO: 60000 * 60, // 1 hour
-  LIVE: 60000 * 5 // 5 minutes
+  VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
+  VIEWER: 60000 * 5 // 5 minutes
 }
 
 let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour
@@ -789,13 +790,12 @@ if (isTestInstance() === true) {
   SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
   SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
   SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
-  REPEAT_JOBS['videos-views'] = { every: 5000 }
+  REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
   REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
 
   REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
 
-  VIEW_LIFETIME.VIDEO = 1000 // 1 second
-  VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
+  VIEW_LIFETIME.VIEWER = 1000 * 5 // 5 second
   CONTACT_FORM_LIFETIME = 1000 // 1 second
 
   JOB_ATTEMPTS['email'] = 1
index 5593ee2570b14ba3115437b05d3bf598d806d230..720385f9bb049cbf95d5ad70e5fcf0fcae4e0869 100644 (file)
@@ -1,13 +1,13 @@
-import { getOrCreateAPVideo } from '../videos'
-import { forwardVideoRelatedActivity } from '../send/utils'
-import { Redis } from '../../redis'
-import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
+import { VideoViews } from '@server/lib/video-views'
+import { ActivityView } from '../../../../shared/models/activitypub'
 import { APProcessorOptions } from '../../../types/activitypub-processor.model'
 import { MActorSignature } from '../../../types/models'
-import { LiveManager } from '@server/lib/live/live-manager'
+import { forwardVideoRelatedActivity } from '../send/utils'
+import { getOrCreateAPVideo } from '../videos'
 
-async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
+async function processViewActivity (options: APProcessorOptions<ActivityView>) {
   const { activity, byActor } = options
+
   return processCreateView(activity, byActor)
 }
 
@@ -19,10 +19,8 @@ export {
 
 // ---------------------------------------------------------------------------
 
-async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) {
-  const videoObject = activity.type === 'View'
-    ? activity.object
-    : (activity.object as ViewObject).object
+async function processCreateView (activity: ActivityView, byActor: MActorSignature) {
+  const videoObject = activity.object
 
   const { video } = await getOrCreateAPVideo({
     videoObject,
@@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
     allowRefresh: false
   })
 
-  if (!video.isLive) {
-    await Redis.Instance.addVideoView(video.id)
-  }
+  const viewerExpires = activity.expires
+    ? new Date(activity.expires)
+    : undefined
 
-  if (video.isOwned()) {
-    // Our live manager will increment the counter and send the view to followers
-    if (video.isLive) {
-      LiveManager.Instance.addViewTo(video.id)
-      return
-    }
+  await VideoViews.Instance.processView({ video, ip: null, viewerExpires })
 
+  if (video.isOwned()) {
     // Forward the view but don't resend the activity to the sender
     const exceptions = [ byActor ]
     await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
index 153e942959160f4b536527d149e03b15a16b5f97..b12583e261e7eb3fd0300e0660abcd7fb40bbe6d 100644 (file)
@@ -1,4 +1,5 @@
 import { Transaction } from 'sequelize'
+import { VideoViews } from '@server/lib/video-views'
 import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models'
 import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub'
 import { logger } from '../../../helpers/logger'
@@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU
       id: url,
       type: 'View' as 'View',
       actor: byActor.url,
-      object: video.url
+      object: video.url,
+      expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString()
     },
     audience
   )
index 157569414a3cb095a93630e505ca29600c1c97bc..f786bb196efc73573e8b263c2493d5455b9e9d74 100644 (file)
@@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
 
       if (videoUpdated.isLive) {
         PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated)
-        PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated)
       }
 
       logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags())
similarity index 52%
rename from server/lib/job-queue/handlers/video-views.ts
rename to server/lib/job-queue/handlers/video-views-stats.ts
index 86d0a271f12a52d33111c56b6630ba986b9f8b22..caf5f69623e5c9bf6c8d88c76148ad3d172d8df6 100644 (file)
@@ -1,11 +1,10 @@
-import { Redis } from '../../redis'
+import { isTestInstance } from '../../../helpers/core-utils'
 import { logger } from '../../../helpers/logger'
 import { VideoModel } from '../../../models/video/video'
 import { VideoViewModel } from '../../../models/video/video-view'
-import { isTestInstance } from '../../../helpers/core-utils'
-import { federateVideoIfNeeded } from '../../activitypub/videos'
+import { Redis } from '../../redis'
 
-async function processVideosViews () {
+async function processVideosViewsStats () {
   const lastHour = new Date()
 
   // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@@ -15,23 +14,23 @@ async function processVideosViews () {
   const startDate = lastHour.setMinutes(0, 0, 0)
   const endDate = lastHour.setMinutes(59, 59, 999)
 
-  const videoIds = await Redis.Instance.getVideosIdViewed(hour)
+  const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
   if (videoIds.length === 0) return
 
-  logger.info('Processing videos views in job for hour %d.', hour)
+  logger.info('Processing videos views stats in job for hour %d.', hour)
 
   for (const videoId of videoIds) {
     try {
-      const views = await Redis.Instance.getVideoViews(videoId, hour)
-      await Redis.Instance.deleteVideoViews(videoId, hour)
+      const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
+      await Redis.Instance.deleteVideoViewsStats(videoId, hour)
 
       if (views) {
-        logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
+        logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
 
         try {
-          const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+          const video = await VideoModel.load(videoId)
           if (!video) {
-            logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId)
+            logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
             continue
           }
 
@@ -41,21 +40,12 @@ async function processVideosViews () {
             views,
             videoId
           })
-
-          if (video.isOwned()) {
-            // If this is a remote video, the origin instance will send us an update
-            await VideoModel.incrementViews(videoId, views)
-
-            // Send video update
-            video.views += views
-            await federateVideoIfNeeded(video, false)
-          }
         } catch (err) {
-          logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err })
+          logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
         }
       }
     } catch (err) {
-      logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err })
+      logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
     }
   }
 }
@@ -63,5 +53,5 @@ async function processVideosViews () {
 // ---------------------------------------------------------------------------
 
 export {
-  processVideosViews
+  processVideosViewsStats
 }
index 0eab720d9a20097460525606ba99829329d833ab..4c1597b334a7c877500d1d52f4ee52b384aa7a6f 100644 (file)
@@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import'
 import { processVideoImport } from './handlers/video-import'
 import { processVideoLiveEnding } from './handlers/video-live-ending'
 import { processVideoTranscoding } from './handlers/video-transcoding'
-import { processVideosViews } from './handlers/video-views'
+import { processVideosViewsStats } from './handlers/video-views-stats'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -49,7 +49,7 @@ type CreateJobArgument =
   { type: 'email', payload: EmailPayload } |
   { type: 'video-import', payload: VideoImportPayload } |
   { type: 'activitypub-refresher', payload: RefreshPayload } |
-  { type: 'videos-views', payload: {} } |
+  { type: 'videos-views-stats', payload: {} } |
   { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
   { type: 'actor-keys', payload: ActorKeysPayload } |
   { type: 'video-redundancy', payload: VideoRedundancyPayload } |
@@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
   'video-transcoding': processVideoTranscoding,
   'email': processEmail,
   'video-import': processVideoImport,
-  'videos-views': processVideosViews,
+  'videos-views-stats': processVideosViewsStats,
   'activitypub-refresher': refreshAPObject,
   'video-live-ending': processVideoLiveEnding,
   'actor-keys': processActorKeys,
@@ -89,7 +89,7 @@ const jobTypes: JobType[] = [
   'video-transcoding',
   'video-file-import',
   'video-import',
-  'videos-views',
+  'videos-views-stats',
   'activitypub-refresher',
   'video-redundancy',
   'actor-keys',
@@ -247,8 +247,8 @@ class JobQueue {
   }
 
   private addRepeatableJobs () {
-    this.queues['videos-views'].add({}, {
-      repeat: REPEAT_JOBS['videos-views']
+    this.queues['videos-views-stats'].add({}, {
+      repeat: REPEAT_JOBS['videos-views-stats']
     }).catch(err => logger.error('Cannot add repeatable job.', { err }))
 
     if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
index 1b7b9dd4d6f3cc20287030818d7085b7b886e600..2562edb7566507d23ffc3eaa27c8742c1656848e 100644 (file)
@@ -2,7 +2,6 @@
 import { readFile } from 'fs-extra'
 import { createServer, Server } from 'net'
 import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
-import { isTestInstance } from '@server/helpers/core-utils'
 import {
   computeResolutionsToTranscode,
   ffprobePromise,
@@ -12,7 +11,7 @@ import {
 } from '@server/helpers/ffprobe-utils'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
-import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants'
+import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
 import { UserModel } from '@server/models/user/user'
 import { VideoModel } from '@server/models/video/video'
 import { VideoLiveModel } from '@server/models/video/video-live'
@@ -53,8 +52,6 @@ class LiveManager {
 
   private readonly muxingSessions = new Map<string, MuxingSession>()
   private readonly videoSessions = new Map<number, string>()
-  // Values are Date().getTime()
-  private readonly watchersPerVideo = new Map<number, number[]>()
 
   private rtmpServer: Server
   private rtmpsServer: ServerTLS
@@ -99,8 +96,6 @@ class LiveManager {
     // Cleanup broken lives, that were terminated by a server restart for example
     this.handleBrokenLives()
       .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
-
-    setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
   }
 
   async run () {
@@ -184,19 +179,6 @@ class LiveManager {
     this.abortSession(sessionId)
   }
 
-  addViewTo (videoId: number) {
-    if (this.videoSessions.has(videoId) === false) return
-
-    let watchers = this.watchersPerVideo.get(videoId)
-
-    if (!watchers) {
-      watchers = []
-      this.watchersPerVideo.set(videoId, watchers)
-    }
-
-    watchers.push(new Date().getTime())
-  }
-
   private getContext () {
     return context
   }
@@ -377,7 +359,6 @@ class LiveManager {
   }
 
   private onMuxingFFmpegEnd (videoId: number) {
-    this.watchersPerVideo.delete(videoId)
     this.videoSessions.delete(videoId)
   }
 
@@ -411,34 +392,6 @@ class LiveManager {
     }
   }
 
-  private async updateLiveViews () {
-    if (!this.isRunning()) return
-
-    if (!isTestInstance()) logger.info('Updating live video views.', lTags())
-
-    for (const videoId of this.watchersPerVideo.keys()) {
-      const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
-
-      const watchers = this.watchersPerVideo.get(videoId)
-
-      const numWatchers = watchers.length
-
-      const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
-      video.views = numWatchers
-      await video.save()
-
-      await federateVideoIfNeeded(video, false)
-
-      PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
-
-      // Only keep not expired watchers
-      const newWatchers = watchers.filter(w => w > notBefore)
-      this.watchersPerVideo.set(videoId, newWatchers)
-
-      logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
-    }
-  }
-
   private async handleBrokenLives () {
     const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
 
index 901435dea792277d2aa02f9461e0d86ee87fc6b3..0398ca61dbf8f71e68c52be44cbed4fe2023dd7b 100644 (file)
@@ -1,7 +1,7 @@
 import { Server as HTTPServer } from 'http'
 import { Namespace, Server as SocketServer, Socket } from 'socket.io'
 import { isIdValid } from '@server/helpers/custom-validators/misc'
-import { MVideo } from '@server/types/models'
+import { MVideo, MVideoImmutable } from '@server/types/models'
 import { UserNotificationModelForApi } from '@server/types/models/user'
 import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
 import { logger } from '../helpers/logger'
@@ -78,11 +78,11 @@ class PeerTubeSocket {
       .emit(type, data)
   }
 
-  sendVideoViewsUpdate (video: MVideo) {
-    const data: LiveVideoEventPayload = { views: video.views }
+  sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
+    const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
     const type: LiveVideoEventType = 'views-change'
 
-    logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views })
+    logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
 
     this.liveVideosNamespace
       .in(video.id)
index 46617b07e4ac6405978d9249b1c1f8681b30f9c7..76b7868e850352b7647c6d0c91c4ac5a1c9c9b73 100644 (file)
@@ -13,6 +13,7 @@ import {
   RESUMABLE_UPLOAD_SESSION_LIFETIME
 } from '../initializers/constants'
 import { CONFIG } from '../initializers/config'
+import { exists } from '@server/helpers/custom-validators/misc'
 
 type CachedRoute = {
   body: string
@@ -119,16 +120,20 @@ class Redis {
 
   /* ************ Views per IP ************ */
 
-  setIPVideoView (ip: string, videoUUID: string, isLive: boolean) {
-    const lifetime = isLive
-      ? VIEW_LIFETIME.LIVE
-      : VIEW_LIFETIME.VIDEO
+  setIPVideoView (ip: string, videoUUID: string) {
+    return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
+  }
 
-    return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime)
+  setIPVideoViewer (ip: string, videoUUID: string) {
+    return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER)
   }
 
   async doesVideoIPViewExist (ip: string, videoUUID: string) {
-    return this.exists(this.generateViewKey(ip, videoUUID))
+    return this.exists(this.generateIPViewKey(ip, videoUUID))
+  }
+
+  async doesVideoIPViewerExist (ip: string, videoUUID: string) {
+    return this.exists(this.generateIPViewerKey(ip, videoUUID))
   }
 
   /* ************ Tracker IP block ************ */
@@ -160,46 +165,85 @@ class Redis {
     return this.setObject(this.generateCachedRouteKey(req), cached, lifetime)
   }
 
-  /* ************ Video views ************ */
+  /* ************ Video views stats ************ */
 
-  addVideoView (videoId: number) {
-    const keyIncr = this.generateVideoViewKey(videoId)
-    const keySet = this.generateVideosViewKey()
+  addVideoViewStats (videoId: number) {
+    const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId })
 
     return Promise.all([
-      this.addToSet(keySet, videoId.toString()),
-      this.increment(keyIncr)
+      this.addToSet(setKey, videoId.toString()),
+      this.increment(videoKey)
     ])
   }
 
-  async getVideoViews (videoId: number, hour: number) {
-    const key = this.generateVideoViewKey(videoId, hour)
+  async getVideoViewsStats (videoId: number, hour: number) {
+    const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
 
-    const valueString = await this.getValue(key)
+    const valueString = await this.getValue(videoKey)
     const valueInt = parseInt(valueString, 10)
 
     if (isNaN(valueInt)) {
-      logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
+      logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString)
       return undefined
     }
 
     return valueInt
   }
 
-  async getVideosIdViewed (hour: number) {
-    const key = this.generateVideosViewKey(hour)
+  async listVideosViewedForStats (hour: number) {
+    const { setKey } = this.generateVideoViewStatsKeys({ hour })
 
-    const stringIds = await this.getSet(key)
+    const stringIds = await this.getSet(setKey)
     return stringIds.map(s => parseInt(s, 10))
   }
 
-  deleteVideoViews (videoId: number, hour: number) {
-    const keySet = this.generateVideosViewKey(hour)
-    const keyIncr = this.generateVideoViewKey(videoId, hour)
+  deleteVideoViewsStats (videoId: number, hour: number) {
+    const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour })
+
+    return Promise.all([
+      this.deleteFromSet(setKey, videoId.toString()),
+      this.deleteKey(videoKey)
+    ])
+  }
+
+  /* ************ Local video views buffer ************ */
+
+  addLocalVideoView (videoId: number) {
+    const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId)
 
     return Promise.all([
-      this.deleteFromSet(keySet, videoId.toString()),
-      this.deleteKey(keyIncr)
+      this.addToSet(setKey, videoId.toString()),
+      this.increment(videoKey)
+    ])
+  }
+
+  async getLocalVideoViews (videoId: number) {
+    const { videoKey } = this.generateLocalVideoViewsKeys(videoId)
+
+    const valueString = await this.getValue(videoKey)
+    const valueInt = parseInt(valueString, 10)
+
+    if (isNaN(valueInt)) {
+      logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString)
+      return undefined
+    }
+
+    return valueInt
+  }
+
+  async listLocalVideosViewed () {
+    const { setKey } = this.generateLocalVideoViewsKeys()
+
+    const stringIds = await this.getSet(setKey)
+    return stringIds.map(s => parseInt(s, 10))
+  }
+
+  deleteLocalVideoViews (videoId: number) {
+    const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId)
+
+    return Promise.all([
+      this.deleteFromSet(setKey, videoId.toString()),
+      this.deleteKey(videoKey)
     ])
   }
 
@@ -233,16 +277,16 @@ class Redis {
     return req.method + '-' + req.originalUrl
   }
 
-  private generateVideosViewKey (hour?: number) {
-    if (!hour) hour = new Date().getHours()
-
-    return `videos-view-h${hour}`
+  private generateLocalVideoViewsKeys (videoId?: Number) {
+    return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` }
   }
 
-  private generateVideoViewKey (videoId: number, hour?: number) {
-    if (hour === undefined || hour === null) hour = new Date().getHours()
+  private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) {
+    const hour = exists(options.hour)
+      ? options.hour
+      : new Date().getHours()
 
-    return `video-view-${videoId}-h${hour}`
+    return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` }
   }
 
   private generateResetPasswordKey (userId: number) {
@@ -253,10 +297,14 @@ class Redis {
     return 'verify-email-' + userId
   }
 
-  private generateViewKey (ip: string, videoUUID: string) {
+  private generateIPViewKey (ip: string, videoUUID: string) {
     return `views-${videoUUID}-${ip}`
   }
 
+  private generateIPViewerKey (ip: string, videoUUID: string) {
+    return `viewer-${videoUUID}-${ip}`
+  }
+
   private generateTrackerBlockIPKey (ip: string) {
     return `tracker-block-ip-${ip}`
   }
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts
new file mode 100644 (file)
index 0000000..c0e72c4
--- /dev/null
@@ -0,0 +1,52 @@
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { VideoModel } from '@server/models/video/video'
+import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
+import { federateVideoIfNeeded } from '../activitypub/videos'
+import { Redis } from '../redis'
+import { AbstractScheduler } from './abstract-scheduler'
+
+const lTags = loggerTagsFactory('views')
+
+export class VideoViewsBufferScheduler extends AbstractScheduler {
+
+  private static instance: AbstractScheduler
+
+  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
+
+  private constructor () {
+    super()
+  }
+
+  protected async internalExecute () {
+    const videoIds = await Redis.Instance.listLocalVideosViewed()
+    if (videoIds.length === 0) return
+
+    logger.info('Processing local video views buffer.', { videoIds, ...lTags() })
+
+    for (const videoId of videoIds) {
+      try {
+        const views = await Redis.Instance.getLocalVideoViews(videoId)
+        await Redis.Instance.deleteLocalVideoViews(videoId)
+
+        const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+        if (!video) {
+          logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
+          continue
+        }
+
+        // If this is a remote video, the origin instance will send us an update
+        await VideoModel.incrementViews(videoId, views)
+
+        // Send video update
+        video.views += views
+        await federateVideoIfNeeded(video, false)
+      } catch (err) {
+        logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
+      }
+    }
+  }
+
+  static get Instance () {
+    return this.instance || (this.instance = new this())
+  }
+}
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts
new file mode 100644 (file)
index 0000000..220b509
--- /dev/null
@@ -0,0 +1,130 @@
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { VIEW_LIFETIME } from '@server/initializers/constants'
+import { VideoModel } from '@server/models/video/video'
+import { MVideo } from '@server/types/models'
+import { PeerTubeSocket } from './peertube-socket'
+import { Redis } from './redis'
+
+const lTags = loggerTagsFactory('views')
+
+export class VideoViews {
+
+  // Values are Date().getTime()
+  private readonly viewersPerVideo = new Map<number, number[]>()
+
+  private static instance: VideoViews
+
+  private constructor () {
+  }
+
+  init () {
+    setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER)
+  }
+
+  async processView (options: {
+    video: MVideo
+    ip: string | null
+    viewerExpires?: Date
+  }) {
+    const { video, ip, viewerExpires } = options
+
+    logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags())
+
+    let success = await this.addView(video, ip)
+
+    if (video.isLive) {
+      const successViewer = await this.addViewer(video, ip, viewerExpires)
+      success ||= successViewer
+    }
+
+    return success
+  }
+
+  getViewers (video: MVideo) {
+    const viewers = this.viewersPerVideo.get(video.id)
+    if (!viewers) return 0
+
+    return viewers.length
+  }
+
+  buildViewerExpireTime () {
+    return new Date().getTime() + VIEW_LIFETIME.VIEWER
+  }
+
+  private async addView (video: MVideo, ip: string | null) {
+    const promises: Promise<any>[] = []
+
+    if (ip !== null) {
+      const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid)
+      if (viewExists) return false
+
+      promises.push(Redis.Instance.setIPVideoView(ip, video.uuid))
+    }
+
+    if (video.isOwned()) {
+      promises.push(Redis.Instance.addLocalVideoView(video.id))
+    }
+
+    promises.push(Redis.Instance.addVideoViewStats(video.id))
+
+    await Promise.all(promises)
+
+    return true
+  }
+
+  private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) {
+    if (ip !== null) {
+      const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
+      if (viewExists) return false
+
+      await Redis.Instance.setIPVideoViewer(ip, video.uuid)
+    }
+
+    let watchers = this.viewersPerVideo.get(video.id)
+
+    if (!watchers) {
+      watchers = []
+      this.viewersPerVideo.set(video.id, watchers)
+    }
+
+    const expiration = viewerExpires
+      ? viewerExpires.getTime()
+      : this.buildViewerExpireTime()
+
+    watchers.push(expiration)
+    await this.notifyClients(video.id, watchers.length)
+
+    return true
+  }
+
+  private async cleanViewers () {
+    logger.info('Cleaning video viewers.', lTags())
+
+    for (const videoId of this.viewersPerVideo.keys()) {
+      const notBefore = new Date().getTime()
+
+      const viewers = this.viewersPerVideo.get(videoId)
+
+      // Only keep not expired viewers
+      const newViewers = viewers.filter(w => w > notBefore)
+
+      if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
+      else this.viewersPerVideo.set(videoId, newViewers)
+
+      await this.notifyClients(videoId, newViewers.length)
+    }
+  }
+
+  private async notifyClients (videoId: string | number, viewersLength: number) {
+    const video = await VideoModel.loadImmutableAttributes(videoId)
+    if (!video) return
+
+    PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
+
+    logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags())
+  }
+
+  static get Instance () {
+    return this.instance || (this.instance = new this())
+  }
+}
index ba49e41aedab8de0068110f1d8aed04fc643f27a..461e296df91db3b51c1602c4d1f5c95cb1494f87 100644 (file)
@@ -1,6 +1,7 @@
 import { uuidToShort } from '@server/helpers/uuid'
 import { generateMagnetUri } from '@server/helpers/webtorrent'
 import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls'
+import { VideoViews } from '@server/lib/video-views'
 import { VideosCommonQueryAfterSanitize } from '@shared/models'
 import { VideoFile } from '@shared/models/videos/video-file.model'
 import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects'
@@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm
     pluginData: (video as any).pluginData
   }
 
+  if (video.isLive) {
+    videoObject.viewers = VideoViews.Instance.getViewers(video)
+  }
+
   const add = options.additionalAttributes
   if (add?.state === true) {
     videoObject.state = {
index 5e3a79c6415f9511e33fcc8f501bfabbb47dfe34..9186af8e740f1e310cb1bc9fcf36630ec7339878 100644 (file)
@@ -19,7 +19,7 @@ import {
 
 const expect = chai.expect
 
-describe('Test live', function () {
+describe('Live views', function () {
   let servers: PeerTubeServer[] = []
 
   before(async function () {
@@ -47,79 +47,86 @@ describe('Test live', function () {
     await doubleFollow(servers[0], servers[1])
   })
 
-  describe('Live views', function () {
-    let liveVideoId: string
-    let command: FfmpegCommand
+  let liveVideoId: string
+  let command: FfmpegCommand
 
-    async function countViews (expected: number) {
-      for (const server of servers) {
-        const video = await server.videos.get({ id: liveVideoId })
-        expect(video.views).to.equal(expected)
-      }
+  async function countViewers (expectedViewers: number) {
+    for (const server of servers) {
+      const video = await server.videos.get({ id: liveVideoId })
+      expect(video.viewers).to.equal(expectedViewers)
     }
+  }
 
-    before(async function () {
-      this.timeout(30000)
+  async function countViews (expectedViews: number) {
+    for (const server of servers) {
+      const video = await server.videos.get({ id: liveVideoId })
+      expect(video.views).to.equal(expectedViews)
+    }
+  }
 
-      const liveAttributes = {
-        name: 'live video',
-        channelId: servers[0].store.channel.id,
-        privacy: VideoPrivacy.PUBLIC
-      }
+  before(async function () {
+    this.timeout(30000)
 
-      const live = await servers[0].live.create({ fields: liveAttributes })
-      liveVideoId = live.uuid
+    const liveAttributes = {
+      name: 'live video',
+      channelId: servers[0].store.channel.id,
+      privacy: VideoPrivacy.PUBLIC
+    }
 
-      command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
-      await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
-      await waitJobs(servers)
-    })
+    const live = await servers[0].live.create({ fields: liveAttributes })
+    liveVideoId = live.uuid
 
-    it('Should display no views for a live', async function () {
-      await countViews(0)
-    })
+    command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId })
+    await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
+    await waitJobs(servers)
+  })
 
-    it('Should view a live twice and display 1 view', async function () {
-      this.timeout(30000)
+  it('Should display no views and viewers for a live', async function () {
+    await countViews(0)
+    await countViewers(0)
+  })
 
-      await servers[0].videos.view({ id: liveVideoId })
-      await servers[0].videos.view({ id: liveVideoId })
+  it('Should view a live twice and display 1 view/viewer', async function () {
+    this.timeout(30000)
 
-      await wait(7000)
+    await servers[0].videos.view({ id: liveVideoId })
+    await servers[0].videos.view({ id: liveVideoId })
 
-      await waitJobs(servers)
+    await waitJobs(servers)
+    await countViewers(1)
 
-      await countViews(1)
-    })
+    await wait(7000)
+    await countViews(1)
+  })
 
-    it('Should wait and display 0 views', async function () {
-      this.timeout(30000)
+  it('Should wait and display 0 viewers while still have 1 view', async function () {
+    this.timeout(30000)
 
-      await wait(12000)
-      await waitJobs(servers)
+    await wait(12000)
+    await waitJobs(servers)
 
-      await countViews(0)
-    })
+    await countViews(1)
+    await countViewers(0)
+  })
 
-    it('Should view a live on a remote and on local and display 2 views', async function () {
-      this.timeout(30000)
+  it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () {
+    this.timeout(30000)
 
-      await servers[0].videos.view({ id: liveVideoId })
-      await servers[1].videos.view({ id: liveVideoId })
-      await servers[1].videos.view({ id: liveVideoId })
+    await servers[0].videos.view({ id: liveVideoId })
+    await servers[1].videos.view({ id: liveVideoId })
+    await servers[1].videos.view({ id: liveVideoId })
+    await waitJobs(servers)
 
-      await wait(7000)
-      await waitJobs(servers)
+    await countViewers(2)
 
-      await countViews(2)
-    })
+    await wait(7000)
+    await waitJobs(servers)
 
-    after(async function () {
-      await stopFfmpeg(command)
-    })
+    await countViews(3)
   })
 
   after(async function () {
+    await stopFfmpeg(command)
     await cleanupTests(servers)
   })
 })
index 8c4e012265cd28ac9bbe77f289ab36272d768718..5d946f5e819bdac055f2d672ee1e20c2c6825b91 100644 (file)
@@ -56,7 +56,7 @@ describe('Test jobs', function () {
 
       let job = body.data[0]
       // Skip repeat jobs
-      if (job.type === 'videos-views') job = body.data[1]
+      if (job.type === 'videos-views-stats') job = body.data[1]
 
       expect(job.state).to.equal('completed')
       expect(job.type.startsWith('activitypub-')).to.be.true
index 79b8c318342f57c790d2abd3388f47eda79db0e9..afaaa5cd6097fc3a64525859ac2b1941760d94ed 100644 (file)
@@ -1,5 +1,5 @@
 
-import { JobState } from '../../models'
+import { JobState, JobType } from '../../models'
 import { wait } from '../miscs'
 import { PeerTubeServer } from './server'
 
@@ -16,7 +16,7 @@ async function waitJobs (serversArg: PeerTubeServer[] | PeerTubeServer, skipDela
   const states: JobState[] = [ 'waiting', 'active' ]
   if (!skipDelayed) states.push('delayed')
 
-  const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]
+  const repeatableJobs: JobType[] = [ 'videos-views-stats', 'activitypub-cleaner' ]
   let pendingRequests: boolean
 
   function tasksBuilder () {
index 548d8858ece4aea334ef47195f749d17301497dc..d6284e283fb7699f9e42ab22f05ace17a82c1ba4 100644 (file)
@@ -6,7 +6,6 @@ import { DislikeObject } from './objects/dislike-object'
 import { APObject } from './objects/object.model'
 import { PlaylistObject } from './objects/playlist-object'
 import { VideoCommentObject } from './objects/video-comment-object'
-import { ViewObject } from './objects/view-object'
 
 export type Activity =
   ActivityCreate |
@@ -53,7 +52,7 @@ export interface BaseActivity {
 
 export interface ActivityCreate extends BaseActivity {
   type: 'Create'
-  object: VideoObject | AbuseObject | ViewObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject
+  object: VideoObject | AbuseObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject
 }
 
 export interface ActivityUpdate extends BaseActivity {
@@ -100,6 +99,7 @@ export interface ActivityView extends BaseActivity {
   type: 'View'
   actor: string
   object: APObject
+  expires: string
 }
 
 export interface ActivityDislike extends BaseActivity {
index 12e0fcf85ed91567130aca79e248e5f4dd68cf3a..6da2753b33c12c13bf2f1d1d13b1baaa96a759de 100644 (file)
@@ -14,7 +14,7 @@ export type JobType =
   | 'video-transcoding'
   | 'email'
   | 'video-import'
-  | 'videos-views'
+  | 'videos-views-stats'
   | 'activitypub-refresher'
   | 'video-redundancy'
   | 'video-live-ending'
index 6cd7540e8a2f8f0b3bbc8013264c0ff193afe959..1a9ac512c98c621539617d8ebf7c6c8afe669288 100644 (file)
@@ -2,5 +2,9 @@ import { VideoState } from '../video-state.enum'
 
 export interface LiveVideoEventPayload {
   state?: VideoState
+
+  // FIXME: deprecated in 4.0 in favour of viewers
   views?: number
+
+  viewers?: number
 }
index 26cb595e73ab0acf305ca41f7c04e72d3d3fcac3..8d223cded5b85b10c85383d390dc798b330074fb 100644 (file)
@@ -39,6 +39,9 @@ export interface Video {
   url: string
 
   views: number
+  // If live
+  viewers?: number
+
   likes: number
   dislikes: number
   nsfw: boolean
index 0f72b08d2e48b22f7aadd226211d51b1663c9588..13757152cabe62c509c794df550f99eb5f3d14d8 100644 (file)
@@ -4892,7 +4892,7 @@ components:
           - video-transcoding
           - video-file-import
           - video-import
-          - videos-views
+          - videos-views-stats
           - activitypub-refresher
           - video-redundancy
           - video-live-ending
@@ -5397,6 +5397,9 @@ components:
         - $ref: '#/components/schemas/Video'
         - type: object
           properties:
+            viewers:
+              type: integer
+              description: If the video is a live, you have the amount of current viewers
             descriptionPath:
               type: string
               example: /api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/description
@@ -6300,7 +6303,7 @@ components:
             - video-transcoding
             - email
             - video-import
-            - videos-views
+            - videos-views-stats
             - activitypub-refresher
             - video-redundancy
         data: