]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/object-storage/shared/object-storage-helpers.ts
object-storage: @aws-sdk/lib-storage for multipart (#4903)
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
CommitLineData
156cdbac 1import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
0305db28
JB
2import { dirname } from 'path'
3import { Readable } from 'stream'
4import {
0305db28
JB
5 DeleteObjectCommand,
6 GetObjectCommand,
7 ListObjectsV2Command,
8 PutObjectCommand,
156cdbac 9 PutObjectCommandInput
0305db28 10} from '@aws-sdk/client-s3'
156cdbac 11import { Upload } from '@aws-sdk/lib-storage'
0305db28
JB
12import { pipelinePromise } from '@server/helpers/core-utils'
13import { isArray } from '@server/helpers/custom-validators/misc'
14import { logger } from '@server/helpers/logger'
15import { CONFIG } from '@server/initializers/config'
16import { getPrivateUrl } from '../urls'
17import { getClient } from './client'
18import { lTags } from './logger'
19
// Describes the target bucket for the helpers below: the bucket name and an
// optional key prefix prepended to every object key (see buildKey()).
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}
24
25async function storeObject (options: {
26 inputPath: string
27 objectStorageKey: string
28 bucketInfo: BucketInfo
29}): Promise<string> {
30 const { inputPath, objectStorageKey, bucketInfo } = options
31
32 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
33
34 const stats = await stat(inputPath)
156cdbac 35 const fileStream = createReadStream(inputPath)
0305db28 36
0305db28 37 if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
156cdbac 38 return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
0305db28
JB
39 }
40
0305db28
JB
41 return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
42}
43
44async function removeObject (filename: string, bucketInfo: BucketInfo) {
45 const command = new DeleteObjectCommand({
46 Bucket: bucketInfo.BUCKET_NAME,
47 Key: buildKey(filename, bucketInfo)
48 })
49
50 return getClient().send(command)
51}
52
53async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
54 const s3Client = getClient()
55
56 const commandPrefix = bucketInfo.PREFIX + prefix
57 const listCommand = new ListObjectsV2Command({
58 Bucket: bucketInfo.BUCKET_NAME,
59 Prefix: commandPrefix
60 })
61
62 const listedObjects = await s3Client.send(listCommand)
63
64 // FIXME: use bulk delete when s3ninja will support this operation
65 // const deleteParams = {
66 // Bucket: bucketInfo.BUCKET_NAME,
67 // Delete: { Objects: [] }
68 // }
69
70 if (isArray(listedObjects.Contents) !== true) {
71 const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
72
73 logger.error(message, { response: listedObjects, ...lTags() })
74 throw new Error(message)
75 }
76
77 for (const object of listedObjects.Contents) {
78 const command = new DeleteObjectCommand({
79 Bucket: bucketInfo.BUCKET_NAME,
80 Key: object.Key
81 })
82
83 await s3Client.send(command)
84
85 // FIXME: use bulk delete when s3ninja will support this operation
86 // deleteParams.Delete.Objects.push({ Key: object.Key })
87 }
88
89 // FIXME: use bulk delete when s3ninja will support this operation
90 // const deleteCommand = new DeleteObjectsCommand(deleteParams)
91 // await s3Client.send(deleteCommand)
92
93 // Repeat if not all objects could be listed at once (limit of 1000?)
94 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
95}
96
97async function makeAvailable (options: {
98 key: string
99 destination: string
100 bucketInfo: BucketInfo
101}) {
102 const { key, destination, bucketInfo } = options
103
104 await ensureDir(dirname(options.destination))
105
106 const command = new GetObjectCommand({
107 Bucket: bucketInfo.BUCKET_NAME,
108 Key: buildKey(key, bucketInfo)
109 })
110 const response = await getClient().send(command)
111
112 const file = createWriteStream(destination)
113 await pipelinePromise(response.Body as Readable, file)
114
115 file.close()
116}
117
118function buildKey (key: string, bucketInfo: BucketInfo) {
119 return bucketInfo.PREFIX + key
120}
121
122// ---------------------------------------------------------------------------
123
// Public API of the object-storage helpers
export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}
132
133// ---------------------------------------------------------------------------
134
135async function objectStoragePut (options: {
136 objectStorageKey: string
137 content: ReadStream
138 bucketInfo: BucketInfo
139}) {
140 const { objectStorageKey, content, bucketInfo } = options
141
f9915efa 142 const input: PutObjectCommandInput = {
0305db28
JB
143 Bucket: bucketInfo.BUCKET_NAME,
144 Key: buildKey(objectStorageKey, bucketInfo),
f9915efa
DL
145 Body: content
146 }
147
148 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
149 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
150 }
151
152 const command = new PutObjectCommand(input)
0305db28
JB
153
154 await getClient().send(command)
155
156 return getPrivateUrl(bucketInfo, objectStorageKey)
157}
158
159async function multiPartUpload (options: {
156cdbac 160 content: ReadStream
0305db28
JB
161 objectStorageKey: string
162 bucketInfo: BucketInfo
163}) {
156cdbac 164 const { content, objectStorageKey, bucketInfo } = options
0305db28 165
156cdbac 166 const input: PutObjectCommandInput = {
167 Body: content,
0305db28 168 Bucket: bucketInfo.BUCKET_NAME,
f9915efa
DL
169 Key: buildKey(objectStorageKey, bucketInfo)
170 }
171
172 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
173 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
174 }
175
156cdbac 176 const parallelUploads3 = new Upload({
177 client: getClient(),
178 queueSize: 4,
179 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
180 leavePartsOnError: false,
181 params: input
0305db28 182 })
156cdbac 183
184 await parallelUploads3.done()
0305db28
JB
185
186 logger.debug(
156cdbac 187 'Completed %s%s in bucket %s',
188 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
0305db28
JB
189 )
190
191 return getPrivateUrl(bucketInfo, objectStorageKey)
192}