import { Job } from 'bullmq'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
-import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
+import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
import { ActivitypubHttpBroadcastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger'
const requestOptions = await buildRequestOptions(job.data)
- const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
+ const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
import { join } from 'path'
import Piscina from 'piscina'
import { processImage } from '@server/helpers/image-utils'
-import { WORKER_THREADS } from '@server/initializers/constants'
+import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants'
import { httpBroadcast } from './workers/http-broadcast'
import { downloadImage } from './workers/image-downloader'
if (!parallelHTTPBroadcastWorker) {
parallelHTTPBroadcastWorker = new Piscina({
filename: join(__dirname, 'workers', 'http-broadcast.js'),
- concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY,
- maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS
+ // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
+ concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
+ maxThreads: 1
})
}
if (!sequentialHTTPBroadcastWorker) {
sequentialHTTPBroadcastWorker = new Piscina({
filename: join(__dirname, 'workers', 'http-broadcast.js'),
- concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY,
- maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS
+ // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
+ concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'],
+ maxThreads: 1
})
}