aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-06-24 16:31:32 +0200
committerChocobozzz <me@florianbigard.com>2022-06-24 16:31:48 +0200
commitc53853ca1b8e32aea5259d436d3d284b9d178919 (patch)
treedfb081e7fb471ca4b75fd67eabd3216cc6b33129 /server/lib
parent2873f00bd89d8f1b5f88614415f8142a5c2065c3 (diff)
downloadPeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.tar.gz
PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.tar.zst
PeerTube-c53853ca1b8e32aea5259d436d3d284b9d178919.zip
Introduce worker threads to process remote images
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/local-actor.ts35
-rw-r--r--server/lib/thumbnail.ts11
-rw-r--r--server/lib/worker/parent-process.ts18
-rw-r--r--server/lib/worker/workers/image-downloader.ts33
4 files changed, 74 insertions, 23 deletions
diff --git a/server/lib/local-actor.ts b/server/lib/local-actor.ts
index 01046d017..e3b04c094 100644
--- a/server/lib/local-actor.ts
+++ b/server/lib/local-actor.ts
@@ -1,4 +1,3 @@
1import { queue } from 'async'
2import { remove } from 'fs-extra' 1import { remove } from 'fs-extra'
3import LRUCache from 'lru-cache' 2import LRUCache from 'lru-cache'
4import { join } from 'path' 3import { join } from 'path'
@@ -8,13 +7,13 @@ import { buildUUID } from '@shared/extra-utils'
8import { ActivityPubActorType, ActorImageType } from '@shared/models' 7import { ActivityPubActorType, ActorImageType } from '@shared/models'
9import { retryTransactionWrapper } from '../helpers/database-utils' 8import { retryTransactionWrapper } from '../helpers/database-utils'
10import { processImage } from '../helpers/image-utils' 9import { processImage } from '../helpers/image-utils'
11import { downloadImage } from '../helpers/requests'
12import { CONFIG } from '../initializers/config' 10import { CONFIG } from '../initializers/config'
13import { ACTOR_IMAGES_SIZE, LRU_CACHE, QUEUE_CONCURRENCY, WEBSERVER } from '../initializers/constants' 11import { ACTOR_IMAGES_SIZE, LRU_CACHE, WEBSERVER } from '../initializers/constants'
14import { sequelizeTypescript } from '../initializers/database' 12import { sequelizeTypescript } from '../initializers/database'
15import { MAccountDefault, MActor, MChannelDefault } from '../types/models' 13import { MAccountDefault, MActor, MChannelDefault } from '../types/models'
16import { deleteActorImages, updateActorImages } from './activitypub/actors' 14import { deleteActorImages, updateActorImages } from './activitypub/actors'
17import { sendUpdateActor } from './activitypub/send' 15import { sendUpdateActor } from './activitypub/send'
16import { downloadImageFromWorker } from './worker/parent-process'
18 17
19function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) { 18function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) {
20 return new ActorModel({ 19 return new ActorModel({
@@ -87,27 +86,22 @@ async function deleteLocalActorImageFile (accountOrChannel: MAccountDefault | MC
87 }) 86 })
88} 87}
89 88
90type DownloadImageQueueTask = { 89// ---------------------------------------------------------------------------
90
91function downloadActorImageFromWorker (options: {
91 fileUrl: string 92 fileUrl: string
92 filename: string 93 filename: string
93 type: ActorImageType 94 type: ActorImageType
94 size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0] 95 size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0]
95} 96}) {
96 97 const downloaderOptions = {
97const downloadImageQueue = queue<DownloadImageQueueTask, Error>((task, cb) => { 98 url: options.fileUrl,
98 downloadImage(task.fileUrl, CONFIG.STORAGE.ACTOR_IMAGES, task.filename, task.size) 99 destDir: CONFIG.STORAGE.ACTOR_IMAGES,
99 .then(() => cb()) 100 destName: options.filename,
100 .catch(err => cb(err)) 101 size: options.size
101}, QUEUE_CONCURRENCY.ACTOR_PROCESS_IMAGE) 102 }
102
103function pushActorImageProcessInQueue (task: DownloadImageQueueTask) {
104 return new Promise<void>((res, rej) => {
105 downloadImageQueue.push(task, err => {
106 if (err) return rej(err)
107 103
108 return res() 104 return downloadImageFromWorker(downloaderOptions)
109 })
110 })
111} 105}
112 106
113// Unsafe so could returns paths that does not exist anymore 107// Unsafe so could returns paths that does not exist anymore
@@ -116,7 +110,8 @@ const actorImagePathUnsafeCache = new LRUCache<string, string>({ max: LRU_CACHE.
116export { 110export {
117 actorImagePathUnsafeCache, 111 actorImagePathUnsafeCache,
118 updateLocalActorImageFiles, 112 updateLocalActorImageFiles,
113 downloadActorImageFromWorker,
119 deleteLocalActorImageFile, 114 deleteLocalActorImageFile,
120 pushActorImageProcessInQueue, 115 downloadImageFromWorker,
121 buildActorInstance 116 buildActorInstance
122} 117}
diff --git a/server/lib/thumbnail.ts b/server/lib/thumbnail.ts
index aa2d7a813..f00c87623 100644
--- a/server/lib/thumbnail.ts
+++ b/server/lib/thumbnail.ts
@@ -1,13 +1,13 @@
1import { join } from 'path' 1import { join } from 'path'
2import { ThumbnailType } from '@shared/models' 2import { ThumbnailType } from '@shared/models'
3import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils' 3import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils'
4import { downloadImage } from '../helpers/requests'
5import { CONFIG } from '../initializers/config' 4import { CONFIG } from '../initializers/config'
6import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants' 5import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants'
7import { ThumbnailModel } from '../models/video/thumbnail' 6import { ThumbnailModel } from '../models/video/thumbnail'
8import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models' 7import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models'
9import { MThumbnail } from '../types/models/video/thumbnail' 8import { MThumbnail } from '../types/models/video/thumbnail'
10import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist' 9import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist'
10import { downloadImageFromWorker } from './local-actor'
11import { VideoPathManager } from './video-path-manager' 11import { VideoPathManager } from './video-path-manager'
12 12
13type ImageSize = { height?: number, width?: number } 13type ImageSize = { height?: number, width?: number }
@@ -49,7 +49,10 @@ function updatePlaylistMiniatureFromUrl (options: {
49 ? null 49 ? null
50 : downloadUrl 50 : downloadUrl
51 51
52 const thumbnailCreator = () => downloadImage(downloadUrl, basePath, filename, { width, height }) 52 const thumbnailCreator = () => {
53 return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } })
54 }
55
53 return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl }) 56 return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl })
54} 57}
55 58
@@ -75,7 +78,9 @@ function updateVideoMiniatureFromUrl (options: {
75 : existingThumbnail.filename 78 : existingThumbnail.filename
76 79
77 const thumbnailCreator = () => { 80 const thumbnailCreator = () => {
78 if (thumbnailUrlChanged) return downloadImage(downloadUrl, basePath, filename, { width, height }) 81 if (thumbnailUrlChanged) {
82 return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } })
83 }
79 84
80 return Promise.resolve() 85 return Promise.resolve()
81 } 86 }
diff --git a/server/lib/worker/parent-process.ts b/server/lib/worker/parent-process.ts
new file mode 100644
index 000000000..18dabd97f
--- /dev/null
+++ b/server/lib/worker/parent-process.ts
@@ -0,0 +1,18 @@
1import { join } from 'path'
2import Piscina from 'piscina'
3import { WORKER_THREADS } from '@server/initializers/constants'
4import { downloadImage } from './workers/image-downloader'
5
6const downloadImagerWorker = new Piscina({
7 filename: join(__dirname, 'workers', 'image-downloader.js'),
8 concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY,
9 maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS
10})
11
12function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> {
13 return downloadImagerWorker.run(options)
14}
15
16export {
17 downloadImageFromWorker
18}
diff --git a/server/lib/worker/workers/image-downloader.ts b/server/lib/worker/workers/image-downloader.ts
new file mode 100644
index 000000000..8d4a6b37e
--- /dev/null
+++ b/server/lib/worker/workers/image-downloader.ts
@@ -0,0 +1,33 @@
1import { remove } from 'fs-extra'
2import { join } from 'path'
3import { processImage } from '@server/helpers/image-utils'
4import { doRequestAndSaveToFile } from '@server/helpers/requests'
5import { CONFIG } from '@server/initializers/config'
6
7async function downloadImage (options: {
8 url: string
9 destDir: string
10 destName: string
11 size: { width: number, height: number }
12}) {
13 const { url, destDir, destName, size } = options
14
15 const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName)
16 await doRequestAndSaveToFile(url, tmpPath)
17
18 const destPath = join(destDir, destName)
19
20 try {
21 await processImage(tmpPath, destPath, size)
22 } catch (err) {
23 await remove(tmpPath)
24
25 throw err
26 }
27}
28
29module.exports = downloadImage
30
31export {
32 downloadImage
33}