diff options
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-cleaner.ts | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index d5e4508fe..1540bf23a 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -8,7 +8,8 @@ import { | |||
8 | } from '@server/helpers/custom-validators/activitypub/activity' | 8 | } from '@server/helpers/custom-validators/activitypub/activity' |
9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | 9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' |
10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' | 10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' |
11 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | 11 | import { AP_CLEANER } from '@server/initializers/constants' |
12 | import { Redis } from '@server/lib/redis' | ||
12 | import { VideoModel } from '@server/models/video/video' | 13 | import { VideoModel } from '@server/models/video/video' |
13 | import { VideoCommentModel } from '@server/models/video/video-comment' | 14 | import { VideoCommentModel } from '@server/models/video/video-comment' |
14 | import { VideoShareModel } from '@server/models/video/video-share' | 15 | import { VideoShareModel } from '@server/models/video/video-share' |
@@ -27,7 +28,7 @@ async function processActivityPubCleaner (_job: Job) { | |||
27 | 28 | ||
28 | await map(rateUrls, async rateUrl => { | 29 | await map(rateUrls, async rateUrl => { |
29 | try { | 30 | try { |
30 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | 31 | const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) |
31 | 32 | ||
32 | if (result?.status === 'deleted') { | 33 | if (result?.status === 'deleted') { |
33 | const { videoId, type } = result.data | 34 | const { videoId, type } = result.data |
@@ -37,7 +38,7 @@ async function processActivityPubCleaner (_job: Job) { | |||
37 | } catch (err) { | 38 | } catch (err) { |
38 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) | 39 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) |
39 | } | 40 | } |
40 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 41 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
41 | } | 42 | } |
42 | 43 | ||
43 | { | 44 | { |
@@ -46,11 +47,11 @@ async function processActivityPubCleaner (_job: Job) { | |||
46 | 47 | ||
47 | await map(shareUrls, async shareUrl => { | 48 | await map(shareUrls, async shareUrl => { |
48 | try { | 49 | try { |
49 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | 50 | await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) |
50 | } catch (err) { | 51 | } catch (err) { |
51 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) | 52 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) |
52 | } | 53 | } |
53 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 54 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
54 | } | 55 | } |
55 | 56 | ||
56 | { | 57 | { |
@@ -59,11 +60,11 @@ async function processActivityPubCleaner (_job: Job) { | |||
59 | 60 | ||
60 | await map(commentUrls, async commentUrl => { | 61 | await map(commentUrls, async commentUrl => { |
61 | try { | 62 | try { |
62 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | 63 | await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) |
63 | } catch (err) { | 64 | } catch (err) { |
64 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) | 65 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) |
65 | } | 66 | } |
66 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 67 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
67 | } | 68 | } |
68 | } | 69 | } |
69 | 70 | ||
@@ -75,12 +76,14 @@ export { | |||
75 | 76 | ||
76 | // --------------------------------------------------------------------------- | 77 | // --------------------------------------------------------------------------- |
77 | 78 | ||
78 | async function updateObjectIfNeeded <T> ( | 79 | async function updateObjectIfNeeded <T> (options: { |
79 | url: string, | 80 | url: string |
80 | bodyValidator: (body: any) => boolean, | 81 | bodyValidator: (body: any) => boolean |
81 | updater: (url: string, newUrl: string) => Promise<T>, | 82 | updater: (url: string, newUrl: string) => Promise<T> |
82 | deleter: (url: string) => Promise<T> | 83 | deleter: (url: string) => Promise<T> } |
83 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | 84 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { |
85 | const { url, bodyValidator, updater, deleter } = options | ||
86 | |||
84 | const on404OrTombstone = async () => { | 87 | const on404OrTombstone = async () => { |
85 | logger.info('Removing remote AP object %s.', url) | 88 | logger.info('Removing remote AP object %s.', url) |
86 | const data = await deleter(url) | 89 | const data = await deleter(url) |
@@ -117,7 +120,15 @@ async function updateObjectIfNeeded <T> ( | |||
117 | return on404OrTombstone() | 120 | return on404OrTombstone() |
118 | } | 121 | } |
119 | 122 | ||
120 | throw err | 123 | logger.debug('Remote AP object %s is unavailable.', url) |
124 | |||
125 | const unavailability = await Redis.Instance.addAPUnavailability(url) | ||
126 | if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { | ||
127 | logger.info('Removing unavailable AP resource %s.', url) | ||
128 | return on404OrTombstone() | ||
129 | } | ||
130 | |||
131 | return null | ||
121 | } | 132 | } |
122 | } | 133 | } |
123 | 134 | ||