import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
import { min } from 'lodash'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
  CompletedPart,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  PutObjectCommand,
  UploadPartCommand
} from '@aws-sdk/client-s3'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { getPrivateUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'

type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}
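
// Example shape (hypothetical values, for illustration only):
// { BUCKET_NAME: 'peertube-videos', PREFIX: 'streaming-playlists/' }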

async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const stats = await stat(inputPath)

  // If bigger than max allowed size we do a multipart upload
  if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
  }

  const fileStream = createReadStream(inputPath)
  return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
}
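
// Usage sketch (hypothetical key, path and config shape, for illustration only):
//
//   const url = await storeObject({
//     inputPath: '/tmp/video-fragment.mp4',
//     objectStorageKey: 'video-uuid/fragment.mp4',
//     bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS // assumed config shape
//   })
//
// Files larger than CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART are uploaded in multiple
// parts; smaller files go through a single PutObjectCommand.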

async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const command = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  return getClient().send(command)
}

async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteParams = {
  //   Bucket: bucketInfo.BUCKET_NAME,
  //   Delete: { Objects: [] }
  // }

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)

    // FIXME: use bulk delete when s3ninja supports this operation
    // deleteParams.Delete.Objects.push({ Key: object.Key })
  }

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
  // await s3Client.send(deleteCommand)

  // Repeat if not all objects could be listed at once (limit of 1000?)
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}
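
// A minimal sketch of the bulk delete the FIXME comments refer to, once the backing
// storage (s3ninja) supports it. DeleteObjectsCommand accepts up to 1000 keys per call:
//
//   import { DeleteObjectsCommand } from '@aws-sdk/client-s3'
//
//   const deleteCommand = new DeleteObjectsCommand({
//     Bucket: bucketInfo.BUCKET_NAME,
//     Delete: { Objects: listedObjects.Contents.map(o => ({ Key: o.Key })) }
//   })
//   await s3Client.send(deleteCommand)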

async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(options.destination))

  const command = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const response = await getClient().send(command)

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  file.close()
}
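
// Usage sketch (hypothetical key and path, for illustration only):
//
//   await makeAvailable({
//     key: 'video-uuid/fragment.mp4',
//     destination: '/var/www/peertube/storage/tmp/fragment.mp4',
//     bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS // assumed config shape
//   })
//
// Downloads the object to a local path, creating the parent directory first.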

function buildKey (key: string, bucketInfo: BucketInfo) {
  return bucketInfo.PREFIX + key
}

// ---------------------------------------------------------------------------

export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}

// ---------------------------------------------------------------------------

async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const command = new PutObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content,
    ACL: 'public-read'
  })

  await getClient().send(command)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}

async function multiPartUpload (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, inputPath, bucketInfo } = options

  const key = buildKey(objectStorageKey, bucketInfo)
  const s3Client = getClient()

  const statResult = await stat(inputPath)

  const createMultipartCommand = new CreateMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key,
    ACL: 'public-read'
  })
  const createResponse = await s3Client.send(createMultipartCommand)

  const fd = await open(inputPath, 'r')
  let partNumber = 1
  const parts: CompletedPart[] = []
  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART

  for (let start = 0; start < statResult.size; start += partSize) {
    logger.debug(
      'Uploading part %d of file to %s%s in bucket %s',
      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
    )

    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
    // The S3 SDK needs to know the length of the HTTP body beforehand, but it doesn't support
    // streams with start and end set, so it just tries to stat the file in stream.path.
    // This fails for us because we only want to send part of the file. The stream type
    // is modified so we can set byteLength here, which the SDK detects because array buffers
    // have this field set
    const stream: ReadStream & { byteLength: number } =
      createReadStream(
        inputPath,
        { fd, autoClose: false, start, end: (start + partSize) - 1 }
      ) as ReadStream & { byteLength: number }

    // Use the remaining bytes for byteLength when less than a full part is left over
    stream.byteLength = min([ statResult.size - start, partSize ])
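    // Example (hypothetical numbers): with a 250 MB file and a 100 MB part size,
    // parts 1 and 2 get byteLength = 100 MB and the final part gets 250 - 200 = 50 MB.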

    const uploadPartCommand = new UploadPartCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: key,
      UploadId: createResponse.UploadId,
      PartNumber: partNumber,
      Body: stream
    })
    const uploadResponse = await s3Client.send(uploadPartCommand)

    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
    partNumber += 1
  }
  await close(fd)

  const completeUploadCommand = new CompleteMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key,
    UploadId: createResponse.UploadId,
    MultipartUpload: { Parts: parts }
  })
  await s3Client.send(completeUploadCommand)

  logger.debug(
    'Completed %s%s in bucket %s in %d parts',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}