aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/object-storage/shared/object-storage-helpers.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/object-storage/shared/object-storage-helpers.ts')
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts223
1 files changed, 176 insertions, 47 deletions
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index 16161362c..3046d76bc 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -1,19 +1,23 @@
1import { map } from 'bluebird'
1import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' 2import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
2import { dirname } from 'path' 3import { dirname } from 'path'
3import { Readable } from 'stream' 4import { Readable } from 'stream'
4import { 5import {
6 _Object,
5 CompleteMultipartUploadCommandOutput, 7 CompleteMultipartUploadCommandOutput,
6 DeleteObjectCommand, 8 DeleteObjectCommand,
7 GetObjectCommand, 9 GetObjectCommand,
8 ListObjectsV2Command, 10 ListObjectsV2Command,
9 PutObjectCommandInput 11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
10} from '@aws-sdk/client-s3' 14} from '@aws-sdk/client-s3'
11import { Upload } from '@aws-sdk/lib-storage' 15import { Upload } from '@aws-sdk/lib-storage'
12import { pipelinePromise } from '@server/helpers/core-utils' 16import { pipelinePromise } from '@server/helpers/core-utils'
13import { isArray } from '@server/helpers/custom-validators/misc' 17import { isArray } from '@server/helpers/custom-validators/misc'
14import { logger } from '@server/helpers/logger' 18import { logger } from '@server/helpers/logger'
15import { CONFIG } from '@server/initializers/config' 19import { CONFIG } from '@server/initializers/config'
16import { getPrivateUrl } from '../urls' 20import { getInternalUrl } from '../urls'
17import { getClient } from './client' 21import { getClient } from './client'
18import { lTags } from './logger' 22import { lTags } from './logger'
19 23
@@ -22,73 +26,125 @@ type BucketInfo = {
22 PREFIX?: string 26 PREFIX?: string
23} 27}
24 28
29async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
30 const s3Client = getClient()
31
32 const commandPrefix = bucketInfo.PREFIX + prefix
33 const listCommand = new ListObjectsV2Command({
34 Bucket: bucketInfo.BUCKET_NAME,
35 Prefix: commandPrefix
36 })
37
38 const listedObjects = await s3Client.send(listCommand)
39
40 if (isArray(listedObjects.Contents) !== true) return []
41
42 return listedObjects.Contents.map(c => c.Key)
43}
44
45// ---------------------------------------------------------------------------
46
25async function storeObject (options: { 47async function storeObject (options: {
26 inputPath: string 48 inputPath: string
27 objectStorageKey: string 49 objectStorageKey: string
28 bucketInfo: BucketInfo 50 bucketInfo: BucketInfo
51 isPrivate: boolean
29}): Promise<string> { 52}): Promise<string> {
30 const { inputPath, objectStorageKey, bucketInfo } = options 53 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
31 54
32 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) 55 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
33 56
34 const fileStream = createReadStream(inputPath) 57 const fileStream = createReadStream(inputPath)
35 58
36 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo }) 59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
37} 60}
38 61
39async function removeObject (filename: string, bucketInfo: BucketInfo) { 62// ---------------------------------------------------------------------------
40 const command = new DeleteObjectCommand({ 63
64function updateObjectACL (options: {
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68}) {
69 const { objectStorageKey, bucketInfo, isPrivate } = options
70
71 const key = buildKey(objectStorageKey, bucketInfo)
72
73 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
74
75 const command = new PutObjectAclCommand({
41 Bucket: bucketInfo.BUCKET_NAME, 76 Bucket: bucketInfo.BUCKET_NAME,
42 Key: buildKey(filename, bucketInfo) 77 Key: key,
78 ACL: getACL(isPrivate)
43 }) 79 })
44 80
45 return getClient().send(command) 81 return getClient().send(command)
46} 82}
47 83
48async function removePrefix (prefix: string, bucketInfo: BucketInfo) { 84function updatePrefixACL (options: {
49 const s3Client = getClient() 85 prefix: string
50 86 bucketInfo: BucketInfo
51 const commandPrefix = bucketInfo.PREFIX + prefix 87 isPrivate: boolean
52 const listCommand = new ListObjectsV2Command({ 88}) {
53 Bucket: bucketInfo.BUCKET_NAME, 89 const { prefix, bucketInfo, isPrivate } = options
54 Prefix: commandPrefix 90
91 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
92
93 return applyOnPrefix({
94 prefix,
95 bucketInfo,
96 commandBuilder: obj => {
97 logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
98
99 return new PutObjectAclCommand({
100 Bucket: bucketInfo.BUCKET_NAME,
101 Key: obj.Key,
102 ACL: getACL(isPrivate)
103 })
104 }
55 }) 105 })
106}
56 107
57 const listedObjects = await s3Client.send(listCommand) 108// ---------------------------------------------------------------------------
58
59 // FIXME: use bulk delete when s3ninja will support this operation
60 // const deleteParams = {
61 // Bucket: bucketInfo.BUCKET_NAME,
62 // Delete: { Objects: [] }
63 // }
64 109
65 if (isArray(listedObjects.Contents) !== true) { 110function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
66 const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` 111 const key = buildKey(objectStorageKey, bucketInfo)
67 112
68 logger.error(message, { response: listedObjects, ...lTags() }) 113 return removeObjectByFullKey(key, bucketInfo)
69 throw new Error(message) 114}
70 }
71 115
72 for (const object of listedObjects.Contents) { 116function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
73 const command = new DeleteObjectCommand({ 117 logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
74 Bucket: bucketInfo.BUCKET_NAME,
75 Key: object.Key
76 })
77 118
78 await s3Client.send(command) 119 const command = new DeleteObjectCommand({
120 Bucket: bucketInfo.BUCKET_NAME,
121 Key: fullKey
122 })
79 123
80 // FIXME: use bulk delete when s3ninja will support this operation 124 return getClient().send(command)
81 // deleteParams.Delete.Objects.push({ Key: object.Key }) 125}
82 }
83 126
127async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
84 // FIXME: use bulk delete when s3ninja will support this operation 128 // FIXME: use bulk delete when s3ninja will support this operation
85 // const deleteCommand = new DeleteObjectsCommand(deleteParams)
86 // await s3Client.send(deleteCommand)
87 129
88 // Repeat if not all objects could be listed at once (limit of 1000?) 130 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
89 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) 131
132 return applyOnPrefix({
133 prefix,
134 bucketInfo,
135 commandBuilder: obj => {
136 logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
137
138 return new DeleteObjectCommand({
139 Bucket: bucketInfo.BUCKET_NAME,
140 Key: obj.Key
141 })
142 }
143 })
90} 144}
91 145
146// ---------------------------------------------------------------------------
147
92async function makeAvailable (options: { 148async function makeAvailable (options: {
93 key: string 149 key: string
94 destination: string 150 destination: string
@@ -116,13 +172,43 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
116 172
117// --------------------------------------------------------------------------- 173// ---------------------------------------------------------------------------
118 174
175async function createObjectReadStream (options: {
176 key: string
177 bucketInfo: BucketInfo
178 rangeHeader: string
179}) {
180 const { key, bucketInfo, rangeHeader } = options
181
182 const command = new GetObjectCommand({
183 Bucket: bucketInfo.BUCKET_NAME,
184 Key: buildKey(key, bucketInfo),
185 Range: rangeHeader
186 })
187
188 const response = await getClient().send(command)
189
190 return response.Body as Readable
191}
192
193// ---------------------------------------------------------------------------
194
119export { 195export {
120 BucketInfo, 196 BucketInfo,
121 buildKey, 197 buildKey,
198
122 storeObject, 199 storeObject,
200
123 removeObject, 201 removeObject,
202 removeObjectByFullKey,
124 removePrefix, 203 removePrefix,
125 makeAvailable 204
205 makeAvailable,
206
207 updateObjectACL,
208 updatePrefixACL,
209
210 listKeysOfPrefix,
211 createObjectReadStream
126} 212}
127 213
128// --------------------------------------------------------------------------- 214// ---------------------------------------------------------------------------
@@ -131,17 +217,15 @@ async function uploadToStorage (options: {
131 content: ReadStream 217 content: ReadStream
132 objectStorageKey: string 218 objectStorageKey: string
133 bucketInfo: BucketInfo 219 bucketInfo: BucketInfo
220 isPrivate: boolean
134}) { 221}) {
135 const { content, objectStorageKey, bucketInfo } = options 222 const { content, objectStorageKey, bucketInfo, isPrivate } = options
136 223
137 const input: PutObjectCommandInput = { 224 const input: PutObjectCommandInput = {
138 Body: content, 225 Body: content,
139 Bucket: bucketInfo.BUCKET_NAME, 226 Bucket: bucketInfo.BUCKET_NAME,
140 Key: buildKey(objectStorageKey, bucketInfo) 227 Key: buildKey(objectStorageKey, bucketInfo),
141 } 228 ACL: getACL(isPrivate)
142
143 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
144 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
145 } 229 }
146 230
147 const parallelUploads3 = new Upload({ 231 const parallelUploads3 = new Upload({
@@ -171,5 +255,50 @@ async function uploadToStorage (options: {
171 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() 255 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
172 ) 256 )
173 257
174 return getPrivateUrl(bucketInfo, objectStorageKey) 258 return getInternalUrl(bucketInfo, objectStorageKey)
259}
260
261async function applyOnPrefix (options: {
262 prefix: string
263 bucketInfo: BucketInfo
264 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
265
266 continuationToken?: string
267}) {
268 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
269
270 const s3Client = getClient()
271
272 const commandPrefix = buildKey(prefix, bucketInfo)
273 const listCommand = new ListObjectsV2Command({
274 Bucket: bucketInfo.BUCKET_NAME,
275 Prefix: commandPrefix,
276 ContinuationToken: continuationToken
277 })
278
279 const listedObjects = await s3Client.send(listCommand)
280
281 if (isArray(listedObjects.Contents) !== true) {
282 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
283
284 logger.error(message, { response: listedObjects, ...lTags() })
285 throw new Error(message)
286 }
287
288 await map(listedObjects.Contents, object => {
289 const command = commandBuilder(object)
290
291 return s3Client.send(command)
292 }, { concurrency: 10 })
293
294 // Repeat if not all objects could be listed at once (limit of 1000?)
295 if (listedObjects.IsTruncated) {
296 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
297 }
298}
299
300function getACL (isPrivate: boolean) {
301 return isPrivate
302 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
303 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
175} 304}