From 405c83f9af377a663a4c8e9ad025fd5c10496922 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 8 Sep 2022 12:26:46 +0200 Subject: Use worker thread to send HTTP requests Compute HTTP signature could be CPU intensive --- .../handlers/activitypub-http-broadcast.ts | 54 ++++++++++++---------- server/lib/job-queue/job-queue.ts | 6 +-- 2 files changed, 32 insertions(+), 28 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 13eff5211..733c1378a 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,39 +1,28 @@ -import { map } from 'bluebird' import { Job } from 'bullmq' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' +import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' import { ActivitypubHttpBroadcastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' -async function processActivityPubHttpBroadcast (job: Job) { +// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive + +async function processActivityPubHttpSequentialBroadcast (job: Job) { logger.info('Processing ActivityPub broadcast in job %s.', job.id) - const payload = job.data as ActivitypubHttpBroadcastPayload + const requestOptions = await buildRequestOptions(job.data) - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) + const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) - const options = { - method: 'POST' as 'POST', - json: body, - httpSignature: httpSignatureOptions, - headers: buildGlobalHeaders(body) - } + return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) +} + +async function processActivityPubParallelHttpBroadcast (job: Job) { + logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id) - const badUrls: string[] = [] - const goodUrls: string[] = [] + const requestOptions = await buildRequestOptions(job.data) - await map(payload.uris, async uri => { - try { - await doRequest(uri, options) - goodUrls.push(uri) - } catch (err) { - logger.debug('HTTP broadcast to %s failed.', uri, { err }) - badUrls.push(uri) - } - }, { concurrency: BROADCAST_CONCURRENCY }) + const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) } @@ -41,5 +30,20 @@ async function processActivityPubHttpBroadcast (job: Job) { // --------------------------------------------------------------------------- export { - processActivityPubHttpBroadcast + processActivityPubHttpSequentialBroadcast, + processActivityPubParallelHttpBroadcast +} + +// --------------------------------------------------------------------------- + +async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) { + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + return { + method: 'POST' as 'POST', + json: body, + httpSignature: httpSignatureOptions, + headers: buildGlobalHeaders(body) + } } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0fcaaf466..e54d12acd 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -45,7 +45,7 @@ import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_ import { Hooks } from '../plugins/hooks' import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubFollow } from './handlers/activitypub-follow' -import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { refreshAPObject } from './handlers/activitypub-refresher' @@ -96,8 +96,8 @@ export type CreateJobOptions = { } const handlers: { [id in JobType]: (job: Job) => Promise } = { - 'activitypub-http-broadcast': processActivityPubHttpBroadcast, - 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, + 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, + 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, -- cgit v1.2.3