]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/object-storage/shared/object-storage-helpers.ts
Correctly cleanup files from object storage
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
1 import { map } from 'bluebird'
2 import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
3 import { dirname } from 'path'
4 import { Readable } from 'stream'
5 import {
6 _Object,
7 CompleteMultipartUploadCommandOutput,
8 DeleteObjectCommand,
9 GetObjectCommand,
10 ListObjectsV2Command,
11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
14 } from '@aws-sdk/client-s3'
15 import { Upload } from '@aws-sdk/lib-storage'
16 import { pipelinePromise } from '@server/helpers/core-utils'
17 import { isArray } from '@server/helpers/custom-validators/misc'
18 import { logger } from '@server/helpers/logger'
19 import { CONFIG } from '@server/initializers/config'
20 import { getInternalUrl } from '../urls'
21 import { getClient } from './client'
22 import { lTags } from './logger'
23
// Describes the target bucket for all helpers in this module:
// BUCKET_NAME is the S3 bucket, PREFIX (optional) is prepended to every object key
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}
28
29 async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
30 const s3Client = getClient()
31
32 const commandPrefix = bucketInfo.PREFIX + prefix
33 const listCommand = new ListObjectsV2Command({
34 Bucket: bucketInfo.BUCKET_NAME,
35 Prefix: commandPrefix
36 })
37
38 const listedObjects = await s3Client.send(listCommand)
39
40 if (isArray(listedObjects.Contents) !== true) return []
41
42 return listedObjects.Contents.map(c => c.Key)
43 }
44
45 // ---------------------------------------------------------------------------
46
47 async function storeObject (options: {
48 inputPath: string
49 objectStorageKey: string
50 bucketInfo: BucketInfo
51 isPrivate: boolean
52 }): Promise<string> {
53 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
54
55 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
56
57 const fileStream = createReadStream(inputPath)
58
59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
60 }
61
62 // ---------------------------------------------------------------------------
63
64 function updateObjectACL (options: {
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68 }) {
69 const { objectStorageKey, bucketInfo, isPrivate } = options
70
71 const key = buildKey(objectStorageKey, bucketInfo)
72
73 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
74
75 const command = new PutObjectAclCommand({
76 Bucket: bucketInfo.BUCKET_NAME,
77 Key: key,
78 ACL: getACL(isPrivate)
79 })
80
81 return getClient().send(command)
82 }
83
84 function updatePrefixACL (options: {
85 prefix: string
86 bucketInfo: BucketInfo
87 isPrivate: boolean
88 }) {
89 const { prefix, bucketInfo, isPrivate } = options
90
91 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
92
93 return applyOnPrefix({
94 prefix,
95 bucketInfo,
96 commandBuilder: obj => {
97 logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
98
99 return new PutObjectAclCommand({
100 Bucket: bucketInfo.BUCKET_NAME,
101 Key: obj.Key,
102 ACL: getACL(isPrivate)
103 })
104 }
105 })
106 }
107
108 // ---------------------------------------------------------------------------
109
110 function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
111 const key = buildKey(objectStorageKey, bucketInfo)
112
113 logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
114
115 const command = new DeleteObjectCommand({
116 Bucket: bucketInfo.BUCKET_NAME,
117 Key: key
118 })
119
120 return getClient().send(command)
121 }
122
123 async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
124 // FIXME: use bulk delete when s3ninja will support this operation
125
126 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
127
128 return applyOnPrefix({
129 prefix,
130 bucketInfo,
131 commandBuilder: obj => {
132 logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
133
134 return new DeleteObjectCommand({
135 Bucket: bucketInfo.BUCKET_NAME,
136 Key: obj.Key
137 })
138 }
139 })
140 }
141
142 // ---------------------------------------------------------------------------
143
144 async function makeAvailable (options: {
145 key: string
146 destination: string
147 bucketInfo: BucketInfo
148 }) {
149 const { key, destination, bucketInfo } = options
150
151 await ensureDir(dirname(options.destination))
152
153 const command = new GetObjectCommand({
154 Bucket: bucketInfo.BUCKET_NAME,
155 Key: buildKey(key, bucketInfo)
156 })
157 const response = await getClient().send(command)
158
159 const file = createWriteStream(destination)
160 await pipelinePromise(response.Body as Readable, file)
161
162 file.close()
163 }
164
165 function buildKey (key: string, bucketInfo: BucketInfo) {
166 return bucketInfo.PREFIX + key
167 }
168
169 // ---------------------------------------------------------------------------
170
171 async function createObjectReadStream (options: {
172 key: string
173 bucketInfo: BucketInfo
174 rangeHeader: string
175 }) {
176 const { key, bucketInfo, rangeHeader } = options
177
178 const command = new GetObjectCommand({
179 Bucket: bucketInfo.BUCKET_NAME,
180 Key: buildKey(key, bucketInfo),
181 Range: rangeHeader
182 })
183
184 const response = await getClient().send(command)
185
186 return response.Body as Readable
187 }
188
189 // ---------------------------------------------------------------------------
190
// Public interface of this module — helpers below this export are private
export {
  BucketInfo,
  buildKey,

  storeObject,

  removeObject,
  removePrefix,

  makeAvailable,

  updateObjectACL,
  updatePrefixACL,

  listKeysOfPrefix,
  createObjectReadStream
}
208
209 // ---------------------------------------------------------------------------
210
211 async function uploadToStorage (options: {
212 content: ReadStream
213 objectStorageKey: string
214 bucketInfo: BucketInfo
215 isPrivate: boolean
216 }) {
217 const { content, objectStorageKey, bucketInfo, isPrivate } = options
218
219 const input: PutObjectCommandInput = {
220 Body: content,
221 Bucket: bucketInfo.BUCKET_NAME,
222 Key: buildKey(objectStorageKey, bucketInfo),
223 ACL: getACL(isPrivate)
224 }
225
226 const parallelUploads3 = new Upload({
227 client: getClient(),
228 queueSize: 4,
229 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
230
231 // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
232 // More detailed explanation:
233 // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
234 // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
235 leavePartsOnError: true,
236 params: input
237 })
238
239 const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
240 // Check is needed even if the HTTP status code is 200 OK
241 // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
242 if (!response.Bucket) {
243 const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
244 logger.error(message, { response, ...lTags() })
245 throw new Error(message)
246 }
247
248 logger.debug(
249 'Completed %s%s in bucket %s',
250 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
251 )
252
253 return getInternalUrl(bucketInfo, objectStorageKey)
254 }
255
256 async function applyOnPrefix (options: {
257 prefix: string
258 bucketInfo: BucketInfo
259 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
260
261 continuationToken?: string
262 }) {
263 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
264
265 const s3Client = getClient()
266
267 const commandPrefix = buildKey(prefix, bucketInfo)
268 const listCommand = new ListObjectsV2Command({
269 Bucket: bucketInfo.BUCKET_NAME,
270 Prefix: commandPrefix,
271 ContinuationToken: continuationToken
272 })
273
274 const listedObjects = await s3Client.send(listCommand)
275
276 if (isArray(listedObjects.Contents) !== true) {
277 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
278
279 logger.error(message, { response: listedObjects, ...lTags() })
280 throw new Error(message)
281 }
282
283 await map(listedObjects.Contents, object => {
284 const command = commandBuilder(object)
285
286 return s3Client.send(command)
287 }, { concurrency: 10 })
288
289 // Repeat if not all objects could be listed at once (limit of 1000?)
290 if (listedObjects.IsTruncated) {
291 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
292 }
293 }
294
295 function getACL (isPrivate: boolean) {
296 return isPrivate
297 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
298 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
299 }