diff options
Diffstat (limited to 'server/lib')
20 files changed, 658 insertions, 166 deletions
diff --git a/server/lib/activitypub/activity.ts b/server/lib/activitypub/activity.ts index cccb7b1c1..e6cec1ba7 100644 --- a/server/lib/activitypub/activity.ts +++ b/server/lib/activitypub/activity.ts | |||
@@ -4,6 +4,17 @@ function getAPId (object: string | { id: string }) { | |||
4 | return object.id | 4 | return object.id |
5 | } | 5 | } |
6 | 6 | ||
7 | function getActivityStreamDuration (duration: number) { | ||
8 | // https://www.w3.org/TR/activitystreams-vocabulary/#dfn-duration | ||
9 | return 'PT' + duration + 'S' | ||
10 | } | ||
11 | |||
12 | function getDurationFromActivityStream (duration: string) { | ||
13 | return parseInt(duration.replace(/[^\d]+/, '')) | ||
14 | } | ||
15 | |||
7 | export { | 16 | export { |
8 | getAPId | 17 | getAPId, |
18 | getActivityStreamDuration, | ||
19 | getDurationFromActivityStream | ||
9 | } | 20 | } |
diff --git a/server/lib/activitypub/context.ts b/server/lib/activitypub/context.ts index 3bc40e2aa..b452cf9b3 100644 --- a/server/lib/activitypub/context.ts +++ b/server/lib/activitypub/context.ts | |||
@@ -15,7 +15,7 @@ export { | |||
15 | 15 | ||
16 | type ContextValue = { [ id: string ]: (string | { '@type': string, '@id': string }) } | 16 | type ContextValue = { [ id: string ]: (string | { '@type': string, '@id': string }) } |
17 | 17 | ||
18 | const contextStore = { | 18 | const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string })[] } = { |
19 | Video: buildContext({ | 19 | Video: buildContext({ |
20 | Hashtag: 'as:Hashtag', | 20 | Hashtag: 'as:Hashtag', |
21 | uuid: 'sc:identifier', | 21 | uuid: 'sc:identifier', |
@@ -109,7 +109,8 @@ const contextStore = { | |||
109 | stopTimestamp: { | 109 | stopTimestamp: { |
110 | '@type': 'sc:Number', | 110 | '@type': 'sc:Number', |
111 | '@id': 'pt:stopTimestamp' | 111 | '@id': 'pt:stopTimestamp' |
112 | } | 112 | }, |
113 | uuid: 'sc:identifier' | ||
113 | }), | 114 | }), |
114 | 115 | ||
115 | CacheFile: buildContext({ | 116 | CacheFile: buildContext({ |
@@ -128,6 +129,24 @@ const contextStore = { | |||
128 | } | 129 | } |
129 | }), | 130 | }), |
130 | 131 | ||
132 | WatchAction: buildContext({ | ||
133 | WatchAction: 'sc:WatchAction', | ||
134 | startTimestamp: { | ||
135 | '@type': 'sc:Number', | ||
136 | '@id': 'pt:startTimestamp' | ||
137 | }, | ||
138 | stopTimestamp: { | ||
139 | '@type': 'sc:Number', | ||
140 | '@id': 'pt:stopTimestamp' | ||
141 | }, | ||
142 | watchSection: { | ||
143 | '@type': 'sc:Number', | ||
144 | '@id': 'pt:stopTimestamp' | ||
145 | }, | ||
146 | uuid: 'sc:identifier' | ||
147 | }), | ||
148 | |||
149 | Collection: buildContext(), | ||
131 | Follow: buildContext(), | 150 | Follow: buildContext(), |
132 | Reject: buildContext(), | 151 | Reject: buildContext(), |
133 | Accept: buildContext(), | 152 | Accept: buildContext(), |
diff --git a/server/lib/activitypub/local-video-viewer.ts b/server/lib/activitypub/local-video-viewer.ts new file mode 100644 index 000000000..738083adc --- /dev/null +++ b/server/lib/activitypub/local-video-viewer.ts | |||
@@ -0,0 +1,42 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' | ||
3 | import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { WatchActionObject } from '@shared/models' | ||
6 | import { getDurationFromActivityStream } from './activity' | ||
7 | |||
8 | async function createOrUpdateLocalVideoViewer (watchAction: WatchActionObject, video: MVideo, t: Transaction) { | ||
9 | const stats = await LocalVideoViewerModel.loadByUrl(watchAction.id) | ||
10 | if (stats) await stats.destroy({ transaction: t }) | ||
11 | |||
12 | const localVideoViewer = await LocalVideoViewerModel.create({ | ||
13 | url: watchAction.id, | ||
14 | uuid: watchAction.uuid, | ||
15 | |||
16 | watchTime: getDurationFromActivityStream(watchAction.duration), | ||
17 | |||
18 | startDate: new Date(watchAction.startTime), | ||
19 | endDate: new Date(watchAction.endTime), | ||
20 | |||
21 | country: watchAction.location | ||
22 | ? watchAction.location.addressCountry | ||
23 | : null, | ||
24 | |||
25 | videoId: video.id | ||
26 | }) | ||
27 | |||
28 | await LocalVideoViewerWatchSectionModel.bulkCreateSections({ | ||
29 | localVideoViewerId: localVideoViewer.id, | ||
30 | |||
31 | watchSections: watchAction.watchSections.map(s => ({ | ||
32 | start: s.startTimestamp, | ||
33 | end: s.endTimestamp | ||
34 | })) | ||
35 | }) | ||
36 | } | ||
37 | |||
38 | // --------------------------------------------------------------------------- | ||
39 | |||
40 | export { | ||
41 | createOrUpdateLocalVideoViewer | ||
42 | } | ||
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index b5b1a0feb..3e7931bb2 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | import { isBlockedByServerOrAccount } from '@server/lib/blocklist' | 1 | import { isBlockedByServerOrAccount } from '@server/lib/blocklist' |
2 | import { isRedundancyAccepted } from '@server/lib/redundancy' | 2 | import { isRedundancyAccepted } from '@server/lib/redundancy' |
3 | import { ActivityCreate, CacheFileObject, PlaylistObject, VideoCommentObject, VideoObject } from '@shared/models' | 3 | import { VideoModel } from '@server/models/video/video' |
4 | import { ActivityCreate, CacheFileObject, PlaylistObject, VideoCommentObject, VideoObject, WatchActionObject } from '@shared/models' | ||
4 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 5 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
5 | import { logger } from '../../../helpers/logger' | 6 | import { logger } from '../../../helpers/logger' |
6 | import { sequelizeTypescript } from '../../../initializers/database' | 7 | import { sequelizeTypescript } from '../../../initializers/database' |
@@ -8,6 +9,7 @@ import { APProcessorOptions } from '../../../types/activitypub-processor.model' | |||
8 | import { MActorSignature, MCommentOwnerVideo, MVideoAccountLightBlacklistAllFiles } from '../../../types/models' | 9 | import { MActorSignature, MCommentOwnerVideo, MVideoAccountLightBlacklistAllFiles } from '../../../types/models' |
9 | import { Notifier } from '../../notifier' | 10 | import { Notifier } from '../../notifier' |
10 | import { createOrUpdateCacheFile } from '../cache-file' | 11 | import { createOrUpdateCacheFile } from '../cache-file' |
12 | import { createOrUpdateLocalVideoViewer } from '../local-video-viewer' | ||
11 | import { createOrUpdateVideoPlaylist } from '../playlists' | 13 | import { createOrUpdateVideoPlaylist } from '../playlists' |
12 | import { forwardVideoRelatedActivity } from '../send/shared/send-utils' | 14 | import { forwardVideoRelatedActivity } from '../send/shared/send-utils' |
13 | import { resolveThread } from '../video-comments' | 15 | import { resolveThread } from '../video-comments' |
@@ -32,6 +34,10 @@ async function processCreateActivity (options: APProcessorOptions<ActivityCreate | |||
32 | return retryTransactionWrapper(processCreateVideoComment, activity, byActor, notify) | 34 | return retryTransactionWrapper(processCreateVideoComment, activity, byActor, notify) |
33 | } | 35 | } |
34 | 36 | ||
37 | if (activityType === 'WatchAction') { | ||
38 | return retryTransactionWrapper(processCreateWatchAction, activity) | ||
39 | } | ||
40 | |||
35 | if (activityType === 'CacheFile') { | 41 | if (activityType === 'CacheFile') { |
36 | return retryTransactionWrapper(processCreateCacheFile, activity, byActor) | 42 | return retryTransactionWrapper(processCreateCacheFile, activity, byActor) |
37 | } | 43 | } |
@@ -81,6 +87,19 @@ async function processCreateCacheFile (activity: ActivityCreate, byActor: MActor | |||
81 | } | 87 | } |
82 | } | 88 | } |
83 | 89 | ||
90 | async function processCreateWatchAction (activity: ActivityCreate) { | ||
91 | const watchAction = activity.object as WatchActionObject | ||
92 | |||
93 | if (watchAction.actionStatus !== 'CompletedActionStatus') return | ||
94 | |||
95 | const video = await VideoModel.loadByUrl(watchAction.object) | ||
96 | if (video.remote) return | ||
97 | |||
98 | await sequelizeTypescript.transaction(async t => { | ||
99 | return createOrUpdateLocalVideoViewer(watchAction, video, t) | ||
100 | }) | ||
101 | } | ||
102 | |||
84 | async function processCreateVideoComment (activity: ActivityCreate, byActor: MActorSignature, notify: boolean) { | 103 | async function processCreateVideoComment (activity: ActivityCreate, byActor: MActorSignature, notify: boolean) { |
85 | const commentObject = activity.object as VideoCommentObject | 104 | const commentObject = activity.object as VideoCommentObject |
86 | const byAccount = byActor.Account | 105 | const byAccount = byActor.Account |
diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index c59940164..bad079843 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { VideoViews } from '@server/lib/video-views' | 1 | import { VideoViewsManager } from '@server/lib/views/video-views-manager' |
2 | import { ActivityView } from '../../../../shared/models/activitypub' | 2 | import { ActivityView } from '../../../../shared/models/activitypub' |
3 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' | 3 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' |
4 | import { MActorSignature } from '../../../types/models' | 4 | import { MActorSignature } from '../../../types/models' |
@@ -32,7 +32,7 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu | |||
32 | ? new Date(activity.expires) | 32 | ? new Date(activity.expires) |
33 | : undefined | 33 | : undefined |
34 | 34 | ||
35 | await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) | 35 | await VideoViewsManager.Instance.processRemoteView({ video, viewerExpires }) |
36 | 36 | ||
37 | if (video.isOwned()) { | 37 | if (video.isOwned()) { |
38 | // Forward the view but don't resend the activity to the sender | 38 | // Forward the view but don't resend the activity to the sender |
diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 5d8763495..7c3a6bdd0 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts | |||
@@ -6,6 +6,7 @@ import { VideoCommentModel } from '../../../models/video/video-comment' | |||
6 | import { | 6 | import { |
7 | MActorLight, | 7 | MActorLight, |
8 | MCommentOwnerVideo, | 8 | MCommentOwnerVideo, |
9 | MLocalVideoViewerWithWatchSections, | ||
9 | MVideoAccountLight, | 10 | MVideoAccountLight, |
10 | MVideoAP, | 11 | MVideoAP, |
11 | MVideoPlaylistFull, | 12 | MVideoPlaylistFull, |
@@ -19,6 +20,7 @@ import { | |||
19 | getActorsInvolvedInVideo, | 20 | getActorsInvolvedInVideo, |
20 | getAudienceFromFollowersOf, | 21 | getAudienceFromFollowersOf, |
21 | getVideoCommentAudience, | 22 | getVideoCommentAudience, |
23 | sendVideoActivityToOrigin, | ||
22 | sendVideoRelatedActivity, | 24 | sendVideoRelatedActivity, |
23 | unicastTo | 25 | unicastTo |
24 | } from './shared' | 26 | } from './shared' |
@@ -61,6 +63,18 @@ async function sendCreateCacheFile ( | |||
61 | }) | 63 | }) |
62 | } | 64 | } |
63 | 65 | ||
66 | async function sendCreateWatchAction (stats: MLocalVideoViewerWithWatchSections, transaction: Transaction) { | ||
67 | logger.info('Creating job to send create watch action %s.', stats.url, lTags(stats.uuid)) | ||
68 | |||
69 | const byActor = await getServerActor() | ||
70 | |||
71 | const activityBuilder = (audience: ActivityAudience) => { | ||
72 | return buildCreateActivity(stats.url, byActor, stats.toActivityPubObject(), audience) | ||
73 | } | ||
74 | |||
75 | return sendVideoActivityToOrigin(activityBuilder, { byActor, video: stats.Video, transaction, contextType: 'WatchAction' }) | ||
76 | } | ||
77 | |||
64 | async function sendCreateVideoPlaylist (playlist: MVideoPlaylistFull, transaction: Transaction) { | 78 | async function sendCreateVideoPlaylist (playlist: MVideoPlaylistFull, transaction: Transaction) { |
65 | if (playlist.privacy === VideoPlaylistPrivacy.PRIVATE) return undefined | 79 | if (playlist.privacy === VideoPlaylistPrivacy.PRIVATE) return undefined |
66 | 80 | ||
@@ -175,7 +189,8 @@ export { | |||
175 | buildCreateActivity, | 189 | buildCreateActivity, |
176 | sendCreateVideoComment, | 190 | sendCreateVideoComment, |
177 | sendCreateVideoPlaylist, | 191 | sendCreateVideoPlaylist, |
178 | sendCreateCacheFile | 192 | sendCreateCacheFile, |
193 | sendCreateWatchAction | ||
179 | } | 194 | } |
180 | 195 | ||
181 | // --------------------------------------------------------------------------- | 196 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 1f97307b9..1088bf258 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts | |||
@@ -1,27 +1,49 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { VideoViews } from '@server/lib/video-views' | 2 | import { VideoViewsManager } from '@server/lib/views/video-views-manager' |
3 | import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' | 3 | import { MActorAudience, MActorLight, MVideoImmutable, MVideoUrl } from '@server/types/models' |
4 | import { ActivityAudience, ActivityView } from '@shared/models' | 4 | import { ActivityAudience, ActivityView } from '@shared/models' |
5 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
6 | import { ActorModel } from '../../../models/actor/actor' | ||
7 | import { audiencify, getAudience } from '../audience' | 6 | import { audiencify, getAudience } from '../audience' |
8 | import { getLocalVideoViewActivityPubUrl } from '../url' | 7 | import { getLocalVideoViewActivityPubUrl } from '../url' |
9 | import { sendVideoRelatedActivity } from './shared/send-utils' | 8 | import { sendVideoRelatedActivity } from './shared/send-utils' |
10 | 9 | ||
11 | async function sendView (byActor: ActorModel, video: MVideoImmutable, t: Transaction) { | 10 | type ViewType = 'view' | 'viewer' |
12 | logger.info('Creating job to send view of %s.', video.url) | 11 | |
12 | async function sendView (options: { | ||
13 | byActor: MActorLight | ||
14 | type: ViewType | ||
15 | video: MVideoImmutable | ||
16 | transaction?: Transaction | ||
17 | }) { | ||
18 | const { byActor, type, video, transaction } = options | ||
19 | |||
20 | logger.info('Creating job to send %s of %s.', type, video.url) | ||
13 | 21 | ||
14 | const activityBuilder = (audience: ActivityAudience) => { | 22 | const activityBuilder = (audience: ActivityAudience) => { |
15 | const url = getLocalVideoViewActivityPubUrl(byActor, video) | 23 | const url = getLocalVideoViewActivityPubUrl(byActor, video) |
16 | 24 | ||
17 | return buildViewActivity(url, byActor, video, audience) | 25 | return buildViewActivity({ url, byActor, video, audience, type }) |
18 | } | 26 | } |
19 | 27 | ||
20 | return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction: t, contextType: 'View' }) | 28 | return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' }) |
21 | } | 29 | } |
22 | 30 | ||
23 | function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoUrl, audience?: ActivityAudience): ActivityView { | 31 | // --------------------------------------------------------------------------- |
24 | if (!audience) audience = getAudience(byActor) | 32 | |
33 | export { | ||
34 | sendView | ||
35 | } | ||
36 | |||
37 | // --------------------------------------------------------------------------- | ||
38 | |||
39 | function buildViewActivity (options: { | ||
40 | url: string | ||
41 | byActor: MActorAudience | ||
42 | video: MVideoUrl | ||
43 | type: ViewType | ||
44 | audience?: ActivityAudience | ||
45 | }): ActivityView { | ||
46 | const { url, byActor, type, video, audience = getAudience(byActor) } = options | ||
25 | 47 | ||
26 | return audiencify( | 48 | return audiencify( |
27 | { | 49 | { |
@@ -29,14 +51,11 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU | |||
29 | type: 'View' as 'View', | 51 | type: 'View' as 'View', |
30 | actor: byActor.url, | 52 | actor: byActor.url, |
31 | object: video.url, | 53 | object: video.url, |
32 | expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() | 54 | |
55 | expires: type === 'viewer' | ||
56 | ? new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString() | ||
57 | : undefined | ||
33 | }, | 58 | }, |
34 | audience | 59 | audience |
35 | ) | 60 | ) |
36 | } | 61 | } |
37 | |||
38 | // --------------------------------------------------------------------------- | ||
39 | |||
40 | export { | ||
41 | sendView | ||
42 | } | ||
diff --git a/server/lib/activitypub/url.ts b/server/lib/activitypub/url.ts index 50be4fac9..8443fef4c 100644 --- a/server/lib/activitypub/url.ts +++ b/server/lib/activitypub/url.ts | |||
@@ -7,6 +7,7 @@ import { | |||
7 | MActorId, | 7 | MActorId, |
8 | MActorUrl, | 8 | MActorUrl, |
9 | MCommentId, | 9 | MCommentId, |
10 | MLocalVideoViewer, | ||
10 | MVideoId, | 11 | MVideoId, |
11 | MVideoPlaylistElement, | 12 | MVideoPlaylistElement, |
12 | MVideoUrl, | 13 | MVideoUrl, |
@@ -59,6 +60,10 @@ function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId) { | |||
59 | return byActor.url + '/views/videos/' + video.id + '/' + new Date().toISOString() | 60 | return byActor.url + '/views/videos/' + video.id + '/' + new Date().toISOString() |
60 | } | 61 | } |
61 | 62 | ||
63 | function getLocalVideoViewerActivityPubUrl (stats: MLocalVideoViewer) { | ||
64 | return WEBSERVER.URL + '/videos/local-viewer/' + stats.uuid | ||
65 | } | ||
66 | |||
62 | function getVideoLikeActivityPubUrlByLocalActor (byActor: MActorUrl, video: MVideoId) { | 67 | function getVideoLikeActivityPubUrlByLocalActor (byActor: MActorUrl, video: MVideoId) { |
63 | return byActor.url + '/likes/' + video.id | 68 | return byActor.url + '/likes/' + video.id |
64 | } | 69 | } |
@@ -167,6 +172,7 @@ export { | |||
167 | getLocalVideoCommentsActivityPubUrl, | 172 | getLocalVideoCommentsActivityPubUrl, |
168 | getLocalVideoLikesActivityPubUrl, | 173 | getLocalVideoLikesActivityPubUrl, |
169 | getLocalVideoDislikesActivityPubUrl, | 174 | getLocalVideoDislikesActivityPubUrl, |
175 | getLocalVideoViewerActivityPubUrl, | ||
170 | 176 | ||
171 | getAbuseTargetUrl, | 177 | getAbuseTargetUrl, |
172 | checkUrlsSameHost, | 178 | checkUrlsSameHost, |
diff --git a/server/lib/activitypub/videos/shared/object-to-model-attributes.ts b/server/lib/activitypub/videos/shared/object-to-model-attributes.ts index c97217669..f02b9cba6 100644 --- a/server/lib/activitypub/videos/shared/object-to-model-attributes.ts +++ b/server/lib/activitypub/videos/shared/object-to-model-attributes.ts | |||
@@ -24,6 +24,7 @@ import { | |||
24 | VideoPrivacy, | 24 | VideoPrivacy, |
25 | VideoStreamingPlaylistType | 25 | VideoStreamingPlaylistType |
26 | } from '@shared/models' | 26 | } from '@shared/models' |
27 | import { getDurationFromActivityStream } from '../../activity' | ||
27 | 28 | ||
28 | function getThumbnailFromIcons (videoObject: VideoObject) { | 29 | function getThumbnailFromIcons (videoObject: VideoObject) { |
29 | let validIcons = videoObject.icon.filter(i => i.width > THUMBNAILS_SIZE.minWidth) | 30 | let validIcons = videoObject.icon.filter(i => i.width > THUMBNAILS_SIZE.minWidth) |
@@ -170,7 +171,6 @@ function getVideoAttributesFromObject (videoChannel: MChannelId, videoObject: Vi | |||
170 | ? VideoPrivacy.PUBLIC | 171 | ? VideoPrivacy.PUBLIC |
171 | : VideoPrivacy.UNLISTED | 172 | : VideoPrivacy.UNLISTED |
172 | 173 | ||
173 | const duration = videoObject.duration.replace(/[^\d]+/, '') | ||
174 | const language = videoObject.language?.identifier | 174 | const language = videoObject.language?.identifier |
175 | 175 | ||
176 | const category = videoObject.category | 176 | const category = videoObject.category |
@@ -200,7 +200,7 @@ function getVideoAttributesFromObject (videoChannel: MChannelId, videoObject: Vi | |||
200 | isLive: videoObject.isLiveBroadcast, | 200 | isLive: videoObject.isLiveBroadcast, |
201 | state: videoObject.state, | 201 | state: videoObject.state, |
202 | channelId: videoChannel.id, | 202 | channelId: videoChannel.id, |
203 | duration: parseInt(duration, 10), | 203 | duration: getDurationFromActivityStream(videoObject.duration), |
204 | createdAt: new Date(videoObject.published), | 204 | createdAt: new Date(videoObject.published), |
205 | publishedAt: new Date(videoObject.published), | 205 | publishedAt: new Date(videoObject.published), |
206 | 206 | ||
diff --git a/server/lib/client-html.ts b/server/lib/client-html.ts index a9c835fbf..337364ac9 100644 --- a/server/lib/client-html.ts +++ b/server/lib/client-html.ts | |||
@@ -23,11 +23,11 @@ import { | |||
23 | WEBSERVER | 23 | WEBSERVER |
24 | } from '../initializers/constants' | 24 | } from '../initializers/constants' |
25 | import { AccountModel } from '../models/account/account' | 25 | import { AccountModel } from '../models/account/account' |
26 | import { getActivityStreamDuration } from '../models/video/formatter/video-format-utils' | ||
27 | import { VideoModel } from '../models/video/video' | 26 | import { VideoModel } from '../models/video/video' |
28 | import { VideoChannelModel } from '../models/video/video-channel' | 27 | import { VideoChannelModel } from '../models/video/video-channel' |
29 | import { VideoPlaylistModel } from '../models/video/video-playlist' | 28 | import { VideoPlaylistModel } from '../models/video/video-playlist' |
30 | import { MAccountActor, MChannelActor } from '../types/models' | 29 | import { MAccountActor, MChannelActor } from '../types/models' |
30 | import { getActivityStreamDuration } from './activitypub/activity' | ||
31 | import { getBiggestActorImage } from './actor-image' | 31 | import { getBiggestActorImage } from './actor-image' |
32 | import { ServerConfigManager } from './server-config-manager' | 32 | import { ServerConfigManager } from './server-config-manager' |
33 | 33 | ||
diff --git a/server/lib/job-queue/handlers/video-views-stats.ts b/server/lib/job-queue/handlers/video-views-stats.ts index caf5f6962..689a5a3b4 100644 --- a/server/lib/job-queue/handlers/video-views-stats.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { VideoViewModel } from '@server/models/view/video-view' | ||
1 | import { isTestInstance } from '../../../helpers/core-utils' | 2 | import { isTestInstance } from '../../../helpers/core-utils' |
2 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
3 | import { VideoModel } from '../../../models/video/video' | 4 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-view' | ||
5 | import { Redis } from '../../redis' | 5 | import { Redis } from '../../redis' |
6 | 6 | ||
7 | async function processVideosViewsStats () { | 7 | async function processVideosViewsStats () { |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index c4c1fa443..b86aefa0e 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -249,6 +249,45 @@ class Redis { | |||
249 | ]) | 249 | ]) |
250 | } | 250 | } |
251 | 251 | ||
252 | /* ************ Video viewers stats ************ */ | ||
253 | |||
254 | getLocalVideoViewer (options: { | ||
255 | key?: string | ||
256 | // Or | ||
257 | ip?: string | ||
258 | videoId?: number | ||
259 | }) { | ||
260 | if (options.key) return this.getObject(options.key) | ||
261 | |||
262 | const { viewerKey } = this.generateLocalVideoViewerKeys(options.ip, options.videoId) | ||
263 | |||
264 | return this.getObject(viewerKey) | ||
265 | } | ||
266 | |||
267 | setLocalVideoViewer (ip: string, videoId: number, object: any) { | ||
268 | const { setKey, viewerKey } = this.generateLocalVideoViewerKeys(ip, videoId) | ||
269 | |||
270 | return Promise.all([ | ||
271 | this.addToSet(setKey, viewerKey), | ||
272 | this.setObject(viewerKey, object) | ||
273 | ]) | ||
274 | } | ||
275 | |||
276 | listLocalVideoViewerKeys () { | ||
277 | const { setKey } = this.generateLocalVideoViewerKeys() | ||
278 | |||
279 | return this.getSet(setKey) | ||
280 | } | ||
281 | |||
282 | deleteLocalVideoViewersKeys (key: string) { | ||
283 | const { setKey } = this.generateLocalVideoViewerKeys() | ||
284 | |||
285 | return Promise.all([ | ||
286 | this.deleteFromSet(setKey, key), | ||
287 | this.deleteKey(key) | ||
288 | ]) | ||
289 | } | ||
290 | |||
252 | /* ************ Resumable uploads final responses ************ */ | 291 | /* ************ Resumable uploads final responses ************ */ |
253 | 292 | ||
254 | setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) { | 293 | setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) { |
@@ -290,10 +329,18 @@ class Redis { | |||
290 | 329 | ||
291 | /* ************ Keys generation ************ */ | 330 | /* ************ Keys generation ************ */ |
292 | 331 | ||
293 | private generateLocalVideoViewsKeys (videoId?: Number) { | 332 | private generateLocalVideoViewsKeys (videoId: number): { setKey: string, videoKey: string } |
333 | private generateLocalVideoViewsKeys (): { setKey: string } | ||
334 | private generateLocalVideoViewsKeys (videoId?: number) { | ||
294 | return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } | 335 | return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } |
295 | } | 336 | } |
296 | 337 | ||
338 | private generateLocalVideoViewerKeys (ip: string, videoId: number): { setKey: string, viewerKey: string } | ||
339 | private generateLocalVideoViewerKeys (): { setKey: string } | ||
340 | private generateLocalVideoViewerKeys (ip?: string, videoId?: number) { | ||
341 | return { setKey: `local-video-viewer-stats-keys`, viewerKey: `local-video-viewer-stats-${ip}-${videoId}` } | ||
342 | } | ||
343 | |||
297 | private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { | 344 | private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { |
298 | const hour = exists(options.hour) | 345 | const hour = exists(options.hour) |
299 | ? options.hour | 346 | ? options.hour |
@@ -352,8 +399,23 @@ class Redis { | |||
352 | return this.client.del(this.prefix + key) | 399 | return this.client.del(this.prefix + key) |
353 | } | 400 | } |
354 | 401 | ||
355 | private async setValue (key: string, value: string, expirationMilliseconds: number) { | 402 | private async getObject (key: string) { |
356 | const result = await this.client.set(this.prefix + key, value, { PX: expirationMilliseconds }) | 403 | const value = await this.getValue(key) |
404 | if (!value) return null | ||
405 | |||
406 | return JSON.parse(value) | ||
407 | } | ||
408 | |||
409 | private setObject (key: string, value: { [ id: string ]: number | string }) { | ||
410 | return this.setValue(key, JSON.stringify(value)) | ||
411 | } | ||
412 | |||
413 | private async setValue (key: string, value: string, expirationMilliseconds?: number) { | ||
414 | const options = expirationMilliseconds | ||
415 | ? { PX: expirationMilliseconds } | ||
416 | : {} | ||
417 | |||
418 | const result = await this.client.set(this.prefix + key, value, options) | ||
357 | 419 | ||
358 | if (result !== 'OK') throw new Error('Redis set result is not OK.') | 420 | if (result !== 'OK') throw new Error('Redis set result is not OK.') |
359 | } | 421 | } |
diff --git a/server/lib/schedulers/geo-ip-update-scheduler.ts b/server/lib/schedulers/geo-ip-update-scheduler.ts new file mode 100644 index 000000000..9dda6d76c --- /dev/null +++ b/server/lib/schedulers/geo-ip-update-scheduler.ts | |||
@@ -0,0 +1,22 @@ | |||
1 | import { GeoIP } from '@server/helpers/geo-ip' | ||
2 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
3 | import { AbstractScheduler } from './abstract-scheduler' | ||
4 | |||
5 | export class GeoIPUpdateScheduler extends AbstractScheduler { | ||
6 | |||
7 | private static instance: AbstractScheduler | ||
8 | |||
9 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.YOUTUBE_DL_UPDATE | ||
10 | |||
11 | private constructor () { | ||
12 | super() | ||
13 | } | ||
14 | |||
15 | protected internalExecute () { | ||
16 | return GeoIP.Instance.updateDatabase() | ||
17 | } | ||
18 | |||
19 | static get Instance () { | ||
20 | return this.instance || (this.instance = new this()) | ||
21 | } | ||
22 | } | ||
diff --git a/server/lib/schedulers/remove-old-views-scheduler.ts b/server/lib/schedulers/remove-old-views-scheduler.ts index 64bef97fe..8bc53a045 100644 --- a/server/lib/schedulers/remove-old-views-scheduler.ts +++ b/server/lib/schedulers/remove-old-views-scheduler.ts | |||
@@ -1,8 +1,8 @@ | |||
1 | import { VideoViewModel } from '@server/models/view/video-view' | ||
1 | import { logger } from '../../helpers/logger' | 2 | import { logger } from '../../helpers/logger' |
2 | import { AbstractScheduler } from './abstract-scheduler' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { CONFIG } from '../../initializers/config' | 3 | import { CONFIG } from '../../initializers/config' |
5 | import { VideoViewModel } from '../../models/video/video-view' | 4 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' |
5 | import { AbstractScheduler } from './abstract-scheduler' | ||
6 | 6 | ||
7 | export class RemoveOldViewsScheduler extends AbstractScheduler { | 7 | export class RemoveOldViewsScheduler extends AbstractScheduler { |
8 | 8 | ||
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts index c0e72c461..937764155 100644 --- a/server/lib/schedulers/video-views-buffer-scheduler.ts +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts | |||
@@ -21,8 +21,6 @@ export class VideoViewsBufferScheduler extends AbstractScheduler { | |||
21 | const videoIds = await Redis.Instance.listLocalVideosViewed() | 21 | const videoIds = await Redis.Instance.listLocalVideosViewed() |
22 | if (videoIds.length === 0) return | 22 | if (videoIds.length === 0) return |
23 | 23 | ||
24 | logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) | ||
25 | |||
26 | for (const videoId of videoIds) { | 24 | for (const videoId of videoIds) { |
27 | try { | 25 | try { |
28 | const views = await Redis.Instance.getLocalVideoViews(videoId) | 26 | const views = await Redis.Instance.getLocalVideoViews(videoId) |
@@ -34,6 +32,8 @@ export class VideoViewsBufferScheduler extends AbstractScheduler { | |||
34 | continue | 32 | continue |
35 | } | 33 | } |
36 | 34 | ||
35 | logger.info('Processing local video %s views buffer.', video.uuid, lTags(video.uuid)) | ||
36 | |||
37 | // If this is a remote video, the origin instance will send us an update | 37 | // If this is a remote video, the origin instance will send us an update |
38 | await VideoModel.incrementViews(videoId, views) | 38 | await VideoModel.incrementViews(videoId, views) |
39 | 39 | ||
diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts deleted file mode 100644 index c024eb93c..000000000 --- a/server/lib/video-views.ts +++ /dev/null | |||
@@ -1,131 +0,0 @@ | |||
1 | import { isTestInstance } from '@server/helpers/core-utils' | ||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
3 | import { VIEW_LIFETIME } from '@server/initializers/constants' | ||
4 | import { VideoModel } from '@server/models/video/video' | ||
5 | import { MVideo } from '@server/types/models' | ||
6 | import { PeerTubeSocket } from './peertube-socket' | ||
7 | import { Redis } from './redis' | ||
8 | |||
9 | const lTags = loggerTagsFactory('views') | ||
10 | |||
11 | export class VideoViews { | ||
12 | |||
13 | // Values are Date().getTime() | ||
14 | private readonly viewersPerVideo = new Map<number, number[]>() | ||
15 | |||
16 | private static instance: VideoViews | ||
17 | |||
18 | private constructor () { | ||
19 | } | ||
20 | |||
21 | init () { | ||
22 | setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) | ||
23 | } | ||
24 | |||
25 | async processView (options: { | ||
26 | video: MVideo | ||
27 | ip: string | null | ||
28 | viewerExpires?: Date | ||
29 | }) { | ||
30 | const { video, ip, viewerExpires } = options | ||
31 | |||
32 | logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) | ||
33 | |||
34 | let success = await this.addView(video, ip) | ||
35 | |||
36 | if (video.isLive) { | ||
37 | const successViewer = await this.addViewer(video, ip, viewerExpires) | ||
38 | success ||= successViewer | ||
39 | } | ||
40 | |||
41 | return success | ||
42 | } | ||
43 | |||
44 | getViewers (video: MVideo) { | ||
45 | const viewers = this.viewersPerVideo.get(video.id) | ||
46 | if (!viewers) return 0 | ||
47 | |||
48 | return viewers.length | ||
49 | } | ||
50 | |||
51 | buildViewerExpireTime () { | ||
52 | return new Date().getTime() + VIEW_LIFETIME.VIEWER | ||
53 | } | ||
54 | |||
55 | private async addView (video: MVideo, ip: string | null) { | ||
56 | const promises: Promise<any>[] = [] | ||
57 | |||
58 | if (ip !== null) { | ||
59 | const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) | ||
60 | if (viewExists) return false | ||
61 | |||
62 | promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) | ||
63 | } | ||
64 | |||
65 | if (video.isOwned()) { | ||
66 | promises.push(Redis.Instance.addLocalVideoView(video.id)) | ||
67 | } | ||
68 | |||
69 | promises.push(Redis.Instance.addVideoViewStats(video.id)) | ||
70 | |||
71 | await Promise.all(promises) | ||
72 | |||
73 | return true | ||
74 | } | ||
75 | |||
76 | private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { | ||
77 | if (ip !== null) { | ||
78 | const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) | ||
79 | if (viewExists) return false | ||
80 | |||
81 | await Redis.Instance.setIPVideoViewer(ip, video.uuid) | ||
82 | } | ||
83 | |||
84 | let watchers = this.viewersPerVideo.get(video.id) | ||
85 | |||
86 | if (!watchers) { | ||
87 | watchers = [] | ||
88 | this.viewersPerVideo.set(video.id, watchers) | ||
89 | } | ||
90 | |||
91 | const expiration = viewerExpires | ||
92 | ? viewerExpires.getTime() | ||
93 | : this.buildViewerExpireTime() | ||
94 | |||
95 | watchers.push(expiration) | ||
96 | await this.notifyClients(video.id, watchers.length) | ||
97 | |||
98 | return true | ||
99 | } | ||
100 | |||
101 | private async cleanViewers () { | ||
102 | if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) | ||
103 | |||
104 | for (const videoId of this.viewersPerVideo.keys()) { | ||
105 | const notBefore = new Date().getTime() | ||
106 | |||
107 | const viewers = this.viewersPerVideo.get(videoId) | ||
108 | |||
109 | // Only keep not expired viewers | ||
110 | const newViewers = viewers.filter(w => w > notBefore) | ||
111 | |||
112 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | ||
113 | else this.viewersPerVideo.set(videoId, newViewers) | ||
114 | |||
115 | await this.notifyClients(videoId, newViewers.length) | ||
116 | } | ||
117 | } | ||
118 | |||
119 | private async notifyClients (videoId: string | number, viewersLength: number) { | ||
120 | const video = await VideoModel.loadImmutableAttributes(videoId) | ||
121 | if (!video) return | ||
122 | |||
123 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | ||
124 | |||
125 | logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) | ||
126 | } | ||
127 | |||
128 | static get Instance () { | ||
129 | return this.instance || (this.instance = new this()) | ||
130 | } | ||
131 | } | ||
diff --git a/server/lib/views/shared/index.ts b/server/lib/views/shared/index.ts new file mode 100644 index 000000000..dd510f4e2 --- /dev/null +++ b/server/lib/views/shared/index.ts | |||
@@ -0,0 +1,2 @@ | |||
1 | export * from './video-viewers' | ||
2 | export * from './video-views' | ||
diff --git a/server/lib/views/shared/video-viewers.ts b/server/lib/views/shared/video-viewers.ts new file mode 100644 index 000000000..5c26f8982 --- /dev/null +++ b/server/lib/views/shared/video-viewers.ts | |||
@@ -0,0 +1,276 @@ | |||
1 | import { Transaction } from 'sequelize/types' | ||
2 | import { isTestInstance } from '@server/helpers/core-utils' | ||
3 | import { GeoIP } from '@server/helpers/geo-ip' | ||
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
5 | import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants' | ||
6 | import { sequelizeTypescript } from '@server/initializers/database' | ||
7 | import { sendCreateWatchAction } from '@server/lib/activitypub/send' | ||
8 | import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url' | ||
9 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | ||
10 | import { Redis } from '@server/lib/redis' | ||
11 | import { VideoModel } from '@server/models/video/video' | ||
12 | import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' | ||
13 | import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' | ||
14 | import { MVideo } from '@server/types/models' | ||
15 | import { VideoViewEvent } from '@shared/models' | ||
16 | |||
17 | const lTags = loggerTagsFactory('views') | ||
18 | |||
19 | type LocalViewerStats = { | ||
20 | firstUpdated: number // Date.getTime() | ||
21 | lastUpdated: number // Date.getTime() | ||
22 | |||
23 | watchSections: { | ||
24 | start: number | ||
25 | end: number | ||
26 | }[] | ||
27 | |||
28 | watchTime: number | ||
29 | |||
30 | country: string | ||
31 | |||
32 | videoId: number | ||
33 | } | ||
34 | |||
35 | export class VideoViewers { | ||
36 | |||
37 | // Values are Date().getTime() | ||
38 | private readonly viewersPerVideo = new Map<number, number[]>() | ||
39 | |||
40 | private processingViewerCounters = false | ||
41 | private processingViewerStats = false | ||
42 | |||
43 | constructor () { | ||
44 | setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER) | ||
45 | |||
46 | setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) | ||
47 | } | ||
48 | |||
49 | // --------------------------------------------------------------------------- | ||
50 | |||
51 | getViewers (video: MVideo) { | ||
52 | const viewers = this.viewersPerVideo.get(video.id) | ||
53 | if (!viewers) return 0 | ||
54 | |||
55 | return viewers.length | ||
56 | } | ||
57 | |||
58 | buildViewerExpireTime () { | ||
59 | return new Date().getTime() + VIEW_LIFETIME.VIEWER | ||
60 | } | ||
61 | |||
62 | async getWatchTime (videoId: number, ip: string) { | ||
63 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId }) | ||
64 | |||
65 | return stats?.watchTime || 0 | ||
66 | } | ||
67 | |||
68 | async addLocalViewer (options: { | ||
69 | video: MVideo | ||
70 | currentTime: number | ||
71 | ip: string | ||
72 | viewEvent?: VideoViewEvent | ||
73 | }) { | ||
74 | const { video, ip, viewEvent, currentTime } = options | ||
75 | |||
76 | logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) }) | ||
77 | |||
78 | await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip }) | ||
79 | |||
80 | const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) | ||
81 | if (viewExists) return false | ||
82 | |||
83 | await Redis.Instance.setIPVideoViewer(ip, video.uuid) | ||
84 | |||
85 | return this.addViewerToVideo({ video }) | ||
86 | } | ||
87 | |||
88 | async addRemoteViewer (options: { | ||
89 | video: MVideo | ||
90 | viewerExpires: Date | ||
91 | }) { | ||
92 | const { video, viewerExpires } = options | ||
93 | |||
94 | logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) | ||
95 | |||
96 | return this.addViewerToVideo({ video, viewerExpires }) | ||
97 | } | ||
98 | |||
99 | private async addViewerToVideo (options: { | ||
100 | video: MVideo | ||
101 | viewerExpires?: Date | ||
102 | }) { | ||
103 | const { video, viewerExpires } = options | ||
104 | |||
105 | let watchers = this.viewersPerVideo.get(video.id) | ||
106 | |||
107 | if (!watchers) { | ||
108 | watchers = [] | ||
109 | this.viewersPerVideo.set(video.id, watchers) | ||
110 | } | ||
111 | |||
112 | const expiration = viewerExpires | ||
113 | ? viewerExpires.getTime() | ||
114 | : this.buildViewerExpireTime() | ||
115 | |||
116 | watchers.push(expiration) | ||
117 | await this.notifyClients(video.id, watchers.length) | ||
118 | |||
119 | return true | ||
120 | } | ||
121 | |||
122 | private async updateLocalViewerStats (options: { | ||
123 | video: MVideo | ||
124 | ip: string | ||
125 | currentTime: number | ||
126 | viewEvent?: VideoViewEvent | ||
127 | }) { | ||
128 | const { video, ip, viewEvent, currentTime } = options | ||
129 | const nowMs = new Date().getTime() | ||
130 | |||
131 | let stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId: video.id }) | ||
132 | |||
133 | if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) { | ||
134 | logger.warn('Too much watch section to store for a viewer, skipping this one', { currentTime, viewEvent, ...lTags(video.uuid) }) | ||
135 | return | ||
136 | } | ||
137 | |||
138 | if (!stats) { | ||
139 | const country = await GeoIP.Instance.safeCountryISOLookup(ip) | ||
140 | |||
141 | stats = { | ||
142 | firstUpdated: nowMs, | ||
143 | lastUpdated: nowMs, | ||
144 | |||
145 | watchSections: [], | ||
146 | |||
147 | watchTime: 0, | ||
148 | |||
149 | country, | ||
150 | videoId: video.id | ||
151 | } | ||
152 | } | ||
153 | |||
154 | stats.lastUpdated = nowMs | ||
155 | |||
156 | if (viewEvent === 'seek' || stats.watchSections.length === 0) { | ||
157 | stats.watchSections.push({ | ||
158 | start: currentTime, | ||
159 | end: currentTime | ||
160 | }) | ||
161 | } else { | ||
162 | const lastSection = stats.watchSections[stats.watchSections.length - 1] | ||
163 | lastSection.end = currentTime | ||
164 | } | ||
165 | |||
166 | stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections) | ||
167 | |||
168 | logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) | ||
169 | |||
170 | await Redis.Instance.setLocalVideoViewer(ip, video.id, stats) | ||
171 | } | ||
172 | |||
173 | private async cleanViewerCounters () { | ||
174 | if (this.processingViewerCounters) return | ||
175 | this.processingViewerCounters = true | ||
176 | |||
177 | if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags()) | ||
178 | |||
179 | try { | ||
180 | for (const videoId of this.viewersPerVideo.keys()) { | ||
181 | const notBefore = new Date().getTime() | ||
182 | |||
183 | const viewers = this.viewersPerVideo.get(videoId) | ||
184 | |||
185 | // Only keep not expired viewers | ||
186 | const newViewers = viewers.filter(w => w > notBefore) | ||
187 | |||
188 | if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) | ||
189 | else this.viewersPerVideo.set(videoId, newViewers) | ||
190 | |||
191 | await this.notifyClients(videoId, newViewers.length) | ||
192 | } | ||
193 | } catch (err) { | ||
194 | logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) | ||
195 | } | ||
196 | |||
197 | this.processingViewerCounters = false | ||
198 | } | ||
199 | |||
200 | private async notifyClients (videoId: string | number, viewersLength: number) { | ||
201 | const video = await VideoModel.loadImmutableAttributes(videoId) | ||
202 | if (!video) return | ||
203 | |||
204 | PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) | ||
205 | |||
206 | logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags()) | ||
207 | } | ||
208 | |||
209 | async processViewerStats () { | ||
210 | if (this.processingViewerStats) return | ||
211 | this.processingViewerStats = true | ||
212 | |||
213 | if (!isTestInstance()) logger.info('Processing viewers.', lTags()) | ||
214 | |||
215 | const now = new Date().getTime() | ||
216 | |||
217 | try { | ||
218 | const allKeys = await Redis.Instance.listLocalVideoViewerKeys() | ||
219 | |||
220 | for (const key of allKeys) { | ||
221 | const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ key }) | ||
222 | |||
223 | if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) { | ||
224 | continue | ||
225 | } | ||
226 | |||
227 | try { | ||
228 | await sequelizeTypescript.transaction(async t => { | ||
229 | const video = await VideoModel.load(stats.videoId, t) | ||
230 | |||
231 | const statsModel = await this.saveViewerStats(video, stats, t) | ||
232 | |||
233 | if (video.remote) { | ||
234 | await sendCreateWatchAction(statsModel, t) | ||
235 | } | ||
236 | }) | ||
237 | |||
238 | await Redis.Instance.deleteLocalVideoViewersKeys(key) | ||
239 | } catch (err) { | ||
240 | logger.error('Cannot process viewer stats for Redis key %s.', key, { err, ...lTags() }) | ||
241 | } | ||
242 | } | ||
243 | } catch (err) { | ||
244 | logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() }) | ||
245 | } | ||
246 | |||
247 | this.processingViewerStats = false | ||
248 | } | ||
249 | |||
250 | private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) { | ||
251 | const statsModel = new LocalVideoViewerModel({ | ||
252 | startDate: new Date(stats.firstUpdated), | ||
253 | endDate: new Date(stats.lastUpdated), | ||
254 | watchTime: stats.watchTime, | ||
255 | country: stats.country, | ||
256 | videoId: video.id | ||
257 | }) | ||
258 | |||
259 | statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel) | ||
260 | statsModel.Video = video as VideoModel | ||
261 | |||
262 | await statsModel.save({ transaction }) | ||
263 | |||
264 | statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({ | ||
265 | localVideoViewerId: statsModel.id, | ||
266 | watchSections: stats.watchSections, | ||
267 | transaction | ||
268 | }) | ||
269 | |||
270 | return statsModel | ||
271 | } | ||
272 | |||
273 | private buildWatchTimeFromSections (sections: { start: number, end: number }[]) { | ||
274 | return sections.reduce((p, current) => p + (current.end - current.start), 0) | ||
275 | } | ||
276 | } | ||
diff --git a/server/lib/views/shared/video-views.ts b/server/lib/views/shared/video-views.ts new file mode 100644 index 000000000..19250f993 --- /dev/null +++ b/server/lib/views/shared/video-views.ts | |||
@@ -0,0 +1,60 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { MVideo } from '@server/types/models' | ||
3 | import { Redis } from '../../redis' | ||
4 | |||
5 | const lTags = loggerTagsFactory('views') | ||
6 | |||
7 | export class VideoViews { | ||
8 | |||
9 | async addLocalView (options: { | ||
10 | video: MVideo | ||
11 | ip: string | ||
12 | watchTime: number | ||
13 | }) { | ||
14 | const { video, ip, watchTime } = options | ||
15 | |||
16 | logger.debug('Adding local view to video %s.', video.uuid, { watchTime, ...lTags(video.uuid) }) | ||
17 | |||
18 | if (!this.hasEnoughWatchTime(video, watchTime)) return false | ||
19 | |||
20 | const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) | ||
21 | if (viewExists) return false | ||
22 | |||
23 | await Redis.Instance.setIPVideoView(ip, video.uuid) | ||
24 | |||
25 | await this.addView(video) | ||
26 | |||
27 | return true | ||
28 | } | ||
29 | |||
30 | async addRemoteView (options: { | ||
31 | video: MVideo | ||
32 | }) { | ||
33 | const { video } = options | ||
34 | |||
35 | logger.debug('Adding remote view to video %s.', video.uuid, { ...lTags(video.uuid) }) | ||
36 | |||
37 | await this.addView(video) | ||
38 | |||
39 | return true | ||
40 | } | ||
41 | |||
42 | private async addView (video: MVideo) { | ||
43 | const promises: Promise<any>[] = [] | ||
44 | |||
45 | if (video.isOwned()) { | ||
46 | promises.push(Redis.Instance.addLocalVideoView(video.id)) | ||
47 | } | ||
48 | |||
49 | promises.push(Redis.Instance.addVideoViewStats(video.id)) | ||
50 | |||
51 | await Promise.all(promises) | ||
52 | } | ||
53 | |||
54 | private hasEnoughWatchTime (video: MVideo, watchTime: number) { | ||
55 | if (video.isLive || video.duration >= 30) return watchTime >= 30 | ||
56 | |||
57 | // Check more than 50% of the video is watched | ||
58 | return video.duration / watchTime < 2 | ||
59 | } | ||
60 | } | ||
diff --git a/server/lib/views/video-views-manager.ts b/server/lib/views/video-views-manager.ts new file mode 100644 index 000000000..e07af1ca9 --- /dev/null +++ b/server/lib/views/video-views-manager.ts | |||
@@ -0,0 +1,70 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { MVideo } from '@server/types/models' | ||
3 | import { VideoViewEvent } from '@shared/models' | ||
4 | import { VideoViewers, VideoViews } from './shared' | ||
5 | |||
6 | const lTags = loggerTagsFactory('views') | ||
7 | |||
8 | export class VideoViewsManager { | ||
9 | |||
10 | private static instance: VideoViewsManager | ||
11 | |||
12 | private videoViewers: VideoViewers | ||
13 | private videoViews: VideoViews | ||
14 | |||
15 | private constructor () { | ||
16 | } | ||
17 | |||
18 | init () { | ||
19 | this.videoViewers = new VideoViewers() | ||
20 | this.videoViews = new VideoViews() | ||
21 | } | ||
22 | |||
23 | async processLocalView (options: { | ||
24 | video: MVideo | ||
25 | currentTime: number | ||
26 | ip: string | null | ||
27 | viewEvent?: VideoViewEvent | ||
28 | }) { | ||
29 | const { video, ip, viewEvent, currentTime } = options | ||
30 | |||
31 | logger.debug('Processing local view for %s and ip %s.', video.url, ip, lTags()) | ||
32 | |||
33 | const successViewer = await this.videoViewers.addLocalViewer({ video, ip, viewEvent, currentTime }) | ||
34 | |||
35 | // Do it after added local viewer to fetch updated information | ||
36 | const watchTime = await this.videoViewers.getWatchTime(video.id, ip) | ||
37 | |||
38 | const successView = await this.videoViews.addLocalView({ video, watchTime, ip }) | ||
39 | |||
40 | return { successView, successViewer } | ||
41 | } | ||
42 | |||
43 | async processRemoteView (options: { | ||
44 | video: MVideo | ||
45 | viewerExpires?: Date | ||
46 | }) { | ||
47 | const { video, viewerExpires } = options | ||
48 | |||
49 | logger.debug('Processing remote view for %s.', video.url, { viewerExpires, ...lTags() }) | ||
50 | |||
51 | if (viewerExpires) await this.videoViewers.addRemoteViewer({ video, viewerExpires }) | ||
52 | else await this.videoViews.addRemoteView({ video }) | ||
53 | } | ||
54 | |||
55 | getViewers (video: MVideo) { | ||
56 | return this.videoViewers.getViewers(video) | ||
57 | } | ||
58 | |||
59 | buildViewerExpireTime () { | ||
60 | return this.videoViewers.buildViewerExpireTime() | ||
61 | } | ||
62 | |||
63 | processViewers () { | ||
64 | return this.videoViewers.processViewerStats() | ||
65 | } | ||
66 | |||
67 | static get Instance () { | ||
68 | return this.instance || (this.instance = new this()) | ||
69 | } | ||
70 | } | ||