From 156cdbac227b5501116a66063812b9fc9b0e89c7 Mon Sep 17 00:00:00 2001
From: kontrollanten <6680299+kontrollanten@users.noreply.github.com>
Date: Tue, 19 Apr 2022 15:22:18 +0200
Subject: object-storage: @aws-sdk/lib-storage for multipart (#4903)

* object-storage: @aws-sdk/lib-storage for multipart

* gitignore: add .DS_Store

* test(object-storage): remove only

* test(object-storage/multipart): generate video

* fix lint issue

* test(obj-storage/video): ensure file size

* Styling

Co-authored-by: Chocobozzz
---
 .../shared/object-storage-helpers.ts | 89 +++++-----------------
 1 file changed, 19 insertions(+), 70 deletions(-)

(limited to 'server/lib/object-storage')

diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index ecb82856e..a2de92532 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -1,19 +1,14 @@
-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
-  CompletedPart,
-  CompleteMultipartUploadCommand,
-  CreateMultipartUploadCommand,
-  CreateMultipartUploadCommandInput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
-  PutObjectCommandInput,
-  UploadPartCommand
+  PutObjectCommandInput
 } from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
@@ -37,13 +32,12 @@ async function storeObject (options: {
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
   const stats = await stat(inputPath)
+  const fileStream = createReadStream(inputPath)
 
-  // If bigger than max allowed size we do a multipart upload
   if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
-    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
+    return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
   }
 
-  const fileStream = createReadStream(inputPath)
   return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
 }
 
@@ -163,18 +157,14 @@ async function objectStoragePut (options: {
 }
 
 async function multiPartUpload (options: {
-  inputPath: string
+  content: ReadStream
   objectStorageKey: string
   bucketInfo: BucketInfo
 }) {
-  const { objectStorageKey, inputPath, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo } = options
 
-  const key = buildKey(objectStorageKey, bucketInfo)
-  const s3Client = getClient()
-
-  const statResult = await stat(inputPath)
-
-  const input: CreateMultipartUploadCommandInput = {
+  const input: PutObjectCommandInput = {
+    Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
     Key: buildKey(objectStorageKey, bucketInfo)
   }
@@ -183,60 +173,19 @@ async function multiPartUpload (options: {
     input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
   }
 
-  const createMultipartCommand = new CreateMultipartUploadCommand(input)
-  const createResponse = await s3Client.send(createMultipartCommand)
-
-  const fd = await open(inputPath, 'r')
-  let partNumber = 1
-  const parts: CompletedPart[] = []
-  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
-  for (let start = 0; start < statResult.size; start += partSize) {
-    logger.debug(
-      'Uploading part %d of file to %s%s in bucket %s',
-      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
-    )
-
-    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
-    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
-    // streams with start and end set, so it just tries to stat the file in stream.path.
-    // This fails for us because we only want to send part of the file. The stream type
-    // is modified so we can set the byteLength here, which s3 detects because array buffers
-    // have this field set
-    const stream: ReadStream & { byteLength: number } =
-      createReadStream(
-        inputPath,
-        { fd, autoClose: false, start, end: (start + partSize) - 1 }
-      ) as ReadStream & { byteLength: number }
-
-    // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
-    stream.byteLength = min([ statResult.size - start, partSize ])
-
-    const uploadPartCommand = new UploadPartCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: key,
-      UploadId: createResponse.UploadId,
-      PartNumber: partNumber,
-      Body: stream
-    })
-    const uploadResponse = await s3Client.send(uploadPartCommand)
-
-    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
-    partNumber += 1
-  }
-  await close(fd)
-
-  const completeUploadCommand = new CompleteMultipartUploadCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: key,
-    UploadId: createResponse.UploadId,
-    MultipartUpload: { Parts: parts }
+  const parallelUploads3 = new Upload({
+    client: getClient(),
+    queueSize: 4,
+    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+    leavePartsOnError: false,
+    params: input
   })
-  await s3Client.send(completeUploadCommand)
+
+  await parallelUploads3.done()
 
   logger.debug(
-    'Completed %s%s in bucket %s in %d parts',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
+    'Completed %s%s in bucket %s',
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
   )
 
   return getPrivateUrl(bucketInfo, objectStorageKey)
--
cgit v1.2.3
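
Editor's note: for readers unfamiliar with @aws-sdk/lib-storage, the Upload helper adopted above drives the CreateMultipartUpload / UploadPart / CompleteMultipartUpload cycle internally and, for bodies that fit in a single part, falls back to a plain PutObject. Below is a minimal standalone sketch of the same pattern; the region, bucket name, queueSize and partSize values are illustrative placeholders, not PeerTube configuration.

import { createReadStream } from 'fs'
import { S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

// Hypothetical client configuration for illustration only
const client = new S3Client({ region: 'us-east-1' })

async function uploadFile (filePath: string, key: string) {
  const upload = new Upload({
    client,
    queueSize: 4,               // number of parts uploaded concurrently
    partSize: 5 * 1024 * 1024,  // 5 MiB, the S3 minimum part size
    leavePartsOnError: false,   // abort the multipart upload if a part fails
    params: {
      Bucket: 'example-bucket', // placeholder bucket name
      Key: key,
      Body: createReadStream(filePath)
    }
  })

  // Optional progress reporting emitted by the Upload helper
  upload.on('httpUploadProgress', progress => {
    console.log('uploaded %s / %s bytes', progress.loaded, progress.total)
  })

  return upload.done()
}

In this sketch, awaiting uploadFile('./video.mp4', 'videos/video.mp4') resolves once all parts have been acknowledged and the multipart upload completed, which is the behaviour the patch relies on via parallelUploads3.done().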