From c53853ca1b8e32aea5259d436d3d284b9d178919 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 24 Jun 2022 16:31:32 +0200 Subject: Introduce worker threads to process remote images --- server/lib/local-actor.ts | 35 ++++++++++++--------------- server/lib/thumbnail.ts | 11 ++++++--- server/lib/worker/parent-process.ts | 18 ++++++++++++++ server/lib/worker/workers/image-downloader.ts | 33 +++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 23 deletions(-) create mode 100644 server/lib/worker/parent-process.ts create mode 100644 server/lib/worker/workers/image-downloader.ts (limited to 'server/lib') diff --git a/server/lib/local-actor.ts b/server/lib/local-actor.ts index 01046d017..e3b04c094 100644 --- a/server/lib/local-actor.ts +++ b/server/lib/local-actor.ts @@ -1,4 +1,3 @@ -import { queue } from 'async' import { remove } from 'fs-extra' import LRUCache from 'lru-cache' import { join } from 'path' @@ -8,13 +7,13 @@ import { buildUUID } from '@shared/extra-utils' import { ActivityPubActorType, ActorImageType } from '@shared/models' import { retryTransactionWrapper } from '../helpers/database-utils' import { processImage } from '../helpers/image-utils' -import { downloadImage } from '../helpers/requests' import { CONFIG } from '../initializers/config' -import { ACTOR_IMAGES_SIZE, LRU_CACHE, QUEUE_CONCURRENCY, WEBSERVER } from '../initializers/constants' +import { ACTOR_IMAGES_SIZE, LRU_CACHE, WEBSERVER } from '../initializers/constants' import { sequelizeTypescript } from '../initializers/database' import { MAccountDefault, MActor, MChannelDefault } from '../types/models' import { deleteActorImages, updateActorImages } from './activitypub/actors' import { sendUpdateActor } from './activitypub/send' +import { downloadImageFromWorker } from './worker/parent-process' function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) { return new ActorModel({ @@ -87,27 +86,22 @@ async function deleteLocalActorImageFile (accountOrChannel: MAccountDefault | MC }) } -type DownloadImageQueueTask = { +// --------------------------------------------------------------------------- + +function downloadActorImageFromWorker (options: { fileUrl: string filename: string type: ActorImageType size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0] -} - -const downloadImageQueue = queue((task, cb) => { - downloadImage(task.fileUrl, CONFIG.STORAGE.ACTOR_IMAGES, task.filename, task.size) - .then(() => cb()) - .catch(err => cb(err)) -}, QUEUE_CONCURRENCY.ACTOR_PROCESS_IMAGE) - -function pushActorImageProcessInQueue (task: DownloadImageQueueTask) { - return new Promise((res, rej) => { - downloadImageQueue.push(task, err => { - if (err) return rej(err) +}) { + const downloaderOptions = { + url: options.fileUrl, + destDir: CONFIG.STORAGE.ACTOR_IMAGES, + destName: options.filename, + size: options.size + } - return res() - }) - }) + return downloadImageFromWorker(downloaderOptions) } // Unsafe so could returns paths that does not exist anymore @@ -116,7 +110,8 @@ const actorImagePathUnsafeCache = new LRUCache({ max: LRU_CACHE. export { actorImagePathUnsafeCache, updateLocalActorImageFiles, + downloadActorImageFromWorker, deleteLocalActorImageFile, - pushActorImageProcessInQueue, + downloadImageFromWorker, buildActorInstance } diff --git a/server/lib/thumbnail.ts b/server/lib/thumbnail.ts index aa2d7a813..f00c87623 100644 --- a/server/lib/thumbnail.ts +++ b/server/lib/thumbnail.ts @@ -1,13 +1,13 @@ import { join } from 'path' import { ThumbnailType } from '@shared/models' import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils' -import { downloadImage } from '../helpers/requests' import { CONFIG } from '../initializers/config' import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants' import { ThumbnailModel } from '../models/video/thumbnail' import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models' import { MThumbnail } from '../types/models/video/thumbnail' import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist' +import { downloadImageFromWorker } from './local-actor' import { VideoPathManager } from './video-path-manager' type ImageSize = { height?: number, width?: number } @@ -49,7 +49,10 @@ function updatePlaylistMiniatureFromUrl (options: { ? null : downloadUrl - const thumbnailCreator = () => downloadImage(downloadUrl, basePath, filename, { width, height }) + const thumbnailCreator = () => { + return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } }) + } + return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl }) } @@ -75,7 +78,9 @@ function updateVideoMiniatureFromUrl (options: { : existingThumbnail.filename const thumbnailCreator = () => { - if (thumbnailUrlChanged) return downloadImage(downloadUrl, basePath, filename, { width, height }) + if (thumbnailUrlChanged) { + return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } }) + } return Promise.resolve() } diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts new file mode 100644 index 000000000..18dabd97f --- /dev/null +++ b/server/lib/worker/parent-process.ts @@ -0,0 +1,18 @@ +import { join } from 'path' +import Piscina from 'piscina' +import { WORKER_THREADS } from '@server/initializers/constants' +import { downloadImage } from './workers/image-downloader' + +const downloadImagerWorker = new Piscina({ + filename: join(__dirname, 'workers', 'image-downloader.js'), + concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY, + maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS +}) + +function downloadImageFromWorker (options: Parameters[0]): Promise> { + return downloadImagerWorker.run(options) +} + +export { + downloadImageFromWorker +} diff --git a/server/lib/worker/workers/image-downloader.ts b/server/lib/worker/workers/image-downloader.ts new file mode 100644 index 000000000..8d4a6b37e --- /dev/null +++ b/server/lib/worker/workers/image-downloader.ts @@ -0,0 +1,33 @@ +import { remove } from 'fs-extra' +import { join } from 'path' +import { processImage } from '@server/helpers/image-utils' +import { doRequestAndSaveToFile } from '@server/helpers/requests' +import { CONFIG } from '@server/initializers/config' + +async function downloadImage (options: { + url: string + destDir: string + destName: string + size: { width: number, height: number } +}) { + const { url, destDir, destName, size } = options + + const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName) + await doRequestAndSaveToFile(url, tmpPath) + + const destPath = join(destDir, destName) + + try { + await processImage(tmpPath, destPath, size) + } catch (err) { + await remove(tmpPath) + + throw err + } +} + +module.exports = downloadImage + +export { + downloadImage +} -- cgit v1.2.3