From 156cdbac227b5501116a66063812b9fc9b0e89c7 Mon Sep 17 00:00:00 2001
From: kontrollanten <6680299+kontrollanten@users.noreply.github.com>
Date: Tue, 19 Apr 2022 15:22:18 +0200
Subject: object-storage: @aws-sdk/lib-storage for multipart (#4903)

* object-storage: @aws-sdk/lib-storage for multipart

* gitignore: add .DS_Store

* test(object-storage): remove only

* test(object-storage/multipart): generate video

* fix lint issue

* test(obj-storage/video): ensure file size

* Styling

Co-authored-by: Chocobozzz
---
 .../shared/object-storage-helpers.ts      | 89 +++++-----------------
 server/tests/api/object-storage/videos.ts | 42 +++++++---
 2 files changed, 52 insertions(+), 79 deletions(-)

diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index ecb82856e..a2de92532 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -1,19 +1,14 @@
-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
-  CompletedPart,
-  CompleteMultipartUploadCommand,
-  CreateMultipartUploadCommand,
-  CreateMultipartUploadCommandInput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
-  PutObjectCommandInput,
-  UploadPartCommand
+  PutObjectCommandInput
 } from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
@@ -37,13 +32,12 @@ async function storeObject (options: {
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

   const stats = await stat(inputPath)
+  const fileStream = createReadStream(inputPath)

-  // If bigger than max allowed size we do a multipart upload
   if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
-    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
+    return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
   }

-  const fileStream = createReadStream(inputPath)
   return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
 }
@@ -163,18 +157,14 @@ async function objectStoragePut (options: {
 }

 async function multiPartUpload (options: {
-  inputPath: string
+  content: ReadStream
   objectStorageKey: string
   bucketInfo: BucketInfo
 }) {
-  const { objectStorageKey, inputPath, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo } = options

-  const key = buildKey(objectStorageKey, bucketInfo)
-  const s3Client = getClient()
-
-  const statResult = await stat(inputPath)
-
-  const input: CreateMultipartUploadCommandInput = {
+  const input: PutObjectCommandInput = {
+    Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
     Key: buildKey(objectStorageKey, bucketInfo)
   }

   if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
     input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
   }

-  const createMultipartCommand = new CreateMultipartUploadCommand(input)
-  const createResponse = await s3Client.send(createMultipartCommand)
-
-  const fd = await open(inputPath, 'r')
-  let partNumber = 1
-  const parts: CompletedPart[] = []
-  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
-  for (let start = 0; start < statResult.size; start += partSize) {
-    logger.debug(
-      'Uploading part %d of file to %s%s in bucket %s',
-      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
-    )
-
-    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
-    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
-    // streams with start and end set, so it just tries to stat the file in stream.path.
-    // This fails for us because we only want to send part of the file. The stream type
-    // is modified so we can set the byteLength here, which s3 detects because array buffers
-    // have this field set
-    const stream: ReadStream & { byteLength: number } =
-      createReadStream(
-        inputPath,
-        { fd, autoClose: false, start, end: (start + partSize) - 1 }
-      ) as ReadStream & { byteLength: number }
-
-    // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
-    stream.byteLength = min([ statResult.size - start, partSize ])
-
-    const uploadPartCommand = new UploadPartCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: key,
-      UploadId: createResponse.UploadId,
-      PartNumber: partNumber,
-      Body: stream
-    })
-    const uploadResponse = await s3Client.send(uploadPartCommand)
-
-    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
-    partNumber += 1
-  }
-  await close(fd)
-
-  const completeUploadCommand = new CompleteMultipartUploadCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: key,
-    UploadId: createResponse.UploadId,
-    MultipartUpload: { Parts: parts }
+  const parallelUploads3 = new Upload({
+    client: getClient(),
+    queueSize: 4,
+    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+    leavePartsOnError: false,
+    params: input
   })
-  await s3Client.send(completeUploadCommand)
+
+  await parallelUploads3.done()

   logger.debug(
-    'Completed %s%s in bucket %s in %d parts',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
+    'Completed %s%s in bucket %s',
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
   )

   return getPrivateUrl(bucketInfo, objectStorageKey)
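Note: for readers unfamiliar with the Upload helper this patch adopts, here is a
minimal standalone sketch of the same flow. It is not PeerTube code: the region,
bucket, key and function name are placeholder assumptions, and only
@aws-sdk/client-s3 and @aws-sdk/lib-storage are required.

    import { createReadStream } from 'fs'
    import { S3Client } from '@aws-sdk/client-s3'
    import { Upload } from '@aws-sdk/lib-storage'

    async function uploadLargeFile (inputPath: string): Promise<void> {
      const upload = new Upload({
        client: new S3Client({ region: 'us-east-1' }), // placeholder region
        // Upload up to 4 parts concurrently, each 5 MiB (the S3 minimum part size)
        queueSize: 4,
        partSize: 5 * 1024 * 1024,
        // Abort the multipart upload on failure instead of leaving orphaned parts
        leavePartsOnError: false,
        params: {
          Bucket: 'my-bucket',              // placeholder bucket
          Key: 'videos/example.mp4',        // placeholder key
          Body: createReadStream(inputPath)
        }
      })

      // Optional: lib-storage emits progress events during the transfer
      upload.on('httpUploadProgress', progress => {
        console.log('Uploaded %d bytes', progress.loaded)
      })

      await upload.done()
    }

Upload falls back to a single PutObject when the body fits in one part and
handles the create/upload-part/complete sequence itself, which is what lets the
hand-rolled CreateMultipartUploadCommand / UploadPartCommand /
CompleteMultipartUploadCommand code above be deleted.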
diff --git a/server/tests/api/object-storage/videos.ts b/server/tests/api/object-storage/videos.ts
index 498efcb17..22ad06305 100644
--- a/server/tests/api/object-storage/videos.ts
+++ b/server/tests/api/object-storage/videos.ts
@@ -1,9 +1,17 @@
 /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */

 import 'mocha'
+import bytes from 'bytes'
 import * as chai from 'chai'
+import { stat } from 'fs-extra'
 import { merge } from 'lodash'
-import { checkTmpIsEmpty, expectLogDoesNotContain, expectStartWith, MockObjectStorage } from '@server/tests/shared'
+import {
+  checkTmpIsEmpty,
+  expectLogDoesNotContain,
+  expectStartWith,
+  generateHighBitrateVideo,
+  MockObjectStorage
+} from '@server/tests/shared'
 import { areObjectStorageTestsDisabled } from '@shared/core-utils'
 import { HttpStatusCode, VideoDetails } from '@shared/models'
 import {
@@ -107,6 +115,10 @@ async function checkFiles (options: {
 }

 function runTestSuite (options: {
+  fixture?: string
+
+  maxUploadPart?: string
+
   playlistBucket: string
   playlistPrefix?: string

   webtorrentBucket: string
   webtorrentPrefix?: string

   useMockBaseUrl?: boolean
-
-  maxUploadPart?: string
 }) {
   const mockObjectStorage = new MockObjectStorage()
+  const { fixture } = options
   let baseMockUrl: string
   let servers: PeerTubeServer[]
@@ -144,7 +155,7 @@ function runTestSuite (options: {
       credentials: ObjectStorageCommand.getCredentialsConfig(),

-      max_upload_part: options.maxUploadPart || '2MB',
+      max_upload_part: options.maxUploadPart || '5MB',

       streaming_playlists: {
         bucket_name: options.playlistBucket,
@@ -181,7 +192,7 @@ function runTestSuite (options: {
   it('Should upload a video and move it to the object storage without transcoding', async function () {
     this.timeout(40000)

-    const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1' })
+    const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1', fixture })
     uuidsToDelete.push(uuid)

     await waitJobs(servers)
@@ -197,7 +208,7 @@ function runTestSuite (options: {
   it('Should upload a video and move it to the object storage with transcoding', async function () {
     this.timeout(120000)

-    const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2' })
+    const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2', fixture })
     uuidsToDelete.push(uuid)

     await waitJobs(servers)
@@ -390,12 +401,25 @@ describe('Object storage for videos', function () {
     })
   })

-  describe('Test object storage with small upload part', function () {
+  describe('Test object storage with file bigger than upload part', function () {
+    let fixture: string
+    const maxUploadPart = '5MB'
+
+    before(async function () {
+      fixture = await generateHighBitrateVideo()
+
+      const { size } = await stat(fixture)
+
+      if (bytes.parse(maxUploadPart) > size) {
+        throw Error(`Fixture file is too small (${size}) to make sense for this test.`)
+      }
+    })
+
     runTestSuite({
+      maxUploadPart,
       playlistBucket: 'streaming-playlists',
       webtorrentBucket: 'videos',
-
-      maxUploadPart: '5KB'
+      fixture
     })
   })
 })
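Note on the test change: the suite now generates a high-bitrate fixture and
verifies it is larger than max_upload_part, so the multipart code path is
actually exercised; the old '5KB' part size also had to go, presumably because
lib-storage enforces S3's 5 MiB minimum part size. A self-contained sketch of
that guard, assuming only the npm "bytes" package and fs-extra — the helper name
and paths are hypothetical, not part of this patch:

    import bytes from 'bytes'
    import { stat } from 'fs-extra'

    // The upload only splits into parts when the file exceeds max_upload_part,
    // so a fixture at or below that size would silently test the single-PUT path.
    async function assertTriggersMultipart (fixturePath: string, maxUploadPart: string): Promise<void> {
      const { size } = await stat(fixturePath)
      const partSize = bytes.parse(maxUploadPart) // '5MB' -> 5242880

      if (size <= partSize) {
        throw Error(`Fixture ${fixturePath} (${size} bytes) must exceed ${maxUploadPart} to exercise multipart upload`)
      }
    }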