+import { map } from 'bluebird'
import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
import { dirname } from 'path'
import { Readable } from 'stream'
return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
}
+async function storeContent (options: {
+ content: string
+ inputPath: string
+ objectStorageKey: string
+ bucketInfo: BucketInfo
+ isPrivate: boolean
+}): Promise<string> {
+ const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+ logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+ return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
+}
+
// ---------------------------------------------------------------------------
-function updateObjectACL (options: {
+async function updateObjectACL (options: {
objectStorageKey: string
bucketInfo: BucketInfo
isPrivate: boolean
}) {
const { objectStorageKey, bucketInfo, isPrivate } = options
+ const acl = getACL(isPrivate)
+ if (!acl) return
+
const key = buildKey(objectStorageKey, bucketInfo)
logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
const command = new PutObjectAclCommand({
Bucket: bucketInfo.BUCKET_NAME,
Key: key,
- ACL: getACL(isPrivate)
+ ACL: acl
})
- return getClient().send(command)
+ await getClient().send(command)
}
function updatePrefixACL (options: {
}) {
const { prefix, bucketInfo, isPrivate } = options
+ const acl = getACL(isPrivate)
+ if (!acl) return
+
logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
return applyOnPrefix({
prefix,
bucketInfo,
commandBuilder: obj => {
+ logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
return new PutObjectAclCommand({
Bucket: bucketInfo.BUCKET_NAME,
Key: obj.Key,
- ACL: getACL(isPrivate)
+ ACL: acl
})
}
})
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
const key = buildKey(objectStorageKey, bucketInfo)
- logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
+ return removeObjectByFullKey(key, bucketInfo)
+}
+
+function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
+ logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
const command = new DeleteObjectCommand({
Bucket: bucketInfo.BUCKET_NAME,
- Key: key
+ Key: fullKey
})
return getClient().send(command)
}
-function removePrefix (prefix: string, bucketInfo: BucketInfo) {
+async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
// FIXME: use bulk delete when s3ninja will support this operation
logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
prefix,
bucketInfo,
commandBuilder: obj => {
+ logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
return new DeleteObjectCommand({
Bucket: bucketInfo.BUCKET_NAME,
Key: obj.Key
const response = await getClient().send(command)
- return response.Body as Readable
+ return {
+ response,
+ stream: response.Body as Readable
+ }
}
// ---------------------------------------------------------------------------
buildKey,
storeObject,
+ storeContent,
removeObject,
+ removeObjectByFullKey,
removePrefix,
makeAvailable,
// ---------------------------------------------------------------------------
async function uploadToStorage (options: {
- content: ReadStream
+ content: ReadStream | string
objectStorageKey: string
bucketInfo: BucketInfo
isPrivate: boolean
const input: PutObjectCommandInput = {
Body: content,
Bucket: bucketInfo.BUCKET_NAME,
- Key: buildKey(objectStorageKey, bucketInfo),
- ACL: getACL(isPrivate)
+ Key: buildKey(objectStorageKey, bucketInfo)
}
+ const acl = getACL(isPrivate)
+ if (acl) input.ACL = acl
+
const parallelUploads3 = new Upload({
client: getClient(),
queueSize: 4,
logger.debug(
'Completed %s%s in bucket %s',
- bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
+ bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), reseponseMetadata: response.$metadata }
)
return getInternalUrl(bucketInfo, objectStorageKey)
const s3Client = getClient()
- const commandPrefix = bucketInfo.PREFIX + prefix
+ const commandPrefix = buildKey(prefix, bucketInfo)
const listCommand = new ListObjectsV2Command({
Bucket: bucketInfo.BUCKET_NAME,
Prefix: commandPrefix,
throw new Error(message)
}
- for (const object of listedObjects.Contents) {
+ await map(listedObjects.Contents, object => {
const command = commandBuilder(object)
- await s3Client.send(command)
- }
+ return s3Client.send(command)
+ }, { concurrency: 10 })
// Repeat if not all objects could be listed at once (limit of 1000?)
if (listedObjects.IsTruncated) {