1 import * as kue from 'kue'
2 import { logger } from '../../../helpers/logger'
3 import { doRequest } from '../../../helpers/requests'
4 import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
5 import { processActivities } from '../../activitypub/process'
6 import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
8 export type ActivitypubHttpFetcherPayload = {
12 async function processActivityPubHttpFetcher (job: kue.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
15 const payload = job.data as ActivitypubHttpBroadcastPayload
22 timeout: JOB_REQUEST_TIMEOUT
25 for (const uri of payload.uris) {
27 logger.info('Fetching ActivityPub data on %s.', uri)
29 const response = await doRequest(options)
30 const firstBody = response.body
32 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
33 const activities = firstBody.first.orderedItems
35 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
37 await processActivities(activities)
40 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
42 let nextLink = firstBody.first.next
43 while (nextLink && i < limit) {
44 options.uri = nextLink
46 const { body } = await doRequest(options)
50 if (Array.isArray(body.orderedItems)) {
51 const activities = body.orderedItems
52 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
54 await processActivities(activities)
60 // ---------------------------------------------------------------------------
63 processActivityPubHttpFetcher