aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts37
1 files changed, 24 insertions, 13 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index d5e4508fe..1540bf23a 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -8,7 +8,8 @@ import {
8} from '@server/helpers/custom-validators/activitypub/activity' 8} from '@server/helpers/custom-validators/activitypub/activity'
9import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' 9import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
10import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' 10import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
11import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' 11import { AP_CLEANER } from '@server/initializers/constants'
12import { Redis } from '@server/lib/redis'
12import { VideoModel } from '@server/models/video/video' 13import { VideoModel } from '@server/models/video/video'
13import { VideoCommentModel } from '@server/models/video/video-comment' 14import { VideoCommentModel } from '@server/models/video/video-comment'
14import { VideoShareModel } from '@server/models/video/video-share' 15import { VideoShareModel } from '@server/models/video/video-share'
@@ -27,7 +28,7 @@ async function processActivityPubCleaner (_job: Job) {
27 28
28 await map(rateUrls, async rateUrl => { 29 await map(rateUrls, async rateUrl => {
29 try { 30 try {
30 const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) 31 const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
31 32
32 if (result?.status === 'deleted') { 33 if (result?.status === 'deleted') {
33 const { videoId, type } = result.data 34 const { videoId, type } = result.data
@@ -37,7 +38,7 @@ async function processActivityPubCleaner (_job: Job) {
37 } catch (err) { 38 } catch (err) {
38 logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) 39 logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
39 } 40 }
40 }, { concurrency: AP_CLEANER_CONCURRENCY }) 41 }, { concurrency: AP_CLEANER.CONCURRENCY })
41 } 42 }
42 43
43 { 44 {
@@ -46,11 +47,11 @@ async function processActivityPubCleaner (_job: Job) {
46 47
47 await map(shareUrls, async shareUrl => { 48 await map(shareUrls, async shareUrl => {
48 try { 49 try {
49 await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) 50 await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
50 } catch (err) { 51 } catch (err) {
51 logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) 52 logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
52 } 53 }
53 }, { concurrency: AP_CLEANER_CONCURRENCY }) 54 }, { concurrency: AP_CLEANER.CONCURRENCY })
54 } 55 }
55 56
56 { 57 {
@@ -59,11 +60,11 @@ async function processActivityPubCleaner (_job: Job) {
59 60
60 await map(commentUrls, async commentUrl => { 61 await map(commentUrls, async commentUrl => {
61 try { 62 try {
62 await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) 63 await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
63 } catch (err) { 64 } catch (err) {
64 logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) 65 logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
65 } 66 }
66 }, { concurrency: AP_CLEANER_CONCURRENCY }) 67 }, { concurrency: AP_CLEANER.CONCURRENCY })
67 } 68 }
68} 69}
69 70
@@ -75,12 +76,14 @@ export {
75 76
76// --------------------------------------------------------------------------- 77// ---------------------------------------------------------------------------
77 78
78async function updateObjectIfNeeded <T> ( 79async function updateObjectIfNeeded <T> (options: {
79 url: string, 80 url: string
80 bodyValidator: (body: any) => boolean, 81 bodyValidator: (body: any) => boolean
81 updater: (url: string, newUrl: string) => Promise<T>, 82 updater: (url: string, newUrl: string) => Promise<T>
82 deleter: (url: string) => Promise<T> 83 deleter: (url: string) => Promise<T> }
83): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { 84): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
85 const { url, bodyValidator, updater, deleter } = options
86
84 const on404OrTombstone = async () => { 87 const on404OrTombstone = async () => {
85 logger.info('Removing remote AP object %s.', url) 88 logger.info('Removing remote AP object %s.', url)
86 const data = await deleter(url) 89 const data = await deleter(url)
@@ -117,7 +120,15 @@ async function updateObjectIfNeeded <T> (
117 return on404OrTombstone() 120 return on404OrTombstone()
118 } 121 }
119 122
120 throw err 123 logger.debug('Remote AP object %s is unavailable.', url)
124
125 const unavailability = await Redis.Instance.addAPUnavailability(url)
126 if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
127 logger.info('Removing unavailable AP resource %s.', url)
128 return on404OrTombstone()
129 }
130
131 return null
121 } 132 }
122} 133}
123 134