aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-02-26 16:26:27 +0100
committerChocobozzz <me@florianbigard.com>2021-03-03 10:08:08 +0100
commit74d249bc1346c7cfaac7ee49bebbebcf2a01f82a (patch)
treed47bd163ae57ed8f15b445296634cc04f4f67b6f /server/lib/job-queue
parent095e2258043fcff8a79ab082d11edfbd8f13a8e2 (diff)
downloadPeerTube-74d249bc1346c7cfaac7ee49bebbebcf2a01f82a.tar.gz
PeerTube-74d249bc1346c7cfaac7ee49bebbebcf2a01f82a.tar.zst
PeerTube-74d249bc1346c7cfaac7ee49bebbebcf2a01f82a.zip
Add ability to cleanup remote AP interactions
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts194
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts2
-rw-r--r--server/lib/job-queue/job-queue.ts10
3 files changed, 205 insertions, 1 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
new file mode 100644
index 000000000..b58bbc983
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -0,0 +1,194 @@
1import * as Bluebird from 'bluebird'
2import * as Bull from 'bull'
3import { checkUrlsSameHost } from '@server/helpers/activitypub'
4import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate'
5import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share'
6import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
7import { doRequest } from '@server/helpers/requests'
8import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants'
9import { VideoModel } from '@server/models/video/video'
10import { VideoCommentModel } from '@server/models/video/video-comment'
11import { VideoShareModel } from '@server/models/video/video-share'
12import { HttpStatusCode } from '@shared/core-utils'
13import { logger } from '../../../helpers/logger'
14import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
15
16// Job to clean remote interactions off local videos
17
18async function processActivityPubCleaner (_job: Bull.Job) {
19 logger.info('Processing ActivityPub cleaner.')
20
21 {
22 const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
23 const { bodyValidator, deleter, updater } = rateOptionsFactory()
24
25 await Bluebird.map(rateUrls, async rateUrl => {
26 try {
27 const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
28
29 if (result?.status === 'deleted') {
30 const { videoId, type } = result.data
31
32 await VideoModel.updateRatesOf(videoId, type, undefined)
33 }
34 } catch (err) {
35 logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
36 }
37 }, { concurrency: AP_CLEANER_CONCURRENCY })
38 }
39
40 {
41 const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
42 const { bodyValidator, deleter, updater } = shareOptionsFactory()
43
44 await Bluebird.map(shareUrls, async shareUrl => {
45 try {
46 await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter)
47 } catch (err) {
48 logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
49 }
50 }, { concurrency: AP_CLEANER_CONCURRENCY })
51 }
52
53 {
54 const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
55 const { bodyValidator, deleter, updater } = commentOptionsFactory()
56
57 await Bluebird.map(commentUrls, async commentUrl => {
58 try {
59 await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter)
60 } catch (err) {
61 logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
62 }
63 }, { concurrency: AP_CLEANER_CONCURRENCY })
64 }
65}
66
67// ---------------------------------------------------------------------------
68
69export {
70 processActivityPubCleaner
71}
72
73// ---------------------------------------------------------------------------
74
75async function updateObjectIfNeeded <T> (
76 url: string,
77 bodyValidator: (body: any) => boolean,
78 updater: (url: string, newUrl: string) => Promise<T>,
79 deleter: (url: string) => Promise<T>
80): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
81 // Fetch url
82 const { response, body } = await doRequest<any>({
83 uri: url,
84 json: true,
85 activityPub: true
86 })
87
88 // Does not exist anymore, remove entry
89 if (response.statusCode === HttpStatusCode.NOT_FOUND_404) {
90 logger.info('Removing remote AP object %s.', url)
91 const data = await deleter(url)
92
93 return { status: 'deleted', data }
94 }
95
96 // If not same id, check same host and update
97 if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
98
99 if (body.type === 'Tombstone') {
100 logger.info('Removing remote AP object %s.', url)
101 const data = await deleter(url)
102
103 return { status: 'deleted', data }
104 }
105
106 const newUrl = body.id
107 if (newUrl !== url) {
108 if (checkUrlsSameHost(newUrl, url) !== true) {
109 throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
110 }
111
112 logger.info('Updating remote AP object %s.', url)
113 const data = await updater(url, newUrl)
114
115 return { status: 'updated', data }
116 }
117
118 return null
119}
120
121function rateOptionsFactory () {
122 return {
123 bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body),
124
125 updater: async (url: string, newUrl: string) => {
126 const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
127 rate.url = newUrl
128
129 const videoId = rate.videoId
130 const type = rate.type
131
132 await rate.save()
133
134 return { videoId, type }
135 },
136
137 deleter: async (url) => {
138 const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
139
140 const videoId = rate.videoId
141 const type = rate.type
142
143 await rate.destroy()
144
145 return { videoId, type }
146 }
147 }
148}
149
150function shareOptionsFactory () {
151 return {
152 bodyValidator: (body: any) => isShareActivityValid(body),
153
154 updater: async (url: string, newUrl: string) => {
155 const share = await VideoShareModel.loadByUrl(url, undefined)
156 share.url = newUrl
157
158 await share.save()
159
160 return undefined
161 },
162
163 deleter: async (url) => {
164 const share = await VideoShareModel.loadByUrl(url, undefined)
165
166 await share.destroy()
167
168 return undefined
169 }
170 }
171}
172
173function commentOptionsFactory () {
174 return {
175 bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body),
176
177 updater: async (url: string, newUrl: string) => {
178 const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
179 comment.url = newUrl
180
181 await comment.save()
182
183 return undefined
184 },
185
186 deleter: async (url) => {
187 const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
188
189 await comment.destroy()
190
191 return undefined
192 }
193 }
194}
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 8da549640..125307843 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger'
6 6
7async function processActorKeys (job: Bull.Job) { 7async function processActorKeys (job: Bull.Job) {
8 const payload = job.data as ActorKeysPayload 8 const payload = job.data as ActorKeysPayload
9 logger.info('Processing email in job %d.', job.id) 9 logger.info('Processing actor keys in job %d.', job.id)
10 10
11 const actor = await ActorModel.load(payload.actorId) 11 const actor = await ActorModel.load(payload.actorId)
12 12
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index efda2e038..42e8347b1 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -21,6 +21,7 @@ import {
21import { logger } from '../../helpers/logger' 21import { logger } from '../../helpers/logger'
22import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 22import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
23import { Redis } from '../redis' 23import { Redis } from '../redis'
24import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
24import { processActivityPubFollow } from './handlers/activitypub-follow' 25import { processActivityPubFollow } from './handlers/activitypub-follow'
25import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 26import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
26import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 27import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
@@ -38,6 +39,7 @@ type CreateJobArgument =
38 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 39 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
39 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 40 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
40 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | 41 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
42 { type: 'activitypub-http-cleaner', payload: {} } |
41 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | 43 { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
42 { type: 'video-file-import', payload: VideoFileImportPayload } | 44 { type: 'video-file-import', payload: VideoFileImportPayload } |
43 { type: 'video-transcoding', payload: VideoTranscodingPayload } | 45 { type: 'video-transcoding', payload: VideoTranscodingPayload } |
@@ -58,6 +60,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
58 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 60 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
59 'activitypub-http-unicast': processActivityPubHttpUnicast, 61 'activitypub-http-unicast': processActivityPubHttpUnicast,
60 'activitypub-http-fetcher': processActivityPubHttpFetcher, 62 'activitypub-http-fetcher': processActivityPubHttpFetcher,
63 'activitypub-cleaner': processActivityPubCleaner,
61 'activitypub-follow': processActivityPubFollow, 64 'activitypub-follow': processActivityPubFollow,
62 'video-file-import': processVideoFileImport, 65 'video-file-import': processVideoFileImport,
63 'video-transcoding': processVideoTranscoding, 66 'video-transcoding': processVideoTranscoding,
@@ -75,6 +78,7 @@ const jobTypes: JobType[] = [
75 'activitypub-http-broadcast', 78 'activitypub-http-broadcast',
76 'activitypub-http-fetcher', 79 'activitypub-http-fetcher',
77 'activitypub-http-unicast', 80 'activitypub-http-unicast',
81 'activitypub-cleaner',
78 'email', 82 'email',
79 'video-transcoding', 83 'video-transcoding',
80 'video-file-import', 84 'video-file-import',
@@ -233,6 +237,12 @@ class JobQueue {
233 this.queues['videos-views'].add({}, { 237 this.queues['videos-views'].add({}, {
234 repeat: REPEAT_JOBS['videos-views'] 238 repeat: REPEAT_JOBS['videos-views']
235 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 239 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
240
241 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
242 this.queues['activitypub-cleaner'].add({}, {
243 repeat: REPEAT_JOBS['activitypub-cleaner']
244 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
245 }
236 } 246 }
237 247
238 private filterJobTypes (jobType?: JobType) { 248 private filterJobTypes (jobType?: JobType) {