diff options
author | Chocobozzz <me@florianbigard.com> | 2021-02-26 16:26:27 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-03-03 10:08:08 +0100 |
commit | 74d249bc1346c7cfaac7ee49bebbebcf2a01f82a (patch) | |
tree | d47bd163ae57ed8f15b445296634cc04f4f67b6f /server/lib/job-queue | |
parent | 095e2258043fcff8a79ab082d11edfbd8f13a8e2 (diff) | |
download | PeerTube-74d249bc1346c7cfaac7ee49bebbebcf2a01f82a.tar.gz PeerTube-74d249bc1346c7cfaac7ee49bebbebcf2a01f82a.tar.zst PeerTube-74d249bc1346c7cfaac7ee49bebbebcf2a01f82a.zip |
Add ability to cleanup remote AP interactions
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-cleaner.ts | 194 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/actor-keys.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 10 |
3 files changed, 205 insertions, 1 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts new file mode 100644 index 000000000..b58bbc983 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -0,0 +1,194 @@ | |||
1 | import * as Bluebird from 'bluebird' | ||
2 | import * as Bull from 'bull' | ||
3 | import { checkUrlsSameHost } from '@server/helpers/activitypub' | ||
4 | import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' | ||
5 | import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' | ||
6 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | ||
7 | import { doRequest } from '@server/helpers/requests' | ||
8 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | ||
9 | import { VideoModel } from '@server/models/video/video' | ||
10 | import { VideoCommentModel } from '@server/models/video/video-comment' | ||
11 | import { VideoShareModel } from '@server/models/video/video-share' | ||
12 | import { HttpStatusCode } from '@shared/core-utils' | ||
13 | import { logger } from '../../../helpers/logger' | ||
14 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | ||
15 | |||
16 | // Job to clean remote interactions off local videos | ||
17 | |||
18 | async function processActivityPubCleaner (_job: Bull.Job) { | ||
19 | logger.info('Processing ActivityPub cleaner.') | ||
20 | |||
21 | { | ||
22 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() | ||
23 | const { bodyValidator, deleter, updater } = rateOptionsFactory() | ||
24 | |||
25 | await Bluebird.map(rateUrls, async rateUrl => { | ||
26 | try { | ||
27 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | ||
28 | |||
29 | if (result?.status === 'deleted') { | ||
30 | const { videoId, type } = result.data | ||
31 | |||
32 | await VideoModel.updateRatesOf(videoId, type, undefined) | ||
33 | } | ||
34 | } catch (err) { | ||
35 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) | ||
36 | } | ||
37 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
38 | } | ||
39 | |||
40 | { | ||
41 | const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() | ||
42 | const { bodyValidator, deleter, updater } = shareOptionsFactory() | ||
43 | |||
44 | await Bluebird.map(shareUrls, async shareUrl => { | ||
45 | try { | ||
46 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | ||
47 | } catch (err) { | ||
48 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) | ||
49 | } | ||
50 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
51 | } | ||
52 | |||
53 | { | ||
54 | const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() | ||
55 | const { bodyValidator, deleter, updater } = commentOptionsFactory() | ||
56 | |||
57 | await Bluebird.map(commentUrls, async commentUrl => { | ||
58 | try { | ||
59 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | ||
60 | } catch (err) { | ||
61 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) | ||
62 | } | ||
63 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
64 | } | ||
65 | } | ||
66 | |||
67 | // --------------------------------------------------------------------------- | ||
68 | |||
69 | export { | ||
70 | processActivityPubCleaner | ||
71 | } | ||
72 | |||
73 | // --------------------------------------------------------------------------- | ||
74 | |||
75 | async function updateObjectIfNeeded <T> ( | ||
76 | url: string, | ||
77 | bodyValidator: (body: any) => boolean, | ||
78 | updater: (url: string, newUrl: string) => Promise<T>, | ||
79 | deleter: (url: string) => Promise<T> | ||
80 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | ||
81 | // Fetch url | ||
82 | const { response, body } = await doRequest<any>({ | ||
83 | uri: url, | ||
84 | json: true, | ||
85 | activityPub: true | ||
86 | }) | ||
87 | |||
88 | // Does not exist anymore, remove entry | ||
89 | if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { | ||
90 | logger.info('Removing remote AP object %s.', url) | ||
91 | const data = await deleter(url) | ||
92 | |||
93 | return { status: 'deleted', data } | ||
94 | } | ||
95 | |||
96 | // If not same id, check same host and update | ||
97 | if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) | ||
98 | |||
99 | if (body.type === 'Tombstone') { | ||
100 | logger.info('Removing remote AP object %s.', url) | ||
101 | const data = await deleter(url) | ||
102 | |||
103 | return { status: 'deleted', data } | ||
104 | } | ||
105 | |||
106 | const newUrl = body.id | ||
107 | if (newUrl !== url) { | ||
108 | if (checkUrlsSameHost(newUrl, url) !== true) { | ||
109 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) | ||
110 | } | ||
111 | |||
112 | logger.info('Updating remote AP object %s.', url) | ||
113 | const data = await updater(url, newUrl) | ||
114 | |||
115 | return { status: 'updated', data } | ||
116 | } | ||
117 | |||
118 | return null | ||
119 | } | ||
120 | |||
121 | function rateOptionsFactory () { | ||
122 | return { | ||
123 | bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), | ||
124 | |||
125 | updater: async (url: string, newUrl: string) => { | ||
126 | const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | ||
127 | rate.url = newUrl | ||
128 | |||
129 | const videoId = rate.videoId | ||
130 | const type = rate.type | ||
131 | |||
132 | await rate.save() | ||
133 | |||
134 | return { videoId, type } | ||
135 | }, | ||
136 | |||
137 | deleter: async (url) => { | ||
138 | const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | ||
139 | |||
140 | const videoId = rate.videoId | ||
141 | const type = rate.type | ||
142 | |||
143 | await rate.destroy() | ||
144 | |||
145 | return { videoId, type } | ||
146 | } | ||
147 | } | ||
148 | } | ||
149 | |||
150 | function shareOptionsFactory () { | ||
151 | return { | ||
152 | bodyValidator: (body: any) => isShareActivityValid(body), | ||
153 | |||
154 | updater: async (url: string, newUrl: string) => { | ||
155 | const share = await VideoShareModel.loadByUrl(url, undefined) | ||
156 | share.url = newUrl | ||
157 | |||
158 | await share.save() | ||
159 | |||
160 | return undefined | ||
161 | }, | ||
162 | |||
163 | deleter: async (url) => { | ||
164 | const share = await VideoShareModel.loadByUrl(url, undefined) | ||
165 | |||
166 | await share.destroy() | ||
167 | |||
168 | return undefined | ||
169 | } | ||
170 | } | ||
171 | } | ||
172 | |||
173 | function commentOptionsFactory () { | ||
174 | return { | ||
175 | bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), | ||
176 | |||
177 | updater: async (url: string, newUrl: string) => { | ||
178 | const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | ||
179 | comment.url = newUrl | ||
180 | |||
181 | await comment.save() | ||
182 | |||
183 | return undefined | ||
184 | }, | ||
185 | |||
186 | deleter: async (url) => { | ||
187 | const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | ||
188 | |||
189 | await comment.destroy() | ||
190 | |||
191 | return undefined | ||
192 | } | ||
193 | } | ||
194 | } | ||
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 8da549640..125307843 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' | |||
6 | 6 | ||
7 | async function processActorKeys (job: Bull.Job) { | 7 | async function processActorKeys (job: Bull.Job) { |
8 | const payload = job.data as ActorKeysPayload | 8 | const payload = job.data as ActorKeysPayload |
9 | logger.info('Processing email in job %d.', job.id) | 9 | logger.info('Processing actor keys in job %d.', job.id) |
10 | 10 | ||
11 | const actor = await ActorModel.load(payload.actorId) | 11 | const actor = await ActorModel.load(payload.actorId) |
12 | 12 | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index efda2e038..42e8347b1 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -21,6 +21,7 @@ import { | |||
21 | import { logger } from '../../helpers/logger' | 21 | import { logger } from '../../helpers/logger' |
22 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 22 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
23 | import { Redis } from '../redis' | 23 | import { Redis } from '../redis' |
24 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | ||
24 | import { processActivityPubFollow } from './handlers/activitypub-follow' | 25 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
25 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 26 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
26 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 27 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
@@ -38,6 +39,7 @@ type CreateJobArgument = | |||
38 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 39 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
39 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 40 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
40 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | 41 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | |
42 | { type: 'activitypub-http-cleaner', payload: {} } | | ||
41 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | | 43 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | |
42 | { type: 'video-file-import', payload: VideoFileImportPayload } | | 44 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
43 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | | 45 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | |
@@ -58,6 +60,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
58 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 60 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
59 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 61 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
60 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 62 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
63 | 'activitypub-cleaner': processActivityPubCleaner, | ||
61 | 'activitypub-follow': processActivityPubFollow, | 64 | 'activitypub-follow': processActivityPubFollow, |
62 | 'video-file-import': processVideoFileImport, | 65 | 'video-file-import': processVideoFileImport, |
63 | 'video-transcoding': processVideoTranscoding, | 66 | 'video-transcoding': processVideoTranscoding, |
@@ -75,6 +78,7 @@ const jobTypes: JobType[] = [ | |||
75 | 'activitypub-http-broadcast', | 78 | 'activitypub-http-broadcast', |
76 | 'activitypub-http-fetcher', | 79 | 'activitypub-http-fetcher', |
77 | 'activitypub-http-unicast', | 80 | 'activitypub-http-unicast', |
81 | 'activitypub-cleaner', | ||
78 | 'email', | 82 | 'email', |
79 | 'video-transcoding', | 83 | 'video-transcoding', |
80 | 'video-file-import', | 84 | 'video-file-import', |
@@ -233,6 +237,12 @@ class JobQueue { | |||
233 | this.queues['videos-views'].add({}, { | 237 | this.queues['videos-views'].add({}, { |
234 | repeat: REPEAT_JOBS['videos-views'] | 238 | repeat: REPEAT_JOBS['videos-views'] |
235 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 239 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
240 | |||
241 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | ||
242 | this.queues['activitypub-cleaner'].add({}, { | ||
243 | repeat: REPEAT_JOBS['activitypub-cleaner'] | ||
244 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | ||
245 | } | ||
236 | } | 246 | } |
237 | 247 | ||
238 | private filterJobTypes (jobType?: JobType) { | 248 | private filterJobTypes (jobType?: JobType) { |