]>
Commit | Line | Data |
---|---|---|
508c1b1e | 1 | import { map } from 'bluebird' |
23c0b67d | 2 | import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' |
0305db28 JB |
3 | import { dirname } from 'path' |
4 | import { Readable } from 'stream' | |
5 | import { | |
9ab330b9 | 6 | _Object, |
23c0b67d | 7 | CompleteMultipartUploadCommandOutput, |
0305db28 JB |
8 | DeleteObjectCommand, |
9 | GetObjectCommand, | |
10 | ListObjectsV2Command, | |
9ab330b9 C |
11 | PutObjectAclCommand, |
12 | PutObjectCommandInput, | |
13 | S3Client | |
0305db28 | 14 | } from '@aws-sdk/client-s3' |
156cdbac | 15 | import { Upload } from '@aws-sdk/lib-storage' |
0305db28 JB |
16 | import { pipelinePromise } from '@server/helpers/core-utils' |
17 | import { isArray } from '@server/helpers/custom-validators/misc' | |
18 | import { logger } from '@server/helpers/logger' | |
19 | import { CONFIG } from '@server/initializers/config' | |
9ab330b9 | 20 | import { getInternalUrl } from '../urls' |
0305db28 JB |
21 | import { getClient } from './client' |
22 | import { lTags } from './logger' | |
23 | ||
24 | type BucketInfo = { | |
25 | BUCKET_NAME: string | |
26 | PREFIX?: string | |
27 | } | |
28 | ||
cfd57d2c C |
29 | async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) { |
30 | const s3Client = getClient() | |
31 | ||
32 | const commandPrefix = bucketInfo.PREFIX + prefix | |
33 | const listCommand = new ListObjectsV2Command({ | |
34 | Bucket: bucketInfo.BUCKET_NAME, | |
35 | Prefix: commandPrefix | |
36 | }) | |
37 | ||
38 | const listedObjects = await s3Client.send(listCommand) | |
39 | ||
40 | if (isArray(listedObjects.Contents) !== true) return [] | |
41 | ||
42 | return listedObjects.Contents.map(c => c.Key) | |
43 | } | |
44 | ||
45 | // --------------------------------------------------------------------------- | |
46 | ||
0305db28 JB |
47 | async function storeObject (options: { |
48 | inputPath: string | |
49 | objectStorageKey: string | |
50 | bucketInfo: BucketInfo | |
9ab330b9 | 51 | isPrivate: boolean |
0305db28 | 52 | }): Promise<string> { |
9ab330b9 | 53 | const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options |
0305db28 JB |
54 | |
55 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | |
56 | ||
156cdbac | 57 | const fileStream = createReadStream(inputPath) |
0305db28 | 58 | |
9ab330b9 | 59 | return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) |
0305db28 JB |
60 | } |
61 | ||
cfd57d2c C |
62 | // --------------------------------------------------------------------------- |
63 | ||
8180f604 | 64 | async function updateObjectACL (options: { |
9ab330b9 C |
65 | objectStorageKey: string |
66 | bucketInfo: BucketInfo | |
67 | isPrivate: boolean | |
68 | }) { | |
69 | const { objectStorageKey, bucketInfo, isPrivate } = options | |
70 | ||
8180f604 C |
71 | const acl = getACL(isPrivate) |
72 | if (!acl) return | |
73 | ||
9ab330b9 C |
74 | const key = buildKey(objectStorageKey, bucketInfo) |
75 | ||
76 | logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags()) | |
77 | ||
78 | const command = new PutObjectAclCommand({ | |
0305db28 | 79 | Bucket: bucketInfo.BUCKET_NAME, |
9ab330b9 | 80 | Key: key, |
8180f604 | 81 | ACL: acl |
0305db28 JB |
82 | }) |
83 | ||
8180f604 | 84 | await getClient().send(command) |
0305db28 JB |
85 | } |
86 | ||
9ab330b9 C |
87 | function updatePrefixACL (options: { |
88 | prefix: string | |
89 | bucketInfo: BucketInfo | |
90 | isPrivate: boolean | |
91 | }) { | |
92 | const { prefix, bucketInfo, isPrivate } = options | |
93 | ||
8180f604 C |
94 | const acl = getACL(isPrivate) |
95 | if (!acl) return | |
96 | ||
9ab330b9 C |
97 | logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags()) |
98 | ||
99 | return applyOnPrefix({ | |
100 | prefix, | |
101 | bucketInfo, | |
102 | commandBuilder: obj => { | |
508c1b1e C |
103 | logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags()) |
104 | ||
9ab330b9 C |
105 | return new PutObjectAclCommand({ |
106 | Bucket: bucketInfo.BUCKET_NAME, | |
107 | Key: obj.Key, | |
8180f604 | 108 | ACL: acl |
9ab330b9 C |
109 | }) |
110 | } | |
0305db28 | 111 | }) |
9ab330b9 | 112 | } |
0305db28 | 113 | |
9ab330b9 | 114 | // --------------------------------------------------------------------------- |
0305db28 | 115 | |
9ab330b9 C |
116 | function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) { |
117 | const key = buildKey(objectStorageKey, bucketInfo) | |
0305db28 | 118 | |
aa887096 C |
119 | return removeObjectByFullKey(key, bucketInfo) |
120 | } | |
121 | ||
122 | function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) { | |
123 | logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags()) | |
0305db28 | 124 | |
9ab330b9 C |
125 | const command = new DeleteObjectCommand({ |
126 | Bucket: bucketInfo.BUCKET_NAME, | |
aa887096 | 127 | Key: fullKey |
9ab330b9 | 128 | }) |
0305db28 | 129 | |
9ab330b9 C |
130 | return getClient().send(command) |
131 | } | |
0305db28 | 132 | |
508c1b1e | 133 | async function removePrefix (prefix: string, bucketInfo: BucketInfo) { |
0305db28 | 134 | // FIXME: use bulk delete when s3ninja will support this operation |
0305db28 | 135 | |
9ab330b9 C |
136 | logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags()) |
137 | ||
138 | return applyOnPrefix({ | |
139 | prefix, | |
140 | bucketInfo, | |
141 | commandBuilder: obj => { | |
508c1b1e C |
142 | logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags()) |
143 | ||
9ab330b9 C |
144 | return new DeleteObjectCommand({ |
145 | Bucket: bucketInfo.BUCKET_NAME, | |
146 | Key: obj.Key | |
147 | }) | |
148 | } | |
149 | }) | |
0305db28 JB |
150 | } |
151 | ||
cfd57d2c C |
152 | // --------------------------------------------------------------------------- |
153 | ||
0305db28 JB |
154 | async function makeAvailable (options: { |
155 | key: string | |
156 | destination: string | |
157 | bucketInfo: BucketInfo | |
158 | }) { | |
159 | const { key, destination, bucketInfo } = options | |
160 | ||
161 | await ensureDir(dirname(options.destination)) | |
162 | ||
163 | const command = new GetObjectCommand({ | |
164 | Bucket: bucketInfo.BUCKET_NAME, | |
165 | Key: buildKey(key, bucketInfo) | |
166 | }) | |
167 | const response = await getClient().send(command) | |
168 | ||
169 | const file = createWriteStream(destination) | |
170 | await pipelinePromise(response.Body as Readable, file) | |
171 | ||
172 | file.close() | |
173 | } | |
174 | ||
175 | function buildKey (key: string, bucketInfo: BucketInfo) { | |
176 | return bucketInfo.PREFIX + key | |
177 | } | |
178 | ||
179 | // --------------------------------------------------------------------------- | |
180 | ||
9ab330b9 C |
181 | async function createObjectReadStream (options: { |
182 | key: string | |
183 | bucketInfo: BucketInfo | |
184 | rangeHeader: string | |
185 | }) { | |
186 | const { key, bucketInfo, rangeHeader } = options | |
187 | ||
188 | const command = new GetObjectCommand({ | |
189 | Bucket: bucketInfo.BUCKET_NAME, | |
190 | Key: buildKey(key, bucketInfo), | |
191 | Range: rangeHeader | |
192 | }) | |
193 | ||
194 | const response = await getClient().send(command) | |
195 | ||
57e11a20 C |
196 | return { |
197 | response, | |
198 | stream: response.Body as Readable | |
199 | } | |
9ab330b9 C |
200 | } |
201 | ||
202 | // --------------------------------------------------------------------------- | |
203 | ||
0305db28 JB |
204 | export { |
205 | BucketInfo, | |
206 | buildKey, | |
9ab330b9 | 207 | |
0305db28 | 208 | storeObject, |
9ab330b9 | 209 | |
0305db28 | 210 | removeObject, |
aa887096 | 211 | removeObjectByFullKey, |
0305db28 | 212 | removePrefix, |
9ab330b9 | 213 | |
cfd57d2c | 214 | makeAvailable, |
9ab330b9 C |
215 | |
216 | updateObjectACL, | |
217 | updatePrefixACL, | |
218 | ||
219 | listKeysOfPrefix, | |
220 | createObjectReadStream | |
0305db28 JB |
221 | } |
222 | ||
223 | // --------------------------------------------------------------------------- | |
224 | ||
23c0b67d | 225 | async function uploadToStorage (options: { |
156cdbac | 226 | content: ReadStream |
0305db28 JB |
227 | objectStorageKey: string |
228 | bucketInfo: BucketInfo | |
9ab330b9 | 229 | isPrivate: boolean |
0305db28 | 230 | }) { |
9ab330b9 | 231 | const { content, objectStorageKey, bucketInfo, isPrivate } = options |
0305db28 | 232 | |
156cdbac | 233 | const input: PutObjectCommandInput = { |
234 | Body: content, | |
0305db28 | 235 | Bucket: bucketInfo.BUCKET_NAME, |
8180f604 | 236 | Key: buildKey(objectStorageKey, bucketInfo) |
f9915efa DL |
237 | } |
238 | ||
8180f604 C |
239 | const acl = getACL(isPrivate) |
240 | if (acl) input.ACL = acl | |
241 | ||
156cdbac | 242 | const parallelUploads3 = new Upload({ |
243 | client: getClient(), | |
244 | queueSize: 4, | |
245 | partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, | |
23c0b67d | 246 | |
247 | // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts | |
248 | // More detailed explanation: | |
249 | // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274 | |
250 | // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928 | |
251 | leavePartsOnError: true, | |
156cdbac | 252 | params: input |
0305db28 | 253 | }) |
156cdbac | 254 | |
23c0b67d | 255 | const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput |
256 | // Check is needed even if the HTTP status code is 200 OK | |
257 | // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html | |
258 | if (!response.Bucket) { | |
259 | const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}` | |
260 | logger.error(message, { response, ...lTags() }) | |
261 | throw new Error(message) | |
262 | } | |
0305db28 JB |
263 | |
264 | logger.debug( | |
156cdbac | 265 | 'Completed %s%s in bucket %s', |
266 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | |
0305db28 JB |
267 | ) |
268 | ||
9ab330b9 C |
269 | return getInternalUrl(bucketInfo, objectStorageKey) |
270 | } | |
271 | ||
272 | async function applyOnPrefix (options: { | |
273 | prefix: string | |
274 | bucketInfo: BucketInfo | |
275 | commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0] | |
276 | ||
277 | continuationToken?: string | |
278 | }) { | |
279 | const { prefix, bucketInfo, commandBuilder, continuationToken } = options | |
280 | ||
281 | const s3Client = getClient() | |
282 | ||
508c1b1e | 283 | const commandPrefix = buildKey(prefix, bucketInfo) |
9ab330b9 C |
284 | const listCommand = new ListObjectsV2Command({ |
285 | Bucket: bucketInfo.BUCKET_NAME, | |
286 | Prefix: commandPrefix, | |
287 | ContinuationToken: continuationToken | |
288 | }) | |
289 | ||
290 | const listedObjects = await s3Client.send(listCommand) | |
291 | ||
292 | if (isArray(listedObjects.Contents) !== true) { | |
293 | const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` | |
294 | ||
295 | logger.error(message, { response: listedObjects, ...lTags() }) | |
296 | throw new Error(message) | |
297 | } | |
298 | ||
508c1b1e | 299 | await map(listedObjects.Contents, object => { |
9ab330b9 C |
300 | const command = commandBuilder(object) |
301 | ||
508c1b1e C |
302 | return s3Client.send(command) |
303 | }, { concurrency: 10 }) | |
9ab330b9 C |
304 | |
305 | // Repeat if not all objects could be listed at once (limit of 1000?) | |
306 | if (listedObjects.IsTruncated) { | |
307 | await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken }) | |
308 | } | |
309 | } | |
310 | ||
311 | function getACL (isPrivate: boolean) { | |
312 | return isPrivate | |
313 | ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE | |
314 | : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC | |
0305db28 | 315 | } |