1 import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
2 import { dirname } from 'path'
3 import { Readable } from 'stream'
6 CompleteMultipartUploadCommandOutput,
11 PutObjectCommandInput,
13 } from '@aws-sdk/client-s3'
14 import { Upload } from '@aws-sdk/lib-storage'
15 import { pipelinePromise } from '@server/helpers/core-utils'
16 import { isArray } from '@server/helpers/custom-validators/misc'
17 import { logger } from '@server/helpers/logger'
18 import { CONFIG } from '@server/initializers/config'
19 import { getInternalUrl } from '../urls'
20 import { getClient } from './client'
21 import { lTags } from './logger'
/**
 * List the keys of the objects stored under `prefix` in the given bucket.
 *
 * A ListObjectsV2 response only carries a bounded page of keys, so the listing
 * is repeated with the `NextContinuationToken` of the previous response for as
 * long as the response is flagged as truncated.
 *
 * @param prefix - key prefix, appended to `bucketInfo.PREFIX`
 * @param bucketInfo - bucket name and bucket-wide key prefix
 * @returns the keys reported by the storage, or an empty array when nothing is listed
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const keys: Array<string | undefined> = []

  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // No `Contents` array means there is nothing (more) to list
    if (isArray(listedObjects.Contents) !== true) break

    keys.push(...listedObjects.Contents.map(c => c.Key))

    // Only keep going when the storage says the page was truncated
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
44 // ---------------------------------------------------------------------------
/**
 * Upload a local file to object storage.
 *
 * Opens a read stream on `inputPath` and hands it to `uploadToStorage` together
 * with the destination key, the bucket and the privacy flag.
 *
 * @returns whatever `uploadToStorage` resolves to
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { bucketInfo, objectStorageKey } = options

  logger.debug(
    'Uploading file %s to %s%s in bucket %s',
    options.inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  const content = createReadStream(options.inputPath)

  return uploadToStorage({
    objectStorageKey,
    content,
    bucketInfo,
    isPrivate: options.isPrivate
  })
}
61 // ---------------------------------------------------------------------------
/**
 * Change the ACL of a single stored object.
 *
 * The ACL value is picked by `getACL` from the `isPrivate` flag.
 *
 * @returns the promise of the `PutObjectAclCommand` sent through the S3 client
 */
function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { bucketInfo } = options

  // Full key: bucket prefix + object key
  const fullKey = buildKey(options.objectStorageKey, bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())

  const aclCommand = new PutObjectAclCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: fullKey,
    ACL: getACL(options.isPrivate)
  })

  return getClient().send(aclCommand)
}
/**
 * Change the ACL of every object listed under a prefix.
 *
 * Delegates the listing and the iteration to `applyOnPrefix`, providing a
 * builder that creates one `PutObjectAclCommand` per listed object.
 */
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { prefix, bucketInfo, isPrivate } = options

  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  // The ACL is resolved for each object, when its command is built
  const buildAclCommand = (obj: _Object) => new PutObjectAclCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: obj.Key,
    ACL: getACL(isPrivate)
  })

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildAclCommand })
}
105 // ---------------------------------------------------------------------------
/**
 * Delete a single object from the bucket.
 *
 * @param objectStorageKey - object key, without the bucket prefix
 * @param bucketInfo - bucket name and bucket-wide key prefix
 * @returns the promise of the `DeleteObjectCommand` sent through the S3 client
 */
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  const fullKey = buildKey(objectStorageKey, bucketInfo)

  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())

  const deleteCommand = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: fullKey
  })

  return getClient().send(deleteCommand)
}
/**
 * Delete every object listed under a prefix.
 *
 * Objects are removed one by one through `applyOnPrefix`, with one
 * `DeleteObjectCommand` built per listed object.
 *
 * @param prefix - key prefix, without the bucket prefix
 * @param bucketInfo - bucket name and bucket-wide key prefix
 */
function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete once s3ninja supports this operation

  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  const buildDeleteCommand = (obj: _Object) => new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: obj.Key
  })

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
137 // ---------------------------------------------------------------------------
// Download an object from the bucket into a local file.
// Reads `key`, `destination` and `bucketInfo` from `options`.
// NOTE(review): the remaining option fields and the end of this function
// (including any return value) are not visible in this extract.
139 async function makeAvailable (options: {
142 bucketInfo: BucketInfo
144 const { key, destination, bucketInfo } = options
// Make sure the parent directory of the destination exists before writing to it
146 await ensureDir(dirname(options.destination))
// The object is fetched using its full key (bucket prefix + key)
148 const command = new GetObjectCommand({
149 Bucket: bucketInfo.BUCKET_NAME,
150 Key: buildKey(key, bucketInfo)
152 const response = await getClient().send(command)
// Pipe the response body into the destination file and wait for the pipeline to settle
154 const file = createWriteStream(destination)
155 await pipelinePromise(response.Body as Readable, file)
/**
 * Build the full storage key of an object by prepending the bucket prefix.
 *
 * @param key - object key, without the bucket prefix
 * @param bucketInfo - provides the `PREFIX` to prepend
 * @returns `bucketInfo.PREFIX` immediately followed by `key`
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return `${bucketInfo.PREFIX}${key}`
}
164 // ---------------------------------------------------------------------------
/**
 * Open a readable stream on a stored object.
 *
 * Sends a `GetObjectCommand` for the full key (bucket prefix + key), forwarding
 * `rangeHeader` as the `Range` of the request.
 *
 * @returns the response body, typed as a Node.js `Readable`
 */
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const { bucketInfo } = options

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, bucketInfo),
    Range: options.rangeHeader
  })

  const { Body } = await getClient().send(getCommand)

  return Body as Readable
}
184 // ---------------------------------------------------------------------------
201 createObjectReadStream
204 // ---------------------------------------------------------------------------
// Upload `content` to the bucket through the `Upload` helper of @aws-sdk/lib-storage.
// Throws an Error when the completed upload response carries no `Bucket`.
// Returns the result of `getInternalUrl(bucketInfo, objectStorageKey)`.
// NOTE(review): some option fields, part of the command input and some `Upload`
// settings are not visible in this extract.
206 async function uploadToStorage (options: {
208 objectStorageKey: string
209 bucketInfo: BucketInfo
212 const { content, objectStorageKey, bucketInfo, isPrivate } = options
// The object is stored under its full key (bucket prefix + key), with the ACL matching `isPrivate`
214 const input: PutObjectCommandInput = {
216 Bucket: bucketInfo.BUCKET_NAME,
217 Key: buildKey(objectStorageKey, bucketInfo),
218 ACL: getACL(isPrivate)
221 const parallelUploads3 = new Upload({
// Part size comes from the instance configuration
224 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
226 // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
227 // More detailed explanation:
228 // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
229 // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
230 leavePartsOnError: true,
// Wait for the whole upload to finish
234 const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
235 // Check is needed even if the HTTP status code is 200 OK
236 // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
237 if (!response.Bucket) {
238 const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
// Log the raw response with the error before failing
239 logger.error(message, { response, ...lTags() })
240 throw new Error(message)
// Arguments of a log call whose opening line is not visible in this extract
244 'Completed %s%s in bucket %s',
245 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
248 return getInternalUrl(bucketInfo, objectStorageKey)
/**
 * Send one command per object listed under a prefix.
 *
 * Lists the objects whose key starts with `bucketInfo.PREFIX + prefix`, builds a
 * command for each of them with `commandBuilder` and sends the commands one
 * after the other. When the listing is truncated, the function calls itself
 * again for the next page.
 *
 * @param options.prefix - key prefix, without the bucket prefix
 * @param options.bucketInfo - bucket name and bucket-wide key prefix
 * @param options.commandBuilder - creates the command to send for a listed object
 * @param options.continuationToken - token of the page to list; omitted for the first page
 * @throws Error when the listing response contains no `Contents` array
 */
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder, continuationToken } = options

  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix,
    ContinuationToken: continuationToken
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  // Commands are sent sequentially, in listing order
  for (const object of listedObjects.Contents) {
    const command = commandBuilder(object)

    await s3Client.send(command)
  }

  // Repeat if not all objects could be listed at once.
  // The next page is addressed by `NextContinuationToken`: `ContinuationToken` in the
  // response only echoes the token of the current request, so reusing it would list
  // the same page again.
  if (listedObjects.IsTruncated) {
    await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
  }
}
/**
 * Pick the configured upload ACL matching the privacy of an object.
 *
 * @param isPrivate - whether the object must not be publicly readable
 * @returns the `PRIVATE` or `PUBLIC` entry of `CONFIG.OBJECT_STORAGE.UPLOAD_ACL`
 */
function getACL (isPrivate: boolean) {
  if (isPrivate) return CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE

  return CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
}