aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-09-08 12:26:46 +0200
committerChocobozzz <me@florianbigard.com>2022-09-08 12:27:22 +0200
commit405c83f9af377a663a4c8e9ad025fd5c10496922 (patch)
tree88062f7d9e23bdf879358963975891387e388e2d /server/lib/job-queue
parentd800ec5f36fa8d57f29c577bcf797a79081342d5 (diff)
downloadPeerTube-405c83f9af377a663a4c8e9ad025fd5c10496922.tar.gz
PeerTube-405c83f9af377a663a4c8e9ad025fd5c10496922.tar.zst
PeerTube-405c83f9af377a663a4c8e9ad025fd5c10496922.zip
Use worker thread to send HTTP requests
Compute HTTP signature could be CPU intensive
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts54
-rw-r--r--server/lib/job-queue/job-queue.ts6
2 files changed, 32 insertions, 28 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 13eff5211..733c1378a 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,39 +1,28 @@
1import { map } from 'bluebird'
2import { Job } from 'bullmq' 1import { Job } from 'bullmq'
3import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
4import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' 3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models' 5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6import { logger } from '../../../helpers/logger' 6import { logger } from '../../../helpers/logger'
7import { doRequest } from '../../../helpers/requests'
8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
9 7
10async function processActivityPubHttpBroadcast (job: Job) { 8// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
9
10async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
11 logger.info('Processing ActivityPub broadcast in job %s.', job.id) 11 logger.info('Processing ActivityPub broadcast in job %s.', job.id)
12 12
13 const payload = job.data as ActivitypubHttpBroadcastPayload 13 const requestOptions = await buildRequestOptions(job.data)
14 14
15 const body = await computeBody(payload) 15 const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
16 const httpSignatureOptions = await buildSignedRequestOptions(payload)
17 16
18 const options = { 17 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
19 method: 'POST' as 'POST', 18}
20 json: body, 19
21 httpSignature: httpSignatureOptions, 20async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
22 headers: buildGlobalHeaders(body) 21 logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)
23 }
24 22
25 const badUrls: string[] = [] 23 const requestOptions = await buildRequestOptions(job.data)
26 const goodUrls: string[] = []
27 24
28 await map(payload.uris, async uri => { 25 const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
29 try {
30 await doRequest(uri, options)
31 goodUrls.push(uri)
32 } catch (err) {
33 logger.debug('HTTP broadcast to %s failed.', uri, { err })
34 badUrls.push(uri)
35 }
36 }, { concurrency: BROADCAST_CONCURRENCY })
37 26
38 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) 27 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
39} 28}
@@ -41,5 +30,20 @@ async function processActivityPubHttpBroadcast (job: Job) {
41// --------------------------------------------------------------------------- 30// ---------------------------------------------------------------------------
42 31
43export { 32export {
44 processActivityPubHttpBroadcast 33 processActivityPubHttpSequentialBroadcast,
34 processActivityPubParallelHttpBroadcast
35}
36
37// ---------------------------------------------------------------------------
38
39async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
40 const body = await computeBody(payload)
41 const httpSignatureOptions = await buildSignedRequestOptions(payload)
42
43 return {
44 method: 'POST' as 'POST',
45 json: body,
46 httpSignature: httpSignatureOptions,
47 headers: buildGlobalHeaders(body)
48 }
45} 49}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0fcaaf466..e54d12acd 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -45,7 +45,7 @@ import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_
45import { Hooks } from '../plugins/hooks' 45import { Hooks } from '../plugins/hooks'
46import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
47import { processActivityPubFollow } from './handlers/activitypub-follow' 47import { processActivityPubFollow } from './handlers/activitypub-follow'
48import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
51import { refreshAPObject } from './handlers/activitypub-refresher' 51import { refreshAPObject } from './handlers/activitypub-refresher'
@@ -96,8 +96,8 @@ export type CreateJobOptions = {
96} 96}
97 97
98const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { 98const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
99 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 99 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
100 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, 100 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
101 'activitypub-http-unicast': processActivityPubHttpUnicast, 101 'activitypub-http-unicast': processActivityPubHttpUnicast,
102 'activitypub-http-fetcher': processActivityPubHttpFetcher, 102 'activitypub-http-fetcher': processActivityPubHttpFetcher,
103 'activitypub-cleaner': processActivityPubCleaner, 103 'activitypub-cleaner': processActivityPubCleaner,