-import * as Bluebird from 'bluebird'
-import * as Bull from 'bull'
-import { checkUrlsSameHost } from '@server/helpers/activitypub'
-import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate'
-import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share'
+import { map } from 'bluebird'
+import { Job } from 'bullmq'
+import {
+ isAnnounceActivityValid,
+ isDislikeActivityValid,
+ isLikeActivityValid
+} from '@server/helpers/custom-validators/activitypub/activity'
import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
-import { doRequest } from '@server/helpers/requests'
-import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants'
+import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
+import { AP_CLEANER } from '@server/initializers/constants'
+import { checkUrlsSameHost } from '@server/lib/activitypub/url'
+import { Redis } from '@server/lib/redis'
import { VideoModel } from '@server/models/video/video'
import { VideoCommentModel } from '@server/models/video/video-comment'
import { VideoShareModel } from '@server/models/video/video-share'
-import { HttpStatusCode } from '@shared/core-utils'
-import { logger } from '../../../helpers/logger'
+import { HttpStatusCode } from '@shared/models'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
+const lTags = loggerTagsFactory('ap-cleaner')
+
// Job to clean remote interactions off local videos
-async function processActivityPubCleaner (_job: Bull.Job) {
- logger.info('Processing ActivityPub cleaner.')
+async function processActivityPubCleaner (_job: Job) {
+ logger.info('Processing ActivityPub cleaner.', lTags())
{
const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
const { bodyValidator, deleter, updater } = rateOptionsFactory()
- await Bluebird.map(rateUrls, async rateUrl => {
- try {
- const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
+ await map(rateUrls, async rateUrl => {
+ // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed
+ if (rateUrl.includes('#')) return
+
+ const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
- if (result?.status === 'deleted') {
- const { videoId, type } = result.data
+ if (result?.status === 'deleted') {
+ const { videoId, type } = result.data
- await VideoModel.updateRatesOf(videoId, type, undefined)
- }
- } catch (err) {
- logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
+ await VideoModel.syncLocalRates(videoId, type, undefined)
}
- }, { concurrency: AP_CLEANER_CONCURRENCY })
+ }, { concurrency: AP_CLEANER.CONCURRENCY })
}
{
const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
const { bodyValidator, deleter, updater } = shareOptionsFactory()
- await Bluebird.map(shareUrls, async shareUrl => {
- try {
- await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter)
- } catch (err) {
- logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
- }
- }, { concurrency: AP_CLEANER_CONCURRENCY })
+ await map(shareUrls, async shareUrl => {
+ await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
+ }, { concurrency: AP_CLEANER.CONCURRENCY })
}
{
const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
const { bodyValidator, deleter, updater } = commentOptionsFactory()
- await Bluebird.map(commentUrls, async commentUrl => {
- try {
- await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter)
- } catch (err) {
- logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
- }
- }, { concurrency: AP_CLEANER_CONCURRENCY })
+ await map(commentUrls, async commentUrl => {
+ await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
+ }, { concurrency: AP_CLEANER.CONCURRENCY })
}
}
// ---------------------------------------------------------------------------
-async function updateObjectIfNeeded <T> (
- url: string,
- bodyValidator: (body: any) => boolean,
- updater: (url: string, newUrl: string) => Promise<T>,
- deleter: (url: string) => Promise<T>
+async function updateObjectIfNeeded <T> (options: {
+ url: string
+ bodyValidator: (body: any) => boolean
+ updater: (url: string, newUrl: string) => Promise<T>
+ deleter: (url: string) => Promise<T> }
): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
- // Fetch url
- const { response, body } = await doRequest<any>({
- uri: url,
- json: true,
- activityPub: true
- })
-
- // Does not exist anymore, remove entry
- if (response.statusCode === HttpStatusCode.NOT_FOUND_404) {
- logger.info('Removing remote AP object %s.', url)
+ const { url, bodyValidator, updater, deleter } = options
+
+ const on404OrTombstone = async () => {
+ logger.info('Removing remote AP object %s.', url, lTags(url))
const data = await deleter(url)
- return { status: 'deleted', data }
+ return { status: 'deleted' as 'deleted', data }
}
- // If not same id, check same host and update
- if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
+ try {
+ const { body } = await doJSONRequest<any>(url, { activityPub: true })
- if (body.type === 'Tombstone') {
- logger.info('Removing remote AP object %s.', url)
- const data = await deleter(url)
+ // If not same id, check same host and update
+ if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
- return { status: 'deleted', data }
- }
+ if (body.type === 'Tombstone') {
+ return on404OrTombstone()
+ }
- const newUrl = body.id
- if (newUrl !== url) {
- if (checkUrlsSameHost(newUrl, url) !== true) {
- throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
+ const newUrl = body.id
+ if (newUrl !== url) {
+ if (checkUrlsSameHost(newUrl, url) !== true) {
+ throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
+ }
+
+ logger.info('Updating remote AP object %s.', url, lTags(url))
+ const data = await updater(url, newUrl)
+
+ return { status: 'updated', data }
+ }
+
+ return null
+ } catch (err) {
+ // Does not exist anymore, remove entry
+ if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) {
+ return on404OrTombstone()
}
- logger.info('Updating remote AP object %s.', url)
- const data = await updater(url, newUrl)
+ logger.debug('Remote AP object %s is unavailable.', url, lTags(url))
- return { status: 'updated', data }
- }
+ const unavailability = await Redis.Instance.addAPUnavailability(url)
+ if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
+ logger.info('Removing unavailable AP resource %s.', url, lTags(url))
+ return on404OrTombstone()
+ }
- return null
+ return null
+ }
}
function rateOptionsFactory () {
function shareOptionsFactory () {
return {
- bodyValidator: (body: any) => isShareActivityValid(body),
+ bodyValidator: (body: any) => isAnnounceActivityValid(body),
updater: async (url: string, newUrl: string) => {
const share = await VideoShareModel.loadByUrl(url, undefined)