]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/object-storage/shared/object-storage-helpers.ts
Translated using Weblate (Persian)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
index 05b52f412aadc2ce5533715d75f07052b938cdb5..861c490d79b2b11df88c232d8f2a2cca92c1c85a 100644 (file)
@@ -1,3 +1,4 @@
+import { map } from 'bluebird'
 import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
@@ -58,15 +59,32 @@ async function storeObject (options: {
   return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
 }
 
+async function storeContent (options: {
+  content: string
+  inputPath: string
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}): Promise<string> {
+  const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+  logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+  return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
+}
+
 // ---------------------------------------------------------------------------
 
-function updateObjectACL (options: {
+async function updateObjectACL (options: {
   objectStorageKey: string
   bucketInfo: BucketInfo
   isPrivate: boolean
 }) {
   const { objectStorageKey, bucketInfo, isPrivate } = options
 
+  const acl = getACL(isPrivate)
+  if (!acl) return
+
   const key = buildKey(objectStorageKey, bucketInfo)
 
   logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
@@ -74,10 +92,10 @@ function updateObjectACL (options: {
   const command = new PutObjectAclCommand({
     Bucket: bucketInfo.BUCKET_NAME,
     Key: key,
-    ACL: getACL(isPrivate)
+    ACL: acl
   })
 
-  return getClient().send(command)
+  await getClient().send(command)
 }
 
 function updatePrefixACL (options: {
@@ -87,16 +105,21 @@ function updatePrefixACL (options: {
 }) {
   const { prefix, bucketInfo, isPrivate } = options
 
+  const acl = getACL(isPrivate)
+  if (!acl) return
+
   logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
 
   return applyOnPrefix({
     prefix,
     bucketInfo,
     commandBuilder: obj => {
+      logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
       return new PutObjectAclCommand({
         Bucket: bucketInfo.BUCKET_NAME,
         Key: obj.Key,
-        ACL: getACL(isPrivate)
+        ACL: acl
       })
     }
   })
@@ -107,17 +130,21 @@ function updatePrefixACL (options: {
 function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
   const key = buildKey(objectStorageKey, bucketInfo)
 
-  logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
+  return removeObjectByFullKey(key, bucketInfo)
+}
+
+function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
+  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
 
   const command = new DeleteObjectCommand({
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: key
+    Key: fullKey
   })
 
   return getClient().send(command)
 }
 
-function removePrefix (prefix: string, bucketInfo: BucketInfo) {
+async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
   // FIXME: use bulk delete when s3ninja will support this operation
 
   logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
@@ -126,6 +153,8 @@ function removePrefix (prefix: string, bucketInfo: BucketInfo) {
     prefix,
     bucketInfo,
     commandBuilder: obj => {
+      logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
       return new DeleteObjectCommand({
         Bucket: bucketInfo.BUCKET_NAME,
         Key: obj.Key
@@ -178,7 +207,10 @@ async function createObjectReadStream (options: {
 
   const response = await getClient().send(command)
 
-  return response.Body as Readable
+  return {
+    response,
+    stream: response.Body as Readable
+  }
 }
 
 // ---------------------------------------------------------------------------
@@ -188,8 +220,10 @@ export {
   buildKey,
 
   storeObject,
+  storeContent,
 
   removeObject,
+  removeObjectByFullKey,
   removePrefix,
 
   makeAvailable,
@@ -204,7 +238,7 @@ export {
 // ---------------------------------------------------------------------------
 
 async function uploadToStorage (options: {
-  content: ReadStream
+  content: ReadStream | string
   objectStorageKey: string
   bucketInfo: BucketInfo
   isPrivate: boolean
@@ -214,10 +248,12 @@ async function uploadToStorage (options: {
   const input: PutObjectCommandInput = {
     Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(objectStorageKey, bucketInfo),
-    ACL: getACL(isPrivate)
+    Key: buildKey(objectStorageKey, bucketInfo)
   }
 
+  const acl = getACL(isPrivate)
+  if (acl) input.ACL = acl
+
   const parallelUploads3 = new Upload({
     client: getClient(),
     queueSize: 4,
@@ -242,7 +278,7 @@ async function uploadToStorage (options: {
 
   logger.debug(
     'Completed %s%s in bucket %s',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), reseponseMetadata: response.$metadata }
   )
 
   return getInternalUrl(bucketInfo, objectStorageKey)
@@ -259,7 +295,7 @@ async function applyOnPrefix (options: {
 
   const s3Client = getClient()
 
-  const commandPrefix = bucketInfo.PREFIX + prefix
+  const commandPrefix = buildKey(prefix, bucketInfo)
   const listCommand = new ListObjectsV2Command({
     Bucket: bucketInfo.BUCKET_NAME,
     Prefix: commandPrefix,
@@ -275,11 +311,11 @@ async function applyOnPrefix (options: {
     throw new Error(message)
   }
 
-  for (const object of listedObjects.Contents) {
+  await map(listedObjects.Contents, object => {
     const command = commandBuilder(object)
 
-    await s3Client.send(command)
-  }
+    return s3Client.send(command)
+  }, { concurrency: 10 })
 
   // Repeat if not all objects could be listed at once (limit of 1000?)
   if (listedObjects.IsTruncated) {