diff options
author | Chocobozzz <me@florianbigard.com> | 2022-09-08 12:26:46 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-09-08 12:27:22 +0200 |
commit | 405c83f9af377a663a4c8e9ad025fd5c10496922 (patch) | |
tree | 88062f7d9e23bdf879358963975891387e388e2d /server/lib/worker | |
parent | d800ec5f36fa8d57f29c577bcf797a79081342d5 (diff) | |
download | PeerTube-405c83f9af377a663a4c8e9ad025fd5c10496922.tar.gz PeerTube-405c83f9af377a663a4c8e9ad025fd5c10496922.tar.zst PeerTube-405c83f9af377a663a4c8e9ad025fd5c10496922.zip |
Use worker thread to send HTTP requests
Compute HTTP signature could be CPU intensive
Diffstat (limited to 'server/lib/worker')
-rw-r--r-- | server/lib/worker/parent-process.ts | 37 | ||||
-rw-r--r-- | server/lib/worker/workers/http-broadcast.ts | 32 |
2 files changed, 68 insertions, 1 deletions
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts index 4bc7f2620..7d4102047 100644 --- a/server/lib/worker/parent-process.ts +++ b/server/lib/worker/parent-process.ts | |||
@@ -2,6 +2,7 @@ import { join } from 'path' | |||
2 | import Piscina from 'piscina' | 2 | import Piscina from 'piscina' |
3 | import { processImage } from '@server/helpers/image-utils' | 3 | import { processImage } from '@server/helpers/image-utils' |
4 | import { WORKER_THREADS } from '@server/initializers/constants' | 4 | import { WORKER_THREADS } from '@server/initializers/constants' |
5 | import { httpBroadcast } from './workers/http-broadcast' | ||
5 | import { downloadImage } from './workers/image-downloader' | 6 | import { downloadImage } from './workers/image-downloader' |
6 | 7 | ||
7 | let downloadImageWorker: Piscina | 8 | let downloadImageWorker: Piscina |
@@ -34,7 +35,41 @@ function processImageFromWorker (options: Parameters<typeof processImage>[0]): P | |||
34 | return processImageWorker.run(options) | 35 | return processImageWorker.run(options) |
35 | } | 36 | } |
36 | 37 | ||
38 | // --------------------------------------------------------------------------- | ||
39 | |||
40 | let parallelHTTPBroadcastWorker: Piscina | ||
41 | |||
42 | function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> { | ||
43 | if (!parallelHTTPBroadcastWorker) { | ||
44 | parallelHTTPBroadcastWorker = new Piscina({ | ||
45 | filename: join(__dirname, 'workers', 'http-broadcast.js'), | ||
46 | concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY, | ||
47 | maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS | ||
48 | }) | ||
49 | } | ||
50 | |||
51 | return parallelHTTPBroadcastWorker.run(options) | ||
52 | } | ||
53 | |||
54 | // --------------------------------------------------------------------------- | ||
55 | |||
56 | let sequentialHTTPBroadcastWorker: Piscina | ||
57 | |||
58 | function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> { | ||
59 | if (!sequentialHTTPBroadcastWorker) { | ||
60 | sequentialHTTPBroadcastWorker = new Piscina({ | ||
61 | filename: join(__dirname, 'workers', 'http-broadcast.js'), | ||
62 | concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY, | ||
63 | maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS | ||
64 | }) | ||
65 | } | ||
66 | |||
67 | return sequentialHTTPBroadcastWorker.run(options) | ||
68 | } | ||
69 | |||
37 | export { | 70 | export { |
38 | downloadImageFromWorker, | 71 | downloadImageFromWorker, |
39 | processImageFromWorker | 72 | processImageFromWorker, |
73 | parallelHTTPBroadcastFromWorker, | ||
74 | sequentialHTTPBroadcastFromWorker | ||
40 | } | 75 | } |
diff --git a/server/lib/worker/workers/http-broadcast.ts b/server/lib/worker/workers/http-broadcast.ts new file mode 100644 index 000000000..8c9c6b8ca --- /dev/null +++ b/server/lib/worker/workers/http-broadcast.ts | |||
@@ -0,0 +1,32 @@ | |||
1 | import { map } from 'bluebird' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests' | ||
4 | import { BROADCAST_CONCURRENCY } from '@server/initializers/constants' | ||
5 | |||
6 | async function httpBroadcast (payload: { | ||
7 | uris: string[] | ||
8 | requestOptions: PeerTubeRequestOptions | ||
9 | }) { | ||
10 | const { uris, requestOptions } = payload | ||
11 | |||
12 | const badUrls: string[] = [] | ||
13 | const goodUrls: string[] = [] | ||
14 | |||
15 | await map(uris, async uri => { | ||
16 | try { | ||
17 | await doRequest(uri, requestOptions) | ||
18 | goodUrls.push(uri) | ||
19 | } catch (err) { | ||
20 | logger.debug('HTTP broadcast to %s failed.', uri, { err }) | ||
21 | badUrls.push(uri) | ||
22 | } | ||
23 | }, { concurrency: BROADCAST_CONCURRENCY }) | ||
24 | |||
25 | return { goodUrls, badUrls } | ||
26 | } | ||
27 | |||
28 | module.exports = httpBroadcast | ||
29 | |||
30 | export { | ||
31 | httpBroadcast | ||
32 | } | ||