import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  PutObjectCommand,
  PutObjectCommandInput
} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { getPrivateUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'

// Target bucket and optional key prefix ("virtual directory") shared by all helpers in this module
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}

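/**
 * Upload a local file to object storage.
 *
 * Files bigger than the configured part size go through a multipart
 * upload, everything else through a single PUT. Resolves with the
 * private URL of the stored object.
 *
 * Usage sketch (bucket name and paths are illustrative):
 *
 *   const url = await storeObject({
 *     inputPath: '/tmp/video.mp4',
 *     objectStorageKey: 'video.mp4',
 *     bucketInfo: { BUCKET_NAME: 'videos', PREFIX: 'hls/' }
 *   })
 */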
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const stats = await stat(inputPath)
  const fileStream = createReadStream(inputPath)

  if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
    return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
  }

  return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
}

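/**
 * Delete a single object. `filename` is relative to the bucket prefix.
 */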
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const command = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  return getClient().send(command)
}

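/**
 * Delete every object whose key starts with `prefix` (relative to the
 * bucket prefix). Objects are removed one by one, and the listing is
 * repeated while the response is truncated, since ListObjectsV2 returns
 * at most 1000 keys per call.
 */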
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteParams = {
  //   Bucket: bucketInfo.BUCKET_NAME,
  //   Delete: { Objects: [] }
  // }

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)

    // FIXME: use bulk delete when s3ninja supports this operation
    // deleteParams.Delete.Objects.push({ Key: object.Key })
  }

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
  // await s3Client.send(deleteCommand)

  // Repeat if not all objects could be listed at once (ListObjectsV2 caps at 1000 keys)
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}

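/**
 * Download an object to a local file, creating the destination
 * directory first if needed.
 *
 * Usage sketch (paths are illustrative):
 *
 *   await makeAvailable({
 *     key: 'video.mp4',
 *     destination: '/var/peertube/tmp/video.mp4',
 *     bucketInfo: { BUCKET_NAME: 'videos' }
 *   })
 */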
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(destination))

  const command = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const response = await getClient().send(command)

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  file.close()
}

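/**
 * Prepend the bucket prefix (if any) to an object key.
 */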
function buildKey (key: string, bucketInfo: BucketInfo) {
  return bucketInfo.PREFIX + key
}

// ---------------------------------------------------------------------------

export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}

// ---------------------------------------------------------------------------

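/**
 * Single-request PUT, used for content smaller than the multipart
 * threshold. Applies the configured upload ACL when one is set and
 * resolves with the private URL of the stored object.
 */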
async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const input: PutObjectCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content
  }

  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  }

  const command = new PutObjectCommand(input)

  await getClient().send(command)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}

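/**
 * Multipart upload through the Upload helper of @aws-sdk/lib-storage,
 * which splits the stream into parts of MAX_UPLOAD_PART bytes and
 * uploads up to `queueSize` parts concurrently. With
 * `leavePartsOnError: false` the helper aborts the multipart upload,
 * discarding already uploaded parts, if anything fails.
 */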
async function multiPartUpload (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { content, objectStorageKey, bucketInfo } = options

  const input: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
    input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  }

  const parallelUploads3 = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
    leavePartsOnError: false,
    params: input
  })

  await parallelUploads3.done()

  logger.debug(
    'Completed %s%s in bucket %s',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}