aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/initializers/constants.ts15
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts37
-rw-r--r--server/lib/redis.ts25
-rw-r--r--server/tests/api/activitypub/cleaner.ts53
4 files changed, 110 insertions, 20 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 7816561fd..c899812a6 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -19,7 +19,7 @@ import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type'
19import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model' 19import { VideoPlaylistPrivacy } from '../../shared/models/videos/playlist/video-playlist-privacy.model'
20import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model' 20import { VideoPlaylistType } from '../../shared/models/videos/playlist/video-playlist-type.model'
21// Do not use barrels, remain constants as independent as possible 21// Do not use barrels, remain constants as independent as possible
22import { isTestInstance, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' 22import { isTestInstance, parseDurationToMs, sanitizeHost, sanitizeUrl } from '../helpers/core-utils'
23import { CONFIG, registerConfigChangedHandler } from './config' 23import { CONFIG, registerConfigChangedHandler } from './config'
24 24
25// --------------------------------------------------------------------------- 25// ---------------------------------------------------------------------------
@@ -200,8 +200,14 @@ const JOB_PRIORITY = {
200} 200}
201 201
202const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job 202const BROADCAST_CONCURRENCY = 30 // How many requests in parallel we do in activitypub-http-broadcast job
203const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job
204const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) 203const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...)
204
205const AP_CLEANER = {
206 CONCURRENCY: 10, // How many requests in parallel we do in activitypub-cleaner job
207 UNAVAILABLE_TRESHOLD: 3, // How many attemps we do before removing an unavailable remote resource
208 PERIOD: parseDurationToMs('1 week') // /!\ Has to be sync with REPEAT_JOBS
209}
210
205const REQUEST_TIMEOUTS = { 211const REQUEST_TIMEOUTS = {
206 DEFAULT: 7000, // 7 seconds 212 DEFAULT: 7000, // 7 seconds
207 FILE: 30000, // 30 seconds 213 FILE: 30000, // 30 seconds
@@ -796,8 +802,11 @@ if (isTestInstance() === true) {
796 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000 802 SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES = 5000
797 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000 803 SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS = 5000
798 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000 804 SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION = 2000
805
799 REPEAT_JOBS['videos-views-stats'] = { every: 5000 } 806 REPEAT_JOBS['videos-views-stats'] = { every: 5000 }
807
800 REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } 808 REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
809 AP_CLEANER.PERIOD = 5000
801 810
802 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 811 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
803 812
@@ -858,7 +867,7 @@ export {
858 REDUNDANCY, 867 REDUNDANCY,
859 JOB_CONCURRENCY, 868 JOB_CONCURRENCY,
860 JOB_ATTEMPTS, 869 JOB_ATTEMPTS,
861 AP_CLEANER_CONCURRENCY, 870 AP_CLEANER,
862 LAST_MIGRATION_VERSION, 871 LAST_MIGRATION_VERSION,
863 OAUTH_LIFETIME, 872 OAUTH_LIFETIME,
864 CUSTOM_HTML_TAG_COMMENTS, 873 CUSTOM_HTML_TAG_COMMENTS,
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index d5e4508fe..1540bf23a 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -8,7 +8,8 @@ import {
8} from '@server/helpers/custom-validators/activitypub/activity' 8} from '@server/helpers/custom-validators/activitypub/activity'
9import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' 9import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
10import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' 10import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
11import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' 11import { AP_CLEANER } from '@server/initializers/constants'
12import { Redis } from '@server/lib/redis'
12import { VideoModel } from '@server/models/video/video' 13import { VideoModel } from '@server/models/video/video'
13import { VideoCommentModel } from '@server/models/video/video-comment' 14import { VideoCommentModel } from '@server/models/video/video-comment'
14import { VideoShareModel } from '@server/models/video/video-share' 15import { VideoShareModel } from '@server/models/video/video-share'
@@ -27,7 +28,7 @@ async function processActivityPubCleaner (_job: Job) {
27 28
28 await map(rateUrls, async rateUrl => { 29 await map(rateUrls, async rateUrl => {
29 try { 30 try {
30 const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) 31 const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
31 32
32 if (result?.status === 'deleted') { 33 if (result?.status === 'deleted') {
33 const { videoId, type } = result.data 34 const { videoId, type } = result.data
@@ -37,7 +38,7 @@ async function processActivityPubCleaner (_job: Job) {
37 } catch (err) { 38 } catch (err) {
38 logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) 39 logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
39 } 40 }
40 }, { concurrency: AP_CLEANER_CONCURRENCY }) 41 }, { concurrency: AP_CLEANER.CONCURRENCY })
41 } 42 }
42 43
43 { 44 {
@@ -46,11 +47,11 @@ async function processActivityPubCleaner (_job: Job) {
46 47
47 await map(shareUrls, async shareUrl => { 48 await map(shareUrls, async shareUrl => {
48 try { 49 try {
49 await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) 50 await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
50 } catch (err) { 51 } catch (err) {
51 logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) 52 logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
52 } 53 }
53 }, { concurrency: AP_CLEANER_CONCURRENCY }) 54 }, { concurrency: AP_CLEANER.CONCURRENCY })
54 } 55 }
55 56
56 { 57 {
@@ -59,11 +60,11 @@ async function processActivityPubCleaner (_job: Job) {
59 60
60 await map(commentUrls, async commentUrl => { 61 await map(commentUrls, async commentUrl => {
61 try { 62 try {
62 await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) 63 await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
63 } catch (err) { 64 } catch (err) {
64 logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) 65 logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
65 } 66 }
66 }, { concurrency: AP_CLEANER_CONCURRENCY }) 67 }, { concurrency: AP_CLEANER.CONCURRENCY })
67 } 68 }
68} 69}
69 70
@@ -75,12 +76,14 @@ export {
75 76
76// --------------------------------------------------------------------------- 77// ---------------------------------------------------------------------------
77 78
78async function updateObjectIfNeeded <T> ( 79async function updateObjectIfNeeded <T> (options: {
79 url: string, 80 url: string
80 bodyValidator: (body: any) => boolean, 81 bodyValidator: (body: any) => boolean
81 updater: (url: string, newUrl: string) => Promise<T>, 82 updater: (url: string, newUrl: string) => Promise<T>
82 deleter: (url: string) => Promise<T> 83 deleter: (url: string) => Promise<T> }
83): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { 84): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
85 const { url, bodyValidator, updater, deleter } = options
86
84 const on404OrTombstone = async () => { 87 const on404OrTombstone = async () => {
85 logger.info('Removing remote AP object %s.', url) 88 logger.info('Removing remote AP object %s.', url)
86 const data = await deleter(url) 89 const data = await deleter(url)
@@ -117,7 +120,15 @@ async function updateObjectIfNeeded <T> (
117 return on404OrTombstone() 120 return on404OrTombstone()
118 } 121 }
119 122
120 throw err 123 logger.debug('Remote AP object %s is unavailable.', url)
124
125 const unavailability = await Redis.Instance.addAPUnavailability(url)
126 if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
127 logger.info('Removing unavailable AP resource %s.', url)
128 return on404OrTombstone()
129 }
130
131 return null
121 } 132 }
122} 133}
123 134
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 9ef9d7702..4dcbcddb5 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -1,9 +1,11 @@
1import { createClient } from 'redis' 1import { createClient } from 'redis'
2import { exists } from '@server/helpers/custom-validators/misc' 2import { exists } from '@server/helpers/custom-validators/misc'
3import { sha256 } from '@shared/extra-utils'
3import { logger } from '../helpers/logger' 4import { logger } from '../helpers/logger'
4import { generateRandomString } from '../helpers/utils' 5import { generateRandomString } from '../helpers/utils'
5import { CONFIG } from '../initializers/config' 6import { CONFIG } from '../initializers/config'
6import { 7import {
8 AP_CLEANER,
7 CONTACT_FORM_LIFETIME, 9 CONTACT_FORM_LIFETIME,
8 RESUMABLE_UPLOAD_SESSION_LIFETIME, 10 RESUMABLE_UPLOAD_SESSION_LIFETIME,
9 TRACKER_RATE_LIMITS, 11 TRACKER_RATE_LIMITS,
@@ -260,6 +262,17 @@ class Redis {
260 return this.deleteKey('resumable-upload-' + uploadId) 262 return this.deleteKey('resumable-upload-' + uploadId)
261 } 263 }
262 264
265 /* ************ AP ressource unavailability ************ */
266
267 async addAPUnavailability (url: string) {
268 const key = this.generateAPUnavailabilityKey(url)
269
270 const value = await this.increment(key)
271 await this.setExpiration(key, AP_CLEANER.PERIOD * 2)
272
273 return value
274 }
275
263 /* ************ Keys generation ************ */ 276 /* ************ Keys generation ************ */
264 277
265 private generateLocalVideoViewsKeys (videoId?: Number) { 278 private generateLocalVideoViewsKeys (videoId?: Number) {
@@ -298,6 +311,10 @@ class Redis {
298 return 'contact-form-' + ip 311 return 'contact-form-' + ip
299 } 312 }
300 313
314 private generateAPUnavailabilityKey (url: string) {
315 return 'ap-unavailability-' + sha256(url)
316 }
317
301 /* ************ Redis helpers ************ */ 318 /* ************ Redis helpers ************ */
302 319
303 private getValue (key: string) { 320 private getValue (key: string) {
@@ -330,10 +347,6 @@ class Redis {
330 return this.client.del(this.prefix + key) 347 return this.client.del(this.prefix + key)
331 } 348 }
332 349
333 private getObject (key: string) {
334 return this.client.hGetAll(this.prefix + key)
335 }
336
337 private increment (key: string) { 350 private increment (key: string) {
338 return this.client.incr(this.prefix + key) 351 return this.client.incr(this.prefix + key)
339 } 352 }
@@ -342,6 +355,10 @@ class Redis {
342 return this.client.exists(this.prefix + key) 355 return this.client.exists(this.prefix + key)
343 } 356 }
344 357
358 private setExpiration (key: string, ms: number) {
359 return this.client.expire(this.prefix + key, ms / 1000)
360 }
361
345 static get Instance () { 362 static get Instance () {
346 return this.instance || (this.instance = new this()) 363 return this.instance || (this.instance = new this())
347 } 364 }
diff --git a/server/tests/api/activitypub/cleaner.ts b/server/tests/api/activitypub/cleaner.ts
index 7a443b553..dc885023a 100644
--- a/server/tests/api/activitypub/cleaner.ts
+++ b/server/tests/api/activitypub/cleaner.ts
@@ -270,6 +270,59 @@ describe('Test AP cleaner', function () {
270 await checkRemote('kyle') 270 await checkRemote('kyle')
271 }) 271 })
272 272
273 it('Should remove unavailable remote resources', async function () {
274 this.timeout(240000)
275
276 async function expectNotDeleted () {
277 {
278 const video = await servers[0].videos.get({ id: uuid })
279
280 expect(video.likes).to.equal(3)
281 expect(video.dislikes).to.equal(0)
282 }
283
284 {
285 const { total } = await servers[0].comments.listThreads({ videoId: uuid })
286 expect(total).to.equal(3)
287 }
288 }
289
290 async function expectDeleted () {
291 {
292 const video = await servers[0].videos.get({ id: uuid })
293
294 expect(video.likes).to.equal(3)
295 expect(video.dislikes).to.equal(0)
296 }
297
298 {
299 const { total } = await servers[0].comments.listThreads({ videoId: videoUUID1 })
300 expect(total).to.equal(2)
301 }
302 }
303
304 const uuid = (await servers[0].videos.quickUpload({ name: 'server 1 video 2' })).uuid
305
306 await waitJobs(servers)
307
308 for (const server of servers) {
309 await server.videos.rate({ id: uuid, rating: 'like' })
310 await server.comments.createThread({ videoId: uuid, text: 'comment' })
311 }
312
313 await waitJobs(servers)
314
315 await expectNotDeleted()
316
317 await servers[1].kill()
318
319 await wait(5000)
320 await expectNotDeleted()
321
322 await wait(15000)
323 await expectDeleted()
324 })
325
273 after(async function () { 326 after(async function () {
274 await cleanupTests(servers) 327 await cleanupTests(servers)
275 }) 328 })