// server/lib/object-storage/shared/object-storage-helpers.ts
import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
  CompleteMultipartUploadCommandOutput,
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  PutObjectCommandInput
} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { getPrivateUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'

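// PREFIX, when set, is prepended verbatim to every object key (see buildKey below),
// so it is expected to carry its own trailing separator, e.g. 'videos/'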
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}

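// Upload a local file to object storage and resolve with its private URL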
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const fileStream = createReadStream(inputPath)

  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
}

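// Delete a single object, resolving its key against the bucket prefix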
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const command = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  return getClient().send(command)
}

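// Delete every object under the given prefix, listing and removing them page by page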
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteParams = {
  //   Bucket: bucketInfo.BUCKET_NAME,
  //   Delete: { Objects: [] }
  // }

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)

    // FIXME: use bulk delete when s3ninja supports this operation
    // deleteParams.Delete.Objects.push({ Key: object.Key })
  }

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
  // await s3Client.send(deleteCommand)

  // ListObjectsV2 returns at most 1000 keys per call: recurse until every object under the prefix is removed
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}

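// Download an object to a local file, creating the destination directory if needed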
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(destination))

  const command = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const response = await getClient().send(command)

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  file.close()
}

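// Build the full object key by prepending the bucket prefix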
function buildKey (key: string, bucketInfo: BucketInfo) {
  return bucketInfo.PREFIX + key
}

// ---------------------------------------------------------------------------

export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}

// ---------------------------------------------------------------------------

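// Upload a stream with @aws-sdk/lib-storage's Upload helper, which transparently
// switches to a multipart upload for large payloads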
async function uploadToStorage (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { content, objectStorageKey, bucketInfo } = options

  const input: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  }

  const parallelUploads3 = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params: input
  })

  const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
  // Check the response body even when the HTTP status code is 200 OK: CompleteMultipartUpload can still
  // report an error. See https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  if (!response.Bucket) {
    const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
    logger.error(message, { response, ...lTags() })
    throw new Error(message)
  }

  logger.debug(
    'Completed %s%s in bucket %s',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}
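
// Example usage (a minimal sketch: the bucket name, prefix and paths below are
// hypothetical illustrations, not values taken from the PeerTube config):
//
//   const bucketInfo: BucketInfo = { BUCKET_NAME: 'peertube-videos', PREFIX: 'videos/' }
//
//   // Store a local file under the 'videos/' prefix and get back its private URL
//   const url = await storeObject({
//     inputPath: '/tmp/video-42.mp4',
//     objectStorageKey: '42/video.mp4',
//     bucketInfo
//   })
//
//   // Later, fetch the object back to local storage...
//   await makeAvailable({ key: '42/video.mp4', destination: '/tmp/restored.mp4', bucketInfo })
//
//   // ...or delete everything uploaded for this video
//   await removePrefix('42/', bucketInfo)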