diff options
author | Chocobozzz <me@florianbigard.com> | 2022-06-24 16:31:32 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-06-24 16:31:48 +0200 |
commit | c53853ca1b8e32aea5259d436d3d284b9d178919 (patch) | |
tree | dfb081e7fb471ca4b75fd67eabd3216cc6b33129 /server | |
parent | 2873f00bd89d8f1b5f88614415f8142a5c2065c3 (diff) | |
download | PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.tar.gz PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.tar.zst PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.zip |
Introduce worker threads to process remote images
Diffstat (limited to 'server')
-rw-r--r-- | server/controllers/lazy-static.ts | 4 | ||||
-rw-r--r-- | server/helpers/requests.ts | 19 | ||||
-rw-r--r-- | server/initializers/constants.ts | 9 | ||||
-rw-r--r-- | server/lib/local-actor.ts | 35 | ||||
-rw-r--r-- | server/lib/thumbnail.ts | 11 | ||||
-rw-r--r-- | server/lib/worker/parent-process.ts | 18 | ||||
-rw-r--r-- | server/lib/worker/workers/image-downloader.ts | 33 |
7 files changed, 82 insertions, 47 deletions
diff --git a/server/controllers/lazy-static.ts b/server/controllers/lazy-static.ts index 8a180b5bc..0cab5dcd0 100644 --- a/server/controllers/lazy-static.ts +++ b/server/controllers/lazy-static.ts | |||
@@ -6,7 +6,7 @@ import { HttpStatusCode } from '../../shared/models/http/http-error-codes' | |||
6 | import { logger } from '../helpers/logger' | 6 | import { logger } from '../helpers/logger' |
7 | import { ACTOR_IMAGES_SIZE, LAZY_STATIC_PATHS, STATIC_MAX_AGE } from '../initializers/constants' | 7 | import { ACTOR_IMAGES_SIZE, LAZY_STATIC_PATHS, STATIC_MAX_AGE } from '../initializers/constants' |
8 | import { VideosCaptionCache, VideosPreviewCache } from '../lib/files-cache' | 8 | import { VideosCaptionCache, VideosPreviewCache } from '../lib/files-cache' |
9 | import { actorImagePathUnsafeCache, pushActorImageProcessInQueue } from '../lib/local-actor' | 9 | import { actorImagePathUnsafeCache, downloadActorImageFromWorker } from '../lib/local-actor' |
10 | import { asyncMiddleware } from '../middlewares' | 10 | import { asyncMiddleware } from '../middlewares' |
11 | import { ActorImageModel } from '../models/actor/actor-image' | 11 | import { ActorImageModel } from '../models/actor/actor-image' |
12 | 12 | ||
@@ -65,7 +65,7 @@ async function getActorImage (req: express.Request, res: express.Response, next: | |||
65 | logger.info('Lazy serve remote actor image %s.', image.fileUrl) | 65 | logger.info('Lazy serve remote actor image %s.', image.fileUrl) |
66 | 66 | ||
67 | try { | 67 | try { |
68 | await pushActorImageProcessInQueue({ | 68 | await downloadActorImageFromWorker({ |
69 | filename: image.filename, | 69 | filename: image.filename, |
70 | fileUrl: image.fileUrl, | 70 | fileUrl: image.fileUrl, |
71 | size: getActorImageSize(image), | 71 | size: getActorImageSize(image), |
diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts index a9869e987..495e83558 100644 --- a/server/helpers/requests.ts +++ b/server/helpers/requests.ts | |||
@@ -1,11 +1,8 @@ | |||
1 | import { createWriteStream, remove } from 'fs-extra' | 1 | import { createWriteStream, remove } from 'fs-extra' |
2 | import got, { CancelableRequest, NormalizedOptions, Options as GotOptions, RequestError, Response } from 'got' | 2 | import got, { CancelableRequest, NormalizedOptions, Options as GotOptions, RequestError, Response } from 'got' |
3 | import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent' | 3 | import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent' |
4 | import { join } from 'path' | ||
5 | import { CONFIG } from '../initializers/config' | ||
6 | import { ACTIVITY_PUB, BINARY_CONTENT_TYPES, PEERTUBE_VERSION, REQUEST_TIMEOUTS, WEBSERVER } from '../initializers/constants' | 4 | import { ACTIVITY_PUB, BINARY_CONTENT_TYPES, PEERTUBE_VERSION, REQUEST_TIMEOUTS, WEBSERVER } from '../initializers/constants' |
7 | import { pipelinePromise } from './core-utils' | 5 | import { pipelinePromise } from './core-utils' |
8 | import { processImage } from './image-utils' | ||
9 | import { logger, loggerTagsFactory } from './logger' | 6 | import { logger, loggerTagsFactory } from './logger' |
10 | import { getProxy, isProxyEnabled } from './proxy' | 7 | import { getProxy, isProxyEnabled } from './proxy' |
11 | 8 | ||
@@ -147,21 +144,6 @@ async function doRequestAndSaveToFile ( | |||
147 | } | 144 | } |
148 | } | 145 | } |
149 | 146 | ||
150 | async function downloadImage (url: string, destDir: string, destName: string, size: { width: number, height: number }) { | ||
151 | const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName) | ||
152 | await doRequestAndSaveToFile(url, tmpPath) | ||
153 | |||
154 | const destPath = join(destDir, destName) | ||
155 | |||
156 | try { | ||
157 | await processImage(tmpPath, destPath, size) | ||
158 | } catch (err) { | ||
159 | await remove(tmpPath) | ||
160 | |||
161 | throw err | ||
162 | } | ||
163 | } | ||
164 | |||
165 | function getAgent () { | 147 | function getAgent () { |
166 | if (!isProxyEnabled()) return {} | 148 | if (!isProxyEnabled()) return {} |
167 | 149 | ||
@@ -211,7 +193,6 @@ export { | |||
211 | doJSONRequest, | 193 | doJSONRequest, |
212 | doRequestAndSaveToFile, | 194 | doRequestAndSaveToFile, |
213 | isBinaryResponse, | 195 | isBinaryResponse, |
214 | downloadImage, | ||
215 | getAgent, | 196 | getAgent, |
216 | findLatestRedirection, | 197 | findLatestRedirection, |
217 | peertubeGot | 198 | peertubeGot |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index d469ce425..175935835 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -744,8 +744,11 @@ const MEMOIZE_LENGTH = { | |||
744 | VIDEO_DURATION: 200 | 744 | VIDEO_DURATION: 200 |
745 | } | 745 | } |
746 | 746 | ||
747 | const QUEUE_CONCURRENCY = { | 747 | const WORKER_THREADS = { |
748 | ACTOR_PROCESS_IMAGE: 3 | 748 | DOWNLOAD_IMAGE: { |
749 | CONCURRENCY: 3, | ||
750 | MAX_THREADS: 1 | ||
751 | } | ||
749 | } | 752 | } |
750 | 753 | ||
751 | const REDUNDANCY = { | 754 | const REDUNDANCY = { |
@@ -955,7 +958,7 @@ export { | |||
955 | VIDEO_PRIVACIES, | 958 | VIDEO_PRIVACIES, |
956 | VIDEO_LICENCES, | 959 | VIDEO_LICENCES, |
957 | VIDEO_STATES, | 960 | VIDEO_STATES, |
958 | QUEUE_CONCURRENCY, | 961 | WORKER_THREADS, |
959 | VIDEO_RATE_TYPES, | 962 | VIDEO_RATE_TYPES, |
960 | JOB_PRIORITY, | 963 | JOB_PRIORITY, |
961 | VIDEO_TRANSCODING_FPS, | 964 | VIDEO_TRANSCODING_FPS, |
diff --git a/server/lib/local-actor.ts b/server/lib/local-actor.ts index 01046d017..e3b04c094 100644 --- a/server/lib/local-actor.ts +++ b/server/lib/local-actor.ts | |||
@@ -1,4 +1,3 @@ | |||
1 | import { queue } from 'async' | ||
2 | import { remove } from 'fs-extra' | 1 | import { remove } from 'fs-extra' |
3 | import LRUCache from 'lru-cache' | 2 | import LRUCache from 'lru-cache' |
4 | import { join } from 'path' | 3 | import { join } from 'path' |
@@ -8,13 +7,13 @@ import { buildUUID } from '@shared/extra-utils' | |||
8 | import { ActivityPubActorType, ActorImageType } from '@shared/models' | 7 | import { ActivityPubActorType, ActorImageType } from '@shared/models' |
9 | import { retryTransactionWrapper } from '../helpers/database-utils' | 8 | import { retryTransactionWrapper } from '../helpers/database-utils' |
10 | import { processImage } from '../helpers/image-utils' | 9 | import { processImage } from '../helpers/image-utils' |
11 | import { downloadImage } from '../helpers/requests' | ||
12 | import { CONFIG } from '../initializers/config' | 10 | import { CONFIG } from '../initializers/config' |
13 | import { ACTOR_IMAGES_SIZE, LRU_CACHE, QUEUE_CONCURRENCY, WEBSERVER } from '../initializers/constants' | 11 | import { ACTOR_IMAGES_SIZE, LRU_CACHE, WEBSERVER } from '../initializers/constants' |
14 | import { sequelizeTypescript } from '../initializers/database' | 12 | import { sequelizeTypescript } from '../initializers/database' |
15 | import { MAccountDefault, MActor, MChannelDefault } from '../types/models' | 13 | import { MAccountDefault, MActor, MChannelDefault } from '../types/models' |
16 | import { deleteActorImages, updateActorImages } from './activitypub/actors' | 14 | import { deleteActorImages, updateActorImages } from './activitypub/actors' |
17 | import { sendUpdateActor } from './activitypub/send' | 15 | import { sendUpdateActor } from './activitypub/send' |
16 | import { downloadImageFromWorker } from './worker/parent-process' | ||
18 | 17 | ||
19 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) { | 18 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) { |
20 | return new ActorModel({ | 19 | return new ActorModel({ |
@@ -87,27 +86,22 @@ async function deleteLocalActorImageFile (accountOrChannel: MAccountDefault | MC | |||
87 | }) | 86 | }) |
88 | } | 87 | } |
89 | 88 | ||
90 | type DownloadImageQueueTask = { | 89 | // --------------------------------------------------------------------------- |
90 | |||
91 | function downloadActorImageFromWorker (options: { | ||
91 | fileUrl: string | 92 | fileUrl: string |
92 | filename: string | 93 | filename: string |
93 | type: ActorImageType | 94 | type: ActorImageType |
94 | size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0] | 95 | size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0] |
95 | } | 96 | }) { |
96 | 97 | const downloaderOptions = { | |
97 | const downloadImageQueue = queue<DownloadImageQueueTask, Error>((task, cb) => { | 98 | url: options.fileUrl, |
98 | downloadImage(task.fileUrl, CONFIG.STORAGE.ACTOR_IMAGES, task.filename, task.size) | 99 | destDir: CONFIG.STORAGE.ACTOR_IMAGES, |
99 | .then(() => cb()) | 100 | destName: options.filename, |
100 | .catch(err => cb(err)) | 101 | size: options.size |
101 | }, QUEUE_CONCURRENCY.ACTOR_PROCESS_IMAGE) | 102 | } |
102 | |||
103 | function pushActorImageProcessInQueue (task: DownloadImageQueueTask) { | ||
104 | return new Promise<void>((res, rej) => { | ||
105 | downloadImageQueue.push(task, err => { | ||
106 | if (err) return rej(err) | ||
107 | 103 | ||
108 | return res() | 104 | return downloadImageFromWorker(downloaderOptions) |
109 | }) | ||
110 | }) | ||
111 | } | 105 | } |
112 | 106 | ||
113 | // Unsafe so could returns paths that does not exist anymore | 107 | // Unsafe so could returns paths that does not exist anymore |
@@ -116,7 +110,8 @@ const actorImagePathUnsafeCache = new LRUCache<string, string>({ max: LRU_CACHE. | |||
116 | export { | 110 | export { |
117 | actorImagePathUnsafeCache, | 111 | actorImagePathUnsafeCache, |
118 | updateLocalActorImageFiles, | 112 | updateLocalActorImageFiles, |
113 | downloadActorImageFromWorker, | ||
119 | deleteLocalActorImageFile, | 114 | deleteLocalActorImageFile, |
120 | pushActorImageProcessInQueue, | 115 | downloadImageFromWorker, |
121 | buildActorInstance | 116 | buildActorInstance |
122 | } | 117 | } |
diff --git a/server/lib/thumbnail.ts b/server/lib/thumbnail.ts index aa2d7a813..f00c87623 100644 --- a/server/lib/thumbnail.ts +++ b/server/lib/thumbnail.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { join } from 'path' | 1 | import { join } from 'path' |
2 | import { ThumbnailType } from '@shared/models' | 2 | import { ThumbnailType } from '@shared/models' |
3 | import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils' | 3 | import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils' |
4 | import { downloadImage } from '../helpers/requests' | ||
5 | import { CONFIG } from '../initializers/config' | 4 | import { CONFIG } from '../initializers/config' |
6 | import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants' | 5 | import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants' |
7 | import { ThumbnailModel } from '../models/video/thumbnail' | 6 | import { ThumbnailModel } from '../models/video/thumbnail' |
8 | import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models' | 7 | import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models' |
9 | import { MThumbnail } from '../types/models/video/thumbnail' | 8 | import { MThumbnail } from '../types/models/video/thumbnail' |
10 | import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist' | 9 | import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist' |
10 | import { downloadImageFromWorker } from './local-actor' | ||
11 | import { VideoPathManager } from './video-path-manager' | 11 | import { VideoPathManager } from './video-path-manager' |
12 | 12 | ||
13 | type ImageSize = { height?: number, width?: number } | 13 | type ImageSize = { height?: number, width?: number } |
@@ -49,7 +49,10 @@ function updatePlaylistMiniatureFromUrl (options: { | |||
49 | ? null | 49 | ? null |
50 | : downloadUrl | 50 | : downloadUrl |
51 | 51 | ||
52 | const thumbnailCreator = () => downloadImage(downloadUrl, basePath, filename, { width, height }) | 52 | const thumbnailCreator = () => { |
53 | return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } }) | ||
54 | } | ||
55 | |||
53 | return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl }) | 56 | return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl }) |
54 | } | 57 | } |
55 | 58 | ||
@@ -75,7 +78,9 @@ function updateVideoMiniatureFromUrl (options: { | |||
75 | : existingThumbnail.filename | 78 | : existingThumbnail.filename |
76 | 79 | ||
77 | const thumbnailCreator = () => { | 80 | const thumbnailCreator = () => { |
78 | if (thumbnailUrlChanged) return downloadImage(downloadUrl, basePath, filename, { width, height }) | 81 | if (thumbnailUrlChanged) { |
82 | return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } }) | ||
83 | } | ||
79 | 84 | ||
80 | return Promise.resolve() | 85 | return Promise.resolve() |
81 | } | 86 | } |
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts new file mode 100644 index 000000000..18dabd97f --- /dev/null +++ b/server/lib/worker/parent-process.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import { join } from 'path' | ||
2 | import Piscina from 'piscina' | ||
3 | import { WORKER_THREADS } from '@server/initializers/constants' | ||
4 | import { downloadImage } from './workers/image-downloader' | ||
5 | |||
6 | const downloadImagerWorker = new Piscina({ | ||
7 | filename: join(__dirname, 'workers', 'image-downloader.js'), | ||
8 | concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY, | ||
9 | maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS | ||
10 | }) | ||
11 | |||
12 | function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> { | ||
13 | return downloadImagerWorker.run(options) | ||
14 | } | ||
15 | |||
16 | export { | ||
17 | downloadImageFromWorker | ||
18 | } | ||
diff --git a/server/lib/worker/workers/image-downloader.ts b/server/lib/worker/workers/image-downloader.ts new file mode 100644 index 000000000..8d4a6b37e --- /dev/null +++ b/server/lib/worker/workers/image-downloader.ts | |||
@@ -0,0 +1,33 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { processImage } from '@server/helpers/image-utils' | ||
4 | import { doRequestAndSaveToFile } from '@server/helpers/requests' | ||
5 | import { CONFIG } from '@server/initializers/config' | ||
6 | |||
7 | async function downloadImage (options: { | ||
8 | url: string | ||
9 | destDir: string | ||
10 | destName: string | ||
11 | size: { width: number, height: number } | ||
12 | }) { | ||
13 | const { url, destDir, destName, size } = options | ||
14 | |||
15 | const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName) | ||
16 | await doRequestAndSaveToFile(url, tmpPath) | ||
17 | |||
18 | const destPath = join(destDir, destName) | ||
19 | |||
20 | try { | ||
21 | await processImage(tmpPath, destPath, size) | ||
22 | } catch (err) { | ||
23 | await remove(tmpPath) | ||
24 | |||
25 | throw err | ||
26 | } | ||
27 | } | ||
28 | |||
29 | module.exports = downloadImage | ||
30 | |||
31 | export { | ||
32 | downloadImage | ||
33 | } | ||