// ---------------------------------------------------------------------------
export {
+ PeerTubeRequestOptions,
+
doRequest,
doJSONRequest,
doRequestAndSaveToFile,
PROCESS_IMAGE: {
CONCURRENCY: 1,
MAX_THREADS: 5
+ },
+ SEQUENTIAL_HTTP_BROADCAST: {
+ CONCURRENCY: 1,
+ MAX_THREADS: 1
+ },
+ PARALLEL_HTTP_BROADCAST: {
+ CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
+ MAX_THREADS: 1
}
}
-import { map } from 'bluebird'
import { Job } from 'bullmq'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
+import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
import { ActivitypubHttpBroadcastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger'
-import { doRequest } from '../../../helpers/requests'
-import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
-async function processActivityPubHttpBroadcast (job: Job) {
+// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
+
+async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
logger.info('Processing ActivityPub broadcast in job %s.', job.id)
- const payload = job.data as ActivitypubHttpBroadcastPayload
+ const requestOptions = await buildRequestOptions(job.data)
- const body = await computeBody(payload)
- const httpSignatureOptions = await buildSignedRequestOptions(payload)
+ const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
- const options = {
- method: 'POST' as 'POST',
- json: body,
- httpSignature: httpSignatureOptions,
- headers: buildGlobalHeaders(body)
- }
+ return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
+}
+
+async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
+ logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)
- const badUrls: string[] = []
- const goodUrls: string[] = []
+ const requestOptions = await buildRequestOptions(job.data)
- await map(payload.uris, async uri => {
- try {
- await doRequest(uri, options)
- goodUrls.push(uri)
- } catch (err) {
- logger.debug('HTTP broadcast to %s failed.', uri, { err })
- badUrls.push(uri)
- }
- }, { concurrency: BROADCAST_CONCURRENCY })
+ const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
// ---------------------------------------------------------------------------
export {
- processActivityPubHttpBroadcast
+ processActivityPubHttpSequentialBroadcast,
+ processActivityPubParallelHttpBroadcast
+}
+
+// ---------------------------------------------------------------------------
+
+async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
+ const body = await computeBody(payload)
+ const httpSignatureOptions = await buildSignedRequestOptions(payload)
+
+ return {
+ method: 'POST' as 'POST',
+ json: body,
+ httpSignature: httpSignatureOptions,
+ headers: buildGlobalHeaders(body)
+ }
}
import { Hooks } from '../plugins/hooks'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
-import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
+import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { refreshAPObject } from './handlers/activitypub-refresher'
}
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
- 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
- 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
+ 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
+ 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
import Piscina from 'piscina'
import { processImage } from '@server/helpers/image-utils'
import { WORKER_THREADS } from '@server/initializers/constants'
+import { httpBroadcast } from './workers/http-broadcast'
import { downloadImage } from './workers/image-downloader'
let downloadImageWorker: Piscina
return processImageWorker.run(options)
}
+// ---------------------------------------------------------------------------
+
+let parallelHTTPBroadcastWorker: Piscina
+
+function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
+ if (!parallelHTTPBroadcastWorker) {
+ parallelHTTPBroadcastWorker = new Piscina({
+ filename: join(__dirname, 'workers', 'http-broadcast.js'),
+ concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY,
+ maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS
+ })
+ }
+
+ return parallelHTTPBroadcastWorker.run(options)
+}
+
+// ---------------------------------------------------------------------------
+
+let sequentialHTTPBroadcastWorker: Piscina
+
+function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
+ if (!sequentialHTTPBroadcastWorker) {
+ sequentialHTTPBroadcastWorker = new Piscina({
+ filename: join(__dirname, 'workers', 'http-broadcast.js'),
+ concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY,
+ maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS
+ })
+ }
+
+ return sequentialHTTPBroadcastWorker.run(options)
+}
+
export {
downloadImageFromWorker,
- processImageFromWorker
+ processImageFromWorker,
+ parallelHTTPBroadcastFromWorker,
+ sequentialHTTPBroadcastFromWorker
}
--- /dev/null
+import { map } from 'bluebird'
+import { logger } from '@server/helpers/logger'
+import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests'
+import { BROADCAST_CONCURRENCY } from '@server/initializers/constants'
+
+async function httpBroadcast (payload: {
+ uris: string[]
+ requestOptions: PeerTubeRequestOptions
+}) {
+ const { uris, requestOptions } = payload
+
+ const badUrls: string[] = []
+ const goodUrls: string[] = []
+
+ await map(uris, async uri => {
+ try {
+ await doRequest(uri, requestOptions)
+ goodUrls.push(uri)
+ } catch (err) {
+ logger.debug('HTTP broadcast to %s failed.', uri, { err })
+ badUrls.push(uri)
+ }
+ }, { concurrency: BROADCAST_CONCURRENCY })
+
+ return { goodUrls, badUrls }
+}
+
+module.exports = httpBroadcast
+
+export {
+ httpBroadcast
+}