-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { map } from 'bluebird'
+import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
- CompletedPart,
- CompleteMultipartUploadCommand,
- CreateMultipartUploadCommand,
+ _Object,
+ CompleteMultipartUploadCommandOutput,
DeleteObjectCommand,
GetObjectCommand,
ListObjectsV2Command,
- PutObjectCommand,
- UploadPartCommand
+ PutObjectAclCommand,
+ PutObjectCommandInput,
+ S3Client
} from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
-import { getPrivateUrl } from '../urls'
+import { getInternalUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'
PREFIX?: string
}
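+// Returns the full object keys (bucket PREFIX included) found under the given prefix.
+// Note: a single ListObjectsV2 response contains at most 1000 keys, and this helper
+// does not paginate.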
+async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
+ const s3Client = getClient()
+
+ const commandPrefix = bucketInfo.PREFIX + prefix
+ const listCommand = new ListObjectsV2Command({
+ Bucket: bucketInfo.BUCKET_NAME,
+ Prefix: commandPrefix
+ })
+
+ const listedObjects = await s3Client.send(listCommand)
+
+ if (isArray(listedObjects.Contents) !== true) return []
+
+ return listedObjects.Contents.map(c => c.Key)
+}
+
+// ---------------------------------------------------------------------------
+
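+// Upload a local file to object storage and return its internal URL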
async function storeObject (options: {
inputPath: string
objectStorageKey: string
bucketInfo: BucketInfo
+ isPrivate: boolean
}): Promise<string> {
- const { inputPath, objectStorageKey, bucketInfo } = options
+ const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
- const stats = await stat(inputPath)
+ const fileStream = createReadStream(inputPath)
- // If bigger than max allowed size we do a multipart upload
- if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
- return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
- }
+ return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
+}
- const fileStream = createReadStream(inputPath)
- return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
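+// Upload an in-memory string as an object; inputPath is only used for logging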
+async function storeContent (options: {
+ content: string
+ inputPath: string
+ objectStorageKey: string
+ bucketInfo: BucketInfo
+ isPrivate: boolean
+}): Promise<string> {
+ const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+ logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+ return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
}
-async function removeObject (filename: string, bucketInfo: BucketInfo) {
- const command = new DeleteObjectCommand({
+// ---------------------------------------------------------------------------
+
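+// Apply the configured canned ACL (private or public) to a single object.
+// No-op if no ACL is configured for the requested visibility.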
+async function updateObjectACL (options: {
+ objectStorageKey: string
+ bucketInfo: BucketInfo
+ isPrivate: boolean
+}) {
+ const { objectStorageKey, bucketInfo, isPrivate } = options
+
+ const acl = getACL(isPrivate)
+ if (!acl) return
+
+ const key = buildKey(objectStorageKey, bucketInfo)
+
+ logger.debug('Updating ACL of file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
+
+ const command = new PutObjectAclCommand({
Bucket: bucketInfo.BUCKET_NAME,
- Key: buildKey(filename, bucketInfo)
+ Key: key,
+ ACL: acl
})
- return getClient().send(command)
+ await getClient().send(command)
}
-async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
- const s3Client = getClient()
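+// Apply the configured canned ACL to every object under the given prefix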
+function updatePrefixACL (options: {
+ prefix: string
+ bucketInfo: BucketInfo
+ isPrivate: boolean
+}) {
+ const { prefix, bucketInfo, isPrivate } = options
- const commandPrefix = bucketInfo.PREFIX + prefix
- const listCommand = new ListObjectsV2Command({
- Bucket: bucketInfo.BUCKET_NAME,
- Prefix: commandPrefix
- })
+ const acl = getACL(isPrivate)
+ if (!acl) return
- const listedObjects = await s3Client.send(listCommand)
+ logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
- // FIXME: use bulk delete when s3ninja will support this operation
- // const deleteParams = {
- // Bucket: bucketInfo.BUCKET_NAME,
- // Delete: { Objects: [] }
- // }
+ return applyOnPrefix({
+ prefix,
+ bucketInfo,
+ commandBuilder: obj => {
+ logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
- if (isArray(listedObjects.Contents) !== true) {
- const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+ return new PutObjectAclCommand({
+ Bucket: bucketInfo.BUCKET_NAME,
+ Key: obj.Key,
+ ACL: acl
+ })
+ }
+ })
+}
- logger.error(message, { response: listedObjects, ...lTags() })
- throw new Error(message)
- }
+// ---------------------------------------------------------------------------
- for (const object of listedObjects.Contents) {
- const command = new DeleteObjectCommand({
- Bucket: bucketInfo.BUCKET_NAME,
- Key: object.Key
- })
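+// Remove a single object; objectStorageKey is relative to the bucket PREFIX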
+function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
+ const key = buildKey(objectStorageKey, bucketInfo)
- await s3Client.send(command)
+ return removeObjectByFullKey(key, bucketInfo)
+}
- // FIXME: use bulk delete when s3ninja will support this operation
- // deleteParams.Delete.Objects.push({ Key: object.Key })
- }
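+// Same as removeObject, but for a key that already includes the bucket PREFIX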
+function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
+ logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
+
+ const command = new DeleteObjectCommand({
+ Bucket: bucketInfo.BUCKET_NAME,
+ Key: fullKey
+ })
+
+ return getClient().send(command)
+}
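+// Remove every object under the given prefix, one DeleteObjectCommand per object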
+async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
// FIXME: use bulk delete when s3ninja will support this operation
- // const deleteCommand = new DeleteObjectsCommand(deleteParams)
- // await s3Client.send(deleteCommand)
- // Repeat if not all objects could be listed at once (limit of 1000?)
- if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
+ logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
+
+ return applyOnPrefix({
+ prefix,
+ bucketInfo,
+ commandBuilder: obj => {
+ logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
+ return new DeleteObjectCommand({
+ Bucket: bucketInfo.BUCKET_NAME,
+ Key: obj.Key
+ })
+ }
+ })
}
+// ---------------------------------------------------------------------------
+
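+// Download the object identified by key to the given local destination path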
async function makeAvailable (options: {
key: string
destination: string
// ---------------------------------------------------------------------------
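+// Open a streaming GetObject request, optionally restricted by an HTTP Range header.
+// Returns the raw S3 response (for status/headers) along with its body as a Readable.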
+async function createObjectReadStream (options: {
+ key: string
+ bucketInfo: BucketInfo
+ rangeHeader: string
+}) {
+ const { key, bucketInfo, rangeHeader } = options
+
+ const command = new GetObjectCommand({
+ Bucket: bucketInfo.BUCKET_NAME,
+ Key: buildKey(key, bucketInfo),
+ Range: rangeHeader
+ })
+
+ const response = await getClient().send(command)
+
+ return {
+ response,
+ stream: response.Body as Readable
+ }
+}
+
+// ---------------------------------------------------------------------------
+
export {
BucketInfo,
buildKey,
+
storeObject,
+ storeContent,
+
removeObject,
+ removeObjectByFullKey,
removePrefix,
- makeAvailable
+
+ makeAvailable,
+
+ updateObjectACL,
+ updatePrefixACL,
+
+ listKeysOfPrefix,
+ createObjectReadStream
}
// ---------------------------------------------------------------------------
-async function objectStoragePut (options: {
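+// Upload content with the Upload helper of @aws-sdk/lib-storage, which transparently
+// switches to a multipart upload when the content exceeds a single part size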
+async function uploadToStorage (options: {
+ content: ReadStream | string
objectStorageKey: string
- content: ReadStream
bucketInfo: BucketInfo
+ isPrivate: boolean
}) {
- const { objectStorageKey, content, bucketInfo } = options
+ const { content, objectStorageKey, bucketInfo, isPrivate } = options
- const command = new PutObjectCommand({
+ const input: PutObjectCommandInput = {
+ Body: content,
Bucket: bucketInfo.BUCKET_NAME,
- Key: buildKey(objectStorageKey, bucketInfo),
- Body: content
+ Key: buildKey(objectStorageKey, bucketInfo)
+ }
+
+ const acl = getACL(isPrivate)
+ if (acl) input.ACL = acl
+
+ const parallelUploads3 = new Upload({
+ client: getClient(),
+ queueSize: 4,
+ partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+
+ // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
+ // More detailed explanation:
+ // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
+ // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
+ leavePartsOnError: true,
+ params: input
})
- await getClient().send(command)
+ const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
+ // Check is needed even if the HTTP status code is 200 OK
+ // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+ if (!response.Bucket) {
+ const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
+ logger.error(message, { response, ...lTags() })
+ throw new Error(message)
+ }
+
+ logger.debug(
+ 'Completed %s%s in bucket %s',
+ bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), responseMetadata: response.$metadata }
+ )
- return getPrivateUrl(bucketInfo, objectStorageKey)
+ return getInternalUrl(bucketInfo, objectStorageKey)
}
-async function multiPartUpload (options: {
- inputPath: string
- objectStorageKey: string
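+// List the objects under a prefix and send the command built for each of them
+// (at most 10 in parallel), recursing on the continuation token while the
+// listing is truncated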
+async function applyOnPrefix (options: {
+ prefix: string
bucketInfo: BucketInfo
+ commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
+
+ continuationToken?: string
}) {
- const { objectStorageKey, inputPath, bucketInfo } = options
+ const { prefix, bucketInfo, commandBuilder, continuationToken } = options
- const key = buildKey(objectStorageKey, bucketInfo)
const s3Client = getClient()
- const statResult = await stat(inputPath)
-
- const createMultipartCommand = new CreateMultipartUploadCommand({
+ const commandPrefix = buildKey(prefix, bucketInfo)
+ const listCommand = new ListObjectsV2Command({
Bucket: bucketInfo.BUCKET_NAME,
- Key: key
+ Prefix: commandPrefix,
+ ContinuationToken: continuationToken
})
- const createResponse = await s3Client.send(createMultipartCommand)
-
- const fd = await open(inputPath, 'r')
- let partNumber = 1
- const parts: CompletedPart[] = []
- const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
- for (let start = 0; start < statResult.size; start += partSize) {
- logger.debug(
- 'Uploading part %d of file to %s%s in bucket %s',
- partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
- )
-
- // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
- // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
- // streams with start and end set, so it just tries to stat the file in stream.path.
- // This fails for us because we only want to send part of the file. The stream type
- // is modified so we can set the byteLength here, which s3 detects because array buffers
- // have this field set
- const stream: ReadStream & { byteLength: number } =
- createReadStream(
- inputPath,
- { fd, autoClose: false, start, end: (start + partSize) - 1 }
- ) as ReadStream & { byteLength: number }
-
- // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
- stream.byteLength = min([ statResult.size - start, partSize ])
-
- const uploadPartCommand = new UploadPartCommand({
- Bucket: bucketInfo.BUCKET_NAME,
- Key: key,
- UploadId: createResponse.UploadId,
- PartNumber: partNumber,
- Body: stream
- })
- const uploadResponse = await s3Client.send(uploadPartCommand)
-
- parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
- partNumber += 1
+
+ const listedObjects = await s3Client.send(listCommand)
+
+ if (isArray(listedObjects.Contents) !== true) {
+ const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+
+ logger.error(message, { response: listedObjects, ...lTags() })
+ throw new Error(message)
}
- await close(fd)
- const completeUploadCommand = new CompleteMultipartUploadCommand({
- Bucket: bucketInfo.BUCKET_NAME,
- Key: objectStorageKey,
- UploadId: createResponse.UploadId,
- MultipartUpload: { Parts: parts }
- })
- await s3Client.send(completeUploadCommand)
+ await map(listedObjects.Contents, object => {
+ const command = commandBuilder(object)
- logger.debug(
- 'Completed %s%s in bucket %s in %d parts',
- bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
- )
+ return s3Client.send(command)
+ }, { concurrency: 10 })
+
+ // Repeat if not all objects could be listed at once (a ListObjectsV2 response contains at most 1000 keys)
+ if (listedObjects.IsTruncated) {
+ await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
+ }
+}
- return getPrivateUrl(bucketInfo, objectStorageKey)
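+// Map the isPrivate flag to the canned ACL configured for uploads.
+// An empty configuration value disables ACL updates.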
+function getACL (isPrivate: boolean) {
+ return isPrivate
+ ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
+ : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
}