X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Factivitypub-cleaner.ts;h=a25f00b0a8491dad5ead3ac7013940d00a0e4a56;hb=99b757488c077cee7d0ab89eeec181a7ee6290eb;hp=b58bbc98347124be85b7da90253a785730811623;hpb=74d249bc1346c7cfaac7ee49bebbebcf2a01f82a;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index b58bbc983..a25f00b0a 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -1,66 +1,63 @@ -import * as Bluebird from 'bluebird' -import * as Bull from 'bull' -import { checkUrlsSameHost } from '@server/helpers/activitypub' -import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' -import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' +import { map } from 'bluebird' +import { Job } from 'bullmq' +import { + isAnnounceActivityValid, + isDislikeActivityValid, + isLikeActivityValid +} from '@server/helpers/custom-validators/activitypub/activity' import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' -import { doRequest } from '@server/helpers/requests' -import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' +import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' +import { AP_CLEANER } from '@server/initializers/constants' +import { checkUrlsSameHost } from '@server/lib/activitypub/url' +import { Redis } from '@server/lib/redis' import { VideoModel } from '@server/models/video/video' import { VideoCommentModel } from '@server/models/video/video-comment' import { VideoShareModel } from '@server/models/video/video-share' -import { HttpStatusCode } from '@shared/core-utils' -import { logger } from '../../../helpers/logger' +import { HttpStatusCode } from '@shared/models' +import { logger, loggerTagsFactory } from '../../../helpers/logger' import { AccountVideoRateModel } from '../../../models/account/account-video-rate' +const lTags = loggerTagsFactory('ap-cleaner') + // Job to clean remote interactions off local videos -async function processActivityPubCleaner (_job: Bull.Job) { - logger.info('Processing ActivityPub cleaner.') +async function processActivityPubCleaner (_job: Job) { + logger.info('Processing ActivityPub cleaner.', lTags()) { const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = rateOptionsFactory() - await Bluebird.map(rateUrls, async rateUrl => { - try { - const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) + await map(rateUrls, async rateUrl => { + // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed + if (rateUrl.includes('#')) return + + const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) - if (result?.status === 'deleted') { - const { videoId, type } = result.data + if (result?.status === 'deleted') { + const { videoId, type } = result.data - await VideoModel.updateRatesOf(videoId, type, undefined) - } - } catch (err) { - logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) + await VideoModel.syncLocalRates(videoId, type, undefined) } - }, { concurrency: AP_CLEANER_CONCURRENCY }) + }, { concurrency: AP_CLEANER.CONCURRENCY }) } { const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = shareOptionsFactory() - await Bluebird.map(shareUrls, async shareUrl => { - try { - await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) - } catch (err) { - logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) - } - }, { concurrency: AP_CLEANER_CONCURRENCY }) + await map(shareUrls, async shareUrl => { + await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) + }, { concurrency: AP_CLEANER.CONCURRENCY }) } { const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = commentOptionsFactory() - await Bluebird.map(commentUrls, async commentUrl => { - try { - await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) - } catch (err) { - logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) - } - }, { concurrency: AP_CLEANER_CONCURRENCY }) + await map(commentUrls, async commentUrl => { + await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) + }, { concurrency: AP_CLEANER.CONCURRENCY }) } } @@ -72,50 +69,60 @@ export { // --------------------------------------------------------------------------- -async function updateObjectIfNeeded ( - url: string, - bodyValidator: (body: any) => boolean, - updater: (url: string, newUrl: string) => Promise, - deleter: (url: string) => Promise +async function updateObjectIfNeeded (options: { + url: string + bodyValidator: (body: any) => boolean + updater: (url: string, newUrl: string) => Promise + deleter: (url: string) => Promise } ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { - // Fetch url - const { response, body } = await doRequest({ - uri: url, - json: true, - activityPub: true - }) - - // Does not exist anymore, remove entry - if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { - logger.info('Removing remote AP object %s.', url) + const { url, bodyValidator, updater, deleter } = options + + const on404OrTombstone = async () => { + logger.info('Removing remote AP object %s.', url, lTags(url)) const data = await deleter(url) - return { status: 'deleted', data } + return { status: 'deleted' as 'deleted', data } } - // If not same id, check same host and update - if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) + try { + const { body } = await doJSONRequest(url, { activityPub: true }) - if (body.type === 'Tombstone') { - logger.info('Removing remote AP object %s.', url) - const data = await deleter(url) + // If not same id, check same host and update + if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) - return { status: 'deleted', data } - } + if (body.type === 'Tombstone') { + return on404OrTombstone() + } - const newUrl = body.id - if (newUrl !== url) { - if (checkUrlsSameHost(newUrl, url) !== true) { - throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) + const newUrl = body.id + if (newUrl !== url) { + if (checkUrlsSameHost(newUrl, url) !== true) { + throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) + } + + logger.info('Updating remote AP object %s.', url, lTags(url)) + const data = await updater(url, newUrl) + + return { status: 'updated', data } + } + + return null + } catch (err) { + // Does not exist anymore, remove entry + if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) { + return on404OrTombstone() } - logger.info('Updating remote AP object %s.', url) - const data = await updater(url, newUrl) + logger.debug('Remote AP object %s is unavailable.', url, lTags(url)) - return { status: 'updated', data } - } + const unavailability = await Redis.Instance.addAPUnavailability(url) + if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { + logger.info('Removing unavailable AP resource %s.', url, lTags(url)) + return on404OrTombstone() + } - return null + return null + } } function rateOptionsFactory () { @@ -149,7 +156,7 @@ function rateOptionsFactory () { function shareOptionsFactory () { return { - bodyValidator: (body: any) => isShareActivityValid(body), + bodyValidator: (body: any) => isAnnounceActivityValid(body), updater: async (url: string, newUrl: string) => { const share = await VideoShareModel.loadByUrl(url, undefined)