]>
Commit | Line | Data |
---|---|---|
508c1b1e | 1 | import { map } from 'bluebird' |
23c0b67d | 2 | import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' |
0305db28 JB |
3 | import { dirname } from 'path' |
4 | import { Readable } from 'stream' | |
5 | import { | |
9ab330b9 | 6 | _Object, |
23c0b67d | 7 | CompleteMultipartUploadCommandOutput, |
0305db28 JB |
8 | DeleteObjectCommand, |
9 | GetObjectCommand, | |
10 | ListObjectsV2Command, | |
9ab330b9 C |
11 | PutObjectAclCommand, |
12 | PutObjectCommandInput, | |
13 | S3Client | |
0305db28 | 14 | } from '@aws-sdk/client-s3' |
156cdbac | 15 | import { Upload } from '@aws-sdk/lib-storage' |
0305db28 JB |
16 | import { pipelinePromise } from '@server/helpers/core-utils' |
17 | import { isArray } from '@server/helpers/custom-validators/misc' | |
18 | import { logger } from '@server/helpers/logger' | |
19 | import { CONFIG } from '@server/initializers/config' | |
9ab330b9 | 20 | import { getInternalUrl } from '../urls' |
0305db28 JB |
21 | import { getClient } from './client' |
22 | import { lTags } from './logger' | |
23 | ||
// Identifies where objects live in the object storage
type BucketInfo = {
  // Bucket name, sent as `Bucket` in the S3 commands of this module
  BUCKET_NAME: string

  // Optional string prepended to object keys (see buildKey())
  PREFIX?: string
}
28 | ||
cfd57d2c C |
// List the keys of every object stored under `prefix` (the bucket prefix is prepended by buildKey())
// Returns an empty array if the storage does not list any object
//
// A single ListObjectsV2 response may be truncated, so we follow `NextContinuationToken`
// until the listing is complete instead of returning the first page only
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()
  const commandPrefix = buildKey(prefix, bucketInfo)

  const keys: _Object['Key'][] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // Nothing (more) to list
    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      keys.push(object.Key)
    }

    // Stop if the page is the last one, or if the storage did not give us a token to continue
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
44 | ||
45 | // --------------------------------------------------------------------------- | |
46 | ||
0305db28 JB |
// Upload the local file `inputPath` to the object storage under `objectStorageKey`
// Resolves with the value returned by uploadToStorage(), rejects if the upload fails
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const fileStream = createReadStream(inputPath)

  try {
    // `await` is required so the `finally` block runs after the upload settled
    return await uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
  } finally {
    // Release the file descriptor if the upload failed before consuming the whole file
    // This is a no-op if the stream has already been entirely read and closed
    fileStream.destroy()
  }
}
61 | ||
cfd57d2c C |
62 | // --------------------------------------------------------------------------- |
63 | ||
9ab330b9 C |
// Send a PutObjectAcl command for a single object
// The ACL value comes from getACL(), the full key from buildKey()
function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const fullKey = buildKey(options.objectStorageKey, options.bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', fullKey, options.bucketInfo.BUCKET_NAME, lTags())

  const aclCommand = new PutObjectAclCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: fullKey,
    ACL: getACL(options.isPrivate)
  })

  return getClient().send(aclCommand)
}
83 | ||
9ab330b9 C |
// Send a PutObjectAcl command for each object listed by applyOnPrefix() under `prefix`
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { prefix, bucketInfo, isPrivate } = options

  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  // Called by applyOnPrefix() with each listed object
  // `obj.Key` is used as is: it is the key reported by the listing
  const buildACLCommand = (obj: _Object) => {
    logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())

    return new PutObjectAclCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: obj.Key,
      ACL: getACL(isPrivate)
    })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildACLCommand })
}
0305db28 | 107 | |
9ab330b9 | 108 | // --------------------------------------------------------------------------- |
0305db28 | 109 | |
9ab330b9 C |
// Remove an object identified by its key relative to the bucket prefix
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  return removeObjectByFullKey(buildKey(objectStorageKey, bucketInfo), bucketInfo)
}
115 | ||
// Send a DeleteObject command for `fullKey`
// The key is used as is: the bucket prefix is not prepended here
function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())

  const deleteCommand = new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: fullKey })

  return getClient().send(deleteCommand)
}
0305db28 | 126 | |
// Send a DeleteObject command for each object listed by applyOnPrefix() under `prefix`
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete when s3ninja will support this operation

  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())

  // Called by applyOnPrefix() with each listed object
  // `obj.Key` is used as is: it is the key reported by the listing
  const buildDeleteCommand = (obj: _Object) => {
    logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())

    return new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: obj.Key
    })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
145 | ||
cfd57d2c C |
146 | // --------------------------------------------------------------------------- |
147 | ||
0305db28 JB |
// Download the object `key` from the object storage and write it to the local path `destination`
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  // The parent directory must exist before opening the write stream
  await ensureDir(dirname(destination))

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body } = await getClient().send(getCommand)

  const output = createWriteStream(destination)
  await pipelinePromise(Body as Readable, output)

  output.close()
}
168 | ||
// Build the full object key: the bucket prefix followed by `key`
// `PREFIX` is optional in BucketInfo: a missing prefix is treated as an empty string
// so the key is never prefixed by the "undefined" string
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
172 | ||
173 | // --------------------------------------------------------------------------- | |
174 | ||
9ab330b9 C |
// Request the object `key` with the provided `Range` header value
// Resolves with the raw GetObject response and its body exposed as a Readable
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const getCommand = new GetObjectCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, options.bucketInfo),
    Range: options.rangeHeader
  })

  const response = await getClient().send(getCommand)
  const stream = response.Body as Readable

  return { response, stream }
}
195 | ||
196 | // --------------------------------------------------------------------------- | |
197 | ||
0305db28 JB |
198 | export { |
199 | BucketInfo, | |
200 | buildKey, | |
9ab330b9 | 201 | |
0305db28 | 202 | storeObject, |
9ab330b9 | 203 | |
0305db28 | 204 | removeObject, |
aa887096 | 205 | removeObjectByFullKey, |
0305db28 | 206 | removePrefix, |
9ab330b9 | 207 | |
cfd57d2c | 208 | makeAvailable, |
9ab330b9 C |
209 | |
210 | updateObjectACL, | |
211 | updatePrefixACL, | |
212 | ||
213 | listKeysOfPrefix, | |
214 | createObjectReadStream | |
0305db28 JB |
215 | } |
216 | ||
217 | // --------------------------------------------------------------------------- | |
218 | ||
// Upload `content` to the object storage using the lib-storage `Upload` helper
// Resolves with the internal URL built by getInternalUrl()
// Throws if the upload response does not contain a `Bucket` field
async function uploadToStorage (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { content, objectStorageKey, bucketInfo, isPrivate } = options

  const params: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    ACL: getACL(isPrivate)
  }

  const upload = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params
  })

  const result = (await upload.done()) as CompleteMultipartUploadCommandOutput

  // Check is needed even if the HTTP status code is 200 OK
  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  if (result.Bucket) {
    logger.debug(
      'Completed %s%s in bucket %s',
      bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
    )

    return getInternalUrl(bucketInfo, objectStorageKey)
  }

  const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
  logger.error(message, { response: result, ...lTags() })
  throw new Error(message)
}
263 | ||
// List the objects stored under `prefix` (the bucket prefix is prepended by buildKey())
// and send the command built by `commandBuilder` for each of them, 10 at a time
//
// `continuationToken` allows to start from a specific page of the listing
// Throws if a listing response does not contain any object
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder } = options

  const s3Client = getClient()
  const commandPrefix = buildKey(prefix, bucketInfo)

  let continuationToken = options.continuationToken

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    if (isArray(listedObjects.Contents) !== true) {
      const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

      logger.error(message, { response: listedObjects, ...lTags() })
      throw new Error(message)
    }

    await map(listedObjects.Contents, object => {
      const command = commandBuilder(object)

      return s3Client.send(command)
    }, { concurrency: 10 })

    // Repeat if not all objects could be listed at once
    // The token of the next page is `NextContinuationToken`: `ContinuationToken` only echoes
    // the token we sent, so using it would list the same page again
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)
}
302 | ||
// Pick the configured upload ACL depending on the privacy of the object
function getACL (isPrivate: boolean) {
  const acls = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  if (isPrivate) return acls.PRIVATE

  return acls.PUBLIC
}