From 6b9c966f6428c9e47bead3410a0401e8ebd744bf Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 6 Aug 2019 17:19:53 +0200 Subject: Automatically remove bad followings --- server/lib/activitypub/actor.ts | 10 +- server/lib/activitypub/process/process-create.ts | 11 +- server/lib/activitypub/video-comments.ts | 230 ++++++++++----------- server/lib/activitypub/videos.ts | 16 +- server/lib/files-cache/actor-follow-score-cache.ts | 28 ++- .../job-queue/handlers/activitypub-http-fetcher.ts | 2 +- server/lib/schedulers/actor-follow-scheduler.ts | 13 +- 7 files changed, 173 insertions(+), 137 deletions(-) (limited to 'server/lib') diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index 38eb87d1e..0e6596f10 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts @@ -254,14 +254,14 @@ async function refreshActorIfNeeded ( await actor.save({ transaction: t }) if (actor.Account) { - actor.Account.set('name', result.name) - actor.Account.set('description', result.summary) + actor.Account.name = result.name + actor.Account.description = result.summary await actor.Account.save({ transaction: t }) } else if (actor.VideoChannel) { - actor.VideoChannel.set('name', result.name) - actor.VideoChannel.set('description', result.summary) - actor.VideoChannel.set('support', result.support) + actor.VideoChannel.name = result.name + actor.VideoChannel.description = result.summary + actor.VideoChannel.support = result.support await actor.VideoChannel.save({ transaction: t }) } diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index a979771b6..b81021163 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts @@ -4,7 +4,7 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils' import { logger } from '../../../helpers/logger' import { sequelizeTypescript } from '../../../initializers' import { ActorModel } from '../../../models/activitypub/actor' -import { addVideoComment, resolveThread } from '../video-comments' +import { resolveThread } from '../video-comments' import { getOrCreateVideoAndAccountAndChannel } from '../videos' import { forwardVideoRelatedActivity } from '../send/utils' import { createOrUpdateCacheFile } from '../cache-file' @@ -13,6 +13,7 @@ import { PlaylistObject } from '../../../../shared/models/activitypub/objects/pl import { createOrUpdateVideoPlaylist } from '../playlist' import { VideoModel } from '../../../models/video/video' import { APProcessorOptions } from '../../../typings/activitypub-processor.model' +import { VideoCommentModel } from '../../../models/video/video-comment' async function processCreateActivity (options: APProcessorOptions) { const { activity, byActor } = options @@ -83,9 +84,13 @@ async function processCreateVideoComment (activity: ActivityCreate, byActor: Act if (!byAccount) throw new Error('Cannot create video comment with the non account actor ' + byActor.url) let video: VideoModel + let created: boolean + let comment: VideoCommentModel try { - const resolveThreadResult = await resolveThread(commentObject.inReplyTo) + const resolveThreadResult = await resolveThread({ url: commentObject.id, isVideo: false }) video = resolveThreadResult.video + created = resolveThreadResult.commentCreated + comment = resolveThreadResult.comment } catch (err) { logger.debug( 'Cannot process video comment because we could not resolve thread %s. Maybe it was not a video thread, so skip it.', @@ -95,8 +100,6 @@ async function processCreateVideoComment (activity: ActivityCreate, byActor: Act return } - const { comment, created } = await addVideoComment(video, commentObject.id) - if (video.isOwned() && created === true) { // Don't resend the activity to the sender const exceptions = [ byActor ] diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts index 921abdb8d..92e1a9020 100644 --- a/server/lib/activitypub/video-comments.ts +++ b/server/lib/activitypub/video-comments.ts @@ -1,9 +1,7 @@ -import { VideoCommentObject } from '../../../shared/models/activitypub/objects/video-comment-object' import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments' import { logger } from '../../helpers/logger' import { doRequest } from '../../helpers/requests' import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers/constants' -import { ActorModel } from '../../models/activitypub/actor' import { VideoModel } from '../../models/video/video' import { VideoCommentModel } from '../../models/video/video-comment' import { getOrCreateActorAndServerAndModel } from './actor' @@ -11,79 +9,53 @@ import { getOrCreateVideoAndAccountAndChannel } from './videos' import * as Bluebird from 'bluebird' import { checkUrlsSameHost } from '../../helpers/activitypub' -async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) { - let originCommentId: number = null - let inReplyToCommentId: number = null - - // If this is not a reply to the video (thread), create or get the parent comment - if (video.url !== comment.inReplyTo) { - const { comment: parent } = await addVideoComment(video, comment.inReplyTo) - if (!parent) { - logger.warn('Cannot fetch or get parent comment %s of comment %s.', comment.inReplyTo, comment.id) - return undefined - } - - originCommentId = parent.originCommentId || parent.id - inReplyToCommentId = parent.id - } - - return { - url: comment.id, - text: comment.content, - videoId: video.id, - accountId: actor.Account.id, - inReplyToCommentId, - originCommentId, - createdAt: new Date(comment.published) - } +type ResolveThreadParams = { + url: string, + comments?: VideoCommentModel[], + isVideo?: boolean, + commentCreated?: boolean } +type ResolveThreadResult = Promise<{ video: VideoModel, comment: VideoCommentModel, commentCreated: boolean }> -async function addVideoComments (commentUrls: string[], instance: VideoModel) { +async function addVideoComments (commentUrls: string[]) { return Bluebird.map(commentUrls, commentUrl => { - return addVideoComment(instance, commentUrl) + return resolveThread({ url: commentUrl, isVideo: false }) }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) } -async function addVideoComment (videoInstance: VideoModel, commentUrl: string) { - logger.info('Fetching remote video comment %s.', commentUrl) +async function resolveThread (params: ResolveThreadParams): ResolveThreadResult { + const { url, isVideo } = params + if (params.commentCreated === undefined) params.commentCreated = false + if (params.comments === undefined) params.comments = [] - const { body } = await doRequest({ - uri: commentUrl, - json: true, - activityPub: true - }) - - if (sanitizeAndCheckVideoCommentObject(body) === false) { - logger.debug('Remote video comment JSON %s is not valid.', commentUrl, { body }) - return { created: false } + // Already have this comment? + if (isVideo !== true) { + const result = await resolveCommentFromDB(params) + if (result) return result } - const actorUrl = body.attributedTo - if (!actorUrl) return { created: false } + try { + if (isVideo !== false) return await tryResolveThreadFromVideo(params) - if (checkUrlsSameHost(commentUrl, actorUrl) !== true) { - throw new Error(`Actor url ${actorUrl} has not the same host than the comment url ${commentUrl}`) - } + return resolveParentComment(params) + } catch (err) { + logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err }) - if (checkUrlsSameHost(body.id, commentUrl) !== true) { - throw new Error(`Comment url ${commentUrl} host is different from the AP object id ${body.id}`) + return resolveParentComment(params) } +} - const actor = await getOrCreateActorAndServerAndModel(actorUrl, 'all') - const entry = await videoCommentActivityObjectToDBAttributes(videoInstance, actor, body) - if (!entry) return { created: false } +export { + addVideoComments, + resolveThread +} - const [ comment, created ] = await VideoCommentModel.upsert(entry, { returning: true }) - comment.Account = actor.Account - comment.Video = videoInstance +// --------------------------------------------------------------------------- - return { comment, created } -} +async function resolveCommentFromDB (params: ResolveThreadParams) { + const { url, comments, commentCreated } = params -type ResolveThreadResult = Promise<{ video: VideoModel, parents: VideoCommentModel[] }> -async function resolveThread (url: string, comments: VideoCommentModel[] = []): ResolveThreadResult { - // Already have this comment? - const commentFromDatabase = await VideoCommentModel.loadByUrlAndPopulateReplyAndVideo(url) + const commentFromDatabase = await VideoCommentModel.loadByUrlAndPopulateReplyAndVideoUrlAndAccount(url) if (commentFromDatabase) { let parentComments = comments.concat([ commentFromDatabase ]) @@ -94,79 +66,97 @@ async function resolveThread (url: string, comments: VideoCommentModel[] = []): parentComments = parentComments.concat(data) } - return resolveThread(commentFromDatabase.Video.url, parentComments) + return resolveThread({ + url: commentFromDatabase.Video.url, + comments: parentComments, + isVideo: true, + commentCreated + }) } - try { - // Maybe it's a reply to a video? - // If yes, it's done: we resolved all the thread - const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: url }) - - if (comments.length !== 0) { - const firstReply = comments[ comments.length - 1 ] - firstReply.inReplyToCommentId = null - firstReply.originCommentId = null - firstReply.videoId = video.id - comments[comments.length - 1] = await firstReply.save() - - for (let i = comments.length - 2; i >= 0; i--) { - const comment = comments[ i ] - comment.originCommentId = firstReply.id - comment.inReplyToCommentId = comments[ i + 1 ].id - comment.videoId = video.id - - comments[i] = await comment.save() - } + return undefined +} + +async function tryResolveThreadFromVideo (params: ResolveThreadParams) { + const { url, comments, commentCreated } = params + + // Maybe it's a reply to a video? + // If yes, it's done: we resolved all the thread + const syncParam = { likes: true, dislikes: true, shares: true, comments: false, thumbnail: true, refreshVideo: false } + const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: url, syncParam }) + + let resultComment: VideoCommentModel + if (comments.length !== 0) { + const firstReply = comments[ comments.length - 1 ] + firstReply.inReplyToCommentId = null + firstReply.originCommentId = null + firstReply.videoId = video.id + firstReply.changed('updatedAt', true) + firstReply.Video = video + + comments[comments.length - 1] = await firstReply.save() + + for (let i = comments.length - 2; i >= 0; i--) { + const comment = comments[ i ] + comment.originCommentId = firstReply.id + comment.inReplyToCommentId = comments[ i + 1 ].id + comment.videoId = video.id + comment.changed('updatedAt', true) + comment.Video = video + + comments[i] = await comment.save() } - return { video, parents: comments } - } catch (err) { - logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err }) + resultComment = comments[0] + } - if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) { - throw new Error('Recursion limit reached when resolving a thread') - } + return { video, comment: resultComment, commentCreated } +} - const { body } = await doRequest({ - uri: url, - json: true, - activityPub: true - }) +async function resolveParentComment (params: ResolveThreadParams) { + const { url, comments } = params - if (sanitizeAndCheckVideoCommentObject(body) === false) { - throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body)) - } + if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) { + throw new Error('Recursion limit reached when resolving a thread') + } - const actorUrl = body.attributedTo - if (!actorUrl) throw new Error('Miss attributed to in comment') + const { body } = await doRequest({ + uri: url, + json: true, + activityPub: true + }) - if (checkUrlsSameHost(url, actorUrl) !== true) { - throw new Error(`Actor url ${actorUrl} has not the same host than the comment url ${url}`) - } + if (sanitizeAndCheckVideoCommentObject(body) === false) { + throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body)) + } - if (checkUrlsSameHost(body.id, url) !== true) { - throw new Error(`Comment url ${url} host is different from the AP object id ${body.id}`) - } + const actorUrl = body.attributedTo + if (!actorUrl) throw new Error('Miss attributed to in comment') - const actor = await getOrCreateActorAndServerAndModel(actorUrl) - const comment = new VideoCommentModel({ - url: body.id, - text: body.content, - videoId: null, - accountId: actor.Account.id, - inReplyToCommentId: null, - originCommentId: null, - createdAt: new Date(body.published), - updatedAt: new Date(body.updated) - }) + if (checkUrlsSameHost(url, actorUrl) !== true) { + throw new Error(`Actor url ${actorUrl} has not the same host than the comment url ${url}`) + } - return resolveThread(body.inReplyTo, comments.concat([ comment ])) + if (checkUrlsSameHost(body.id, url) !== true) { + throw new Error(`Comment url ${url} host is different from the AP object id ${body.id}`) } -} -export { - videoCommentActivityObjectToDBAttributes, - addVideoComments, - addVideoComment, - resolveThread + const actor = await getOrCreateActorAndServerAndModel(actorUrl) + const comment = new VideoCommentModel({ + url: body.id, + text: body.content, + videoId: null, + accountId: actor.Account.id, + inReplyToCommentId: null, + originCommentId: null, + createdAt: new Date(body.published), + updatedAt: new Date(body.updated) + }) + comment.Account = actor.Account + + return resolveThread({ + url: body.inReplyTo, + comments: comments.concat([ comment ]), + commentCreated: true + }) } diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index d7bc3d650..2102702e1 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -56,6 +56,7 @@ import { join } from 'path' import { FilteredModelAttributes } from '../../typings/sequelize' import { Hooks } from '../plugins/hooks' import { autoBlacklistVideoIfNeeded } from '../video-blacklist' +import { ActorFollowScoreCache } from '../files-cache' async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { if ( @@ -182,7 +183,7 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid } if (syncParam.comments === true) { - const handler = items => addVideoComments(items, video) + const handler = items => addVideoComments(items) const cleaner = crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate) await crawlCollectionPage(fetchedVideo.comments, handler, cleaner) @@ -421,10 +422,14 @@ async function refreshVideoIfNeeded (options: { await retryTransactionWrapper(updateVideoFromAP, updateOptions) await syncVideoExternalAttributes(video, videoObject, options.syncParam) + ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) + return video } catch (err) { logger.warn('Cannot refresh video %s.', options.video.url, { err }) + ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) + // Don't refresh in loop await video.setAsRefreshed() return video @@ -500,7 +505,7 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor const videoStreamingPlaylists = streamingPlaylistActivityUrlToDBAttributes(videoCreated, videoObject, videoFiles) const playlistPromises = videoStreamingPlaylists.map(p => VideoStreamingPlaylistModel.create(p, { transaction: t })) - await Promise.all(playlistPromises) + const streamingPlaylists = await Promise.all(playlistPromises) // Process tags const tags = videoObject.tag @@ -513,7 +518,12 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor const videoCaptionsPromises = videoObject.subtitleLanguage.map(c => { return VideoCaptionModel.insertOrReplaceLanguage(videoCreated.id, c.identifier, t) }) - await Promise.all(videoCaptionsPromises) + const captions = await Promise.all(videoCaptionsPromises) + + video.VideoFiles = videoFiles + video.VideoStreamingPlaylists = streamingPlaylists + video.Tags = tagInstances + video.VideoCaptions = captions const autoBlacklisted = await autoBlacklistVideoIfNeeded({ video, diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/files-cache/actor-follow-score-cache.ts index 5f8ee806f..086605726 100644 --- a/server/lib/files-cache/actor-follow-score-cache.ts +++ b/server/lib/files-cache/actor-follow-score-cache.ts @@ -7,6 +7,8 @@ class ActorFollowScoreCache { private static instance: ActorFollowScoreCache private pendingFollowsScore: { [ url: string ]: number } = {} + private pendingBadServer = new Set() + private pendingGoodServer = new Set() private constructor () {} @@ -32,7 +34,31 @@ class ActorFollowScoreCache { } } - getPendingFollowsScoreCopy () { + addBadServerId (serverId: number) { + this.pendingBadServer.add(serverId) + } + + getBadFollowingServerIds () { + return Array.from(this.pendingBadServer) + } + + clearBadFollowingServerIds () { + this.pendingBadServer = new Set() + } + + addGoodServerId (serverId: number) { + this.pendingGoodServer.add(serverId) + } + + getGoodFollowingServerIds () { + return Array.from(this.pendingGoodServer) + } + + clearGoodFollowingServerIds () { + this.pendingGoodServer = new Set() + } + + getPendingFollowsScore () { return this.pendingFollowsScore } diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 4da645f07..c3f59dc77 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -37,7 +37,7 @@ async function processActivityPubHttpFetcher (job: Bull.Job) { 'video-likes': items => createRates(items, video, 'like'), 'video-dislikes': items => createRates(items, video, 'dislike'), 'video-shares': items => addVideoShares(items, video), - 'video-comments': items => addVideoComments(items, video), + 'video-comments': items => addVideoComments(items), 'account-playlists': items => createAccountPlaylists(items, account) } diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts index fdd3ad5fa..598c0211f 100644 --- a/server/lib/schedulers/actor-follow-scheduler.ts +++ b/server/lib/schedulers/actor-follow-scheduler.ts @@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils' import { logger } from '../../helpers/logger' import { ActorFollowModel } from '../../models/activitypub/actor-follow' import { AbstractScheduler } from './abstract-scheduler' -import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { ActorFollowScoreCache } from '../files-cache' export class ActorFollowScheduler extends AbstractScheduler { @@ -22,13 +22,20 @@ export class ActorFollowScheduler extends AbstractScheduler { } private async processPendingScores () { - const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() + const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore() + const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds() + const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds() ActorFollowScoreCache.Instance.clearPendingFollowsScore() + ActorFollowScoreCache.Instance.clearBadFollowingServerIds() + ActorFollowScoreCache.Instance.clearGoodFollowingServerIds() for (const inbox of Object.keys(pendingScores)) { - await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) + await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) } + + await ActorFollowModel.updateScoreByFollowingServers(badServerIds, ACTOR_FOLLOW_SCORE.PENALTY) + await ActorFollowModel.updateScoreByFollowingServers(goodServerIds, ACTOR_FOLLOW_SCORE.BONUS) } private async removeBadActorFollows () { -- cgit v1.2.3