diff options
Diffstat (limited to 'server/lib/object-storage/shared')
-rw-r--r-- | server/lib/object-storage/shared/client.ts | 56 | ||||
-rw-r--r-- | server/lib/object-storage/shared/index.ts | 3 | ||||
-rw-r--r-- | server/lib/object-storage/shared/logger.ts | 7 | ||||
-rw-r--r-- | server/lib/object-storage/shared/object-storage-helpers.ts | 229 |
4 files changed, 295 insertions, 0 deletions
diff --git a/server/lib/object-storage/shared/client.ts b/server/lib/object-storage/shared/client.ts new file mode 100644 index 000000000..c9a614593 --- /dev/null +++ b/server/lib/object-storage/shared/client.ts | |||
@@ -0,0 +1,56 @@ | |||
1 | import { S3Client } from '@aws-sdk/client-s3' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { CONFIG } from '@server/initializers/config' | ||
4 | import { lTags } from './logger' | ||
5 | |||
6 | let endpointParsed: URL | ||
7 | function getEndpointParsed () { | ||
8 | if (endpointParsed) return endpointParsed | ||
9 | |||
10 | endpointParsed = new URL(getEndpoint()) | ||
11 | |||
12 | return endpointParsed | ||
13 | } | ||
14 | |||
15 | let s3Client: S3Client | ||
16 | function getClient () { | ||
17 | if (s3Client) return s3Client | ||
18 | |||
19 | const OBJECT_STORAGE = CONFIG.OBJECT_STORAGE | ||
20 | |||
21 | s3Client = new S3Client({ | ||
22 | endpoint: getEndpoint(), | ||
23 | region: OBJECT_STORAGE.REGION, | ||
24 | credentials: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID | ||
25 | ? { | ||
26 | accessKeyId: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID, | ||
27 | secretAccessKey: OBJECT_STORAGE.CREDENTIALS.SECRET_ACCESS_KEY | ||
28 | } | ||
29 | : undefined | ||
30 | }) | ||
31 | |||
32 | logger.info('Initialized S3 client %s with region %s.', getEndpoint(), OBJECT_STORAGE.REGION, lTags()) | ||
33 | |||
34 | return s3Client | ||
35 | } | ||
36 | |||
37 | // --------------------------------------------------------------------------- | ||
38 | |||
39 | export { | ||
40 | getEndpointParsed, | ||
41 | getClient | ||
42 | } | ||
43 | |||
44 | // --------------------------------------------------------------------------- | ||
45 | |||
46 | let endpoint: string | ||
47 | function getEndpoint () { | ||
48 | if (endpoint) return endpoint | ||
49 | |||
50 | const endpointConfig = CONFIG.OBJECT_STORAGE.ENDPOINT | ||
51 | endpoint = endpointConfig.startsWith('http://') || endpointConfig.startsWith('https://') | ||
52 | ? CONFIG.OBJECT_STORAGE.ENDPOINT | ||
53 | : 'https://' + CONFIG.OBJECT_STORAGE.ENDPOINT | ||
54 | |||
55 | return endpoint | ||
56 | } | ||
diff --git a/server/lib/object-storage/shared/index.ts b/server/lib/object-storage/shared/index.ts new file mode 100644 index 000000000..11e10aa9f --- /dev/null +++ b/server/lib/object-storage/shared/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './client' | ||
2 | export * from './logger' | ||
3 | export * from './object-storage-helpers' | ||
diff --git a/server/lib/object-storage/shared/logger.ts b/server/lib/object-storage/shared/logger.ts new file mode 100644 index 000000000..8ab7cbd71 --- /dev/null +++ b/server/lib/object-storage/shared/logger.ts | |||
@@ -0,0 +1,7 @@ | |||
1 | import { loggerTagsFactory } from '@server/helpers/logger' | ||
2 | |||
3 | const lTags = loggerTagsFactory('object-storage') | ||
4 | |||
5 | export { | ||
6 | lTags | ||
7 | } | ||
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts new file mode 100644 index 000000000..e23216907 --- /dev/null +++ b/server/lib/object-storage/shared/object-storage-helpers.ts | |||
@@ -0,0 +1,229 @@ | |||
1 | import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' | ||
2 | import { min } from 'lodash' | ||
3 | import { dirname } from 'path' | ||
4 | import { Readable } from 'stream' | ||
5 | import { | ||
6 | CompletedPart, | ||
7 | CompleteMultipartUploadCommand, | ||
8 | CreateMultipartUploadCommand, | ||
9 | DeleteObjectCommand, | ||
10 | GetObjectCommand, | ||
11 | ListObjectsV2Command, | ||
12 | PutObjectCommand, | ||
13 | UploadPartCommand | ||
14 | } from '@aws-sdk/client-s3' | ||
15 | import { pipelinePromise } from '@server/helpers/core-utils' | ||
16 | import { isArray } from '@server/helpers/custom-validators/misc' | ||
17 | import { logger } from '@server/helpers/logger' | ||
18 | import { CONFIG } from '@server/initializers/config' | ||
19 | import { getPrivateUrl } from '../urls' | ||
20 | import { getClient } from './client' | ||
21 | import { lTags } from './logger' | ||
22 | |||
23 | type BucketInfo = { | ||
24 | BUCKET_NAME: string | ||
25 | PREFIX?: string | ||
26 | } | ||
27 | |||
28 | async function storeObject (options: { | ||
29 | inputPath: string | ||
30 | objectStorageKey: string | ||
31 | bucketInfo: BucketInfo | ||
32 | }): Promise<string> { | ||
33 | const { inputPath, objectStorageKey, bucketInfo } = options | ||
34 | |||
35 | logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | ||
36 | |||
37 | const stats = await stat(inputPath) | ||
38 | |||
39 | // If bigger than max allowed size we do a multipart upload | ||
40 | if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { | ||
41 | return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) | ||
42 | } | ||
43 | |||
44 | const fileStream = createReadStream(inputPath) | ||
45 | return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) | ||
46 | } | ||
47 | |||
48 | async function removeObject (filename: string, bucketInfo: BucketInfo) { | ||
49 | const command = new DeleteObjectCommand({ | ||
50 | Bucket: bucketInfo.BUCKET_NAME, | ||
51 | Key: buildKey(filename, bucketInfo) | ||
52 | }) | ||
53 | |||
54 | return getClient().send(command) | ||
55 | } | ||
56 | |||
57 | async function removePrefix (prefix: string, bucketInfo: BucketInfo) { | ||
58 | const s3Client = getClient() | ||
59 | |||
60 | const commandPrefix = bucketInfo.PREFIX + prefix | ||
61 | const listCommand = new ListObjectsV2Command({ | ||
62 | Bucket: bucketInfo.BUCKET_NAME, | ||
63 | Prefix: commandPrefix | ||
64 | }) | ||
65 | |||
66 | const listedObjects = await s3Client.send(listCommand) | ||
67 | |||
68 | // FIXME: use bulk delete when s3ninja will support this operation | ||
69 | // const deleteParams = { | ||
70 | // Bucket: bucketInfo.BUCKET_NAME, | ||
71 | // Delete: { Objects: [] } | ||
72 | // } | ||
73 | |||
74 | if (isArray(listedObjects.Contents) !== true) { | ||
75 | const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` | ||
76 | |||
77 | logger.error(message, { response: listedObjects, ...lTags() }) | ||
78 | throw new Error(message) | ||
79 | } | ||
80 | |||
81 | for (const object of listedObjects.Contents) { | ||
82 | const command = new DeleteObjectCommand({ | ||
83 | Bucket: bucketInfo.BUCKET_NAME, | ||
84 | Key: object.Key | ||
85 | }) | ||
86 | |||
87 | await s3Client.send(command) | ||
88 | |||
89 | // FIXME: use bulk delete when s3ninja will support this operation | ||
90 | // deleteParams.Delete.Objects.push({ Key: object.Key }) | ||
91 | } | ||
92 | |||
93 | // FIXME: use bulk delete when s3ninja will support this operation | ||
94 | // const deleteCommand = new DeleteObjectsCommand(deleteParams) | ||
95 | // await s3Client.send(deleteCommand) | ||
96 | |||
97 | // Repeat if not all objects could be listed at once (limit of 1000?) | ||
98 | if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) | ||
99 | } | ||
100 | |||
101 | async function makeAvailable (options: { | ||
102 | key: string | ||
103 | destination: string | ||
104 | bucketInfo: BucketInfo | ||
105 | }) { | ||
106 | const { key, destination, bucketInfo } = options | ||
107 | |||
108 | await ensureDir(dirname(options.destination)) | ||
109 | |||
110 | const command = new GetObjectCommand({ | ||
111 | Bucket: bucketInfo.BUCKET_NAME, | ||
112 | Key: buildKey(key, bucketInfo) | ||
113 | }) | ||
114 | const response = await getClient().send(command) | ||
115 | |||
116 | const file = createWriteStream(destination) | ||
117 | await pipelinePromise(response.Body as Readable, file) | ||
118 | |||
119 | file.close() | ||
120 | } | ||
121 | |||
122 | function buildKey (key: string, bucketInfo: BucketInfo) { | ||
123 | return bucketInfo.PREFIX + key | ||
124 | } | ||
125 | |||
126 | // --------------------------------------------------------------------------- | ||
127 | |||
128 | export { | ||
129 | BucketInfo, | ||
130 | buildKey, | ||
131 | storeObject, | ||
132 | removeObject, | ||
133 | removePrefix, | ||
134 | makeAvailable | ||
135 | } | ||
136 | |||
137 | // --------------------------------------------------------------------------- | ||
138 | |||
139 | async function objectStoragePut (options: { | ||
140 | objectStorageKey: string | ||
141 | content: ReadStream | ||
142 | bucketInfo: BucketInfo | ||
143 | }) { | ||
144 | const { objectStorageKey, content, bucketInfo } = options | ||
145 | |||
146 | const command = new PutObjectCommand({ | ||
147 | Bucket: bucketInfo.BUCKET_NAME, | ||
148 | Key: buildKey(objectStorageKey, bucketInfo), | ||
149 | Body: content | ||
150 | }) | ||
151 | |||
152 | await getClient().send(command) | ||
153 | |||
154 | return getPrivateUrl(bucketInfo, objectStorageKey) | ||
155 | } | ||
156 | |||
157 | async function multiPartUpload (options: { | ||
158 | inputPath: string | ||
159 | objectStorageKey: string | ||
160 | bucketInfo: BucketInfo | ||
161 | }) { | ||
162 | const { objectStorageKey, inputPath, bucketInfo } = options | ||
163 | |||
164 | const key = buildKey(objectStorageKey, bucketInfo) | ||
165 | const s3Client = getClient() | ||
166 | |||
167 | const statResult = await stat(inputPath) | ||
168 | |||
169 | const createMultipartCommand = new CreateMultipartUploadCommand({ | ||
170 | Bucket: bucketInfo.BUCKET_NAME, | ||
171 | Key: key | ||
172 | }) | ||
173 | const createResponse = await s3Client.send(createMultipartCommand) | ||
174 | |||
175 | const fd = await open(inputPath, 'r') | ||
176 | let partNumber = 1 | ||
177 | const parts: CompletedPart[] = [] | ||
178 | const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART | ||
179 | |||
180 | for (let start = 0; start < statResult.size; start += partSize) { | ||
181 | logger.debug( | ||
182 | 'Uploading part %d of file to %s%s in bucket %s', | ||
183 | partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | ||
184 | ) | ||
185 | |||
186 | // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released | ||
187 | // The s3 sdk needs to know the length of the http body beforehand, but doesn't support | ||
188 | // streams with start and end set, so it just tries to stat the file in stream.path. | ||
189 | // This fails for us because we only want to send part of the file. The stream type | ||
190 | // is modified so we can set the byteLength here, which s3 detects because array buffers | ||
191 | // have this field set | ||
192 | const stream: ReadStream & { byteLength: number } = | ||
193 | createReadStream( | ||
194 | inputPath, | ||
195 | { fd, autoClose: false, start, end: (start + partSize) - 1 } | ||
196 | ) as ReadStream & { byteLength: number } | ||
197 | |||
198 | // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength | ||
199 | stream.byteLength = min([ statResult.size - start, partSize ]) | ||
200 | |||
201 | const uploadPartCommand = new UploadPartCommand({ | ||
202 | Bucket: bucketInfo.BUCKET_NAME, | ||
203 | Key: key, | ||
204 | UploadId: createResponse.UploadId, | ||
205 | PartNumber: partNumber, | ||
206 | Body: stream | ||
207 | }) | ||
208 | const uploadResponse = await s3Client.send(uploadPartCommand) | ||
209 | |||
210 | parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) | ||
211 | partNumber += 1 | ||
212 | } | ||
213 | await close(fd) | ||
214 | |||
215 | const completeUploadCommand = new CompleteMultipartUploadCommand({ | ||
216 | Bucket: bucketInfo.BUCKET_NAME, | ||
217 | Key: objectStorageKey, | ||
218 | UploadId: createResponse.UploadId, | ||
219 | MultipartUpload: { Parts: parts } | ||
220 | }) | ||
221 | await s3Client.send(completeUploadCommand) | ||
222 | |||
223 | logger.debug( | ||
224 | 'Completed %s%s in bucket %s in %d parts', | ||
225 | bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() | ||
226 | ) | ||
227 | |||
228 | return getPrivateUrl(bucketInfo, objectStorageKey) | ||
229 | } | ||