aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-11-16 14:40:34 +0100
committerChocobozzz <me@florianbigard.com>2022-11-16 14:40:34 +0100
commitb1dbb9fefc870a90b25f5c0153589f45c9e75e3e (patch)
treeb48b9bb27bca807099e0a4b01f1e61dcfa4dfcca /server
parente57a840ede1f0eadec209e58401008f232249300 (diff)
parentf240fb4bea61484854cecbe711ac20d77c31915d (diff)
downloadPeerTube-b1dbb9fefc870a90b25f5c0153589f45c9e75e3e.tar.gz
PeerTube-b1dbb9fefc870a90b25f5c0153589f45c9e75e3e.tar.zst
PeerTube-b1dbb9fefc870a90b25f5c0153589f45c9e75e3e.zip
Merge branch 'release/4.3.0' into develop
Diffstat (limited to 'server')
-rw-r--r--server/initializers/constants.ts8
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts4
-rw-r--r--server/lib/worker/parent-process.ts12
3 files changed, 9 insertions, 15 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 65940f4ae..3908bbf05 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -828,14 +828,6 @@ const WORKER_THREADS = {
828 PROCESS_IMAGE: { 828 PROCESS_IMAGE: {
829 CONCURRENCY: 1, 829 CONCURRENCY: 1,
830 MAX_THREADS: 5 830 MAX_THREADS: 5
831 },
832 SEQUENTIAL_HTTP_BROADCAST: {
833 CONCURRENCY: 1,
834 MAX_THREADS: 1
835 },
836 PARALLEL_HTTP_BROADCAST: {
837 CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
838 MAX_THREADS: 1
839 } 831 }
840} 832}
841 833
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 733c1378a..57ecf0acc 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,7 +1,7 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' 3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' 4import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models' 5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6import { logger } from '../../../helpers/logger' 6import { logger } from '../../../helpers/logger'
7 7
@@ -22,7 +22,7 @@ async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttp
22 22
23 const requestOptions = await buildRequestOptions(job.data) 23 const requestOptions = await buildRequestOptions(job.data)
24 24
25 const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) 25 const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
26 26
27 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) 27 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
28} 28}
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts
index 7d4102047..48b6c682b 100644
--- a/server/lib/worker/parent-process.ts
+++ b/server/lib/worker/parent-process.ts
@@ -1,7 +1,7 @@
1import { join } from 'path' 1import { join } from 'path'
2import Piscina from 'piscina' 2import Piscina from 'piscina'
3import { processImage } from '@server/helpers/image-utils' 3import { processImage } from '@server/helpers/image-utils'
4import { WORKER_THREADS } from '@server/initializers/constants' 4import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants'
5import { httpBroadcast } from './workers/http-broadcast' 5import { httpBroadcast } from './workers/http-broadcast'
6import { downloadImage } from './workers/image-downloader' 6import { downloadImage } from './workers/image-downloader'
7 7
@@ -43,8 +43,9 @@ function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadca
43 if (!parallelHTTPBroadcastWorker) { 43 if (!parallelHTTPBroadcastWorker) {
44 parallelHTTPBroadcastWorker = new Piscina({ 44 parallelHTTPBroadcastWorker = new Piscina({
45 filename: join(__dirname, 'workers', 'http-broadcast.js'), 45 filename: join(__dirname, 'workers', 'http-broadcast.js'),
46 concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY, 46 // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
47 maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS 47 concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
48 maxThreads: 1
48 }) 49 })
49 } 50 }
50 51
@@ -59,8 +60,9 @@ function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroad
59 if (!sequentialHTTPBroadcastWorker) { 60 if (!sequentialHTTPBroadcastWorker) {
60 sequentialHTTPBroadcastWorker = new Piscina({ 61 sequentialHTTPBroadcastWorker = new Piscina({
61 filename: join(__dirname, 'workers', 'http-broadcast.js'), 62 filename: join(__dirname, 'workers', 'http-broadcast.js'),
62 concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY, 63 // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
63 maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS 64 concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'],
65 maxThreads: 1
64 }) 66 })
65 } 67 }
66 68