]>
Commit | Line | Data |
---|---|---|
23c0b67d | 1 | import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' |
0305db28 JB |
2 | import { dirname } from 'path' |
3 | import { Readable } from 'stream' | |
4 | import { | |
9ab330b9 | 5 | _Object, |
23c0b67d | 6 | CompleteMultipartUploadCommandOutput, |
0305db28 JB |
7 | DeleteObjectCommand, |
8 | GetObjectCommand, | |
9 | ListObjectsV2Command, | |
9ab330b9 C |
10 | PutObjectAclCommand, |
11 | PutObjectCommandInput, | |
12 | S3Client | |
0305db28 | 13 | } from '@aws-sdk/client-s3' |
156cdbac | 14 | import { Upload } from '@aws-sdk/lib-storage' |
0305db28 JB |
15 | import { pipelinePromise } from '@server/helpers/core-utils' |
16 | import { isArray } from '@server/helpers/custom-validators/misc' | |
17 | import { logger } from '@server/helpers/logger' | |
18 | import { CONFIG } from '@server/initializers/config' | |
9ab330b9 | 19 | import { getInternalUrl } from '../urls' |
0305db28 JB |
20 | import { getClient } from './client' |
21 | import { lTags } from './logger' | |
22 | ||
23 | type BucketInfo = { | |
24 | BUCKET_NAME: string | |
25 | PREFIX?: string | |
26 | } | |
27 | ||
cfd57d2c C |
28 | async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) { |
29 | const s3Client = getClient() | |
30 | ||
31 | const commandPrefix = bucketInfo.PREFIX + prefix | |
32 | const listCommand = new ListObjectsV2Command({ | |
33 | Bucket: bucketInfo.BUCKET_NAME, | |
34 | Prefix: commandPrefix | |
35 | }) | |
36 | ||
37 | const listedObjects = await s3Client.send(listCommand) | |
38 | ||
39 | if (isArray(listedObjects.Contents) !== true) return [] | |
40 | ||
41 | return listedObjects.Contents.map(c => c.Key) | |
42 | } | |
43 | ||
44 | // --------------------------------------------------------------------------- | |
45 | ||
0305db28 JB |
46 | async function storeObject (options: { |
47 | inputPath: string | |
48 | objectStorageKey: string | |
49 | bucketInfo: BucketInfo | |
9ab330b9 | 50 | isPrivate: boolean |
0305db28 | 51 | }): Promise<string> { |
9ab330b9 | 52 | const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options |
0305db28 JB |
53 | |
54 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | |
55 | ||
156cdbac | 56 | const fileStream = createReadStream(inputPath) |
0305db28 | 57 | |
9ab330b9 | 58 | return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) |
0305db28 JB |
59 | } |
60 | ||
cfd57d2c C |
61 | // --------------------------------------------------------------------------- |
62 | ||
9ab330b9 C |
63 | function updateObjectACL (options: { |
64 | objectStorageKey: string | |
65 | bucketInfo: BucketInfo | |
66 | isPrivate: boolean | |
67 | }) { | |
68 | const { objectStorageKey, bucketInfo, isPrivate } = options | |
69 | ||
70 | const key = buildKey(objectStorageKey, bucketInfo) | |
71 | ||
72 | logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags()) | |
73 | ||
74 | const command = new PutObjectAclCommand({ | |
0305db28 | 75 | Bucket: bucketInfo.BUCKET_NAME, |
9ab330b9 C |
76 | Key: key, |
77 | ACL: getACL(isPrivate) | |
0305db28 JB |
78 | }) |
79 | ||
80 | return getClient().send(command) | |
81 | } | |
82 | ||
9ab330b9 C |
83 | function updatePrefixACL (options: { |
84 | prefix: string | |
85 | bucketInfo: BucketInfo | |
86 | isPrivate: boolean | |
87 | }) { | |
88 | const { prefix, bucketInfo, isPrivate } = options | |
89 | ||
90 | logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags()) | |
91 | ||
92 | return applyOnPrefix({ | |
93 | prefix, | |
94 | bucketInfo, | |
95 | commandBuilder: obj => { | |
96 | return new PutObjectAclCommand({ | |
97 | Bucket: bucketInfo.BUCKET_NAME, | |
98 | Key: obj.Key, | |
99 | ACL: getACL(isPrivate) | |
100 | }) | |
101 | } | |
0305db28 | 102 | }) |
9ab330b9 | 103 | } |
0305db28 | 104 | |
9ab330b9 | 105 | // --------------------------------------------------------------------------- |
0305db28 | 106 | |
9ab330b9 C |
107 | function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) { |
108 | const key = buildKey(objectStorageKey, bucketInfo) | |
0305db28 | 109 | |
9ab330b9 | 110 | logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags()) |
0305db28 | 111 | |
9ab330b9 C |
112 | const command = new DeleteObjectCommand({ |
113 | Bucket: bucketInfo.BUCKET_NAME, | |
114 | Key: key | |
115 | }) | |
0305db28 | 116 | |
9ab330b9 C |
117 | return getClient().send(command) |
118 | } | |
0305db28 | 119 | |
9ab330b9 | 120 | function removePrefix (prefix: string, bucketInfo: BucketInfo) { |
0305db28 | 121 | // FIXME: use bulk delete when s3ninja will support this operation |
0305db28 | 122 | |
9ab330b9 C |
123 | logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags()) |
124 | ||
125 | return applyOnPrefix({ | |
126 | prefix, | |
127 | bucketInfo, | |
128 | commandBuilder: obj => { | |
129 | return new DeleteObjectCommand({ | |
130 | Bucket: bucketInfo.BUCKET_NAME, | |
131 | Key: obj.Key | |
132 | }) | |
133 | } | |
134 | }) | |
0305db28 JB |
135 | } |
136 | ||
cfd57d2c C |
137 | // --------------------------------------------------------------------------- |
138 | ||
0305db28 JB |
139 | async function makeAvailable (options: { |
140 | key: string | |
141 | destination: string | |
142 | bucketInfo: BucketInfo | |
143 | }) { | |
144 | const { key, destination, bucketInfo } = options | |
145 | ||
146 | await ensureDir(dirname(options.destination)) | |
147 | ||
148 | const command = new GetObjectCommand({ | |
149 | Bucket: bucketInfo.BUCKET_NAME, | |
150 | Key: buildKey(key, bucketInfo) | |
151 | }) | |
152 | const response = await getClient().send(command) | |
153 | ||
154 | const file = createWriteStream(destination) | |
155 | await pipelinePromise(response.Body as Readable, file) | |
156 | ||
157 | file.close() | |
158 | } | |
159 | ||
160 | function buildKey (key: string, bucketInfo: BucketInfo) { | |
161 | return bucketInfo.PREFIX + key | |
162 | } | |
163 | ||
164 | // --------------------------------------------------------------------------- | |
165 | ||
9ab330b9 C |
166 | async function createObjectReadStream (options: { |
167 | key: string | |
168 | bucketInfo: BucketInfo | |
169 | rangeHeader: string | |
170 | }) { | |
171 | const { key, bucketInfo, rangeHeader } = options | |
172 | ||
173 | const command = new GetObjectCommand({ | |
174 | Bucket: bucketInfo.BUCKET_NAME, | |
175 | Key: buildKey(key, bucketInfo), | |
176 | Range: rangeHeader | |
177 | }) | |
178 | ||
179 | const response = await getClient().send(command) | |
180 | ||
181 | return response.Body as Readable | |
182 | } | |
183 | ||
184 | // --------------------------------------------------------------------------- | |
185 | ||
0305db28 JB |
186 | export { |
187 | BucketInfo, | |
188 | buildKey, | |
9ab330b9 | 189 | |
0305db28 | 190 | storeObject, |
9ab330b9 | 191 | |
0305db28 JB |
192 | removeObject, |
193 | removePrefix, | |
9ab330b9 | 194 | |
cfd57d2c | 195 | makeAvailable, |
9ab330b9 C |
196 | |
197 | updateObjectACL, | |
198 | updatePrefixACL, | |
199 | ||
200 | listKeysOfPrefix, | |
201 | createObjectReadStream | |
0305db28 JB |
202 | } |
203 | ||
204 | // --------------------------------------------------------------------------- | |
205 | ||
23c0b67d | 206 | async function uploadToStorage (options: { |
156cdbac | 207 | content: ReadStream |
0305db28 JB |
208 | objectStorageKey: string |
209 | bucketInfo: BucketInfo | |
9ab330b9 | 210 | isPrivate: boolean |
0305db28 | 211 | }) { |
9ab330b9 | 212 | const { content, objectStorageKey, bucketInfo, isPrivate } = options |
0305db28 | 213 | |
156cdbac | 214 | const input: PutObjectCommandInput = { |
215 | Body: content, | |
0305db28 | 216 | Bucket: bucketInfo.BUCKET_NAME, |
9ab330b9 C |
217 | Key: buildKey(objectStorageKey, bucketInfo), |
218 | ACL: getACL(isPrivate) | |
f9915efa DL |
219 | } |
220 | ||
156cdbac | 221 | const parallelUploads3 = new Upload({ |
222 | client: getClient(), | |
223 | queueSize: 4, | |
224 | partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, | |
23c0b67d | 225 | |
226 | // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts | |
227 | // More detailed explanation: | |
228 | // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274 | |
229 | // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928 | |
230 | leavePartsOnError: true, | |
156cdbac | 231 | params: input |
0305db28 | 232 | }) |
156cdbac | 233 | |
23c0b67d | 234 | const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput |
235 | // Check is needed even if the HTTP status code is 200 OK | |
236 | // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html | |
237 | if (!response.Bucket) { | |
238 | const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}` | |
239 | logger.error(message, { response, ...lTags() }) | |
240 | throw new Error(message) | |
241 | } | |
0305db28 JB |
242 | |
243 | logger.debug( | |
156cdbac | 244 | 'Completed %s%s in bucket %s', |
245 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | |
0305db28 JB |
246 | ) |
247 | ||
9ab330b9 C |
248 | return getInternalUrl(bucketInfo, objectStorageKey) |
249 | } | |
250 | ||
251 | async function applyOnPrefix (options: { | |
252 | prefix: string | |
253 | bucketInfo: BucketInfo | |
254 | commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0] | |
255 | ||
256 | continuationToken?: string | |
257 | }) { | |
258 | const { prefix, bucketInfo, commandBuilder, continuationToken } = options | |
259 | ||
260 | const s3Client = getClient() | |
261 | ||
262 | const commandPrefix = bucketInfo.PREFIX + prefix | |
263 | const listCommand = new ListObjectsV2Command({ | |
264 | Bucket: bucketInfo.BUCKET_NAME, | |
265 | Prefix: commandPrefix, | |
266 | ContinuationToken: continuationToken | |
267 | }) | |
268 | ||
269 | const listedObjects = await s3Client.send(listCommand) | |
270 | ||
271 | if (isArray(listedObjects.Contents) !== true) { | |
272 | const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` | |
273 | ||
274 | logger.error(message, { response: listedObjects, ...lTags() }) | |
275 | throw new Error(message) | |
276 | } | |
277 | ||
278 | for (const object of listedObjects.Contents) { | |
279 | const command = commandBuilder(object) | |
280 | ||
281 | await s3Client.send(command) | |
282 | } | |
283 | ||
284 | // Repeat if not all objects could be listed at once (limit of 1000?) | |
285 | if (listedObjects.IsTruncated) { | |
286 | await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken }) | |
287 | } | |
288 | } | |
289 | ||
290 | function getACL (isPrivate: boolean) { | |
291 | return isPrivate | |
292 | ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE | |
293 | : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC | |
0305db28 | 294 | } |