]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/job-queue/handlers/activitypub-http-broadcast.ts
Bumped to version v5.2.1
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / activitypub-http-broadcast.ts
1 import { Job } from 'bullmq'
2 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4 import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5 import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6 import { logger } from '../../../helpers/logger'
7
8 // Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
9
/**
 * Job handler for an ActivityPub HTTP broadcast.
 *
 * Builds the request options from the job payload, hands them together with
 * the payload's `uris` to `sequentialHTTPBroadcastFromWorker`, then forwards
 * the resulting good/bad URL lists to the actor follow health cache.
 *
 * @param job - queue job whose `data` is the broadcast payload
 * @returns whatever `ActorFollowHealthCache.Instance.updateActorFollowsHealth` returns
 */
async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub broadcast in job %s.', job.id)

  const payload = job.data
  const requestOptions = await buildRequestOptions(payload)

  const broadcastResult = await sequentialHTTPBroadcastFromWorker({ uris: payload.uris, requestOptions })
  const reachableUrls = broadcastResult.goodUrls
  const unreachableUrls = broadcastResult.badUrls

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(reachableUrls, unreachableUrls)
}
19
/**
 * Job handler for an ActivityPub parallel HTTP broadcast.
 *
 * Builds the request options from the job payload, hands them together with
 * the payload's `uris` to `parallelHTTPBroadcastFromWorker`, then forwards
 * the resulting good/bad URL lists to the actor follow health cache.
 *
 * @param job - queue job whose `data` is the broadcast payload
 * @returns whatever `ActorFollowHealthCache.Instance.updateActorFollowsHealth` returns
 */
async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)

  const payload = job.data
  const requestOptions = await buildRequestOptions(payload)

  const broadcastResult = await parallelHTTPBroadcastFromWorker({ uris: payload.uris, requestOptions })
  const reachableUrls = broadcastResult.goodUrls
  const unreachableUrls = broadcastResult.badUrls

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(reachableUrls, unreachableUrls)
}
29
30 // ---------------------------------------------------------------------------
31
32 export {
33 processActivityPubHttpSequentialBroadcast,
34 processActivityPubParallelHttpBroadcast
35 }
36
37 // ---------------------------------------------------------------------------
38
/**
 * Assembles the HTTP request options shared by both broadcast handlers.
 *
 * The body is computed first, then the signature options; the global headers
 * are derived from the computed body.
 *
 * @param payload - broadcast payload taken from the job
 * @returns a POST request description: `method`, `json` body, `httpSignature` options and `headers`
 */
async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
  const json = await computeBody(payload)
  const httpSignature = await buildSignedRequestOptions(payload)
  const headers = buildGlobalHeaders(json)

  // `as const` keeps `method` typed as the literal 'POST' rather than widening to string
  return {
    method: 'POST' as const,
    json,
    httpSignature,
    headers
  }
}