]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/object-storage/shared/object-storage-helpers.ts
Translated using Weblate (Persian)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
CommitLineData
508c1b1e 1import { map } from 'bluebird'
23c0b67d 2import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
0305db28
JB
3import { dirname } from 'path'
4import { Readable } from 'stream'
5import {
9ab330b9 6 _Object,
23c0b67d 7 CompleteMultipartUploadCommandOutput,
0305db28
JB
8 DeleteObjectCommand,
9 GetObjectCommand,
10 ListObjectsV2Command,
9ab330b9
C
11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
0305db28 14} from '@aws-sdk/client-s3'
156cdbac 15import { Upload } from '@aws-sdk/lib-storage'
0305db28
JB
16import { pipelinePromise } from '@server/helpers/core-utils'
17import { isArray } from '@server/helpers/custom-validators/misc'
18import { logger } from '@server/helpers/logger'
19import { CONFIG } from '@server/initializers/config'
9ab330b9 20import { getInternalUrl } from '../urls'
0305db28
JB
21import { getClient } from './client'
22import { lTags } from './logger'
23
/**
 * Location of a set of objects inside the object storage.
 */
type BucketInfo = {
  /** Bucket name, sent as the `Bucket` field of the S3 commands built in this file */
  BUCKET_NAME: string

  /** Optional string prepended to object keys (see `buildKey()`) */
  PREFIX?: string
}
28
cfd57d2c
C
/**
 * List the keys of all the objects stored under `prefix` in the bucket.
 *
 * The bucket prefix is prepended to `prefix` before listing.
 * A single ListObjectsV2 call returns a limited number of keys, so the listing is
 * repeated with the continuation token returned by the previous page until it is complete.
 *
 * @param prefix key prefix, relative to the bucket prefix
 * @param bucketInfo bucket name and bucket prefix
 * @returns the full keys (bucket prefix included) of the listed objects, or an empty array if nothing is listed
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = buildKey(prefix, bucketInfo)

  const keys: string[] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // No (more) objects under this prefix
    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      keys.push(object.Key)
    }

    // `NextContinuationToken` is only usable when the listing has been truncated
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
44
45// ---------------------------------------------------------------------------
46
0305db28
JB
/**
 * Upload a local file to the object storage.
 *
 * @param options.inputPath path of the local file to upload
 * @param options.objectStorageKey object key, relative to the bucket prefix
 * @param options.bucketInfo destination bucket name and prefix
 * @param options.isPrivate selects which configured ACL is applied to the uploaded object
 * @returns the value returned by `uploadToStorage()` (internal URL of the uploaded object)
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const fileStream = createReadStream(inputPath)

  try {
    return await uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
  } finally {
    // Release the file descriptor if the upload failed before the stream was fully consumed
    // This is a no-op if the stream has already been closed
    fileStream.destroy()
  }
}
61
34023e12
C
/**
 * Upload an in-memory string to the object storage.
 *
 * @param options.content string stored as the object body
 * @param options.inputPath only used in the debug log message
 * @param options.objectStorageKey object key, relative to the bucket prefix
 * @param options.bucketInfo destination bucket name and prefix
 * @param options.isPrivate selects which configured ACL is applied to the uploaded object
 * @returns the value returned by `uploadToStorage()` (internal URL of the uploaded object)
 */
async function storeContent (options: {
  content: string
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  const { bucketInfo } = options

  logger.debug(
    'Uploading %s content to %s%s in bucket %s',
    options.inputPath, bucketInfo.PREFIX, options.objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content: options.content,
    bucketInfo,
    isPrivate: options.isPrivate
  })
}
75
cfd57d2c
C
76// ---------------------------------------------------------------------------
77
/**
 * Set the ACL of a single stored object.
 *
 * Does nothing if no ACL is configured for the requested privacy.
 *
 * @param options.objectStorageKey object key, relative to the bucket prefix
 * @param options.bucketInfo bucket name and prefix
 * @param options.isPrivate selects the private or the public configured ACL
 */
async function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const acl = getACL(options.isPrivate)
  if (!acl) return

  const { bucketInfo } = options
  const fullKey = buildKey(options.objectStorageKey, bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())

  const aclCommand = new PutObjectAclCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: fullKey,
    ACL: acl
  })

  await getClient().send(aclCommand)
}
100
9ab330b9
C
/**
 * Set the ACL of every object stored under a prefix.
 *
 * Returns `undefined` without doing anything if no ACL is configured for the requested privacy,
 * otherwise returns the promise of `applyOnPrefix()`.
 *
 * @param options.prefix key prefix, relative to the bucket prefix
 * @param options.bucketInfo bucket name and prefix
 * @param options.isPrivate selects the private or the public configured ACL
 */
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const acl = getACL(options.isPrivate)
  if (!acl) return

  const { prefix, bucketInfo } = options
  const bucket = bucketInfo.BUCKET_NAME

  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucket, lTags())

  // One PutObjectAcl command per listed object
  const buildAclCommand = (obj: _Object) => {
    logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucket, lTags())

    return new PutObjectAclCommand({ Bucket: bucket, Key: obj.Key, ACL: acl })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildAclCommand })
}
0305db28 127
9ab330b9 128// ---------------------------------------------------------------------------
0305db28 129
9ab330b9
C
/**
 * Delete an object identified by its key relative to the bucket prefix.
 *
 * @param objectStorageKey object key, without the bucket prefix
 * @param bucketInfo bucket name and prefix
 * @returns the promise of the delete command
 */
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  return removeObjectByFullKey(buildKey(objectStorageKey, bucketInfo), bucketInfo)
}
135
/**
 * Delete an object identified by its full key: the key is sent as is, no prefix is prepended.
 *
 * @param fullKey object key used unchanged in the delete command
 * @param bucketInfo only the bucket name is used
 * @returns the promise of the delete command
 */
function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
  const bucket = bucketInfo.BUCKET_NAME

  logger.debug('Removing file %s in bucket %s', fullKey, bucket, lTags())

  const deleteCommand = new DeleteObjectCommand({ Bucket: bucket, Key: fullKey })

  return getClient().send(deleteCommand)
}
0305db28 146
/**
 * Delete every object stored under a prefix, using one delete command per object.
 *
 * @param prefix key prefix, relative to the bucket prefix
 * @param bucketInfo bucket name and prefix
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete when s3ninja will support this operation

  const bucket = bucketInfo.BUCKET_NAME

  logger.debug('Removing prefix %s in bucket %s', prefix, bucket, lTags())

  const buildDeleteCommand = (obj: _Object) => {
    logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucket, lTags())

    return new DeleteObjectCommand({ Bucket: bucket, Key: obj.Key })
  }

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
165
cfd57d2c
C
166// ---------------------------------------------------------------------------
167
0305db28
JB
/**
 * Download a stored object to a local file.
 *
 * The parent directory of `destination` is created if needed.
 *
 * @param options.key object key, relative to the bucket prefix
 * @param options.destination path of the local file to write
 * @param options.bucketInfo bucket name and prefix
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(destination))

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body } = await getClient().send(getCommand)

  // Stream the object body to the disk
  const output = createWriteStream(destination)
  await pipelinePromise(Body as Readable, output)

  output.close()
}
188
/**
 * Build the full key of an object by prepending the bucket prefix.
 *
 * `PREFIX` is optional: a missing prefix is treated as an empty string so the key
 * is not prefixed by the literal "undefined".
 *
 * @param key object key, relative to the bucket prefix
 * @param bucketInfo bucket information holding the optional prefix
 * @returns the prefix followed by `key`
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
192
193// ---------------------------------------------------------------------------
194
9ab330b9
C
/**
 * Fetch a stored object and expose its body as a readable stream.
 *
 * @param options.key object key, relative to the bucket prefix
 * @param options.bucketInfo bucket name and prefix
 * @param options.rangeHeader value sent as the `Range` field of the get command
 * @returns the raw command response and its body cast to a `Readable`
 */
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const getCommand = new GetObjectCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, options.bucketInfo),
    Range: options.rangeHeader
  })

  const response = await getClient().send(getCommand)
  const stream = response.Body as Readable

  return { response, stream }
}
215
216// ---------------------------------------------------------------------------
217
0305db28
JB
218export {
219 BucketInfo,
220 buildKey,
9ab330b9 221
0305db28 222 storeObject,
34023e12 223 storeContent,
9ab330b9 224
0305db28 225 removeObject,
aa887096 226 removeObjectByFullKey,
0305db28 227 removePrefix,
9ab330b9 228
cfd57d2c 229 makeAvailable,
9ab330b9
C
230
231 updateObjectACL,
232 updatePrefixACL,
233
234 listKeysOfPrefix,
235 createObjectReadStream
0305db28
JB
236}
237
238// ---------------------------------------------------------------------------
239
/**
 * Upload a stream or a string to the object storage using a multipart capable upload.
 *
 * @param options.content body of the object
 * @param options.objectStorageKey object key, relative to the bucket prefix
 * @param options.bucketInfo destination bucket name and prefix
 * @param options.isPrivate selects the private or the public configured ACL (no ACL is sent if none is configured)
 * @returns the result of `getInternalUrl()` for the uploaded object
 * @throws Error if the upload response does not contain the bucket name
 */
async function uploadToStorage (options: {
  content: ReadStream | string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { content, objectStorageKey, bucketInfo, isPrivate } = options

  const input: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  const acl = getACL(isPrivate)
  if (acl) input.ACL = acl

  const parallelUploads3 = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params: input
  })

  const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
  // Check is needed even if the HTTP status code is 200 OK
  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  if (!response.Bucket) {
    const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
    logger.error(message, { response, ...lTags() })
    throw new Error(message)
  }

  logger.debug(
    'Completed %s%s in bucket %s',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), responseMetadata: response.$metadata }
  )

  return getInternalUrl(bucketInfo, objectStorageKey)
}
286
/**
 * Send one S3 command per object stored under a prefix.
 *
 * Objects are listed page by page. For each listed object, the command returned by
 * `commandBuilder` is sent, with at most 10 commands in flight at the same time.
 *
 * @param options.prefix key prefix, relative to the bucket prefix
 * @param options.bucketInfo bucket name and prefix
 * @param options.commandBuilder builds the command to send for a listed object
 * @param options.continuationToken token of the page to list, used by the recursive calls
 * @throws Error if a listing does not return any object
 */
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder, continuationToken } = options

  const s3Client = getClient()

  const commandPrefix = buildKey(prefix, bucketInfo)
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix,
    ContinuationToken: continuationToken
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  await map(listedObjects.Contents, object => {
    const command = commandBuilder(object)

    return s3Client.send(command)
  }, { concurrency: 10 })

  // Repeat if not all objects could be listed at once (limit of 1000?)
  if (listedObjects.IsTruncated) {
    // The response `ContinuationToken` only echoes the token of the request:
    // the token of the next page is `NextContinuationToken`
    await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
  }
}
325
/**
 * Pick the configured upload ACL matching the requested privacy.
 *
 * Callers in this file skip the ACL when the returned value is falsy.
 *
 * @param isPrivate `true` to get the private ACL, `false` to get the public one
 */
function getACL (isPrivate: boolean) {
  const configuredACLs = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  if (isPrivate) return configuredACLs.PRIVATE

  return configuredACLs.PUBLIC
}