diff options
Diffstat (limited to 'server/lib/schedulers')
-rw-r--r-- | server/lib/schedulers/video-views-buffer-scheduler.ts | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts new file mode 100644 index 000000000..c0e72c461 --- /dev/null +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts | |||
@@ -0,0 +1,52 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VideoModel } from '@server/models/video/video' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { federateVideoIfNeeded } from '../activitypub/videos' | ||
5 | import { Redis } from '../redis' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViewsBufferScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const videoIds = await Redis.Instance.listLocalVideosViewed() | ||
22 | if (videoIds.length === 0) return | ||
23 | |||
24 | logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) | ||
25 | |||
26 | for (const videoId of videoIds) { | ||
27 | try { | ||
28 | const views = await Redis.Instance.getLocalVideoViews(videoId) | ||
29 | await Redis.Instance.deleteLocalVideoViews(videoId) | ||
30 | |||
31 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
32 | if (!video) { | ||
33 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) | ||
34 | continue | ||
35 | } | ||
36 | |||
37 | // If this is a remote video, the origin instance will send us an update | ||
38 | await VideoModel.incrementViews(videoId, views) | ||
39 | |||
40 | // Send video update | ||
41 | video.views += views | ||
42 | await federateVideoIfNeeded(video, false) | ||
43 | } catch (err) { | ||
44 | logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) | ||
45 | } | ||
46 | } | ||
47 | } | ||
48 | |||
49 | static get Instance () { | ||
50 | return this.instance || (this.instance = new this()) | ||
51 | } | ||
52 | } | ||