]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/object-storage/shared/object-storage-helpers.ts
Use private ACL for private videos in s3
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
CommitLineData
23c0b67d 1import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
0305db28
JB
2import { dirname } from 'path'
3import { Readable } from 'stream'
4import {
9ab330b9 5 _Object,
23c0b67d 6 CompleteMultipartUploadCommandOutput,
0305db28
JB
7 DeleteObjectCommand,
8 GetObjectCommand,
9 ListObjectsV2Command,
9ab330b9
C
10 PutObjectAclCommand,
11 PutObjectCommandInput,
12 S3Client
0305db28 13} from '@aws-sdk/client-s3'
156cdbac 14import { Upload } from '@aws-sdk/lib-storage'
0305db28
JB
15import { pipelinePromise } from '@server/helpers/core-utils'
16import { isArray } from '@server/helpers/custom-validators/misc'
17import { logger } from '@server/helpers/logger'
18import { CONFIG } from '@server/initializers/config'
9ab330b9 19import { getInternalUrl } from '../urls'
0305db28
JB
20import { getClient } from './client'
21import { lTags } from './logger'
22
/**
 * Describes the bucket targeted by the helpers of this file.
 */
type BucketInfo = {
  /** Bucket name, sent as the `Bucket` parameter of every S3 command built here. */
  BUCKET_NAME: string

  /** Optional string prepended to object keys and listing prefixes. */
  PREFIX?: string
}
27
cfd57d2c
C
/**
 * List the keys of every object stored under `prefix` in the given bucket.
 *
 * The listing is paginated: a single ListObjectsV2 response only contains a
 * limited number of entries, so we keep requesting pages until the response
 * is no longer truncated.
 *
 * @param prefix - prefix of the keys to list (the bucket prefix is prepended to it)
 * @param bucketInfo - bucket name and bucket prefix
 * @returns the listed keys, or an empty array if nothing is stored under the prefix
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix

  const keys: _Object['Key'][] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    // No (more) objects under this prefix
    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      keys.push(object.Key)
    }

    // The token to fetch the next page is `NextContinuationToken`:
    // `ContinuationToken` in the response only echoes the token we sent
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
43
44// ---------------------------------------------------------------------------
45
0305db28
JB
/**
 * Upload a local file in object storage.
 *
 * @param options.inputPath - path of the local file to upload
 * @param options.objectStorageKey - key of the object, without the bucket prefix
 * @param options.bucketInfo - destination bucket name and prefix
 * @param options.isPrivate - selects the private or the public upload ACL
 * @returns the value returned by `uploadToStorage` for the stored object
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}): Promise<string> {
  const { bucketInfo } = options

  logger.debug(
    'Uploading file %s to %s%s in bucket %s',
    options.inputPath, bucketInfo.PREFIX, options.objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  // The file is streamed instead of being loaded in memory
  const content = createReadStream(options.inputPath)

  return uploadToStorage({
    objectStorageKey: options.objectStorageKey,
    content,
    bucketInfo,
    isPrivate: options.isPrivate
  })
}
60
cfd57d2c
C
61// ---------------------------------------------------------------------------
62
9ab330b9
C
/**
 * Change the ACL of a single object.
 *
 * @param options.objectStorageKey - key of the object, without the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.isPrivate - selects the private or the public ACL
 * @returns the promise of the PutObjectAcl request
 */
function updateObjectACL (options: {
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const bucketName = options.bucketInfo.BUCKET_NAME
  const fullKey = buildKey(options.objectStorageKey, options.bucketInfo)

  logger.debug('Updating ACL file %s in bucket %s', fullKey, bucketName, lTags())

  const aclCommand = new PutObjectAclCommand({
    Bucket: bucketName,
    Key: fullKey,
    ACL: getACL(options.isPrivate)
  })

  return getClient().send(aclCommand)
}
82
9ab330b9
C
/**
 * Change the ACL of every object listed under a prefix.
 *
 * @param options.prefix - prefix of the objects to update, without the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.isPrivate - selects the private or the public ACL
 * @returns a promise settled once `applyOnPrefix` has processed the prefix
 */
function updatePrefixACL (options: {
  prefix: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const bucketName = options.bucketInfo.BUCKET_NAME

  logger.debug('Updating ACL of files in prefix %s in bucket %s', options.prefix, bucketName, lTags())

  // One PutObjectAcl command per listed object
  const buildAclCommand = (object: _Object) => new PutObjectAclCommand({
    Bucket: bucketName,
    Key: object.Key,
    ACL: getACL(options.isPrivate)
  })

  return applyOnPrefix({
    prefix: options.prefix,
    bucketInfo: options.bucketInfo,
    commandBuilder: buildAclCommand
  })
}
0305db28 104
9ab330b9 105// ---------------------------------------------------------------------------
0305db28 106
9ab330b9
C
/**
 * Delete a single object from the bucket.
 *
 * @param objectStorageKey - key of the object, without the bucket prefix
 * @param bucketInfo - bucket name and prefix
 * @returns the promise of the DeleteObject request
 */
function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
  const { BUCKET_NAME: bucketName } = bucketInfo
  const fullKey = buildKey(objectStorageKey, bucketInfo)

  logger.debug('Removing file %s in bucket %s', fullKey, bucketName, lTags())

  const deleteCommand = new DeleteObjectCommand({ Bucket: bucketName, Key: fullKey })

  return getClient().send(deleteCommand)
}
0305db28 119
/**
 * Delete every object listed under a prefix, one DeleteObject request per object.
 *
 * @param prefix - prefix of the objects to delete, without the bucket prefix
 * @param bucketInfo - bucket name and prefix
 * @returns a promise settled once `applyOnPrefix` has processed the prefix
 */
function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  // FIXME: use bulk delete when s3ninja will support this operation

  const bucketName = bucketInfo.BUCKET_NAME

  logger.debug('Removing prefix %s in bucket %s', prefix, bucketName, lTags())

  const buildDeleteCommand = (object: _Object) => new DeleteObjectCommand({
    Bucket: bucketName,
    Key: object.Key
  })

  return applyOnPrefix({ prefix, bucketInfo, commandBuilder: buildDeleteCommand })
}
136
cfd57d2c
C
137// ---------------------------------------------------------------------------
138
0305db28
JB
/**
 * Download an object from the bucket and write it to a local file.
 *
 * @param options.key - key of the object, without the bucket prefix
 * @param options.destination - path of the local file to write
 * @param options.bucketInfo - bucket name and prefix
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { destination, bucketInfo } = options

  // The parent directory may not exist yet
  await ensureDir(dirname(destination))

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, bucketInfo)
  })
  const { Body: body } = await getClient().send(getCommand)

  const output = createWriteStream(destination)
  await pipelinePromise(body as Readable, output)

  output.close()
}
159
/**
 * Build the full object key by prepending the bucket prefix to `key`.
 *
 * `PREFIX` is optional in `BucketInfo`: when it is not set the key is returned
 * as is, instead of being prefixed by the literal string "undefined".
 *
 * @param key - object key, without the bucket prefix
 * @param bucketInfo - bucket name and optional prefix
 * @returns the key to use in S3 commands
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
163
164// ---------------------------------------------------------------------------
165
9ab330b9
C
/**
 * Request an object (or a byte range of it) and return its body as a stream.
 *
 * @param options.key - key of the object, without the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.rangeHeader - value forwarded as the `Range` parameter of the GetObject request
 * @returns the response body, typed as a `Readable`
 */
async function createObjectReadStream (options: {
  key: string
  bucketInfo: BucketInfo
  rangeHeader: string
}) {
  const getCommand = new GetObjectCommand({
    Bucket: options.bucketInfo.BUCKET_NAME,
    Key: buildKey(options.key, options.bucketInfo),
    Range: options.rangeHeader
  })

  const { Body: body } = await getClient().send(getCommand)

  return body as Readable
}
183
184// ---------------------------------------------------------------------------
185
0305db28
JB
export {
  // Types and key helpers
  BucketInfo,
  buildKey,

  // Upload
  storeObject,

  // Deletion
  removeObject,
  removePrefix,

  // Download
  makeAvailable,

  // ACL
  updateObjectACL,
  updatePrefixACL,

  // Listing and streaming
  listKeysOfPrefix,
  createObjectReadStream
}
203
204// ---------------------------------------------------------------------------
205
/**
 * Upload a stream to object storage using the multipart `Upload` helper.
 *
 * @param options.content - stream of the content to upload
 * @param options.objectStorageKey - key of the object, without the bucket prefix
 * @param options.bucketInfo - destination bucket name and prefix
 * @param options.isPrivate - selects the private or the public upload ACL
 * @returns the value returned by `getInternalUrl` for the uploaded object
 * @throws Error if the upload response does not contain a bucket name
 */
async function uploadToStorage (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
  isPrivate: boolean
}) {
  const { objectStorageKey, bucketInfo } = options
  const bucketName = bucketInfo.BUCKET_NAME

  const uploadParams: PutObjectCommandInput = {
    Body: options.content,
    Bucket: bucketName,
    Key: buildKey(objectStorageKey, bucketInfo),
    ACL: getACL(options.isPrivate)
  }

  const upload = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params: uploadParams
  })

  const result = (await upload.done()) as CompleteMultipartUploadCommandOutput

  // Check is needed even if the HTTP status code is 200 OK
  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  const succeeded = !!result.Bucket
  if (!succeeded) {
    const errorMessage = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`

    logger.error(errorMessage, { response: result, ...lTags() })
    throw new Error(errorMessage)
  }

  logger.debug('Completed %s%s in bucket %s', bucketInfo.PREFIX, objectStorageKey, bucketName, lTags())

  return getInternalUrl(bucketInfo, objectStorageKey)
}
250
/**
 * List the objects stored under a prefix and send one command per listed object.
 * Commands are sent sequentially, page after page.
 *
 * @param options.prefix - prefix of the objects to process, without the bucket prefix
 * @param options.bucketInfo - bucket name and prefix
 * @param options.commandBuilder - builds the command to send for a listed object
 * @param options.continuationToken - token of the page to list, used by the recursive calls
 * @throws Error if the listing response does not contain any object
 */
async function applyOnPrefix (options: {
  prefix: string
  bucketInfo: BucketInfo
  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]

  continuationToken?: string
}) {
  const { prefix, bucketInfo, commandBuilder, continuationToken } = options

  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix,
    ContinuationToken: continuationToken
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = commandBuilder(object)

    await s3Client.send(command)
  }

  // Repeat if not all objects could be listed at once.
  // The next page must be requested with `NextContinuationToken`: the response
  // `ContinuationToken` only echoes the token sent in the request, so using it
  // would list the same page again (or restart from the first page).
  // Without a next token we stop instead of looping on the first page.
  if (listedObjects.IsTruncated && listedObjects.NextContinuationToken) {
    await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
  }
}
289
/**
 * Pick the configured upload ACL matching the requested privacy.
 *
 * @param isPrivate - `true` to get the private ACL, `false` to get the public one
 * @returns `CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE` or `CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC`
 */
function getACL (isPrivate: boolean) {
  const { UPLOAD_ACL: uploadACL } = CONFIG.OBJECT_STORAGE

  if (isPrivate) return uploadACL.PRIVATE

  return uploadACL.PUBLIC
}