]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/object-storage/shared/object-storage-helpers.ts
Forward 206 status code for object storage proxy
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
CommitLineData
508c1b1e 1import { map } from 'bluebird'
23c0b67d 2import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
0305db28
JB
3import { dirname } from 'path'
4import { Readable } from 'stream'
5import {
9ab330b9 6 _Object,
23c0b67d 7 CompleteMultipartUploadCommandOutput,
0305db28
JB
8 DeleteObjectCommand,
9 GetObjectCommand,
10 ListObjectsV2Command,
9ab330b9
C
11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
0305db28 14} from '@aws-sdk/client-s3'
156cdbac 15import { Upload } from '@aws-sdk/lib-storage'
0305db28
JB
16import { pipelinePromise } from '@server/helpers/core-utils'
17import { isArray } from '@server/helpers/custom-validators/misc'
18import { logger } from '@server/helpers/logger'
19import { CONFIG } from '@server/initializers/config'
9ab330b9 20import { getInternalUrl } from '../urls'
0305db28
JB
21import { getClient } from './client'
22import { lTags } from './logger'
23
24type BucketInfo = {
25 BUCKET_NAME: string
26 PREFIX?: string
27}
28
cfd57d2c
C
29async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
30 const s3Client = getClient()
31
32 const commandPrefix = bucketInfo.PREFIX + prefix
33 const listCommand = new ListObjectsV2Command({
34 Bucket: bucketInfo.BUCKET_NAME,
35 Prefix: commandPrefix
36 })
37
38 const listedObjects = await s3Client.send(listCommand)
39
40 if (isArray(listedObjects.Contents) !== true) return []
41
42 return listedObjects.Contents.map(c => c.Key)
43}
44
45// ---------------------------------------------------------------------------
46
0305db28
JB
47async function storeObject (options: {
48 inputPath: string
49 objectStorageKey: string
50 bucketInfo: BucketInfo
9ab330b9 51 isPrivate: boolean
0305db28 52}): Promise<string> {
9ab330b9 53 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
0305db28
JB
54
55 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
56
156cdbac 57 const fileStream = createReadStream(inputPath)
0305db28 58
9ab330b9 59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
0305db28
JB
60}
61
cfd57d2c
C
62// ---------------------------------------------------------------------------
63
9ab330b9
C
64function updateObjectACL (options: {
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68}) {
69 const { objectStorageKey, bucketInfo, isPrivate } = options
70
71 const key = buildKey(objectStorageKey, bucketInfo)
72
73 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
74
75 const command = new PutObjectAclCommand({
0305db28 76 Bucket: bucketInfo.BUCKET_NAME,
9ab330b9
C
77 Key: key,
78 ACL: getACL(isPrivate)
0305db28
JB
79 })
80
81 return getClient().send(command)
82}
83
9ab330b9
C
84function updatePrefixACL (options: {
85 prefix: string
86 bucketInfo: BucketInfo
87 isPrivate: boolean
88}) {
89 const { prefix, bucketInfo, isPrivate } = options
90
91 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
92
93 return applyOnPrefix({
94 prefix,
95 bucketInfo,
96 commandBuilder: obj => {
508c1b1e
C
97 logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
98
9ab330b9
C
99 return new PutObjectAclCommand({
100 Bucket: bucketInfo.BUCKET_NAME,
101 Key: obj.Key,
102 ACL: getACL(isPrivate)
103 })
104 }
0305db28 105 })
9ab330b9 106}
0305db28 107
9ab330b9 108// ---------------------------------------------------------------------------
0305db28 109
9ab330b9
C
110function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
111 const key = buildKey(objectStorageKey, bucketInfo)
0305db28 112
aa887096
C
113 return removeObjectByFullKey(key, bucketInfo)
114}
115
116function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
117 logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
0305db28 118
9ab330b9
C
119 const command = new DeleteObjectCommand({
120 Bucket: bucketInfo.BUCKET_NAME,
aa887096 121 Key: fullKey
9ab330b9 122 })
0305db28 123
9ab330b9
C
124 return getClient().send(command)
125}
0305db28 126
508c1b1e 127async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
0305db28 128 // FIXME: use bulk delete when s3ninja will support this operation
0305db28 129
9ab330b9
C
130 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
131
132 return applyOnPrefix({
133 prefix,
134 bucketInfo,
135 commandBuilder: obj => {
508c1b1e
C
136 logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
137
9ab330b9
C
138 return new DeleteObjectCommand({
139 Bucket: bucketInfo.BUCKET_NAME,
140 Key: obj.Key
141 })
142 }
143 })
0305db28
JB
144}
145
cfd57d2c
C
146// ---------------------------------------------------------------------------
147
0305db28
JB
148async function makeAvailable (options: {
149 key: string
150 destination: string
151 bucketInfo: BucketInfo
152}) {
153 const { key, destination, bucketInfo } = options
154
155 await ensureDir(dirname(options.destination))
156
157 const command = new GetObjectCommand({
158 Bucket: bucketInfo.BUCKET_NAME,
159 Key: buildKey(key, bucketInfo)
160 })
161 const response = await getClient().send(command)
162
163 const file = createWriteStream(destination)
164 await pipelinePromise(response.Body as Readable, file)
165
166 file.close()
167}
168
169function buildKey (key: string, bucketInfo: BucketInfo) {
170 return bucketInfo.PREFIX + key
171}
172
173// ---------------------------------------------------------------------------
174
9ab330b9
C
175async function createObjectReadStream (options: {
176 key: string
177 bucketInfo: BucketInfo
178 rangeHeader: string
179}) {
180 const { key, bucketInfo, rangeHeader } = options
181
182 const command = new GetObjectCommand({
183 Bucket: bucketInfo.BUCKET_NAME,
184 Key: buildKey(key, bucketInfo),
185 Range: rangeHeader
186 })
187
188 const response = await getClient().send(command)
189
57e11a20
C
190 return {
191 response,
192 stream: response.Body as Readable
193 }
9ab330b9
C
194}
195
196// ---------------------------------------------------------------------------
197
0305db28
JB
198export {
199 BucketInfo,
200 buildKey,
9ab330b9 201
0305db28 202 storeObject,
9ab330b9 203
0305db28 204 removeObject,
aa887096 205 removeObjectByFullKey,
0305db28 206 removePrefix,
9ab330b9 207
cfd57d2c 208 makeAvailable,
9ab330b9
C
209
210 updateObjectACL,
211 updatePrefixACL,
212
213 listKeysOfPrefix,
214 createObjectReadStream
0305db28
JB
215}
216
217// ---------------------------------------------------------------------------
218
23c0b67d 219async function uploadToStorage (options: {
156cdbac 220 content: ReadStream
0305db28
JB
221 objectStorageKey: string
222 bucketInfo: BucketInfo
9ab330b9 223 isPrivate: boolean
0305db28 224}) {
9ab330b9 225 const { content, objectStorageKey, bucketInfo, isPrivate } = options
0305db28 226
156cdbac 227 const input: PutObjectCommandInput = {
228 Body: content,
0305db28 229 Bucket: bucketInfo.BUCKET_NAME,
9ab330b9
C
230 Key: buildKey(objectStorageKey, bucketInfo),
231 ACL: getACL(isPrivate)
f9915efa
DL
232 }
233
156cdbac 234 const parallelUploads3 = new Upload({
235 client: getClient(),
236 queueSize: 4,
237 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
23c0b67d 238
239 // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
240 // More detailed explanation:
241 // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
242 // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
243 leavePartsOnError: true,
156cdbac 244 params: input
0305db28 245 })
156cdbac 246
23c0b67d 247 const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
248 // Check is needed even if the HTTP status code is 200 OK
249 // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
250 if (!response.Bucket) {
251 const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
252 logger.error(message, { response, ...lTags() })
253 throw new Error(message)
254 }
0305db28
JB
255
256 logger.debug(
156cdbac 257 'Completed %s%s in bucket %s',
258 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
0305db28
JB
259 )
260
9ab330b9
C
261 return getInternalUrl(bucketInfo, objectStorageKey)
262}
263
264async function applyOnPrefix (options: {
265 prefix: string
266 bucketInfo: BucketInfo
267 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
268
269 continuationToken?: string
270}) {
271 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
272
273 const s3Client = getClient()
274
508c1b1e 275 const commandPrefix = buildKey(prefix, bucketInfo)
9ab330b9
C
276 const listCommand = new ListObjectsV2Command({
277 Bucket: bucketInfo.BUCKET_NAME,
278 Prefix: commandPrefix,
279 ContinuationToken: continuationToken
280 })
281
282 const listedObjects = await s3Client.send(listCommand)
283
284 if (isArray(listedObjects.Contents) !== true) {
285 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
286
287 logger.error(message, { response: listedObjects, ...lTags() })
288 throw new Error(message)
289 }
290
508c1b1e 291 await map(listedObjects.Contents, object => {
9ab330b9
C
292 const command = commandBuilder(object)
293
508c1b1e
C
294 return s3Client.send(command)
295 }, { concurrency: 10 })
9ab330b9
C
296
297 // Repeat if not all objects could be listed at once (limit of 1000?)
298 if (listedObjects.IsTruncated) {
299 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
300 }
301}
302
303function getACL (isPrivate: boolean) {
304 return isPrivate
305 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
306 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
0305db28 307}