]>
Commit | Line | Data |
---|---|---|
156cdbac | 1 | import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra' |
0305db28 JB |
2 | import { dirname } from 'path' |
3 | import { Readable } from 'stream' | |
4 | import { | |
0305db28 JB |
5 | DeleteObjectCommand, |
6 | GetObjectCommand, | |
7 | ListObjectsV2Command, | |
8 | PutObjectCommand, | |
156cdbac | 9 | PutObjectCommandInput |
0305db28 | 10 | } from '@aws-sdk/client-s3' |
156cdbac | 11 | import { Upload } from '@aws-sdk/lib-storage' |
0305db28 JB |
12 | import { pipelinePromise } from '@server/helpers/core-utils' |
13 | import { isArray } from '@server/helpers/custom-validators/misc' | |
14 | import { logger } from '@server/helpers/logger' | |
15 | import { CONFIG } from '@server/initializers/config' | |
16 | import { getPrivateUrl } from '../urls' | |
17 | import { getClient } from './client' | |
18 | import { lTags } from './logger' | |
19 | ||
/**
 * Describes where objects live inside an object storage.
 */
type BucketInfo = {
  /** Optional string prepended to every object key of this bucket. */
  PREFIX?: string

  /** Name of the bucket, sent as the `Bucket` parameter of the S3 commands. */
  BUCKET_NAME: string
}
24 | ||
/**
 * Upload a local file to the object storage.
 *
 * The file size decides the upload strategy: files bigger than
 * `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART` go through `multiPartUpload()`,
 * every other file goes through `objectStoragePut()`.
 *
 * @param options.inputPath - path of the local file to upload
 * @param options.objectStorageKey - key of the object, without the bucket prefix
 * @param options.bucketInfo - destination bucket name and optional key prefix
 * @returns the URL returned by the selected upload helper
 */
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options
  const { PREFIX: keyPrefix, BUCKET_NAME: bucketName } = bucketInfo

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, keyPrefix, objectStorageKey, bucketName, lTags())

  // The size has to be known before choosing between a single PUT and a multipart upload
  const { size } = await stat(inputPath)
  const content = createReadStream(inputPath)

  const upload = size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
    ? multiPartUpload
    : objectStoragePut

  return upload({ content, objectStorageKey, bucketInfo })
}
43 | ||
/**
 * Delete a single object from a bucket.
 *
 * @param filename - key of the object, without the bucket prefix
 * @param bucketInfo - bucket name and optional key prefix
 * @returns the response of the delete command sent to the S3 client
 */
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const { BUCKET_NAME: Bucket } = bucketInfo
  const Key = buildKey(filename, bucketInfo)

  const deleteCommand = new DeleteObjectCommand({ Bucket, Key })
  const s3Client = getClient()

  return s3Client.send(deleteCommand)
}
52 | ||
/**
 * Delete every object of a bucket whose key starts with the given prefix.
 *
 * Objects are listed, then deleted one by one. When the listing is truncated,
 * the list/delete cycle is repeated until the storage reports a complete listing.
 *
 * @param prefix - key prefix to remove, without the bucket prefix
 * @param bucketInfo - bucket name and optional key prefix
 * @throws Error when a listing does not contain any object
 */
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  // Build the key the same way removeObject() and makeAvailable() do
  const commandPrefix = buildKey(prefix, bucketInfo)

  let isTruncated = false

  // Already listed objects are deleted at each round, so listing again (without
  // continuation token) returns the objects that are still to be removed
  do {
    const listCommand = new ListObjectsV2Command({
      Bucket: bucketInfo.BUCKET_NAME,
      Prefix: commandPrefix
    })

    const listedObjects = await s3Client.send(listCommand)

    if (isArray(listedObjects.Contents) !== true) {
      const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

      logger.error(message, { response: listedObjects, ...lTags() })
      throw new Error(message)
    }

    // FIXME: use bulk delete (DeleteObjectsCommand) when s3ninja will support this operation
    for (const object of listedObjects.Contents) {
      const command = new DeleteObjectCommand({
        Bucket: bucketInfo.BUCKET_NAME,
        Key: object.Key
      })

      await s3Client.send(command)
    }

    // Repeat if not all objects could be listed at once (limit of 1000?)
    isTruncated = listedObjects.IsTruncated === true
  } while (isTruncated)
}
96 | ||
/**
 * Download an object from the object storage into a local file.
 *
 * The parent directory of the destination is created when needed.
 *
 * @param options.key - key of the object, without the bucket prefix
 * @param options.destination - local path the object content is written to
 * @param options.bucketInfo - bucket name and optional key prefix
 */
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  const destinationDirectory = dirname(destination)
  await ensureDir(destinationDirectory)

  const getCommand = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const { Body: body } = await getClient().send(getCommand)

  // Stream the response body to the disk
  const output = createWriteStream(destination)
  await pipelinePromise(body as Readable, output)

  output.close()
}
117 | ||
/**
 * Build the full key of an object by prepending the bucket prefix.
 *
 * `PREFIX` is optional in `BucketInfo`: a missing prefix is treated as an empty
 * string instead of being concatenated as the literal "undefined".
 *
 * @param key - key of the object, without the bucket prefix
 * @param bucketInfo - bucket name and optional key prefix
 * @returns the prefix followed by the key
 */
function buildKey (key: string, bucketInfo: BucketInfo) {
  return (bucketInfo.PREFIX ?? '') + key
}
121 | ||
122 | // --------------------------------------------------------------------------- | |
123 | ||
124 | export { | |
125 | BucketInfo, | |
126 | buildKey, | |
127 | storeObject, | |
128 | removeObject, | |
129 | removePrefix, | |
130 | makeAvailable | |
131 | } | |
132 | ||
133 | // --------------------------------------------------------------------------- | |
134 | ||
/**
 * Upload a file stream with a single PutObject command.
 *
 * The ACL is only set when `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is set.
 *
 * @param options.objectStorageKey - key of the object, without the bucket prefix
 * @param options.content - read stream of the file to upload
 * @param options.bucketInfo - destination bucket name and optional key prefix
 * @returns the result of `getPrivateUrl()` for the uploaded object
 */
async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const putParams: PutObjectCommandInput = {
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content
  }

  const uploadAcl = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  if (uploadAcl) putParams.ACL = uploadAcl

  const putCommand = new PutObjectCommand(putParams)
  await getClient().send(putCommand)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}
158 | ||
/**
 * Upload a file stream using the multipart `Upload` helper of `@aws-sdk/lib-storage`.
 *
 * Parts have a size of `CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART` and the upload
 * queue size is 4. The ACL is only set when `CONFIG.OBJECT_STORAGE.UPLOAD_ACL` is set.
 *
 * @param options.content - read stream of the file to upload
 * @param options.objectStorageKey - key of the object, without the bucket prefix
 * @param options.bucketInfo - destination bucket name and optional key prefix
 * @returns the result of `getPrivateUrl()` for the uploaded object
 */
async function multiPartUpload (options: {
  content: ReadStream
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { content, objectStorageKey, bucketInfo } = options

  const params: PutObjectCommandInput = {
    Body: content,
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo)
  }

  const uploadAcl = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
  if (uploadAcl) params.ACL = uploadAcl

  const upload = new Upload({
    client: getClient(),
    queueSize: 4,
    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
    leavePartsOnError: false,
    params
  })

  await upload.done()

  const { PREFIX: keyPrefix, BUCKET_NAME: bucketName } = bucketInfo
  logger.debug('Completed %s%s in bucket %s', keyPrefix, objectStorageKey, bucketName, lTags())

  return getPrivateUrl(bucketInfo, objectStorageKey)
}