aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/tests/api/object-storage
diff options
context:
space:
mode:
authorJelle Besseling <jelle@pingiun.com>2021-08-17 08:26:20 +0200
committerGitHub <noreply@github.com>2021-08-17 08:26:20 +0200
commit0305db28c98fd6cf43a3c50ba92c76215e99d512 (patch)
tree33b753a19728d9f453c1aa4f19b36ac797e5fe80 /server/tests/api/object-storage
parentf88ae8f5bc223579313b28582de9101944a4a814 (diff)
downloadPeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.tar.gz
PeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.tar.zst
PeerTube-0305db28c98fd6cf43a3c50ba92c76215e99d512.zip
Add support for saving video files to object storage (#4290)
* Add support for saving video files to object storage * Add support for custom url generation on s3 stored files Uses two config keys to support url generation that doesn't directly go to (compatible s3). Can be used to generate urls to any cache server or CDN. * Upload files to s3 concurrently and delete originals afterwards * Only publish after move to object storage is complete * Use base url instead of url template * Fix mistyped config field * Add rudenmentary way to download before transcode * Implement Chocobozzz suggestions https://github.com/Chocobozzz/PeerTube/pull/4290#issuecomment-891670478 The remarks in question: Try to use objectStorage prefix instead of s3 prefix for your function/variables/config names Prefer to use a tree for the config: s3.streaming_playlists_bucket -> object_storage.streaming_playlists.bucket Use uppercase for config: S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket -> OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET (maybe BUCKET_NAME instead of BUCKET) I suggest to rename moveJobsRunning to pendingMovingJobs (or better, create a dedicated videoJobInfo table with a pendingMove & videoId columns so we could also use this table to track pending transcoding jobs) https://github.com/Chocobozzz/PeerTube/pull/4290/files#diff-3e26d41ca4bda1de8e1747af70ca2af642abcc1e9e0bfb94239ff2165acfbde5R19 uses a string instead of an integer I think we should store the origin object storage URL in fileUrl, without base_url injection. Instead, inject the base_url at "runtime" so admins can easily change this configuration without running a script to update DB URLs * Import correct function * Support multipart upload * Remove import of node 15.0 module stream/promises * Extend maximum upload job length Using the same value as for redundancy downloading seems logical * Use dynamic part size for really large uploads Also adds very small part size for local testing * Fix decreasePendingMove query * Resolve various PR comments * Move to object storage after optimize * Make upload size configurable and increase default * Prune webtorrent files that are stored in object storage * Move files after transcoding jobs * Fix federation * Add video path manager * Support move to external storage job in client * Fix live object storage tests Co-authored-by: Chocobozzz <me@florianbigard.com>
Diffstat (limited to 'server/tests/api/object-storage')
-rw-r--r--server/tests/api/object-storage/index.ts3
-rw-r--r--server/tests/api/object-storage/live.ts136
-rw-r--r--server/tests/api/object-storage/video-imports.ts112
-rw-r--r--server/tests/api/object-storage/videos.ts391
4 files changed, 642 insertions, 0 deletions
diff --git a/server/tests/api/object-storage/index.ts b/server/tests/api/object-storage/index.ts
new file mode 100644
index 000000000..f319d6ef5
--- /dev/null
+++ b/server/tests/api/object-storage/index.ts
@@ -0,0 +1,3 @@
1export * from './live'
2export * from './video-imports'
3export * from './videos'
diff --git a/server/tests/api/object-storage/live.ts b/server/tests/api/object-storage/live.ts
new file mode 100644
index 000000000..d3e6777f2
--- /dev/null
+++ b/server/tests/api/object-storage/live.ts
@@ -0,0 +1,136 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import 'mocha'
4import * as chai from 'chai'
5import { FfmpegCommand } from 'fluent-ffmpeg'
6import {
7 areObjectStorageTestsDisabled,
8 createMultipleServers,
9 doubleFollow,
10 expectStartWith,
11 killallServers,
12 makeRawRequest,
13 ObjectStorageCommand,
14 PeerTubeServer,
15 setAccessTokensToServers,
16 setDefaultVideoChannel,
17 stopFfmpeg,
18 waitJobs,
19 waitUntilLivePublishedOnAllServers,
20 waitUntilLiveSavedOnAllServers
21} from '@shared/extra-utils'
22import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models'
23
24const expect = chai.expect
25
26async function createLive (server: PeerTubeServer) {
27 const attributes: LiveVideoCreate = {
28 channelId: server.store.channel.id,
29 privacy: VideoPrivacy.PUBLIC,
30 name: 'my super live',
31 saveReplay: true
32 }
33
34 const { uuid } = await server.live.create({ fields: attributes })
35
36 return uuid
37}
38
39async function checkFiles (files: VideoFile[]) {
40 for (const file of files) {
41 expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl())
42
43 await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
44 }
45}
46
47describe('Object storage for lives', function () {
48 if (areObjectStorageTestsDisabled()) return
49
50 let ffmpegCommand: FfmpegCommand
51 let servers: PeerTubeServer[]
52 let videoUUID: string
53
54 before(async function () {
55 this.timeout(120000)
56
57 await ObjectStorageCommand.prepareDefaultBuckets()
58
59 servers = await createMultipleServers(2, ObjectStorageCommand.getDefaultConfig())
60
61 await setAccessTokensToServers(servers)
62 await setDefaultVideoChannel(servers)
63 await doubleFollow(servers[0], servers[1])
64
65 await servers[0].config.enableTranscoding()
66 })
67
68 describe('Without live transcoding', async function () {
69
70 before(async function () {
71 await servers[0].config.enableLive({ transcoding: false })
72
73 videoUUID = await createLive(servers[0])
74 })
75
76 it('Should create a live and save the replay on object storage', async function () {
77 this.timeout(220000)
78
79 ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
80 await waitUntilLivePublishedOnAllServers(servers, videoUUID)
81
82 await stopFfmpeg(ffmpegCommand)
83
84 await waitUntilLiveSavedOnAllServers(servers, videoUUID)
85 await waitJobs(servers)
86
87 for (const server of servers) {
88 const video = await server.videos.get({ id: videoUUID })
89
90 expect(video.files).to.have.lengthOf(0)
91 expect(video.streamingPlaylists).to.have.lengthOf(1)
92
93 const files = video.streamingPlaylists[0].files
94
95 await checkFiles(files)
96 }
97 })
98 })
99
100 describe('With live transcoding', async function () {
101
102 before(async function () {
103 await servers[0].config.enableLive({ transcoding: true })
104
105 videoUUID = await createLive(servers[0])
106 })
107
108 it('Should import a video and have sent it to object storage', async function () {
109 this.timeout(240000)
110
111 ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
112 await waitUntilLivePublishedOnAllServers(servers, videoUUID)
113
114 await stopFfmpeg(ffmpegCommand)
115
116 await waitUntilLiveSavedOnAllServers(servers, videoUUID)
117 await waitJobs(servers)
118
119 for (const server of servers) {
120 const video = await server.videos.get({ id: videoUUID })
121
122 expect(video.files).to.have.lengthOf(0)
123 expect(video.streamingPlaylists).to.have.lengthOf(1)
124
125 const files = video.streamingPlaylists[0].files
126 expect(files).to.have.lengthOf(4)
127
128 await checkFiles(files)
129 }
130 })
131 })
132
133 after(async function () {
134 await killallServers(servers)
135 })
136})
diff --git a/server/tests/api/object-storage/video-imports.ts b/server/tests/api/object-storage/video-imports.ts
new file mode 100644
index 000000000..efc01f550
--- /dev/null
+++ b/server/tests/api/object-storage/video-imports.ts
@@ -0,0 +1,112 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import 'mocha'
4import * as chai from 'chai'
5import {
6 areObjectStorageTestsDisabled,
7 createSingleServer,
8 expectStartWith,
9 FIXTURE_URLS,
10 killallServers,
11 makeRawRequest,
12 ObjectStorageCommand,
13 PeerTubeServer,
14 setAccessTokensToServers,
15 setDefaultVideoChannel,
16 waitJobs
17} from '@shared/extra-utils'
18import { HttpStatusCode, VideoPrivacy } from '@shared/models'
19
20const expect = chai.expect
21
22async function importVideo (server: PeerTubeServer) {
23 const attributes = {
24 name: 'import 2',
25 privacy: VideoPrivacy.PUBLIC,
26 channelId: server.store.channel.id,
27 targetUrl: FIXTURE_URLS.goodVideo720
28 }
29
30 const { video: { uuid } } = await server.imports.importVideo({ attributes })
31
32 return uuid
33}
34
35describe('Object storage for video import', function () {
36 if (areObjectStorageTestsDisabled()) return
37
38 let server: PeerTubeServer
39
40 before(async function () {
41 this.timeout(120000)
42
43 await ObjectStorageCommand.prepareDefaultBuckets()
44
45 server = await createSingleServer(1, ObjectStorageCommand.getDefaultConfig())
46
47 await setAccessTokensToServers([ server ])
48 await setDefaultVideoChannel([ server ])
49
50 await server.config.enableImports()
51 })
52
53 describe('Without transcoding', async function () {
54
55 before(async function () {
56 await server.config.disableTranscoding()
57 })
58
59 it('Should import a video and have sent it to object storage', async function () {
60 this.timeout(120000)
61
62 const uuid = await importVideo(server)
63 await waitJobs(server)
64
65 const video = await server.videos.get({ id: uuid })
66
67 expect(video.files).to.have.lengthOf(1)
68 expect(video.streamingPlaylists).to.have.lengthOf(0)
69
70 const fileUrl = video.files[0].fileUrl
71 expectStartWith(fileUrl, ObjectStorageCommand.getWebTorrentBaseUrl())
72
73 await makeRawRequest(fileUrl, HttpStatusCode.OK_200)
74 })
75 })
76
77 describe('With transcoding', async function () {
78
79 before(async function () {
80 await server.config.enableTranscoding()
81 })
82
83 it('Should import a video and have sent it to object storage', async function () {
84 this.timeout(120000)
85
86 const uuid = await importVideo(server)
87 await waitJobs(server)
88
89 const video = await server.videos.get({ id: uuid })
90
91 expect(video.files).to.have.lengthOf(4)
92 expect(video.streamingPlaylists).to.have.lengthOf(1)
93 expect(video.streamingPlaylists[0].files).to.have.lengthOf(4)
94
95 for (const file of video.files) {
96 expectStartWith(file.fileUrl, ObjectStorageCommand.getWebTorrentBaseUrl())
97
98 await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
99 }
100
101 for (const file of video.streamingPlaylists[0].files) {
102 expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl())
103
104 await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
105 }
106 })
107 })
108
109 after(async function () {
110 await killallServers([ server ])
111 })
112})
diff --git a/server/tests/api/object-storage/videos.ts b/server/tests/api/object-storage/videos.ts
new file mode 100644
index 000000000..3958bd3d7
--- /dev/null
+++ b/server/tests/api/object-storage/videos.ts
@@ -0,0 +1,391 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import 'mocha'
4import * as chai from 'chai'
5import { merge } from 'lodash'
6import {
7 areObjectStorageTestsDisabled,
8 checkTmpIsEmpty,
9 cleanupTests,
10 createMultipleServers,
11 createSingleServer,
12 doubleFollow,
13 expectStartWith,
14 killallServers,
15 makeRawRequest,
16 MockObjectStorage,
17 ObjectStorageCommand,
18 PeerTubeServer,
19 setAccessTokensToServers,
20 waitJobs,
21 webtorrentAdd
22} from '@shared/extra-utils'
23import { HttpStatusCode, VideoDetails } from '@shared/models'
24
25const expect = chai.expect
26
27async function checkFiles (options: {
28 video: VideoDetails
29
30 baseMockUrl?: string
31
32 playlistBucket: string
33 playlistPrefix?: string
34
35 webtorrentBucket: string
36 webtorrentPrefix?: string
37}) {
38 const {
39 video,
40 playlistBucket,
41 webtorrentBucket,
42 baseMockUrl,
43 playlistPrefix,
44 webtorrentPrefix
45 } = options
46
47 let allFiles = video.files
48
49 for (const file of video.files) {
50 const baseUrl = baseMockUrl
51 ? `${baseMockUrl}/${webtorrentBucket}/`
52 : `http://${webtorrentBucket}.${ObjectStorageCommand.getEndpointHost()}/`
53
54 const prefix = webtorrentPrefix || ''
55 const start = baseUrl + prefix
56
57 expectStartWith(file.fileUrl, start)
58
59 const res = await makeRawRequest(file.fileDownloadUrl, HttpStatusCode.FOUND_302)
60 const location = res.headers['location']
61 expectStartWith(location, start)
62
63 await makeRawRequest(location, HttpStatusCode.OK_200)
64 }
65
66 const hls = video.streamingPlaylists[0]
67
68 if (hls) {
69 allFiles = allFiles.concat(hls.files)
70
71 const baseUrl = baseMockUrl
72 ? `${baseMockUrl}/${playlistBucket}/`
73 : `http://${playlistBucket}.${ObjectStorageCommand.getEndpointHost()}/`
74
75 const prefix = playlistPrefix || ''
76 const start = baseUrl + prefix
77
78 expectStartWith(hls.playlistUrl, start)
79 expectStartWith(hls.segmentsSha256Url, start)
80
81 await makeRawRequest(hls.playlistUrl, HttpStatusCode.OK_200)
82
83 const resSha = await makeRawRequest(hls.segmentsSha256Url, HttpStatusCode.OK_200)
84 expect(JSON.stringify(resSha.body)).to.not.throw
85
86 for (const file of hls.files) {
87 expectStartWith(file.fileUrl, start)
88
89 const res = await makeRawRequest(file.fileDownloadUrl, HttpStatusCode.FOUND_302)
90 const location = res.headers['location']
91 expectStartWith(location, start)
92
93 await makeRawRequest(location, HttpStatusCode.OK_200)
94 }
95 }
96
97 for (const file of allFiles) {
98 const torrent = await webtorrentAdd(file.magnetUri, true)
99
100 expect(torrent.files).to.be.an('array')
101 expect(torrent.files.length).to.equal(1)
102 expect(torrent.files[0].path).to.exist.and.to.not.equal('')
103
104 const res = await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
105 expect(res.body).to.have.length.above(100)
106 }
107
108 return allFiles.map(f => f.fileUrl)
109}
110
111function runTestSuite (options: {
112 playlistBucket: string
113 playlistPrefix?: string
114
115 webtorrentBucket: string
116 webtorrentPrefix?: string
117
118 useMockBaseUrl?: boolean
119
120 maxUploadPart?: string
121}) {
122 const mockObjectStorage = new MockObjectStorage()
123 let baseMockUrl: string
124
125 let servers: PeerTubeServer[]
126
127 let keptUrls: string[] = []
128
129 const uuidsToDelete: string[] = []
130 let deletedUrls: string[] = []
131
132 before(async function () {
133 this.timeout(120000)
134
135 const port = await mockObjectStorage.initialize()
136 baseMockUrl = options.useMockBaseUrl ? `http://localhost:${port}` : undefined
137
138 await ObjectStorageCommand.createBucket(options.playlistBucket)
139 await ObjectStorageCommand.createBucket(options.webtorrentBucket)
140
141 const config = {
142 object_storage: {
143 enabled: true,
144 endpoint: 'http://' + ObjectStorageCommand.getEndpointHost(),
145 region: ObjectStorageCommand.getRegion(),
146
147 credentials: ObjectStorageCommand.getCredentialsConfig(),
148
149 max_upload_part: options.maxUploadPart || '2MB',
150
151 streaming_playlists: {
152 bucket_name: options.playlistBucket,
153 prefix: options.playlistPrefix,
154 base_url: baseMockUrl
155 ? `${baseMockUrl}/${options.playlistBucket}`
156 : undefined
157 },
158
159 videos: {
160 bucket_name: options.webtorrentBucket,
161 prefix: options.webtorrentPrefix,
162 base_url: baseMockUrl
163 ? `${baseMockUrl}/${options.webtorrentBucket}`
164 : undefined
165 }
166 }
167 }
168
169 servers = await createMultipleServers(2, config)
170
171 await setAccessTokensToServers(servers)
172 await doubleFollow(servers[0], servers[1])
173
174 for (const server of servers) {
175 const { uuid } = await server.videos.quickUpload({ name: 'video to keep' })
176 await waitJobs(servers)
177
178 const files = await server.videos.listFiles({ id: uuid })
179 keptUrls = keptUrls.concat(files.map(f => f.fileUrl))
180 }
181 })
182
183 it('Should upload a video and move it to the object storage without transcoding', async function () {
184 this.timeout(20000)
185
186 const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1' })
187 uuidsToDelete.push(uuid)
188
189 await waitJobs(servers)
190
191 for (const server of servers) {
192 const video = await server.videos.get({ id: uuid })
193 const files = await checkFiles({ ...options, video, baseMockUrl })
194
195 deletedUrls = deletedUrls.concat(files)
196 }
197 })
198
199 it('Should upload a video and move it to the object storage with transcoding', async function () {
200 this.timeout(40000)
201
202 const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2' })
203 uuidsToDelete.push(uuid)
204
205 await waitJobs(servers)
206
207 for (const server of servers) {
208 const video = await server.videos.get({ id: uuid })
209 const files = await checkFiles({ ...options, video, baseMockUrl })
210
211 deletedUrls = deletedUrls.concat(files)
212 }
213 })
214
215 it('Should correctly delete the files', async function () {
216 await servers[0].videos.remove({ id: uuidsToDelete[0] })
217 await servers[1].videos.remove({ id: uuidsToDelete[1] })
218
219 await waitJobs(servers)
220
221 for (const url of deletedUrls) {
222 await makeRawRequest(url, HttpStatusCode.NOT_FOUND_404)
223 }
224 })
225
226 it('Should have kept other files', async function () {
227 for (const url of keptUrls) {
228 await makeRawRequest(url, HttpStatusCode.OK_200)
229 }
230 })
231
232 it('Should have an empty tmp directory', async function () {
233 for (const server of servers) {
234 await checkTmpIsEmpty(server)
235 }
236 })
237
238 after(async function () {
239 mockObjectStorage.terminate()
240
241 await cleanupTests(servers)
242 })
243}
244
245describe('Object storage for videos', function () {
246 if (areObjectStorageTestsDisabled()) return
247
248 describe('Test config', function () {
249 let server: PeerTubeServer
250
251 const baseConfig = {
252 object_storage: {
253 enabled: true,
254 endpoint: 'http://' + ObjectStorageCommand.getEndpointHost(),
255 region: ObjectStorageCommand.getRegion(),
256
257 credentials: ObjectStorageCommand.getCredentialsConfig(),
258
259 streaming_playlists: {
260 bucket_name: ObjectStorageCommand.DEFAULT_PLAYLIST_BUCKET
261 },
262
263 videos: {
264 bucket_name: ObjectStorageCommand.DEFAULT_WEBTORRENT_BUCKET
265 }
266 }
267 }
268
269 const badCredentials = {
270 access_key_id: 'AKIAIOSFODNN7EXAMPLE',
271 secret_access_key: 'aJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
272 }
273
274 it('Should fail with same bucket names without prefix', function (done) {
275 const config = merge({}, baseConfig, {
276 object_storage: {
277 streaming_playlists: {
278 bucket_name: 'aaa'
279 },
280
281 videos: {
282 bucket_name: 'aaa'
283 }
284 }
285 })
286
287 createSingleServer(1, config)
288 .then(() => done(new Error('Did not throw')))
289 .catch(() => done())
290 })
291
292 it('Should fail with bad credentials', async function () {
293 this.timeout(60000)
294
295 await ObjectStorageCommand.prepareDefaultBuckets()
296
297 const config = merge({}, baseConfig, {
298 object_storage: {
299 credentials: badCredentials
300 }
301 })
302
303 server = await createSingleServer(1, config)
304 await setAccessTokensToServers([ server ])
305
306 const { uuid } = await server.videos.quickUpload({ name: 'video' })
307
308 await waitJobs([ server ], true)
309 const video = await server.videos.get({ id: uuid })
310
311 expectStartWith(video.files[0].fileUrl, server.url)
312
313 await killallServers([ server ])
314 })
315
316 it('Should succeed with credentials from env', async function () {
317 this.timeout(60000)
318
319 await ObjectStorageCommand.prepareDefaultBuckets()
320
321 const config = merge({}, baseConfig, {
322 object_storage: {
323 credentials: {
324 access_key_id: '',
325 secret_access_key: ''
326 }
327 }
328 })
329
330 const goodCredentials = ObjectStorageCommand.getCredentialsConfig()
331
332 server = await createSingleServer(1, config, {
333 env: {
334 AWS_ACCESS_KEY_ID: goodCredentials.access_key_id,
335 AWS_SECRET_ACCESS_KEY: goodCredentials.secret_access_key
336 }
337 })
338
339 await setAccessTokensToServers([ server ])
340
341 const { uuid } = await server.videos.quickUpload({ name: 'video' })
342
343 await waitJobs([ server ], true)
344 const video = await server.videos.get({ id: uuid })
345
346 expectStartWith(video.files[0].fileUrl, ObjectStorageCommand.getWebTorrentBaseUrl())
347 })
348
349 after(async function () {
350 await killallServers([ server ])
351 })
352 })
353
354 describe('Test simple object storage', function () {
355 runTestSuite({
356 playlistBucket: 'streaming-playlists',
357 webtorrentBucket: 'videos'
358 })
359 })
360
361 describe('Test object storage with prefix', function () {
362 runTestSuite({
363 playlistBucket: 'mybucket',
364 webtorrentBucket: 'mybucket',
365
366 playlistPrefix: 'streaming-playlists_',
367 webtorrentPrefix: 'webtorrent_'
368 })
369 })
370
371 describe('Test object storage with prefix and base URL', function () {
372 runTestSuite({
373 playlistBucket: 'mybucket',
374 webtorrentBucket: 'mybucket',
375
376 playlistPrefix: 'streaming-playlists_',
377 webtorrentPrefix: 'webtorrent_',
378
379 useMockBaseUrl: true
380 })
381 })
382
383 describe('Test object storage with small upload part', function () {
384 runTestSuite({
385 playlistBucket: 'streaming-playlists',
386 webtorrentBucket: 'videos',
387
388 maxUploadPart: '5KB'
389 })
390 })
391})