path: root/server/lib
author     kontrollanten <6680299+kontrollanten@users.noreply.github.com>  2022-04-19 15:22:18 +0200
committer  GitHub <noreply@github.com>  2022-04-19 15:22:18 +0200
commit     156cdbac227b5501116a66063812b9fc9b0e89c7 (patch)
tree       1efb4398708f1552cafeffc84c1818262dbaca04 /server/lib
parent     33ac85bf610d74d353ee456a286159872cdd83cc (diff)
object-storage: @aws-sdk/lib-storage for multipart (#4903)
* object-storage: @aws-sdk/lib-storage for multipart
* gitignore: add .DS_Store
* test(object-storage): remove only
* test(object-storage/multipart): generate video
* fix lint issue
* test(obj-storage/video): ensure file size
* Styling

Co-authored-by: Chocobozzz <me@florianbigard.com>
Diffstat (limited to 'server/lib')
-rw-r--r--  server/lib/object-storage/shared/object-storage-helpers.ts  89
1 file changed, 19 insertions(+), 70 deletions(-)
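The patch replaces PeerTube's hand-rolled multipart flow (CreateMultipartUploadCommand, a per-part UploadPartCommand loop, then CompleteMultipartUploadCommand) with the Upload helper from @aws-sdk/lib-storage, which chunks a stream and uploads parts concurrently. For orientation, a minimal sketch of that pattern; the bucket, key, file path, region and part size below are hypothetical placeholders, while PeerTube itself wires in getClient() and CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, as the diff below shows:

    import { S3Client } from '@aws-sdk/client-s3'
    import { Upload } from '@aws-sdk/lib-storage'
    import { createReadStream } from 'fs'

    async function uploadExample () {
      const upload = new Upload({
        client: new S3Client({ region: 'us-east-1' }), // hypothetical client config
        queueSize: 4,                // number of parts uploaded in parallel
        partSize: 100 * 1024 * 1024, // hypothetical 100 MB part size
        leavePartsOnError: false,    // abort the multipart upload if a part fails
        params: {
          Bucket: 'my-bucket',                    // hypothetical bucket
          Key: 'videos/example.mp4',              // hypothetical key
          Body: createReadStream('./example.mp4') // hypothetical local file
        }
      })

      await upload.done()
    }

Upload takes its params in the same shape as PutObjectCommandInput, which is why the patch can type its input object as PutObjectCommandInput and keep the existing ACL branch untouched.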
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index ecb82856e..a2de92532 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -1,19 +1,14 @@
-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
-  CompletedPart,
-  CompleteMultipartUploadCommand,
-  CreateMultipartUploadCommand,
-  CreateMultipartUploadCommandInput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
-  PutObjectCommandInput,
-  UploadPartCommand
+  PutObjectCommandInput
 } from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
@@ -37,13 +32,12 @@ async function storeObject (options: {
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
   const stats = await stat(inputPath)
+  const fileStream = createReadStream(inputPath)
 
-  // If bigger than max allowed size we do a multipart upload
   if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
-    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
+    return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
   }
 
-  const fileStream = createReadStream(inputPath)
   return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
 }
 
@@ -163,18 +157,14 @@ async function objectStoragePut (options: {
 }
 
 async function multiPartUpload (options: {
-  inputPath: string
+  content: ReadStream
   objectStorageKey: string
   bucketInfo: BucketInfo
 }) {
-  const { objectStorageKey, inputPath, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo } = options
 
-  const key = buildKey(objectStorageKey, bucketInfo)
-  const s3Client = getClient()
-
-  const statResult = await stat(inputPath)
-
-  const input: CreateMultipartUploadCommandInput = {
+  const input: PutObjectCommandInput = {
+    Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
     Key: buildKey(objectStorageKey, bucketInfo)
   }
@@ -183,60 +173,19 @@ async function multiPartUpload (options: {
     input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
   }
 
-  const createMultipartCommand = new CreateMultipartUploadCommand(input)
-  const createResponse = await s3Client.send(createMultipartCommand)
-
-  const fd = await open(inputPath, 'r')
-  let partNumber = 1
-  const parts: CompletedPart[] = []
-  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
-  for (let start = 0; start < statResult.size; start += partSize) {
-    logger.debug(
-      'Uploading part %d of file to %s%s in bucket %s',
-      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
-    )
-
-    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
-    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
-    // streams with start and end set, so it just tries to stat the file in stream.path.
-    // This fails for us because we only want to send part of the file. The stream type
-    // is modified so we can set the byteLength here, which s3 detects because array buffers
-    // have this field set
-    const stream: ReadStream & { byteLength: number } =
-      createReadStream(
-        inputPath,
-        { fd, autoClose: false, start, end: (start + partSize) - 1 }
-      ) as ReadStream & { byteLength: number }
-
-    // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
-    stream.byteLength = min([ statResult.size - start, partSize ])
-
-    const uploadPartCommand = new UploadPartCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: key,
-      UploadId: createResponse.UploadId,
-      PartNumber: partNumber,
-      Body: stream
-    })
-    const uploadResponse = await s3Client.send(uploadPartCommand)
-
-    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
-    partNumber += 1
-  }
-  await close(fd)
-
-  const completeUploadCommand = new CompleteMultipartUploadCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: key,
-    UploadId: createResponse.UploadId,
-    MultipartUpload: { Parts: parts }
-  })
-  await s3Client.send(completeUploadCommand)
+  const parallelUploads3 = new Upload({
+    client: getClient(),
+    queueSize: 4,
+    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+    leavePartsOnError: false,
+    params: input
+  })
+
+  await parallelUploads3.done()
 
   logger.debug(
-    'Completed %s%s in bucket %s in %d parts',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
+    'Completed %s%s in bucket %s',
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
   )
 
   return getPrivateUrl(bucketInfo, objectStorageKey)
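Two hedged usage notes that go beyond what the patch itself does. First, Upload is an event emitter, and an httpUploadProgress handler could stand in for the removed per-part 'Uploading part %d' debug log; a sketch against the parallelUploads3 instance from the hunk above:

    // Illustrative only: attach before parallelUploads3.done() is awaited
    parallelUploads3.on('httpUploadProgress', progress => {
      // progress.loaded and progress.part report bytes sent and the current part number
      logger.debug('Uploaded %d bytes (part %d)', progress.loaded, progress.part)
    })

Second, storeObject still routes files at or below MAX_UPLOAD_PART through a plain PutObjectCommand. Recent versions of @aws-sdk/lib-storage will themselves issue a single PUT when the body fits in one part, so the explicit branch may be redundant, but keeping it preserves the previous single-request behaviour exactly.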