]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/object-storage/shared/object-storage-helpers.ts
Fix #5744: Missing plugins-global.css + missing async (#5746)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
CommitLineData
508c1b1e 1import { map } from 'bluebird'
23c0b67d 2import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
0305db28
JB
3import { dirname } from 'path'
4import { Readable } from 'stream'
5import {
9ab330b9 6 _Object,
23c0b67d 7 CompleteMultipartUploadCommandOutput,
0305db28
JB
8 DeleteObjectCommand,
9 GetObjectCommand,
10 ListObjectsV2Command,
9ab330b9
C
11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
0305db28 14} from '@aws-sdk/client-s3'
156cdbac 15import { Upload } from '@aws-sdk/lib-storage'
0305db28
JB
16import { pipelinePromise } from '@server/helpers/core-utils'
17import { isArray } from '@server/helpers/custom-validators/misc'
18import { logger } from '@server/helpers/logger'
19import { CONFIG } from '@server/initializers/config'
9ab330b9 20import { getInternalUrl } from '../urls'
0305db28
JB
21import { getClient } from './client'
22import { lTags } from './logger'
23
24type BucketInfo = {
25 BUCKET_NAME: string
26 PREFIX?: string
27}
28
cfd57d2c
C
29async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
30 const s3Client = getClient()
31
32 const commandPrefix = bucketInfo.PREFIX + prefix
33 const listCommand = new ListObjectsV2Command({
34 Bucket: bucketInfo.BUCKET_NAME,
35 Prefix: commandPrefix
36 })
37
38 const listedObjects = await s3Client.send(listCommand)
39
40 if (isArray(listedObjects.Contents) !== true) return []
41
42 return listedObjects.Contents.map(c => c.Key)
43}
44
45// ---------------------------------------------------------------------------
46
0305db28
JB
47async function storeObject (options: {
48 inputPath: string
49 objectStorageKey: string
50 bucketInfo: BucketInfo
9ab330b9 51 isPrivate: boolean
0305db28 52}): Promise<string> {
9ab330b9 53 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
0305db28
JB
54
55 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
56
156cdbac 57 const fileStream = createReadStream(inputPath)
0305db28 58
9ab330b9 59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
0305db28
JB
60}
61
cfd57d2c
C
62// ---------------------------------------------------------------------------
63
8180f604 64async function updateObjectACL (options: {
9ab330b9
C
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68}) {
69 const { objectStorageKey, bucketInfo, isPrivate } = options
70
8180f604
C
71 const acl = getACL(isPrivate)
72 if (!acl) return
73
9ab330b9
C
74 const key = buildKey(objectStorageKey, bucketInfo)
75
76 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
77
78 const command = new PutObjectAclCommand({
0305db28 79 Bucket: bucketInfo.BUCKET_NAME,
9ab330b9 80 Key: key,
8180f604 81 ACL: acl
0305db28
JB
82 })
83
8180f604 84 await getClient().send(command)
0305db28
JB
85}
86
9ab330b9
C
87function updatePrefixACL (options: {
88 prefix: string
89 bucketInfo: BucketInfo
90 isPrivate: boolean
91}) {
92 const { prefix, bucketInfo, isPrivate } = options
93
8180f604
C
94 const acl = getACL(isPrivate)
95 if (!acl) return
96
9ab330b9
C
97 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
98
99 return applyOnPrefix({
100 prefix,
101 bucketInfo,
102 commandBuilder: obj => {
508c1b1e
C
103 logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
104
9ab330b9
C
105 return new PutObjectAclCommand({
106 Bucket: bucketInfo.BUCKET_NAME,
107 Key: obj.Key,
8180f604 108 ACL: acl
9ab330b9
C
109 })
110 }
0305db28 111 })
9ab330b9 112}
0305db28 113
9ab330b9 114// ---------------------------------------------------------------------------
0305db28 115
9ab330b9
C
116function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
117 const key = buildKey(objectStorageKey, bucketInfo)
0305db28 118
aa887096
C
119 return removeObjectByFullKey(key, bucketInfo)
120}
121
122function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
123 logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
0305db28 124
9ab330b9
C
125 const command = new DeleteObjectCommand({
126 Bucket: bucketInfo.BUCKET_NAME,
aa887096 127 Key: fullKey
9ab330b9 128 })
0305db28 129
9ab330b9
C
130 return getClient().send(command)
131}
0305db28 132
508c1b1e 133async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
0305db28 134 // FIXME: use bulk delete when s3ninja will support this operation
0305db28 135
9ab330b9
C
136 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
137
138 return applyOnPrefix({
139 prefix,
140 bucketInfo,
141 commandBuilder: obj => {
508c1b1e
C
142 logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
143
9ab330b9
C
144 return new DeleteObjectCommand({
145 Bucket: bucketInfo.BUCKET_NAME,
146 Key: obj.Key
147 })
148 }
149 })
0305db28
JB
150}
151
cfd57d2c
C
152// ---------------------------------------------------------------------------
153
0305db28
JB
154async function makeAvailable (options: {
155 key: string
156 destination: string
157 bucketInfo: BucketInfo
158}) {
159 const { key, destination, bucketInfo } = options
160
161 await ensureDir(dirname(options.destination))
162
163 const command = new GetObjectCommand({
164 Bucket: bucketInfo.BUCKET_NAME,
165 Key: buildKey(key, bucketInfo)
166 })
167 const response = await getClient().send(command)
168
169 const file = createWriteStream(destination)
170 await pipelinePromise(response.Body as Readable, file)
171
172 file.close()
173}
174
175function buildKey (key: string, bucketInfo: BucketInfo) {
176 return bucketInfo.PREFIX + key
177}
178
179// ---------------------------------------------------------------------------
180
9ab330b9
C
181async function createObjectReadStream (options: {
182 key: string
183 bucketInfo: BucketInfo
184 rangeHeader: string
185}) {
186 const { key, bucketInfo, rangeHeader } = options
187
188 const command = new GetObjectCommand({
189 Bucket: bucketInfo.BUCKET_NAME,
190 Key: buildKey(key, bucketInfo),
191 Range: rangeHeader
192 })
193
194 const response = await getClient().send(command)
195
57e11a20
C
196 return {
197 response,
198 stream: response.Body as Readable
199 }
9ab330b9
C
200}
201
202// ---------------------------------------------------------------------------
203
0305db28
JB
204export {
205 BucketInfo,
206 buildKey,
9ab330b9 207
0305db28 208 storeObject,
9ab330b9 209
0305db28 210 removeObject,
aa887096 211 removeObjectByFullKey,
0305db28 212 removePrefix,
9ab330b9 213
cfd57d2c 214 makeAvailable,
9ab330b9
C
215
216 updateObjectACL,
217 updatePrefixACL,
218
219 listKeysOfPrefix,
220 createObjectReadStream
0305db28
JB
221}
222
223// ---------------------------------------------------------------------------
224
23c0b67d 225async function uploadToStorage (options: {
156cdbac 226 content: ReadStream
0305db28
JB
227 objectStorageKey: string
228 bucketInfo: BucketInfo
9ab330b9 229 isPrivate: boolean
0305db28 230}) {
9ab330b9 231 const { content, objectStorageKey, bucketInfo, isPrivate } = options
0305db28 232
156cdbac 233 const input: PutObjectCommandInput = {
234 Body: content,
0305db28 235 Bucket: bucketInfo.BUCKET_NAME,
8180f604 236 Key: buildKey(objectStorageKey, bucketInfo)
f9915efa
DL
237 }
238
8180f604
C
239 const acl = getACL(isPrivate)
240 if (acl) input.ACL = acl
241
156cdbac 242 const parallelUploads3 = new Upload({
243 client: getClient(),
244 queueSize: 4,
245 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
23c0b67d 246
247 // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
248 // More detailed explanation:
249 // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
250 // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
251 leavePartsOnError: true,
156cdbac 252 params: input
0305db28 253 })
156cdbac 254
23c0b67d 255 const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
256 // Check is needed even if the HTTP status code is 200 OK
257 // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
258 if (!response.Bucket) {
259 const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
260 logger.error(message, { response, ...lTags() })
261 throw new Error(message)
262 }
0305db28
JB
263
264 logger.debug(
156cdbac 265 'Completed %s%s in bucket %s',
266 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
0305db28
JB
267 )
268
9ab330b9
C
269 return getInternalUrl(bucketInfo, objectStorageKey)
270}
271
272async function applyOnPrefix (options: {
273 prefix: string
274 bucketInfo: BucketInfo
275 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
276
277 continuationToken?: string
278}) {
279 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
280
281 const s3Client = getClient()
282
508c1b1e 283 const commandPrefix = buildKey(prefix, bucketInfo)
9ab330b9
C
284 const listCommand = new ListObjectsV2Command({
285 Bucket: bucketInfo.BUCKET_NAME,
286 Prefix: commandPrefix,
287 ContinuationToken: continuationToken
288 })
289
290 const listedObjects = await s3Client.send(listCommand)
291
292 if (isArray(listedObjects.Contents) !== true) {
293 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
294
295 logger.error(message, { response: listedObjects, ...lTags() })
296 throw new Error(message)
297 }
298
508c1b1e 299 await map(listedObjects.Contents, object => {
9ab330b9
C
300 const command = commandBuilder(object)
301
508c1b1e
C
302 return s3Client.send(command)
303 }, { concurrency: 10 })
9ab330b9
C
304
305 // Repeat if not all objects could be listed at once (limit of 1000?)
306 if (listedObjects.IsTruncated) {
307 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
308 }
309}
310
311function getACL (isPrivate: boolean) {
312 return isPrivate
313 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
314 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
0305db28 315}