1 import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
2 import { dirname } from 'path'
3 import { Readable } from 'stream'
5 CompleteMultipartUploadCommandOutput,
10 } from '@aws-sdk/client-s3'
11 import { Upload } from '@aws-sdk/lib-storage'
12 import { pipelinePromise } from '@server/helpers/core-utils'
13 import { isArray } from '@server/helpers/custom-validators/misc'
14 import { logger } from '@server/helpers/logger'
15 import { CONFIG } from '@server/initializers/config'
16 import { getPrivateUrl } from '../urls'
17 import { getClient } from './client'
18 import { lTags } from './logger'
25 async function storeObject (options: {
27 objectStorageKey: string
28 bucketInfo: BucketInfo
30 const { inputPath, objectStorageKey, bucketInfo } = options
32 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
34 const fileStream = createReadStream(inputPath)
36 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
39 async function removeObject (filename: string, bucketInfo: BucketInfo) {
40 const command = new DeleteObjectCommand({
41 Bucket: bucketInfo.BUCKET_NAME,
42 Key: buildKey(filename, bucketInfo)
45 return getClient().send(command)
48 async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
49 const s3Client = getClient()
51 const commandPrefix = bucketInfo.PREFIX + prefix
52 const listCommand = new ListObjectsV2Command({
53 Bucket: bucketInfo.BUCKET_NAME,
57 const listedObjects = await s3Client.send(listCommand)
59 // FIXME: use bulk delete when s3ninja will support this operation
60 // const deleteParams = {
61 // Bucket: bucketInfo.BUCKET_NAME,
62 // Delete: { Objects: [] }
65 if (isArray(listedObjects.Contents) !== true) {
66 const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
68 logger.error(message, { response: listedObjects, ...lTags() })
69 throw new Error(message)
72 for (const object of listedObjects.Contents) {
73 const command = new DeleteObjectCommand({
74 Bucket: bucketInfo.BUCKET_NAME,
78 await s3Client.send(command)
80 // FIXME: use bulk delete when s3ninja will support this operation
81 // deleteParams.Delete.Objects.push({ Key: object.Key })
84 // FIXME: use bulk delete when s3ninja will support this operation
85 // const deleteCommand = new DeleteObjectsCommand(deleteParams)
86 // await s3Client.send(deleteCommand)
88 // Repeat if not all objects could be listed at once (limit of 1000?)
89 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
92 async function makeAvailable (options: {
95 bucketInfo: BucketInfo
97 const { key, destination, bucketInfo } = options
99 await ensureDir(dirname(options.destination))
101 const command = new GetObjectCommand({
102 Bucket: bucketInfo.BUCKET_NAME,
103 Key: buildKey(key, bucketInfo)
105 const response = await getClient().send(command)
107 const file = createWriteStream(destination)
108 await pipelinePromise(response.Body as Readable, file)
113 function buildKey (key: string, bucketInfo: BucketInfo) {
114 return bucketInfo.PREFIX + key
117 // ---------------------------------------------------------------------------
128 // ---------------------------------------------------------------------------
130 async function uploadToStorage (options: {
132 objectStorageKey: string
133 bucketInfo: BucketInfo
135 const { content, objectStorageKey, bucketInfo } = options
137 const input: PutObjectCommandInput = {
139 Bucket: bucketInfo.BUCKET_NAME,
140 Key: buildKey(objectStorageKey, bucketInfo)
143 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
144 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
147 const parallelUploads3 = new Upload({
150 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
152 // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
153 // More detailed explanation:
154 // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
155 // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
156 leavePartsOnError: true,
160 const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
161 // Check is needed even if the HTTP status code is 200 OK
162 // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
163 if (!response.Bucket) {
164 const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
165 logger.error(message, { response, ...lTags() })
166 throw new Error(message)
170 'Completed %s%s in bucket %s',
171 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
174 return getPrivateUrl(bucketInfo, objectStorageKey)