aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts44
1 files changed, 3 insertions, 41 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 4683beb2f..10c0e606f 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,9 +1,9 @@
1import * as kue from 'kue' 1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
5import { processActivities } from '../../activitypub/process' 3import { processActivities } from '../../activitypub/process'
6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' 4import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
5import { crawlCollectionPage } from '../../activitypub/crawl'
6import { Activity } from '../../../../shared/models/activitypub'
7 7
8export type ActivitypubHttpFetcherPayload = { 8export type ActivitypubHttpFetcherPayload = {
9 uris: string[] 9 uris: string[]
@@ -14,46 +14,8 @@ async function processActivityPubHttpFetcher (job: kue.Job) {
14 14
15 const payload = job.data as ActivitypubHttpBroadcastPayload 15 const payload = job.data as ActivitypubHttpBroadcastPayload
16 16
17 const options = {
18 method: 'GET',
19 uri: '',
20 json: true,
21 activityPub: true,
22 timeout: JOB_REQUEST_TIMEOUT
23 }
24
25 for (const uri of payload.uris) { 17 for (const uri of payload.uris) {
26 options.uri = uri 18 await crawlCollectionPage<Activity>(uri, (items) => processActivities(items))
27 logger.info('Fetching ActivityPub data on %s.', uri)
28
29 const response = await doRequest(options)
30 const firstBody = response.body
31
32 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
33 const activities = firstBody.first.orderedItems
34
35 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
36
37 await processActivities(activities)
38 }
39
40 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
41 let i = 0
42 let nextLink = firstBody.first.next
43 while (nextLink && i < limit) {
44 options.uri = nextLink
45
46 const { body } = await doRequest(options)
47 nextLink = body.next
48 i++
49
50 if (Array.isArray(body.orderedItems)) {
51 const activities = body.orderedItems
52 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
53
54 await processActivities(activities)
55 }
56 }
57 } 19 }
58} 20}
59 21