Diffstat (limited to 'server/lib/object-storage')
-rw-r--r--  server/lib/object-storage/index.ts                           |   3
-rw-r--r--  server/lib/object-storage/keys.ts                            |  20
-rw-r--r--  server/lib/object-storage/shared/client.ts                   |  56
-rw-r--r--  server/lib/object-storage/shared/index.ts                    |   3
-rw-r--r--  server/lib/object-storage/shared/logger.ts                   |   7
-rw-r--r--  server/lib/object-storage/shared/object-storage-helpers.ts  | 229
-rw-r--r--  server/lib/object-storage/urls.ts                            |  40
-rw-r--r--  server/lib/object-storage/videos.ts                          |  72
8 files changed, 430 insertions, 0 deletions
diff --git a/server/lib/object-storage/index.ts b/server/lib/object-storage/index.ts
new file mode 100644
index 000000000..8b413a40e
--- /dev/null
+++ b/server/lib/object-storage/index.ts
@@ -0,0 +1,3 @@
export * from './keys'
export * from './urls'
export * from './videos'
diff --git a/server/lib/object-storage/keys.ts b/server/lib/object-storage/keys.ts
new file mode 100644
index 000000000..519474775
--- /dev/null
+++ b/server/lib/object-storage/keys.ts
@@ -0,0 +1,20 @@
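// Build the object storage keys of video files.
// HLS files are stored under '<playlist string type>_<video uuid>/<filename>';
// WebTorrent files use their filename as the key.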
import { join } from 'path'
import { MStreamingPlaylist, MVideoUUID } from '@server/types/models'

function generateHLSObjectStorageKey (playlist: MStreamingPlaylist, video: MVideoUUID, filename: string) {
  return join(generateHLSObjectBaseStorageKey(playlist, video), filename)
}

function generateHLSObjectBaseStorageKey (playlist: MStreamingPlaylist, video: MVideoUUID) {
  return playlist.getStringType() + '_' + video.uuid
}

function generateWebTorrentObjectStorageKey (filename: string) {
  return filename
}

export {
  generateHLSObjectStorageKey,
  generateHLSObjectBaseStorageKey,
  generateWebTorrentObjectStorageKey
}
diff --git a/server/lib/object-storage/shared/client.ts b/server/lib/object-storage/shared/client.ts
new file mode 100644
index 000000000..c9a614593
--- /dev/null
+++ b/server/lib/object-storage/shared/client.ts
@@ -0,0 +1,56 @@
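// Memoized singletons: the endpoint, its parsed URL and the S3 client are
// built on first use and then reused for the lifetime of the process.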
import { S3Client } from '@aws-sdk/client-s3'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { lTags } from './logger'

let endpointParsed: URL
function getEndpointParsed () {
  if (endpointParsed) return endpointParsed

  endpointParsed = new URL(getEndpoint())

  return endpointParsed
}

let s3Client: S3Client
function getClient () {
  if (s3Client) return s3Client

  const OBJECT_STORAGE = CONFIG.OBJECT_STORAGE
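
  // If no access key is configured, let the SDK fall back to its default credential provider chain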
  s3Client = new S3Client({
    endpoint: getEndpoint(),
    region: OBJECT_STORAGE.REGION,
    credentials: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID
      ? {
        accessKeyId: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID,
        secretAccessKey: OBJECT_STORAGE.CREDENTIALS.SECRET_ACCESS_KEY
      }
      : undefined
  })

  logger.info('Initialized S3 client %s with region %s.', getEndpoint(), OBJECT_STORAGE.REGION, lTags())

  return s3Client
}

// ---------------------------------------------------------------------------

export {
  getEndpointParsed,
  getClient
}

// ---------------------------------------------------------------------------
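
// If the configured endpoint does not specify a scheme, assume HTTPS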
let endpoint: string
function getEndpoint () {
  if (endpoint) return endpoint

  const endpointConfig = CONFIG.OBJECT_STORAGE.ENDPOINT
  endpoint = endpointConfig.startsWith('http://') || endpointConfig.startsWith('https://')
    ? CONFIG.OBJECT_STORAGE.ENDPOINT
    : 'https://' + CONFIG.OBJECT_STORAGE.ENDPOINT

  return endpoint
}
diff --git a/server/lib/object-storage/shared/index.ts b/server/lib/object-storage/shared/index.ts
new file mode 100644
index 000000000..11e10aa9f
--- /dev/null
+++ b/server/lib/object-storage/shared/index.ts
@@ -0,0 +1,3 @@
export * from './client'
export * from './logger'
export * from './object-storage-helpers'
diff --git a/server/lib/object-storage/shared/logger.ts b/server/lib/object-storage/shared/logger.ts
new file mode 100644
index 000000000..8ab7cbd71
--- /dev/null
+++ b/server/lib/object-storage/shared/logger.ts
@@ -0,0 +1,7 @@
import { loggerTagsFactory } from '@server/helpers/logger'

const lTags = loggerTagsFactory('object-storage')

export {
  lTags
}
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
new file mode 100644
index 000000000..e23216907
--- /dev/null
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -0,0 +1,229 @@
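// Low-level helpers around the AWS S3 SDK: plain and multipart uploads,
// single object and prefix deletion, and downloading objects to the local filesystem.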
import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
import { min } from 'lodash'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
  CompletedPart,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  PutObjectCommand,
  UploadPartCommand
} from '@aws-sdk/client-s3'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { getPrivateUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'
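
// PREFIX, when set, is prepended to every object key (see buildKey() below)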
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}

async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const stats = await stat(inputPath)

  // If bigger than the maximum allowed part size, do a multipart upload
  if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
  }

  const fileStream = createReadStream(inputPath)
  return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
}

async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const command = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  return getClient().send(command)
}

async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteParams = {
  //   Bucket: bucketInfo.BUCKET_NAME,
  //   Delete: { Objects: [] }
  // }

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)

    // FIXME: use bulk delete when s3ninja supports this operation
    // deleteParams.Delete.Objects.push({ Key: object.Key })
  }

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
  // await s3Client.send(deleteCommand)

  // ListObjectsV2 returns at most 1000 keys per response, so recurse until the listing is no longer truncated
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}
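
// Download an object and write it to `destination` on the local filesystem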
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(destination))

  const command = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const response = await getClient().send(command)

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  file.close()
}

function buildKey (key: string, bucketInfo: BucketInfo) {
  return bucketInfo.PREFIX + key
}

// ---------------------------------------------------------------------------

export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}

// ---------------------------------------------------------------------------

async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const command = new PutObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content
  })

  await getClient().send(command)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}
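
// Multipart upload: create the upload, send the file in parts of MAX_UPLOAD_PART bytes, then complete it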
async function multiPartUpload (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, inputPath, bucketInfo } = options

  const key = buildKey(objectStorageKey, bucketInfo)
  const s3Client = getClient()

  const statResult = await stat(inputPath)

  const createMultipartCommand = new CreateMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key
  })
  const createResponse = await s3Client.send(createMultipartCommand)

  const fd = await open(inputPath, 'r')
  let partNumber = 1
  const parts: CompletedPart[] = []
  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART

  for (let start = 0; start < statResult.size; start += partSize) {
    logger.debug(
      'Uploading part %d of file to %s%s in bucket %s',
      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
    )

    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
    // The S3 SDK needs to know the length of the HTTP body beforehand, but doesn't support
    // streams with start and end set, so it just tries to stat the file in stream.path.
    // This fails for us because we only want to send part of the file. The stream type
    // is modified so we can set the byteLength here, which S3 detects because array buffers
    // have this field set
    const stream: ReadStream & { byteLength: number } =
      createReadStream(
        inputPath,
        { fd, autoClose: false, start, end: (start + partSize) - 1 }
      ) as ReadStream & { byteLength: number }

    // The last part may be smaller than partSize, so only count the bytes that are left over in that case
    stream.byteLength = min([ statResult.size - start, partSize ])

    const uploadPartCommand = new UploadPartCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: key,
      UploadId: createResponse.UploadId,
      PartNumber: partNumber,
      Body: stream
    })
    const uploadResponse = await s3Client.send(uploadPartCommand)

    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
    partNumber += 1
  }
  await close(fd)

  const completeUploadCommand = new CompleteMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key, // Use the prefixed key: the multipart upload was created with it
    UploadId: createResponse.UploadId,
    MultipartUpload: { Parts: parts }
  })
  await s3Client.send(completeUploadCommand)

  logger.debug(
    'Completed %s%s in bucket %s in %d parts',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}
diff --git a/server/lib/object-storage/urls.ts b/server/lib/object-storage/urls.ts
new file mode 100644
index 000000000..2a889190b
--- /dev/null
+++ b/server/lib/object-storage/urls.ts
@@ -0,0 +1,40 @@
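// Build URLs pointing to stored objects, optionally rewriting them to the
// BASE_URL configured for the bucket.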
import { CONFIG } from '@server/initializers/config'
import { BucketInfo, buildKey, getEndpointParsed } from './shared'

function getPrivateUrl (config: BucketInfo, keyWithoutPrefix: string) {
  return getBaseUrl(config) + buildKey(keyWithoutPrefix, config)
}

function getWebTorrentPublicFileUrl (fileUrl: string) {
  const baseUrl = CONFIG.OBJECT_STORAGE.VIDEOS.BASE_URL
  if (!baseUrl) return fileUrl

  return replaceByBaseUrl(fileUrl, baseUrl)
}

function getHLSPublicFileUrl (fileUrl: string) {
  const baseUrl = CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.BASE_URL
  if (!baseUrl) return fileUrl

  return replaceByBaseUrl(fileUrl, baseUrl)
}

export {
  getPrivateUrl,
  getWebTorrentPublicFileUrl,
  replaceByBaseUrl,
  getHLSPublicFileUrl
}

// ---------------------------------------------------------------------------
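
// Defaults to the virtual-hosted style URL: '<protocol>//<bucket name>.<endpoint host>/'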
function getBaseUrl (bucketInfo: BucketInfo, baseUrl?: string) {
  if (baseUrl) return baseUrl

  return `${getEndpointParsed().protocol}//${bucketInfo.BUCKET_NAME}.${getEndpointParsed().host}/`
}

const regex = new RegExp('https?://[^/]+')
function replaceByBaseUrl (fileUrl: string, baseUrl: string) {
  return fileUrl.replace(regex, baseUrl)
}
diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts
new file mode 100644
index 000000000..15b8f58d5
--- /dev/null
+++ b/server/lib/object-storage/videos.ts
@@ -0,0 +1,72 @@
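// High-level helpers mapping PeerTube video files to object storage:
// HLS playlist files go to the STREAMING_PLAYLISTS bucket, WebTorrent files to the VIDEOS bucket.
// Usage sketch (hypothetical filename):
//   await storeWebTorrentFile('9c9de5e8-0a1e-484a-b099-e80d6a4237a5-720.mp4')
//   await makeWebTorrentFileAvailable('9c9de5e8-0a1e-484a-b099-e80d6a4237a5-720.mp4', '/tmp/video.mp4')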
import { join } from 'path'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MStreamingPlaylist, MVideoFile, MVideoUUID } from '@server/types/models'
import { getHLSDirectory } from '../paths'
import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys'
import { lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'

function storeHLSFile (playlist: MStreamingPlaylist, video: MVideoUUID, filename: string) {
  const baseHlsDirectory = getHLSDirectory(video)

  return storeObject({
    inputPath: join(baseHlsDirectory, filename),
    objectStorageKey: generateHLSObjectStorageKey(playlist, video, filename),
    bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
  })
}

function storeWebTorrentFile (filename: string) {
  return storeObject({
    inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename),
    objectStorageKey: generateWebTorrentObjectStorageKey(filename),
    bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS
  })
}

function removeHLSObjectStorage (playlist: MStreamingPlaylist, video: MVideoUUID) {
  return removePrefix(generateHLSObjectBaseStorageKey(playlist, video), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
}

function removeWebTorrentObjectStorage (videoFile: MVideoFile) {
  return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS)
}

async function makeHLSFileAvailable (playlist: MStreamingPlaylist, video: MVideoUUID, filename: string, destination: string) {
  const key = generateHLSObjectStorageKey(playlist, video, filename)

  logger.info('Fetching HLS file %s from object storage to %s.', key, destination, lTags())

  await makeAvailable({
    key,
    destination,
    bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
  })

  return destination
}

async function makeWebTorrentFileAvailable (filename: string, destination: string) {
  const key = generateWebTorrentObjectStorageKey(filename)

  logger.info('Fetching WebTorrent file %s from object storage to %s.', key, destination, lTags())

  await makeAvailable({
    key,
    destination,
    bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS
  })

  return destination
}

export {
  storeWebTorrentFile,
  storeHLSFile,

  removeHLSObjectStorage,
  removeWebTorrentObjectStorage,

  makeWebTorrentFileAvailable,
  makeHLSFileAvailable
}