git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/job-queue/handlers/activitypub-cleaner.ts
Cleanup unavailable remote AP resource
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / activitypub-cleaner.ts
1 import { map } from 'bluebird'
2 import { Job } from 'bull'
3 import { checkUrlsSameHost } from '@server/helpers/activitypub'
4 import {
5 isAnnounceActivityValid,
6 isDislikeActivityValid,
7 isLikeActivityValid
8 } from '@server/helpers/custom-validators/activitypub/activity'
9 import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
10 import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
11 import { AP_CLEANER } from '@server/initializers/constants'
12 import { Redis } from '@server/lib/redis'
13 import { VideoModel } from '@server/models/video/video'
14 import { VideoCommentModel } from '@server/models/video/video-comment'
15 import { VideoShareModel } from '@server/models/video/video-share'
16 import { HttpStatusCode } from '@shared/models'
17 import { logger } from '../../../helpers/logger'
18 import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
19
20 // Job to clean remote interactions off local videos
21
/**
 * Job handler that goes through the remote rates, shares and comments listed
 * for local videos and asks `updateObjectIfNeeded` to refresh or remove each
 * of them. The three kinds are handled one after the other; URLs of a same
 * kind are processed with a concurrency of `AP_CLEANER.CONCURRENCY`.
 *
 * A failure on a single URL is logged as a warning and does not stop the run.
 *
 * @param _job - Bull job that triggered the run (unused)
 */
async function processActivityPubCleaner (_job: Job) {
  logger.info('Processing ActivityPub cleaner.')

  const cleanRates = async () => {
    const urls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
    const handlers = rateOptionsFactory()

    await map(urls, async url => {
      try {
        const outcome = await updateObjectIfNeeded({ url, ...handlers })
        if (!outcome || outcome.status !== 'deleted') return

        // A rate was removed: let the video model update the rates of the video it targeted
        await VideoModel.updateRatesOf(outcome.data.videoId, outcome.data.type, undefined)
      } catch (err) {
        logger.warn('Cannot update/delete remote AP rate %s.', url, { err })
      }
    }, { concurrency: AP_CLEANER.CONCURRENCY })
  }

  const cleanShares = async () => {
    const urls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
    const handlers = shareOptionsFactory()

    await map(urls, async url => {
      try {
        await updateObjectIfNeeded({ url, ...handlers })
      } catch (err) {
        logger.warn('Cannot update/delete remote AP share %s.', url, { err })
      }
    }, { concurrency: AP_CLEANER.CONCURRENCY })
  }

  const cleanComments = async () => {
    const urls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
    const handlers = commentOptionsFactory()

    await map(urls, async url => {
      try {
        await updateObjectIfNeeded({ url, ...handlers })
      } catch (err) {
        logger.warn('Cannot update/delete remote AP comment %s.', url, { err })
      }
    }, { concurrency: AP_CLEANER.CONCURRENCY })
  }

  // Same order as before: rates, then shares, then comments
  await cleanRates()
  await cleanShares()
  await cleanComments()
}
70
71 // ---------------------------------------------------------------------------
72
73 export {
74 processActivityPubCleaner
75 }
76
77 // ---------------------------------------------------------------------------
78
/**
 * Fetches a remote ActivityPub object and reconciles the local entry with it.
 *
 * - The remote answers 404, or the body is a `Tombstone`: the local entry is
 *   removed through `deleter`.
 * - The remote body has a different `id` on the same host: the local entry is
 *   moved to that new URL through `updater`.
 * - The remote cannot be fetched, the body is invalid or the new `id` is on
 *   another host: the failure is recorded with `Redis.Instance.addAPUnavailability`
 *   and the local entry is removed once the returned count reaches
 *   `AP_CLEANER.UNAVAILABLE_TRESHOLD`.
 *
 * Only failures of the remote side (fetch, validation, host check) are counted
 * as unavailability. Errors thrown by `updater` or `deleter` are local failures:
 * they are propagated to the caller and never increase the unavailability
 * counter, so a local error cannot lead to the removal of a healthy remote object.
 *
 * @param options.url - URL of the remote object as currently stored
 * @param options.bodyValidator - returns true if the fetched body is acceptable
 * @param options.updater - persists the new URL of the object
 * @param options.deleter - removes the local entry
 * @returns the data returned by `updater`/`deleter` with the action taken, or
 *   `null` if nothing was changed
 * @throws whatever `updater`, `deleter` or the unavailability bookkeeping throws
 */
async function updateObjectIfNeeded <T> (options: {
  url: string
  bodyValidator: (body: any) => boolean
  updater: (url: string, newUrl: string) => Promise<T>
  deleter: (url: string) => Promise<T> }
): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
  const { url, bodyValidator, updater, deleter } = options

  const on404OrTombstone = async () => {
    logger.info('Removing remote AP object %s.', url)
    const data = await deleter(url)

    return { status: 'deleted' as const, data }
  }

  let body: any

  // Remote side only: anything thrown here means the remote object is missing or unusable
  try {
    const response = await doJSONRequest<any>(url, { activityPub: true })
    body = response.body

    if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)

    // If not same id, check same host (a Tombstone is removed whatever its id is)
    if (body.type !== 'Tombstone' && body.id !== url && checkUrlsSameHost(body.id, url) !== true) {
      throw new Error(`New url ${body.id} has not the same host than old url ${url}`)
    }
  } catch (err) {
    // Does not exist anymore, remove entry
    if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) {
      return on404OrTombstone()
    }

    logger.debug('Remote AP object %s is unavailable.', url)

    const unavailability = await Redis.Instance.addAPUnavailability(url)
    if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
      logger.info('Removing unavailable AP resource %s.', url)
      return on404OrTombstone()
    }

    return null
  }

  // Local side: errors from deleter/updater are not remote unavailability, let them bubble up
  if (body.type === 'Tombstone') {
    return on404OrTombstone()
  }

  const newUrl = body.id
  if (newUrl === url) return null

  logger.info('Updating remote AP object %s.', url)
  const data = await updater(url, newUrl)

  return { status: 'updated', data }
}
134
/**
 * Builds the validator/updater/deleter trio used to clean remote rates.
 *
 * - `bodyValidator` accepts a body that is a valid Like or Dislike activity.
 * - `updater` stores the new URL on the rate loaded by its current URL.
 * - `deleter` destroys the rate loaded by its URL.
 *
 * Both `updater` and `deleter` resolve with the `videoId` and `type` read from
 * the rate before it is saved/destroyed.
 */
function rateOptionsFactory () {
  const loadRate = (url: string) => AccountVideoRateModel.loadByUrl(url, undefined)

  return {
    bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body),

    updater: async (url: string, newUrl: string) => {
      const accountRate = await loadRate(url)
      accountRate.url = newUrl

      // Read what the caller needs before persisting the change
      const { videoId, type } = accountRate
      await accountRate.save()

      return { videoId, type }
    },

    deleter: async (url) => {
      const accountRate = await loadRate(url)

      // Read what the caller needs before the row is destroyed
      const { videoId, type } = accountRate
      await accountRate.destroy()

      return { videoId, type }
    }
  }
}
163
/**
 * Builds the validator/updater/deleter trio used to clean remote video shares.
 *
 * - `bodyValidator` accepts a body that is a valid Announce activity.
 * - `updater` stores the new URL on the share loaded by its current URL.
 * - `deleter` destroys the share loaded by its URL.
 *
 * `updater` and `deleter` have no data to hand back and resolve with `undefined`.
 */
function shareOptionsFactory () {
  const loadShare = (url: string) => VideoShareModel.loadByUrl(url, undefined)

  return {
    bodyValidator: (body: any) => isAnnounceActivityValid(body),

    updater: async (url: string, newUrl: string) => {
      const videoShare = await loadShare(url)

      videoShare.url = newUrl
      await videoShare.save()

      return undefined
    },

    deleter: async (url) => {
      const videoShare = await loadShare(url)
      await videoShare.destroy()

      return undefined
    }
  }
}
186
/**
 * Builds the validator/updater/deleter trio used to clean remote video comments.
 *
 * - `bodyValidator` delegates to `sanitizeAndCheckVideoCommentObject`.
 * - `updater` stores the new URL on the comment loaded by its current URL.
 * - `deleter` destroys the comment loaded by its URL.
 *
 * `updater` and `deleter` have no data to hand back and resolve with `undefined`.
 */
function commentOptionsFactory () {
  const loadComment = (url: string) => VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)

  return {
    bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body),

    updater: async (url: string, newUrl: string) => {
      const videoComment = await loadComment(url)

      videoComment.url = newUrl
      await videoComment.save()

      return undefined
    },

    deleter: async (url) => {
      const videoComment = await loadComment(url)
      await videoComment.destroy()

      return undefined
    }
  }
}