aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/handlers/activitypub-http-fetcher.ts')
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts63
1 files changed, 63 insertions, 0 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
new file mode 100644
index 000000000..062211c85
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -0,0 +1,63 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ACTIVITY_PUB } from '../../../initializers'
5import { processActivities } from '../../activitypub/process'
6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
7
8export type ActivitypubHttpFetcherPayload = {
9 uris: string[]
10}
11
12async function processActivityPubHttpFetcher (job: kue.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14
15 const payload = job.data as ActivitypubHttpBroadcastPayload
16
17 const options = {
18 method: 'GET',
19 uri: '',
20 json: true,
21 activityPub: true
22 }
23
24 for (const uri of payload.uris) {
25 options.uri = uri
26 logger.info('Fetching ActivityPub data on %s.', uri)
27
28 const response = await doRequest(options)
29 const firstBody = response.body
30
31 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
32 const activities = firstBody.first.orderedItems
33
34 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
35
36 await processActivities(activities)
37 }
38
39 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
40 let i = 0
41 let nextLink = firstBody.first.next
42 while (nextLink && i < limit) {
43 options.uri = nextLink
44
45 const { body } = await doRequest(options)
46 nextLink = body.next
47 i++
48
49 if (Array.isArray(body.orderedItems)) {
50 const activities = body.orderedItems
51 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
52
53 await processActivities(activities)
54 }
55 }
56 }
57}
58
59// ---------------------------------------------------------------------------
60
61export {
62 processActivityPubHttpFetcher
63}