]>
Commit | Line | Data |
---|---|---|
508c1b1e | 1 | import { map } from 'bluebird' |
23c0b67d | 2 | import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' |
0305db28 JB |
3 | import { dirname } from 'path' |
4 | import { Readable } from 'stream' | |
5 | import { | |
9ab330b9 | 6 | _Object, |
23c0b67d | 7 | CompleteMultipartUploadCommandOutput, |
0305db28 JB |
8 | DeleteObjectCommand, |
9 | GetObjectCommand, | |
10 | ListObjectsV2Command, | |
9ab330b9 C |
11 | PutObjectAclCommand, |
12 | PutObjectCommandInput, | |
13 | S3Client | |
0305db28 | 14 | } from '@aws-sdk/client-s3' |
156cdbac | 15 | import { Upload } from '@aws-sdk/lib-storage' |
0305db28 JB |
16 | import { pipelinePromise } from '@server/helpers/core-utils' |
17 | import { isArray } from '@server/helpers/custom-validators/misc' | |
18 | import { logger } from '@server/helpers/logger' | |
19 | import { CONFIG } from '@server/initializers/config' | |
9ab330b9 | 20 | import { getInternalUrl } from '../urls' |
0305db28 JB |
21 | import { getClient } from './client' |
22 | import { lTags } from './logger' | |
23 | ||
/**
 * Identifies where objects live in the object storage.
 */
type BucketInfo = {
  // Bucket name, sent as `Bucket` in the S3 commands of this module
  BUCKET_NAME: string

  // Optional string prepended to object keys (see `buildKey`)
  PREFIX?: string
}
28 | ||
cfd57d2c C |
/**
 * List the keys of all objects stored under a prefix of a bucket.
 *
 * A single ListObjectsV2 response only contains one page of results, so the listing
 * is repeated with the `NextContinuationToken` of the previous response until the
 * storage reports that the result is no longer truncated.
 *
 * @param prefix prefix to list, relative to the bucket prefix
 * @param bucketInfo bucket (and optional bucket prefix) to list
 * @returns the full keys (bucket prefix included) of the listed objects, or an empty array if nothing is listed
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()
  const commandPrefix = buildKey(prefix, bucketInfo)

  const keys: _Object['Key'][] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // No (more) objects under this prefix
    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      keys.push(object.Key)
    }

    // `ContinuationToken` of the response only echoes the request: the next page is behind `NextContinuationToken`
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
44 | ||
45 | // --------------------------------------------------------------------------- | |
46 | ||
0305db28 JB |
/**
 * Upload a local file to a bucket.
 *
 * The file at `inputPath` is opened as a read stream and handed to `uploadToStorage`.
 *
 * @returns what `uploadToStorage` resolves with for the stored object
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  logger.debug(
    'Uploading file %s to %s%s in bucket %s',
    options.inputPath, options.bucketInfo.PREFIX, options.objectStorageKey, options.bucketInfo.BUCKET_NAME, lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: createReadStream(options.inputPath),
    bucketInfo: options.bucketInfo,
    isPrivate: options.isPrivate
  })
}
61 | ||
34023e12 C |
/**
 * Upload an in-memory string to a bucket.
 *
 * `inputPath` is only used in the debug log: the uploaded body is `content`.
 *
 * @returns what `uploadToStorage` resolves with for the stored object
 */
async function storeContent (options: {
  content: string
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  logger.debug(
    'Uploading %s content to %s%s in bucket %s',
    options.inputPath, options.bucketInfo.PREFIX, options.objectStorageKey, options.bucketInfo.BUCKET_NAME, lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: options.content,
    bucketInfo: options.bucketInfo,
    isPrivate: options.isPrivate
  })
}
75 | ||
cfd57d2c C |
76 | // --------------------------------------------------------------------------- |
77 | ||
/**
 * Change the ACL of a single object.
 *
 * Nothing is sent to the storage when `getACL` returns a falsy value for the requested privacy.
 */
async function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const acl = getACL(options.isPrivate)
  if (!acl) return

  const bucketName = options.bucketInfo.BUCKET_NAME
  const fullKey = buildKey(options.objectStorageKey, options.bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', fullKey, bucketName, lTags())

  const aclCommand = new PutObjectAclCommand({ Bucket: bucketName, Key: fullKey, ACL: acl })

  await getClient().send(aclCommand)
}
100 | ||
9ab330b9 C |
/**
 * Change the ACL of every object listed under a prefix.
 *
 * Returns `undefined` without contacting the storage when `getACL` returns a falsy value,
 * otherwise returns the promise of `applyOnPrefix`.
 */
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { prefix, bucketInfo } = options

  const acl = getACL(options.isPrivate)
  if (!acl) return

  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  // Built once per listed object by applyOnPrefix
  const buildAclCommand = (obj: _Object) => {
    logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())

    return new PutObjectAclCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: obj.Key, ACL: acl })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildAclCommand })
}
0305db28 | 127 | |
9ab330b9 | 128 | // --------------------------------------------------------------------------- |
0305db28 | 129 | |
9ab330b9 C |
/**
 * Delete an object identified by its key relative to the bucket prefix.
 *
 * @returns the promise returned by `removeObjectByFullKey`
 */
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  return removeObjectByFullKey(buildKey(objectStorageKey, bucketInfo), bucketInfo)
}
135 | ||
/**
 * Delete an object identified by its full key: the key is sent as is, without adding the bucket prefix.
 *
 * @returns the promise of the delete command sent to the storage
 */
function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
  const bucketName = bucketInfo.BUCKET_NAME

  logger.debug('Removing file %s in bucket %s', fullKey, bucketName, lTags())

  const deleteCommand = new DeleteObjectCommand({ Bucket: bucketName, Key: fullKey })

  return getClient().send(deleteCommand)
}
0305db28 | 146 | |
/**
 * Delete every object listed under a prefix, one delete command per object.
 *
 * @param prefix prefix to remove, relative to the bucket prefix
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete when s3ninja will support this operation

  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  // Built once per listed object by applyOnPrefix
  const buildDeleteCommand = (obj: _Object) => {
    logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())

    return new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: obj.Key })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
165 | ||
cfd57d2c C |
166 | // --------------------------------------------------------------------------- |
167 | ||
0305db28 JB |
/**
 * Download an object from a bucket to a local file.
 *
 * The parent directory of `destination` is created if needed, then the body of the
 * object is piped into a write stream opened on `destination`.
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  // The directory must exist before the write stream is opened
  await ensureDir(dirname(destination))

  const downloadCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body } = await getClient().send(downloadCommand)

  const output = createWriteStream(destination)
  await pipelinePromise(Body as Readable, output)

  output.close()
}
188 | ||
/**
 * Build the full key of an object by prepending the bucket prefix.
 *
 * `PREFIX` is optional in `BucketInfo`: a missing prefix is treated as an empty one
 * instead of being concatenated as the string "undefined".
 *
 * @param key object key relative to the bucket prefix
 * @returns the key to use in S3 commands
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
192 | ||
193 | // --------------------------------------------------------------------------- | |
194 | ||
9ab330b9 C |
/**
 * Request an object (or the range of it described by `rangeHeader`) from a bucket.
 *
 * @returns the raw command response, and its body exposed as a readable stream
 */
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const getCommand = new GetObjectCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, options.bucketInfo),
    Range: options.rangeHeader
  })

  const response = await getClient().send(getCommand)
  const stream = response.Body as Readable

  return { response, stream }
}
215 | ||
216 | // --------------------------------------------------------------------------- | |
217 | ||
0305db28 JB |
218 | export { |
219 | BucketInfo, | |
220 | buildKey, | |
9ab330b9 | 221 | |
0305db28 | 222 | storeObject, |
34023e12 | 223 | storeContent, |
9ab330b9 | 224 | |
0305db28 | 225 | removeObject, |
aa887096 | 226 | removeObjectByFullKey, |
0305db28 | 227 | removePrefix, |
9ab330b9 | 228 | |
cfd57d2c | 229 | makeAvailable, |
9ab330b9 C |
230 | |
231 | updateObjectACL, | |
232 | updatePrefixACL, | |
233 | ||
234 | listKeysOfPrefix, | |
235 | createObjectReadStream | |
0305db28 JB |
236 | } |
237 | ||
238 | // --------------------------------------------------------------------------- | |
239 | ||
/**
 * Upload a stream or a string to a bucket using the `Upload` helper of the S3 SDK.
 *
 * The object key is built with `buildKey`, and an ACL is only sent when `getACL` returns a value.
 * When `content` is a stream, it is destroyed once the upload settled so the underlying
 * resource is also released when the upload fails before the stream was fully read.
 *
 * @returns the result of `getInternalUrl` for the uploaded object
 * @throws if the upload is rejected, or if the response does not contain a bucket
 */
async function uploadToStorage (options: {
  content: ReadStream | string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { content, objectStorageKey, bucketInfo, isPrivate } = options

  const input: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  const acl = getACL(isPrivate)
  if (acl) input.ACL = acl

  const parallelUploads3 = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params: input
  })

  let response: CompleteMultipartUploadCommandOutput

  try {
    response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
  } finally {
    // Do not keep the stream open if the upload failed before consuming it entirely
    if (typeof content !== 'string') content.destroy()
  }

  // Check is needed even if the HTTP status code is 200 OK
  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  if (!response.Bucket) {
    const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
    logger.error(message, { response, ...lTags() })
    throw new Error(message)
  }

  logger.debug(
    'Completed %s%s in bucket %s',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), responseMetadata: response.$metadata }
  )

  return getInternalUrl(bucketInfo, objectStorageKey)
}
286 | ||
/**
 * Send one command per object listed under a prefix of a bucket.
 *
 * Objects are listed page by page: for each listed object `commandBuilder` creates the command
 * to send (at most 10 commands are in flight at the same time), then the function calls itself
 * with the token of the next page while the listing is truncated.
 *
 * @param options.prefix prefix to list, relative to the bucket prefix
 * @param options.commandBuilder creates the command to send for a listed object
 * @param options.continuationToken token of the page to list (used by the recursive calls)
 * @throws if the storage does not list any object for the requested page
 */
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder, continuationToken } = options

  const s3Client = getClient()

  const commandPrefix = buildKey(prefix, bucketInfo)
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix,
    ContinuationToken: continuationToken
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  await map(listedObjects.Contents, object => {
    const command = commandBuilder(object)

    return s3Client.send(command)
  }, { concurrency: 10 })

  // Repeat if not all objects could be listed at once
  // `ContinuationToken` of the response only echoes the token of the request:
  // the token of the next page is `NextContinuationToken`
  if (listedObjects.IsTruncated) {
    await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
  }
}
325 | ||
/**
 * Pick the configured upload ACL matching the requested privacy.
 *
 * @returns `UPLOAD_ACL.PRIVATE` of the object storage configuration for private objects, `UPLOAD_ACL.PUBLIC` otherwise
 */
function getACL (isPrivate: boolean) {
  const { PRIVATE, PUBLIC } = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  if (isPrivate) return PRIVATE

  return PUBLIC
}