]>
Commit | Line | Data |
---|---|---|
23c0b67d | 1 | import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' |
0305db28 JB |
2 | import { dirname } from 'path' |
3 | import { Readable } from 'stream' | |
4 | import { | |
23c0b67d | 5 | CompleteMultipartUploadCommandOutput, |
0305db28 JB |
6 | DeleteObjectCommand, |
7 | GetObjectCommand, | |
8 | ListObjectsV2Command, | |
156cdbac | 9 | PutObjectCommandInput |
0305db28 | 10 | } from '@aws-sdk/client-s3' |
156cdbac | 11 | import { Upload } from '@aws-sdk/lib-storage' |
0305db28 JB |
12 | import { pipelinePromise } from '@server/helpers/core-utils' |
13 | import { isArray } from '@server/helpers/custom-validators/misc' | |
14 | import { logger } from '@server/helpers/logger' | |
15 | import { CONFIG } from '@server/initializers/config' | |
16 | import { getPrivateUrl } from '../urls' | |
17 | import { getClient } from './client' | |
18 | import { lTags } from './logger' | |
19 | ||
/**
 * Describes where objects live in the object storage:
 * the bucket itself and an optional string prepended to every object key.
 */
type BucketInfo = {
  // Optional string concatenated in front of object keys
  PREFIX?: string

  // Bucket name, passed as `Bucket` to the S3 commands
  BUCKET_NAME: string
}
24 | ||
cfd57d2c C |
/**
 * List the keys of all objects stored under `prefix` in the given bucket.
 *
 * The bucket `PREFIX` (if any) is prepended to `prefix` before listing.
 * A single ListObjectsV2 response may be truncated, so pages are followed
 * through the continuation token until the listing is complete.
 *
 * @param prefix - key prefix, relative to the bucket `PREFIX`
 * @param bucketInfo - bucket name and optional bucket prefix
 * @returns the listed keys, or an empty array when nothing is listed
 */
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  // PREFIX is optional: never build an "undefined..." prefix
  const commandPrefix = (bucketInfo.PREFIX ?? '') + prefix

  const keys: string[] = []
  let continuationToken: string | undefined

  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix,
      ContinuationToken: continuationToken
    })

    const listedObjects = await s3Client.send(listCommand)

    if (isArray(listedObjects.Contents) !== true) break

    for (const object of listedObjects.Contents) {
      if (object.Key !== undefined) keys.push(object.Key)
    }

    // Stop if the response is complete, or if the backend flags truncation without giving a token
    continuationToken = listedObjects.IsTruncated
      ? listedObjects.NextContinuationToken
      : undefined
  } while (continuationToken)

  return keys
}
40 | ||
41 | // --------------------------------------------------------------------------- | |
42 | ||
0305db28 JB |
/**
 * Upload a local file to the object storage.
 *
 * Opens a read stream on `inputPath` and hands it to `uploadToStorage`.
 *
 * @param options.inputPath - path of the local file to upload
 * @param options.objectStorageKey - destination key, relative to the bucket prefix
 * @param options.bucketInfo - target bucket
 * @returns whatever `uploadToStorage` resolves with
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { bucketInfo, objectStorageKey, inputPath } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const content = createReadStream(inputPath)

  return uploadToStorage({ bucketInfo, content, objectStorageKey })
}
56 | ||
cfd57d2c C |
57 | // --------------------------------------------------------------------------- |
58 | ||
0305db28 JB |
/**
 * Delete a single object from the bucket.
 *
 * @param filename - object key, relative to the bucket prefix (resolved through `buildKey`)
 * @param bucketInfo - bucket holding the object
 * @returns the result of sending the delete command
 */
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const deleteCommand = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  const s3Client = getClient()

  return s3Client.send(deleteCommand)
}
67 | ||
/**
 * Delete every object whose key starts with `prefix` in the given bucket.
 *
 * Objects are listed, then deleted one by one. If the listing was truncated,
 * the function calls itself again to handle the remaining objects.
 *
 * @param prefix - key prefix, relative to the bucket `PREFIX`
 * @param bucketInfo - bucket name and optional bucket prefix
 * @throws Error when the listing does not contain any object
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  // PREFIX is optional: never list (or report) an "undefined..." prefix
  const commandPrefix = (bucketInfo.PREFIX ?? '') + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  // FIXME: use bulk delete (DeleteObjectsCommand) when s3ninja will support this operation
  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)
  }

  // Repeat if not all objects could be listed at once:
  // the objects listed above are now deleted, so listing again returns the remaining ones
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}
111 | ||
cfd57d2c C |
112 | // --------------------------------------------------------------------------- |
113 | ||
0305db28 JB |
/**
 * Download an object from the bucket into a local file.
 *
 * The parent directory of `destination` is created if needed, then the
 * response body is piped into a write stream opened on `destination`.
 *
 * @param options.key - object key, relative to the bucket prefix (resolved through `buildKey`)
 * @param options.destination - local path the object is written to
 * @param options.bucketInfo - bucket holding the object
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { bucketInfo, destination, key } = options

  const parentDirectory = dirname(destination)
  await ensureDir(parentDirectory)

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body: body } = await getClient().send(getCommand)

  const output = createWriteStream(destination)
  await pipelinePromise(body as Readable, output)

  output.close()
}
134 | ||
/**
 * Build the full object key by prepending the bucket prefix to `key`.
 *
 * `PREFIX` is optional on `BucketInfo`: when it is not set the key is
 * returned unchanged instead of being prefixed with the string "undefined".
 *
 * @param key - object key, relative to the bucket prefix
 * @param bucketInfo - bucket name and optional bucket prefix
 * @returns the key to use in S3 commands
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
138 | ||
139 | // --------------------------------------------------------------------------- | |
140 | ||
141 | export { | |
142 | BucketInfo, | |
143 | buildKey, | |
144 | storeObject, | |
145 | removeObject, | |
146 | removePrefix, | |
cfd57d2c C |
147 | makeAvailable, |
148 | listKeysOfPrefix | |
0305db28 JB |
149 | } |
150 | ||
151 | // --------------------------------------------------------------------------- | |
152 | ||
/**
 * Upload a stream to the object storage using a multipart upload.
 *
 * @param options.content - stream of the file to upload; always destroyed before this function settles
 * @param options.objectStorageKey - destination key, relative to the bucket prefix (resolved through `buildKey`)
 * @param options.bucketInfo - target bucket
 * @returns the result of `getPrivateUrl` for the uploaded object
 * @throws Error when the upload response does not contain a bucket, or when the upload itself rejects
 */
async function uploadToStorage (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { content, objectStorageKey, bucketInfo } = options

  try {
    const input: PutObjectCommandInput = {
      Body: content,
      Bucket: bucketInfo.BUCKET_NAME,
      Key: buildKey(objectStorageKey, bucketInfo)
    }

    if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
      input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
    }

    const parallelUploads3 = new Upload({
      client: getClient(),
      queueSize: 4,
      partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,

      // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
      // More detailed explanation:
      // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
      // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
      leavePartsOnError: true,
      params: input
    })

    const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
    // Check is needed even if the HTTP status code is 200 OK
    // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
    if (!response.Bucket) {
      const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
      logger.error(message, { response, ...lTags() })
      throw new Error(message)
    }

    logger.debug(
      'Completed %s%s in bucket %s',
      bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
    )

    return getPrivateUrl(bucketInfo, objectStorageKey)
  } finally {
    // A failed upload stops consuming the stream without closing it: release the file descriptor explicitly.
    // Destroying a stream that already ended and closed has no effect.
    content.destroy()
  }
}