1 import { logger } from '../../../helpers'
2 import { buildSignedActivity } from '../../../helpers/activitypub'
3 import { doRequest } from '../../../helpers/requests'
4 import { database as db } from '../../../initializers'
5 import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler'
6 import { processActivities } from '../../activitypub/process/process'
7 import { ACTIVITY_PUB } from '../../../initializers/constants'
/**
 * Job handler body for the ActivityPub "fetcher" job.
 *
 * For every URI in `payload.uris` it issues a request through `doRequest`,
 * and when the response body exposes `first.orderedItems` as an array it
 * hands those items to `processActivities`. It then follows the `next` link
 * of that first page, processing each page's `orderedItems` the same way,
 * while `i < limit` (limit is `ACTIVITY_PUB.FETCH_PAGE_LIMIT`).
 *
 * NOTE(review): `options` and `i` are used below but are declared on lines
 * that are not visible in this excerpt; likewise the statements that advance
 * `nextLink` / `i` inside the `while` loop are not visible. Confirm against
 * the full file before relying on the paging description above.
 *
 * @param payload - Job payload; only `payload.uris` is read in the visible code.
 * @param jobId - Identifier of the running job, used for logging only here.
 */
9 async function process (payload: ActivityPubHttpPayload, jobId: number) {
10 logger.info('Processing ActivityPub fetcher in job %d.', jobId)
// Each URI is handled sequentially: every request/processing step below is awaited.
18 for (const uri of payload.uris) {
20 logger.info('Fetching ActivityPub data on %s.', uri)
// `options` is built on lines not shown here (presumably from `uri` - TODO confirm).
22 const response = await doRequest(options)
23 const firstBody = response.body
// Only proceed when the body carries a `first` page whose `orderedItems` is an array;
// any other response shape is skipped by this condition.
25 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
26 const activities = firstBody.first.orderedItems
28 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
30 await processActivities(activities)
// Upper bound for the paging loop below (compared against `i`).
33 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
35 let nextLink = firstBody.first.next
// Follow `next` links until there is none or the bound is reached.
36 while (nextLink && i < limit) {
// The same `options` object is reused; only its `uri` is repointed to the next page.
37 options.uri = nextLink
39 const { body } = await doRequest(options)
// Pages whose `orderedItems` is not an array are not processed.
43 if (Array.isArray(body.orderedItems)) {
44 const activities = body.orderedItems
45 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
47 await processActivities(activities)
/**
 * Error callback for the job: logs the error together with the job id and
 * returns an already-resolved promise (the error is not rethrown here).
 *
 * NOTE(review): the message says "broadcasting", whereas `process` in this
 * file logs "fetcher" - this wording looks copied from another handler;
 * confirm and adjust the message if so.
 *
 * @param err - The error that was raised for the job.
 * @param jobId - Identifier of the failed job, used in the log line.
 * @returns A resolved promise with no value.
 */
53 function onError (err: Error, jobId: number) {
54 logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
55 return Promise.resolve()
/**
 * Success callback for the job: logs that the job succeeded and returns an
 * already-resolved promise.
 *
 * @param jobId - Identifier of the finished job, used in the log line.
 * @returns A resolved promise with no value.
 */
58 function onSuccess (jobId: number) {
59 logger.info('Job %d is a success.', jobId)
60 return Promise.resolve()
63 // ---------------------------------------------------------------------------