]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/object-storage/shared/object-storage-helpers.ts
Translated using Weblate (Persian)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
index c131977e8c622684018d00e77bc492c492571e57..861c490d79b2b11df88c232d8f2a2cca92c1c85a 100644 (file)
@@ -1,19 +1,23 @@
+import { map } from 'bluebird'
 import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
+  _Object,
   CompleteMultipartUploadCommandOutput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
-  PutObjectCommandInput
+  PutObjectAclCommand,
+  PutObjectCommandInput,
+  S3Client
 } from '@aws-sdk/client-s3'
 import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
-import { getPrivateUrl } from '../urls'
+import { getInternalUrl } from '../urls'
 import { getClient } from './client'
 import { lTags } from './logger'
 
@@ -44,69 +48,119 @@ async function storeObject (options: {
   inputPath: string
   objectStorageKey: string
   bucketInfo: BucketInfo
+  isPrivate: boolean
 }): Promise<string> {
-  const { inputPath, objectStorageKey, bucketInfo } = options
+  const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
 
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
   const fileStream = createReadStream(inputPath)
 
-  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
+  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
+}
+
+async function storeContent (options: {
+  content: string
+  inputPath: string
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}): Promise<string> {
+  const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+  logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+  return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
 }
 
 // ---------------------------------------------------------------------------
 
-async function removeObject (filename: string, bucketInfo: BucketInfo) {
-  const command = new DeleteObjectCommand({
+async function updateObjectACL (options: {
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}) {
+  const { objectStorageKey, bucketInfo, isPrivate } = options
+
+  const acl = getACL(isPrivate)
+  if (!acl) return
+
+  const key = buildKey(objectStorageKey, bucketInfo)
+
+  logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
+
+  const command = new PutObjectAclCommand({
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(filename, bucketInfo)
+    Key: key,
+    ACL: acl
   })
 
-  return getClient().send(command)
+  await getClient().send(command)
 }
 
-async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
-  const s3Client = getClient()
+function updatePrefixACL (options: {
+  prefix: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}) {
+  const { prefix, bucketInfo, isPrivate } = options
 
-  const commandPrefix = bucketInfo.PREFIX + prefix
-  const listCommand = new ListObjectsV2Command({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Prefix: commandPrefix
-  })
+  const acl = getACL(isPrivate)
+  if (!acl) return
 
-  const listedObjects = await s3Client.send(listCommand)
+  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
 
-  // FIXME: use bulk delete when s3ninja will support this operation
-  // const deleteParams = {
-  //   Bucket: bucketInfo.BUCKET_NAME,
-  //   Delete: { Objects: [] }
-  // }
+  return applyOnPrefix({
+    prefix,
+    bucketInfo,
+    commandBuilder: obj => {
+      logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
 
-  if (isArray(listedObjects.Contents) !== true) {
-    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+      return new PutObjectAclCommand({
+        Bucket: bucketInfo.BUCKET_NAME,
+        Key: obj.Key,
+        ACL: acl
+      })
+    }
+  })
+}
 
-    logger.error(message, { response: listedObjects, ...lTags() })
-    throw new Error(message)
-  }
+// ---------------------------------------------------------------------------
+
+function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
+  const key = buildKey(objectStorageKey, bucketInfo)
 
-  for (const object of listedObjects.Contents) {
-    const command = new DeleteObjectCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: object.Key
-    })
+  return removeObjectByFullKey(key, bucketInfo)
+}
 
-    await s3Client.send(command)
+function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
+  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
 
-    // FIXME: use bulk delete when s3ninja will support this operation
-    // deleteParams.Delete.Objects.push({ Key: object.Key })
-  }
+  const command = new DeleteObjectCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: fullKey
+  })
+
+  return getClient().send(command)
+}
 
+async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
   // FIXME: use bulk delete when s3ninja will support this operation
-  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
-  // await s3Client.send(deleteCommand)
 
-  // Repeat if not all objects could be listed at once (limit of 1000?)
-  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
+  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
+
+  return applyOnPrefix({
+    prefix,
+    bucketInfo,
+    commandBuilder: obj => {
+      logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
+      return new DeleteObjectCommand({
+        Bucket: bucketInfo.BUCKET_NAME,
+        Key: obj.Key
+      })
+    }
+  })
 }
 
 // ---------------------------------------------------------------------------
@@ -138,24 +192,58 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
 
 // ---------------------------------------------------------------------------
 
+async function createObjectReadStream (options: {
+  key: string
+  bucketInfo: BucketInfo
+  rangeHeader: string
+}) {
+  const { key, bucketInfo, rangeHeader } = options
+
+  const command = new GetObjectCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: buildKey(key, bucketInfo),
+    Range: rangeHeader
+  })
+
+  const response = await getClient().send(command)
+
+  return {
+    response,
+    stream: response.Body as Readable
+  }
+}
+
+// ---------------------------------------------------------------------------
+
 export {
   BucketInfo,
   buildKey,
+
   storeObject,
+  storeContent,
+
   removeObject,
+  removeObjectByFullKey,
   removePrefix,
+
   makeAvailable,
-  listKeysOfPrefix
+
+  updateObjectACL,
+  updatePrefixACL,
+
+  listKeysOfPrefix,
+  createObjectReadStream
 }
 
 // ---------------------------------------------------------------------------
 
 async function uploadToStorage (options: {
-  content: ReadStream
+  content: ReadStream | string
   objectStorageKey: string
   bucketInfo: BucketInfo
+  isPrivate: boolean
 }) {
-  const { content, objectStorageKey, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo, isPrivate } = options
 
   const input: PutObjectCommandInput = {
     Body: content,
@@ -163,9 +251,8 @@ async function uploadToStorage (options: {
     Key: buildKey(objectStorageKey, bucketInfo)
   }
 
-  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
-    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
-  }
+  const acl = getACL(isPrivate)
+  if (acl) input.ACL = acl
 
   const parallelUploads3 = new Upload({
     client: getClient(),
@@ -191,8 +278,53 @@ async function uploadToStorage (options: {
 
   logger.debug(
     'Completed %s%s in bucket %s',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), reseponseMetadata: response.$metadata }
   )
 
-  return getPrivateUrl(bucketInfo, objectStorageKey)
+  return getInternalUrl(bucketInfo, objectStorageKey)
+}
+
+async function applyOnPrefix (options: {
+  prefix: string
+  bucketInfo: BucketInfo
+  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
+
+  continuationToken?: string
+}) {
+  const { prefix, bucketInfo, commandBuilder, continuationToken } = options
+
+  const s3Client = getClient()
+
+  const commandPrefix = buildKey(prefix, bucketInfo)
+  const listCommand = new ListObjectsV2Command({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Prefix: commandPrefix,
+    ContinuationToken: continuationToken
+  })
+
+  const listedObjects = await s3Client.send(listCommand)
+
+  if (isArray(listedObjects.Contents) !== true) {
+    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+
+    logger.error(message, { response: listedObjects, ...lTags() })
+    throw new Error(message)
+  }
+
+  await map(listedObjects.Contents, object => {
+    const command = commandBuilder(object)
+
+    return s3Client.send(command)
+  }, { concurrency: 10 })
+
+  // Repeat if not all objects could be listed at once (limit of 1000?)
+  if (listedObjects.IsTruncated) {
+    await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
+  }
+}
+
+function getACL (isPrivate: boolean) {
+  return isPrivate
+    ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
+    : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
 }