diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/initializers/constants.ts | 8 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 4 | ||||
-rw-r--r-- | server/lib/worker/parent-process.ts | 12 |
3 files changed, 9 insertions, 15 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 7039ab457..eb54781b6 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -786,14 +786,6 @@ const WORKER_THREADS = { | |||
786 | PROCESS_IMAGE: { | 786 | PROCESS_IMAGE: { |
787 | CONCURRENCY: 1, | 787 | CONCURRENCY: 1, |
788 | MAX_THREADS: 5 | 788 | MAX_THREADS: 5 |
789 | }, | ||
790 | SEQUENTIAL_HTTP_BROADCAST: { | ||
791 | CONCURRENCY: 1, | ||
792 | MAX_THREADS: 1 | ||
793 | }, | ||
794 | PARALLEL_HTTP_BROADCAST: { | ||
795 | CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'], | ||
796 | MAX_THREADS: 1 | ||
797 | } | 789 | } |
798 | } | 790 | } |
799 | 791 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 733c1378a..57ecf0acc 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' | 2 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' |
3 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | 3 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' |
4 | import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' | 4 | import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' |
5 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | 5 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' |
6 | import { logger } from '../../../helpers/logger' | 6 | import { logger } from '../../../helpers/logger' |
7 | 7 | ||
@@ -22,7 +22,7 @@ async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttp | |||
22 | 22 | ||
23 | const requestOptions = await buildRequestOptions(job.data) | 23 | const requestOptions = await buildRequestOptions(job.data) |
24 | 24 | ||
25 | const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) | 25 | const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) |
26 | 26 | ||
27 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) | 27 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) |
28 | } | 28 | } |
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts index 7d4102047..48b6c682b 100644 --- a/server/lib/worker/parent-process.ts +++ b/server/lib/worker/parent-process.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { join } from 'path' | 1 | import { join } from 'path' |
2 | import Piscina from 'piscina' | 2 | import Piscina from 'piscina' |
3 | import { processImage } from '@server/helpers/image-utils' | 3 | import { processImage } from '@server/helpers/image-utils' |
4 | import { WORKER_THREADS } from '@server/initializers/constants' | 4 | import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants' |
5 | import { httpBroadcast } from './workers/http-broadcast' | 5 | import { httpBroadcast } from './workers/http-broadcast' |
6 | import { downloadImage } from './workers/image-downloader' | 6 | import { downloadImage } from './workers/image-downloader' |
7 | 7 | ||
@@ -43,8 +43,9 @@ function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadca | |||
43 | if (!parallelHTTPBroadcastWorker) { | 43 | if (!parallelHTTPBroadcastWorker) { |
44 | parallelHTTPBroadcastWorker = new Piscina({ | 44 | parallelHTTPBroadcastWorker = new Piscina({ |
45 | filename: join(__dirname, 'workers', 'http-broadcast.js'), | 45 | filename: join(__dirname, 'workers', 'http-broadcast.js'), |
46 | concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY, | 46 | // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs |
47 | maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS | 47 | concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'], |
48 | maxThreads: 1 | ||
48 | }) | 49 | }) |
49 | } | 50 | } |
50 | 51 | ||
@@ -59,8 +60,9 @@ function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroad | |||
59 | if (!sequentialHTTPBroadcastWorker) { | 60 | if (!sequentialHTTPBroadcastWorker) { |
60 | sequentialHTTPBroadcastWorker = new Piscina({ | 61 | sequentialHTTPBroadcastWorker = new Piscina({ |
61 | filename: join(__dirname, 'workers', 'http-broadcast.js'), | 62 | filename: join(__dirname, 'workers', 'http-broadcast.js'), |
62 | concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY, | 63 | // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs |
63 | maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS | 64 | concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'], |
65 | maxThreads: 1 | ||
64 | }) | 66 | }) |
65 | } | 67 | } |
66 | 68 | ||