diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/lib/object-storage/shared/object-storage-helpers.ts | 89 | ||||
-rw-r--r-- | server/tests/api/object-storage/videos.ts | 42 |
2 files changed, 52 insertions, 79 deletions
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index ecb82856e..a2de92532 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts | |||
@@ -1,19 +1,14 @@ | |||
1 | import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' | 1 | import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra' |
2 | import { min } from 'lodash' | ||
3 | import { dirname } from 'path' | 2 | import { dirname } from 'path' |
4 | import { Readable } from 'stream' | 3 | import { Readable } from 'stream' |
5 | import { | 4 | import { |
6 | CompletedPart, | ||
7 | CompleteMultipartUploadCommand, | ||
8 | CreateMultipartUploadCommand, | ||
9 | CreateMultipartUploadCommandInput, | ||
10 | DeleteObjectCommand, | 5 | DeleteObjectCommand, |
11 | GetObjectCommand, | 6 | GetObjectCommand, |
12 | ListObjectsV2Command, | 7 | ListObjectsV2Command, |
13 | PutObjectCommand, | 8 | PutObjectCommand, |
14 | PutObjectCommandInput, | 9 | PutObjectCommandInput |
15 | UploadPartCommand | ||
16 | } from '@aws-sdk/client-s3' | 10 | } from '@aws-sdk/client-s3' |
11 | import { Upload } from '@aws-sdk/lib-storage' | ||
17 | import { pipelinePromise } from '@server/helpers/core-utils' | 12 | import { pipelinePromise } from '@server/helpers/core-utils' |
18 | import { isArray } from '@server/helpers/custom-validators/misc' | 13 | import { isArray } from '@server/helpers/custom-validators/misc' |
19 | import { logger } from '@server/helpers/logger' | 14 | import { logger } from '@server/helpers/logger' |
@@ -37,13 +32,12 @@ async function storeObject (options: { | |||
37 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | 32 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) |
38 | 33 | ||
39 | const stats = await stat(inputPath) | 34 | const stats = await stat(inputPath) |
35 | const fileStream = createReadStream(inputPath) | ||
40 | 36 | ||
41 | // If bigger than max allowed size we do a multipart upload | ||
42 | if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { | 37 | if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { |
43 | return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) | 38 | return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo }) |
44 | } | 39 | } |
45 | 40 | ||
46 | const fileStream = createReadStream(inputPath) | ||
47 | return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) | 41 | return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) |
48 | } | 42 | } |
49 | 43 | ||
@@ -163,18 +157,14 @@ async function objectStoragePut (options: { | |||
163 | } | 157 | } |
164 | 158 | ||
165 | async function multiPartUpload (options: { | 159 | async function multiPartUpload (options: { |
166 | inputPath: string | 160 | content: ReadStream |
167 | objectStorageKey: string | 161 | objectStorageKey: string |
168 | bucketInfo: BucketInfo | 162 | bucketInfo: BucketInfo |
169 | }) { | 163 | }) { |
170 | const { objectStorageKey, inputPath, bucketInfo } = options | 164 | const { content, objectStorageKey, bucketInfo } = options |
171 | 165 | ||
172 | const key = buildKey(objectStorageKey, bucketInfo) | 166 | const input: PutObjectCommandInput = { |
173 | const s3Client = getClient() | 167 | Body: content, |
174 | |||
175 | const statResult = await stat(inputPath) | ||
176 | |||
177 | const input: CreateMultipartUploadCommandInput = { | ||
178 | Bucket: bucketInfo.BUCKET_NAME, | 168 | Bucket: bucketInfo.BUCKET_NAME, |
179 | Key: buildKey(objectStorageKey, bucketInfo) | 169 | Key: buildKey(objectStorageKey, bucketInfo) |
180 | } | 170 | } |
@@ -183,60 +173,19 @@ async function multiPartUpload (options: { | |||
183 | input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL | 173 | input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL |
184 | } | 174 | } |
185 | 175 | ||
186 | const createMultipartCommand = new CreateMultipartUploadCommand(input) | 176 | const parallelUploads3 = new Upload({ |
187 | const createResponse = await s3Client.send(createMultipartCommand) | 177 | client: getClient(), |
188 | 178 | queueSize: 4, | |
189 | const fd = await open(inputPath, 'r') | 179 | partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, |
190 | let partNumber = 1 | 180 | leavePartsOnError: false, |
191 | const parts: CompletedPart[] = [] | 181 | params: input |
192 | const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART | ||
193 | |||
194 | for (let start = 0; start < statResult.size; start += partSize) { | ||
195 | logger.debug( | ||
196 | 'Uploading part %d of file to %s%s in bucket %s', | ||
197 | partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | ||
198 | ) | ||
199 | |||
200 | // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released | ||
201 | // The s3 sdk needs to know the length of the http body beforehand, but doesn't support | ||
202 | // streams with start and end set, so it just tries to stat the file in stream.path. | ||
203 | // This fails for us because we only want to send part of the file. The stream type | ||
204 | // is modified so we can set the byteLength here, which s3 detects because array buffers | ||
205 | // have this field set | ||
206 | const stream: ReadStream & { byteLength: number } = | ||
207 | createReadStream( | ||
208 | inputPath, | ||
209 | { fd, autoClose: false, start, end: (start + partSize) - 1 } | ||
210 | ) as ReadStream & { byteLength: number } | ||
211 | |||
212 | // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength | ||
213 | stream.byteLength = min([ statResult.size - start, partSize ]) | ||
214 | |||
215 | const uploadPartCommand = new UploadPartCommand({ | ||
216 | Bucket: bucketInfo.BUCKET_NAME, | ||
217 | Key: key, | ||
218 | UploadId: createResponse.UploadId, | ||
219 | PartNumber: partNumber, | ||
220 | Body: stream | ||
221 | }) | ||
222 | const uploadResponse = await s3Client.send(uploadPartCommand) | ||
223 | |||
224 | parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) | ||
225 | partNumber += 1 | ||
226 | } | ||
227 | await close(fd) | ||
228 | |||
229 | const completeUploadCommand = new CompleteMultipartUploadCommand({ | ||
230 | Bucket: bucketInfo.BUCKET_NAME, | ||
231 | Key: key, | ||
232 | UploadId: createResponse.UploadId, | ||
233 | MultipartUpload: { Parts: parts } | ||
234 | }) | 182 | }) |
235 | await s3Client.send(completeUploadCommand) | 183 | |
184 | await parallelUploads3.done() | ||
236 | 185 | ||
237 | logger.debug( | 186 | logger.debug( |
238 | 'Completed %s%s in bucket %s in %d parts', | 187 | 'Completed %s%s in bucket %s', |
239 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() | 188 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() |
240 | ) | 189 | ) |
241 | 190 | ||
242 | return getPrivateUrl(bucketInfo, objectStorageKey) | 191 | return getPrivateUrl(bucketInfo, objectStorageKey) |
diff --git a/server/tests/api/object-storage/videos.ts b/server/tests/api/object-storage/videos.ts index 498efcb17..22ad06305 100644 --- a/server/tests/api/object-storage/videos.ts +++ b/server/tests/api/object-storage/videos.ts | |||
@@ -1,9 +1,17 @@ | |||
1 | /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ | 1 | /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ |
2 | 2 | ||
3 | import 'mocha' | 3 | import 'mocha' |
4 | import bytes from 'bytes' | ||
4 | import * as chai from 'chai' | 5 | import * as chai from 'chai' |
6 | import { stat } from 'fs-extra' | ||
5 | import { merge } from 'lodash' | 7 | import { merge } from 'lodash' |
6 | import { checkTmpIsEmpty, expectLogDoesNotContain, expectStartWith, MockObjectStorage } from '@server/tests/shared' | 8 | import { |
9 | checkTmpIsEmpty, | ||
10 | expectLogDoesNotContain, | ||
11 | expectStartWith, | ||
12 | generateHighBitrateVideo, | ||
13 | MockObjectStorage | ||
14 | } from '@server/tests/shared' | ||
7 | import { areObjectStorageTestsDisabled } from '@shared/core-utils' | 15 | import { areObjectStorageTestsDisabled } from '@shared/core-utils' |
8 | import { HttpStatusCode, VideoDetails } from '@shared/models' | 16 | import { HttpStatusCode, VideoDetails } from '@shared/models' |
9 | import { | 17 | import { |
@@ -107,6 +115,10 @@ async function checkFiles (options: { | |||
107 | } | 115 | } |
108 | 116 | ||
109 | function runTestSuite (options: { | 117 | function runTestSuite (options: { |
118 | fixture?: string | ||
119 | |||
120 | maxUploadPart?: string | ||
121 | |||
110 | playlistBucket: string | 122 | playlistBucket: string |
111 | playlistPrefix?: string | 123 | playlistPrefix?: string |
112 | 124 | ||
@@ -114,10 +126,9 @@ function runTestSuite (options: { | |||
114 | webtorrentPrefix?: string | 126 | webtorrentPrefix?: string |
115 | 127 | ||
116 | useMockBaseUrl?: boolean | 128 | useMockBaseUrl?: boolean |
117 | |||
118 | maxUploadPart?: string | ||
119 | }) { | 129 | }) { |
120 | const mockObjectStorage = new MockObjectStorage() | 130 | const mockObjectStorage = new MockObjectStorage() |
131 | const { fixture } = options | ||
121 | let baseMockUrl: string | 132 | let baseMockUrl: string |
122 | 133 | ||
123 | let servers: PeerTubeServer[] | 134 | let servers: PeerTubeServer[] |
@@ -144,7 +155,7 @@ function runTestSuite (options: { | |||
144 | 155 | ||
145 | credentials: ObjectStorageCommand.getCredentialsConfig(), | 156 | credentials: ObjectStorageCommand.getCredentialsConfig(), |
146 | 157 | ||
147 | max_upload_part: options.maxUploadPart || '2MB', | 158 | max_upload_part: options.maxUploadPart || '5MB', |
148 | 159 | ||
149 | streaming_playlists: { | 160 | streaming_playlists: { |
150 | bucket_name: options.playlistBucket, | 161 | bucket_name: options.playlistBucket, |
@@ -181,7 +192,7 @@ function runTestSuite (options: { | |||
181 | it('Should upload a video and move it to the object storage without transcoding', async function () { | 192 | it('Should upload a video and move it to the object storage without transcoding', async function () { |
182 | this.timeout(40000) | 193 | this.timeout(40000) |
183 | 194 | ||
184 | const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1' }) | 195 | const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1', fixture }) |
185 | uuidsToDelete.push(uuid) | 196 | uuidsToDelete.push(uuid) |
186 | 197 | ||
187 | await waitJobs(servers) | 198 | await waitJobs(servers) |
@@ -197,7 +208,7 @@ function runTestSuite (options: { | |||
197 | it('Should upload a video and move it to the object storage with transcoding', async function () { | 208 | it('Should upload a video and move it to the object storage with transcoding', async function () { |
198 | this.timeout(120000) | 209 | this.timeout(120000) |
199 | 210 | ||
200 | const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2' }) | 211 | const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2', fixture }) |
201 | uuidsToDelete.push(uuid) | 212 | uuidsToDelete.push(uuid) |
202 | 213 | ||
203 | await waitJobs(servers) | 214 | await waitJobs(servers) |
@@ -390,12 +401,25 @@ describe('Object storage for videos', function () { | |||
390 | }) | 401 | }) |
391 | }) | 402 | }) |
392 | 403 | ||
393 | describe('Test object storage with small upload part', function () { | 404 | describe('Test object storage with file bigger than upload part', function () { |
405 | let fixture: string | ||
406 | const maxUploadPart = '5MB' | ||
407 | |||
408 | before(async function () { | ||
409 | fixture = await generateHighBitrateVideo() | ||
410 | |||
411 | const { size } = await stat(fixture) | ||
412 | |||
413 | if (bytes.parse(maxUploadPart) > size) { | ||
414 | throw Error(`Fixture file is too small (${size}) to make sense for this test.`) | ||
415 | } | ||
416 | }) | ||
417 | |||
394 | runTestSuite({ | 418 | runTestSuite({ |
419 | maxUploadPart, | ||
395 | playlistBucket: 'streaming-playlists', | 420 | playlistBucket: 'streaming-playlists', |
396 | webtorrentBucket: 'videos', | 421 | webtorrentBucket: 'videos', |
397 | 422 | fixture | |
398 | maxUploadPart: '5KB' | ||
399 | }) | 423 | }) |
400 | }) | 424 | }) |
401 | }) | 425 | }) |