1 import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
2 import { dirname } from 'path'
3 import { Readable } from 'stream'
10 } from '@aws-sdk/client-s3'
11 import { Upload } from '@aws-sdk/lib-storage'
12 import { pipelinePromise } from '@server/helpers/core-utils'
13 import { isArray } from '@server/helpers/custom-validators/misc'
14 import { logger } from '@server/helpers/logger'
15 import { CONFIG } from '@server/initializers/config'
16 import { getPrivateUrl } from '../urls'
17 import { getClient } from './client'
18 import { lTags } from './logger'
/**
 * Uploads a local file to object storage.
 *
 * The file is stat'ed and opened as a read stream. When its size is strictly
 * greater than `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART` the stream is handed to
 * `multiPartUpload`, otherwise to `objectStoragePut`.
 *
 * @param options.inputPath - path of the local file to stat and read
 * @param options.objectStorageKey - object key, forwarded to the upload helper
 * @param options.bucketInfo - target bucket (`BUCKET_NAME` and `PREFIX` are read for the debug log)
 * @returns the value resolved by the selected upload helper
 */
25 async function storeObject (options: {
27 objectStorageKey: string
28 bucketInfo: BucketInfo
30 const { inputPath, objectStorageKey, bucketInfo } = options
32 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
// The file size decides which upload helper receives the stream
34 const stats = await stat(inputPath)
35 const fileStream = createReadStream(inputPath)
// Strictly larger than the configured part size: use the multipart path
37 if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
38 return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
// Otherwise a single put request is used
41 return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
/**
 * Deletes a single object from a bucket.
 *
 * @param filename - object name; `buildKey` combines it with the bucket prefix
 * @param bucketInfo - bucket the object is deleted from
 * @returns the result of sending a `DeleteObjectCommand` with the client returned by `getClient()`
 */
44 async function removeObject (filename: string, bucketInfo: BucketInfo) {
45 const command = new DeleteObjectCommand({
46 Bucket: bucketInfo.BUCKET_NAME,
47 Key: buildKey(filename, bucketInfo)
50 return getClient().send(command)
/**
 * Removes the objects listed for a prefix, sending one delete request per object.
 *
 * A `ListObjectsV2Command` is sent first. If the response has no `Contents`
 * array, the failure is logged and an error is thrown. Otherwise each listed
 * object gets its own awaited `DeleteObjectCommand`. When the response is
 * flagged `IsTruncated`, the function calls itself again with the same arguments.
 *
 * @param prefix - prefix to remove; `bucketInfo.PREFIX` is prepended to it
 * @param bucketInfo - bucket to operate on
 * @throws Error when the list response does not contain a `Contents` array
 */
53 async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
54 const s3Client = getClient()
// Plain string concatenation, no separator is inserted
56 const commandPrefix = bucketInfo.PREFIX + prefix
57 const listCommand = new ListObjectsV2Command({
58 Bucket: bucketInfo.BUCKET_NAME,
62 const listedObjects = await s3Client.send(listCommand)
64 // FIXME: use bulk delete when s3ninja will support this operation
65 // const deleteParams = {
66 // Bucket: bucketInfo.BUCKET_NAME,
67 // Delete: { Objects: [] }
// A response without a Contents array is treated as a failure: log it, then throw
70 if (isArray(listedObjects.Contents) !== true) {
71 const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
73 logger.error(message, { response: listedObjects, ...lTags() })
74 throw new Error(message)
// Sequential deletion: each request is awaited before the next one is sent
77 for (const object of listedObjects.Contents) {
78 const command = new DeleteObjectCommand({
79 Bucket: bucketInfo.BUCKET_NAME,
83 await s3Client.send(command)
85 // FIXME: use bulk delete when s3ninja will support this operation
86 // deleteParams.Delete.Objects.push({ Key: object.Key })
89 // FIXME: use bulk delete when s3ninja will support this operation
90 // const deleteCommand = new DeleteObjectsCommand(deleteParams)
91 // await s3Client.send(deleteCommand)
93 // Repeat if not all objects could be listed at once (limit of 1000?)
// The recursive call issues a fresh listing with the same arguments
94 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
/**
 * Downloads an object from object storage into a local file.
 *
 * The parent directory of `destination` is created if needed. A
 * `GetObjectCommand` is then sent for the key produced by `buildKey`, and the
 * response body is piped into a write stream opened on `destination`.
 *
 * @param options.key - object name, combined with the bucket prefix by `buildKey`
 * @param options.destination - local path the object is written to
 * @param options.bucketInfo - bucket the object is read from
 */
97 async function makeAvailable (options: {
100 bucketInfo: BucketInfo
102 const { key, destination, bucketInfo } = options
// Make sure the directory that will contain the file exists
104 await ensureDir(dirname(options.destination))
106 const command = new GetObjectCommand({
107 Bucket: bucketInfo.BUCKET_NAME,
108 Key: buildKey(key, bucketInfo)
110 const response = await getClient().send(command)
112 const file = createWriteStream(destination)
// The body is asserted to be a Node Readable so it can go through the pipeline helper
113 await pipelinePromise(response.Body as Readable, file)
/**
 * Builds the full object key by concatenating `bucketInfo.PREFIX` and `key`.
 * No separator is inserted between the two parts.
 *
 * NOTE(review): if `PREFIX` can be undefined, the result would start with the
 * text "undefined" — confirm that BucketInfo always provides it.
 *
 * @param key - object name without the bucket prefix
 * @param bucketInfo - bucket description providing `PREFIX`
 * @returns the prefixed key
 */
118 function buildKey (key: string, bucketInfo: BucketInfo) {
119 return bucketInfo.PREFIX + key
122 // ---------------------------------------------------------------------------
133 // ---------------------------------------------------------------------------
/**
 * Uploads content with a single `PutObjectCommand`.
 *
 * The command input targets `bucketInfo.BUCKET_NAME` with the key produced by
 * `buildKey`. An ACL is added only when `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is truthy.
 *
 * @param options.objectStorageKey - object name without the bucket prefix
 * @param options.content - data to upload (a file read stream when called from `storeObject`)
 * @param options.bucketInfo - target bucket
 * @returns the value of `getPrivateUrl(bucketInfo, objectStorageKey)`
 */
135 async function objectStoragePut (options: {
136 objectStorageKey: string
138 bucketInfo: BucketInfo
140 const { objectStorageKey, content, bucketInfo } = options
142 const input: PutObjectCommandInput = {
143 Bucket: bucketInfo.BUCKET_NAME,
144 Key: buildKey(objectStorageKey, bucketInfo),
// The ACL is optional: it is only set when one is configured
148 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
149 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
152 const command = new PutObjectCommand(input)
154 await getClient().send(command)
// The URL is computed from the unprefixed key
156 return getPrivateUrl(bucketInfo, objectStorageKey)
/**
 * Uploads content through the `Upload` helper of `@aws-sdk/lib-storage`.
 *
 * The input targets `bucketInfo.BUCKET_NAME` with the key produced by
 * `buildKey`. An ACL is added only when `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is
 * truthy. The function waits for `done()` before returning.
 *
 * @param options.content - data to upload (a file read stream when called from `storeObject`)
 * @param options.objectStorageKey - object name without the bucket prefix
 * @param options.bucketInfo - target bucket
 * @returns the value of `getPrivateUrl(bucketInfo, objectStorageKey)`
 */
159 async function multiPartUpload (options: {
161 objectStorageKey: string
162 bucketInfo: BucketInfo
164 const { content, objectStorageKey, bucketInfo } = options
166 const input: PutObjectCommandInput = {
168 Bucket: bucketInfo.BUCKET_NAME,
169 Key: buildKey(objectStorageKey, bucketInfo)
// The ACL is optional: it is only set when one is configured
172 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
173 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
176 const parallelUploads3 = new Upload({
// Same configuration value that storeObject compares the file size against
179 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
180 leavePartsOnError: false,
// Resolves once the upload helper reports completion
184 await parallelUploads3.done()
187 'Completed %s%s in bucket %s',
188 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
// The URL is computed from the unprefixed key
191 return getPrivateUrl(bucketInfo, objectStorageKey)