aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/tests/src/api/runners
diff options
context:
space:
mode:
Diffstat (limited to 'packages/tests/src/api/runners')
-rw-r--r--packages/tests/src/api/runners/index.ts5
-rw-r--r--packages/tests/src/api/runners/runner-common.ts744
-rw-r--r--packages/tests/src/api/runners/runner-live-transcoding.ts332
-rw-r--r--packages/tests/src/api/runners/runner-socket.ts120
-rw-r--r--packages/tests/src/api/runners/runner-studio-transcoding.ts169
-rw-r--r--packages/tests/src/api/runners/runner-vod-transcoding.ts545
6 files changed, 1915 insertions, 0 deletions
diff --git a/packages/tests/src/api/runners/index.ts b/packages/tests/src/api/runners/index.ts
new file mode 100644
index 000000000..441ddc874
--- /dev/null
+++ b/packages/tests/src/api/runners/index.ts
@@ -0,0 +1,5 @@
1export * from './runner-common.js'
2export * from './runner-live-transcoding.js'
3export * from './runner-socket.js'
4export * from './runner-studio-transcoding.js'
5export * from './runner-vod-transcoding.js'
diff --git a/packages/tests/src/api/runners/runner-common.ts b/packages/tests/src/api/runners/runner-common.ts
new file mode 100644
index 000000000..53ea321d0
--- /dev/null
+++ b/packages/tests/src/api/runners/runner-common.ts
@@ -0,0 +1,744 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import { wait } from '@peertube/peertube-core-utils'
4import {
5 HttpStatusCode,
6 Runner,
7 RunnerJob,
8 RunnerJobAdmin,
9 RunnerJobState,
10 RunnerJobStateType,
11 RunnerJobVODWebVideoTranscodingPayload,
12 RunnerRegistrationToken
13} from '@peertube/peertube-models'
14import {
15 PeerTubeServer,
16 cleanupTests,
17 createSingleServer,
18 setAccessTokensToServers,
19 setDefaultVideoChannel,
20 waitJobs
21} from '@peertube/peertube-server-commands'
22import { expect } from 'chai'
23
24describe('Test runner common actions', function () {
25 let server: PeerTubeServer
26 let registrationToken: string
27 let runnerToken: string
28 let jobMaxPriority: string
29
30 before(async function () {
31 this.timeout(120_000)
32
33 server = await createSingleServer(1, {
34 remote_runners: {
35 stalled_jobs: {
36 vod: '5 seconds'
37 }
38 }
39 })
40
41 await setAccessTokensToServers([ server ])
42 await setDefaultVideoChannel([ server ])
43
44 await server.config.enableTranscoding({ hls: true, webVideo: true })
45 await server.config.enableRemoteTranscoding()
46 })
47
48 describe('Managing runner registration tokens', function () {
49 let base: RunnerRegistrationToken[]
50 let registrationTokenToDelete: RunnerRegistrationToken
51
52 it('Should have a default registration token', async function () {
53 const { total, data } = await server.runnerRegistrationTokens.list()
54
55 expect(total).to.equal(1)
56 expect(data).to.have.lengthOf(1)
57
58 const token = data[0]
59 expect(token.id).to.exist
60 expect(token.createdAt).to.exist
61 expect(token.updatedAt).to.exist
62 expect(token.registeredRunnersCount).to.equal(0)
63 expect(token.registrationToken).to.exist
64 })
65
66 it('Should create other registration tokens', async function () {
67 await server.runnerRegistrationTokens.generate()
68 await server.runnerRegistrationTokens.generate()
69
70 const { total, data } = await server.runnerRegistrationTokens.list()
71 expect(total).to.equal(3)
72 expect(data).to.have.lengthOf(3)
73 })
74
75 it('Should list registration tokens', async function () {
76 {
77 const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
78 expect(total).to.equal(3)
79 expect(data).to.have.lengthOf(3)
80 expect(new Date(data[0].createdAt)).to.be.below(new Date(data[1].createdAt))
81 expect(new Date(data[1].createdAt)).to.be.below(new Date(data[2].createdAt))
82
83 base = data
84
85 registrationTokenToDelete = data[0]
86 registrationToken = data[1].registrationToken
87 }
88
89 {
90 const { total, data } = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
91 expect(total).to.equal(3)
92 expect(data).to.have.lengthOf(1)
93 expect(data[0].registrationToken).to.equal(base[0].registrationToken)
94 }
95 })
96
97 it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
98 await server.runners.register({ name: 'to delete 1', registrationToken: registrationTokenToDelete.registrationToken })
99 await server.runners.register({ name: 'to delete 2', registrationToken: registrationTokenToDelete.registrationToken })
100
101 const { data } = await server.runnerRegistrationTokens.list()
102
103 for (const d of data) {
104 if (d.registrationToken === registrationTokenToDelete.registrationToken) {
105 expect(d.registeredRunnersCount).to.equal(2)
106 } else {
107 expect(d.registeredRunnersCount).to.equal(0)
108 }
109 }
110
111 const { data: runners } = await server.runners.list()
112 expect(runners).to.have.lengthOf(2)
113 })
114
115 it('Should delete a registration token', async function () {
116 await server.runnerRegistrationTokens.delete({ id: registrationTokenToDelete.id })
117
118 const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
119 expect(total).to.equal(2)
120 expect(data).to.have.lengthOf(2)
121
122 for (const d of data) {
123 expect(d.registeredRunnersCount).to.equal(0)
124 expect(d.registrationToken).to.not.equal(registrationTokenToDelete.registrationToken)
125 }
126 })
127
128 it('Should have removed runners of this registration token', async function () {
129 const { data: runners } = await server.runners.list()
130 expect(runners).to.have.lengthOf(0)
131 })
132 })
133
134 describe('Managing runners', function () {
135 let toDelete: Runner
136
137 it('Should not have runners available', async function () {
138 const { total, data } = await server.runners.list()
139
140 expect(data).to.have.lengthOf(0)
141 expect(total).to.equal(0)
142 })
143
144 it('Should register runners', async function () {
145 const now = new Date()
146
147 const result = await server.runners.register({
148 name: 'runner 1',
149 description: 'my super runner 1',
150 registrationToken
151 })
152 expect(result.runnerToken).to.exist
153 runnerToken = result.runnerToken
154
155 await server.runners.register({
156 name: 'runner 2',
157 registrationToken
158 })
159
160 const { total, data } = await server.runners.list({ sort: 'createdAt' })
161 expect(total).to.equal(2)
162 expect(data).to.have.lengthOf(2)
163
164 for (const d of data) {
165 expect(d.id).to.exist
166 expect(d.createdAt).to.exist
167 expect(d.updatedAt).to.exist
168 expect(new Date(d.createdAt)).to.be.above(now)
169 expect(new Date(d.updatedAt)).to.be.above(now)
170 expect(new Date(d.lastContact)).to.be.above(now)
171 expect(d.ip).to.exist
172 }
173
174 expect(data[0].name).to.equal('runner 1')
175 expect(data[0].description).to.equal('my super runner 1')
176
177 expect(data[1].name).to.equal('runner 2')
178 expect(data[1].description).to.be.null
179
180 toDelete = data[1]
181 })
182
183 it('Should list runners', async function () {
184 const { total, data } = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })
185
186 expect(total).to.equal(2)
187 expect(data).to.have.lengthOf(1)
188 expect(data[0].name).to.equal('runner 1')
189 })
190
191 it('Should delete a runner', async function () {
192 await server.runners.delete({ id: toDelete.id })
193
194 const { total, data } = await server.runners.list()
195
196 expect(total).to.equal(1)
197 expect(data).to.have.lengthOf(1)
198 expect(data[0].name).to.equal('runner 1')
199 })
200
201 it('Should unregister a runner', async function () {
202 const registered = await server.runners.autoRegisterRunner()
203
204 {
205 const { total, data } = await server.runners.list()
206 expect(total).to.equal(2)
207 expect(data).to.have.lengthOf(2)
208 }
209
210 await server.runners.unregister({ runnerToken: registered })
211
212 {
213 const { total, data } = await server.runners.list()
214 expect(total).to.equal(1)
215 expect(data).to.have.lengthOf(1)
216 expect(data[0].name).to.equal('runner 1')
217 }
218 })
219 })
220
221 describe('Managing runner jobs', function () {
222 let jobUUID: string
223 let jobToken: string
224 let lastRunnerContact: Date
225 let failedJob: RunnerJob
226
227 async function checkMainJobState (
228 mainJobState: RunnerJobStateType,
229 otherJobStates: RunnerJobStateType[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
230 ) {
231 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
232
233 for (const job of data) {
234 if (job.uuid === jobUUID) {
235 expect(job.state.id).to.equal(mainJobState)
236 } else {
237 expect(otherJobStates).to.include(job.state.id)
238 }
239 }
240 }
241
242 function getMainJob () {
243 return server.runnerJobs.getJob({ uuid: jobUUID })
244 }
245
246 describe('List jobs', function () {
247
248 it('Should not have jobs', async function () {
249 const { total, data } = await server.runnerJobs.list()
250
251 expect(data).to.have.lengthOf(0)
252 expect(total).to.equal(0)
253 })
254
255 it('Should upload a video and have available jobs', async function () {
256 await server.videos.quickUpload({ name: 'to transcode' })
257 await waitJobs([ server ])
258
259 const { total, data } = await server.runnerJobs.list()
260
261 expect(data).to.have.lengthOf(10)
262 expect(total).to.equal(10)
263
264 for (const job of data) {
265 expect(job.startedAt).to.not.exist
266 expect(job.finishedAt).to.not.exist
267 expect(job.payload).to.exist
268 expect(job.privatePayload).to.exist
269 }
270
271 const hlsJobs = data.filter(d => d.type === 'vod-hls-transcoding')
272 const webVideoJobs = data.filter(d => d.type === 'vod-web-video-transcoding')
273
274 expect(hlsJobs).to.have.lengthOf(5)
275 expect(webVideoJobs).to.have.lengthOf(5)
276
277 const pendingJobs = data.filter(d => d.state.id === RunnerJobState.PENDING)
278 const waitingJobs = data.filter(d => d.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)
279
280 expect(pendingJobs).to.have.lengthOf(1)
281 expect(waitingJobs).to.have.lengthOf(9)
282 })
283
284 it('Should upload another video and list/sort jobs', async function () {
285 await server.videos.quickUpload({ name: 'to transcode 2' })
286 await waitJobs([ server ])
287
288 {
289 const { total, data } = await server.runnerJobs.list({ start: 0, count: 30 })
290
291 expect(data).to.have.lengthOf(20)
292 expect(total).to.equal(20)
293
294 jobUUID = data[16].uuid
295 }
296
297 {
298 const { total, data } = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
299 expect(total).to.equal(20)
300
301 expect(data).to.have.lengthOf(1)
302 expect(data[0].uuid).to.equal(jobUUID)
303 }
304
305 {
306 let previousPriority = Infinity
307 const { total, data } = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
308 expect(total).to.equal(20)
309
310 for (const job of data) {
311 expect(job.priority).to.be.at.most(previousPriority)
312 previousPriority = job.priority
313
314 if (job.state.id === RunnerJobState.PENDING) {
315 jobMaxPriority = job.uuid
316 }
317 }
318 }
319 })
320
321 it('Should search jobs', async function () {
322 {
323 const { total, data } = await server.runnerJobs.list({ search: jobUUID })
324
325 expect(data).to.have.lengthOf(1)
326 expect(total).to.equal(1)
327
328 expect(data[0].uuid).to.equal(jobUUID)
329 }
330
331 {
332 const { total, data } = await server.runnerJobs.list({ search: 'toto' })
333
334 expect(data).to.have.lengthOf(0)
335 expect(total).to.equal(0)
336 }
337
338 {
339 const { total, data } = await server.runnerJobs.list({ search: 'hls' })
340
341 expect(data).to.not.have.lengthOf(0)
342 expect(total).to.not.equal(0)
343
344 for (const job of data) {
345 expect(job.type).to.include('hls')
346 }
347 }
348 })
349
350 it('Should filter jobs', async function () {
351 {
352 const { total, data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.WAITING_FOR_PARENT_JOB ] })
353
354 expect(data).to.not.have.lengthOf(0)
355 expect(total).to.not.equal(0)
356
357 for (const job of data) {
358 expect(job.state.label).to.equal('Waiting for parent job to finish')
359 }
360 }
361
362 {
363 const { total, data } = await server.runnerJobs.list({ stateOneOf: [ RunnerJobState.COMPLETED ] })
364
365 expect(data).to.have.lengthOf(0)
366 expect(total).to.equal(0)
367 }
368 })
369 })
370
371 describe('Accept/update/abort/process a job', function () {
372
373 it('Should request available jobs', async function () {
374 lastRunnerContact = new Date()
375
376 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
377
378 // Only optimize jobs are available
379 expect(availableJobs).to.have.lengthOf(2)
380
381 for (const job of availableJobs) {
382 expect(job.uuid).to.exist
383 expect(job.payload.input).to.exist
384 expect((job.payload as RunnerJobVODWebVideoTranscodingPayload).output).to.exist
385
386 expect((job as RunnerJobAdmin).privatePayload).to.not.exist
387 }
388
389 const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
390 const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')
391
392 expect(hlsJobs).to.have.lengthOf(0)
393 expect(webVideoJobs).to.have.lengthOf(2)
394
395 jobUUID = webVideoJobs[0].uuid
396 })
397
398 it('Should have sorted available jobs by priority', async function () {
399 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
400
401 expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
402 })
403
404 it('Should have last runner contact updated', async function () {
405 await wait(1000)
406
407 const { data } = await server.runners.list({ sort: 'createdAt' })
408 expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
409 })
410
411 it('Should accept a job', async function () {
412 const startedAt = new Date()
413
414 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
415 jobToken = job.jobToken
416
417 const checkProcessingJob = (job: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
418 expect(job.uuid).to.equal(jobUUID)
419
420 expect(job.type).to.equal('vod-web-video-transcoding')
421 expect(job.state.label).to.equal('Processing')
422 expect(job.state.id).to.equal(RunnerJobState.PROCESSING)
423
424 expect(job.runner).to.exist
425 expect(job.runner.name).to.equal('runner 1')
426 expect(job.runner.description).to.equal('my super runner 1')
427
428 expect(job.progress).to.be.null
429
430 expect(job.startedAt).to.exist
431 expect(new Date(job.startedAt)).to.be.above(startedAt)
432
433 expect(job.finishedAt).to.not.exist
434
435 expect(job.failures).to.equal(0)
436
437 expect(job.payload).to.exist
438
439 if (fromAccept) {
440 expect(job.jobToken).to.exist
441 expect((job as RunnerJobAdmin).privatePayload).to.not.exist
442 } else {
443 expect(job.jobToken).to.not.exist
444 expect((job as RunnerJobAdmin).privatePayload).to.exist
445 }
446 }
447
448 checkProcessingJob(job, true)
449
450 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
451
452 const processingJob = data.find(j => j.uuid === jobUUID)
453 checkProcessingJob(processingJob, false)
454
455 await checkMainJobState(RunnerJobState.PROCESSING)
456 })
457
458 it('Should update a job', async function () {
459 await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })
460
461 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
462
463 for (const job of data) {
464 if (job.state.id === RunnerJobState.PROCESSING) {
465 expect(job.progress).to.equal(53)
466 } else {
467 expect(job.progress).to.be.null
468 }
469 }
470 })
471
472 it('Should abort a job', async function () {
473 await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })
474
475 await checkMainJobState(RunnerJobState.PENDING)
476
477 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
478 for (const job of data) {
479 expect(job.progress).to.be.null
480 }
481 })
482
483 it('Should accept the same job again and post a success', async function () {
484 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
485 expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist
486
487 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
488 jobToken = job.jobToken
489
490 await checkMainJobState(RunnerJobState.PROCESSING)
491
492 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
493
494 for (const job of data) {
495 expect(job.progress).to.be.null
496 }
497
498 const payload = {
499 videoFile: 'video_short.mp4'
500 }
501
502 await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
503 })
504
505 it('Should not have available jobs anymore', async function () {
506 await checkMainJobState(RunnerJobState.COMPLETED)
507
508 const job = await getMainJob()
509 expect(job.finishedAt).to.exist
510
511 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
512 expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
513 })
514 })
515
516 describe('Error job', function () {
517
518 it('Should accept another job and post an error', async function () {
519 await server.runnerJobs.cancelAllJobs()
520 await server.videos.quickUpload({ name: 'video' })
521 await waitJobs([ server ])
522
523 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
524 jobUUID = availableJobs[0].uuid
525
526 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
527 jobToken = job.jobToken
528
529 await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
530 })
531
532 it('Should have job failures increased', async function () {
533 const job = await getMainJob()
534 expect(job.state.id).to.equal(RunnerJobState.PENDING)
535 expect(job.failures).to.equal(1)
536 expect(job.error).to.be.null
537 expect(job.progress).to.be.null
538 expect(job.finishedAt).to.not.exist
539 })
540
541 it('Should error a job when job attempts is too big', async function () {
542 for (let i = 0; i < 4; i++) {
543 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
544 jobToken = job.jobToken
545
546 await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
547 }
548
549 const job = await getMainJob()
550 expect(job.failures).to.equal(5)
551 expect(job.state.id).to.equal(RunnerJobState.ERRORED)
552 expect(job.state.label).to.equal('Errored')
553 expect(job.error).to.equal('Error 3')
554 expect(job.progress).to.be.null
555 expect(job.finishedAt).to.exist
556
557 failedJob = job
558 })
559
560 it('Should have failed children jobs too', async function () {
561 const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })
562
563 const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
564 expect(children).to.have.lengthOf(9)
565
566 for (const child of children) {
567 expect(child.parent.uuid).to.equal(failedJob.uuid)
568 expect(child.parent.type).to.equal(failedJob.type)
569 expect(child.parent.state.id).to.equal(failedJob.state.id)
570 expect(child.parent.state.label).to.equal(failedJob.state.label)
571
572 expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
573 expect(child.state.label).to.equal('Parent job failed')
574 }
575 })
576 })
577
578 describe('Cancel', function () {
579
580 it('Should cancel a pending job', async function () {
581 await server.videos.quickUpload({ name: 'video' })
582 await waitJobs([ server ])
583
584 {
585 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
586
587 const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
588 jobUUID = pendingJob.uuid
589
590 await server.runnerJobs.cancelByAdmin({ jobUUID })
591 }
592
593 {
594 const job = await getMainJob()
595 expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
596 expect(job.state.label).to.equal('Cancelled')
597 }
598
599 {
600 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
601 const children = data.filter(j => j.parent?.uuid === jobUUID)
602 expect(children).to.have.lengthOf(9)
603
604 for (const child of children) {
605 expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
606 }
607 }
608 })
609
610 it('Should cancel an already accepted job and skip success/error', async function () {
611 await server.videos.quickUpload({ name: 'video' })
612 await waitJobs([ server ])
613
614 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
615 jobUUID = availableJobs[0].uuid
616
617 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
618 jobToken = job.jobToken
619
620 await server.runnerJobs.cancelByAdmin({ jobUUID })
621
622 await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
623 })
624 })
625
626 describe('Remove', function () {
627
628 it('Should remove a pending job', async function () {
629 await server.videos.quickUpload({ name: 'video' })
630 await waitJobs([ server ])
631
632 {
633 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
634
635 const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
636 jobUUID = pendingJob.uuid
637
638 await server.runnerJobs.deleteByAdmin({ jobUUID })
639 }
640
641 {
642 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
643
644 const parent = data.find(j => j.uuid === jobUUID)
645 expect(parent).to.not.exist
646
647 const children = data.filter(j => j.parent?.uuid === jobUUID)
648 expect(children).to.have.lengthOf(0)
649 }
650 })
651 })
652
653 describe('Stalled jobs', function () {
654
655 it('Should abort stalled jobs', async function () {
656 this.timeout(60000)
657
658 await server.videos.quickUpload({ name: 'video' })
659 await server.videos.quickUpload({ name: 'video' })
660 await waitJobs([ server ])
661
662 const { job: job1 } = await server.runnerJobs.autoAccept({ runnerToken })
663 const { job: stalledJob } = await server.runnerJobs.autoAccept({ runnerToken })
664
665 for (let i = 0; i < 6; i++) {
666 await wait(2000)
667
668 await server.runnerJobs.update({ runnerToken, jobToken: job1.jobToken, jobUUID: job1.uuid })
669 }
670
671 const refreshedJob1 = await server.runnerJobs.getJob({ uuid: job1.uuid })
672 const refreshedStalledJob = await server.runnerJobs.getJob({ uuid: stalledJob.uuid })
673
674 expect(refreshedJob1.state.id).to.equal(RunnerJobState.PROCESSING)
675 expect(refreshedStalledJob.state.id).to.equal(RunnerJobState.PENDING)
676 })
677 })
678
679 describe('Rate limit', function () {
680
681 before(async function () {
682 this.timeout(60000)
683
684 await server.kill()
685
686 await server.run({
687 rates_limit: {
688 api: {
689 max: 10
690 }
691 }
692 })
693 })
694
695 it('Should rate limit an unknown runner, but not a registered one', async function () {
696 this.timeout(60000)
697
698 await server.videos.quickUpload({ name: 'video' })
699 await waitJobs([ server ])
700
701 const { job } = await server.runnerJobs.autoAccept({ runnerToken })
702
703 for (let i = 0; i < 20; i++) {
704 try {
705 await server.runnerJobs.request({ runnerToken })
706 await server.runnerJobs.update({ runnerToken, jobToken: job.jobToken, jobUUID: job.uuid })
707 } catch {}
708 }
709
710 // Invalid
711 {
712 await server.runnerJobs.request({ runnerToken: 'toto', expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
713 await server.runnerJobs.update({
714 runnerToken: 'toto',
715 jobToken: job.jobToken,
716 jobUUID: job.uuid,
717 expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429
718 })
719 }
720
721 // Not provided
722 {
723 await server.runnerJobs.request({ runnerToken: undefined, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
724 await server.runnerJobs.update({
725 runnerToken: undefined,
726 jobToken: job.jobToken,
727 jobUUID: job.uuid,
728 expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429
729 })
730 }
731
732 // Registered
733 {
734 await server.runnerJobs.request({ runnerToken })
735 await server.runnerJobs.update({ runnerToken, jobToken: job.jobToken, jobUUID: job.uuid })
736 }
737 })
738 })
739 })
740
741 after(async function () {
742 await cleanupTests([ server ])
743 })
744})
diff --git a/packages/tests/src/api/runners/runner-live-transcoding.ts b/packages/tests/src/api/runners/runner-live-transcoding.ts
new file mode 100644
index 000000000..20c1e5c2a
--- /dev/null
+++ b/packages/tests/src/api/runners/runner-live-transcoding.ts
@@ -0,0 +1,332 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import { expect } from 'chai'
4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { readFile } from 'fs/promises'
6import { wait } from '@peertube/peertube-core-utils'
7import {
8 HttpStatusCode,
9 LiveRTMPHLSTranscodingUpdatePayload,
10 LiveVideo,
11 LiveVideoError,
12 LiveVideoErrorType,
13 RunnerJob,
14 RunnerJobLiveRTMPHLSTranscodingPayload,
15 Video,
16 VideoPrivacy,
17 VideoState
18} from '@peertube/peertube-models'
19import { buildAbsoluteFixturePath } from '@peertube/peertube-node-utils'
20import {
21 cleanupTests,
22 createSingleServer,
23 makeRawRequest,
24 PeerTubeServer,
25 sendRTMPStream,
26 setAccessTokensToServers,
27 setDefaultVideoChannel,
28 stopFfmpeg,
29 testFfmpegStreamError,
30 waitJobs
31} from '@peertube/peertube-server-commands'
32
33describe('Test runner live transcoding', function () {
34 let server: PeerTubeServer
35 let runnerToken: string
36 let baseUrl: string
37
38 before(async function () {
39 this.timeout(120_000)
40
41 server = await createSingleServer(1)
42
43 await setAccessTokensToServers([ server ])
44 await setDefaultVideoChannel([ server ])
45
46 await server.config.enableRemoteTranscoding()
47 await server.config.enableTranscoding()
48 runnerToken = await server.runners.autoRegisterRunner()
49
50 baseUrl = server.url + '/static/streaming-playlists/hls'
51 })
52
53 describe('Without transcoding enabled', function () {
54
55 before(async function () {
56 await server.config.enableLive({
57 allowReplay: false,
58 resolutions: 'min',
59 transcoding: false
60 })
61 })
62
63 it('Should not have available jobs', async function () {
64 this.timeout(120000)
65
66 const { live, video } = await server.live.quickCreate({ permanentLive: true, saveReplay: false, privacy: VideoPrivacy.PUBLIC })
67
68 const ffmpegCommand = sendRTMPStream({ rtmpBaseUrl: live.rtmpUrl, streamKey: live.streamKey })
69 await server.live.waitUntilPublished({ videoId: video.id })
70
71 await waitJobs([ server ])
72
73 const { availableJobs } = await server.runnerJobs.requestLive({ runnerToken })
74 expect(availableJobs).to.have.lengthOf(0)
75
76 await stopFfmpeg(ffmpegCommand)
77 })
78 })
79
80 describe('With transcoding enabled on classic live', function () {
81 let live: LiveVideo
82 let video: Video
83 let ffmpegCommand: FfmpegCommand
84 let jobUUID: string
85 let acceptedJob: RunnerJob & { jobToken: string }
86
87 async function testPlaylistFile (fixture: string, expected: string) {
88 const text = await server.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/${fixture}` })
89 expect(await readFile(buildAbsoluteFixturePath(expected), 'utf-8')).to.equal(text)
90
91 }
92
93 async function testTSFile (fixture: string, expected: string) {
94 const { body } = await makeRawRequest({ url: `${baseUrl}/${video.uuid}/${fixture}`, expectedStatus: HttpStatusCode.OK_200 })
95 expect(await readFile(buildAbsoluteFixturePath(expected))).to.deep.equal(body)
96 }
97
98 before(async function () {
99 await server.config.enableLive({
100 allowReplay: true,
101 resolutions: 'max',
102 transcoding: true
103 })
104 })
105
106 it('Should publish a a live and have available jobs', async function () {
107 this.timeout(120000)
108
109 const data = await server.live.quickCreate({ permanentLive: false, saveReplay: false, privacy: VideoPrivacy.PUBLIC })
110 live = data.live
111 video = data.video
112
113 ffmpegCommand = sendRTMPStream({ rtmpBaseUrl: live.rtmpUrl, streamKey: live.streamKey })
114 await waitJobs([ server ])
115
116 const job = await server.runnerJobs.requestLiveJob(runnerToken)
117 jobUUID = job.uuid
118
119 expect(job.type).to.equal('live-rtmp-hls-transcoding')
120 expect(job.payload.input.rtmpUrl).to.exist
121
122 expect(job.payload.output.toTranscode).to.have.lengthOf(5)
123
124 for (const { resolution, fps } of job.payload.output.toTranscode) {
125 expect([ 720, 480, 360, 240, 144 ]).to.contain(resolution)
126
127 expect(fps).to.be.above(25)
128 expect(fps).to.be.below(70)
129 }
130 })
131
132 it('Should update the live with a new chunk', async function () {
133 this.timeout(120000)
134
135 const { job } = await server.runnerJobs.accept<RunnerJobLiveRTMPHLSTranscodingPayload>({ jobUUID, runnerToken })
136 acceptedJob = job
137
138 {
139 const payload: LiveRTMPHLSTranscodingUpdatePayload = {
140 masterPlaylistFile: 'live/master.m3u8',
141 resolutionPlaylistFile: 'live/0.m3u8',
142 resolutionPlaylistFilename: '0.m3u8',
143 type: 'add-chunk',
144 videoChunkFile: 'live/0-000067.ts',
145 videoChunkFilename: '0-000067.ts'
146 }
147 await server.runnerJobs.update({ jobUUID, runnerToken, jobToken: job.jobToken, payload, progress: 50 })
148
149 const updatedJob = await server.runnerJobs.getJob({ uuid: job.uuid })
150 expect(updatedJob.progress).to.equal(50)
151 }
152
153 {
154 const payload: LiveRTMPHLSTranscodingUpdatePayload = {
155 resolutionPlaylistFile: 'live/1.m3u8',
156 resolutionPlaylistFilename: '1.m3u8',
157 type: 'add-chunk',
158 videoChunkFile: 'live/1-000068.ts',
159 videoChunkFilename: '1-000068.ts'
160 }
161 await server.runnerJobs.update({ jobUUID, runnerToken, jobToken: job.jobToken, payload })
162 }
163
164 await wait(1000)
165
166 await testPlaylistFile('master.m3u8', 'live/master.m3u8')
167 await testPlaylistFile('0.m3u8', 'live/0.m3u8')
168 await testPlaylistFile('1.m3u8', 'live/1.m3u8')
169
170 await testTSFile('0-000067.ts', 'live/0-000067.ts')
171 await testTSFile('1-000068.ts', 'live/1-000068.ts')
172 })
173
174 it('Should replace existing m3u8 on update', async function () {
175 this.timeout(120000)
176
177 const payload: LiveRTMPHLSTranscodingUpdatePayload = {
178 masterPlaylistFile: 'live/1.m3u8',
179 resolutionPlaylistFilename: '0.m3u8',
180 resolutionPlaylistFile: 'live/1.m3u8',
181 type: 'add-chunk',
182 videoChunkFile: 'live/1-000069.ts',
183 videoChunkFilename: '1-000068.ts'
184 }
185 await server.runnerJobs.update({ jobUUID, runnerToken, jobToken: acceptedJob.jobToken, payload })
186 await wait(1000)
187
188 await testPlaylistFile('master.m3u8', 'live/1.m3u8')
189 await testPlaylistFile('0.m3u8', 'live/1.m3u8')
190 await testTSFile('1-000068.ts', 'live/1-000069.ts')
191 })
192
193 it('Should update the live with removed chunks', async function () {
194 this.timeout(120000)
195
196 const payload: LiveRTMPHLSTranscodingUpdatePayload = {
197 resolutionPlaylistFile: 'live/0.m3u8',
198 resolutionPlaylistFilename: '0.m3u8',
199 type: 'remove-chunk',
200 videoChunkFilename: '1-000068.ts'
201 }
202 await server.runnerJobs.update({ jobUUID, runnerToken, jobToken: acceptedJob.jobToken, payload })
203
204 await wait(1000)
205
206 await server.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/master.m3u8` })
207 await server.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/0.m3u8` })
208 await server.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/1.m3u8` })
209 await makeRawRequest({ url: `${baseUrl}/${video.uuid}/0-000067.ts`, expectedStatus: HttpStatusCode.OK_200 })
210 await makeRawRequest({ url: `${baseUrl}/${video.uuid}/1-000068.ts`, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
211 })
212
213 it('Should complete the live and save the replay', async function () {
214 this.timeout(120000)
215
216 for (const segment of [ '0-000069.ts', '0-000070.ts' ]) {
217 const payload: LiveRTMPHLSTranscodingUpdatePayload = {
218 masterPlaylistFile: 'live/master.m3u8',
219 resolutionPlaylistFilename: '0.m3u8',
220 resolutionPlaylistFile: 'live/0.m3u8',
221 type: 'add-chunk',
222 videoChunkFile: 'live/' + segment,
223 videoChunkFilename: segment
224 }
225 await server.runnerJobs.update({ jobUUID, runnerToken, jobToken: acceptedJob.jobToken, payload })
226
227 await wait(1000)
228 }
229
230 await waitJobs([ server ])
231
232 {
233 const { state } = await server.videos.get({ id: video.uuid })
234 expect(state.id).to.equal(VideoState.PUBLISHED)
235 }
236
237 await stopFfmpeg(ffmpegCommand)
238
239 await server.runnerJobs.success({ jobUUID, runnerToken, jobToken: acceptedJob.jobToken, payload: {} })
240
241 await wait(1500)
242 await waitJobs([ server ])
243
244 {
245 const { state } = await server.videos.get({ id: video.uuid })
246 expect(state.id).to.equal(VideoState.LIVE_ENDED)
247
248 const session = await server.live.findLatestSession({ videoId: video.uuid })
249 expect(session.error).to.be.null
250 }
251 })
252 })
253
254 describe('With transcoding enabled on cancelled/aborted/errored live', function () {
255 let live: LiveVideo
256 let video: Video
257 let ffmpegCommand: FfmpegCommand
258
259 async function prepare () {
260 ffmpegCommand = sendRTMPStream({ rtmpBaseUrl: live.rtmpUrl, streamKey: live.streamKey })
261 await server.runnerJobs.requestLiveJob(runnerToken)
262
263 const { job } = await server.runnerJobs.autoAccept({ runnerToken, type: 'live-rtmp-hls-transcoding' })
264
265 return job
266 }
267
268 async function checkSessionError (error: LiveVideoErrorType) {
269 await wait(1500)
270 await waitJobs([ server ])
271
272 const session = await server.live.findLatestSession({ videoId: video.uuid })
273 expect(session.error).to.equal(error)
274 }
275
276 before(async function () {
277 await server.config.enableLive({
278 allowReplay: true,
279 resolutions: 'max',
280 transcoding: true
281 })
282
283 const data = await server.live.quickCreate({ permanentLive: true, saveReplay: false, privacy: VideoPrivacy.PUBLIC })
284 live = data.live
285 video = data.video
286 })
287
288 it('Should abort a running live', async function () {
289 this.timeout(120000)
290
291 const job = await prepare()
292
293 await Promise.all([
294 server.runnerJobs.abort({ jobUUID: job.uuid, runnerToken, jobToken: job.jobToken, reason: 'abort' }),
295 testFfmpegStreamError(ffmpegCommand, true)
296 ])
297
298 // Abort is not supported
299 await checkSessionError(LiveVideoError.RUNNER_JOB_ERROR)
300 })
301
302 it('Should cancel a running live', async function () {
303 this.timeout(120000)
304
305 const job = await prepare()
306
307 await Promise.all([
308 server.runnerJobs.cancelByAdmin({ jobUUID: job.uuid }),
309 testFfmpegStreamError(ffmpegCommand, true)
310 ])
311
312 await checkSessionError(LiveVideoError.RUNNER_JOB_CANCEL)
313 })
314
315 it('Should error a running live', async function () {
316 this.timeout(120000)
317
318 const job = await prepare()
319
320 await Promise.all([
321 server.runnerJobs.error({ jobUUID: job.uuid, runnerToken, jobToken: job.jobToken, message: 'error' }),
322 testFfmpegStreamError(ffmpegCommand, true)
323 ])
324
325 await checkSessionError(LiveVideoError.RUNNER_JOB_ERROR)
326 })
327 })
328
329 after(async function () {
330 await cleanupTests([ server ])
331 })
332})
diff --git a/packages/tests/src/api/runners/runner-socket.ts b/packages/tests/src/api/runners/runner-socket.ts
new file mode 100644
index 000000000..726ef084f
--- /dev/null
+++ b/packages/tests/src/api/runners/runner-socket.ts
@@ -0,0 +1,120 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import { expect } from 'chai'
4import { wait } from '@peertube/peertube-core-utils'
5import {
6 cleanupTests,
7 createSingleServer,
8 PeerTubeServer,
9 setAccessTokensToServers,
10 setDefaultVideoChannel,
11 waitJobs
12} from '@peertube/peertube-server-commands'
13
14describe('Test runner socket', function () {
15 let server: PeerTubeServer
16 let runnerToken: string
17
18 before(async function () {
19 this.timeout(120_000)
20
21 server = await createSingleServer(1)
22
23 await setAccessTokensToServers([ server ])
24 await setDefaultVideoChannel([ server ])
25
26 await server.config.enableTranscoding({ hls: true, webVideo: true })
27 await server.config.enableRemoteTranscoding()
28 runnerToken = await server.runners.autoRegisterRunner()
29 })
30
31 it('Should throw an error without runner token', function (done) {
32 const localSocket = server.socketIO.getRunnersSocket({ runnerToken: null })
33 localSocket.on('connect_error', err => {
34 expect(err.message).to.contain('No runner token provided')
35 done()
36 })
37 })
38
39 it('Should throw an error with a bad runner token', function (done) {
40 const localSocket = server.socketIO.getRunnersSocket({ runnerToken: 'ergag' })
41 localSocket.on('connect_error', err => {
42 expect(err.message).to.contain('Invalid runner token')
43 done()
44 })
45 })
46
47 it('Should not send ping if there is no available jobs', async function () {
48 let pings = 0
49 const localSocket = server.socketIO.getRunnersSocket({ runnerToken })
50 localSocket.on('available-jobs', () => pings++)
51
52 expect(pings).to.equal(0)
53 })
54
55 it('Should send a ping on available job', async function () {
56 let pings = 0
57 const localSocket = server.socketIO.getRunnersSocket({ runnerToken })
58 localSocket.on('available-jobs', () => pings++)
59
60 await server.videos.quickUpload({ name: 'video1' })
61 await waitJobs([ server ])
62
63 // eslint-disable-next-line no-unmodified-loop-condition
64 while (pings !== 1) {
65 await wait(500)
66 }
67
68 await server.videos.quickUpload({ name: 'video2' })
69 await waitJobs([ server ])
70
71 // eslint-disable-next-line no-unmodified-loop-condition
72 while ((pings as number) !== 2) {
73 await wait(500)
74 }
75
76 await server.runnerJobs.cancelAllJobs()
77 })
78
79 it('Should send a ping when a child is ready', async function () {
80 let pings = 0
81 const localSocket = server.socketIO.getRunnersSocket({ runnerToken })
82 localSocket.on('available-jobs', () => pings++)
83
84 await server.videos.quickUpload({ name: 'video3' })
85 await waitJobs([ server ])
86
87 // eslint-disable-next-line no-unmodified-loop-condition
88 while (pings !== 1) {
89 await wait(500)
90 }
91
92 await server.runnerJobs.autoProcessWebVideoJob(runnerToken)
93 await waitJobs([ server ])
94
95 // eslint-disable-next-line no-unmodified-loop-condition
96 while ((pings as number) !== 2) {
97 await wait(500)
98 }
99 })
100
101 it('Should not send a ping if the ended job does not have a child', async function () {
102 let pings = 0
103 const localSocket = server.socketIO.getRunnersSocket({ runnerToken })
104 localSocket.on('available-jobs', () => pings++)
105
106 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
107 const job = availableJobs.find(j => j.type === 'vod-web-video-transcoding')
108 await server.runnerJobs.autoProcessWebVideoJob(runnerToken, job.uuid)
109
110 // Wait for debounce
111 await wait(1000)
112 await waitJobs([ server ])
113
114 expect(pings).to.equal(0)
115 })
116
117 after(async function () {
118 await cleanupTests([ server ])
119 })
120})
diff --git a/packages/tests/src/api/runners/runner-studio-transcoding.ts b/packages/tests/src/api/runners/runner-studio-transcoding.ts
new file mode 100644
index 000000000..adf6941c3
--- /dev/null
+++ b/packages/tests/src/api/runners/runner-studio-transcoding.ts
@@ -0,0 +1,169 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import { expect } from 'chai'
4import { readFile } from 'fs/promises'
5import { buildAbsoluteFixturePath } from '@peertube/peertube-node-utils'
6import {
7 RunnerJobStudioTranscodingPayload,
8 VideoStudioTranscodingSuccess,
9 VideoState,
10 VideoStudioTask,
11 VideoStudioTaskIntro
12} from '@peertube/peertube-models'
13import {
14 cleanupTests,
15 createMultipleServers,
16 doubleFollow,
17 PeerTubeServer,
18 setAccessTokensToServers,
19 setDefaultVideoChannel,
20 VideoStudioCommand,
21 waitJobs
22} from '@peertube/peertube-server-commands'
23import { checkVideoDuration } from '@tests/shared/checks.js'
24import { checkPersistentTmpIsEmpty } from '@tests/shared/directories.js'
25
26describe('Test runner video studio transcoding', function () {
27 let servers: PeerTubeServer[] = []
28 let runnerToken: string
29 let videoUUID: string
30 let jobUUID: string
31
32 async function renewStudio (tasks: VideoStudioTask[] = VideoStudioCommand.getComplexTask()) {
33 const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
34 videoUUID = uuid
35
36 await waitJobs(servers)
37
38 await servers[0].videoStudio.createEditionTasks({ videoId: uuid, tasks })
39 await waitJobs(servers)
40
41 const { availableJobs } = await servers[0].runnerJobs.request({ runnerToken })
42 expect(availableJobs).to.have.lengthOf(1)
43
44 jobUUID = availableJobs[0].uuid
45 }
46
47 before(async function () {
48 this.timeout(120_000)
49
50 servers = await createMultipleServers(2)
51
52 await setAccessTokensToServers(servers)
53 await setDefaultVideoChannel(servers)
54
55 await doubleFollow(servers[0], servers[1])
56
57 await servers[0].config.enableTranscoding({ hls: true, webVideo: true })
58 await servers[0].config.enableStudio()
59 await servers[0].config.enableRemoteStudio()
60
61 runnerToken = await servers[0].runners.autoRegisterRunner()
62 })
63
64 it('Should error a studio transcoding job', async function () {
65 this.timeout(60000)
66
67 await renewStudio()
68
69 for (let i = 0; i < 5; i++) {
70 const { job } = await servers[0].runnerJobs.accept({ runnerToken, jobUUID })
71 const jobToken = job.jobToken
72
73 await servers[0].runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
74 }
75
76 const video = await servers[0].videos.get({ id: videoUUID })
77 expect(video.state.id).to.equal(VideoState.PUBLISHED)
78
79 await checkPersistentTmpIsEmpty(servers[0])
80 })
81
82 it('Should cancel a transcoding job', async function () {
83 this.timeout(60000)
84
85 await renewStudio()
86
87 await servers[0].runnerJobs.cancelByAdmin({ jobUUID })
88
89 const video = await servers[0].videos.get({ id: videoUUID })
90 expect(video.state.id).to.equal(VideoState.PUBLISHED)
91
92 await checkPersistentTmpIsEmpty(servers[0])
93 })
94
95 it('Should execute a remote studio job', async function () {
96 this.timeout(240_000)
97
98 const tasks = [
99 {
100 name: 'add-outro' as 'add-outro',
101 options: {
102 file: 'video_short.webm'
103 }
104 },
105 {
106 name: 'add-watermark' as 'add-watermark',
107 options: {
108 file: 'custom-thumbnail.png'
109 }
110 },
111 {
112 name: 'add-intro' as 'add-intro',
113 options: {
114 file: 'video_very_short_240p.mp4'
115 }
116 }
117 ]
118
119 await renewStudio(tasks)
120
121 for (const server of servers) {
122 await checkVideoDuration(server, videoUUID, 5)
123 }
124
125 const { job } = await servers[0].runnerJobs.accept<RunnerJobStudioTranscodingPayload>({ runnerToken, jobUUID })
126 const jobToken = job.jobToken
127
128 expect(job.type === 'video-studio-transcoding')
129 expect(job.payload.input.videoFileUrl).to.exist
130
131 // Check video input file
132 {
133 await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
134 }
135
136 // Check task files
137 for (let i = 0; i < tasks.length; i++) {
138 const task = tasks[i]
139 const payloadTask = job.payload.tasks[i]
140
141 expect(payloadTask.name).to.equal(task.name)
142
143 const inputFile = await readFile(buildAbsoluteFixturePath(task.options.file))
144
145 const { body } = await servers[0].runnerJobs.getJobFile({
146 url: (payloadTask as VideoStudioTaskIntro).options.file as string,
147 jobToken,
148 runnerToken
149 })
150
151 expect(body).to.deep.equal(inputFile)
152 }
153
154 const payload: VideoStudioTranscodingSuccess = { videoFile: 'video_very_short_240p.mp4' }
155 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
156
157 await waitJobs(servers)
158
159 for (const server of servers) {
160 await checkVideoDuration(server, videoUUID, 2)
161 }
162
163 await checkPersistentTmpIsEmpty(servers[0])
164 })
165
166 after(async function () {
167 await cleanupTests(servers)
168 })
169})
diff --git a/packages/tests/src/api/runners/runner-vod-transcoding.ts b/packages/tests/src/api/runners/runner-vod-transcoding.ts
new file mode 100644
index 000000000..fe1c8f0b2
--- /dev/null
+++ b/packages/tests/src/api/runners/runner-vod-transcoding.ts
@@ -0,0 +1,545 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import { expect } from 'chai'
4import { readFile } from 'fs/promises'
5import { completeCheckHlsPlaylist } from '@tests/shared/streaming-playlists.js'
6import { buildAbsoluteFixturePath } from '@peertube/peertube-node-utils'
7import {
8 HttpStatusCode,
9 RunnerJobSuccessPayload,
10 RunnerJobVODAudioMergeTranscodingPayload,
11 RunnerJobVODHLSTranscodingPayload,
12 RunnerJobVODPayload,
13 RunnerJobVODWebVideoTranscodingPayload,
14 VideoState,
15 VODAudioMergeTranscodingSuccess,
16 VODHLSTranscodingSuccess,
17 VODWebVideoTranscodingSuccess
18} from '@peertube/peertube-models'
19import {
20 cleanupTests,
21 createMultipleServers,
22 doubleFollow,
23 makeGetRequest,
24 makeRawRequest,
25 PeerTubeServer,
26 setAccessTokensToServers,
27 setDefaultVideoChannel,
28 waitJobs
29} from '@peertube/peertube-server-commands'
30
31async function processAllJobs (server: PeerTubeServer, runnerToken: string) {
32 do {
33 const { availableJobs } = await server.runnerJobs.requestVOD({ runnerToken })
34 if (availableJobs.length === 0) break
35
36 const { job } = await server.runnerJobs.accept<RunnerJobVODPayload>({ runnerToken, jobUUID: availableJobs[0].uuid })
37
38 const payload: RunnerJobSuccessPayload = {
39 videoFile: `video_short_${job.payload.output.resolution}p.mp4`,
40 resolutionPlaylistFile: `video_short_${job.payload.output.resolution}p.m3u8`
41 }
42 await server.runnerJobs.success({ runnerToken, jobUUID: job.uuid, jobToken: job.jobToken, payload })
43 } while (true)
44
45 await waitJobs([ server ])
46}
47
48describe('Test runner VOD transcoding', function () {
49 let servers: PeerTubeServer[] = []
50 let runnerToken: string
51
52 before(async function () {
53 this.timeout(120_000)
54
55 servers = await createMultipleServers(2)
56
57 await setAccessTokensToServers(servers)
58 await setDefaultVideoChannel(servers)
59
60 await doubleFollow(servers[0], servers[1])
61
62 await servers[0].config.enableRemoteTranscoding()
63 runnerToken = await servers[0].runners.autoRegisterRunner()
64 })
65
66 describe('Without transcoding', function () {
67
68 before(async function () {
69 this.timeout(60000)
70
71 await servers[0].config.disableTranscoding()
72 await servers[0].videos.quickUpload({ name: 'video' })
73
74 await waitJobs(servers)
75 })
76
77 it('Should not have available jobs', async function () {
78 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
79 expect(availableJobs).to.have.lengthOf(0)
80 })
81 })
82
83 describe('With classic transcoding enabled', function () {
84
85 before(async function () {
86 this.timeout(60000)
87
88 await servers[0].config.enableTranscoding({ hls: true, webVideo: true })
89 })
90
91 it('Should error a transcoding job', async function () {
92 this.timeout(60000)
93
94 await servers[0].runnerJobs.cancelAllJobs()
95 const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
96 await waitJobs(servers)
97
98 const { availableJobs } = await servers[0].runnerJobs.request({ runnerToken })
99 const jobUUID = availableJobs[0].uuid
100
101 for (let i = 0; i < 5; i++) {
102 const { job } = await servers[0].runnerJobs.accept({ runnerToken, jobUUID })
103 const jobToken = job.jobToken
104
105 await servers[0].runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
106 }
107
108 const video = await servers[0].videos.get({ id: uuid })
109 expect(video.state.id).to.equal(VideoState.TRANSCODING_FAILED)
110 })
111
112 it('Should cancel a transcoding job', async function () {
113 await servers[0].runnerJobs.cancelAllJobs()
114 const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
115 await waitJobs(servers)
116
117 const { availableJobs } = await servers[0].runnerJobs.request({ runnerToken })
118 const jobUUID = availableJobs[0].uuid
119
120 await servers[0].runnerJobs.cancelByAdmin({ jobUUID })
121
122 const video = await servers[0].videos.get({ id: uuid })
123 expect(video.state.id).to.equal(VideoState.PUBLISHED)
124 })
125 })
126
127 describe('Web video transcoding only', function () {
128 let videoUUID: string
129 let jobToken: string
130 let jobUUID: string
131
132 before(async function () {
133 this.timeout(60000)
134
135 await servers[0].runnerJobs.cancelAllJobs()
136 await servers[0].config.enableTranscoding({ hls: false, webVideo: true })
137
138 const { uuid } = await servers[0].videos.quickUpload({ name: 'web video', fixture: 'video_short.webm' })
139 videoUUID = uuid
140
141 await waitJobs(servers)
142 })
143
144 it('Should have jobs available for remote runners', async function () {
145 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
146 expect(availableJobs).to.have.lengthOf(1)
147
148 jobUUID = availableJobs[0].uuid
149 })
150
151 it('Should have a valid first transcoding job', async function () {
152 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODWebVideoTranscodingPayload>({ runnerToken, jobUUID })
153 jobToken = job.jobToken
154
155 expect(job.type === 'vod-web-video-transcoding')
156 expect(job.payload.input.videoFileUrl).to.exist
157 expect(job.payload.output.resolution).to.equal(720)
158 expect(job.payload.output.fps).to.equal(25)
159
160 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
161 const inputFile = await readFile(buildAbsoluteFixturePath('video_short.webm'))
162
163 expect(body).to.deep.equal(inputFile)
164 })
165
166 it('Should transcode the max video resolution and send it back to the server', async function () {
167 this.timeout(60000)
168
169 const payload: VODWebVideoTranscodingSuccess = {
170 videoFile: 'video_short.mp4'
171 }
172 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
173
174 await waitJobs(servers)
175 })
176
177 it('Should have the video updated', async function () {
178 for (const server of servers) {
179 const video = await server.videos.get({ id: videoUUID })
180 expect(video.files).to.have.lengthOf(1)
181 expect(video.streamingPlaylists).to.have.lengthOf(0)
182
183 const { body } = await makeRawRequest({ url: video.files[0].fileUrl, expectedStatus: HttpStatusCode.OK_200 })
184 expect(body).to.deep.equal(await readFile(buildAbsoluteFixturePath('video_short.mp4')))
185 }
186 })
187
188 it('Should have 4 lower resolution to transcode', async function () {
189 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
190 expect(availableJobs).to.have.lengthOf(4)
191
192 for (const resolution of [ 480, 360, 240, 144 ]) {
193 const job = availableJobs.find(j => j.payload.output.resolution === resolution)
194 expect(job).to.exist
195 expect(job.type).to.equal('vod-web-video-transcoding')
196
197 if (resolution === 240) jobUUID = job.uuid
198 }
199 })
200
201 it('Should process one of these transcoding jobs', async function () {
202 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODWebVideoTranscodingPayload>({ runnerToken, jobUUID })
203 jobToken = job.jobToken
204
205 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
206 const inputFile = await readFile(buildAbsoluteFixturePath('video_short.mp4'))
207
208 expect(body).to.deep.equal(inputFile)
209
210 const payload: VODWebVideoTranscodingSuccess = { videoFile: `video_short_${job.payload.output.resolution}p.mp4` }
211 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
212 })
213
214 it('Should process all other jobs', async function () {
215 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
216 expect(availableJobs).to.have.lengthOf(3)
217
218 for (const resolution of [ 480, 360, 144 ]) {
219 const availableJob = availableJobs.find(j => j.payload.output.resolution === resolution)
220 expect(availableJob).to.exist
221 jobUUID = availableJob.uuid
222
223 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODWebVideoTranscodingPayload>({ runnerToken, jobUUID })
224 jobToken = job.jobToken
225
226 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
227 const inputFile = await readFile(buildAbsoluteFixturePath('video_short.mp4'))
228 expect(body).to.deep.equal(inputFile)
229
230 const payload: VODWebVideoTranscodingSuccess = { videoFile: `video_short_${resolution}p.mp4` }
231 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
232 }
233
234 await waitJobs(servers)
235 })
236
237 it('Should have the video updated', async function () {
238 for (const server of servers) {
239 const video = await server.videos.get({ id: videoUUID })
240 expect(video.files).to.have.lengthOf(5)
241 expect(video.streamingPlaylists).to.have.lengthOf(0)
242
243 const { body } = await makeRawRequest({ url: video.files[0].fileUrl, expectedStatus: HttpStatusCode.OK_200 })
244 expect(body).to.deep.equal(await readFile(buildAbsoluteFixturePath('video_short.mp4')))
245
246 for (const file of video.files) {
247 await makeRawRequest({ url: file.fileUrl, expectedStatus: HttpStatusCode.OK_200 })
248 await makeRawRequest({ url: file.torrentUrl, expectedStatus: HttpStatusCode.OK_200 })
249 }
250 }
251 })
252
253 it('Should not have available jobs anymore', async function () {
254 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
255 expect(availableJobs).to.have.lengthOf(0)
256 })
257 })
258
259 describe('HLS transcoding only', function () {
260 let videoUUID: string
261 let jobToken: string
262 let jobUUID: string
263
264 before(async function () {
265 this.timeout(60000)
266
267 await servers[0].config.enableTranscoding({ hls: true, webVideo: false })
268
269 const { uuid } = await servers[0].videos.quickUpload({ name: 'hls video', fixture: 'video_short.webm' })
270 videoUUID = uuid
271
272 await waitJobs(servers)
273 })
274
275 it('Should run the optimize job', async function () {
276 this.timeout(60000)
277
278 await servers[0].runnerJobs.autoProcessWebVideoJob(runnerToken)
279 })
280
281 it('Should have 5 HLS resolution to transcode', async function () {
282 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
283 expect(availableJobs).to.have.lengthOf(5)
284
285 for (const resolution of [ 720, 480, 360, 240, 144 ]) {
286 const job = availableJobs.find(j => j.payload.output.resolution === resolution)
287 expect(job).to.exist
288 expect(job.type).to.equal('vod-hls-transcoding')
289
290 if (resolution === 480) jobUUID = job.uuid
291 }
292 })
293
294 it('Should process one of these transcoding jobs', async function () {
295 this.timeout(60000)
296
297 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODHLSTranscodingPayload>({ runnerToken, jobUUID })
298 jobToken = job.jobToken
299
300 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
301 const inputFile = await readFile(buildAbsoluteFixturePath('video_short.mp4'))
302
303 expect(body).to.deep.equal(inputFile)
304
305 const payload: VODHLSTranscodingSuccess = {
306 videoFile: 'video_short_480p.mp4',
307 resolutionPlaylistFile: 'video_short_480p.m3u8'
308 }
309 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
310
311 await waitJobs(servers)
312 })
313
314 it('Should have the video updated', async function () {
315 for (const server of servers) {
316 const video = await server.videos.get({ id: videoUUID })
317
318 expect(video.files).to.have.lengthOf(1)
319 expect(video.streamingPlaylists).to.have.lengthOf(1)
320
321 const hls = video.streamingPlaylists[0]
322 expect(hls.files).to.have.lengthOf(1)
323
324 await completeCheckHlsPlaylist({ videoUUID, hlsOnly: false, servers, resolutions: [ 480 ] })
325 }
326 })
327
328 it('Should process all other jobs', async function () {
329 this.timeout(60000)
330
331 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
332 expect(availableJobs).to.have.lengthOf(4)
333
334 let maxQualityFile = 'video_short.mp4'
335
336 for (const resolution of [ 720, 360, 240, 144 ]) {
337 const availableJob = availableJobs.find(j => j.payload.output.resolution === resolution)
338 expect(availableJob).to.exist
339 jobUUID = availableJob.uuid
340
341 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODHLSTranscodingPayload>({ runnerToken, jobUUID })
342 jobToken = job.jobToken
343
344 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
345 const inputFile = await readFile(buildAbsoluteFixturePath(maxQualityFile))
346 expect(body).to.deep.equal(inputFile)
347
348 const payload: VODHLSTranscodingSuccess = {
349 videoFile: `video_short_${resolution}p.mp4`,
350 resolutionPlaylistFile: `video_short_${resolution}p.m3u8`
351 }
352 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
353
354 if (resolution === 720) {
355 maxQualityFile = 'video_short_720p.mp4'
356 }
357 }
358
359 await waitJobs(servers)
360 })
361
362 it('Should have the video updated', async function () {
363 for (const server of servers) {
364 const video = await server.videos.get({ id: videoUUID })
365
366 expect(video.files).to.have.lengthOf(0)
367 expect(video.streamingPlaylists).to.have.lengthOf(1)
368
369 const hls = video.streamingPlaylists[0]
370 expect(hls.files).to.have.lengthOf(5)
371
372 await completeCheckHlsPlaylist({ videoUUID, hlsOnly: true, servers, resolutions: [ 720, 480, 360, 240, 144 ] })
373 }
374 })
375
376 it('Should not have available jobs anymore', async function () {
377 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
378 expect(availableJobs).to.have.lengthOf(0)
379 })
380 })
381
382 describe('Web video and HLS transcoding', function () {
383
384 before(async function () {
385 this.timeout(60000)
386
387 await servers[0].config.enableTranscoding({ hls: true, webVideo: true })
388
389 await servers[0].videos.quickUpload({ name: 'web video and hls video', fixture: 'video_short.webm' })
390
391 await waitJobs(servers)
392 })
393
394 it('Should process the first optimize job', async function () {
395 this.timeout(60000)
396
397 await servers[0].runnerJobs.autoProcessWebVideoJob(runnerToken)
398 })
399
400 it('Should have 9 jobs to process', async function () {
401 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
402
403 expect(availableJobs).to.have.lengthOf(9)
404
405 const webVideoJobs = availableJobs.filter(j => j.type === 'vod-web-video-transcoding')
406 const hlsJobs = availableJobs.filter(j => j.type === 'vod-hls-transcoding')
407
408 expect(webVideoJobs).to.have.lengthOf(4)
409 expect(hlsJobs).to.have.lengthOf(5)
410 })
411
412 it('Should process all available jobs', async function () {
413 await processAllJobs(servers[0], runnerToken)
414 })
415 })
416
417 describe('Audio merge transcoding', function () {
418 let videoUUID: string
419 let jobToken: string
420 let jobUUID: string
421
422 before(async function () {
423 this.timeout(60000)
424
425 await servers[0].config.enableTranscoding({ hls: true, webVideo: true })
426
427 const attributes = { name: 'audio_with_preview', previewfile: 'custom-preview.jpg', fixture: 'sample.ogg' }
428 const { uuid } = await servers[0].videos.upload({ attributes, mode: 'legacy' })
429 videoUUID = uuid
430
431 await waitJobs(servers)
432 })
433
434 it('Should have an audio merge transcoding job', async function () {
435 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
436 expect(availableJobs).to.have.lengthOf(1)
437
438 expect(availableJobs[0].type).to.equal('vod-audio-merge-transcoding')
439
440 jobUUID = availableJobs[0].uuid
441 })
442
443 it('Should have a valid remote audio merge transcoding job', async function () {
444 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODAudioMergeTranscodingPayload>({ runnerToken, jobUUID })
445 jobToken = job.jobToken
446
447 expect(job.type === 'vod-audio-merge-transcoding')
448 expect(job.payload.input.audioFileUrl).to.exist
449 expect(job.payload.input.previewFileUrl).to.exist
450 expect(job.payload.output.resolution).to.equal(480)
451
452 {
453 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.audioFileUrl, jobToken, runnerToken })
454 const inputFile = await readFile(buildAbsoluteFixturePath('sample.ogg'))
455 expect(body).to.deep.equal(inputFile)
456 }
457
458 {
459 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.previewFileUrl, jobToken, runnerToken })
460
461 const video = await servers[0].videos.get({ id: videoUUID })
462 const { body: inputFile } = await makeGetRequest({
463 url: servers[0].url,
464 path: video.previewPath,
465 expectedStatus: HttpStatusCode.OK_200
466 })
467
468 expect(body).to.deep.equal(inputFile)
469 }
470 })
471
472 it('Should merge the audio', async function () {
473 this.timeout(60000)
474
475 const payload: VODAudioMergeTranscodingSuccess = { videoFile: 'video_short_480p.mp4' }
476 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
477
478 await waitJobs(servers)
479 })
480
481 it('Should have the video updated', async function () {
482 for (const server of servers) {
483 const video = await server.videos.get({ id: videoUUID })
484 expect(video.files).to.have.lengthOf(1)
485 expect(video.streamingPlaylists).to.have.lengthOf(0)
486
487 const { body } = await makeRawRequest({ url: video.files[0].fileUrl, expectedStatus: HttpStatusCode.OK_200 })
488 expect(body).to.deep.equal(await readFile(buildAbsoluteFixturePath('video_short_480p.mp4')))
489 }
490 })
491
492 it('Should have 7 lower resolutions to transcode', async function () {
493 const { availableJobs } = await servers[0].runnerJobs.requestVOD({ runnerToken })
494 expect(availableJobs).to.have.lengthOf(7)
495
496 for (const resolution of [ 360, 240, 144 ]) {
497 const jobs = availableJobs.filter(j => j.payload.output.resolution === resolution)
498 expect(jobs).to.have.lengthOf(2)
499 }
500
501 jobUUID = availableJobs.find(j => j.payload.output.resolution === 480).uuid
502 })
503
504 it('Should process one other job', async function () {
505 this.timeout(60000)
506
507 const { job } = await servers[0].runnerJobs.accept<RunnerJobVODHLSTranscodingPayload>({ runnerToken, jobUUID })
508 jobToken = job.jobToken
509
510 const { body } = await servers[0].runnerJobs.getJobFile({ url: job.payload.input.videoFileUrl, jobToken, runnerToken })
511 const inputFile = await readFile(buildAbsoluteFixturePath('video_short_480p.mp4'))
512 expect(body).to.deep.equal(inputFile)
513
514 const payload: VODHLSTranscodingSuccess = {
515 videoFile: `video_short_480p.mp4`,
516 resolutionPlaylistFile: `video_short_480p.m3u8`
517 }
518 await servers[0].runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
519
520 await waitJobs(servers)
521 })
522
523 it('Should have the video updated', async function () {
524 for (const server of servers) {
525 const video = await server.videos.get({ id: videoUUID })
526
527 expect(video.files).to.have.lengthOf(1)
528 expect(video.streamingPlaylists).to.have.lengthOf(1)
529
530 const hls = video.streamingPlaylists[0]
531 expect(hls.files).to.have.lengthOf(1)
532
533 await completeCheckHlsPlaylist({ videoUUID, hlsOnly: false, servers, resolutions: [ 480 ] })
534 }
535 })
536
537 it('Should process all available jobs', async function () {
538 await processAllJobs(servers[0], runnerToken)
539 })
540 })
541
542 after(async function () {
543 await cleanupTests(servers)
544 })
545})