blob: c0e72c4615c82913c55d215e0a7f2d33096dc206 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { VideoModel } from '@server/models/video/video'
import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { federateVideoIfNeeded } from '../activitypub/videos'
import { Redis } from '../redis'
import { AbstractScheduler } from './abstract-scheduler'
// Logger tags factory created with the 'views' tag; calling lTags() yields the
// metadata that the log calls in this file pass or spread into their arguments.
const lTags = loggerTagsFactory('views')
/**
 * Scheduler that drains the buffer of local video views kept in Redis.
 *
 * On every run it reads the list of viewed video ids, and for each of them
 * reads then deletes the buffered counter, adds it to the video through
 * `VideoModel.incrementViews` and finally calls `federateVideoIfNeeded`.
 * A failure on one video is logged and does not stop the remaining ones.
 */
export class VideoViewsBufferScheduler extends AbstractScheduler {

  private static instance: AbstractScheduler

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE

  private constructor () {
    super()
  }

  /** Shared scheduler instance, created on first access. */
  static get Instance () {
    if (!this.instance) {
      this.instance = new this()
    }

    return this.instance
  }

  /**
   * Processes every video id currently listed in the Redis views buffer.
   * Returns immediately when the list is empty.
   */
  protected async internalExecute () {
    const viewedVideoIds = await Redis.Instance.listLocalVideosViewed()
    if (viewedVideoIds.length === 0) return

    logger.info('Processing local video views buffer.', { videoIds: viewedVideoIds, ...lTags() })

    for (const id of viewedVideoIds) {
      try {
        // Read the buffered counter first, then clear it, before touching the video itself
        const bufferedViews = await Redis.Instance.getLocalVideoViews(id)
        await Redis.Instance.deleteLocalVideoViews(id)

        const loadedVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(id)

        if (loadedVideo) {
          // If this is a remote video, the origin instance will send us an update
          await VideoModel.incrementViews(id, bufferedViews)

          // Keep the in-memory model in sync before sending the video update
          loadedVideo.views = loadedVideo.views + bufferedViews
          await federateVideoIfNeeded(loadedVideo, false)
        } else {
          logger.debug('Video %d does not exist anymore, skipping videos view addition.', id, lTags())
        }
      } catch (error) {
        // One failing video must not prevent the others from being processed
        logger.error('Cannot process local video views buffer of video %d.', id, { err: error, ...lTags() })
      }
    }
  }
}
|