From 74d249bc1346c7cfaac7ee49bebbebcf2a01f82a Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 26 Feb 2021 16:26:27 +0100 Subject: Add ability to cleanup remote AP interactions --- .../lib/job-queue/handlers/activitypub-cleaner.ts | 194 +++++++++++++++++++++ server/lib/job-queue/handlers/actor-keys.ts | 2 +- server/lib/job-queue/job-queue.ts | 10 ++ 3 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 server/lib/job-queue/handlers/activitypub-cleaner.ts (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts new file mode 100644 index 000000000..b58bbc983 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -0,0 +1,194 @@ +import * as Bluebird from 'bluebird' +import * as Bull from 'bull' +import { checkUrlsSameHost } from '@server/helpers/activitypub' +import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' +import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' +import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' +import { doRequest } from '@server/helpers/requests' +import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' +import { VideoCommentModel } from '@server/models/video/video-comment' +import { VideoShareModel } from '@server/models/video/video-share' +import { HttpStatusCode } from '@shared/core-utils' +import { logger } from '../../../helpers/logger' +import { AccountVideoRateModel } from '../../../models/account/account-video-rate' + +// Job to clean remote interactions off local videos + +async function processActivityPubCleaner (_job: Bull.Job) { + logger.info('Processing ActivityPub cleaner.') + + { + const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() + const { bodyValidator, deleter, updater } = rateOptionsFactory() + + await Bluebird.map(rateUrls, async rateUrl => { + try { + const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) + + if (result?.status === 'deleted') { + const { videoId, type } = result.data + + await VideoModel.updateRatesOf(videoId, type, undefined) + } + } catch (err) { + logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) + } + }, { concurrency: AP_CLEANER_CONCURRENCY }) + } + + { + const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() + const { bodyValidator, deleter, updater } = shareOptionsFactory() + + await Bluebird.map(shareUrls, async shareUrl => { + try { + await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) + } catch (err) { + logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) + } + }, { concurrency: AP_CLEANER_CONCURRENCY }) + } + + { + const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() + const { bodyValidator, deleter, updater } = commentOptionsFactory() + + await Bluebird.map(commentUrls, async commentUrl => { + try { + await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) + } catch (err) { + logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) + } + }, { concurrency: AP_CLEANER_CONCURRENCY }) + } +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubCleaner +} + +// --------------------------------------------------------------------------- + +async function updateObjectIfNeeded ( + url: string, + bodyValidator: (body: any) => boolean, + updater: (url: string, newUrl: string) => Promise, + deleter: (url: string) => Promise +): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { + // Fetch url + const { response, body } = await doRequest({ + uri: url, + json: true, + activityPub: true + }) + + // Does not exist anymore, remove entry + if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { + logger.info('Removing remote AP object %s.', url) + const data = await deleter(url) + + return { status: 'deleted', data } + } + + // If not same id, check same host and update + if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) + + if (body.type === 'Tombstone') { + logger.info('Removing remote AP object %s.', url) + const data = await deleter(url) + + return { status: 'deleted', data } + } + + const newUrl = body.id + if (newUrl !== url) { + if (checkUrlsSameHost(newUrl, url) !== true) { + throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) + } + + logger.info('Updating remote AP object %s.', url) + const data = await updater(url, newUrl) + + return { status: 'updated', data } + } + + return null +} + +function rateOptionsFactory () { + return { + bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), + + updater: async (url: string, newUrl: string) => { + const rate = await AccountVideoRateModel.loadByUrl(url, undefined) + rate.url = newUrl + + const videoId = rate.videoId + const type = rate.type + + await rate.save() + + return { videoId, type } + }, + + deleter: async (url) => { + const rate = await AccountVideoRateModel.loadByUrl(url, undefined) + + const videoId = rate.videoId + const type = rate.type + + await rate.destroy() + + return { videoId, type } + } + } +} + +function shareOptionsFactory () { + return { + bodyValidator: (body: any) => isShareActivityValid(body), + + updater: async (url: string, newUrl: string) => { + const share = await VideoShareModel.loadByUrl(url, undefined) + share.url = newUrl + + await share.save() + + return undefined + }, + + deleter: async (url) => { + const share = await VideoShareModel.loadByUrl(url, undefined) + + await share.destroy() + + return undefined + } + } +} + +function commentOptionsFactory () { + return { + bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), + + updater: async (url: string, newUrl: string) => { + const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) + comment.url = newUrl + + await comment.save() + + return undefined + }, + + deleter: async (url) => { + const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) + + await comment.destroy() + + return undefined + } + } +} diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 8da549640..125307843 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts @@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' async function processActorKeys (job: Bull.Job) { const payload = job.data as ActorKeysPayload - logger.info('Processing email in job %d.', job.id) + logger.info('Processing actor keys in job %d.', job.id) const actor = await ActorModel.load(payload.actorId) diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index efda2e038..42e8347b1 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -21,6 +21,7 @@ import { import { logger } from '../../helpers/logger' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { Redis } from '../redis' +import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' @@ -38,6 +39,7 @@ type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | + { type: 'activitypub-http-cleaner', payload: {} } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-transcoding', payload: VideoTranscodingPayload } | @@ -58,6 +60,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-cleaner': processActivityPubCleaner, 'activitypub-follow': processActivityPubFollow, 'video-file-import': processVideoFileImport, 'video-transcoding': processVideoTranscoding, @@ -75,6 +78,7 @@ const jobTypes: JobType[] = [ 'activitypub-http-broadcast', 'activitypub-http-fetcher', 'activitypub-http-unicast', + 'activitypub-cleaner', 'email', 'video-transcoding', 'video-file-import', @@ -233,6 +237,12 @@ class JobQueue { this.queues['videos-views'].add({}, { repeat: REPEAT_JOBS['videos-views'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) + + if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { + this.queues['activitypub-cleaner'].add({}, { + repeat: REPEAT_JOBS['activitypub-cleaner'] + }).catch(err => logger.error('Cannot add repeatable job.', { err })) + } } private filterJobTypes (jobType?: JobType) { -- cgit v1.2.3