aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/object-storage/shared/object-storage-helpers.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-10-19 10:43:53 +0200
committerChocobozzz <chocobozzz@cpy.re>2022-10-24 14:48:24 +0200
commit9ab330b90decf4edf152ff8e1d2948c065766b2c (patch)
tree29d924f50f7307e8e828a57ecb9ea78623487ce0 /server/lib/object-storage/shared/object-storage-helpers.ts
parent3545e72c686ff1725bbdfd8d16d693e2f4aa75a3 (diff)
downloadPeerTube-9ab330b90decf4edf152ff8e1d2948c065766b2c.tar.gz
PeerTube-9ab330b90decf4edf152ff8e1d2948c065766b2c.tar.zst
PeerTube-9ab330b90decf4edf152ff8e1d2948c065766b2c.zip
Use private ACL for private videos in s3
Diffstat (limited to 'server/lib/object-storage/shared/object-storage-helpers.ts')
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts192
1 files changed, 144 insertions, 48 deletions
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index c131977e8..05b52f412 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -2,18 +2,21 @@ import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-e
2import { dirname } from 'path' 2import { dirname } from 'path'
3import { Readable } from 'stream' 3import { Readable } from 'stream'
4import { 4import {
5 _Object,
5 CompleteMultipartUploadCommandOutput, 6 CompleteMultipartUploadCommandOutput,
6 DeleteObjectCommand, 7 DeleteObjectCommand,
7 GetObjectCommand, 8 GetObjectCommand,
8 ListObjectsV2Command, 9 ListObjectsV2Command,
9 PutObjectCommandInput 10 PutObjectAclCommand,
11 PutObjectCommandInput,
12 S3Client
10} from '@aws-sdk/client-s3' 13} from '@aws-sdk/client-s3'
11import { Upload } from '@aws-sdk/lib-storage' 14import { Upload } from '@aws-sdk/lib-storage'
12import { pipelinePromise } from '@server/helpers/core-utils' 15import { pipelinePromise } from '@server/helpers/core-utils'
13import { isArray } from '@server/helpers/custom-validators/misc' 16import { isArray } from '@server/helpers/custom-validators/misc'
14import { logger } from '@server/helpers/logger' 17import { logger } from '@server/helpers/logger'
15import { CONFIG } from '@server/initializers/config' 18import { CONFIG } from '@server/initializers/config'
16import { getPrivateUrl } from '../urls' 19import { getInternalUrl } from '../urls'
17import { getClient } from './client' 20import { getClient } from './client'
18import { lTags } from './logger' 21import { lTags } from './logger'
19 22
@@ -44,69 +47,91 @@ async function storeObject (options: {
44 inputPath: string 47 inputPath: string
45 objectStorageKey: string 48 objectStorageKey: string
46 bucketInfo: BucketInfo 49 bucketInfo: BucketInfo
50 isPrivate: boolean
47}): Promise<string> { 51}): Promise<string> {
48 const { inputPath, objectStorageKey, bucketInfo } = options 52 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
49 53
50 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) 54 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
51 55
52 const fileStream = createReadStream(inputPath) 56 const fileStream = createReadStream(inputPath)
53 57
54 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo }) 58 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
55} 59}
56 60
57// --------------------------------------------------------------------------- 61// ---------------------------------------------------------------------------
58 62
59async function removeObject (filename: string, bucketInfo: BucketInfo) { 63function updateObjectACL (options: {
60 const command = new DeleteObjectCommand({ 64 objectStorageKey: string
65 bucketInfo: BucketInfo
66 isPrivate: boolean
67}) {
68 const { objectStorageKey, bucketInfo, isPrivate } = options
69
70 const key = buildKey(objectStorageKey, bucketInfo)
71
72 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
73
74 const command = new PutObjectAclCommand({
61 Bucket: bucketInfo.BUCKET_NAME, 75 Bucket: bucketInfo.BUCKET_NAME,
62 Key: buildKey(filename, bucketInfo) 76 Key: key,
77 ACL: getACL(isPrivate)
63 }) 78 })
64 79
65 return getClient().send(command) 80 return getClient().send(command)
66} 81}
67 82
68async function removePrefix (prefix: string, bucketInfo: BucketInfo) { 83function updatePrefixACL (options: {
69 const s3Client = getClient() 84 prefix: string
70 85 bucketInfo: BucketInfo
71 const commandPrefix = bucketInfo.PREFIX + prefix 86 isPrivate: boolean
72 const listCommand = new ListObjectsV2Command({ 87}) {
73 Bucket: bucketInfo.BUCKET_NAME, 88 const { prefix, bucketInfo, isPrivate } = options
74 Prefix: commandPrefix 89
90 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
91
92 return applyOnPrefix({
93 prefix,
94 bucketInfo,
95 commandBuilder: obj => {
96 return new PutObjectAclCommand({
97 Bucket: bucketInfo.BUCKET_NAME,
98 Key: obj.Key,
99 ACL: getACL(isPrivate)
100 })
101 }
75 }) 102 })
103}
76 104
77 const listedObjects = await s3Client.send(listCommand) 105// ---------------------------------------------------------------------------
78 106
79 // FIXME: use bulk delete when s3ninja will support this operation 107function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
80 // const deleteParams = { 108 const key = buildKey(objectStorageKey, bucketInfo)
81 // Bucket: bucketInfo.BUCKET_NAME,
82 // Delete: { Objects: [] }
83 // }
84 109
85 if (isArray(listedObjects.Contents) !== true) { 110 logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
86 const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
87 111
88 logger.error(message, { response: listedObjects, ...lTags() }) 112 const command = new DeleteObjectCommand({
89 throw new Error(message) 113 Bucket: bucketInfo.BUCKET_NAME,
90 } 114 Key: key
91 115 })
92 for (const object of listedObjects.Contents) {
93 const command = new DeleteObjectCommand({
94 Bucket: bucketInfo.BUCKET_NAME,
95 Key: object.Key
96 })
97
98 await s3Client.send(command)
99 116
100 // FIXME: use bulk delete when s3ninja will support this operation 117 return getClient().send(command)
101 // deleteParams.Delete.Objects.push({ Key: object.Key }) 118}
102 }
103 119
120function removePrefix (prefix: string, bucketInfo: BucketInfo) {
104 // FIXME: use bulk delete when s3ninja will support this operation 121 // FIXME: use bulk delete when s3ninja will support this operation
105 // const deleteCommand = new DeleteObjectsCommand(deleteParams)
106 // await s3Client.send(deleteCommand)
107 122
108 // Repeat if not all objects could be listed at once (limit of 1000?) 123 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
109 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) 124
125 return applyOnPrefix({
126 prefix,
127 bucketInfo,
128 commandBuilder: obj => {
129 return new DeleteObjectCommand({
130 Bucket: bucketInfo.BUCKET_NAME,
131 Key: obj.Key
132 })
133 }
134 })
110} 135}
111 136
112// --------------------------------------------------------------------------- 137// ---------------------------------------------------------------------------
@@ -138,14 +163,42 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
138 163
139// --------------------------------------------------------------------------- 164// ---------------------------------------------------------------------------
140 165
166async function createObjectReadStream (options: {
167 key: string
168 bucketInfo: BucketInfo
169 rangeHeader: string
170}) {
171 const { key, bucketInfo, rangeHeader } = options
172
173 const command = new GetObjectCommand({
174 Bucket: bucketInfo.BUCKET_NAME,
175 Key: buildKey(key, bucketInfo),
176 Range: rangeHeader
177 })
178
179 const response = await getClient().send(command)
180
181 return response.Body as Readable
182}
183
184// ---------------------------------------------------------------------------
185
141export { 186export {
142 BucketInfo, 187 BucketInfo,
143 buildKey, 188 buildKey,
189
144 storeObject, 190 storeObject,
191
145 removeObject, 192 removeObject,
146 removePrefix, 193 removePrefix,
194
147 makeAvailable, 195 makeAvailable,
148 listKeysOfPrefix 196
197 updateObjectACL,
198 updatePrefixACL,
199
200 listKeysOfPrefix,
201 createObjectReadStream
149} 202}
150 203
151// --------------------------------------------------------------------------- 204// ---------------------------------------------------------------------------
@@ -154,17 +207,15 @@ async function uploadToStorage (options: {
154 content: ReadStream 207 content: ReadStream
155 objectStorageKey: string 208 objectStorageKey: string
156 bucketInfo: BucketInfo 209 bucketInfo: BucketInfo
210 isPrivate: boolean
157}) { 211}) {
158 const { content, objectStorageKey, bucketInfo } = options 212 const { content, objectStorageKey, bucketInfo, isPrivate } = options
159 213
160 const input: PutObjectCommandInput = { 214 const input: PutObjectCommandInput = {
161 Body: content, 215 Body: content,
162 Bucket: bucketInfo.BUCKET_NAME, 216 Bucket: bucketInfo.BUCKET_NAME,
163 Key: buildKey(objectStorageKey, bucketInfo) 217 Key: buildKey(objectStorageKey, bucketInfo),
164 } 218 ACL: getACL(isPrivate)
165
166 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
167 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
168 } 219 }
169 220
170 const parallelUploads3 = new Upload({ 221 const parallelUploads3 = new Upload({
@@ -194,5 +245,50 @@ async function uploadToStorage (options: {
194 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() 245 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
195 ) 246 )
196 247
197 return getPrivateUrl(bucketInfo, objectStorageKey) 248 return getInternalUrl(bucketInfo, objectStorageKey)
249}
250
251async function applyOnPrefix (options: {
252 prefix: string
253 bucketInfo: BucketInfo
254 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
255
256 continuationToken?: string
257}) {
258 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
259
260 const s3Client = getClient()
261
262 const commandPrefix = bucketInfo.PREFIX + prefix
263 const listCommand = new ListObjectsV2Command({
264 Bucket: bucketInfo.BUCKET_NAME,
265 Prefix: commandPrefix,
266 ContinuationToken: continuationToken
267 })
268
269 const listedObjects = await s3Client.send(listCommand)
270
271 if (isArray(listedObjects.Contents) !== true) {
272 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
273
274 logger.error(message, { response: listedObjects, ...lTags() })
275 throw new Error(message)
276 }
277
278 for (const object of listedObjects.Contents) {
279 const command = commandBuilder(object)
280
281 await s3Client.send(command)
282 }
283
284 // Repeat if not all objects could be listed at once (limit of 1000?)
285 if (listedObjects.IsTruncated) {
286 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
287 }
288}
289
290function getACL (isPrivate: boolean) {
291 return isPrivate
292 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
293 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
198} 294}