diff options
-rw-r--r-- | server/initializers/constants.ts | 15 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-cleaner.ts | 37 | ||||
-rw-r--r-- | server/lib/redis.ts | 25 | ||||
-rw-r--r-- | server/tests/api/activitypub/cleaner.ts | 53 |
4 files changed, 110 insertions, 20 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 7816561fd..c899812a6 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -19,7 +19,7 @@ import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type' | |||
19 | import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model' | 19 | import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model' |
20 | import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model' | 20 | import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model' |
21 | // Do not use barrels, remain constants as independent as possible | 21 | // Do not use barrels, remain constants as independent as possible |
22 | import { isTestInstance, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' | 22 | import { isTestInstance, parseDurationToMs, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' |
23 | import { CONFIG, registerConfigChangedHandler } from './config' | 23 | import { CONFIG, registerConfigChangedHandler } from './config' |
24 | 24 | ||
25 | // --------------------------------------------------------------------------- | 25 | // --------------------------------------------------------------------------- |
@@ -200,8 +200,14 @@ const JOB_PRIORITY = { | |||
200 | } | 200 | } |
201 | 201 | ||
202 | const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job | 202 | const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job |
203 | const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job | ||
204 | const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) | 203 | const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) |
204 | |||
205 | const AP_CLEANER = { | ||
206 | CONCURRENCY: 10, // How many requests in parallel we do in activitypub-cleaner job | ||
207 | UNAVAILABLE_TRESHOLD: 3, // How many attemps we do before removing an unavailable remote resource | ||
208 | PERIOD: parseDurationToMs('1 week') // /!\ Has to be sync with REPEAT_JOBS | ||
209 | } | ||
210 | |||
205 | const REQUEST_TIMEOUTS = { | 211 | const REQUEST_TIMEOUTS = { |
206 | DEFAULT: 7000, // 7 seconds | 212 | DEFAULT: 7000, // 7 seconds |
207 | FILE: 30000, // 30 seconds | 213 | FILE: 30000, // 30 seconds |
@@ -796,8 +802,11 @@ if (isTestInstance() === true) { | |||
796 | SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 | 802 | SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 |
797 | SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 | 803 | SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 |
798 | SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 | 804 | SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 |
805 | |||
799 | REPEAT_JOBS['videos-views-stats'] = { every: 5000 } | 806 | REPEAT_JOBS['videos-views-stats'] = { every: 5000 } |
807 | |||
800 | REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } | 808 | REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } |
809 | AP_CLEANER.PERIOD = 5000 | ||
801 | 810 | ||
802 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 | 811 | REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 |
803 | 812 | ||
@@ -858,7 +867,7 @@ export { | |||
858 | REDUNDANCY, | 867 | REDUNDANCY, |
859 | JOB_CONCURRENCY, | 868 | JOB_CONCURRENCY, |
860 | JOB_ATTEMPTS, | 869 | JOB_ATTEMPTS, |
861 | AP_CLEANER_CONCURRENCY, | 870 | AP_CLEANER, |
862 | LAST_MIGRATION_VERSION, | 871 | LAST_MIGRATION_VERSION, |
863 | OAUTH_LIFETIME, | 872 | OAUTH_LIFETIME, |
864 | CUSTOM_HTML_TAG_COMMENTS, | 873 | CUSTOM_HTML_TAG_COMMENTS, |
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index d5e4508fe..1540bf23a 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -8,7 +8,8 @@ import { | |||
8 | } from '@server/helpers/custom-validators/activitypub/activity' | 8 | } from '@server/helpers/custom-validators/activitypub/activity' |
9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | 9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' |
10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' | 10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' |
11 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | 11 | import { AP_CLEANER } from '@server/initializers/constants' |
12 | import { Redis } from '@server/lib/redis' | ||
12 | import { VideoModel } from '@server/models/video/video' | 13 | import { VideoModel } from '@server/models/video/video' |
13 | import { VideoCommentModel } from '@server/models/video/video-comment' | 14 | import { VideoCommentModel } from '@server/models/video/video-comment' |
14 | import { VideoShareModel } from '@server/models/video/video-share' | 15 | import { VideoShareModel } from '@server/models/video/video-share' |
@@ -27,7 +28,7 @@ async function processActivityPubCleaner (_job: Job) { | |||
27 | 28 | ||
28 | await map(rateUrls, async rateUrl => { | 29 | await map(rateUrls, async rateUrl => { |
29 | try { | 30 | try { |
30 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | 31 | const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) |
31 | 32 | ||
32 | if (result?.status === 'deleted') { | 33 | if (result?.status === 'deleted') { |
33 | const { videoId, type } = result.data | 34 | const { videoId, type } = result.data |
@@ -37,7 +38,7 @@ async function processActivityPubCleaner (_job: Job) { | |||
37 | } catch (err) { | 38 | } catch (err) { |
38 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) | 39 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) |
39 | } | 40 | } |
40 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 41 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
41 | } | 42 | } |
42 | 43 | ||
43 | { | 44 | { |
@@ -46,11 +47,11 @@ async function processActivityPubCleaner (_job: Job) { | |||
46 | 47 | ||
47 | await map(shareUrls, async shareUrl => { | 48 | await map(shareUrls, async shareUrl => { |
48 | try { | 49 | try { |
49 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | 50 | await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) |
50 | } catch (err) { | 51 | } catch (err) { |
51 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) | 52 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) |
52 | } | 53 | } |
53 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 54 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
54 | } | 55 | } |
55 | 56 | ||
56 | { | 57 | { |
@@ -59,11 +60,11 @@ async function processActivityPubCleaner (_job: Job) { | |||
59 | 60 | ||
60 | await map(commentUrls, async commentUrl => { | 61 | await map(commentUrls, async commentUrl => { |
61 | try { | 62 | try { |
62 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | 63 | await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) |
63 | } catch (err) { | 64 | } catch (err) { |
64 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) | 65 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) |
65 | } | 66 | } |
66 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 67 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
67 | } | 68 | } |
68 | } | 69 | } |
69 | 70 | ||
@@ -75,12 +76,14 @@ export { | |||
75 | 76 | ||
76 | // --------------------------------------------------------------------------- | 77 | // --------------------------------------------------------------------------- |
77 | 78 | ||
78 | async function updateObjectIfNeeded <T> ( | 79 | async function updateObjectIfNeeded <T> (options: { |
79 | url: string, | 80 | url: string |
80 | bodyValidator: (body: any) => boolean, | 81 | bodyValidator: (body: any) => boolean |
81 | updater: (url: string, newUrl: string) => Promise<T>, | 82 | updater: (url: string, newUrl: string) => Promise<T> |
82 | deleter: (url: string) => Promise<T> | 83 | deleter: (url: string) => Promise<T> } |
83 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | 84 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { |
85 | const { url, bodyValidator, updater, deleter } = options | ||
86 | |||
84 | const on404OrTombstone = async () => { | 87 | const on404OrTombstone = async () => { |
85 | logger.info('Removing remote AP object %s.', url) | 88 | logger.info('Removing remote AP object %s.', url) |
86 | const data = await deleter(url) | 89 | const data = await deleter(url) |
@@ -117,7 +120,15 @@ async function updateObjectIfNeeded <T> ( | |||
117 | return on404OrTombstone() | 120 | return on404OrTombstone() |
118 | } | 121 | } |
119 | 122 | ||
120 | throw err | 123 | logger.debug('Remote AP object %s is unavailable.', url) |
124 | |||
125 | const unavailability = await Redis.Instance.addAPUnavailability(url) | ||
126 | if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { | ||
127 | logger.info('Removing unavailable AP resource %s.', url) | ||
128 | return on404OrTombstone() | ||
129 | } | ||
130 | |||
131 | return null | ||
121 | } | 132 | } |
122 | } | 133 | } |
123 | 134 | ||
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 9ef9d7702..4dcbcddb5 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -1,9 +1,11 @@ | |||
1 | import { createClient } from 'redis' | 1 | import { createClient } from 'redis' |
2 | import { exists } from '@server/helpers/custom-validators/misc' | 2 | import { exists } from '@server/helpers/custom-validators/misc' |
3 | import { sha256 } from '@shared/extra-utils' | ||
3 | import { logger } from '../helpers/logger' | 4 | import { logger } from '../helpers/logger' |
4 | import { generateRandomString } from '../helpers/utils' | 5 | import { generateRandomString } from '../helpers/utils' |
5 | import { CONFIG } from '../initializers/config' | 6 | import { CONFIG } from '../initializers/config' |
6 | import { | 7 | import { |
8 | AP_CLEANER, | ||
7 | CONTACT_FORM_LIFETIME, | 9 | CONTACT_FORM_LIFETIME, |
8 | RESUMABLE_UPLOAD_SESSION_LIFETIME, | 10 | RESUMABLE_UPLOAD_SESSION_LIFETIME, |
9 | TRACKER_RATE_LIMITS, | 11 | TRACKER_RATE_LIMITS, |
@@ -260,6 +262,17 @@ class Redis { | |||
260 | return this.deleteKey('resumable-upload-' + uploadId) | 262 | return this.deleteKey('resumable-upload-' + uploadId) |
261 | } | 263 | } |
262 | 264 | ||
265 | /* ************ AP ressource unavailability ************ */ | ||
266 | |||
267 | async addAPUnavailability (url: string) { | ||
268 | const key = this.generateAPUnavailabilityKey(url) | ||
269 | |||
270 | const value = await this.increment(key) | ||
271 | await this.setExpiration(key, AP_CLEANER.PERIOD * 2) | ||
272 | |||
273 | return value | ||
274 | } | ||
275 | |||
263 | /* ************ Keys generation ************ */ | 276 | /* ************ Keys generation ************ */ |
264 | 277 | ||
265 | private generateLocalVideoViewsKeys (videoId?: Number) { | 278 | private generateLocalVideoViewsKeys (videoId?: Number) { |
@@ -298,6 +311,10 @@ class Redis { | |||
298 | return 'contact-form-' + ip | 311 | return 'contact-form-' + ip |
299 | } | 312 | } |
300 | 313 | ||
314 | private generateAPUnavailabilityKey (url: string) { | ||
315 | return 'ap-unavailability-' + sha256(url) | ||
316 | } | ||
317 | |||
301 | /* ************ Redis helpers ************ */ | 318 | /* ************ Redis helpers ************ */ |
302 | 319 | ||
303 | private getValue (key: string) { | 320 | private getValue (key: string) { |
@@ -330,10 +347,6 @@ class Redis { | |||
330 | return this.client.del(this.prefix + key) | 347 | return this.client.del(this.prefix + key) |
331 | } | 348 | } |
332 | 349 | ||
333 | private getObject (key: string) { | ||
334 | return this.client.hGetAll(this.prefix + key) | ||
335 | } | ||
336 | |||
337 | private increment (key: string) { | 350 | private increment (key: string) { |
338 | return this.client.incr(this.prefix + key) | 351 | return this.client.incr(this.prefix + key) |
339 | } | 352 | } |
@@ -342,6 +355,10 @@ class Redis { | |||
342 | return this.client.exists(this.prefix + key) | 355 | return this.client.exists(this.prefix + key) |
343 | } | 356 | } |
344 | 357 | ||
358 | private setExpiration (key: string, ms: number) { | ||
359 | return this.client.expire(this.prefix + key, ms / 1000) | ||
360 | } | ||
361 | |||
345 | static get Instance () { | 362 | static get Instance () { |
346 | return this.instance || (this.instance = new this()) | 363 | return this.instance || (this.instance = new this()) |
347 | } | 364 | } |
diff --git a/server/tests/api/activitypub/cleaner.ts b/server/tests/api/activitypub/cleaner.ts index 7a443b553..dc885023a 100644 --- a/server/tests/api/activitypub/cleaner.ts +++ b/server/tests/api/activitypub/cleaner.ts | |||
@@ -270,6 +270,59 @@ describe('Test AP cleaner', function () { | |||
270 | await checkRemote('kyle') | 270 | await checkRemote('kyle') |
271 | }) | 271 | }) |
272 | 272 | ||
273 | it('Should remove unavailable remote resources', async function () { | ||
274 | this.timeout(240000) | ||
275 | |||
276 | async function expectNotDeleted () { | ||
277 | { | ||
278 | const video = await servers[0].videos.get({ id: uuid }) | ||
279 | |||
280 | expect(video.likes).to.equal(3) | ||
281 | expect(video.dislikes).to.equal(0) | ||
282 | } | ||
283 | |||
284 | { | ||
285 | const { total } = await servers[0].comments.listThreads({ videoId: uuid }) | ||
286 | expect(total).to.equal(3) | ||
287 | } | ||
288 | } | ||
289 | |||
290 | async function expectDeleted () { | ||
291 | { | ||
292 | const video = await servers[0].videos.get({ id: uuid }) | ||
293 | |||
294 | expect(video.likes).to.equal(3) | ||
295 | expect(video.dislikes).to.equal(0) | ||
296 | } | ||
297 | |||
298 | { | ||
299 | const { total } = await servers[0].comments.listThreads({ videoId: videoUUID1 }) | ||
300 | expect(total).to.equal(2) | ||
301 | } | ||
302 | } | ||
303 | |||
304 | const uuid = (await servers[0].videos.quickUpload({ name: 'server 1 video 2' })).uuid | ||
305 | |||
306 | await waitJobs(servers) | ||
307 | |||
308 | for (const server of servers) { | ||
309 | await server.videos.rate({ id: uuid, rating: 'like' }) | ||
310 | await server.comments.createThread({ videoId: uuid, text: 'comment' }) | ||
311 | } | ||
312 | |||
313 | await waitJobs(servers) | ||
314 | |||
315 | await expectNotDeleted() | ||
316 | |||
317 | await servers[1].kill() | ||
318 | |||
319 | await wait(5000) | ||
320 | await expectNotDeleted() | ||
321 | |||
322 | await wait(15000) | ||
323 | await expectDeleted() | ||
324 | }) | ||
325 | |||
273 | after(async function () { | 326 | after(async function () { |
274 | await cleanupTests(servers) | 327 | await cleanupTests(servers) |
275 | }) | 328 | }) |