1 import { map } from 'bluebird'
2 import { Job } from 'bullmq'
4 isAnnounceActivityValid,
5 isDislikeActivityValid,
7 } from '@server/helpers/custom-validators/activitypub/activity'
8 import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
9 import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
10 import { AP_CLEANER } from '@server/initializers/constants'
11 import { checkUrlsSameHost } from '@server/lib/activitypub/url'
12 import { Redis } from '@server/lib/redis'
13 import { VideoModel } from '@server/models/video/video'
14 import { VideoCommentModel } from '@server/models/video/video-comment'
15 import { VideoShareModel } from '@server/models/video/video-share'
16 import { HttpStatusCode } from '@shared/models'
17 import { logger, loggerTagsFactory } from '../../../helpers/logger'
18 import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
// Tag factory so every log line emitted by this job carries the 'ap-cleaner' tag
const lTags = loggerTagsFactory('ap-cleaner')

/**
 * Job to clean remote interactions off local videos.
 *
 * Walks, in order, the remote rate, share and comment URLs attached to local
 * videos and hands each one to `updateObjectIfNeeded()`, which either leaves
 * the local row alone, updates its URL or deletes it.
 *
 * Each kind is processed in its own block scope so the
 * `bodyValidator`/`deleter`/`updater` trio can be re-declared per kind.
 * Work inside a kind is parallelised with bluebird `map`, bounded by
 * `AP_CLEANER.CONCURRENCY`.
 *
 * @param _job the queue job that triggered the run (unused)
 */
async function processActivityPubCleaner (_job: Job) {
  logger.info('Processing ActivityPub cleaner.', lTags())

  // Rates
  {
    const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
    const { bodyValidator, deleter, updater } = rateOptionsFactory()

    await map(rateUrls, async rateUrl => {
      // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed
      if (rateUrl.includes('#')) return

      const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })

      // A rate was removed: let the video model resync its local rates
      // (presumably recomputes the like/dislike counters -- confirm in VideoModel)
      if (result?.status === 'deleted') {
        const { videoId, type } = result.data

        await VideoModel.syncLocalRates(videoId, type, undefined)
      }
    }, { concurrency: AP_CLEANER.CONCURRENCY })
  }

  // Shares
  {
    const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
    const { bodyValidator, deleter, updater } = shareOptionsFactory()

    await map(shareUrls, async shareUrl => {
      await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
    }, { concurrency: AP_CLEANER.CONCURRENCY })
  }

  // Comments
  {
    const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
    const { bodyValidator, deleter, updater } = commentOptionsFactory()

    await map(commentUrls, async commentUrl => {
      await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
    }, { concurrency: AP_CLEANER.CONCURRENCY })
  }
}
// ---------------------------------------------------------------------------

// Public surface of this module: only the job handler is exported
export {
  processActivityPubCleaner
}

// ---------------------------------------------------------------------------
/**
 * Fetches a remote ActivityPub object and reconciles the local row with it.
 *
 * Outcomes:
 *  - the remote answers with a `Tombstone`, or the request fails with a 404:
 *    `deleter(url)` is called and `{ status: 'deleted', data }` is returned;
 *  - the remote object has a different `id` on the same host:
 *    `updater(url, newUrl)` is called and `{ status: 'updated', data }` is returned;
 *  - the remote object is unchanged: `null` is returned;
 *  - any other failure (network error, invalid body, `id` on another host):
 *    the failure is recorded through `Redis.Instance.addAPUnavailability(url)`;
 *    once the returned value reaches `AP_CLEANER.UNAVAILABLE_TRESHOLD` the
 *    object is deleted as above, otherwise `null` is returned.
 *
 * @param options.url           URL of the remote object as stored locally
 * @param options.bodyValidator returns true when the fetched body is acceptable
 * @param options.updater       persists the new URL and returns caller-specific data
 * @param options.deleter       removes the local row and returns caller-specific data
 * @returns the action taken with the data produced by `updater`/`deleter`,
 *          or `null` when nothing was changed
 */
async function updateObjectIfNeeded <T> (options: {
  url: string
  bodyValidator: (body: any) => boolean
  updater: (url: string, newUrl: string) => Promise<T>
  deleter: (url: string) => Promise<T>
}): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
  const { url, bodyValidator, updater, deleter } = options

  // Shared by the Tombstone, 404 and "unavailable for too long" paths
  const on404OrTombstone = async () => {
    logger.info('Removing remote AP object %s.', url, lTags(url))
    const data = await deleter(url)

    return { status: 'deleted' as const, data }
  }

  try {
    const { body } = await doJSONRequest<any>(url, { activityPub: true })

    // If not same id, check same host and update
    if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)

    if (body.type === 'Tombstone') {
      return on404OrTombstone()
    }

    const newUrl = body.id

    // Only touch the local row when the remote id actually changed
    if (newUrl !== url) {
      if (checkUrlsSameHost(newUrl, url) !== true) {
        throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
      }

      logger.info('Updating remote AP object %s.', url, lTags(url))
      const data = await updater(url, newUrl)

      return { status: 'updated', data }
    }

    return null
  } catch (err) {
    // Does not exist anymore, remove entry
    if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) {
      return on404OrTombstone()
    }

    logger.debug('Remote AP object %s is unavailable.', url, lTags(url))

    // Presumably a running count of failed checks for this URL -- confirm in Redis helper
    const unavailability = await Redis.Instance.addAPUnavailability(url)
    if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
      logger.info('Removing unavailable AP resource %s.', url, lTags(url))
      return on404OrTombstone()
    }

    return null
  }
}
/**
 * Builds the `bodyValidator`/`updater`/`deleter` trio consumed by
 * `updateObjectIfNeeded()` for remote rates (likes and dislikes).
 *
 * Both `updater` and `deleter` resolve to `{ videoId, type }` so the caller
 * can resync the rates of the affected video.
 */
function rateOptionsFactory () {
  return {
    // A rate is valid when it is either a Like or a Dislike activity
    bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body),

    // Points the stored rate at its new URL
    // NOTE(review): assumes the model exposes a writable `url` attribute and an
    // instance `save()`, like the `destroy()` used by the sibling deleters -- confirm
    updater: async (url: string, newUrl: string) => {
      const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
      rate.url = newUrl

      const videoId = rate.videoId
      const type = rate.type

      await rate.save()

      return { videoId, type }
    },

    // Removes the stored rate; ids are captured before the row is destroyed
    deleter: async (url: string) => {
      const rate = await AccountVideoRateModel.loadByUrl(url, undefined)

      const videoId = rate.videoId
      const type = rate.type

      await rate.destroy()

      return { videoId, type }
    }
  }
}
/**
 * Builds the `bodyValidator`/`updater`/`deleter` trio consumed by
 * `updateObjectIfNeeded()` for remote video shares (Announce activities).
 *
 * Neither callback produces data: both resolve to `undefined`.
 */
function shareOptionsFactory () {
  return {
    bodyValidator: (body: any) => isAnnounceActivityValid(body),

    // Points the stored share at its new URL
    // NOTE(review): assumes the model exposes a writable `url` attribute and an
    // instance `save()`, alongside the `destroy()` used below -- confirm
    updater: async (url: string, newUrl: string) => {
      const share = await VideoShareModel.loadByUrl(url, undefined)
      share.url = newUrl

      await share.save()

      return undefined
    },

    // Removes the stored share
    deleter: async (url: string) => {
      const share = await VideoShareModel.loadByUrl(url, undefined)

      await share.destroy()

      return undefined
    }
  }
}
/**
 * Builds the `bodyValidator`/`updater`/`deleter` trio consumed by
 * `updateObjectIfNeeded()` for remote video comments.
 *
 * Neither callback produces data: both resolve to `undefined`.
 */
function commentOptionsFactory () {
  return {
    bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body),

    // Points the stored comment at its new URL
    // NOTE(review): assumes the model exposes a writable `url` attribute and an
    // instance `save()`, alongside the `destroy()` used below -- confirm
    updater: async (url: string, newUrl: string) => {
      const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
      comment.url = newUrl

      await comment.save()

      return undefined
    },

    // Removes the stored comment
    deleter: async (url: string) => {
      const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)

      await comment.destroy()

      return undefined
    }
  }
}