aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts54
-rw-r--r--server/lib/job-queue/job-queue.ts6
-rw-r--r--server/lib/worker/parent-process.ts37
-rw-r--r--server/lib/worker/workers/http-broadcast.ts32
4 files changed, 100 insertions, 29 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 13eff5211..733c1378a 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,39 +1,28 @@
1import { map } from 'bluebird'
2import { Job } from 'bullmq' 1import { Job } from 'bullmq'
3import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
4import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' 3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models' 5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6import { logger } from '../../../helpers/logger' 6import { logger } from '../../../helpers/logger'
7import { doRequest } from '../../../helpers/requests'
8import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
9 7
10async function processActivityPubHttpBroadcast (job: Job) { 8// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
9
10async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
11 logger.info('Processing ActivityPub broadcast in job %s.', job.id) 11 logger.info('Processing ActivityPub broadcast in job %s.', job.id)
12 12
13 const payload = job.data as ActivitypubHttpBroadcastPayload 13 const requestOptions = await buildRequestOptions(job.data)
14 14
15 const body = await computeBody(payload) 15 const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
16 const httpSignatureOptions = await buildSignedRequestOptions(payload)
17 16
18 const options = { 17 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
19 method: 'POST' as 'POST', 18}
20 json: body, 19
21 httpSignature: httpSignatureOptions, 20async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
22 headers: buildGlobalHeaders(body) 21 logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)
23 }
24 22
25 const badUrls: string[] = [] 23 const requestOptions = await buildRequestOptions(job.data)
26 const goodUrls: string[] = []
27 24
28 await map(payload.uris, async uri => { 25 const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
29 try {
30 await doRequest(uri, options)
31 goodUrls.push(uri)
32 } catch (err) {
33 logger.debug('HTTP broadcast to %s failed.', uri, { err })
34 badUrls.push(uri)
35 }
36 }, { concurrency: BROADCAST_CONCURRENCY })
37 26
38 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) 27 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
39} 28}
@@ -41,5 +30,20 @@ async function processActivityPubHttpBroadcast (job: Job) {
41// --------------------------------------------------------------------------- 30// ---------------------------------------------------------------------------
42 31
43export { 32export {
44 processActivityPubHttpBroadcast 33 processActivityPubHttpSequentialBroadcast,
34 processActivityPubParallelHttpBroadcast
35}
36
37// ---------------------------------------------------------------------------
38
39async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
40 const body = await computeBody(payload)
41 const httpSignatureOptions = await buildSignedRequestOptions(payload)
42
43 return {
44 method: 'POST' as 'POST',
45 json: body,
46 httpSignature: httpSignatureOptions,
47 headers: buildGlobalHeaders(body)
48 }
45} 49}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0fcaaf466..e54d12acd 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -45,7 +45,7 @@ import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_
45import { Hooks } from '../plugins/hooks' 45import { Hooks } from '../plugins/hooks'
46import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
47import { processActivityPubFollow } from './handlers/activitypub-follow' 47import { processActivityPubFollow } from './handlers/activitypub-follow'
48import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
51import { refreshAPObject } from './handlers/activitypub-refresher' 51import { refreshAPObject } from './handlers/activitypub-refresher'
@@ -96,8 +96,8 @@ export type CreateJobOptions = {
96} 96}
97 97
98const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { 98const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
99 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 99 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
100 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, 100 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
101 'activitypub-http-unicast': processActivityPubHttpUnicast, 101 'activitypub-http-unicast': processActivityPubHttpUnicast,
102 'activitypub-http-fetcher': processActivityPubHttpFetcher, 102 'activitypub-http-fetcher': processActivityPubHttpFetcher,
103 'activitypub-cleaner': processActivityPubCleaner, 103 'activitypub-cleaner': processActivityPubCleaner,
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts
index 4bc7f2620..7d4102047 100644
--- a/server/lib/worker/parent-process.ts
+++ b/server/lib/worker/parent-process.ts
@@ -2,6 +2,7 @@ import { join } from 'path'
2import Piscina from 'piscina' 2import Piscina from 'piscina'
3import { processImage } from '@server/helpers/image-utils' 3import { processImage } from '@server/helpers/image-utils'
4import { WORKER_THREADS } from '@server/initializers/constants' 4import { WORKER_THREADS } from '@server/initializers/constants'
5import { httpBroadcast } from './workers/http-broadcast'
5import { downloadImage } from './workers/image-downloader' 6import { downloadImage } from './workers/image-downloader'
6 7
7let downloadImageWorker: Piscina 8let downloadImageWorker: Piscina
@@ -34,7 +35,41 @@ function processImageFromWorker (options: Parameters<typeof processImage>[0]): P
34 return processImageWorker.run(options) 35 return processImageWorker.run(options)
35} 36}
36 37
38// ---------------------------------------------------------------------------
39
40let parallelHTTPBroadcastWorker: Piscina
41
42function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
43 if (!parallelHTTPBroadcastWorker) {
44 parallelHTTPBroadcastWorker = new Piscina({
45 filename: join(__dirname, 'workers', 'http-broadcast.js'),
46 concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY,
47 maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS
48 })
49 }
50
51 return parallelHTTPBroadcastWorker.run(options)
52}
53
54// ---------------------------------------------------------------------------
55
56let sequentialHTTPBroadcastWorker: Piscina
57
58function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
59 if (!sequentialHTTPBroadcastWorker) {
60 sequentialHTTPBroadcastWorker = new Piscina({
61 filename: join(__dirname, 'workers', 'http-broadcast.js'),
62 concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY,
63 maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS
64 })
65 }
66
67 return sequentialHTTPBroadcastWorker.run(options)
68}
69
37export { 70export {
38 downloadImageFromWorker, 71 downloadImageFromWorker,
39 processImageFromWorker 72 processImageFromWorker,
73 parallelHTTPBroadcastFromWorker,
74 sequentialHTTPBroadcastFromWorker
40} 75}
diff --git a/server/lib/worker/workers/http-broadcast.ts b/server/lib/worker/workers/http-broadcast.ts
new file mode 100644
index 000000000..8c9c6b8ca
--- /dev/null
+++ b/server/lib/worker/workers/http-broadcast.ts
@@ -0,0 +1,32 @@
1import { map } from 'bluebird'
2import { logger } from '@server/helpers/logger'
3import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests'
4import { BROADCAST_CONCURRENCY } from '@server/initializers/constants'
5
6async function httpBroadcast (payload: {
7 uris: string[]
8 requestOptions: PeerTubeRequestOptions
9}) {
10 const { uris, requestOptions } = payload
11
12 const badUrls: string[] = []
13 const goodUrls: string[] = []
14
15 await map(uris, async uri => {
16 try {
17 await doRequest(uri, requestOptions)
18 goodUrls.push(uri)
19 } catch (err) {
20 logger.debug('HTTP broadcast to %s failed.', uri, { err })
21 badUrls.push(uri)
22 }
23 }, { concurrency: BROADCAST_CONCURRENCY })
24
25 return { goodUrls, badUrls }
26}
27
28module.exports = httpBroadcast
29
30export {
31 httpBroadcast
32}