import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
import { min } from 'lodash'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
  CompletedPart,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  CreateMultipartUploadCommandInput,
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  PutObjectCommand,
  PutObjectCommandInput,
  UploadPartCommand
} from '@aws-sdk/client-s3'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { getPrivateUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'

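// Bucket name and optional key prefix shared by all the helpers below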
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}

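// Store a local file in the bucket: small files go through a single PutObject,
// files bigger than the configured max part size go through a multipart upload.
// Resolves with the private URL of the stored object.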
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const stats = await stat(inputPath)

  // If the file is bigger than the max allowed part size, do a multipart upload
  if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
  }

  const fileStream = createReadStream(inputPath)
  return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
}

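// Delete a single object from the bucket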
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const command = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  return getClient().send(command)
}

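// Delete every object whose key starts with the given prefix.
// Objects are removed one by one for now (see the FIXMEs below) and the listing
// is repeated while the response is truncated.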
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteParams = {
  //   Bucket: bucketInfo.BUCKET_NAME,
  //   Delete: { Objects: [] }
  // }

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)

    // FIXME: use bulk delete when s3ninja supports this operation
    // deleteParams.Delete.Objects.push({ Key: object.Key })
  }

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
  // await s3Client.send(deleteCommand)

  // Repeat if not all objects could be listed at once (limit of 1000?)
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}

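// Download an object from the bucket to a local destination path,
// creating the parent directory if needed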
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(options.destination))

  const command = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const response = await getClient().send(command)

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  file.close()
}

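// Prepend the bucket prefix (if any) to an object key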
function buildKey (key: string, bucketInfo: BucketInfo) {
  return bucketInfo.PREFIX + key
}

// ---------------------------------------------------------------------------

export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}

// ---------------------------------------------------------------------------

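// Upload a single object, applying the configured upload ACL if one is set.
// Resolves with the private URL of the object.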
async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const input: PutObjectCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content
  }

  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  }

  const command = new PutObjectCommand(input)

  await getClient().send(command)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}

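// Upload a big file as an S3 multipart upload: create the upload, send the file
// in parts of at most MAX_UPLOAD_PART bytes, then complete the upload.
// Resolves with the private URL of the object.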
async function multiPartUpload (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, inputPath, bucketInfo } = options

  const key = buildKey(objectStorageKey, bucketInfo)
  const s3Client = getClient()

  const statResult = await stat(inputPath)

  const input: CreateMultipartUploadCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  }

  const createMultipartCommand = new CreateMultipartUploadCommand(input)
  const createResponse = await s3Client.send(createMultipartCommand)

  const fd = await open(inputPath, 'r')
  let partNumber = 1
  const parts: CompletedPart[] = []
  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART

  for (let start = 0; start < statResult.size; start += partSize) {
    logger.debug(
      'Uploading part %d of file to %s%s in bucket %s',
      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
    )

    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
    // streams with start and end set, so it just tries to stat the file in stream.path.
    // This fails for us because we only want to send part of the file. The stream type
    // is modified so we can set the byteLength here, which s3 detects because array buffers
    // have this field set
    const stream: ReadStream & { byteLength: number } =
      createReadStream(
        inputPath,
        { fd, autoClose: false, start, end: (start + partSize) - 1 }
      ) as ReadStream & { byteLength: number }

    // If fewer bytes than the part size are left, use the leftover byte count for byteLength
    stream.byteLength = min([ statResult.size - start, partSize ])

    const uploadPartCommand = new UploadPartCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: key,
      UploadId: createResponse.UploadId,
      PartNumber: partNumber,
      Body: stream
    })
    const uploadResponse = await s3Client.send(uploadPartCommand)

    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
    partNumber += 1
  }
  await close(fd)

  const completeUploadCommand = new CompleteMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key,
    UploadId: createResponse.UploadId,
    MultipartUpload: { Parts: parts }
  })
  await s3Client.send(completeUploadCommand)

  logger.debug(
    'Completed %s%s in bucket %s in %d parts',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}