diff options
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-cleaner.ts | 194 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/actor-keys.ts | 2 |
2 files changed, 195 insertions, 1 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts new file mode 100644 index 000000000..b58bbc983 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -0,0 +1,194 @@ | |||
1 | import * as Bluebird from 'bluebird' | ||
2 | import * as Bull from 'bull' | ||
3 | import { checkUrlsSameHost } from '@server/helpers/activitypub' | ||
4 | import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' | ||
5 | import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' | ||
6 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | ||
7 | import { doRequest } from '@server/helpers/requests' | ||
8 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | ||
9 | import { VideoModel } from '@server/models/video/video' | ||
10 | import { VideoCommentModel } from '@server/models/video/video-comment' | ||
11 | import { VideoShareModel } from '@server/models/video/video-share' | ||
12 | import { HttpStatusCode } from '@shared/core-utils' | ||
13 | import { logger } from '../../../helpers/logger' | ||
14 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | ||
15 | |||
16 | // Job to clean remote interactions off local videos | ||
17 | |||
18 | async function processActivityPubCleaner (_job: Bull.Job) { | ||
19 | logger.info('Processing ActivityPub cleaner.') | ||
20 | |||
21 | { | ||
22 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() | ||
23 | const { bodyValidator, deleter, updater } = rateOptionsFactory() | ||
24 | |||
25 | await Bluebird.map(rateUrls, async rateUrl => { | ||
26 | try { | ||
27 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | ||
28 | |||
29 | if (result?.status === 'deleted') { | ||
30 | const { videoId, type } = result.data | ||
31 | |||
32 | await VideoModel.updateRatesOf(videoId, type, undefined) | ||
33 | } | ||
34 | } catch (err) { | ||
35 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) | ||
36 | } | ||
37 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
38 | } | ||
39 | |||
40 | { | ||
41 | const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() | ||
42 | const { bodyValidator, deleter, updater } = shareOptionsFactory() | ||
43 | |||
44 | await Bluebird.map(shareUrls, async shareUrl => { | ||
45 | try { | ||
46 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | ||
47 | } catch (err) { | ||
48 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) | ||
49 | } | ||
50 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
51 | } | ||
52 | |||
53 | { | ||
54 | const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() | ||
55 | const { bodyValidator, deleter, updater } = commentOptionsFactory() | ||
56 | |||
57 | await Bluebird.map(commentUrls, async commentUrl => { | ||
58 | try { | ||
59 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | ||
60 | } catch (err) { | ||
61 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) | ||
62 | } | ||
63 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
64 | } | ||
65 | } | ||
66 | |||
67 | // --------------------------------------------------------------------------- | ||
68 | |||
69 | export { | ||
70 | processActivityPubCleaner | ||
71 | } | ||
72 | |||
73 | // --------------------------------------------------------------------------- | ||
74 | |||
75 | async function updateObjectIfNeeded <T> ( | ||
76 | url: string, | ||
77 | bodyValidator: (body: any) => boolean, | ||
78 | updater: (url: string, newUrl: string) => Promise<T>, | ||
79 | deleter: (url: string) => Promise<T> | ||
80 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | ||
81 | // Fetch url | ||
82 | const { response, body } = await doRequest<any>({ | ||
83 | uri: url, | ||
84 | json: true, | ||
85 | activityPub: true | ||
86 | }) | ||
87 | |||
88 | // Does not exist anymore, remove entry | ||
89 | if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { | ||
90 | logger.info('Removing remote AP object %s.', url) | ||
91 | const data = await deleter(url) | ||
92 | |||
93 | return { status: 'deleted', data } | ||
94 | } | ||
95 | |||
96 | // If not same id, check same host and update | ||
97 | if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) | ||
98 | |||
99 | if (body.type === 'Tombstone') { | ||
100 | logger.info('Removing remote AP object %s.', url) | ||
101 | const data = await deleter(url) | ||
102 | |||
103 | return { status: 'deleted', data } | ||
104 | } | ||
105 | |||
106 | const newUrl = body.id | ||
107 | if (newUrl !== url) { | ||
108 | if (checkUrlsSameHost(newUrl, url) !== true) { | ||
109 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) | ||
110 | } | ||
111 | |||
112 | logger.info('Updating remote AP object %s.', url) | ||
113 | const data = await updater(url, newUrl) | ||
114 | |||
115 | return { status: 'updated', data } | ||
116 | } | ||
117 | |||
118 | return null | ||
119 | } | ||
120 | |||
121 | function rateOptionsFactory () { | ||
122 | return { | ||
123 | bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), | ||
124 | |||
125 | updater: async (url: string, newUrl: string) => { | ||
126 | const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | ||
127 | rate.url = newUrl | ||
128 | |||
129 | const videoId = rate.videoId | ||
130 | const type = rate.type | ||
131 | |||
132 | await rate.save() | ||
133 | |||
134 | return { videoId, type } | ||
135 | }, | ||
136 | |||
137 | deleter: async (url) => { | ||
138 | const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | ||
139 | |||
140 | const videoId = rate.videoId | ||
141 | const type = rate.type | ||
142 | |||
143 | await rate.destroy() | ||
144 | |||
145 | return { videoId, type } | ||
146 | } | ||
147 | } | ||
148 | } | ||
149 | |||
150 | function shareOptionsFactory () { | ||
151 | return { | ||
152 | bodyValidator: (body: any) => isShareActivityValid(body), | ||
153 | |||
154 | updater: async (url: string, newUrl: string) => { | ||
155 | const share = await VideoShareModel.loadByUrl(url, undefined) | ||
156 | share.url = newUrl | ||
157 | |||
158 | await share.save() | ||
159 | |||
160 | return undefined | ||
161 | }, | ||
162 | |||
163 | deleter: async (url) => { | ||
164 | const share = await VideoShareModel.loadByUrl(url, undefined) | ||
165 | |||
166 | await share.destroy() | ||
167 | |||
168 | return undefined | ||
169 | } | ||
170 | } | ||
171 | } | ||
172 | |||
173 | function commentOptionsFactory () { | ||
174 | return { | ||
175 | bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), | ||
176 | |||
177 | updater: async (url: string, newUrl: string) => { | ||
178 | const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | ||
179 | comment.url = newUrl | ||
180 | |||
181 | await comment.save() | ||
182 | |||
183 | return undefined | ||
184 | }, | ||
185 | |||
186 | deleter: async (url) => { | ||
187 | const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | ||
188 | |||
189 | await comment.destroy() | ||
190 | |||
191 | return undefined | ||
192 | } | ||
193 | } | ||
194 | } | ||
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 8da549640..125307843 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' | |||
6 | 6 | ||
7 | async function processActorKeys (job: Bull.Job) { | 7 | async function processActorKeys (job: Bull.Job) { |
8 | const payload = job.data as ActorKeysPayload | 8 | const payload = job.data as ActorKeysPayload |
9 | logger.info('Processing email in job %d.', job.id) | 9 | logger.info('Processing actor keys in job %d.', job.id) |
10 | 10 | ||
11 | const actor = await ActorModel.load(payload.actorId) | 11 | const actor = await ActorModel.load(payload.actorId) |
12 | 12 | ||