X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Factivitypub-http-broadcast.ts;h=733c1378a7af3a3b33041c749047e0db924595b1;hb=405c83f9af377a663a4c8e9ad025fd5c10496922;hp=c69ff9e83fd2dcf5a920e51c47ad55a7d15ae66e;hpb=903353d67a8d0fdda8465ed6c57b77a9a5afbe92;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index c69ff9e83..733c1378a 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,42 +1,49 @@ -import * as Bluebird from 'bluebird' -import * as Bull from 'bull' +import { Job } from 'bullmq' +import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' +import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' +import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' import { ActivitypubHttpBroadcastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { BROADCAST_CONCURRENCY, REQUEST_TIMEOUT } from '../../../initializers/constants' -import { ActorFollowScoreCache } from '../../files-cache' -import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' -async function processActivityPubHttpBroadcast (job: Bull.Job) { - logger.info('Processing ActivityPub broadcast in job %d.', job.id) +// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive - const payload = job.data as ActivitypubHttpBroadcastPayload +async function processActivityPubHttpSequentialBroadcast (job: Job) { + logger.info('Processing ActivityPub broadcast in job %s.', job.id) - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) + const requestOptions = await buildRequestOptions(job.data) - const options = { - method: 'POST' as 'POST', - json: body, - httpSignature: httpSignatureOptions, - timeout: REQUEST_TIMEOUT, - headers: buildGlobalHeaders(body) - } + const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) + + return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) +} - const badUrls: string[] = [] - const goodUrls: string[] = [] +async function processActivityPubParallelHttpBroadcast (job: Job) { + logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id) - await Bluebird.map(payload.uris, uri => { - return doRequest(uri, options) - .then(() => goodUrls.push(uri)) - .catch(() => badUrls.push(uri)) - }, { concurrency: BROADCAST_CONCURRENCY }) + const requestOptions = await buildRequestOptions(job.data) - return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) + const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) + + return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) } // --------------------------------------------------------------------------- export { - processActivityPubHttpBroadcast + processActivityPubHttpSequentialBroadcast, + processActivityPubParallelHttpBroadcast +} + +// --------------------------------------------------------------------------- + +async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) { + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + return { + method: 'POST' as 'POST', + json: body, + httpSignature: httpSignatureOptions, + headers: buildGlobalHeaders(body) + } }