]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/object-storage/shared/object-storage-helpers.ts
Translated using Weblate (Persian)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
index e2321690740fab654346d553a281f79bb7ae23b0..861c490d79b2b11df88c232d8f2a2cca92c1c85a 100644 (file)
@@ -1,22 +1,23 @@
-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { map } from 'bluebird'
+import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
-  CompletedPart,
-  CompleteMultipartUploadCommand,
-  CreateMultipartUploadCommand,
+  _Object,
+  CompleteMultipartUploadCommandOutput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
-  PutObjectCommand,
-  UploadPartCommand
+  PutObjectAclCommand,
+  PutObjectCommandInput,
+  S3Client
 } from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
-import { getPrivateUrl } from '../urls'
+import { getInternalUrl } from '../urls'
 import { getClient } from './client'
 import { lTags } from './logger'
 
@@ -25,79 +26,145 @@ type BucketInfo = {
   PREFIX?: string
 }
 
+async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
+  const s3Client = getClient()
+
+  const commandPrefix = bucketInfo.PREFIX + prefix
+  const listCommand = new ListObjectsV2Command({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Prefix: commandPrefix
+  })
+
+  const listedObjects = await s3Client.send(listCommand)
+
+  if (isArray(listedObjects.Contents) !== true) return []
+
+  return listedObjects.Contents.map(c => c.Key)
+}
+
+// ---------------------------------------------------------------------------
+
 async function storeObject (options: {
   inputPath: string
   objectStorageKey: string
   bucketInfo: BucketInfo
+  isPrivate: boolean
 }): Promise<string> {
-  const { inputPath, objectStorageKey, bucketInfo } = options
+  const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
 
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
-  const stats = await stat(inputPath)
+  const fileStream = createReadStream(inputPath)
 
-  // If bigger than max allowed size we do a multipart upload
-  if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
-    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
-  }
+  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
+}
 
-  const fileStream = createReadStream(inputPath)
-  return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
+async function storeContent (options: {
+  content: string
+  inputPath: string
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}): Promise<string> {
+  const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+  logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+  return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
 }
 
-async function removeObject (filename: string, bucketInfo: BucketInfo) {
-  const command = new DeleteObjectCommand({
+// ---------------------------------------------------------------------------
+
+async function updateObjectACL (options: {
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}) {
+  const { objectStorageKey, bucketInfo, isPrivate } = options
+
+  const acl = getACL(isPrivate)
+  if (!acl) return
+
+  const key = buildKey(objectStorageKey, bucketInfo)
+
+  logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
+
+  const command = new PutObjectAclCommand({
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(filename, bucketInfo)
+    Key: key,
+    ACL: acl
   })
 
-  return getClient().send(command)
+  await getClient().send(command)
 }
 
-async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
-  const s3Client = getClient()
+function updatePrefixACL (options: {
+  prefix: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}) {
+  const { prefix, bucketInfo, isPrivate } = options
 
-  const commandPrefix = bucketInfo.PREFIX + prefix
-  const listCommand = new ListObjectsV2Command({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Prefix: commandPrefix
-  })
+  const acl = getACL(isPrivate)
+  if (!acl) return
 
-  const listedObjects = await s3Client.send(listCommand)
+  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
 
-  // FIXME: use bulk delete when s3ninja will support this operation
-  // const deleteParams = {
-  //   Bucket: bucketInfo.BUCKET_NAME,
-  //   Delete: { Objects: [] }
-  // }
+  return applyOnPrefix({
+    prefix,
+    bucketInfo,
+    commandBuilder: obj => {
+      logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
 
-  if (isArray(listedObjects.Contents) !== true) {
-    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+      return new PutObjectAclCommand({
+        Bucket: bucketInfo.BUCKET_NAME,
+        Key: obj.Key,
+        ACL: acl
+      })
+    }
+  })
+}
 
-    logger.error(message, { response: listedObjects, ...lTags() })
-    throw new Error(message)
-  }
+// ---------------------------------------------------------------------------
 
-  for (const object of listedObjects.Contents) {
-    const command = new DeleteObjectCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: object.Key
-    })
+function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
+  const key = buildKey(objectStorageKey, bucketInfo)
 
-    await s3Client.send(command)
+  return removeObjectByFullKey(key, bucketInfo)
+}
 
-    // FIXME: use bulk delete when s3ninja will support this operation
-    // deleteParams.Delete.Objects.push({ Key: object.Key })
-  }
+function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
+  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
+
+  const command = new DeleteObjectCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: fullKey
+  })
+
+  return getClient().send(command)
+}
 
+async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
   // FIXME: use bulk delete when s3ninja will support this operation
-  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
-  // await s3Client.send(deleteCommand)
 
-  // Repeat if not all objects could be listed at once (limit of 1000?)
-  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
+  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
+
+  return applyOnPrefix({
+    prefix,
+    bucketInfo,
+    commandBuilder: obj => {
+      logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
+
+      return new DeleteObjectCommand({
+        Bucket: bucketInfo.BUCKET_NAME,
+        Key: obj.Key
+      })
+    }
+  })
 }
 
+// ---------------------------------------------------------------------------
+
 async function makeAvailable (options: {
   key: string
   destination: string
@@ -125,105 +192,139 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
 
 // ---------------------------------------------------------------------------
 
+async function createObjectReadStream (options: {
+  key: string
+  bucketInfo: BucketInfo
+  rangeHeader: string
+}) {
+  const { key, bucketInfo, rangeHeader } = options
+
+  const command = new GetObjectCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: buildKey(key, bucketInfo),
+    Range: rangeHeader
+  })
+
+  const response = await getClient().send(command)
+
+  return {
+    response,
+    stream: response.Body as Readable
+  }
+}
+
+// ---------------------------------------------------------------------------
+
 export {
   BucketInfo,
   buildKey,
+
   storeObject,
+  storeContent,
+
   removeObject,
+  removeObjectByFullKey,
   removePrefix,
-  makeAvailable
+
+  makeAvailable,
+
+  updateObjectACL,
+  updatePrefixACL,
+
+  listKeysOfPrefix,
+  createObjectReadStream
 }
 
 // ---------------------------------------------------------------------------
 
-async function objectStoragePut (options: {
+async function uploadToStorage (options: {
+  content: ReadStream | string
   objectStorageKey: string
-  content: ReadStream
   bucketInfo: BucketInfo
+  isPrivate: boolean
 }) {
-  const { objectStorageKey, content, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo, isPrivate } = options
 
-  const command = new PutObjectCommand({
+  const input: PutObjectCommandInput = {
+    Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(objectStorageKey, bucketInfo),
-    Body: content
+    Key: buildKey(objectStorageKey, bucketInfo)
+  }
+
+  const acl = getACL(isPrivate)
+  if (acl) input.ACL = acl
+
+  const parallelUploads3 = new Upload({
+    client: getClient(),
+    queueSize: 4,
+    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+
+    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
+    // More detailed explanation:
+    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
+    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
+    leavePartsOnError: true,
+    params: input
   })
 
-  await getClient().send(command)
+  const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
+  // Check is needed even if the HTTP status code is 200 OK
+  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+  if (!response.Bucket) {
+    const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
+    logger.error(message, { response, ...lTags() })
+    throw new Error(message)
+  }
+
+  logger.debug(
+    'Completed %s%s in bucket %s',
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), reseponseMetadata: response.$metadata }
+  )
 
-  return getPrivateUrl(bucketInfo, objectStorageKey)
+  return getInternalUrl(bucketInfo, objectStorageKey)
 }
 
-async function multiPartUpload (options: {
-  inputPath: string
-  objectStorageKey: string
+async function applyOnPrefix (options: {
+  prefix: string
   bucketInfo: BucketInfo
+  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
+
+  continuationToken?: string
 }) {
-  const { objectStorageKey, inputPath, bucketInfo } = options
+  const { prefix, bucketInfo, commandBuilder, continuationToken } = options
 
-  const key = buildKey(objectStorageKey, bucketInfo)
   const s3Client = getClient()
 
-  const statResult = await stat(inputPath)
-
-  const createMultipartCommand = new CreateMultipartUploadCommand({
+  const commandPrefix = buildKey(prefix, bucketInfo)
+  const listCommand = new ListObjectsV2Command({
     Bucket: bucketInfo.BUCKET_NAME,
-    Key: key
+    Prefix: commandPrefix,
+    ContinuationToken: continuationToken
   })
-  const createResponse = await s3Client.send(createMultipartCommand)
-
-  const fd = await open(inputPath, 'r')
-  let partNumber = 1
-  const parts: CompletedPart[] = []
-  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
-  for (let start = 0; start < statResult.size; start += partSize) {
-    logger.debug(
-      'Uploading part %d of file to %s%s in bucket %s',
-      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
-    )
-
-    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
-    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
-    // streams with start and end set, so it just tries to stat the file in stream.path.
-    // This fails for us because we only want to send part of the file. The stream type
-    // is modified so we can set the byteLength here, which s3 detects because array buffers
-    // have this field set
-    const stream: ReadStream & { byteLength: number } =
-      createReadStream(
-        inputPath,
-        { fd, autoClose: false, start, end: (start + partSize) - 1 }
-      ) as ReadStream & { byteLength: number }
-
-    // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
-    stream.byteLength = min([ statResult.size - start, partSize ])
-
-    const uploadPartCommand = new UploadPartCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: key,
-      UploadId: createResponse.UploadId,
-      PartNumber: partNumber,
-      Body: stream
-    })
-    const uploadResponse = await s3Client.send(uploadPartCommand)
-
-    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
-    partNumber += 1
+
+  const listedObjects = await s3Client.send(listCommand)
+
+  if (isArray(listedObjects.Contents) !== true) {
+    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
+
+    logger.error(message, { response: listedObjects, ...lTags() })
+    throw new Error(message)
   }
-  await close(fd)
 
-  const completeUploadCommand = new CompleteMultipartUploadCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: objectStorageKey,
-    UploadId: createResponse.UploadId,
-    MultipartUpload: { Parts: parts }
-  })
-  await s3Client.send(completeUploadCommand)
+  await map(listedObjects.Contents, object => {
+    const command = commandBuilder(object)
 
-  logger.debug(
-    'Completed %s%s in bucket %s in %d parts',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
-  )
+    return s3Client.send(command)
+  }, { concurrency: 10 })
+
+  // Repeat if not all objects could be listed at once (limit of 1000?)
+  if (listedObjects.IsTruncated) {
+    await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
+  }
+}
 
-  return getPrivateUrl(bucketInfo, objectStorageKey)
+function getACL (isPrivate: boolean) {
+  return isPrivate
+    ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
+    : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
 }