author     Chocobozzz <me@florianbigard.com>    2023-07-31 14:34:36 +0200
committer  Chocobozzz <me@florianbigard.com>    2023-08-11 15:02:33 +0200
commit     3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch)
tree       e4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/object-storage/shared
parent     04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff)
download   PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz
           PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst
           PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge
conflicts, but it's a major step forward:
* Server can be faster at startup because import() is async, so we can
  easily lazy load big modules
* Angular doesn't seem to support ES imports (with .js extension), so we
  had to properly organize PeerTube into a monorepo:
  * Use the yarn workspaces feature
  * Use TypeScript project references for dependencies
  * Shared projects have been moved into "packages"; each one is now a
    node module (with a dedicated package.json/tsconfig.json)
  * server/tools has been moved into apps/ and is now a dedicated app,
    bundled and published on NPM, so users don't have to build the
    peertube CLI tools manually
  * server/tests has been moved into packages/ so we don't compile the
    tests every time we want to run the server
* Use the isolatedModules option (see the sketch after this list):
  * Had to move from const enum to plain const objects
    (https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums)
  * Had to explicitly mark "type" imports when used in decorators
* Prefer tsx (which uses esbuild under the hood) over ts-node to load
  TypeScript files (tests with mocha, or scripts):
  * To reduce test complexity, since esbuild doesn't support decorator
    metadata, we only test server files that do not import server
    models
  * We still build test files into JS files for a faster CI
* Remove the unmaintained peertube CLI import script
* Remove some barrels to speed up execution (fewer imports)
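As an illustration of the isolatedModules changes above, here is a minimal
TypeScript sketch of the two patterns the commit adopts. The names
(VideoState, MVideo) and the import path are illustrative, not taken
verbatim from the codebase:

    // Type-only imports must be marked explicitly under isolatedModules,
    // so a transpiler that sees one file at a time knows the import
    // carries no runtime value and can safely drop it:
    import type { MVideo } from './types/video.js' // illustrative path

    // Before: `const enum` members are inlined across files at compile
    // time, which single-file transpilation cannot do.
    // const enum VideoState { PUBLISHED = 1, TO_TRANSCODE = 2 }

    // After: a plain const object, plus a type derived from its values
    export const VideoState = {
      PUBLISHED: 1,
      TO_TRANSCODE: 2
    } as const

    export type VideoStateType = typeof VideoState[keyof typeof VideoState]

    // Values are used exactly like enum members
    export function isPublished (video: MVideo) {
      return video.state === VideoState.PUBLISHED
    }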
Diffstat (limited to 'server/lib/object-storage/shared')
-rw-r--r--  server/lib/object-storage/shared/client.ts                   71
-rw-r--r--  server/lib/object-storage/shared/index.ts                     3
-rw-r--r--  server/lib/object-storage/shared/logger.ts                    7
-rw-r--r--  server/lib/object-storage/shared/object-storage-helpers.ts  328
4 files changed, 0 insertions, 409 deletions
diff --git a/server/lib/object-storage/shared/client.ts b/server/lib/object-storage/shared/client.ts
deleted file mode 100644
index d5cb074df..000000000
--- a/server/lib/object-storage/shared/client.ts
+++ /dev/null
@@ -1,71 +0,0 @@
-import { S3Client } from '@aws-sdk/client-s3'
-import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
-import { logger } from '@server/helpers/logger'
-import { isProxyEnabled } from '@server/helpers/proxy'
-import { getAgent } from '@server/helpers/requests'
-import { CONFIG } from '@server/initializers/config'
-import { lTags } from './logger'
-
-function getProxyRequestHandler () {
-  if (!isProxyEnabled()) return null
-
-  const { agent } = getAgent()
-
-  return new NodeHttpHandler({
-    httpAgent: agent.http,
-    httpsAgent: agent.https
-  })
-}
-
-let endpointParsed: URL
-function getEndpointParsed () {
-  if (endpointParsed) return endpointParsed
-
-  endpointParsed = new URL(getEndpoint())
-
-  return endpointParsed
-}
-
-let s3Client: S3Client
-function getClient () {
-  if (s3Client) return s3Client
-
-  const OBJECT_STORAGE = CONFIG.OBJECT_STORAGE
-
-  s3Client = new S3Client({
-    endpoint: getEndpoint(),
-    region: OBJECT_STORAGE.REGION,
-    credentials: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID
-      ? {
-        accessKeyId: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID,
-        secretAccessKey: OBJECT_STORAGE.CREDENTIALS.SECRET_ACCESS_KEY
-      }
-      : undefined,
-    requestHandler: getProxyRequestHandler()
-  })
-
-  logger.info('Initialized S3 client %s with region %s.', getEndpoint(), OBJECT_STORAGE.REGION, lTags())
-
-  return s3Client
-}
-
-// ---------------------------------------------------------------------------
-
-export {
-  getEndpointParsed,
-  getClient
-}
-
-// ---------------------------------------------------------------------------
-
-let endpoint: string
-function getEndpoint () {
-  if (endpoint) return endpoint
-
-  const endpointConfig = CONFIG.OBJECT_STORAGE.ENDPOINT
-  endpoint = endpointConfig.startsWith('http://') || endpointConfig.startsWith('https://')
-    ? CONFIG.OBJECT_STORAGE.ENDPOINT
-    : 'https://' + CONFIG.OBJECT_STORAGE.ENDPOINT
-
-  return endpoint
-}
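For reference, the deleted client.ts above implements lazily initialized
singletons: each getter builds its value on first call and returns the
cached instance afterwards, so the S3 client is only created if object
storage is actually used. A minimal usage sketch, assuming the pre-ESM
module layout and hypothetical bucket/key values:

    import { GetObjectCommand } from '@aws-sdk/client-s3'
    import { getClient, getEndpointParsed } from './shared/client'

    // First call builds the client from CONFIG.OBJECT_STORAGE; later calls reuse it
    const client = getClient()

    const response = await client.send(
      new GetObjectCommand({ Bucket: 'videos', Key: 'hls/0001.m3u8' }) // hypothetical values
    )

    console.log('Serving from object storage host:', getEndpointParsed().host)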
diff --git a/server/lib/object-storage/shared/index.ts b/server/lib/object-storage/shared/index.ts
deleted file mode 100644
index 11e10aa9f..000000000
--- a/server/lib/object-storage/shared/index.ts
+++ /dev/null
@@ -1,3 +0,0 @@
-export * from './client'
-export * from './logger'
-export * from './object-storage-helpers'
diff --git a/server/lib/object-storage/shared/logger.ts b/server/lib/object-storage/shared/logger.ts
deleted file mode 100644
index 8ab7cbd71..000000000
--- a/server/lib/object-storage/shared/logger.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import { loggerTagsFactory } from '@server/helpers/logger'
-
-const lTags = loggerTagsFactory('object-storage')
-
-export {
-  lTags
-}
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
deleted file mode 100644
index 0d8878bd2..000000000
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ /dev/null
@@ -1,328 +0,0 @@
-import { map } from 'bluebird'
-import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
-import { dirname } from 'path'
-import { Readable } from 'stream'
-import {
-  _Object,
-  CompleteMultipartUploadCommandOutput,
-  DeleteObjectCommand,
-  GetObjectCommand,
-  ListObjectsV2Command,
-  PutObjectAclCommand,
-  PutObjectCommandInput,
-  S3Client
-} from '@aws-sdk/client-s3'
-import { Upload } from '@aws-sdk/lib-storage'
-import { pipelinePromise } from '@server/helpers/core-utils'
-import { isArray } from '@server/helpers/custom-validators/misc'
-import { logger } from '@server/helpers/logger'
-import { CONFIG } from '@server/initializers/config'
-import { getInternalUrl } from '../urls'
-import { getClient } from './client'
-import { lTags } from './logger'
-
-type BucketInfo = {
-  BUCKET_NAME: string
-  PREFIX?: string
-}
-
-async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
-  const s3Client = getClient()
-
-  const commandPrefix = bucketInfo.PREFIX + prefix
-  const listCommand = new ListObjectsV2Command({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Prefix: commandPrefix
-  })
-
-  const listedObjects = await s3Client.send(listCommand)
-
-  if (isArray(listedObjects.Contents) !== true) return []
-
-  return listedObjects.Contents.map(c => c.Key)
-}
-
-// ---------------------------------------------------------------------------
-
-async function storeObject (options: {
-  inputPath: string
-  objectStorageKey: string
-  bucketInfo: BucketInfo
-  isPrivate: boolean
-}): Promise<string> {
-  const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
-
-  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
-
-  const fileStream = createReadStream(inputPath)
-
-  return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
-}
-
-async function storeContent (options: {
-  content: string
-  inputPath: string
-  objectStorageKey: string
-  bucketInfo: BucketInfo
-  isPrivate: boolean
-}): Promise<string> {
-  const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
-
-  logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
-
-  return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
-}
-
-// ---------------------------------------------------------------------------
-
-async function updateObjectACL (options: {
-  objectStorageKey: string
-  bucketInfo: BucketInfo
-  isPrivate: boolean
-}) {
-  const { objectStorageKey, bucketInfo, isPrivate } = options
-
-  const acl = getACL(isPrivate)
-  if (!acl) return
-
-  const key = buildKey(objectStorageKey, bucketInfo)
-
-  logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
-
-  const command = new PutObjectAclCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: key,
-    ACL: acl
-  })
-
-  await getClient().send(command)
-}
-
-function updatePrefixACL (options: {
-  prefix: string
-  bucketInfo: BucketInfo
-  isPrivate: boolean
-}) {
-  const { prefix, bucketInfo, isPrivate } = options
-
-  const acl = getACL(isPrivate)
-  if (!acl) return
-
-  logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
-
-  return applyOnPrefix({
-    prefix,
-    bucketInfo,
-    commandBuilder: obj => {
-      logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
-
-      return new PutObjectAclCommand({
-        Bucket: bucketInfo.BUCKET_NAME,
-        Key: obj.Key,
-        ACL: acl
-      })
-    }
-  })
-}
-
-// ---------------------------------------------------------------------------
-
-function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
-  const key = buildKey(objectStorageKey, bucketInfo)
-
-  return removeObjectByFullKey(key, bucketInfo)
-}
-
-function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
-  logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
-
-  const command = new DeleteObjectCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: fullKey
-  })
-
-  return getClient().send(command)
-}
-
-async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
-  logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
-
-  return applyOnPrefix({
-    prefix,
-    bucketInfo,
-    commandBuilder: obj => {
-      logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
-
-      return new DeleteObjectCommand({
-        Bucket: bucketInfo.BUCKET_NAME,
-        Key: obj.Key
-      })
-    }
-  })
-}
-
-// ---------------------------------------------------------------------------
-
-async function makeAvailable (options: {
-  key: string
-  destination: string
-  bucketInfo: BucketInfo
-}) {
-  const { key, destination, bucketInfo } = options
-
-  await ensureDir(dirname(options.destination))
-
-  const command = new GetObjectCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(key, bucketInfo)
-  })
-  const response = await getClient().send(command)
-
-  const file = createWriteStream(destination)
-  await pipelinePromise(response.Body as Readable, file)
-
-  file.close()
-}
-
-function buildKey (key: string, bucketInfo: BucketInfo) {
-  return bucketInfo.PREFIX + key
-}
-
-// ---------------------------------------------------------------------------
-
-async function createObjectReadStream (options: {
-  key: string
-  bucketInfo: BucketInfo
-  rangeHeader: string
-}) {
-  const { key, bucketInfo, rangeHeader } = options
-
-  const command = new GetObjectCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(key, bucketInfo),
-    Range: rangeHeader
-  })
-
-  const response = await getClient().send(command)
-
-  return {
-    response,
-    stream: response.Body as Readable
-  }
-}
-
-// ---------------------------------------------------------------------------
-
-export {
-  BucketInfo,
-  buildKey,
-
-  storeObject,
-  storeContent,
-
-  removeObject,
-  removeObjectByFullKey,
-  removePrefix,
-
-  makeAvailable,
-
-  updateObjectACL,
-  updatePrefixACL,
-
-  listKeysOfPrefix,
-  createObjectReadStream
-}
-
-// ---------------------------------------------------------------------------
-
-async function uploadToStorage (options: {
-  content: ReadStream | string
-  objectStorageKey: string
-  bucketInfo: BucketInfo
-  isPrivate: boolean
-}) {
-  const { content, objectStorageKey, bucketInfo, isPrivate } = options
-
-  const input: PutObjectCommandInput = {
-    Body: content,
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: buildKey(objectStorageKey, bucketInfo)
-  }
-
-  const acl = getACL(isPrivate)
-  if (acl) input.ACL = acl
-
-  const parallelUploads3 = new Upload({
-    client: getClient(),
-    queueSize: 4,
-    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
-
-    // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
-    // More detailed explanation:
-    // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
-    // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
-    leavePartsOnError: true,
-    params: input
-  })
-
-  const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
-  // Check is needed even if the HTTP status code is 200 OK
-  // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
-  if (!response.Bucket) {
-    const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
-    logger.error(message, { response, ...lTags() })
-    throw new Error(message)
-  }
-
-  logger.debug(
-    'Completed %s%s in bucket %s',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), responseMetadata: response.$metadata }
-  )
-
-  return getInternalUrl(bucketInfo, objectStorageKey)
-}
-
-async function applyOnPrefix (options: {
-  prefix: string
-  bucketInfo: BucketInfo
-  commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
-
-  continuationToken?: string
-}) {
-  const { prefix, bucketInfo, commandBuilder, continuationToken } = options
-
-  const s3Client = getClient()
-
-  const commandPrefix = buildKey(prefix, bucketInfo)
-  const listCommand = new ListObjectsV2Command({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Prefix: commandPrefix,
-    ContinuationToken: continuationToken
-  })
-
-  const listedObjects = await s3Client.send(listCommand)
-
-  if (isArray(listedObjects.Contents) !== true) {
-    const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
-
-    logger.error(message, { response: listedObjects, ...lTags() })
-    throw new Error(message)
-  }
-
-  await map(listedObjects.Contents, object => {
-    const command = commandBuilder(object)
-
-    return s3Client.send(command)
-  }, { concurrency: 10 })
-
-  // Repeat if not all objects could be listed at once (S3 returns at most 1000 keys per response)
-  if (listedObjects.IsTruncated) {
-    await applyOnPrefix({ ...options, continuationToken: listedObjects.NextContinuationToken })
-  }
-}
-
-function getACL (isPrivate: boolean) {
-  return isPrivate
-    ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
-    : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
-}
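Taken together, the deleted helpers covered the full object life cycle:
upload (storeObject/storeContent), download (makeAvailable), streaming
(createObjectReadStream), plus listing and bulk operations
(listKeysOfPrefix, removePrefix, updatePrefixACL). A minimal round-trip
sketch, with hypothetical bucket configuration and paths:

    import { BucketInfo, makeAvailable, removeObject, storeObject } from './shared/object-storage-helpers'

    const bucketInfo: BucketInfo = { BUCKET_NAME: 'videos', PREFIX: 'web-videos/' } // hypothetical values

    // Upload a local file; resolves to the internal URL of the stored object
    const url = await storeObject({
      inputPath: '/tmp/0001.mp4',
      objectStorageKey: 'abc/0001.mp4',
      bucketInfo,
      isPrivate: true
    })
    console.log('Stored at %s', url)

    // Download it back to disk later (the PREFIX is prepended internally by buildKey)
    await makeAvailable({ key: 'abc/0001.mp4', destination: '/tmp/restored.mp4', bucketInfo })

    // Remove it when no longer needed
    await removeObject('abc/0001.mp4', bucketInfo)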