diff options
author | kontrollanten <6680299+kontrollanten@users.noreply.github.com> | 2022-04-19 15:22:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-19 15:22:18 +0200 |
commit | 156cdbac227b5501116a66063812b9fc9b0e89c7 (patch) | |
tree | 1efb4398708f1552cafeffc84c1818262dbaca04 /server/lib/object-storage/shared | |
parent | 33ac85bf610d74d353ee456a286159872cdd83cc (diff) | |
download | PeerTube-156cdbac227b5501116a66063812b9fc9b0e89c7.tar.gz PeerTube-156cdbac227b5501116a66063812b9fc9b0e89c7.tar.zst PeerTube-156cdbac227b5501116a66063812b9fc9b0e89c7.zip |
object-storage: @aws-sdk/lib-storage for multipart (#4903)
* object-storage: @aws-sdk/lib-storage for multipart
* gitignore: add .DS_Store
* test(object-storage): remove only
* test(object-storage/multipart): generate video
* fix lint issue
* test(obj-storage/video): ensure file size
* Styling
Co-authored-by: Chocobozzz <me@florianbigard.com>
Diffstat (limited to 'server/lib/object-storage/shared')
-rw-r--r-- | server/lib/object-storage/shared/object-storage-helpers.ts | 89 |
1 files changed, 19 insertions, 70 deletions
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index ecb82856e..a2de92532 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts | |||
@@ -1,19 +1,14 @@ | |||
1 | import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' | 1 | import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra' |
2 | import { min } from 'lodash' | ||
3 | import { dirname } from 'path' | 2 | import { dirname } from 'path' |
4 | import { Readable } from 'stream' | 3 | import { Readable } from 'stream' |
5 | import { | 4 | import { |
6 | CompletedPart, | ||
7 | CompleteMultipartUploadCommand, | ||
8 | CreateMultipartUploadCommand, | ||
9 | CreateMultipartUploadCommandInput, | ||
10 | DeleteObjectCommand, | 5 | DeleteObjectCommand, |
11 | GetObjectCommand, | 6 | GetObjectCommand, |
12 | ListObjectsV2Command, | 7 | ListObjectsV2Command, |
13 | PutObjectCommand, | 8 | PutObjectCommand, |
14 | PutObjectCommandInput, | 9 | PutObjectCommandInput |
15 | UploadPartCommand | ||
16 | } from '@aws-sdk/client-s3' | 10 | } from '@aws-sdk/client-s3' |
11 | import { Upload } from '@aws-sdk/lib-storage' | ||
17 | import { pipelinePromise } from '@server/helpers/core-utils' | 12 | import { pipelinePromise } from '@server/helpers/core-utils' |
18 | import { isArray } from '@server/helpers/custom-validators/misc' | 13 | import { isArray } from '@server/helpers/custom-validators/misc' |
19 | import { logger } from '@server/helpers/logger' | 14 | import { logger } from '@server/helpers/logger' |
@@ -37,13 +32,12 @@ async function storeObject (options: { | |||
37 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | 32 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) |
38 | 33 | ||
39 | const stats = await stat(inputPath) | 34 | const stats = await stat(inputPath) |
35 | const fileStream = createReadStream(inputPath) | ||
40 | 36 | ||
41 | // If bigger than max allowed size we do a multipart upload | ||
42 | if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { | 37 | if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { |
43 | return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) | 38 | return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo }) |
44 | } | 39 | } |
45 | 40 | ||
46 | const fileStream = createReadStream(inputPath) | ||
47 | return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) | 41 | return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) |
48 | } | 42 | } |
49 | 43 | ||
@@ -163,18 +157,14 @@ async function objectStoragePut (options: { | |||
163 | } | 157 | } |
164 | 158 | ||
165 | async function multiPartUpload (options: { | 159 | async function multiPartUpload (options: { |
166 | inputPath: string | 160 | content: ReadStream |
167 | objectStorageKey: string | 161 | objectStorageKey: string |
168 | bucketInfo: BucketInfo | 162 | bucketInfo: BucketInfo |
169 | }) { | 163 | }) { |
170 | const { objectStorageKey, inputPath, bucketInfo } = options | 164 | const { content, objectStorageKey, bucketInfo } = options |
171 | 165 | ||
172 | const key = buildKey(objectStorageKey, bucketInfo) | 166 | const input: PutObjectCommandInput = { |
173 | const s3Client = getClient() | 167 | Body: content, |
174 | |||
175 | const statResult = await stat(inputPath) | ||
176 | |||
177 | const input: CreateMultipartUploadCommandInput = { | ||
178 | Bucket: bucketInfo.BUCKET_NAME, | 168 | Bucket: bucketInfo.BUCKET_NAME, |
179 | Key: buildKey(objectStorageKey, bucketInfo) | 169 | Key: buildKey(objectStorageKey, bucketInfo) |
180 | } | 170 | } |
@@ -183,60 +173,19 @@ async function multiPartUpload (options: { | |||
183 | input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL | 173 | input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL |
184 | } | 174 | } |
185 | 175 | ||
186 | const createMultipartCommand = new CreateMultipartUploadCommand(input) | 176 | const parallelUploads3 = new Upload({ |
187 | const createResponse = await s3Client.send(createMultipartCommand) | 177 | client: getClient(), |
188 | 178 | queueSize: 4, | |
189 | const fd = await open(inputPath, 'r') | 179 | partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, |
190 | let partNumber = 1 | 180 | leavePartsOnError: false, |
191 | const parts: CompletedPart[] = [] | 181 | params: input |
192 | const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART | ||
193 | |||
194 | for (let start = 0; start < statResult.size; start += partSize) { | ||
195 | logger.debug( | ||
196 | 'Uploading part %d of file to %s%s in bucket %s', | ||
197 | partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | ||
198 | ) | ||
199 | |||
200 | // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released | ||
201 | // The s3 sdk needs to know the length of the http body beforehand, but doesn't support | ||
202 | // streams with start and end set, so it just tries to stat the file in stream.path. | ||
203 | // This fails for us because we only want to send part of the file. The stream type | ||
204 | // is modified so we can set the byteLength here, which s3 detects because array buffers | ||
205 | // have this field set | ||
206 | const stream: ReadStream & { byteLength: number } = | ||
207 | createReadStream( | ||
208 | inputPath, | ||
209 | { fd, autoClose: false, start, end: (start + partSize) - 1 } | ||
210 | ) as ReadStream & { byteLength: number } | ||
211 | |||
212 | // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength | ||
213 | stream.byteLength = min([ statResult.size - start, partSize ]) | ||
214 | |||
215 | const uploadPartCommand = new UploadPartCommand({ | ||
216 | Bucket: bucketInfo.BUCKET_NAME, | ||
217 | Key: key, | ||
218 | UploadId: createResponse.UploadId, | ||
219 | PartNumber: partNumber, | ||
220 | Body: stream | ||
221 | }) | ||
222 | const uploadResponse = await s3Client.send(uploadPartCommand) | ||
223 | |||
224 | parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) | ||
225 | partNumber += 1 | ||
226 | } | ||
227 | await close(fd) | ||
228 | |||
229 | const completeUploadCommand = new CompleteMultipartUploadCommand({ | ||
230 | Bucket: bucketInfo.BUCKET_NAME, | ||
231 | Key: key, | ||
232 | UploadId: createResponse.UploadId, | ||
233 | MultipartUpload: { Parts: parts } | ||
234 | }) | 182 | }) |
235 | await s3Client.send(completeUploadCommand) | 183 | |
184 | await parallelUploads3.done() | ||
236 | 185 | ||
237 | logger.debug( | 186 | logger.debug( |
238 | 'Completed %s%s in bucket %s in %d parts', | 187 | 'Completed %s%s in bucket %s', |
239 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() | 188 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() |
240 | ) | 189 | ) |
241 | 190 | ||
242 | return getPrivateUrl(bucketInfo, objectStorageKey) | 191 | return getPrivateUrl(bucketInfo, objectStorageKey) |