]>
Commit | Line | Data |
---|---|---|
1 | import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' | |
2 | import { min } from 'lodash' | |
3 | import { dirname } from 'path' | |
4 | import { Readable } from 'stream' | |
5 | import { | |
6 | CompletedPart, | |
7 | CompleteMultipartUploadCommand, | |
8 | CreateMultipartUploadCommand, | |
9 | DeleteObjectCommand, | |
10 | GetObjectCommand, | |
11 | ListObjectsV2Command, | |
12 | PutObjectCommand, | |
13 | UploadPartCommand | |
14 | } from '@aws-sdk/client-s3' | |
15 | import { pipelinePromise } from '@server/helpers/core-utils' | |
16 | import { isArray } from '@server/helpers/custom-validators/misc' | |
17 | import { logger } from '@server/helpers/logger' | |
18 | import { CONFIG } from '@server/initializers/config' | |
19 | import { getPrivateUrl } from '../urls' | |
20 | import { getClient } from './client' | |
21 | import { lTags } from './logger' | |
22 | ||
23 | type BucketInfo = { | |
24 | BUCKET_NAME: string | |
25 | PREFIX?: string | |
26 | } | |
27 | ||
28 | async function storeObject (options: { | |
29 | inputPath: string | |
30 | objectStorageKey: string | |
31 | bucketInfo: BucketInfo | |
32 | }): Promise<string> { | |
33 | const { inputPath, objectStorageKey, bucketInfo } = options | |
34 | ||
35 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | |
36 | ||
37 | const stats = await stat(inputPath) | |
38 | ||
39 | // If bigger than max allowed size we do a multipart upload | |
40 | if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { | |
41 | return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) | |
42 | } | |
43 | ||
44 | const fileStream = createReadStream(inputPath) | |
45 | return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) | |
46 | } | |
47 | ||
48 | async function removeObject (filename: string, bucketInfo: BucketInfo) { | |
49 | const command = new DeleteObjectCommand({ | |
50 | Bucket: bucketInfo.BUCKET_NAME, | |
51 | Key: buildKey(filename, bucketInfo) | |
52 | }) | |
53 | ||
54 | return getClient().send(command) | |
55 | } | |
56 | ||
57 | async function removePrefix (prefix: string, bucketInfo: BucketInfo) { | |
58 | const s3Client = getClient() | |
59 | ||
60 | const commandPrefix = bucketInfo.PREFIX + prefix | |
61 | const listCommand = new ListObjectsV2Command({ | |
62 | Bucket: bucketInfo.BUCKET_NAME, | |
63 | Prefix: commandPrefix | |
64 | }) | |
65 | ||
66 | const listedObjects = await s3Client.send(listCommand) | |
67 | ||
68 | // FIXME: use bulk delete when s3ninja will support this operation | |
69 | // const deleteParams = { | |
70 | // Bucket: bucketInfo.BUCKET_NAME, | |
71 | // Delete: { Objects: [] } | |
72 | // } | |
73 | ||
74 | if (isArray(listedObjects.Contents) !== true) { | |
75 | const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` | |
76 | ||
77 | logger.error(message, { response: listedObjects, ...lTags() }) | |
78 | throw new Error(message) | |
79 | } | |
80 | ||
81 | for (const object of listedObjects.Contents) { | |
82 | const command = new DeleteObjectCommand({ | |
83 | Bucket: bucketInfo.BUCKET_NAME, | |
84 | Key: object.Key | |
85 | }) | |
86 | ||
87 | await s3Client.send(command) | |
88 | ||
89 | // FIXME: use bulk delete when s3ninja will support this operation | |
90 | // deleteParams.Delete.Objects.push({ Key: object.Key }) | |
91 | } | |
92 | ||
93 | // FIXME: use bulk delete when s3ninja will support this operation | |
94 | // const deleteCommand = new DeleteObjectsCommand(deleteParams) | |
95 | // await s3Client.send(deleteCommand) | |
96 | ||
97 | // Repeat if not all objects could be listed at once (limit of 1000?) | |
98 | if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) | |
99 | } | |
100 | ||
101 | async function makeAvailable (options: { | |
102 | key: string | |
103 | destination: string | |
104 | bucketInfo: BucketInfo | |
105 | }) { | |
106 | const { key, destination, bucketInfo } = options | |
107 | ||
108 | await ensureDir(dirname(options.destination)) | |
109 | ||
110 | const command = new GetObjectCommand({ | |
111 | Bucket: bucketInfo.BUCKET_NAME, | |
112 | Key: buildKey(key, bucketInfo) | |
113 | }) | |
114 | const response = await getClient().send(command) | |
115 | ||
116 | const file = createWriteStream(destination) | |
117 | await pipelinePromise(response.Body as Readable, file) | |
118 | ||
119 | file.close() | |
120 | } | |
121 | ||
122 | function buildKey (key: string, bucketInfo: BucketInfo) { | |
123 | return bucketInfo.PREFIX + key | |
124 | } | |
125 | ||
126 | // --------------------------------------------------------------------------- | |
127 | ||
128 | export { | |
129 | BucketInfo, | |
130 | buildKey, | |
131 | storeObject, | |
132 | removeObject, | |
133 | removePrefix, | |
134 | makeAvailable | |
135 | } | |
136 | ||
137 | // --------------------------------------------------------------------------- | |
138 | ||
139 | async function objectStoragePut (options: { | |
140 | objectStorageKey: string | |
141 | content: ReadStream | |
142 | bucketInfo: BucketInfo | |
143 | }) { | |
144 | const { objectStorageKey, content, bucketInfo } = options | |
145 | ||
146 | const command = new PutObjectCommand({ | |
147 | Bucket: bucketInfo.BUCKET_NAME, | |
148 | Key: buildKey(objectStorageKey, bucketInfo), | |
149 | Body: content, | |
150 | ACL: 'public-read' | |
151 | }) | |
152 | ||
153 | await getClient().send(command) | |
154 | ||
155 | return getPrivateUrl(bucketInfo, objectStorageKey) | |
156 | } | |
157 | ||
158 | async function multiPartUpload (options: { | |
159 | inputPath: string | |
160 | objectStorageKey: string | |
161 | bucketInfo: BucketInfo | |
162 | }) { | |
163 | const { objectStorageKey, inputPath, bucketInfo } = options | |
164 | ||
165 | const key = buildKey(objectStorageKey, bucketInfo) | |
166 | const s3Client = getClient() | |
167 | ||
168 | const statResult = await stat(inputPath) | |
169 | ||
170 | const createMultipartCommand = new CreateMultipartUploadCommand({ | |
171 | Bucket: bucketInfo.BUCKET_NAME, | |
172 | Key: key, | |
173 | ACL: 'public-read' | |
174 | }) | |
175 | const createResponse = await s3Client.send(createMultipartCommand) | |
176 | ||
177 | const fd = await open(inputPath, 'r') | |
178 | let partNumber = 1 | |
179 | const parts: CompletedPart[] = [] | |
180 | const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART | |
181 | ||
182 | for (let start = 0; start < statResult.size; start += partSize) { | |
183 | logger.debug( | |
184 | 'Uploading part %d of file to %s%s in bucket %s', | |
185 | partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | |
186 | ) | |
187 | ||
188 | // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released | |
189 | // The s3 sdk needs to know the length of the http body beforehand, but doesn't support | |
190 | // streams with start and end set, so it just tries to stat the file in stream.path. | |
191 | // This fails for us because we only want to send part of the file. The stream type | |
192 | // is modified so we can set the byteLength here, which s3 detects because array buffers | |
193 | // have this field set | |
194 | const stream: ReadStream & { byteLength: number } = | |
195 | createReadStream( | |
196 | inputPath, | |
197 | { fd, autoClose: false, start, end: (start + partSize) - 1 } | |
198 | ) as ReadStream & { byteLength: number } | |
199 | ||
200 | // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength | |
201 | stream.byteLength = min([ statResult.size - start, partSize ]) | |
202 | ||
203 | const uploadPartCommand = new UploadPartCommand({ | |
204 | Bucket: bucketInfo.BUCKET_NAME, | |
205 | Key: key, | |
206 | UploadId: createResponse.UploadId, | |
207 | PartNumber: partNumber, | |
208 | Body: stream | |
209 | }) | |
210 | const uploadResponse = await s3Client.send(uploadPartCommand) | |
211 | ||
212 | parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) | |
213 | partNumber += 1 | |
214 | } | |
215 | await close(fd) | |
216 | ||
217 | const completeUploadCommand = new CompleteMultipartUploadCommand({ | |
218 | Bucket: bucketInfo.BUCKET_NAME, | |
219 | Key: objectStorageKey, | |
220 | UploadId: createResponse.UploadId, | |
221 | MultipartUpload: { Parts: parts } | |
222 | }) | |
223 | await s3Client.send(completeUploadCommand) | |
224 | ||
225 | logger.debug( | |
226 | 'Completed %s%s in bucket %s in %d parts', | |
227 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() | |
228 | ) | |
229 | ||
230 | return getPrivateUrl(bucketInfo, objectStorageKey) | |
231 | } |