]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame_incremental - server/lib/job-queue/handlers/activitypub-http-broadcast.ts
Bumped to version v5.2.1
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / activitypub-http-broadcast.ts
... / ...
CommitLineData
1import { Job } from 'bullmq'
2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6import { logger } from '../../../helpers/logger'
7
8// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
9
// Job handler for an ActivityPub broadcast: builds the signed POST request options
// from the job payload, hands the target URIs to sequentialHTTPBroadcastFromWorker,
// then reports the resulting good/bad URLs to the actor follow health cache.
async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub broadcast in job %s.', job.id)

  const payload = job.data
  const requestOptions = await buildRequestOptions(payload)

  // The broadcast itself is delegated to the worker helper
  const outcome = await sequentialHTTPBroadcastFromWorker({
    uris: payload.uris,
    requestOptions
  })

  // Resolves with whatever the health cache update returns
  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(outcome.goodUrls, outcome.badUrls)
}
19
// Job handler for an ActivityPub broadcast that goes through
// parallelHTTPBroadcastFromWorker: builds the signed POST request options from the
// job payload, broadcasts to the payload URIs, then reports the resulting
// good/bad URLs to the actor follow health cache.
async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)

  const payload = job.data
  const requestOptions = await buildRequestOptions(payload)

  // The broadcast itself is delegated to the worker helper
  const outcome = await parallelHTTPBroadcastFromWorker({
    uris: payload.uris,
    requestOptions
  })

  // Resolves with whatever the health cache update returns
  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(outcome.goodUrls, outcome.badUrls)
}
29
30// ---------------------------------------------------------------------------
31
32export {
33 processActivityPubHttpSequentialBroadcast,
34 processActivityPubParallelHttpBroadcast
35}
36
37// ---------------------------------------------------------------------------
38
// Assemble the request options shared by both broadcast handlers:
// a POST whose JSON body comes from computeBody, with HTTP signature options
// from buildSignedRequestOptions and headers from buildGlobalHeaders.
async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
  const json = await computeBody(payload)
  const httpSignature = await buildSignedRequestOptions(payload)

  // Headers are computed from the already-built body
  const headers = buildGlobalHeaders(json)

  return {
    // Keep the literal 'POST' type instead of widening to string
    method: 'POST' as const,
    json,
    httpSignature,
    headers
  }
}