1 import { map } from 'bluebird'
2 import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
3 import { dirname } from 'path'
4 import { Readable } from 'stream'
7 CompleteMultipartUploadCommandOutput,
12 PutObjectCommandInput,
14 } from '@aws-sdk/client-s3'
15 import { Upload } from '@aws-sdk/lib-storage'
16 import { pipelinePromise } from '@server/helpers/core-utils'
17 import { isArray } from '@server/helpers/custom-validators/misc'
18 import { logger } from '@server/helpers/logger'
19 import { CONFIG } from '@server/initializers/config'
20 import { getInternalUrl } from '../urls'
21 import { getClient } from './client'
22 import { lTags } from './logger'
/**
 * List the keys of the objects stored under `prefix` in the given bucket.
 *
 * A single ListObjectsV2 response only carries one page of results, so the
 * request is repeated with `NextContinuationToken` until the listing is no
 * longer reported as truncated.
 *
 * @param prefix - key prefix, appended to the bucket-level `PREFIX`
 * @param bucketInfo - bucket name and bucket-level key prefix
 * @returns the keys as reported by the object storage, or an empty array when nothing is listed
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const keys: _Object['Key'][] = []

  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // No `Contents` array means there is nothing (more) to report
    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      keys.push(object.Key)
    }

    // Stop as soon as the listing is complete, or if the provider did not give us a token to continue with
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
45 // ---------------------------------------------------------------------------
/**
 * Upload a local file to object storage.
 *
 * @param options.inputPath - path of the local file to read
 * @param options.objectStorageKey - destination key, relative to the bucket prefix
 * @param options.bucketInfo - destination bucket name and prefix
 * @param options.isPrivate - selects which configured ACL is applied to the upload
 * @returns the value resolved by `uploadToStorage`
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { inputPath, bucketInfo } = options

  logger.debug(
    'Uploading file %s to %s%s in bucket %s',
    inputPath, bucketInfo.PREFIX, options.objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: createReadStream(inputPath),
    bucketInfo,
    isPrivate: options.isPrivate
  })
}
/**
 * Upload in-memory content to object storage.
 *
 * @param options.content - the content to upload
 * @param options.inputPath - only used in the debug log to identify the content
 * @param options.objectStorageKey - destination key, relative to the bucket prefix
 * @param options.bucketInfo - destination bucket name and prefix
 * @param options.isPrivate - selects which configured ACL is applied to the upload
 * @returns the value resolved by `uploadToStorage`
 */
async function storeContent (options: {
  content: string
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { bucketInfo } = options

  logger.debug(
    'Uploading %s content to %s%s in bucket %s',
    options.inputPath, bucketInfo.PREFIX, options.objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: options.content,
    bucketInfo,
    isPrivate: options.isPrivate
  })
}
76 // ---------------------------------------------------------------------------
/**
 * Apply the configured private/public ACL to a single stored object.
 *
 * Nothing is sent when no ACL is configured for the requested visibility.
 *
 * @param options.objectStorageKey - object key, relative to the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.isPrivate - selects the private or the public configured ACL
 */
async function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const acl = getACL(options.isPrivate)
  if (!acl) return

  const { bucketInfo } = options
  const key = buildKey(options.objectStorageKey, bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())

  await getClient().send(
    new PutObjectAclCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: key,
      ACL: acl
    })
  )
}
/**
 * Apply the configured private/public ACL to every object listed under a prefix.
 *
 * Nothing is sent when no ACL is configured for the requested visibility.
 *
 * @param options.prefix - key prefix, relative to the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.isPrivate - selects the private or the public configured ACL
 * @returns the promise of `applyOnPrefix`, or `undefined` when there is no ACL to apply
 */
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { prefix, bucketInfo } = options

  const acl = getACL(options.isPrivate)
  if (!acl) return

  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  // One PutObjectAcl command per listed object
  const buildAclCommand = (obj: _Object) => {
    logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())

    return new PutObjectAclCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: obj.Key,
      ACL: acl
    })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildAclCommand })
}
128 // ---------------------------------------------------------------------------
/**
 * Delete an object identified by its key relative to the bucket prefix.
 *
 * @param objectStorageKey - object key, without the bucket prefix
 * @param bucketInfo - bucket name and prefix
 * @returns the promise of the delete command
 */
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  return removeObjectByFullKey(buildKey(objectStorageKey, bucketInfo), bucketInfo)
}
/**
 * Delete an object identified by its full key (bucket prefix already included).
 *
 * @param fullKey - complete object key, used as is
 * @param bucketInfo - bucket the object is removed from
 * @returns the promise of the delete command
 */
function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())

  const deleteCommand = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: fullKey
  })

  return getClient().send(deleteCommand)
}
/**
 * Delete every object listed under a prefix, one delete command per object.
 *
 * @param prefix - key prefix, relative to the bucket prefix
 * @param bucketInfo - bucket name and prefix
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete when s3ninja will support this operation

  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  const buildDeleteCommand = (obj: _Object) => {
    logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())

    return new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: obj.Key
    })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
166 // ---------------------------------------------------------------------------
/**
 * Download an object from object storage to a local file.
 *
 * The parent directory of `destination` is created first if needed.
 *
 * @param options.key - object key, relative to the bucket prefix
 * @param options.destination - local path the object is written to
 * @param options.bucketInfo - bucket name and prefix
 * @returns the write stream used for `destination`, once the body has been fully piped into it
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(destination))

  const response = await getClient().send(
    new GetObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: buildKey(key, bucketInfo)
    })
  )

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  return file
}
/**
 * Build the full object key by prepending the bucket-level prefix.
 *
 * @param key - object key, relative to the bucket prefix
 * @param bucketInfo - provides the `PREFIX` to prepend
 * @returns `PREFIX` immediately followed by `key` (no separator is inserted)
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return `${bucketInfo.PREFIX}${key}`
}
193 // ---------------------------------------------------------------------------
/**
 * Fetch an object from object storage and expose its body as a readable stream.
 *
 * @param options.key - object key, relative to the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.rangeHeader - forwarded as the `Range` of the GetObject request
 * @returns the raw GetObject response together with its body cast to a `Readable`
 */
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const { bucketInfo } = options

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, bucketInfo),
    Range: options.rangeHeader
  })

  const response = await getClient().send(getCommand)
  const stream = response.Body as Readable

  return { response, stream }
}
216 // ---------------------------------------------------------------------------
226 removeObjectByFullKey,
235 createObjectReadStream
238 // ---------------------------------------------------------------------------
/**
 * Upload content to object storage through the lib-storage `Upload` helper.
 *
 * @param options.content - stream or string used as the object body
 * @param options.objectStorageKey - destination key, relative to the bucket prefix
 * @param options.bucketInfo - destination bucket name and prefix
 * @param options.isPrivate - selects the private or the public configured ACL; no ACL is sent when none is configured
 * @returns the result of `getInternalUrl` for the uploaded object
 * @throws Error when the upload response does not contain a `Bucket`
 */
async function uploadToStorage (options: {
  content: ReadStream | string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { content, objectStorageKey, bucketInfo, isPrivate } = options

  const input: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  const acl = getACL(isPrivate)
  if (acl) input.ACL = acl

  const parallelUploads3 = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params: input
  })

  const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
  // Check is needed even if the HTTP status code is 200 OK
  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  if (!response.Bucket) {
    const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
    logger.error(message, { response, ...lTags() })
    throw new Error(message)
  }

  logger.debug(
    'Completed %s%s in bucket %s',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), responseMetadata: response.$metadata }
  )

  return getInternalUrl(bucketInfo, objectStorageKey)
}
/**
 * Run one S3 command per object listed under a prefix.
 *
 * Objects are listed page by page: each page is processed with at most 10
 * commands in flight, then the function recurses on the next page while the
 * listing is reported as truncated.
 *
 * @param options.prefix - key prefix, relative to the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.commandBuilder - builds the command to send for a listed object
 * @param options.continuationToken - token of the page to list; omitted by external callers
 * @throws Error when a listing response has no `Contents` array
 */
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder, continuationToken } = options

  const s3Client = getClient()

  const commandPrefix = buildKey(prefix, bucketInfo)
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix,
    ContinuationToken: continuationToken
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  await map(listedObjects.Contents, object => {
    const command = commandBuilder(object)

    return s3Client.send(command)
  }, { concurrency: 10 })

  // Repeat if not all objects could be listed at once (limit of 1000?)
  // The next page is identified by `NextContinuationToken`: `ContinuationToken` only echoes the token of the current request
  if (listedObjects.IsTruncated && listedObjects.NextContinuationToken) {
    await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
  }
}
/**
 * Pick the configured upload ACL matching the requested visibility.
 *
 * @param isPrivate - `true` for the private ACL, `false` for the public one
 * @returns the matching `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` value
 */
function getACL (isPrivate: boolean) {
  const { PRIVATE, PUBLIC } = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  if (isPrivate) return PRIVATE

  return PUBLIC
}