]>
Commit | Line | Data |
---|---|---|
0305db28 JB |
1 | import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' |
2 | import { min } from 'lodash' | |
3 | import { dirname } from 'path' | |
4 | import { Readable } from 'stream' | |
5 | import { | |
6 | CompletedPart, | |
7 | CompleteMultipartUploadCommand, | |
8 | CreateMultipartUploadCommand, | |
f9915efa | 9 | CreateMultipartUploadCommandInput, |
0305db28 JB |
10 | DeleteObjectCommand, |
11 | GetObjectCommand, | |
12 | ListObjectsV2Command, | |
13 | PutObjectCommand, | |
f9915efa | 14 | PutObjectCommandInput, |
0305db28 JB |
15 | UploadPartCommand |
16 | } from '@aws-sdk/client-s3' | |
17 | import { pipelinePromise } from '@server/helpers/core-utils' | |
18 | import { isArray } from '@server/helpers/custom-validators/misc' | |
19 | import { logger } from '@server/helpers/logger' | |
20 | import { CONFIG } from '@server/initializers/config' | |
21 | import { getPrivateUrl } from '../urls' | |
22 | import { getClient } from './client' | |
23 | import { lTags } from './logger' | |
24 | ||
/**
 * Location of a set of objects inside the object storage.
 */
type BucketInfo = {
  /** Name passed as `Bucket` to every S3 command sent by this module */
  BUCKET_NAME: string

  /** String prepended to object keys by `buildKey` */
  PREFIX?: string
}
29 | ||
/**
 * Upload a local file to the object storage.
 *
 * The file is sent in a single put request unless its size exceeds
 * `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART`, in which case a multipart upload is used.
 *
 * @param options.inputPath path of the local file to upload
 * @param options.objectStorageKey key of the object, without the bucket prefix
 * @param options.bucketInfo bucket name and key prefix to upload to
 * @returns the URL built by `getPrivateUrl` for the uploaded object
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const { size } = await stat(inputPath)

  // Files bigger than the max allowed part size have to be split
  const needsMultipart = size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART

  if (!needsMultipart) {
    const content = createReadStream(inputPath)

    return objectStoragePut({ objectStorageKey, content, bucketInfo })
  }

  return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
}
49 | ||
/**
 * Delete a single object from the bucket.
 *
 * @param filename key of the object, without the bucket prefix
 * @param bucketInfo bucket name and key prefix the object lives in
 * @returns the response of the delete command
 */
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const deletion = new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(filename, bucketInfo) })

  return getClient().send(deletion)
}
58 | ||
/**
 * Delete every object whose key starts with the given prefix.
 *
 * Objects are listed, then deleted one by one. When the listing is truncated the function
 * calls itself again, which lists the objects that remain after the previous deletions.
 *
 * @param prefix key prefix to remove, without the bucket prefix
 * @param bucketInfo bucket name and key prefix to operate on
 * @throws Error when the listing response contains no `Contents` array
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  // Build the full prefix with the same helper used for object keys
  const commandPrefix = buildKey(prefix, bucketInfo)
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  // FIXME: use bulk delete (DeleteObjectsCommand with Delete: { Objects: [ { Key } ] })
  // when s3ninja will support this operation
  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)
  }

  // Repeat if not all objects could be listed at once (limit of 1000?)
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}
102 | ||
/**
 * Download an object from the bucket into a local file.
 *
 * The parent directory of the destination is created if needed.
 *
 * @param options.key key of the object, without the bucket prefix
 * @param options.destination local path the object body is written to
 * @param options.bucketInfo bucket name and key prefix to download from
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(destination))

  const download = new GetObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key: buildKey(key, bucketInfo) })
  const { Body } = await getClient().send(download)

  // Stream the object body to disk
  const output = createWriteStream(destination)
  await pipelinePromise(Body as Readable, output)

  output.close()
}
123 | ||
/**
 * Build the full object key by prepending the bucket prefix to `key`.
 *
 * `PREFIX` is optional in `BucketInfo`: a missing prefix is treated as an empty string
 * so the key is not prefixed by the literal text "undefined".
 *
 * @param key object key, without the bucket prefix
 * @param bucketInfo bucket information holding the optional prefix
 * @returns the prefixed key
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
127 | ||
128 | // --------------------------------------------------------------------------- | |
129 | ||
130 | export { | |
131 | BucketInfo, | |
132 | buildKey, | |
133 | storeObject, | |
134 | removeObject, | |
135 | removePrefix, | |
136 | makeAvailable | |
137 | } | |
138 | ||
139 | // --------------------------------------------------------------------------- | |
140 | ||
/**
 * Upload a stream to the bucket with a single put request.
 *
 * `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is applied to the object when it is set.
 *
 * @param options.objectStorageKey key of the object, without the bucket prefix
 * @param options.content stream sent as the request body
 * @param options.bucketInfo bucket name and key prefix to upload to
 * @returns the URL built by `getPrivateUrl` for the uploaded object
 */
async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const putParams: PutObjectCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content
  }

  // Only send an ACL when one is configured
  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) putParams.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  const putCommand = new PutObjectCommand(putParams)
  await getClient().send(putCommand)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}
164 | ||
/**
 * Upload a local file to the bucket using a multipart upload.
 *
 * The file is split in parts of `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART` bytes that are
 * uploaded one after the other, then the upload is completed with the collected ETags.
 * `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is applied to the object when it is set.
 *
 * The file descriptor opened on the input file is always closed, even if a part upload fails.
 * NOTE: the multipart upload itself is not aborted when a part fails.
 *
 * @param options.inputPath path of the local file to upload
 * @param options.objectStorageKey key of the object, without the bucket prefix
 * @param options.bucketInfo bucket name and key prefix to upload to
 * @returns the URL built by `getPrivateUrl` for the uploaded object
 */
async function multiPartUpload (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, inputPath, bucketInfo } = options

  const key = buildKey(objectStorageKey, bucketInfo)
  const s3Client = getClient()

  const statResult = await stat(inputPath)

  const input: CreateMultipartUploadCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key
  }

  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  }

  const createMultipartCommand = new CreateMultipartUploadCommand(input)
  const createResponse = await s3Client.send(createMultipartCommand)

  const parts: CompletedPart[] = []
  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
  let partNumber = 1

  // A single descriptor is shared by all the part streams, so they must not close it themselves
  const fd = await open(inputPath, 'r')

  try {
    for (let start = 0; start < statResult.size; start += partSize) {
      logger.debug(
        'Uploading part %d of file to %s%s in bucket %s',
        partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
      )

      // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
      // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
      // streams with start and end set, so it just tries to stat the file in stream.path.
      // This fails for us because we only want to send part of the file. The stream type
      // is modified so we can set the byteLength here, which s3 detects because array buffers
      // have this field set
      const stream: ReadStream & { byteLength: number } =
        createReadStream(
          inputPath,
          { fd, autoClose: false, start, end: (start + partSize) - 1 }
        ) as ReadStream & { byteLength: number }

      // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
      stream.byteLength = min([ statResult.size - start, partSize ])

      const uploadPartCommand = new UploadPartCommand({
        Bucket: bucketInfo.BUCKET_NAME,
        Key: key,
        UploadId: createResponse.UploadId,
        PartNumber: partNumber,
        Body: stream
      })
      const uploadResponse = await s3Client.send(uploadPartCommand)

      parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
      partNumber += 1
    }
  } finally {
    // Do not leak the descriptor if one of the part uploads rejects
    await close(fd)
  }

  const completeUploadCommand = new CompleteMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key,
    UploadId: createResponse.UploadId,
    MultipartUpload: { Parts: parts }
  })
  await s3Client.send(completeUploadCommand)

  logger.debug(
    'Completed %s%s in bucket %s in %d parts',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}