-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
- CompletedPart,
- CompleteMultipartUploadCommand,
- CreateMultipartUploadCommand,
DeleteObjectCommand,
GetObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
- UploadPartCommand
+ PutObjectCommandInput
} from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
const stats = await stat(inputPath)
+ const fileStream = createReadStream(inputPath)
- // If bigger than max allowed size we do a multipart upload
if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
- return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
+ return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
}
- const fileStream = createReadStream(inputPath)
return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
}
}) {
const { objectStorageKey, content, bucketInfo } = options
- const command = new PutObjectCommand({
+ const input: PutObjectCommandInput = {
Bucket: bucketInfo.BUCKET_NAME,
Key: buildKey(objectStorageKey, bucketInfo),
- Body: content,
- ACL: 'public-read'
- })
+ Body: content
+ }
+
+ if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
+ input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
+ }
+
+ const command = new PutObjectCommand(input)
await getClient().send(command)
}
async function multiPartUpload (options: {
- inputPath: string
+ content: ReadStream
objectStorageKey: string
bucketInfo: BucketInfo
}) {
- const { objectStorageKey, inputPath, bucketInfo } = options
+ const { content, objectStorageKey, bucketInfo } = options
- const key = buildKey(objectStorageKey, bucketInfo)
- const s3Client = getClient()
-
- const statResult = await stat(inputPath)
-
- const createMultipartCommand = new CreateMultipartUploadCommand({
+ const input: PutObjectCommandInput = {
+ Body: content,
Bucket: bucketInfo.BUCKET_NAME,
- Key: key,
- ACL: 'public-read'
- })
- const createResponse = await s3Client.send(createMultipartCommand)
-
- const fd = await open(inputPath, 'r')
- let partNumber = 1
- const parts: CompletedPart[] = []
- const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
- for (let start = 0; start < statResult.size; start += partSize) {
- logger.debug(
- 'Uploading part %d of file to %s%s in bucket %s',
- partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
- )
-
- // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
- // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
- // streams with start and end set, so it just tries to stat the file in stream.path.
- // This fails for us because we only want to send part of the file. The stream type
- // is modified so we can set the byteLength here, which s3 detects because array buffers
- // have this field set
- const stream: ReadStream & { byteLength: number } =
- createReadStream(
- inputPath,
- { fd, autoClose: false, start, end: (start + partSize) - 1 }
- ) as ReadStream & { byteLength: number }
-
- // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
- stream.byteLength = min([ statResult.size - start, partSize ])
-
- const uploadPartCommand = new UploadPartCommand({
- Bucket: bucketInfo.BUCKET_NAME,
- Key: key,
- UploadId: createResponse.UploadId,
- PartNumber: partNumber,
- Body: stream
- })
- const uploadResponse = await s3Client.send(uploadPartCommand)
+ Key: buildKey(objectStorageKey, bucketInfo)
+ }
- parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
- partNumber += 1
+ if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
+ input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
}
- await close(fd)
- const completeUploadCommand = new CompleteMultipartUploadCommand({
- Bucket: bucketInfo.BUCKET_NAME,
- Key: key,
- UploadId: createResponse.UploadId,
- MultipartUpload: { Parts: parts }
+ const parallelUploads3 = new Upload({
+ client: getClient(),
+ queueSize: 4,
+ partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+ leavePartsOnError: false,
+ params: input
})
- await s3Client.send(completeUploadCommand)
+
+ await parallelUploads3.done()
logger.debug(
- 'Completed %s%s in bucket %s in %d parts',
- bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
+ 'Completed %s%s in bucket %s',
+ bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
)
return getPrivateUrl(bucketInfo, objectStorageKey)