1 import { Job } from 'bullmq'
2 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4 import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5 import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6 import { logger } from '../../../helpers/logger'
8 // Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
/**
 * Job handler for an ActivityPub HTTP broadcast dispatched through `sequentialHTTPBroadcastFromWorker`.
 *
 * Logs the job id, builds the request options from the job payload, hands the payload URIs
 * and those options to the worker helper, then forwards the resulting good/bad URL lists to
 * `ActorFollowHealthCache.Instance.updateActorFollowsHealth`.
 *
 * @param job - queue job whose `data` is the broadcast payload (its `uris` are the targets)
 * @returns whatever `updateActorFollowsHealth` returns
 */
async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub broadcast in job %s.', job.id)

  const payload = job.data
  const requestOptions = await buildRequestOptions(payload)

  // The worker helper reports which target URLs succeeded and which failed
  const outcome = await sequentialHTTPBroadcastFromWorker({ uris: payload.uris, requestOptions })

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(outcome.goodUrls, outcome.badUrls)
}
/**
 * Job handler for an ActivityPub HTTP broadcast dispatched through `parallelHTTPBroadcastFromWorker`.
 *
 * Logs the job id, builds the request options from the job payload, hands the payload URIs
 * and those options to the worker helper, then forwards the resulting good/bad URL lists to
 * `ActorFollowHealthCache.Instance.updateActorFollowsHealth`.
 *
 * @param job - queue job whose `data` is the broadcast payload (its `uris` are the targets)
 * @returns whatever `updateActorFollowsHealth` returns
 */
async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)

  const payload = job.data
  const requestOptions = await buildRequestOptions(payload)

  // The worker helper reports which target URLs succeeded and which failed
  const outcome = await parallelHTTPBroadcastFromWorker({ uris: payload.uris, requestOptions })

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(outcome.goodUrls, outcome.badUrls)
}
30 // ---------------------------------------------------------------------------
33 processActivityPubHttpSequentialBroadcast,
34 processActivityPubParallelHttpBroadcast
37 // ---------------------------------------------------------------------------
/**
 * Builds the HTTP request options used by the broadcast job handlers in this file.
 *
 * Awaits `computeBody(payload)` for the request body and `buildSignedRequestOptions(payload)`
 * for the HTTP signature options, then assembles an options object whose visible fields are
 * the POST method, the signature options and the headers derived from the body.
 *
 * NOTE(review): in this view the line that opens the returned object literal, any property
 * between `method` and `httpSignature`, and the closing lines are not visible. Confirm the
 * full shape of the returned object against the complete file.
 *
 * @param payload - broadcast payload passed to both the body and signature builders
 */
39 async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
40 const body = await computeBody(payload)
41 const httpSignatureOptions = await buildSignedRequestOptions(payload)
// The assertion keeps the literal type 'POST' instead of letting it widen to string
44 method: 'POST' as 'POST',
46 httpSignature: httpSignatureOptions,
// Headers are computed from the same body produced above
47 headers: buildGlobalHeaders(body)