aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2019-08-06 17:19:53 +0200
committerChocobozzz <me@florianbigard.com>2019-08-06 17:26:51 +0200
commit6b9c966f6428c9e47bead3410a0401e8ebd744bf (patch)
tree282218ec56725b0e2e878b0471cd08a54fd91998 /server
parent466e3f20a537f1eff4b4fd03297df11ba371d049 (diff)
downloadPeerTube-6b9c966f6428c9e47bead3410a0401e8ebd744bf.tar.gz
PeerTube-6b9c966f6428c9e47bead3410a0401e8ebd744bf.tar.zst
PeerTube-6b9c966f6428c9e47bead3410a0401e8ebd744bf.zip
Automatically remove bad followings
Diffstat (limited to 'server')
-rw-r--r--server/lib/activitypub/actor.ts10
-rw-r--r--server/lib/activitypub/process/process-create.ts11
-rw-r--r--server/lib/activitypub/video-comments.ts230
-rw-r--r--server/lib/activitypub/videos.ts16
-rw-r--r--server/lib/files-cache/actor-follow-score-cache.ts28
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/schedulers/actor-follow-scheduler.ts13
-rw-r--r--server/models/account/account-video-rate.ts24
-rw-r--r--server/models/activitypub/actor-follow.ts26
-rw-r--r--server/models/utils.ts20
-rw-r--r--server/models/video/video-comment.ts38
-rw-r--r--server/models/video/video-share.ts18
-rw-r--r--server/tests/api/server/handle-down.ts54
13 files changed, 291 insertions, 199 deletions
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts
index 38eb87d1e..0e6596f10 100644
--- a/server/lib/activitypub/actor.ts
+++ b/server/lib/activitypub/actor.ts
@@ -254,14 +254,14 @@ async function refreshActorIfNeeded (
254 await actor.save({ transaction: t }) 254 await actor.save({ transaction: t })
255 255
256 if (actor.Account) { 256 if (actor.Account) {
257 actor.Account.set('name', result.name) 257 actor.Account.name = result.name
258 actor.Account.set('description', result.summary) 258 actor.Account.description = result.summary
259 259
260 await actor.Account.save({ transaction: t }) 260 await actor.Account.save({ transaction: t })
261 } else if (actor.VideoChannel) { 261 } else if (actor.VideoChannel) {
262 actor.VideoChannel.set('name', result.name) 262 actor.VideoChannel.name = result.name
263 actor.VideoChannel.set('description', result.summary) 263 actor.VideoChannel.description = result.summary
264 actor.VideoChannel.set('support', result.support) 264 actor.VideoChannel.support = result.support
265 265
266 await actor.VideoChannel.save({ transaction: t }) 266 await actor.VideoChannel.save({ transaction: t })
267 } 267 }
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts
index a979771b6..b81021163 100644
--- a/server/lib/activitypub/process/process-create.ts
+++ b/server/lib/activitypub/process/process-create.ts
@@ -4,7 +4,7 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
5import { sequelizeTypescript } from '../../../initializers' 5import { sequelizeTypescript } from '../../../initializers'
6import { ActorModel } from '../../../models/activitypub/actor' 6import { ActorModel } from '../../../models/activitypub/actor'
7import { addVideoComment, resolveThread } from '../video-comments' 7import { resolveThread } from '../video-comments'
8import { getOrCreateVideoAndAccountAndChannel } from '../videos' 8import { getOrCreateVideoAndAccountAndChannel } from '../videos'
9import { forwardVideoRelatedActivity } from '../send/utils' 9import { forwardVideoRelatedActivity } from '../send/utils'
10import { createOrUpdateCacheFile } from '../cache-file' 10import { createOrUpdateCacheFile } from '../cache-file'
@@ -13,6 +13,7 @@ import { PlaylistObject } from '../../../../shared/models/activitypub/objects/pl
13import { createOrUpdateVideoPlaylist } from '../playlist' 13import { createOrUpdateVideoPlaylist } from '../playlist'
14import { VideoModel } from '../../../models/video/video' 14import { VideoModel } from '../../../models/video/video'
15import { APProcessorOptions } from '../../../typings/activitypub-processor.model' 15import { APProcessorOptions } from '../../../typings/activitypub-processor.model'
16import { VideoCommentModel } from '../../../models/video/video-comment'
16 17
17async function processCreateActivity (options: APProcessorOptions<ActivityCreate>) { 18async function processCreateActivity (options: APProcessorOptions<ActivityCreate>) {
18 const { activity, byActor } = options 19 const { activity, byActor } = options
@@ -83,9 +84,13 @@ async function processCreateVideoComment (activity: ActivityCreate, byActor: Act
83 if (!byAccount) throw new Error('Cannot create video comment with the non account actor ' + byActor.url) 84 if (!byAccount) throw new Error('Cannot create video comment with the non account actor ' + byActor.url)
84 85
85 let video: VideoModel 86 let video: VideoModel
87 let created: boolean
88 let comment: VideoCommentModel
86 try { 89 try {
87 const resolveThreadResult = await resolveThread(commentObject.inReplyTo) 90 const resolveThreadResult = await resolveThread({ url: commentObject.id, isVideo: false })
88 video = resolveThreadResult.video 91 video = resolveThreadResult.video
92 created = resolveThreadResult.commentCreated
93 comment = resolveThreadResult.comment
89 } catch (err) { 94 } catch (err) {
90 logger.debug( 95 logger.debug(
91 'Cannot process video comment because we could not resolve thread %s. Maybe it was not a video thread, so skip it.', 96 'Cannot process video comment because we could not resolve thread %s. Maybe it was not a video thread, so skip it.',
@@ -95,8 +100,6 @@ async function processCreateVideoComment (activity: ActivityCreate, byActor: Act
95 return 100 return
96 } 101 }
97 102
98 const { comment, created } = await addVideoComment(video, commentObject.id)
99
100 if (video.isOwned() && created === true) { 103 if (video.isOwned() && created === true) {
101 // Don't resend the activity to the sender 104 // Don't resend the activity to the sender
102 const exceptions = [ byActor ] 105 const exceptions = [ byActor ]
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts
index 921abdb8d..92e1a9020 100644
--- a/server/lib/activitypub/video-comments.ts
+++ b/server/lib/activitypub/video-comments.ts
@@ -1,9 +1,7 @@
1import { VideoCommentObject } from '../../../shared/models/activitypub/objects/video-comment-object'
2import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments' 1import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments'
3import { logger } from '../../helpers/logger' 2import { logger } from '../../helpers/logger'
4import { doRequest } from '../../helpers/requests' 3import { doRequest } from '../../helpers/requests'
5import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers/constants' 4import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers/constants'
6import { ActorModel } from '../../models/activitypub/actor'
7import { VideoModel } from '../../models/video/video' 5import { VideoModel } from '../../models/video/video'
8import { VideoCommentModel } from '../../models/video/video-comment' 6import { VideoCommentModel } from '../../models/video/video-comment'
9import { getOrCreateActorAndServerAndModel } from './actor' 7import { getOrCreateActorAndServerAndModel } from './actor'
@@ -11,79 +9,53 @@ import { getOrCreateVideoAndAccountAndChannel } from './videos'
11import * as Bluebird from 'bluebird' 9import * as Bluebird from 'bluebird'
12import { checkUrlsSameHost } from '../../helpers/activitypub' 10import { checkUrlsSameHost } from '../../helpers/activitypub'
13 11
14async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) { 12type ResolveThreadParams = {
15 let originCommentId: number = null 13 url: string,
16 let inReplyToCommentId: number = null 14 comments?: VideoCommentModel[],
17 15 isVideo?: boolean,
18 // If this is not a reply to the video (thread), create or get the parent comment 16 commentCreated?: boolean
19 if (video.url !== comment.inReplyTo) {
20 const { comment: parent } = await addVideoComment(video, comment.inReplyTo)
21 if (!parent) {
22 logger.warn('Cannot fetch or get parent comment %s of comment %s.', comment.inReplyTo, comment.id)
23 return undefined
24 }
25
26 originCommentId = parent.originCommentId || parent.id
27 inReplyToCommentId = parent.id
28 }
29
30 return {
31 url: comment.id,
32 text: comment.content,
33 videoId: video.id,
34 accountId: actor.Account.id,
35 inReplyToCommentId,
36 originCommentId,
37 createdAt: new Date(comment.published)
38 }
39} 17}
18type ResolveThreadResult = Promise<{ video: VideoModel, comment: VideoCommentModel, commentCreated: boolean }>
40 19
41async function addVideoComments (commentUrls: string[], instance: VideoModel) { 20async function addVideoComments (commentUrls: string[]) {
42 return Bluebird.map(commentUrls, commentUrl => { 21 return Bluebird.map(commentUrls, commentUrl => {
43 return addVideoComment(instance, commentUrl) 22 return resolveThread({ url: commentUrl, isVideo: false })
44 }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) 23 }, { concurrency: CRAWL_REQUEST_CONCURRENCY })
45} 24}
46 25
47async function addVideoComment (videoInstance: VideoModel, commentUrl: string) { 26async function resolveThread (params: ResolveThreadParams): ResolveThreadResult {
48 logger.info('Fetching remote video comment %s.', commentUrl) 27 const { url, isVideo } = params
28 if (params.commentCreated === undefined) params.commentCreated = false
29 if (params.comments === undefined) params.comments = []
49 30
50 const { body } = await doRequest({ 31 // Already have this comment?
51 uri: commentUrl, 32 if (isVideo !== true) {
52 json: true, 33 const result = await resolveCommentFromDB(params)
53 activityPub: true 34 if (result) return result
54 })
55
56 if (sanitizeAndCheckVideoCommentObject(body) === false) {
57 logger.debug('Remote video comment JSON %s is not valid.', commentUrl, { body })
58 return { created: false }
59 } 35 }
60 36
61 const actorUrl = body.attributedTo 37 try {
62 if (!actorUrl) return { created: false } 38 if (isVideo !== false) return await tryResolveThreadFromVideo(params)
63 39
64 if (checkUrlsSameHost(commentUrl, actorUrl) !== true) { 40 return resolveParentComment(params)
65 throw new Error(`Actor url ${actorUrl} has not the same host than the comment url ${commentUrl}`) 41 } catch (err) {
66 } 42 logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err })
67 43
68 if (checkUrlsSameHost(body.id, commentUrl) !== true) { 44 return resolveParentComment(params)
69 throw new Error(`Comment url ${commentUrl} host is different from the AP object id ${body.id}`)
70 } 45 }
46}
71 47
72 const actor = await getOrCreateActorAndServerAndModel(actorUrl, 'all') 48export {
73 const entry = await videoCommentActivityObjectToDBAttributes(videoInstance, actor, body) 49 addVideoComments,
74 if (!entry) return { created: false } 50 resolveThread
51}
75 52
76 const [ comment, created ] = await VideoCommentModel.upsert<VideoCommentModel>(entry, { returning: true }) 53// ---------------------------------------------------------------------------
77 comment.Account = actor.Account
78 comment.Video = videoInstance
79 54
80 return { comment, created } 55async function resolveCommentFromDB (params: ResolveThreadParams) {
81} 56 const { url, comments, commentCreated } = params
82 57
83type ResolveThreadResult = Promise<{ video: VideoModel, parents: VideoCommentModel[] }> 58 const commentFromDatabase = await VideoCommentModel.loadByUrlAndPopulateReplyAndVideoUrlAndAccount(url)
84async function resolveThread (url: string, comments: VideoCommentModel[] = []): ResolveThreadResult {
85 // Already have this comment?
86 const commentFromDatabase = await VideoCommentModel.loadByUrlAndPopulateReplyAndVideo(url)
87 if (commentFromDatabase) { 59 if (commentFromDatabase) {
88 let parentComments = comments.concat([ commentFromDatabase ]) 60 let parentComments = comments.concat([ commentFromDatabase ])
89 61
@@ -94,79 +66,97 @@ async function resolveThread (url: string, comments: VideoCommentModel[] = []):
94 parentComments = parentComments.concat(data) 66 parentComments = parentComments.concat(data)
95 } 67 }
96 68
97 return resolveThread(commentFromDatabase.Video.url, parentComments) 69 return resolveThread({
70 url: commentFromDatabase.Video.url,
71 comments: parentComments,
72 isVideo: true,
73 commentCreated
74 })
98 } 75 }
99 76
100 try { 77 return undefined
101 // Maybe it's a reply to a video? 78}
102 // If yes, it's done: we resolved all the thread 79
103 const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: url }) 80async function tryResolveThreadFromVideo (params: ResolveThreadParams) {
104 81 const { url, comments, commentCreated } = params
105 if (comments.length !== 0) { 82
106 const firstReply = comments[ comments.length - 1 ] 83 // Maybe it's a reply to a video?
107 firstReply.inReplyToCommentId = null 84 // If yes, it's done: we resolved all the thread
108 firstReply.originCommentId = null 85 const syncParam = { likes: true, dislikes: true, shares: true, comments: false, thumbnail: true, refreshVideo: false }
109 firstReply.videoId = video.id 86 const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: url, syncParam })
110 comments[comments.length - 1] = await firstReply.save() 87
111 88 let resultComment: VideoCommentModel
112 for (let i = comments.length - 2; i >= 0; i--) { 89 if (comments.length !== 0) {
113 const comment = comments[ i ] 90 const firstReply = comments[ comments.length - 1 ]
114 comment.originCommentId = firstReply.id 91 firstReply.inReplyToCommentId = null
115 comment.inReplyToCommentId = comments[ i + 1 ].id 92 firstReply.originCommentId = null
116 comment.videoId = video.id 93 firstReply.videoId = video.id
117 94 firstReply.changed('updatedAt', true)
118 comments[i] = await comment.save() 95 firstReply.Video = video
119 } 96
97 comments[comments.length - 1] = await firstReply.save()
98
99 for (let i = comments.length - 2; i >= 0; i--) {
100 const comment = comments[ i ]
101 comment.originCommentId = firstReply.id
102 comment.inReplyToCommentId = comments[ i + 1 ].id
103 comment.videoId = video.id
104 comment.changed('updatedAt', true)
105 comment.Video = video
106
107 comments[i] = await comment.save()
120 } 108 }
121 109
122 return { video, parents: comments } 110 resultComment = comments[0]
123 } catch (err) { 111 }
124 logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err })
125 112
126 if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) { 113 return { video, comment: resultComment, commentCreated }
127 throw new Error('Recursion limit reached when resolving a thread') 114}
128 }
129 115
130 const { body } = await doRequest({ 116async function resolveParentComment (params: ResolveThreadParams) {
131 uri: url, 117 const { url, comments } = params
132 json: true,
133 activityPub: true
134 })
135 118
136 if (sanitizeAndCheckVideoCommentObject(body) === false) { 119 if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) {
137 throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body)) 120 throw new Error('Recursion limit reached when resolving a thread')
138 } 121 }
139 122
140 const actorUrl = body.attributedTo 123 const { body } = await doRequest({
141 if (!actorUrl) throw new Error('Miss attributed to in comment') 124 uri: url,
125 json: true,
126 activityPub: true
127 })
142 128
143 if (checkUrlsSameHost(url, actorUrl) !== true) { 129 if (sanitizeAndCheckVideoCommentObject(body) === false) {
144 throw new Error(`Actor url ${actorUrl} has not the same host than the comment url ${url}`) 130 throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body))
145 } 131 }
146 132
147 if (checkUrlsSameHost(body.id, url) !== true) { 133 const actorUrl = body.attributedTo
148 throw new Error(`Comment url ${url} host is different from the AP object id ${body.id}`) 134 if (!actorUrl) throw new Error('Miss attributed to in comment')
149 }
150 135
151 const actor = await getOrCreateActorAndServerAndModel(actorUrl) 136 if (checkUrlsSameHost(url, actorUrl) !== true) {
152 const comment = new VideoCommentModel({ 137 throw new Error(`Actor url ${actorUrl} has not the same host than the comment url ${url}`)
153 url: body.id, 138 }
154 text: body.content,
155 videoId: null,
156 accountId: actor.Account.id,
157 inReplyToCommentId: null,
158 originCommentId: null,
159 createdAt: new Date(body.published),
160 updatedAt: new Date(body.updated)
161 })
162 139
163 return resolveThread(body.inReplyTo, comments.concat([ comment ])) 140 if (checkUrlsSameHost(body.id, url) !== true) {
141 throw new Error(`Comment url ${url} host is different from the AP object id ${body.id}`)
164 } 142 }
165}
166 143
167export { 144 const actor = await getOrCreateActorAndServerAndModel(actorUrl)
168 videoCommentActivityObjectToDBAttributes, 145 const comment = new VideoCommentModel({
169 addVideoComments, 146 url: body.id,
170 addVideoComment, 147 text: body.content,
171 resolveThread 148 videoId: null,
149 accountId: actor.Account.id,
150 inReplyToCommentId: null,
151 originCommentId: null,
152 createdAt: new Date(body.published),
153 updatedAt: new Date(body.updated)
154 })
155 comment.Account = actor.Account
156
157 return resolveThread({
158 url: body.inReplyTo,
159 comments: comments.concat([ comment ]),
160 commentCreated: true
161 })
172} 162}
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index d7bc3d650..2102702e1 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -56,6 +56,7 @@ import { join } from 'path'
56import { FilteredModelAttributes } from '../../typings/sequelize' 56import { FilteredModelAttributes } from '../../typings/sequelize'
57import { Hooks } from '../plugins/hooks' 57import { Hooks } from '../plugins/hooks'
58import { autoBlacklistVideoIfNeeded } from '../video-blacklist' 58import { autoBlacklistVideoIfNeeded } from '../video-blacklist'
59import { ActorFollowScoreCache } from '../files-cache'
59 60
60async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { 61async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
61 if ( 62 if (
@@ -182,7 +183,7 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid
182 } 183 }
183 184
184 if (syncParam.comments === true) { 185 if (syncParam.comments === true) {
185 const handler = items => addVideoComments(items, video) 186 const handler = items => addVideoComments(items)
186 const cleaner = crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate) 187 const cleaner = crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
187 188
188 await crawlCollectionPage<string>(fetchedVideo.comments, handler, cleaner) 189 await crawlCollectionPage<string>(fetchedVideo.comments, handler, cleaner)
@@ -421,10 +422,14 @@ async function refreshVideoIfNeeded (options: {
421 await retryTransactionWrapper(updateVideoFromAP, updateOptions) 422 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
422 await syncVideoExternalAttributes(video, videoObject, options.syncParam) 423 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
423 424
425 ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
426
424 return video 427 return video
425 } catch (err) { 428 } catch (err) {
426 logger.warn('Cannot refresh video %s.', options.video.url, { err }) 429 logger.warn('Cannot refresh video %s.', options.video.url, { err })
427 430
431 ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
432
428 // Don't refresh in loop 433 // Don't refresh in loop
429 await video.setAsRefreshed() 434 await video.setAsRefreshed()
430 return video 435 return video
@@ -500,7 +505,7 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor
500 505
501 const videoStreamingPlaylists = streamingPlaylistActivityUrlToDBAttributes(videoCreated, videoObject, videoFiles) 506 const videoStreamingPlaylists = streamingPlaylistActivityUrlToDBAttributes(videoCreated, videoObject, videoFiles)
502 const playlistPromises = videoStreamingPlaylists.map(p => VideoStreamingPlaylistModel.create(p, { transaction: t })) 507 const playlistPromises = videoStreamingPlaylists.map(p => VideoStreamingPlaylistModel.create(p, { transaction: t }))
503 await Promise.all(playlistPromises) 508 const streamingPlaylists = await Promise.all(playlistPromises)
504 509
505 // Process tags 510 // Process tags
506 const tags = videoObject.tag 511 const tags = videoObject.tag
@@ -513,7 +518,12 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor
513 const videoCaptionsPromises = videoObject.subtitleLanguage.map(c => { 518 const videoCaptionsPromises = videoObject.subtitleLanguage.map(c => {
514 return VideoCaptionModel.insertOrReplaceLanguage(videoCreated.id, c.identifier, t) 519 return VideoCaptionModel.insertOrReplaceLanguage(videoCreated.id, c.identifier, t)
515 }) 520 })
516 await Promise.all(videoCaptionsPromises) 521 const captions = await Promise.all(videoCaptionsPromises)
522
523 video.VideoFiles = videoFiles
524 video.VideoStreamingPlaylists = streamingPlaylists
525 video.Tags = tagInstances
526 video.VideoCaptions = captions
517 527
518 const autoBlacklisted = await autoBlacklistVideoIfNeeded({ 528 const autoBlacklisted = await autoBlacklistVideoIfNeeded({
519 video, 529 video,
diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/files-cache/actor-follow-score-cache.ts
index 5f8ee806f..086605726 100644
--- a/server/lib/files-cache/actor-follow-score-cache.ts
+++ b/server/lib/files-cache/actor-follow-score-cache.ts
@@ -7,6 +7,8 @@ class ActorFollowScoreCache {
7 7
8 private static instance: ActorFollowScoreCache 8 private static instance: ActorFollowScoreCache
9 private pendingFollowsScore: { [ url: string ]: number } = {} 9 private pendingFollowsScore: { [ url: string ]: number } = {}
10 private pendingBadServer = new Set<number>()
11 private pendingGoodServer = new Set<number>()
10 12
11 private constructor () {} 13 private constructor () {}
12 14
@@ -32,7 +34,31 @@ class ActorFollowScoreCache {
32 } 34 }
33 } 35 }
34 36
35 getPendingFollowsScoreCopy () { 37 addBadServerId (serverId: number) {
38 this.pendingBadServer.add(serverId)
39 }
40
41 getBadFollowingServerIds () {
42 return Array.from(this.pendingBadServer)
43 }
44
45 clearBadFollowingServerIds () {
46 this.pendingBadServer = new Set<number>()
47 }
48
49 addGoodServerId (serverId: number) {
50 this.pendingGoodServer.add(serverId)
51 }
52
53 getGoodFollowingServerIds () {
54 return Array.from(this.pendingGoodServer)
55 }
56
57 clearGoodFollowingServerIds () {
58 this.pendingGoodServer = new Set<number>()
59 }
60
61 getPendingFollowsScore () {
36 return this.pendingFollowsScore 62 return this.pendingFollowsScore
37 } 63 }
38 64
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 4da645f07..c3f59dc77 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -37,7 +37,7 @@ async function processActivityPubHttpFetcher (job: Bull.Job) {
37 'video-likes': items => createRates(items, video, 'like'), 37 'video-likes': items => createRates(items, video, 'like'),
38 'video-dislikes': items => createRates(items, video, 'dislike'), 38 'video-dislikes': items => createRates(items, video, 'dislike'),
39 'video-shares': items => addVideoShares(items, video), 39 'video-shares': items => addVideoShares(items, video),
40 'video-comments': items => addVideoComments(items, video), 40 'video-comments': items => addVideoComments(items),
41 'account-playlists': items => createAccountPlaylists(items, account) 41 'account-playlists': items => createAccountPlaylists(items, account)
42 } 42 }
43 43
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
index fdd3ad5fa..598c0211f 100644
--- a/server/lib/schedulers/actor-follow-scheduler.ts
+++ b/server/lib/schedulers/actor-follow-scheduler.ts
@@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils'
2import { logger } from '../../helpers/logger' 2import { logger } from '../../helpers/logger'
3import { ActorFollowModel } from '../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../models/activitypub/actor-follow'
4import { AbstractScheduler } from './abstract-scheduler' 4import { AbstractScheduler } from './abstract-scheduler'
5import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' 5import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
6import { ActorFollowScoreCache } from '../files-cache' 6import { ActorFollowScoreCache } from '../files-cache'
7 7
8export class ActorFollowScheduler extends AbstractScheduler { 8export class ActorFollowScheduler extends AbstractScheduler {
@@ -22,13 +22,20 @@ export class ActorFollowScheduler extends AbstractScheduler {
22 } 22 }
23 23
24 private async processPendingScores () { 24 private async processPendingScores () {
25 const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() 25 const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore()
26 const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds()
27 const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds()
26 28
27 ActorFollowScoreCache.Instance.clearPendingFollowsScore() 29 ActorFollowScoreCache.Instance.clearPendingFollowsScore()
30 ActorFollowScoreCache.Instance.clearBadFollowingServerIds()
31 ActorFollowScoreCache.Instance.clearGoodFollowingServerIds()
28 32
29 for (const inbox of Object.keys(pendingScores)) { 33 for (const inbox of Object.keys(pendingScores)) {
30 await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) 34 await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
31 } 35 }
36
37 await ActorFollowModel.updateScoreByFollowingServers(badServerIds, ACTOR_FOLLOW_SCORE.PENALTY)
38 await ActorFollowModel.updateScoreByFollowingServers(goodServerIds, ACTOR_FOLLOW_SCORE.BONUS)
32 } 39 }
33 40
34 private async removeBadActorFollows () { 41 private async removeBadActorFollows () {
diff --git a/server/models/account/account-video-rate.ts b/server/models/account/account-video-rate.ts
index d5c214ecb..4bd8114cf 100644
--- a/server/models/account/account-video-rate.ts
+++ b/server/models/account/account-video-rate.ts
@@ -6,7 +6,7 @@ import { CONSTRAINTS_FIELDS, VIDEO_RATE_TYPES } from '../../initializers/constan
6import { VideoModel } from '../video/video' 6import { VideoModel } from '../video/video'
7import { AccountModel } from './account' 7import { AccountModel } from './account'
8import { ActorModel } from '../activitypub/actor' 8import { ActorModel } from '../activitypub/actor'
9import { getSort, throwIfNotValid } from '../utils' 9import { buildLocalAccountIdsIn, getSort, throwIfNotValid } from '../utils'
10import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc' 10import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc'
11import { AccountVideoRate } from '../../../shared' 11import { AccountVideoRate } from '../../../shared'
12import { ScopeNames as VideoChannelScopeNames, SummaryOptions, VideoChannelModel } from '../video/video-channel' 12import { ScopeNames as VideoChannelScopeNames, SummaryOptions, VideoChannelModel } from '../video/video-channel'
@@ -219,25 +219,11 @@ export class AccountVideoRateModel extends Model<AccountVideoRateModel> {
219 [Op.lt]: beforeUpdatedAt 219 [Op.lt]: beforeUpdatedAt
220 }, 220 },
221 videoId, 221 videoId,
222 type 222 type,
223 }, 223 accountId: {
224 include: [ 224 [Op.notIn]: buildLocalAccountIdsIn()
225 {
226 model: AccountModel.unscoped(),
227 required: true,
228 include: [
229 {
230 model: ActorModel.unscoped(),
231 required: true,
232 where: {
233 serverId: {
234 [Op.ne]: null
235 }
236 }
237 }
238 ]
239 } 225 }
240 ], 226 },
241 transaction: t 227 transaction: t
242 } 228 }
243 229
diff --git a/server/models/activitypub/actor-follow.ts b/server/models/activitypub/actor-follow.ts
index 3039b90c7..99a5fd117 100644
--- a/server/models/activitypub/actor-follow.ts
+++ b/server/models/activitypub/actor-follow.ts
@@ -23,7 +23,7 @@ import { logger } from '../../helpers/logger'
23import { getServerActor } from '../../helpers/utils' 23import { getServerActor } from '../../helpers/utils'
24import { ACTOR_FOLLOW_SCORE, FOLLOW_STATES } from '../../initializers/constants' 24import { ACTOR_FOLLOW_SCORE, FOLLOW_STATES } from '../../initializers/constants'
25import { ServerModel } from '../server/server' 25import { ServerModel } from '../server/server'
26import { getSort } from '../utils' 26import { createSafeIn, getSort } from '../utils'
27import { ActorModel, unusedActorAttributesForAPI } from './actor' 27import { ActorModel, unusedActorAttributesForAPI } from './actor'
28import { VideoChannelModel } from '../video/video-channel' 28import { VideoChannelModel } from '../video/video-channel'
29import { AccountModel } from '../account/account' 29import { AccountModel } from '../account/account'
@@ -464,7 +464,7 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
464 } 464 }
465 } 465 }
466 466
467 static updateFollowScore (inboxUrl: string, value: number, t?: Transaction) { 467 static updateScore (inboxUrl: string, value: number, t?: Transaction) {
468 const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + 468 const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
469 'WHERE id IN (' + 469 'WHERE id IN (' +
470 'SELECT "actorFollow"."id" FROM "actorFollow" ' + 470 'SELECT "actorFollow"."id" FROM "actorFollow" ' +
@@ -480,6 +480,28 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
480 return ActorFollowModel.sequelize.query(query, options) 480 return ActorFollowModel.sequelize.query(query, options)
481 } 481 }
482 482
483 static async updateScoreByFollowingServers (serverIds: number[], value: number, t?: Transaction) {
484 if (serverIds.length === 0) return
485
486 const me = await getServerActor()
487 const serverIdsString = createSafeIn(ActorFollowModel, serverIds)
488
489 const query = `UPDATE "actorFollow" SET "score" = "score" + ${value} ` +
490 'WHERE id IN (' +
491 'SELECT "actorFollow"."id" FROM "actorFollow" ' +
492 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."targetActorId" ' +
493 `WHERE "actorFollow"."actorId" = ${me.Account.actorId} ` + // I'm the follower
494 `AND "actor"."serverId" IN (${serverIdsString})` + // Criteria on followings
495 ')'
496
497 const options = {
498 type: QueryTypes.BULKUPDATE,
499 transaction: t
500 }
501
502 return ActorFollowModel.sequelize.query(query, options)
503 }
504
483 private static async createListAcceptedFollowForApiQuery ( 505 private static async createListAcceptedFollowForApiQuery (
484 type: 'followers' | 'following', 506 type: 'followers' | 'following',
485 actorIds: number[], 507 actorIds: number[],
diff --git a/server/models/utils.ts b/server/models/utils.ts
index 30de91e1d..24890f961 100644
--- a/server/models/utils.ts
+++ b/server/models/utils.ts
@@ -1,7 +1,7 @@
1import { Model, Sequelize } from 'sequelize-typescript' 1import { Model, Sequelize } from 'sequelize-typescript'
2import * as validator from 'validator' 2import * as validator from 'validator'
3import { Col } from 'sequelize/types/lib/utils' 3import { Col } from 'sequelize/types/lib/utils'
4import { OrderItem } from 'sequelize/types' 4import { OrderItem, literal } from 'sequelize'
5 5
6type SortType = { sortModel: any, sortValue: string } 6type SortType = { sortModel: any, sortValue: string }
7 7
@@ -129,16 +129,30 @@ function parseAggregateResult (result: any) {
129 return total 129 return total
130} 130}
131 131
132const createSafeIn = (model: typeof Model, stringArr: string[]) => { 132const createSafeIn = (model: typeof Model, stringArr: (string | number)[]) => {
133 return stringArr.map(t => model.sequelize.escape(t)) 133 return stringArr.map(t => model.sequelize.escape('' + t))
134 .join(', ') 134 .join(', ')
135} 135}
136 136
137function buildLocalAccountIdsIn () {
138 return literal(
139 '(SELECT "account"."id" FROM "account" INNER JOIN "actor" ON "actor"."id" = "account"."actorId" AND "actor"."serverId" IS NULL)'
140 )
141}
142
143function buildLocalActorIdsIn () {
144 return literal(
145 '(SELECT "actor"."id" FROM "actor" WHERE "actor"."serverId" IS NULL)'
146 )
147}
148
137// --------------------------------------------------------------------------- 149// ---------------------------------------------------------------------------
138 150
139export { 151export {
140 buildBlockedAccountSQL, 152 buildBlockedAccountSQL,
153 buildLocalActorIdsIn,
141 SortType, 154 SortType,
155 buildLocalAccountIdsIn,
142 getSort, 156 getSort,
143 getVideoSort, 157 getVideoSort,
144 getSortOnModel, 158 getSortOnModel,
diff --git a/server/models/video/video-comment.ts b/server/models/video/video-comment.ts
index 28e5818cd..6eda32f05 100644
--- a/server/models/video/video-comment.ts
+++ b/server/models/video/video-comment.ts
@@ -22,7 +22,7 @@ import { AccountModel } from '../account/account'
22import { ActorModel } from '../activitypub/actor' 22import { ActorModel } from '../activitypub/actor'
23import { AvatarModel } from '../avatar/avatar' 23import { AvatarModel } from '../avatar/avatar'
24import { ServerModel } from '../server/server' 24import { ServerModel } from '../server/server'
25import { buildBlockedAccountSQL, getSort, throwIfNotValid } from '../utils' 25import { buildBlockedAccountSQL, buildLocalAccountIdsIn, getSort, throwIfNotValid } from '../utils'
26import { VideoModel } from './video' 26import { VideoModel } from './video'
27import { VideoChannelModel } from './video-channel' 27import { VideoChannelModel } from './video-channel'
28import { getServerActor } from '../../helpers/utils' 28import { getServerActor } from '../../helpers/utils'
@@ -30,7 +30,7 @@ import { UserModel } from '../account/user'
30import { actorNameAlphabet } from '../../helpers/custom-validators/activitypub/actor' 30import { actorNameAlphabet } from '../../helpers/custom-validators/activitypub/actor'
31import { regexpCapture } from '../../helpers/regexp' 31import { regexpCapture } from '../../helpers/regexp'
32import { uniq } from 'lodash' 32import { uniq } from 'lodash'
33import { FindOptions, Op, Order, ScopeOptions, Sequelize, Transaction } from 'sequelize' 33import { FindOptions, literal, Op, Order, ScopeOptions, Sequelize, Transaction } from 'sequelize'
34 34
35enum ScopeNames { 35enum ScopeNames {
36 WITH_ACCOUNT = 'WITH_ACCOUNT', 36 WITH_ACCOUNT = 'WITH_ACCOUNT',
@@ -281,16 +281,22 @@ export class VideoCommentModel extends Model<VideoCommentModel> {
281 return VideoCommentModel.scope([ ScopeNames.WITH_ACCOUNT ]).findOne(query) 281 return VideoCommentModel.scope([ ScopeNames.WITH_ACCOUNT ]).findOne(query)
282 } 282 }
283 283
284 static loadByUrlAndPopulateReplyAndVideo (url: string, t?: Transaction) { 284 static loadByUrlAndPopulateReplyAndVideoUrlAndAccount (url: string, t?: Transaction) {
285 const query: FindOptions = { 285 const query: FindOptions = {
286 where: { 286 where: {
287 url 287 url
288 } 288 },
289 include: [
290 {
291 attributes: [ 'id', 'url' ],
292 model: VideoModel.unscoped()
293 }
294 ]
289 } 295 }
290 296
291 if (t !== undefined) query.transaction = t 297 if (t !== undefined) query.transaction = t
292 298
293 return VideoCommentModel.scope([ ScopeNames.WITH_IN_REPLY_TO, ScopeNames.WITH_VIDEO ]).findOne(query) 299 return VideoCommentModel.scope([ ScopeNames.WITH_IN_REPLY_TO, ScopeNames.WITH_ACCOUNT ]).findOne(query)
294 } 300 }
295 301
296 static async listThreadsForApi (parameters: { 302 static async listThreadsForApi (parameters: {
@@ -471,25 +477,11 @@ export class VideoCommentModel extends Model<VideoCommentModel> {
471 updatedAt: { 477 updatedAt: {
472 [Op.lt]: beforeUpdatedAt 478 [Op.lt]: beforeUpdatedAt
473 }, 479 },
474 videoId 480 videoId,
475 }, 481 accountId: {
476 include: [ 482 [Op.notIn]: buildLocalAccountIdsIn()
477 {
478 required: true,
479 model: AccountModel.unscoped(),
480 include: [
481 {
482 required: true,
483 model: ActorModel.unscoped(),
484 where: {
485 serverId: {
486 [Op.ne]: null
487 }
488 }
489 }
490 ]
491 } 483 }
492 ] 484 }
493 } 485 }
494 486
495 return VideoCommentModel.destroy(query) 487 return VideoCommentModel.destroy(query)
diff --git a/server/models/video/video-share.ts b/server/models/video/video-share.ts
index 3bab3c027..d8ed64557 100644
--- a/server/models/video/video-share.ts
+++ b/server/models/video/video-share.ts
@@ -4,7 +4,7 @@ import { isActivityPubUrlValid } from '../../helpers/custom-validators/activityp
4import { CONSTRAINTS_FIELDS } from '../../initializers/constants' 4import { CONSTRAINTS_FIELDS } from '../../initializers/constants'
5import { AccountModel } from '../account/account' 5import { AccountModel } from '../account/account'
6import { ActorModel } from '../activitypub/actor' 6import { ActorModel } from '../activitypub/actor'
7import { throwIfNotValid } from '../utils' 7import { buildLocalActorIdsIn, throwIfNotValid } from '../utils'
8import { VideoModel } from './video' 8import { VideoModel } from './video'
9import { VideoChannelModel } from './video-channel' 9import { VideoChannelModel } from './video-channel'
10import { Op, Transaction } from 'sequelize' 10import { Op, Transaction } from 'sequelize'
@@ -207,19 +207,11 @@ export class VideoShareModel extends Model<VideoShareModel> {
207 updatedAt: { 207 updatedAt: {
208 [Op.lt]: beforeUpdatedAt 208 [Op.lt]: beforeUpdatedAt
209 }, 209 },
210 videoId 210 videoId,
211 }, 211 actorId: {
212 include: [ 212 [Op.notIn]: buildLocalActorIdsIn()
213 {
214 model: ActorModel.unscoped(),
215 required: true,
216 where: {
217 serverId: {
218 [ Op.ne ]: null
219 }
220 }
221 } 213 }
222 ] 214 }
223 } 215 }
224 216
225 return VideoShareModel.destroy(query) 217 return VideoShareModel.destroy(query)
diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts
index a225443c5..420289bf4 100644
--- a/server/tests/api/server/handle-down.ts
+++ b/server/tests/api/server/handle-down.ts
@@ -19,8 +19,9 @@ import {
19 setAccessTokensToServers, 19 setAccessTokensToServers,
20 unfollow, 20 unfollow,
21 updateVideo, 21 updateVideo,
22 uploadVideo, 22 uploadVideo, uploadVideoAndGetId,
23 wait 23 wait,
24 setActorFollowScores, closeAllSequelize
24} from '../../../../shared/extra-utils' 25} from '../../../../shared/extra-utils'
25import { follow, getFollowersListPaginationAndSort } from '../../../../shared/extra-utils/server/follows' 26import { follow, getFollowersListPaginationAndSort } from '../../../../shared/extra-utils/server/follows'
26import { getJobsListPaginationAndSort, waitJobs } from '../../../../shared/extra-utils/server/jobs' 27import { getJobsListPaginationAndSort, waitJobs } from '../../../../shared/extra-utils/server/jobs'
@@ -43,6 +44,8 @@ describe('Test handle downs', function () {
43 let missedVideo2: Video 44 let missedVideo2: Video
44 let unlistedVideo: Video 45 let unlistedVideo: Video
45 46
47 let videoIdsServer1: number[] = []
48
46 const videoAttributes = { 49 const videoAttributes = {
47 name: 'my super name for server 1', 50 name: 'my super name for server 1',
48 category: 5, 51 category: 5,
@@ -299,7 +302,54 @@ describe('Test handle downs', function () {
299 } 302 }
300 }) 303 })
301 304
305 it('Should upload many videos on server 1', async function () {
306 this.timeout(120000)
307
308 for (let i = 0; i < 10; i++) {
309 const uuid = (await uploadVideoAndGetId({ server: servers[ 0 ], videoName: 'video ' + i })).uuid
310 videoIdsServer1.push(uuid)
311 }
312
313 await waitJobs(servers)
314
315 for (const id of videoIdsServer1) {
316 await getVideo(servers[ 1 ].url, id)
317 }
318
319 await waitJobs(servers)
320 await setActorFollowScores(servers[1].internalServerNumber, 20)
321
322 // Wait video expiration
323 await wait(11000)
324
325 // Refresh video -> score + 10 = 30
326 await getVideo(servers[1].url, videoIdsServer1[0])
327
328 await waitJobs(servers)
329 })
330
331 it('Should remove followings that are down', async function () {
332 this.timeout(120000)
333
334 killallServers([ servers[0] ])
335
336 // Wait video expiration
337 await wait(11000)
338
339 for (let i = 0; i < 3; i++) {
340 await getVideo(servers[1].url, videoIdsServer1[i])
341 await wait(1000)
342 await waitJobs([ servers[1] ])
343 }
344
345 for (const id of videoIdsServer1) {
346 await getVideo(servers[1].url, id, 403)
347 }
348 })
349
302 after(async function () { 350 after(async function () {
351 await closeAllSequelize([ servers[1] ])
352
303 await cleanupTests(servers) 353 await cleanupTests(servers)
304 }) 354 })
305}) 355})