]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/object-storage/shared/object-storage-helpers.ts
object-storage: @aws-sdk/lib-storage for multipart (#4903)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
index ecb82856e7da4348cdefae7484cf2b830bca4470..a2de9253290b57208df8742fdf9677e43150c4ad 100644 (file)
@@ -1,19 +1,14 @@
-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
-  CompletedPart,
-  CompleteMultipartUploadCommand,
-  CreateMultipartUploadCommand,
-  CreateMultipartUploadCommandInput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
-  PutObjectCommandInput,
-  UploadPartCommand
+  PutObjectCommandInput
 } from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
@@ -37,13 +32,12 @@ async function storeObject (options: {
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
   const stats = await stat(inputPath)
+  const fileStream = createReadStream(inputPath)
 
-  // If bigger than max allowed size we do a multipart upload
   if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
-    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
+    return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
   }
 
-  const fileStream = createReadStream(inputPath)
   return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
 }
 
@@ -163,18 +157,14 @@ async function objectStoragePut (options: {
 }
 
 async function multiPartUpload (options: {
-  inputPath: string
+  content: ReadStream
   objectStorageKey: string
   bucketInfo: BucketInfo
 }) {
-  const { objectStorageKey, inputPath, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo } = options
 
-  const key = buildKey(objectStorageKey, bucketInfo)
-  const s3Client = getClient()
-
-  const statResult = await stat(inputPath)
-
-  const input: CreateMultipartUploadCommandInput = {
+  const input: PutObjectCommandInput = {
+    Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
     Key: buildKey(objectStorageKey, bucketInfo)
   }
@@ -183,60 +173,19 @@ async function multiPartUpload (options: {
     input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
   }
 
-  const createMultipartCommand = new CreateMultipartUploadCommand(input)
-  const createResponse = await s3Client.send(createMultipartCommand)
-
-  const fd = await open(inputPath, 'r')
-  let partNumber = 1
-  const parts: CompletedPart[] = []
-  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
-  for (let start = 0; start < statResult.size; start += partSize) {
-    logger.debug(
-      'Uploading part %d of file to %s%s in bucket %s',
-      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
-    )
-
-    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
-    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
-    // streams with start and end set, so it just tries to stat the file in stream.path.
-    // This fails for us because we only want to send part of the file. The stream type
-    // is modified so we can set the byteLength here, which s3 detects because array buffers
-    // have this field set
-    const stream: ReadStream & { byteLength: number } =
-      createReadStream(
-        inputPath,
-        { fd, autoClose: false, start, end: (start + partSize) - 1 }
-      ) as ReadStream & { byteLength: number }
-
-    // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
-    stream.byteLength = min([ statResult.size - start, partSize ])
-
-    const uploadPartCommand = new UploadPartCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: key,
-      UploadId: createResponse.UploadId,
-      PartNumber: partNumber,
-      Body: stream
-    })
-    const uploadResponse = await s3Client.send(uploadPartCommand)
-
-    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
-    partNumber += 1
-  }
-  await close(fd)
-
-  const completeUploadCommand = new CompleteMultipartUploadCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: key,
-    UploadId: createResponse.UploadId,
-    MultipartUpload: { Parts: parts }
+  const parallelUploads3 = new Upload({
+    client: getClient(),
+    queueSize: 4,
+    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+    leavePartsOnError: false,
+    params: input
   })
-  await s3Client.send(completeUploadCommand)
+
+  await parallelUploads3.done()
 
   logger.debug(
-    'Completed %s%s in bucket %s in %d parts',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
+    'Completed %s%s in bucket %s',
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
   )
 
   return getPrivateUrl(bucketInfo, objectStorageKey)