X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Factivitypub-cleaner.ts;h=a25f00b0a8491dad5ead3ac7013940d00a0e4a56;hb=26818a73ba0d7fd53ca69eba0c8e525f3670b5a8;hp=1540bf23af23bb956af6dbfde8792874675a43e0;hpb=f1569117f9bc468bc38b5b8d32df1bce40cc42ad;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index 1540bf23a..a25f00b0a 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts @@ -1,6 +1,5 @@ import { map } from 'bluebird' -import { Job } from 'bull' -import { checkUrlsSameHost } from '@server/helpers/activitypub' +import { Job } from 'bullmq' import { isAnnounceActivityValid, isDislikeActivityValid, @@ -9,34 +8,36 @@ import { import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' import { AP_CLEANER } from '@server/initializers/constants' +import { checkUrlsSameHost } from '@server/lib/activitypub/url' import { Redis } from '@server/lib/redis' import { VideoModel } from '@server/models/video/video' import { VideoCommentModel } from '@server/models/video/video-comment' import { VideoShareModel } from '@server/models/video/video-share' import { HttpStatusCode } from '@shared/models' -import { logger } from '../../../helpers/logger' +import { logger, loggerTagsFactory } from '../../../helpers/logger' import { AccountVideoRateModel } from '../../../models/account/account-video-rate' +const lTags = loggerTagsFactory('ap-cleaner') + // Job to clean remote interactions off local videos async function processActivityPubCleaner (_job: Job) { - logger.info('Processing ActivityPub cleaner.') + logger.info('Processing ActivityPub cleaner.', lTags()) { const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() const { bodyValidator, deleter, updater } = rateOptionsFactory() await map(rateUrls, async rateUrl => { - try { - const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) + // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed + if (rateUrl.includes('#')) return + + const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) - if (result?.status === 'deleted') { - const { videoId, type } = result.data + if (result?.status === 'deleted') { + const { videoId, type } = result.data - await VideoModel.updateRatesOf(videoId, type, undefined) - } - } catch (err) { - logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) + await VideoModel.syncLocalRates(videoId, type, undefined) } }, { concurrency: AP_CLEANER.CONCURRENCY }) } @@ -46,11 +47,7 @@ async function processActivityPubCleaner (_job: Job) { const { bodyValidator, deleter, updater } = shareOptionsFactory() await map(shareUrls, async shareUrl => { - try { - await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) - } catch (err) { - logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) - } + await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) }, { concurrency: AP_CLEANER.CONCURRENCY }) } @@ -59,11 +56,7 @@ async function processActivityPubCleaner (_job: Job) { const { bodyValidator, deleter, updater } = commentOptionsFactory() await map(commentUrls, async commentUrl => { - try { - await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) - } catch (err) { - logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) - } + await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) }, { concurrency: AP_CLEANER.CONCURRENCY }) } } @@ -85,7 +78,7 @@ async function updateObjectIfNeeded (options: { const { url, bodyValidator, updater, deleter } = options const on404OrTombstone = async () => { - logger.info('Removing remote AP object %s.', url) + logger.info('Removing remote AP object %s.', url, lTags(url)) const data = await deleter(url) return { status: 'deleted' as 'deleted', data } @@ -95,7 +88,7 @@ async function updateObjectIfNeeded (options: { const { body } = await doJSONRequest(url, { activityPub: true }) // If not same id, check same host and update - if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) + if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) if (body.type === 'Tombstone') { return on404OrTombstone() @@ -107,7 +100,7 @@ async function updateObjectIfNeeded (options: { throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) } - logger.info('Updating remote AP object %s.', url) + logger.info('Updating remote AP object %s.', url, lTags(url)) const data = await updater(url, newUrl) return { status: 'updated', data } @@ -120,11 +113,11 @@ async function updateObjectIfNeeded (options: { return on404OrTombstone() } - logger.debug('Remote AP object %s is unavailable.', url) + logger.debug('Remote AP object %s is unavailable.', url, lTags(url)) const unavailability = await Redis.Instance.addAPUnavailability(url) if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { - logger.info('Removing unavailable AP resource %s.', url) + logger.info('Removing unavailable AP resource %s.', url, lTags(url)) return on404OrTombstone() }