]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/object-storage/shared/object-storage-helpers.ts
Use private ACL for private videos in s3
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
index c131977e8c622684018d00e77bc492c492571e57..05b52f412aadc2ce5533715d75f07052b938cdb5 100644 (file)
@@ -2,18 +2,21 @@ import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-e
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
+  _Object,
   CompleteMultipartUploadCommandOutput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
-  PutObjectCommandInput
+  PutObjectAclCommand,
+  PutObjectCommandInput,
+  S3Client
 } from '@aws-sdk/client-s3'
 import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
-import { getPrivateUrl } from '../urls'
+import { getInternalUrl } from '../urls'
 import { getClient } from './client'
 import { lTags } from './logger'
 
@@ -44,69 +47,91 @@ async function storeObject (options: {
   inputPath: string
   objectStorageKey: string
   bucketInfo: BucketInfo
+  isPrivate: boolean
 }): Promise<string> {
-  const { inputPath, objectStorageKey, bucketInfo } = options
+  const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
 
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
   const fileStream = createReadStream(inputPath)
 
-  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
+  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
 }
 
 // ---------------------------------------------------------------------------
 
-async function removeObject (filename: string, bucketInfo: BucketInfo) {
-  const command = new DeleteObjectCommand({
+function updateObjectACL (options: {
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}) {
+  const { objectStorageKey, bucketInfo, isPrivate } = options
+
+  const key = buildKey(objectStorageKey, bucketInfo)
+
+  logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
+
+  const command = new PutObjectAclCommand({
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(filename, bucketInfo)
+    Key: key,
+    ACL: getACL(isPrivate)
   })
 
   return getClient().send(command)
 }
 
-async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
-  const s3Client = getClient()
-
-  const commandPrefix = bucketInfo.PREFIX + prefix
-  const listCommand = new ListObjectsV2Command({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Prefix: commandPrefix
+function updatePrefixACL (options: {
+  prefix: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}) {
+  const { prefix, bucketInfo, isPrivate } = options
+
+  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
+
+  return applyOnPrefix({
+    prefix,
+    bucketInfo,
+    commandBuilder: obj => {
+      return new PutObjectAclCommand({
+        Bucket: bucketInfo.BUCKET_NAME,
+        Key: obj.Key,
+        ACL: getACL(isPrivate)
+      })
+    }
   })
+}
 
-  const listedObjects = await s3Client.send(listCommand)
+// ---------------------------------------------------------------------------
 
-  // FIXME: use bulk delete when s3ninja will support this operation
-  // const deleteParams = {
-  //   Bucket: bucketInfo.BUCKET_NAME,
-  //   Delete: { Objects: [] }
-  // }
+function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
+  const key = buildKey(objectStorageKey, bucketInfo)
 
-  if (isArray(listedObjects.Contents) !== true) {
-    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+  logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
 
-    logger.error(message, { response: listedObjects, ...lTags() })
-    throw new Error(message)
-  }
-
-  for (const object of listedObjects.Contents) {
-    const command = new DeleteObjectCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: object.Key
-    })
-
-    await s3Client.send(command)
+  const command = new DeleteObjectCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: key
+  })
 
-    // FIXME: use bulk delete when s3ninja will support this operation
-    // deleteParams.Delete.Objects.push({ Key: object.Key })
-  }
+  return getClient().send(command)
+}
 
+function removePrefix (prefix: string, bucketInfo: BucketInfo) {
   // FIXME: use bulk delete when s3ninja will support this operation
-  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
-  // await s3Client.send(deleteCommand)
 
-  // Repeat if not all objects could be listed at once (limit of 1000?)
-  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
+  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
+
+  return applyOnPrefix({
+    prefix,
+    bucketInfo,
+    commandBuilder: obj => {
+      return new DeleteObjectCommand({
+        Bucket: bucketInfo.BUCKET_NAME,
+        Key: obj.Key
+      })
+    }
+  })
 }
 
 // ---------------------------------------------------------------------------
@@ -138,14 +163,42 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
 
 // ---------------------------------------------------------------------------
 
+async function createObjectReadStream (options: {
+  key: string
+  bucketInfo: BucketInfo
+  rangeHeader: string
+}) {
+  const { key, bucketInfo, rangeHeader } = options
+
+  const command = new GetObjectCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: buildKey(key, bucketInfo),
+    Range: rangeHeader
+  })
+
+  const response = await getClient().send(command)
+
+  return response.Body as Readable
+}
+
+// ---------------------------------------------------------------------------
+
 export {
   BucketInfo,
   buildKey,
+
   storeObject,
+
   removeObject,
   removePrefix,
+
   makeAvailable,
-  listKeysOfPrefix
+
+  updateObjectACL,
+  updatePrefixACL,
+
+  listKeysOfPrefix,
+  createObjectReadStream
 }
 
 // ---------------------------------------------------------------------------
@@ -154,17 +207,15 @@ async function uploadToStorage (options: {
   content: ReadStream
   objectStorageKey: string
   bucketInfo: BucketInfo
+  isPrivate: boolean
 }) {
-  const { content, objectStorageKey, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo, isPrivate } = options
 
   const input: PutObjectCommandInput = {
     Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(objectStorageKey, bucketInfo)
-  }
-
-  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
-    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
+    Key: buildKey(objectStorageKey, bucketInfo),
+    ACL: getACL(isPrivate)
   }
 
   const parallelUploads3 = new Upload({
@@ -194,5 +245,50 @@ async function uploadToStorage (options: {
     bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
   )
 
-  return getPrivateUrl(bucketInfo, objectStorageKey)
+  return getInternalUrl(bucketInfo, objectStorageKey)
+}
+
+async function applyOnPrefix (options: {
+  prefix: string
+  bucketInfo: BucketInfo
+  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
+
+  continuationToken?: string
+}) {
+  const { prefix, bucketInfo, commandBuilder, continuationToken } = options
+
+  const s3Client = getClient()
+
+  const commandPrefix = bucketInfo.PREFIX + prefix
+  const listCommand = new ListObjectsV2Command({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Prefix: commandPrefix,
+    ContinuationToken: continuationToken
+  })
+
+  const listedObjects = await s3Client.send(listCommand)
+
+  if (isArray(listedObjects.Contents) !== true) {
+    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+
+    logger.error(message, { response: listedObjects, ...lTags() })
+    throw new Error(message)
+  }
+
+  for (const object of listedObjects.Contents) {
+    const command = commandBuilder(object)
+
+    await s3Client.send(command)
+  }
+
+  // Repeat if not all objects could be listed at once (limit of 1000?)
+  if (listedObjects.IsTruncated) {
+    await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
+  }
+}
+
+function getACL (isPrivate: boolean) {
+  return isPrivate
+    ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
+    : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
 }