aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts89
-rw-r--r--server/tests/api/object-storage/videos.ts42
2 files changed, 52 insertions, 79 deletions
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index ecb82856e..a2de92532 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -1,19 +1,14 @@
1import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' 1import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
2import { min } from 'lodash'
3import { dirname } from 'path' 2import { dirname } from 'path'
4import { Readable } from 'stream' 3import { Readable } from 'stream'
5import { 4import {
6 CompletedPart,
7 CompleteMultipartUploadCommand,
8 CreateMultipartUploadCommand,
9 CreateMultipartUploadCommandInput,
10 DeleteObjectCommand, 5 DeleteObjectCommand,
11 GetObjectCommand, 6 GetObjectCommand,
12 ListObjectsV2Command, 7 ListObjectsV2Command,
13 PutObjectCommand, 8 PutObjectCommand,
14 PutObjectCommandInput, 9 PutObjectCommandInput
15 UploadPartCommand
16} from '@aws-sdk/client-s3' 10} from '@aws-sdk/client-s3'
11import { Upload } from '@aws-sdk/lib-storage'
17import { pipelinePromise } from '@server/helpers/core-utils' 12import { pipelinePromise } from '@server/helpers/core-utils'
18import { isArray } from '@server/helpers/custom-validators/misc' 13import { isArray } from '@server/helpers/custom-validators/misc'
19import { logger } from '@server/helpers/logger' 14import { logger } from '@server/helpers/logger'
@@ -37,13 +32,12 @@ async function storeObject (options: {
37 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) 32 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
38 33
39 const stats = await stat(inputPath) 34 const stats = await stat(inputPath)
35 const fileStream = createReadStream(inputPath)
40 36
41 // If bigger than max allowed size we do a multipart upload
42 if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { 37 if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
43 return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) 38 return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
44 } 39 }
45 40
46 const fileStream = createReadStream(inputPath)
47 return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) 41 return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
48} 42}
49 43
@@ -163,18 +157,14 @@ async function objectStoragePut (options: {
163} 157}
164 158
165async function multiPartUpload (options: { 159async function multiPartUpload (options: {
166 inputPath: string 160 content: ReadStream
167 objectStorageKey: string 161 objectStorageKey: string
168 bucketInfo: BucketInfo 162 bucketInfo: BucketInfo
169}) { 163}) {
170 const { objectStorageKey, inputPath, bucketInfo } = options 164 const { content, objectStorageKey, bucketInfo } = options
171 165
172 const key = buildKey(objectStorageKey, bucketInfo) 166 const input: PutObjectCommandInput = {
173 const s3Client = getClient() 167 Body: content,
174
175 const statResult = await stat(inputPath)
176
177 const input: CreateMultipartUploadCommandInput = {
178 Bucket: bucketInfo.BUCKET_NAME, 168 Bucket: bucketInfo.BUCKET_NAME,
179 Key: buildKey(objectStorageKey, bucketInfo) 169 Key: buildKey(objectStorageKey, bucketInfo)
180 } 170 }
@@ -183,60 +173,19 @@ async function multiPartUpload (options: {
183 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL 173 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
184 } 174 }
185 175
186 const createMultipartCommand = new CreateMultipartUploadCommand(input) 176 const parallelUploads3 = new Upload({
187 const createResponse = await s3Client.send(createMultipartCommand) 177 client: getClient(),
188 178 queueSize: 4,
189 const fd = await open(inputPath, 'r') 179 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
190 let partNumber = 1 180 leavePartsOnError: false,
191 const parts: CompletedPart[] = [] 181 params: input
192 const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
193
194 for (let start = 0; start < statResult.size; start += partSize) {
195 logger.debug(
196 'Uploading part %d of file to %s%s in bucket %s',
197 partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
198 )
199
200 // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
201 // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
202 // streams with start and end set, so it just tries to stat the file in stream.path.
203 // This fails for us because we only want to send part of the file. The stream type
204 // is modified so we can set the byteLength here, which s3 detects because array buffers
205 // have this field set
206 const stream: ReadStream & { byteLength: number } =
207 createReadStream(
208 inputPath,
209 { fd, autoClose: false, start, end: (start + partSize) - 1 }
210 ) as ReadStream & { byteLength: number }
211
212 // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
213 stream.byteLength = min([ statResult.size - start, partSize ])
214
215 const uploadPartCommand = new UploadPartCommand({
216 Bucket: bucketInfo.BUCKET_NAME,
217 Key: key,
218 UploadId: createResponse.UploadId,
219 PartNumber: partNumber,
220 Body: stream
221 })
222 const uploadResponse = await s3Client.send(uploadPartCommand)
223
224 parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
225 partNumber += 1
226 }
227 await close(fd)
228
229 const completeUploadCommand = new CompleteMultipartUploadCommand({
230 Bucket: bucketInfo.BUCKET_NAME,
231 Key: key,
232 UploadId: createResponse.UploadId,
233 MultipartUpload: { Parts: parts }
234 }) 182 })
235 await s3Client.send(completeUploadCommand) 183
184 await parallelUploads3.done()
236 185
237 logger.debug( 186 logger.debug(
238 'Completed %s%s in bucket %s in %d parts', 187 'Completed %s%s in bucket %s',
239 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() 188 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
240 ) 189 )
241 190
242 return getPrivateUrl(bucketInfo, objectStorageKey) 191 return getPrivateUrl(bucketInfo, objectStorageKey)
diff --git a/server/tests/api/object-storage/videos.ts b/server/tests/api/object-storage/videos.ts
index 498efcb17..22ad06305 100644
--- a/server/tests/api/object-storage/videos.ts
+++ b/server/tests/api/object-storage/videos.ts
@@ -1,9 +1,17 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ 1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2 2
3import 'mocha' 3import 'mocha'
4import bytes from 'bytes'
4import * as chai from 'chai' 5import * as chai from 'chai'
6import { stat } from 'fs-extra'
5import { merge } from 'lodash' 7import { merge } from 'lodash'
6import { checkTmpIsEmpty, expectLogDoesNotContain, expectStartWith, MockObjectStorage } from '@server/tests/shared' 8import {
9 checkTmpIsEmpty,
10 expectLogDoesNotContain,
11 expectStartWith,
12 generateHighBitrateVideo,
13 MockObjectStorage
14} from '@server/tests/shared'
7import { areObjectStorageTestsDisabled } from '@shared/core-utils' 15import { areObjectStorageTestsDisabled } from '@shared/core-utils'
8import { HttpStatusCode, VideoDetails } from '@shared/models' 16import { HttpStatusCode, VideoDetails } from '@shared/models'
9import { 17import {
@@ -107,6 +115,10 @@ async function checkFiles (options: {
107} 115}
108 116
109function runTestSuite (options: { 117function runTestSuite (options: {
118 fixture?: string
119
120 maxUploadPart?: string
121
110 playlistBucket: string 122 playlistBucket: string
111 playlistPrefix?: string 123 playlistPrefix?: string
112 124
@@ -114,10 +126,9 @@ function runTestSuite (options: {
114 webtorrentPrefix?: string 126 webtorrentPrefix?: string
115 127
116 useMockBaseUrl?: boolean 128 useMockBaseUrl?: boolean
117
118 maxUploadPart?: string
119}) { 129}) {
120 const mockObjectStorage = new MockObjectStorage() 130 const mockObjectStorage = new MockObjectStorage()
131 const { fixture } = options
121 let baseMockUrl: string 132 let baseMockUrl: string
122 133
123 let servers: PeerTubeServer[] 134 let servers: PeerTubeServer[]
@@ -144,7 +155,7 @@ function runTestSuite (options: {
144 155
145 credentials: ObjectStorageCommand.getCredentialsConfig(), 156 credentials: ObjectStorageCommand.getCredentialsConfig(),
146 157
147 max_upload_part: options.maxUploadPart || '2MB', 158 max_upload_part: options.maxUploadPart || '5MB',
148 159
149 streaming_playlists: { 160 streaming_playlists: {
150 bucket_name: options.playlistBucket, 161 bucket_name: options.playlistBucket,
@@ -181,7 +192,7 @@ function runTestSuite (options: {
181 it('Should upload a video and move it to the object storage without transcoding', async function () { 192 it('Should upload a video and move it to the object storage without transcoding', async function () {
182 this.timeout(40000) 193 this.timeout(40000)
183 194
184 const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1' }) 195 const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1', fixture })
185 uuidsToDelete.push(uuid) 196 uuidsToDelete.push(uuid)
186 197
187 await waitJobs(servers) 198 await waitJobs(servers)
@@ -197,7 +208,7 @@ function runTestSuite (options: {
197 it('Should upload a video and move it to the object storage with transcoding', async function () { 208 it('Should upload a video and move it to the object storage with transcoding', async function () {
198 this.timeout(120000) 209 this.timeout(120000)
199 210
200 const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2' }) 211 const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2', fixture })
201 uuidsToDelete.push(uuid) 212 uuidsToDelete.push(uuid)
202 213
203 await waitJobs(servers) 214 await waitJobs(servers)
@@ -390,12 +401,25 @@ describe('Object storage for videos', function () {
390 }) 401 })
391 }) 402 })
392 403
393 describe('Test object storage with small upload part', function () { 404 describe('Test object storage with file bigger than upload part', function () {
405 let fixture: string
406 const maxUploadPart = '5MB'
407
408 before(async function () {
409 fixture = await generateHighBitrateVideo()
410
411 const { size } = await stat(fixture)
412
413 if (bytes.parse(maxUploadPart) > size) {
414 throw Error(`Fixture file is too small (${size}) to make sense for this test.`)
415 }
416 })
417
394 runTestSuite({ 418 runTestSuite({
419 maxUploadPart,
395 playlistBucket: 'streaming-playlists', 420 playlistBucket: 'streaming-playlists',
396 webtorrentBucket: 'videos', 421 webtorrentBucket: 'videos',
397 422 fixture
398 maxUploadPart: '5KB'
399 }) 423 })
400 }) 424 })
401}) 425})