]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/handlers/activitypub-http-broadcast.ts
Bumped to version v5.2.1
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / activitypub-http-broadcast.ts
CommitLineData
5a921e7b 1import { Job } from 'bullmq'
a219c910 2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
9db437c8 3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
f240fb4b 4import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
dcd75f78 5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
da854ddd 6import { logger } from '../../../helpers/logger'
94a5ff8a 7
// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
9
/**
 * Job handler that broadcasts the request built from `job.data` to every URI
 * listed in `job.data.uris`, delegating the HTTP calls to
 * `sequentialHTTPBroadcastFromWorker`.
 *
 * The good/bad URLs reported by the broadcast are then passed to the actor
 * follow health cache.
 *
 * @param job - queue job whose payload carries the target URIs and the data
 *   used to build the signed request options
 * @returns the result of `ActorFollowHealthCache.Instance.updateActorFollowsHealth`
 */
async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub broadcast in job %s.', job.id)

  // Built once and shared by every request of this broadcast
  const requestOptions = await buildRequestOptions(job.data)

  const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
19
/**
 * Job handler that broadcasts the request built from `job.data` to every URI
 * listed in `job.data.uris`, delegating the HTTP calls to
 * `parallelHTTPBroadcastFromWorker`.
 *
 * The good/bad URLs reported by the broadcast are then passed to the actor
 * follow health cache.
 *
 * @param job - queue job whose payload carries the target URIs and the data
 *   used to build the signed request options
 * @returns the result of `ActorFollowHealthCache.Instance.updateActorFollowsHealth`
 */
async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
  logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)

  // Built once and shared by every request of this broadcast
  const requestOptions = await buildRequestOptions(job.data)

  const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })

  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
29
afffe988
C
30// ---------------------------------------------------------------------------
31
// Job handlers exposed by this module
export {
  processActivityPubHttpSequentialBroadcast,
  processActivityPubParallelHttpBroadcast
}
36
37// ---------------------------------------------------------------------------
38
/**
 * Builds the options object used for every request of a broadcast job.
 *
 * @param payload - broadcast job payload, forwarded to `computeBody` and
 *   `buildSignedRequestOptions`
 * @returns POST request options: the computed body as `json`, the HTTP
 *   signature options, and the headers derived from that body
 */
async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
  const body = await computeBody(payload)
  const httpSignatureOptions = await buildSignedRequestOptions(payload)

  return {
    // Keep the literal type 'POST' instead of widening to string
    method: 'POST' as const,
    json: body,
    httpSignature: httpSignatureOptions,
    headers: buildGlobalHeaders(body)
  }
}