aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/object-storage/shared
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-07-31 14:34:36 +0200
committerChocobozzz <me@florianbigard.com>2023-08-11 15:02:33 +0200
commit3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch)
treee4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/object-storage/shared
parent04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff)
downloadPeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge conflicts, but it's a major step forward: * Server can be faster at startup because imports() are async and we can easily lazy import big modules * Angular doesn't seem to support ES import (with .js extension), so we had to correctly organize peertube into a monorepo: * Use yarn workspace feature * Use typescript reference projects for dependencies * Shared projects have been moved into "packages", each one is now a node module (with a dedicated package.json/tsconfig.json) * server/tools have been moved into apps/ and is now a dedicated app bundled and published on NPM so users don't have to build peertube cli tools manually * server/tests have been moved into packages/ so we don't compile them every time we want to run the server * Use isolatedModule option: * Had to move from const enum to const (https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums) * Had to explictely specify "type" imports when used in decorators * Prefer tsx (that uses esbuild under the hood) instead of ts-node to load typescript files (tests with mocha or scripts): * To reduce test complexity as esbuild doesn't support decorator metadata, we only test server files that do not import server models * We still build tests files into js files for a faster CI * Remove unmaintained peertube CLI import script * Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/object-storage/shared')
-rw-r--r--server/lib/object-storage/shared/client.ts71
-rw-r--r--server/lib/object-storage/shared/index.ts3
-rw-r--r--server/lib/object-storage/shared/logger.ts7
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts328
4 files changed, 0 insertions, 409 deletions
diff --git a/server/lib/object-storage/shared/client.ts b/server/lib/object-storage/shared/client.ts
deleted file mode 100644
index d5cb074df..000000000
--- a/server/lib/object-storage/shared/client.ts
+++ /dev/null
@@ -1,71 +0,0 @@
1import { S3Client } from '@aws-sdk/client-s3'
2import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
3import { logger } from '@server/helpers/logger'
4import { isProxyEnabled } from '@server/helpers/proxy'
5import { getAgent } from '@server/helpers/requests'
6import { CONFIG } from '@server/initializers/config'
7import { lTags } from './logger'
8
9function getProxyRequestHandler () {
10 if (!isProxyEnabled()) return null
11
12 const { agent } = getAgent()
13
14 return new NodeHttpHandler({
15 httpAgent: agent.http,
16 httpsAgent: agent.https
17 })
18}
19
20let endpointParsed: URL
21function getEndpointParsed () {
22 if (endpointParsed) return endpointParsed
23
24 endpointParsed = new URL(getEndpoint())
25
26 return endpointParsed
27}
28
29let s3Client: S3Client
30function getClient () {
31 if (s3Client) return s3Client
32
33 const OBJECT_STORAGE = CONFIG.OBJECT_STORAGE
34
35 s3Client = new S3Client({
36 endpoint: getEndpoint(),
37 region: OBJECT_STORAGE.REGION,
38 credentials: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID
39 ? {
40 accessKeyId: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID,
41 secretAccessKey: OBJECT_STORAGE.CREDENTIALS.SECRET_ACCESS_KEY
42 }
43 : undefined,
44 requestHandler: getProxyRequestHandler()
45 })
46
47 logger.info('Initialized S3 client %s with region %s.', getEndpoint(), OBJECT_STORAGE.REGION, lTags())
48
49 return s3Client
50}
51
52// ---------------------------------------------------------------------------
53
54export {
55 getEndpointParsed,
56 getClient
57}
58
59// ---------------------------------------------------------------------------
60
61let endpoint: string
62function getEndpoint () {
63 if (endpoint) return endpoint
64
65 const endpointConfig = CONFIG.OBJECT_STORAGE.ENDPOINT
66 endpoint = endpointConfig.startsWith('http://') || endpointConfig.startsWith('https://')
67 ? CONFIG.OBJECT_STORAGE.ENDPOINT
68 : 'https://' + CONFIG.OBJECT_STORAGE.ENDPOINT
69
70 return endpoint
71}
diff --git a/server/lib/object-storage/shared/index.ts b/server/lib/object-storage/shared/index.ts
deleted file mode 100644
index 11e10aa9f..000000000
--- a/server/lib/object-storage/shared/index.ts
+++ /dev/null
@@ -1,3 +0,0 @@
1export * from './client'
2export * from './logger'
3export * from './object-storage-helpers'
diff --git a/server/lib/object-storage/shared/logger.ts b/server/lib/object-storage/shared/logger.ts
deleted file mode 100644
index 8ab7cbd71..000000000
--- a/server/lib/object-storage/shared/logger.ts
+++ /dev/null
@@ -1,7 +0,0 @@
1import { loggerTagsFactory } from '@server/helpers/logger'
2
3const lTags = loggerTagsFactory('object-storage')
4
5export {
6 lTags
7}
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
deleted file mode 100644
index 0d8878bd2..000000000
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ /dev/null
@@ -1,328 +0,0 @@
1import { map } from 'bluebird'
2import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
3import { dirname } from 'path'
4import { Readable } from 'stream'
5import {
6 _Object,
7 CompleteMultipartUploadCommandOutput,
8 DeleteObjectCommand,
9 GetObjectCommand,
10 ListObjectsV2Command,
11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
14} from '@aws-sdk/client-s3'
15import { Upload } from '@aws-sdk/lib-storage'
16import { pipelinePromise } from '@server/helpers/core-utils'
17import { isArray } from '@server/helpers/custom-validators/misc'
18import { logger } from '@server/helpers/logger'
19import { CONFIG } from '@server/initializers/config'
20import { getInternalUrl } from '../urls'
21import { getClient } from './client'
22import { lTags } from './logger'
23
24type BucketInfo = {
25 BUCKET_NAME: string
26 PREFIX?: string
27}
28
29async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
30 const s3Client = getClient()
31
32 const commandPrefix = bucketInfo.PREFIX + prefix
33 const listCommand = new ListObjectsV2Command({
34 Bucket: bucketInfo.BUCKET_NAME,
35 Prefix: commandPrefix
36 })
37
38 const listedObjects = await s3Client.send(listCommand)
39
40 if (isArray(listedObjects.Contents) !== true) return []
41
42 return listedObjects.Contents.map(c => c.Key)
43}
44
45// ---------------------------------------------------------------------------
46
47async function storeObject (options: {
48 inputPath: string
49 objectStorageKey: string
50 bucketInfo: BucketInfo
51 isPrivate: boolean
52}): Promise<string> {
53 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
54
55 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
56
57 const fileStream = createReadStream(inputPath)
58
59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
60}
61
62async function storeContent (options: {
63 content: string
64 inputPath: string
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68}): Promise<string> {
69 const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
70
71 logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
72
73 return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
74}
75
76// ---------------------------------------------------------------------------
77
78async function updateObjectACL (options: {
79 objectStorageKey: string
80 bucketInfo: BucketInfo
81 isPrivate: boolean
82}) {
83 const { objectStorageKey, bucketInfo, isPrivate } = options
84
85 const acl = getACL(isPrivate)
86 if (!acl) return
87
88 const key = buildKey(objectStorageKey, bucketInfo)
89
90 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
91
92 const command = new PutObjectAclCommand({
93 Bucket: bucketInfo.BUCKET_NAME,
94 Key: key,
95 ACL: acl
96 })
97
98 await getClient().send(command)
99}
100
101function updatePrefixACL (options: {
102 prefix: string
103 bucketInfo: BucketInfo
104 isPrivate: boolean
105}) {
106 const { prefix, bucketInfo, isPrivate } = options
107
108 const acl = getACL(isPrivate)
109 if (!acl) return
110
111 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
112
113 return applyOnPrefix({
114 prefix,
115 bucketInfo,
116 commandBuilder: obj => {
117 logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
118
119 return new PutObjectAclCommand({
120 Bucket: bucketInfo.BUCKET_NAME,
121 Key: obj.Key,
122 ACL: acl
123 })
124 }
125 })
126}
127
128// ---------------------------------------------------------------------------
129
130function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
131 const key = buildKey(objectStorageKey, bucketInfo)
132
133 return removeObjectByFullKey(key, bucketInfo)
134}
135
136function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
137 logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
138
139 const command = new DeleteObjectCommand({
140 Bucket: bucketInfo.BUCKET_NAME,
141 Key: fullKey
142 })
143
144 return getClient().send(command)
145}
146
147async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
148 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
149
150 return applyOnPrefix({
151 prefix,
152 bucketInfo,
153 commandBuilder: obj => {
154 logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
155
156 return new DeleteObjectCommand({
157 Bucket: bucketInfo.BUCKET_NAME,
158 Key: obj.Key
159 })
160 }
161 })
162}
163
164// ---------------------------------------------------------------------------
165
166async function makeAvailable (options: {
167 key: string
168 destination: string
169 bucketInfo: BucketInfo
170}) {
171 const { key, destination, bucketInfo } = options
172
173 await ensureDir(dirname(options.destination))
174
175 const command = new GetObjectCommand({
176 Bucket: bucketInfo.BUCKET_NAME,
177 Key: buildKey(key, bucketInfo)
178 })
179 const response = await getClient().send(command)
180
181 const file = createWriteStream(destination)
182 await pipelinePromise(response.Body as Readable, file)
183
184 file.close()
185}
186
187function buildKey (key: string, bucketInfo: BucketInfo) {
188 return bucketInfo.PREFIX + key
189}
190
191// ---------------------------------------------------------------------------
192
193async function createObjectReadStream (options: {
194 key: string
195 bucketInfo: BucketInfo
196 rangeHeader: string
197}) {
198 const { key, bucketInfo, rangeHeader } = options
199
200 const command = new GetObjectCommand({
201 Bucket: bucketInfo.BUCKET_NAME,
202 Key: buildKey(key, bucketInfo),
203 Range: rangeHeader
204 })
205
206 const response = await getClient().send(command)
207
208 return {
209 response,
210 stream: response.Body as Readable
211 }
212}
213
214// ---------------------------------------------------------------------------
215
216export {
217 BucketInfo,
218 buildKey,
219
220 storeObject,
221 storeContent,
222
223 removeObject,
224 removeObjectByFullKey,
225 removePrefix,
226
227 makeAvailable,
228
229 updateObjectACL,
230 updatePrefixACL,
231
232 listKeysOfPrefix,
233 createObjectReadStream
234}
235
236// ---------------------------------------------------------------------------
237
238async function uploadToStorage (options: {
239 content: ReadStream | string
240 objectStorageKey: string
241 bucketInfo: BucketInfo
242 isPrivate: boolean
243}) {
244 const { content, objectStorageKey, bucketInfo, isPrivate } = options
245
246 const input: PutObjectCommandInput = {
247 Body: content,
248 Bucket: bucketInfo.BUCKET_NAME,
249 Key: buildKey(objectStorageKey, bucketInfo)
250 }
251
252 const acl = getACL(isPrivate)
253 if (acl) input.ACL = acl
254
255 const parallelUploads3 = new Upload({
256 client: getClient(),
257 queueSize: 4,
258 partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
259
260 // `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
261 // More detailed explanation:
262 // https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
263 // https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
264 leavePartsOnError: true,
265 params: input
266 })
267
268 const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
269 // Check is needed even if the HTTP status code is 200 OK
270 // For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
271 if (!response.Bucket) {
272 const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
273 logger.error(message, { response, ...lTags() })
274 throw new Error(message)
275 }
276
277 logger.debug(
278 'Completed %s%s in bucket %s',
279 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, { ...lTags(), reseponseMetadata: response.$metadata }
280 )
281
282 return getInternalUrl(bucketInfo, objectStorageKey)
283}
284
285async function applyOnPrefix (options: {
286 prefix: string
287 bucketInfo: BucketInfo
288 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
289
290 continuationToken?: string
291}) {
292 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
293
294 const s3Client = getClient()
295
296 const commandPrefix = buildKey(prefix, bucketInfo)
297 const listCommand = new ListObjectsV2Command({
298 Bucket: bucketInfo.BUCKET_NAME,
299 Prefix: commandPrefix,
300 ContinuationToken: continuationToken
301 })
302
303 const listedObjects = await s3Client.send(listCommand)
304
305 if (isArray(listedObjects.Contents) !== true) {
306 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
307
308 logger.error(message, { response: listedObjects, ...lTags() })
309 throw new Error(message)
310 }
311
312 await map(listedObjects.Contents, object => {
313 const command = commandBuilder(object)
314
315 return s3Client.send(command)
316 }, { concurrency: 10 })
317
318 // Repeat if not all objects could be listed at once (limit of 1000?)
319 if (listedObjects.IsTruncated) {
320 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
321 }
322}
323
324function getACL (isPrivate: boolean) {
325 return isPrivate
326 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
327 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
328}