]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Introduce worker threads to process remote images
authorChocobozzz <me@florianbigard.com>
Fri, 24 Jun 2022 14:31:32 +0000 (16:31 +0200)
committerChocobozzz <me@florianbigard.com>
Fri, 24 Jun 2022 14:31:48 +0000 (16:31 +0200)
package.json
server/controllers/lazy-static.ts
server/helpers/requests.ts
server/initializers/constants.ts
server/lib/local-actor.ts
server/lib/thumbnail.ts
server/lib/worker/parent-process.ts [new file with mode: 0644]
server/lib/worker/workers/image-downloader.ts [new file with mode: 0644]
yarn.lock

index 0715cd22480fe0f8b7844ec0407945ad02a96edc..67b80ac0ea5d4c1c3657ca57d65795da6e47e7f2 100644 (file)
     "password-generator": "^2.0.2",
     "pem": "^1.12.3",
     "pg": "^8.2.1",
+    "piscina": "^3.2.0",
     "prompt": "^1.0.0",
     "proxy-addr": "^2.0.7",
     "pug": "^3.0.0",
index 8a180b5bcc5e07400d7433ef0bc66753ea679a7e..0cab5dcd082f63a4cee7ddb7c279067b9861e47e 100644 (file)
@@ -6,7 +6,7 @@ import { HttpStatusCode } from '../../shared/models/http/http-error-codes'
 import { logger } from '../helpers/logger'
 import { ACTOR_IMAGES_SIZE, LAZY_STATIC_PATHS, STATIC_MAX_AGE } from '../initializers/constants'
 import { VideosCaptionCache, VideosPreviewCache } from '../lib/files-cache'
-import { actorImagePathUnsafeCache, pushActorImageProcessInQueue } from '../lib/local-actor'
+import { actorImagePathUnsafeCache, downloadActorImageFromWorker } from '../lib/local-actor'
 import { asyncMiddleware } from '../middlewares'
 import { ActorImageModel } from '../models/actor/actor-image'
 
@@ -65,7 +65,7 @@ async function getActorImage (req: express.Request, res: express.Response, next:
     logger.info('Lazy serve remote actor image %s.', image.fileUrl)
 
     try {
-      await pushActorImageProcessInQueue({
+      await downloadActorImageFromWorker({
         filename: image.filename,
         fileUrl: image.fileUrl,
         size: getActorImageSize(image),
index a9869e987cdff4aa85d8174312416de456e1ab37..495e83558703ecfd4c2e51ce8550a21452eaa15c 100644 (file)
@@ -1,11 +1,8 @@
 import { createWriteStream, remove } from 'fs-extra'
 import got, { CancelableRequest, NormalizedOptions, Options as GotOptions, RequestError, Response } from 'got'
 import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent'
-import { join } from 'path'
-import { CONFIG } from '../initializers/config'
 import { ACTIVITY_PUB, BINARY_CONTENT_TYPES, PEERTUBE_VERSION, REQUEST_TIMEOUTS, WEBSERVER } from '../initializers/constants'
 import { pipelinePromise } from './core-utils'
-import { processImage } from './image-utils'
 import { logger, loggerTagsFactory } from './logger'
 import { getProxy, isProxyEnabled } from './proxy'
 
@@ -147,21 +144,6 @@ async function doRequestAndSaveToFile (
   }
 }
 
-async function downloadImage (url: string, destDir: string, destName: string, size: { width: number, height: number }) {
-  const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName)
-  await doRequestAndSaveToFile(url, tmpPath)
-
-  const destPath = join(destDir, destName)
-
-  try {
-    await processImage(tmpPath, destPath, size)
-  } catch (err) {
-    await remove(tmpPath)
-
-    throw err
-  }
-}
-
 function getAgent () {
   if (!isProxyEnabled()) return {}
 
@@ -211,7 +193,6 @@ export {
   doJSONRequest,
   doRequestAndSaveToFile,
   isBinaryResponse,
-  downloadImage,
   getAgent,
   findLatestRedirection,
   peertubeGot
index d469ce425146bd6d3d0415e1163324b575a82e45..175935835bcba09b363fd1e2055ca5da2a92e120 100644 (file)
@@ -744,8 +744,11 @@ const MEMOIZE_LENGTH = {
   VIDEO_DURATION: 200
 }
 
-const QUEUE_CONCURRENCY = {
-  ACTOR_PROCESS_IMAGE: 3
+const WORKER_THREADS = {
+  DOWNLOAD_IMAGE: {
+    CONCURRENCY: 3,
+    MAX_THREADS: 1
+  }
 }
 
 const REDUNDANCY = {
@@ -955,7 +958,7 @@ export {
   VIDEO_PRIVACIES,
   VIDEO_LICENCES,
   VIDEO_STATES,
-  QUEUE_CONCURRENCY,
+  WORKER_THREADS,
   VIDEO_RATE_TYPES,
   JOB_PRIORITY,
   VIDEO_TRANSCODING_FPS,
index 01046d0179324853833a96e1188ed63792d9ff1b..e3b04c09467b23123e66f4bcbe820578bab972be 100644 (file)
@@ -1,4 +1,3 @@
-import { queue } from 'async'
 import { remove } from 'fs-extra'
 import LRUCache from 'lru-cache'
 import { join } from 'path'
@@ -8,13 +7,13 @@ import { buildUUID } from '@shared/extra-utils'
 import { ActivityPubActorType, ActorImageType } from '@shared/models'
 import { retryTransactionWrapper } from '../helpers/database-utils'
 import { processImage } from '../helpers/image-utils'
-import { downloadImage } from '../helpers/requests'
 import { CONFIG } from '../initializers/config'
-import { ACTOR_IMAGES_SIZE, LRU_CACHE, QUEUE_CONCURRENCY, WEBSERVER } from '../initializers/constants'
+import { ACTOR_IMAGES_SIZE, LRU_CACHE, WEBSERVER } from '../initializers/constants'
 import { sequelizeTypescript } from '../initializers/database'
 import { MAccountDefault, MActor, MChannelDefault } from '../types/models'
 import { deleteActorImages, updateActorImages } from './activitypub/actors'
 import { sendUpdateActor } from './activitypub/send'
+import { downloadImageFromWorker } from './worker/parent-process'
 
 function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) {
   return new ActorModel({
@@ -87,27 +86,22 @@ async function deleteLocalActorImageFile (accountOrChannel: MAccountDefault | MC
   })
 }
 
-type DownloadImageQueueTask = {
+// ---------------------------------------------------------------------------
+
+function downloadActorImageFromWorker (options: {
   fileUrl: string
   filename: string
   type: ActorImageType
   size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0]
-}
-
-const downloadImageQueue = queue<DownloadImageQueueTask, Error>((task, cb) => {
-  downloadImage(task.fileUrl, CONFIG.STORAGE.ACTOR_IMAGES, task.filename, task.size)
-    .then(() => cb())
-    .catch(err => cb(err))
-}, QUEUE_CONCURRENCY.ACTOR_PROCESS_IMAGE)
-
-function pushActorImageProcessInQueue (task: DownloadImageQueueTask) {
-  return new Promise<void>((res, rej) => {
-    downloadImageQueue.push(task, err => {
-      if (err) return rej(err)
+}) {
+  const downloaderOptions = {
+    url: options.fileUrl,
+    destDir: CONFIG.STORAGE.ACTOR_IMAGES,
+    destName: options.filename,
+    size: options.size
+  }
 
-      return res()
-    })
-  })
+  return downloadImageFromWorker(downloaderOptions)
 }
 
 // Unsafe so could returns paths that does not exist anymore
@@ -116,7 +110,8 @@ const actorImagePathUnsafeCache = new LRUCache<string, string>({ max: LRU_CACHE.
 export {
   actorImagePathUnsafeCache,
   updateLocalActorImageFiles,
+  downloadActorImageFromWorker,
   deleteLocalActorImageFile,
-  pushActorImageProcessInQueue,
+  downloadImageFromWorker,
   buildActorInstance
 }
index aa2d7a8132923a77e0b0a1eaa9f34c0d861e0c3e..f00c876238d245686426dfeb8c9efb046256fab8 100644 (file)
@@ -1,13 +1,13 @@
 import { join } from 'path'
 import { ThumbnailType } from '@shared/models'
 import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils'
-import { downloadImage } from '../helpers/requests'
 import { CONFIG } from '../initializers/config'
 import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants'
 import { ThumbnailModel } from '../models/video/thumbnail'
 import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models'
 import { MThumbnail } from '../types/models/video/thumbnail'
 import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist'
+import { downloadImageFromWorker } from './local-actor'
 import { VideoPathManager } from './video-path-manager'
 
 type ImageSize = { height?: number, width?: number }
@@ -49,7 +49,10 @@ function updatePlaylistMiniatureFromUrl (options: {
     ? null
     : downloadUrl
 
-  const thumbnailCreator = () => downloadImage(downloadUrl, basePath, filename, { width, height })
+  const thumbnailCreator = () => {
+    return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } })
+  }
+
   return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl })
 }
 
@@ -75,7 +78,9 @@ function updateVideoMiniatureFromUrl (options: {
     : existingThumbnail.filename
 
   const thumbnailCreator = () => {
-    if (thumbnailUrlChanged) return downloadImage(downloadUrl, basePath, filename, { width, height })
+    if (thumbnailUrlChanged) {
+      return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } })
+    }
 
     return Promise.resolve()
   }
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts
new file mode 100644 (file)
index 0000000..18dabd9
--- /dev/null
@@ -0,0 +1,18 @@
+import { join } from 'path'
+import Piscina from 'piscina'
+import { WORKER_THREADS } from '@server/initializers/constants'
+import { downloadImage } from './workers/image-downloader'
+
+const downloadImagerWorker = new Piscina({
+  filename: join(__dirname, 'workers', 'image-downloader.js'),
+  concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY,
+  maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS
+})
+
+function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> {
+  return downloadImagerWorker.run(options)
+}
+
+export {
+  downloadImageFromWorker
+}
diff --git a/server/lib/worker/workers/image-downloader.ts b/server/lib/worker/workers/image-downloader.ts
new file mode 100644 (file)
index 0000000..8d4a6b3
--- /dev/null
@@ -0,0 +1,33 @@
+import { remove } from 'fs-extra'
+import { join } from 'path'
+import { processImage } from '@server/helpers/image-utils'
+import { doRequestAndSaveToFile } from '@server/helpers/requests'
+import { CONFIG } from '@server/initializers/config'
+
+async function downloadImage (options: {
+  url: string
+  destDir: string
+  destName: string
+  size: { width: number, height: number }
+}) {
+  const { url, destDir, destName, size } = options
+
+  const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName)
+  await doRequestAndSaveToFile(url, tmpPath)
+
+  const destPath = join(destDir, destName)
+
+  try {
+    await processImage(tmpPath, destPath, size)
+  } catch (err) {
+    await remove(tmpPath)
+
+    throw err
+  }
+}
+
+module.exports = downloadImage
+
+export {
+  downloadImage
+}
index e8386afe3b93ef62efe674fbe88bafe7946b3777..a0519d54ed80be5f87d5e7d8a030672df07fb23f 100644 (file)
--- a/yarn.lock
+++ b/yarn.lock
     ajv-draft-04 "^1.0.0"
     call-me-maybe "^1.0.1"
 
+"@assemblyscript/loader@^0.10.1":
+  version "0.10.1"
+  resolved "https://registry.yarnpkg.com/@assemblyscript/loader/-/loader-0.10.1.tgz#70e45678f06c72fa2e350e8553ec4a4d72b92e06"
+  integrity sha512-H71nDOOL8Y7kWRLqf6Sums+01Q5msqBW2KhDUTemh1tvY04eSkSXrK0uj/4mmY0Xr16/3zyZmsrxN7CKuRbNRg==
+
 "@assemblyscript/loader@^0.19.21":
   version "0.19.23"
   resolved "https://registry.yarnpkg.com/@assemblyscript/loader/-/loader-0.19.23.tgz#7fccae28d0a2692869f1d1219d36093bc24d5e72"
@@ -4357,6 +4362,11 @@ event-target-shim@^5.0.0:
   resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789"
   integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==
 
+eventemitter-asyncresource@^1.0.0:
+  version "1.0.0"
+  resolved "https://registry.yarnpkg.com/eventemitter-asyncresource/-/eventemitter-asyncresource-1.0.0.tgz#734ff2e44bf448e627f7748f905d6bdd57bdb65b"
+  integrity sha512-39F7TBIV0G7gTelxwbEqnwhp90eqCPON1k0NwNfwhgKn4Co4ybUbj2pECcXT0B3ztRKZ7Pw1JujUUgmQJHcVAQ==
+
 events@3.3.0, events@^3.3.0:
   version "3.3.0"
   resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400"
@@ -4985,6 +4995,15 @@ has@^1.0.3:
   dependencies:
     function-bind "^1.1.1"
 
+hdr-histogram-js@^2.0.1:
+  version "2.0.3"
+  resolved "https://registry.yarnpkg.com/hdr-histogram-js/-/hdr-histogram-js-2.0.3.tgz#0b860534655722b6e3f3e7dca7b78867cf43dcb5"
+  integrity sha512-Hkn78wwzWHNCp2uarhzQ2SGFLU3JY8SBDDd3TAABK4fc30wm+MuPOrg5QVFVfkKOQd6Bfz3ukJEI+q9sXEkK1g==
+  dependencies:
+    "@assemblyscript/loader" "^0.10.1"
+    base64-js "^1.2.0"
+    pako "^1.0.3"
+
 hdr-histogram-js@^3.0.0:
   version "3.0.0"
   resolved "https://registry.yarnpkg.com/hdr-histogram-js/-/hdr-histogram-js-3.0.0.tgz#8e2d9a68e3313147804c47d85a9c22a93f85e24b"
@@ -6477,7 +6496,15 @@ next-tick@1, next-tick@^1.1.0:
   resolved "https://registry.yarnpkg.com/next-tick/-/next-tick-1.1.0.tgz#1836ee30ad56d67ef281b22bd199f709449b35eb"
   integrity sha512-CXdUiJembsNjuToQvxayPZF9Vqht7hewsvy2sOWafLvi2awflj9mOC6bHIg50orX8IJvWKY9wYQ/zB2kogPslQ==
 
-node-addon-api@^3.1.0:
+nice-napi@^1.0.2:
+  version "1.0.2"
+  resolved "https://registry.yarnpkg.com/nice-napi/-/nice-napi-1.0.2.tgz#dc0ab5a1eac20ce548802fc5686eaa6bc654927b"
+  integrity sha512-px/KnJAJZf5RuBGcfD+Sp2pAKq0ytz8j+1NehvgIGFkvtvFrDM3T8E4x/JJODXK9WZow8RRGrbA9QQ3hs+pDhA==
+  dependencies:
+    node-addon-api "^3.0.0"
+    node-gyp-build "^4.2.2"
+
+node-addon-api@^3.0.0, node-addon-api@^3.1.0:
   version "3.2.1"
   resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-3.2.1.tgz#81325e0a2117789c0128dab65e7e38f07ceba161"
   integrity sha512-mmcei9JghVNDYydghQmeDX8KoAm0FAiYyIcUt/N4nhyAipB17pllZQDOJD2fotxABnt4Mdz+dKTO7eftLg4d0A==
@@ -6513,7 +6540,7 @@ node-gyp-build-optional-packages@5.0.2:
   resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.2.tgz#3de7d30bd1f9057b5dfbaeab4a4442b7fe9c5901"
   integrity sha512-PiN4NWmlQPqvbEFcH/omQsswWQbe5Z9YK/zdB23irp5j2XibaA2IrGvpSWmVVG4qMZdmPdwPctSy4a86rOMn6g==
 
-node-gyp-build@^4.2.0, node-gyp-build@^4.3.0:
+node-gyp-build@^4.2.0, node-gyp-build@^4.2.2, node-gyp-build@^4.3.0:
   version "4.4.0"
   resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.4.0.tgz#42e99687ce87ddeaf3a10b99dc06abc11021f3f4"
   integrity sha512-amJnQCcgtRVw9SvoebO3BKGESClrfXGCUTX9hSn1OuGQTQBOZmVd0Z0OlecpuRksKvbsUqALE8jls/ErClAPuQ==
@@ -7033,6 +7060,17 @@ pify@^4.0.1:
   resolved "https://registry.yarnpkg.com/pify/-/pify-4.0.1.tgz#4b2cd25c50d598735c50292224fd8c6df41e3231"
   integrity sha512-uB80kBFb/tfd68bVleG9T5GGsGPjJrLAUpR5PZIrhBnIaRTQRjqdJSsIKkOP6OAIFbj7GOrcudc5pNjZ+geV2g==
 
+piscina@^3.2.0:
+  version "3.2.0"
+  resolved "https://registry.yarnpkg.com/piscina/-/piscina-3.2.0.tgz#f5a1dde0c05567775690cccefe59d9223924d154"
+  integrity sha512-yn/jMdHRw+q2ZJhFhyqsmANcbF6V2QwmD84c6xRau+QpQOmtrBCoRGdvTfeuFDYXB5W2m6MfLkjkvQa9lUSmIA==
+  dependencies:
+    eventemitter-asyncresource "^1.0.0"
+    hdr-histogram-js "^2.0.1"
+    hdr-histogram-percentiles-obj "^3.0.0"
+  optionalDependencies:
+    nice-napi "^1.0.2"
+
 pixelmatch@^4.0.2:
   version "4.0.2"
   resolved "https://registry.yarnpkg.com/pixelmatch/-/pixelmatch-4.0.2.tgz#8f47dcec5011b477b67db03c243bc1f3085e8854"