diff options
author | Chocobozzz <me@florianbigard.com> | 2022-06-24 16:31:32 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-06-24 16:31:48 +0200 |
commit | c53853ca1b8e32aea5259d436d3d284b9d178919 (patch) | |
tree | dfb081e7fb471ca4b75fd67eabd3216cc6b33129 /server/lib | |
parent | 2873f00bd89d8f1b5f88614415f8142a5c2065c3 (diff) | |
download | PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.tar.gz PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.tar.zst PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.zip |
Introduce worker threads to process remote images
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/local-actor.ts | 35 | ||||
-rw-r--r-- | server/lib/thumbnail.ts | 11 | ||||
-rw-r--r-- | server/lib/worker/parent-process.ts | 18 | ||||
-rw-r--r-- | server/lib/worker/workers/image-downloader.ts | 33 |
4 files changed, 74 insertions, 23 deletions
diff --git a/server/lib/local-actor.ts b/server/lib/local-actor.ts index 01046d017..e3b04c094 100644 --- a/server/lib/local-actor.ts +++ b/server/lib/local-actor.ts | |||
@@ -1,4 +1,3 @@ | |||
1 | import { queue } from 'async' | ||
2 | import { remove } from 'fs-extra' | 1 | import { remove } from 'fs-extra' |
3 | import LRUCache from 'lru-cache' | 2 | import LRUCache from 'lru-cache' |
4 | import { join } from 'path' | 3 | import { join } from 'path' |
@@ -8,13 +7,13 @@ import { buildUUID } from '@shared/extra-utils' | |||
8 | import { ActivityPubActorType, ActorImageType } from '@shared/models' | 7 | import { ActivityPubActorType, ActorImageType } from '@shared/models' |
9 | import { retryTransactionWrapper } from '../helpers/database-utils' | 8 | import { retryTransactionWrapper } from '../helpers/database-utils' |
10 | import { processImage } from '../helpers/image-utils' | 9 | import { processImage } from '../helpers/image-utils' |
11 | import { downloadImage } from '../helpers/requests' | ||
12 | import { CONFIG } from '../initializers/config' | 10 | import { CONFIG } from '../initializers/config' |
13 | import { ACTOR_IMAGES_SIZE, LRU_CACHE, QUEUE_CONCURRENCY, WEBSERVER } from '../initializers/constants' | 11 | import { ACTOR_IMAGES_SIZE, LRU_CACHE, WEBSERVER } from '../initializers/constants' |
14 | import { sequelizeTypescript } from '../initializers/database' | 12 | import { sequelizeTypescript } from '../initializers/database' |
15 | import { MAccountDefault, MActor, MChannelDefault } from '../types/models' | 13 | import { MAccountDefault, MActor, MChannelDefault } from '../types/models' |
16 | import { deleteActorImages, updateActorImages } from './activitypub/actors' | 14 | import { deleteActorImages, updateActorImages } from './activitypub/actors' |
17 | import { sendUpdateActor } from './activitypub/send' | 15 | import { sendUpdateActor } from './activitypub/send' |
16 | import { downloadImageFromWorker } from './worker/parent-process' | ||
18 | 17 | ||
19 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) { | 18 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) { |
20 | return new ActorModel({ | 19 | return new ActorModel({ |
@@ -87,27 +86,22 @@ async function deleteLocalActorImageFile (accountOrChannel: MAccountDefault | MC | |||
87 | }) | 86 | }) |
88 | } | 87 | } |
89 | 88 | ||
90 | type DownloadImageQueueTask = { | 89 | // --------------------------------------------------------------------------- |
90 | |||
91 | function downloadActorImageFromWorker (options: { | ||
91 | fileUrl: string | 92 | fileUrl: string |
92 | filename: string | 93 | filename: string |
93 | type: ActorImageType | 94 | type: ActorImageType |
94 | size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0] | 95 | size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0] |
95 | } | 96 | }) { |
96 | 97 | const downloaderOptions = { | |
97 | const downloadImageQueue = queue<DownloadImageQueueTask, Error>((task, cb) => { | 98 | url: options.fileUrl, |
98 | downloadImage(task.fileUrl, CONFIG.STORAGE.ACTOR_IMAGES, task.filename, task.size) | 99 | destDir: CONFIG.STORAGE.ACTOR_IMAGES, |
99 | .then(() => cb()) | 100 | destName: options.filename, |
100 | .catch(err => cb(err)) | 101 | size: options.size |
101 | }, QUEUE_CONCURRENCY.ACTOR_PROCESS_IMAGE) | 102 | } |
102 | |||
103 | function pushActorImageProcessInQueue (task: DownloadImageQueueTask) { | ||
104 | return new Promise<void>((res, rej) => { | ||
105 | downloadImageQueue.push(task, err => { | ||
106 | if (err) return rej(err) | ||
107 | 103 | ||
108 | return res() | 104 | return downloadImageFromWorker(downloaderOptions) |
109 | }) | ||
110 | }) | ||
111 | } | 105 | } |
112 | 106 | ||
113 | // Unsafe so could returns paths that does not exist anymore | 107 | // Unsafe so could returns paths that does not exist anymore |
@@ -116,7 +110,8 @@ const actorImagePathUnsafeCache = new LRUCache<string, string>({ max: LRU_CACHE. | |||
116 | export { | 110 | export { |
117 | actorImagePathUnsafeCache, | 111 | actorImagePathUnsafeCache, |
118 | updateLocalActorImageFiles, | 112 | updateLocalActorImageFiles, |
113 | downloadActorImageFromWorker, | ||
119 | deleteLocalActorImageFile, | 114 | deleteLocalActorImageFile, |
120 | pushActorImageProcessInQueue, | 115 | downloadImageFromWorker, |
121 | buildActorInstance | 116 | buildActorInstance |
122 | } | 117 | } |
diff --git a/server/lib/thumbnail.ts b/server/lib/thumbnail.ts index aa2d7a813..f00c87623 100644 --- a/server/lib/thumbnail.ts +++ b/server/lib/thumbnail.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { join } from 'path' | 1 | import { join } from 'path' |
2 | import { ThumbnailType } from '@shared/models' | 2 | import { ThumbnailType } from '@shared/models' |
3 | import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils' | 3 | import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils' |
4 | import { downloadImage } from '../helpers/requests' | ||
5 | import { CONFIG } from '../initializers/config' | 4 | import { CONFIG } from '../initializers/config' |
6 | import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants' | 5 | import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants' |
7 | import { ThumbnailModel } from '../models/video/thumbnail' | 6 | import { ThumbnailModel } from '../models/video/thumbnail' |
8 | import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models' | 7 | import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models' |
9 | import { MThumbnail } from '../types/models/video/thumbnail' | 8 | import { MThumbnail } from '../types/models/video/thumbnail' |
10 | import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist' | 9 | import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist' |
10 | import { downloadImageFromWorker } from './local-actor' | ||
11 | import { VideoPathManager } from './video-path-manager' | 11 | import { VideoPathManager } from './video-path-manager' |
12 | 12 | ||
13 | type ImageSize = { height?: number, width?: number } | 13 | type ImageSize = { height?: number, width?: number } |
@@ -49,7 +49,10 @@ function updatePlaylistMiniatureFromUrl (options: { | |||
49 | ? null | 49 | ? null |
50 | : downloadUrl | 50 | : downloadUrl |
51 | 51 | ||
52 | const thumbnailCreator = () => downloadImage(downloadUrl, basePath, filename, { width, height }) | 52 | const thumbnailCreator = () => { |
53 | return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } }) | ||
54 | } | ||
55 | |||
53 | return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl }) | 56 | return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl }) |
54 | } | 57 | } |
55 | 58 | ||
@@ -75,7 +78,9 @@ function updateVideoMiniatureFromUrl (options: { | |||
75 | : existingThumbnail.filename | 78 | : existingThumbnail.filename |
76 | 79 | ||
77 | const thumbnailCreator = () => { | 80 | const thumbnailCreator = () => { |
78 | if (thumbnailUrlChanged) return downloadImage(downloadUrl, basePath, filename, { width, height }) | 81 | if (thumbnailUrlChanged) { |
82 | return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } }) | ||
83 | } | ||
79 | 84 | ||
80 | return Promise.resolve() | 85 | return Promise.resolve() |
81 | } | 86 | } |
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts new file mode 100644 index 000000000..18dabd97f --- /dev/null +++ b/server/lib/worker/parent-process.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import { join } from 'path' | ||
2 | import Piscina from 'piscina' | ||
3 | import { WORKER_THREADS } from '@server/initializers/constants' | ||
4 | import { downloadImage } from './workers/image-downloader' | ||
5 | |||
6 | const downloadImagerWorker = new Piscina({ | ||
7 | filename: join(__dirname, 'workers', 'image-downloader.js'), | ||
8 | concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY, | ||
9 | maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS | ||
10 | }) | ||
11 | |||
12 | function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> { | ||
13 | return downloadImagerWorker.run(options) | ||
14 | } | ||
15 | |||
16 | export { | ||
17 | downloadImageFromWorker | ||
18 | } | ||
diff --git a/server/lib/worker/workers/image-downloader.ts b/server/lib/worker/workers/image-downloader.ts new file mode 100644 index 000000000..8d4a6b37e --- /dev/null +++ b/server/lib/worker/workers/image-downloader.ts | |||
@@ -0,0 +1,33 @@ | |||
1 | import { remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { processImage } from '@server/helpers/image-utils' | ||
4 | import { doRequestAndSaveToFile } from '@server/helpers/requests' | ||
5 | import { CONFIG } from '@server/initializers/config' | ||
6 | |||
7 | async function downloadImage (options: { | ||
8 | url: string | ||
9 | destDir: string | ||
10 | destName: string | ||
11 | size: { width: number, height: number } | ||
12 | }) { | ||
13 | const { url, destDir, destName, size } = options | ||
14 | |||
15 | const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName) | ||
16 | await doRequestAndSaveToFile(url, tmpPath) | ||
17 | |||
18 | const destPath = join(destDir, destName) | ||
19 | |||
20 | try { | ||
21 | await processImage(tmpPath, destPath, size) | ||
22 | } catch (err) { | ||
23 | await remove(tmpPath) | ||
24 | |||
25 | throw err | ||
26 | } | ||
27 | } | ||
28 | |||
29 | module.exports = downloadImage | ||
30 | |||
31 | export { | ||
32 | downloadImage | ||
33 | } | ||