aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/worker
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/worker')
-rw-r--r--server/lib/worker/parent-process.ts37
-rw-r--r--server/lib/worker/workers/http-broadcast.ts32
2 files changed, 68 insertions, 1 deletions
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts
index 4bc7f2620..7d4102047 100644
--- a/server/lib/worker/parent-process.ts
+++ b/server/lib/worker/parent-process.ts
@@ -2,6 +2,7 @@ import { join } from 'path'
2import Piscina from 'piscina' 2import Piscina from 'piscina'
3import { processImage } from '@server/helpers/image-utils' 3import { processImage } from '@server/helpers/image-utils'
4import { WORKER_THREADS } from '@server/initializers/constants' 4import { WORKER_THREADS } from '@server/initializers/constants'
5import { httpBroadcast } from './workers/http-broadcast'
5import { downloadImage } from './workers/image-downloader' 6import { downloadImage } from './workers/image-downloader'
6 7
7let downloadImageWorker: Piscina 8let downloadImageWorker: Piscina
@@ -34,7 +35,41 @@ function processImageFromWorker (options: Parameters<typeof processImage>[0]): P
34 return processImageWorker.run(options) 35 return processImageWorker.run(options)
35} 36}
36 37
38// ---------------------------------------------------------------------------
39
40let parallelHTTPBroadcastWorker: Piscina
41
42function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
43 if (!parallelHTTPBroadcastWorker) {
44 parallelHTTPBroadcastWorker = new Piscina({
45 filename: join(__dirname, 'workers', 'http-broadcast.js'),
46 concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY,
47 maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS
48 })
49 }
50
51 return parallelHTTPBroadcastWorker.run(options)
52}
53
54// ---------------------------------------------------------------------------
55
56let sequentialHTTPBroadcastWorker: Piscina
57
58function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
59 if (!sequentialHTTPBroadcastWorker) {
60 sequentialHTTPBroadcastWorker = new Piscina({
61 filename: join(__dirname, 'workers', 'http-broadcast.js'),
62 concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY,
63 maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS
64 })
65 }
66
67 return sequentialHTTPBroadcastWorker.run(options)
68}
69
37export { 70export {
38 downloadImageFromWorker, 71 downloadImageFromWorker,
39 processImageFromWorker 72 processImageFromWorker,
73 parallelHTTPBroadcastFromWorker,
74 sequentialHTTPBroadcastFromWorker
40} 75}
diff --git a/server/lib/worker/workers/http-broadcast.ts b/server/lib/worker/workers/http-broadcast.ts
new file mode 100644
index 000000000..8c9c6b8ca
--- /dev/null
+++ b/server/lib/worker/workers/http-broadcast.ts
@@ -0,0 +1,32 @@
1import { map } from 'bluebird'
2import { logger } from '@server/helpers/logger'
3import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests'
4import { BROADCAST_CONCURRENCY } from '@server/initializers/constants'
5
6async function httpBroadcast (payload: {
7 uris: string[]
8 requestOptions: PeerTubeRequestOptions
9}) {
10 const { uris, requestOptions } = payload
11
12 const badUrls: string[] = []
13 const goodUrls: string[] = []
14
15 await map(uris, async uri => {
16 try {
17 await doRequest(uri, requestOptions)
18 goodUrls.push(uri)
19 } catch (err) {
20 logger.debug('HTTP broadcast to %s failed.', uri, { err })
21 badUrls.push(uri)
22 }
23 }, { concurrency: BROADCAST_CONCURRENCY })
24
25 return { goodUrls, badUrls }
26}
27
28module.exports = httpBroadcast
29
30export {
31 httpBroadcast
32}