]>
Commit | Line | Data |
---|---|---|
1 | import { map } from 'bluebird' | |
2 | import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' | |
3 | import { dirname } from 'path' | |
4 | import { Readable } from 'stream' | |
5 | import { | |
6 | _Object, | |
7 | CompleteMultipartUploadCommandOutput, | |
8 | DeleteObjectCommand, | |
9 | GetObjectCommand, | |
10 | ListObjectsV2Command, | |
11 | PutObjectAclCommand, | |
12 | PutObjectCommandInput, | |
13 | S3Client | |
14 | } from '@aws-sdk/client-s3' | |
15 | import { Upload } from '@aws-sdk/lib-storage' | |
16 | import { pipelinePromise } from '@server/helpers/core-utils' | |
17 | import { isArray } from '@server/helpers/custom-validators/misc' | |
18 | import { logger } from '@server/helpers/logger' | |
19 | import { CONFIG } from '@server/initializers/config' | |
20 | import { getInternalUrl } from '../urls' | |
21 | import { getClient } from './client' | |
22 | import { lTags } from './logger' | |
23 | ||
/**
 * Location of a set of objects inside the object storage.
 *
 * - `BUCKET_NAME`: name of the bucket passed as `Bucket` to every S3 command of this module
 * - `PREFIX`: optional string prepended to object keys (see `buildKey`)
 */
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}
28 | ||
/**
 * List the keys of all objects stored under a prefix of a bucket.
 *
 * The listing is paginated by S3: every page is fetched (following `NextContinuationToken`
 * while the response is flagged `IsTruncated`) so that no key is silently dropped.
 *
 * @param prefix prefix to list, relative to the bucket prefix
 * @param bucketInfo bucket name and optional key prefix
 * @returns the `Key` of every listed object, or an empty array if nothing is listed
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  // Same key construction as every other helper of this module
  const commandPrefix = buildKey(prefix, bucketInfo)

  const keys: string[] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // No `Contents` array in the response: nothing (more) to collect
    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      keys.push(object.Key)
    }

    // Only continue when S3 reports a truncated page AND gives us a token to resume from
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
44 | ||
45 | // --------------------------------------------------------------------------- | |
46 | ||
/**
 * Upload a local file to the object storage.
 *
 * A read stream is opened on `inputPath` and handed to `uploadToStorage`.
 *
 * @param options.inputPath path of the local file to upload
 * @param options.objectStorageKey key of the object, relative to the bucket prefix
 * @param options.bucketInfo destination bucket name and optional key prefix
 * @param options.isPrivate selects which configured ACL is requested (see `getACL`)
 * @returns the value resolved by `uploadToStorage`
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  logger.debug(
    'Uploading file %s to %s%s in bucket %s',
    options.inputPath,
    options.bucketInfo.PREFIX,
    options.objectStorageKey,
    options.bucketInfo.BUCKET_NAME,
    lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: createReadStream(options.inputPath),
    bucketInfo: options.bucketInfo,
    isPrivate: options.isPrivate
  })
}
61 | ||
/**
 * Upload an in-memory string to the object storage.
 *
 * @param options.content string used as the object body
 * @param options.inputPath only used in the debug log to identify the content
 * @param options.objectStorageKey key of the object, relative to the bucket prefix
 * @param options.bucketInfo destination bucket name and optional key prefix
 * @param options.isPrivate selects which configured ACL is requested (see `getACL`)
 * @returns the value resolved by `uploadToStorage`
 */
async function storeContent (options: {
  content: string
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  logger.debug(
    'Uploading %s content to %s%s in bucket %s',
    options.inputPath,
    options.bucketInfo.PREFIX,
    options.objectStorageKey,
    options.bucketInfo.BUCKET_NAME,
    lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: options.content,
    bucketInfo: options.bucketInfo,
    isPrivate: options.isPrivate
  })
}
75 | ||
76 | // --------------------------------------------------------------------------- | |
77 | ||
/**
 * Set the ACL of a single stored object according to its privacy.
 *
 * Does nothing when `getACL` returns a falsy value for the requested privacy.
 *
 * @param options.objectStorageKey key of the object, relative to the bucket prefix
 * @param options.bucketInfo bucket name and optional key prefix
 * @param options.isPrivate selects the private or the public configured ACL
 */
async function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const acl = getACL(options.isPrivate)
  if (!acl) return

  const bucketName = options.bucketInfo.BUCKET_NAME
  const fullKey = buildKey(options.objectStorageKey, options.bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', fullKey, bucketName, lTags())

  const aclCommand = new PutObjectAclCommand({ Bucket: bucketName, Key: fullKey, ACL: acl })

  await getClient().send(aclCommand)
}
100 | ||
/**
 * Set the ACL of every object listed under a prefix according to the requested privacy.
 *
 * Returns `undefined` (not a promise) when `getACL` yields a falsy value; otherwise returns
 * the promise of `applyOnPrefix`.
 *
 * @param options.prefix prefix to process, relative to the bucket prefix
 * @param options.bucketInfo bucket name and optional key prefix
 * @param options.isPrivate selects the private or the public configured ACL
 */
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const acl = getACL(options.isPrivate)
  if (!acl) return

  const { prefix, bucketInfo } = options
  const bucketName = bucketInfo.BUCKET_NAME

  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketName, lTags())

  // Builds one PutObjectAcl command per listed object
  const buildAclCommand = (obj: _Object) => {
    logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketName, lTags())

    return new PutObjectAclCommand({ Bucket: bucketName, Key: obj.Key, ACL: acl })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildAclCommand })
}
127 | ||
128 | // --------------------------------------------------------------------------- | |
129 | ||
/**
 * Delete an object identified by a key relative to the bucket prefix.
 *
 * @param objectStorageKey key of the object, without the bucket prefix
 * @param bucketInfo bucket name and optional key prefix
 * @returns the promise returned by `removeObjectByFullKey`
 */
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  return removeObjectByFullKey(buildKey(objectStorageKey, bucketInfo), bucketInfo)
}
135 | ||
/**
 * Delete an object identified by its complete key (the key is sent to S3 unchanged).
 *
 * @param fullKey key of the object as stored in the bucket
 * @param bucketInfo only `BUCKET_NAME` is read here
 * @returns the promise of the `DeleteObjectCommand` sent through the S3 client
 */
function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
  const bucketName = bucketInfo.BUCKET_NAME

  logger.debug('Removing file %s in bucket %s', fullKey, bucketName, lTags())

  const deleteCommand = new DeleteObjectCommand({ Bucket: bucketName, Key: fullKey })

  return getClient().send(deleteCommand)
}
146 | ||
/**
 * Delete every object listed under a prefix, one `DeleteObjectCommand` per object.
 *
 * @param prefix prefix to remove, relative to the bucket prefix
 * @param bucketInfo bucket name and optional key prefix
 * @returns the promise of `applyOnPrefix`
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete when s3ninja will support this operation

  const bucketName = bucketInfo.BUCKET_NAME

  logger.debug('Removing prefix %s in bucket %s', prefix, bucketName, lTags())

  // Builds the delete command of a single listed object
  const buildDeleteCommand = (obj: _Object) => {
    logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketName, lTags())

    return new DeleteObjectCommand({ Bucket: bucketName, Key: obj.Key })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
165 | ||
166 | // --------------------------------------------------------------------------- | |
167 | ||
/**
 * Download an object from the object storage into a local file.
 *
 * The parent directory of `destination` is created if needed, then the body of the
 * `GetObjectCommand` response is piped into a write stream opened on `destination`.
 *
 * @param options.key key of the object, relative to the bucket prefix
 * @param options.destination path of the local file to write
 * @param options.bucketInfo bucket name and optional key prefix
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const targetPath = options.destination

  await ensureDir(dirname(targetPath))

  const getCommand = new GetObjectCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, options.bucketInfo)
  })
  const { Body: body } = await getClient().send(getCommand)

  const output = createWriteStream(targetPath)
  await pipelinePromise(body as Readable, output)

  output.close()
}
188 | ||
/**
 * Build the complete key of an object by prepending the bucket prefix.
 *
 * `PREFIX` is optional in `BucketInfo`: a missing prefix is treated as an empty string
 * instead of being concatenated as the literal text "undefined".
 *
 * @param key key of the object, relative to the bucket prefix
 * @param bucketInfo bucket information holding the optional `PREFIX`
 * @returns `PREFIX + key`, or `key` alone when no prefix is set
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
192 | ||
193 | // --------------------------------------------------------------------------- | |
194 | ||
/**
 * Request an object (optionally a byte range of it) and expose its body as a stream.
 *
 * @param options.key key of the object, relative to the bucket prefix
 * @param options.bucketInfo bucket name and optional key prefix
 * @param options.rangeHeader value forwarded as the `Range` parameter of the `GetObjectCommand`
 * @returns the raw command response and its `Body` cast to a `Readable`
 */
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const getCommand = new GetObjectCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, options.bucketInfo),
    Range: options.rangeHeader
  })

  const response = await getClient().send(getCommand)
  const stream = response.Body as Readable

  return { response, stream }
}
215 | ||
216 | // --------------------------------------------------------------------------- | |
217 | ||
218 | export { | |
219 | BucketInfo, | |
220 | buildKey, | |
221 | ||
222 | storeObject, | |
223 | storeContent, | |
224 | ||
225 | removeObject, | |
226 | removeObjectByFullKey, | |
227 | removePrefix, | |
228 | ||
229 | makeAvailable, | |
230 | ||
231 | updateObjectACL, | |
232 | updatePrefixACL, | |
233 | ||
234 | listKeysOfPrefix, | |
235 | createObjectReadStream | |
236 | } | |
237 | ||
238 | // --------------------------------------------------------------------------- | |
239 | ||
/**
 * Upload a stream or a string to the object storage using a multipart `Upload`.
 *
 * @param options.content body of the object: a file read stream or a string
 * @param options.objectStorageKey key of the object, relative to the bucket prefix
 * @param options.bucketInfo destination bucket name and optional key prefix
 * @param options.isPrivate selects which configured ACL is set on the object (see `getACL`);
 *   no ACL is sent when the selected value is falsy
 * @returns the result of `getInternalUrl(bucketInfo, objectStorageKey)`
 * @throws Error when the upload completes without a `Bucket` in the response,
 *   and any error rejected by the upload itself
 */
async function uploadToStorage (options: {
  content: ReadStream | string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { content, objectStorageKey, bucketInfo, isPrivate } = options

  try {
    const input: PutObjectCommandInput = {
      Body: content,
      Bucket: bucketInfo.BUCKET_NAME,
      Key: buildKey(objectStorageKey, bucketInfo)
    }

    const acl = getACL(isPrivate)
    if (acl) input.ACL = acl

    const parallelUploads3 = new Upload({
      client: getClient(),
      queueSize: 4,
      partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

      // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
      // More detailed explanation:
      // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
      // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
      leavePartsOnError: true,
      params: input
    })

    const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
    // Check is needed even if the HTTP status code is 200 OK
    // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
    if (!response.Bucket) {
      const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
      logger.error(message, { response, ...lTags() })
      throw new Error(message)
    }

    logger.debug(
      'Completed %s%s in bucket %s',
      bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), responseMetadata: response.$metadata }
    )
  } finally {
    // Release the underlying file descriptor even if the upload failed midway.
    // Destroying a stream that has already been fully consumed is harmless.
    if (typeof content !== 'string') content.destroy()
  }

  return getInternalUrl(bucketInfo, objectStorageKey)
}
286 | ||
/**
 * Run one S3 command per object listed under a prefix.
 *
 * Objects are listed page by page; for each page, `commandBuilder` is called on every listed
 * object and the resulting commands are sent with a concurrency of 10. The next page is
 * requested with the `NextContinuationToken` of the current response.
 *
 * @param options.prefix prefix to process, relative to the bucket prefix
 * @param options.bucketInfo bucket name and optional key prefix
 * @param options.commandBuilder builds the command to send for a listed object
 * @param options.continuationToken optional token of the first page to list
 * @throws Error when a listing response does not contain a `Contents` array
 */
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder } = options

  const s3Client = getClient()

  const commandPrefix = buildKey(prefix, bucketInfo)

  let continuationToken = options.continuationToken

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    if (isArray(listedObjects.Contents) !== true) {
      const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

      logger.error(message, { response: listedObjects, ...lTags() })
      throw new Error(message)
    }

    await map(listedObjects.Contents, object => {
      const command = commandBuilder(object)

      return s3Client.send(command)
    }, { concurrency: 10 })

    // Repeat if not all objects could be listed at once.
    // `ContinuationToken` in the response only echoes the request token: the token of the
    // following page is `NextContinuationToken`.
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)
}
325 | ||
/**
 * Pick the configured upload ACL matching the requested privacy.
 *
 * @param isPrivate `true` to read the `PRIVATE` setting, `false` to read the `PUBLIC` one
 * @returns the value of `CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE` or `.PUBLIC`
 */
function getACL (isPrivate: boolean) {
  if (isPrivate) return CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE

  return CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
}