]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Merge branch 'release/4.3.0' into develop
authorChocobozzz <me@florianbigard.com>
Wed, 16 Nov 2022 13:40:34 +0000 (14:40 +0100)
committerChocobozzz <me@florianbigard.com>
Wed, 16 Nov 2022 13:40:34 +0000 (14:40 +0100)
server/initializers/constants.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/worker/parent-process.ts

index 65940f4ae8c83a456112d0448c3b0a83f48db5fd..3908bbf057b07457012ecbfcc6100bdafb5ac494 100644 (file)
@@ -828,14 +828,6 @@ const WORKER_THREADS = {
   PROCESS_IMAGE: {
     CONCURRENCY: 1,
     MAX_THREADS: 5
-  },
-  SEQUENTIAL_HTTP_BROADCAST: {
-    CONCURRENCY: 1,
-    MAX_THREADS: 1
-  },
-  PARALLEL_HTTP_BROADCAST: {
-    CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
-    MAX_THREADS: 1
   }
 }
 
index 733c1378a7af3a3b33041c749047e0db924595b1..57ecf0acc41793b4eb87a526f652d6b6cb6ba53e 100644 (file)
@@ -1,7 +1,7 @@
 import { Job } from 'bullmq'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
-import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
+import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
 import { ActivitypubHttpBroadcastPayload } from '@shared/models'
 import { logger } from '../../../helpers/logger'
 
@@ -22,7 +22,7 @@ async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttp
 
   const requestOptions = await buildRequestOptions(job.data)
 
-  const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
+  const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
 
   return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
 }
index 7d410204786ab5fca790b6e198af13d370cfc978..48b6c682be75536468fccbd3864b79a7423ff8ed 100644 (file)
@@ -1,7 +1,7 @@
 import { join } from 'path'
 import Piscina from 'piscina'
 import { processImage } from '@server/helpers/image-utils'
-import { WORKER_THREADS } from '@server/initializers/constants'
+import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants'
 import { httpBroadcast } from './workers/http-broadcast'
 import { downloadImage } from './workers/image-downloader'
 
@@ -43,8 +43,9 @@ function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadca
   if (!parallelHTTPBroadcastWorker) {
     parallelHTTPBroadcastWorker = new Piscina({
       filename: join(__dirname, 'workers', 'http-broadcast.js'),
-      concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY,
-      maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS
+      // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
+      concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
+      maxThreads: 1
     })
   }
 
@@ -59,8 +60,9 @@ function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroad
   if (!sequentialHTTPBroadcastWorker) {
     sequentialHTTPBroadcastWorker = new Piscina({
       filename: join(__dirname, 'workers', 'http-broadcast.js'),
-      concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY,
-      maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS
+      // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
+      concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'],
+      maxThreads: 1
     })
   }