]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/object-storage/shared/object-storage-helpers.ts
Merge branch 'release/4.3.0' into develop
[github/Chocobozzz/PeerTube.git] / server / lib / object-storage / shared / object-storage-helpers.ts
CommitLineData
23c0b67d 1import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
0305db28
JB
2import { dirname } from 'path'
3import { Readable } from 'stream'
4import {
23c0b67d 5 CompleteMultipartUploadCommandOutput,
0305db28
JB
6 DeleteObjectCommand,
7 GetObjectCommand,
8 ListObjectsV2Command,
156cdbac 9 PutObjectCommandInput
0305db28 10} from '@aws-sdk/client-s3'
156cdbac 11import { Upload } from '@aws-sdk/lib-storage'
0305db28
JB
12import { pipelinePromise } from '@server/helpers/core-utils'
13import { isArray } from '@server/helpers/custom-validators/misc'
14import { logger } from '@server/helpers/logger'
15import { CONFIG } from '@server/initializers/config'
16import { getPrivateUrl } from '../urls'
17import { getClient } from './client'
18import { lTags } from './logger'
19
/**
 * Describes where a group of objects lives in the object storage.
 */
type BucketInfo = {
  // Bucket name, passed as the `Bucket` parameter of the S3 commands
  BUCKET_NAME: string

  // Optional string prepended to object keys and to listing prefixes
  PREFIX?: string
}
24
cfd57d2c
C
/**
 * List the keys of the objects stored under a prefix of a bucket.
 *
 * A single ListObjectsV2 response only holds a limited number of keys, so the
 * listing is continued with the returned continuation token for as long as the
 * response is flagged as truncated.
 *
 * @param prefix prefix to list, relative to the bucket prefix
 * @param bucketInfo bucket (and optional bucket prefix) to look into
 * @returns the listed keys, or an empty array when the storage lists nothing
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  // Same key building rule as for the other helpers of this file
  const commandPrefix = buildKey(prefix, bucketInfo)

  const keys: string[] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      // Left undefined for the first request
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      if (object.Key !== undefined) keys.push(object.Key)
    }

    // Stop when the listing is complete, or when a truncated response gives no token to continue with
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
40
41// ---------------------------------------------------------------------------
42
0305db28
JB
/**
 * Upload a local file to the object storage.
 *
 * @param options.inputPath path of the local file to upload
 * @param options.objectStorageKey key of the object, relative to the bucket prefix
 * @param options.bucketInfo bucket (and optional bucket prefix) to upload to
 * @returns the value resolved by `uploadToStorage` for the uploaded object
 * @throws whatever `uploadToStorage` rejects with
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const fileStream = createReadStream(inputPath)

  try {
    return await uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
  } finally {
    // A failed upload can leave the stream (and its file descriptor) open
    // Destroying a stream that has already been consumed and closed is harmless
    fileStream.destroy()
  }
}
56
cfd57d2c
C
57// ---------------------------------------------------------------------------
58
0305db28
JB
/**
 * Delete a single object from a bucket.
 *
 * @param filename key of the object, relative to the bucket prefix
 * @param bucketInfo bucket (and optional bucket prefix) holding the object
 * @returns the response of the delete command
 */
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const Key = buildKey(filename, bucketInfo)
  const deleteCommand = new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key })

  const s3Client = getClient()

  return s3Client.send(deleteCommand)
}
67
/**
 * Delete every object stored under a prefix of a bucket.
 *
 * Objects are listed, deleted one by one, and the prefix is listed again for
 * as long as the listing is flagged as truncated.
 *
 * @param prefix prefix to remove, relative to the bucket prefix
 * @param bucketInfo bucket (and optional bucket prefix) to clean
 * @throws Error when a listing does not contain any file
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  for (;;) {
    const client = getClient()
    const fullPrefix = buildKey(prefix, bucketInfo)

    const listing = await client.send(
      new ListObjectsV2Command({ Bucket: bucketInfo.BUCKET_NAME, Prefix: fullPrefix })
    )

    if (isArray(listing.Contents) !== true) {
      const message = `Cannot remove ${fullPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

      logger.error(message, { response: listing, ...lTags() })
      throw new Error(message)
    }

    // FIXME: use bulk delete (DeleteObjectsCommand) when s3ninja will support this operation
    for (const { Key } of listing.Contents) {
      await client.send(new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, Key }))
    }

    // Not all objects could be listed at once (limit of 1000?): list the prefix again
    if (!listing.IsTruncated) return
  }
}
111
cfd57d2c
C
112// ---------------------------------------------------------------------------
113
0305db28
JB
/**
 * Download an object from a bucket into a local file.
 *
 * @param options.key key of the object, relative to the bucket prefix
 * @param options.destination path of the local file to write
 * @param options.bucketInfo bucket (and optional bucket prefix) holding the object
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  // Make sure the parent directory exists before opening the write stream
  await ensureDir(dirname(destination))

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body } = await getClient().send(getCommand)

  const output = createWriteStream(destination)
  await pipelinePromise(Body as Readable, output)

  output.close()
}
134
/**
 * Build the full key of an object by prepending the bucket prefix.
 *
 * @param key key of the object, relative to the bucket prefix
 * @param bucketInfo bucket information holding the optional prefix
 * @returns the prefixed key, or `key` unchanged when the bucket has no prefix
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  // PREFIX is optional: concatenating `undefined` would produce an "undefined<key>" key
  return (bucketInfo.PREFIX ?? '') + key
}
138
139// ---------------------------------------------------------------------------
140
141export {
142 BucketInfo,
143 buildKey,
144 storeObject,
145 removeObject,
146 removePrefix,
cfd57d2c
C
147 makeAvailable,
148 listKeysOfPrefix
0305db28
JB
149}
150
151// ---------------------------------------------------------------------------
152
/**
 * Send a file stream to the object storage using the multipart upload helper.
 *
 * @param options.content stream of the file to upload
 * @param options.objectStorageKey key of the object, relative to the bucket prefix
 * @param options.bucketInfo bucket (and optional bucket prefix) to upload to
 * @returns the result of `getPrivateUrl` for the uploaded object
 * @throws Error when the upload response does not contain a bucket
 */
async function uploadToStorage (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { content, objectStorageKey, bucketInfo } = options
  const bucketName = bucketInfo.BUCKET_NAME

  const params: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketName,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  // Only send an ACL when one is configured
  if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) params.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  const upload = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
    // More detailed explanation:
    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
    leavePartsOnError: true,
    params
  })

  const result = (await upload.done()) as CompleteMultipartUploadCommandOutput

  // Check is needed even if the HTTP status code is 200 OK
  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  if (!result.Bucket) {
    const message = `Error uploading ${objectStorageKey} to bucket ${bucketName}`

    logger.error(message, { response: result, ...lTags() })
    throw new Error(message)
  }

  logger.debug('Completed %s%s in bucket %s', bucketInfo.PREFIX, objectStorageKey, bucketName, lTags())

  return getPrivateUrl(bucketInfo, objectStorageKey)
}