]>
Commit | Line | Data |
---|---|---|
1 | import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra' | |
2 | import { dirname } from 'path' | |
3 | import { Readable } from 'stream' | |
4 | import { | |
5 | DeleteObjectCommand, | |
6 | GetObjectCommand, | |
7 | ListObjectsV2Command, | |
8 | PutObjectCommand, | |
9 | PutObjectCommandInput | |
10 | } from '@aws-sdk/client-s3' | |
11 | import { Upload } from '@aws-sdk/lib-storage' | |
12 | import { pipelinePromise } from '@server/helpers/core-utils' | |
13 | import { isArray } from '@server/helpers/custom-validators/misc' | |
14 | import { logger } from '@server/helpers/logger' | |
15 | import { CONFIG } from '@server/initializers/config' | |
16 | import { getPrivateUrl } from '../urls' | |
17 | import { getClient } from './client' | |
18 | import { lTags } from './logger' | |
19 | ||
/**
 * Describes the bucket targeted by the helpers of this module.
 */
type BucketInfo = {
  /** Bucket name, forwarded as `Bucket` to the S3 commands. */
  BUCKET_NAME: string

  /** Optional string prepended to object keys stored in the bucket. */
  PREFIX?: string
}
24 | ||
/**
 * Upload a local file to object storage.
 *
 * Files whose size exceeds `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART` are sent
 * through `multiPartUpload`, the others through `objectStoragePut`.
 *
 * @param options.inputPath path of the local file to upload
 * @param options.objectStorageKey object key, without the bucket prefix
 * @param options.bucketInfo target bucket
 * @returns the URL returned by the upload helper
 * @throws if the file cannot be stat'ed or if the upload fails
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  // stat() first: if it rejects, no stream has been opened yet
  const stats = await stat(inputPath)
  const fileStream = createReadStream(inputPath)

  try {
    if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
      // `return await` so the finally block runs after the upload settles
      return await multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
    }

    return await objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
  } finally {
    // A failed upload may leave the stream partially consumed: release the file descriptor.
    // Destroying an already finished stream is a no-op.
    fileStream.destroy()
  }
}
43 | ||
/**
 * Delete a single object from a bucket.
 *
 * @param filename object key, without the bucket prefix (added by `buildKey`)
 * @param bucketInfo bucket holding the object
 * @returns the result of the S3 client `send` call
 */
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const deletion = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  const client = getClient()

  return client.send(deletion)
}
52 | ||
/**
 * Delete every object whose key starts with the given prefix.
 *
 * Objects are listed, then deleted one by one. While the listing reports
 * `IsTruncated`, the same prefix is listed again: objects deleted so far no
 * longer appear, so each pass returns the remaining ones.
 *
 * @param prefix key prefix, without the bucket prefix (added by `buildKey`)
 * @param bucketInfo bucket to clean
 * @throws Error if a listing does not contain a `Contents` array
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  for (;;) {
    const s3Client = getClient()

    // Same key construction as the other helpers of this file
    const commandPrefix = buildKey(prefix, bucketInfo)
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix
    })

    const listedObjects = await s3Client.send(listCommand)

    if (isArray(listedObjects.Contents) !== true) {
      const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

      logger.error(message, { response: listedObjects, ...lTags() })
      throw new Error(message)
    }

    // FIXME: use bulk delete (DeleteObjectsCommand) when s3ninja will support this operation
    for (const object of listedObjects.Contents) {
      const command = new DeleteObjectCommand({
        Bucket: bucketInfo.BUCKET_NAME,
        Key: object.Key
      })

      await s3Client.send(command)
    }

    // Repeat if not all objects could be listed at once (limit of 1000?)
    if (!listedObjects.IsTruncated) return
  }
}
96 | ||
/**
 * Download an object from a bucket to a local file.
 *
 * The parent directory of `destination` is created if needed, then the
 * object body is piped into a write stream opened on `destination`.
 *
 * @param options.key object key, without the bucket prefix (added by `buildKey`)
 * @param options.destination local path to write the object to
 * @param options.bucketInfo bucket holding the object
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  const parentDirectory = dirname(destination)
  await ensureDir(parentDirectory)

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body } = await getClient().send(getCommand)

  const target = createWriteStream(destination)
  await pipelinePromise(Body as Readable, target)

  target.close()
}
117 | ||
/**
 * Build the full object key by prepending the bucket prefix to `key`.
 *
 * `PREFIX` is optional on `BucketInfo`: a missing prefix is treated as an
 * empty string instead of being concatenated as the text "undefined".
 *
 * @param key object key, without the bucket prefix
 * @param bucketInfo bucket whose prefix is used
 * @returns the prefixed key
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  const prefix = bucketInfo.PREFIX ?? ''

  return prefix + key
}
121 | ||
122 | // --------------------------------------------------------------------------- | |
123 | ||
124 | export { | |
125 | BucketInfo, | |
126 | buildKey, | |
127 | storeObject, | |
128 | removeObject, | |
129 | removePrefix, | |
130 | makeAvailable | |
131 | } | |
132 | ||
133 | // --------------------------------------------------------------------------- | |
134 | ||
/**
 * Upload a stream with a single `PutObjectCommand`.
 *
 * The `ACL` field is only set when `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is truthy.
 *
 * @param options.objectStorageKey object key, without the bucket prefix
 * @param options.content stream sent as the object body
 * @param options.bucketInfo target bucket
 * @returns the value of `getPrivateUrl` for this bucket and key
 */
async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options
  const acl = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  const putParams: PutObjectCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content,
    ...(acl ? { ACL: acl } : {})
  }

  await getClient().send(new PutObjectCommand(putParams))

  return getPrivateUrl(bucketInfo, objectStorageKey)
}
158 | ||
/**
 * Upload a stream through the `Upload` helper of `@aws-sdk/lib-storage`,
 * using `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART` as part size.
 *
 * The `ACL` field is only set when `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is truthy.
 *
 * @param options.content stream sent as the object body
 * @param options.objectStorageKey object key, without the bucket prefix
 * @param options.bucketInfo target bucket
 * @returns the value of `getPrivateUrl` for this bucket and key
 */
async function multiPartUpload (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { content, objectStorageKey, bucketInfo } = options
  const acl = CONFIG.OBJECT_STORAGE.UPLOAD_ACL

  const uploadParams: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    ...(acl ? { ACL: acl } : {})
  }

  const upload = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
    leavePartsOnError: false,
    params: uploadParams
  })
  await upload.done()

  logger.debug('Completed %s%s in bucket %s', bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  return getPrivateUrl(bucketInfo, objectStorageKey)
}