]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/tests/api/runners/runner-common.ts
Support studio transcoding in peertube runner
[github/Chocobozzz/PeerTube.git] / server / tests / api / runners / runner-common.ts
CommitLineData
d102de1b
C
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import { expect } from 'chai'
4import { wait } from '@shared/core-utils'
5e47f6ab
C
5import {
6 HttpStatusCode,
7 Runner,
8 RunnerJob,
9 RunnerJobAdmin,
10 RunnerJobState,
11 RunnerJobVODWebVideoTranscodingPayload,
12 RunnerRegistrationToken
13} from '@shared/models'
d102de1b
C
14import {
15 cleanupTests,
16 createSingleServer,
17 makePostBodyRequest,
18 PeerTubeServer,
19 setAccessTokensToServers,
20 setDefaultVideoChannel,
21 waitJobs
22} from '@shared/server-commands'
23
24describe('Test runner common actions', function () {
25 let server: PeerTubeServer
26 let registrationToken: string
27 let runnerToken: string
28 let jobMaxPriority: string
29
// Boots a single server configured with a short VOD stalled-job delay,
// then enables transcoding and remote (runner) transcoding for the whole suite
before(async function () {
  this.timeout(120_000)

  // Short delay so the "Stalled jobs" test can observe a stalled job within its own timeout
  const serverConfig = {
    remote_runners: {
      stalled_jobs: {
        vod: '5 seconds'
      }
    }
  }

  server = await createSingleServer(1, serverConfig)

  const servers = [ server ]
  await setAccessTokensToServers(servers)
  await setDefaultVideoChannel(servers)

  await server.config.enableTranscoding(true, true)
  await server.config.enableRemoteTranscoding()
})
47
// Covers the registration token life cycle: default token, generation, listing/sorting,
// registered runners counter and deletion (which also removes the runners of the token)
describe('Managing runner registration tokens', function () {
  let registrationTokenToDelete: RunnerRegistrationToken

  it('Should have a default registration token', async function () {
    const { total, data } = await server.runnerRegistrationTokens.list()

    expect(total).to.equal(1)
    expect(data).to.have.lengthOf(1)

    const defaultToken = data[0]
    expect(defaultToken.id).to.exist
    expect(defaultToken.createdAt).to.exist
    expect(defaultToken.updatedAt).to.exist
    expect(defaultToken.registeredRunnersCount).to.equal(0)
    expect(defaultToken.registrationToken).to.exist
  })

  it('Should create other registration tokens', async function () {
    await server.runnerRegistrationTokens.generate()
    await server.runnerRegistrationTokens.generate()

    const listed = await server.runnerRegistrationTokens.list()
    expect(listed.total).to.equal(3)
    expect(listed.data).to.have.lengthOf(3)
  })

  it('Should list registration tokens', async function () {
    // Ascending creation date: oldest token first
    const ascending = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
    expect(ascending.total).to.equal(3)
    expect(ascending.data).to.have.lengthOf(3)

    const oldestFirst = ascending.data
    expect(new Date(oldestFirst[0].createdAt)).to.be.below(new Date(oldestFirst[1].createdAt))
    expect(new Date(oldestFirst[1].createdAt)).to.be.below(new Date(oldestFirst[2].createdAt))

    // Shared with the following tests/suites
    registrationTokenToDelete = oldestFirst[0]
    registrationToken = oldestFirst[1].registrationToken

    // Descending order while skipping 2 entries: the only remaining entry is the oldest token
    const lastPage = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
    expect(lastPage.total).to.equal(3)
    expect(lastPage.data).to.have.lengthOf(1)
    expect(lastPage.data[0].registrationToken).to.equal(oldestFirst[0].registrationToken)
  })

  it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
    const tokenToDelete = registrationTokenToDelete.registrationToken

    await server.runners.register({ name: 'to delete 1', registrationToken: tokenToDelete })
    await server.runners.register({ name: 'to delete 2', registrationToken: tokenToDelete })

    const { data: tokens } = await server.runnerRegistrationTokens.list()

    // Only the token used above must count its 2 runners
    for (const token of tokens) {
      const expectedCount = token.registrationToken === tokenToDelete ? 2 : 0

      expect(token.registeredRunnersCount).to.equal(expectedCount)
    }

    const { data: runners } = await server.runners.list()
    expect(runners).to.have.lengthOf(2)
  })

  it('Should delete a registration token', async function () {
    await server.runnerRegistrationTokens.delete({ id: registrationTokenToDelete.id })

    const { total, data: remaining } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
    expect(total).to.equal(2)
    expect(remaining).to.have.lengthOf(2)

    for (const token of remaining) {
      expect(token.registeredRunnersCount).to.equal(0)
      expect(token.registrationToken).to.not.equal(registrationTokenToDelete.registrationToken)
    }
  })

  it('Should have removed runners of this registration token', async function () {
    const listed = await server.runners.list()
    expect(listed.data).to.have.lengthOf(0)
  })
})
133
// Covers runner registration, listing/pagination, deletion by admin and unregistration by the runner itself
describe('Managing runners', function () {
  let toDelete: Runner

  it('Should not have runners available', async function () {
    const listed = await server.runners.list()

    expect(listed.data).to.have.lengthOf(0)
    expect(listed.total).to.equal(0)
  })

  it('Should register runners', async function () {
    const beforeRegistration = new Date()

    const registered = await server.runners.register({
      name: 'runner 1',
      description: 'my super runner 1',
      registrationToken
    })
    expect(registered.runnerToken).to.exist

    // Token of "runner 1" is reused by the job tests
    runnerToken = registered.runnerToken

    await server.runners.register({
      name: 'runner 2',
      registrationToken
    })

    const { total, data: runners } = await server.runners.list({ sort: 'createdAt' })
    expect(total).to.equal(2)
    expect(runners).to.have.lengthOf(2)

    for (const runner of runners) {
      expect(runner.id).to.exist
      expect(runner.createdAt).to.exist
      expect(runner.updatedAt).to.exist
      expect(new Date(runner.createdAt)).to.be.above(beforeRegistration)
      expect(new Date(runner.updatedAt)).to.be.above(beforeRegistration)
      expect(new Date(runner.lastContact)).to.be.above(beforeRegistration)
      expect(runner.ip).to.exist
    }

    const [ first, second ] = runners

    expect(first.name).to.equal('runner 1')
    expect(first.description).to.equal('my super runner 1')

    expect(second.name).to.equal('runner 2')
    expect(second.description).to.be.null

    toDelete = second
  })

  it('Should list runners', async function () {
    // Newest first while skipping 1 entry: only the oldest runner remains
    const page = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })

    expect(page.total).to.equal(2)
    expect(page.data).to.have.lengthOf(1)
    expect(page.data[0].name).to.equal('runner 1')
  })

  it('Should delete a runner', async function () {
    await server.runners.delete({ id: toDelete.id })

    const { total, data: remaining } = await server.runners.list()

    expect(total).to.equal(1)
    expect(remaining).to.have.lengthOf(1)
    expect(remaining[0].name).to.equal('runner 1')
  })

  it('Should unregister a runner', async function () {
    const temporaryRunnerToken = await server.runners.autoRegisterRunner()

    const afterRegister = await server.runners.list()
    expect(afterRegister.total).to.equal(2)
    expect(afterRegister.data).to.have.lengthOf(2)

    await server.runners.unregister({ runnerToken: temporaryRunnerToken })

    const afterUnregister = await server.runners.list()
    expect(afterUnregister.total).to.equal(1)
    expect(afterUnregister.data).to.have.lengthOf(1)
    expect(afterUnregister.data[0].name).to.equal('runner 1')
  })
})
220
221 describe('Managing runner jobs', function () {
222 let jobUUID: string
223 let jobToken: string
224 let lastRunnerContact: Date
225 let failedJob: RunnerJob
226
/**
 * Checks the 10 most recently updated runner jobs: the job identified by `jobUUID`
 * must be in `mainJobState`, every other listed job must be in one of `otherJobStates`.
 *
 * NOTE(review): if the main job is not part of the listed page, its state is not checked at all.
 */
async function checkMainJobState (
  mainJobState: RunnerJobState,
  otherJobStates: RunnerJobState[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
) {
  const { data: recentJobs } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

  for (const { uuid, state } of recentJobs) {
    if (uuid !== jobUUID) {
      expect(otherJobStates).to.include(state.id)
      continue
    }

    expect(state.id).to.equal(mainJobState)
  }
}
241
// Fetches the job currently tracked by the suite (identified by `jobUUID`)
function getMainJob () {
  const uuid = jobUUID

  return server.runnerJobs.getJob({ uuid })
}
245
// Covers job creation after video uploads, plus listing, pagination, sorting and searching of runner jobs
describe('List jobs', function () {

  it('Should not have jobs', async function () {
    const listed = await server.runnerJobs.list()

    expect(listed.data).to.have.lengthOf(0)
    expect(listed.total).to.equal(0)
  })

  it('Should upload a video and have available jobs', async function () {
    await server.videos.quickUpload({ name: 'to transcode' })
    await waitJobs([ server ])

    const { total, data: jobs } = await server.runnerJobs.list()

    expect(jobs).to.have.lengthOf(10)
    expect(total).to.equal(10)

    // Nothing has been processed yet, and the admin listing exposes both payloads
    for (const listedJob of jobs) {
      expect(listedJob.startedAt).to.not.exist
      expect(listedJob.finishedAt).to.not.exist
      expect(listedJob.payload).to.exist
      expect(listedJob.privatePayload).to.exist
    }

    const hls = jobs.filter(j => j.type === 'vod-hls-transcoding')
    const webVideo = jobs.filter(j => j.type === 'vod-web-video-transcoding')

    expect(hls).to.have.lengthOf(5)
    expect(webVideo).to.have.lengthOf(5)

    // A single job is immediately runnable, the 9 others wait for their parent
    const pending = jobs.filter(j => j.state.id === RunnerJobState.PENDING)
    const waiting = jobs.filter(j => j.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)

    expect(pending).to.have.lengthOf(1)
    expect(waiting).to.have.lengthOf(9)
  })

  it('Should upload another video and list/sort jobs', async function () {
    await server.videos.quickUpload({ name: 'to transcode 2' })
    await waitJobs([ server ])

    const everything = await server.runnerJobs.list({ start: 0, count: 30 })

    expect(everything.data).to.have.lengthOf(20)
    expect(everything.total).to.equal(20)

    jobUUID = everything.data[16].uuid

    // The job picked above is expected at index 3 when sorting by ascending creation date
    const page = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
    expect(page.total).to.equal(20)

    expect(page.data).to.have.lengthOf(1)
    expect(page.data[0].uuid).to.equal(jobUUID)

    const byPriority = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
    expect(byPriority.total).to.equal(20)

    let lastSeenPriority = Infinity

    for (const sortedJob of byPriority.data) {
      expect(sortedJob.priority).to.be.at.most(lastSeenPriority)
      lastSeenPriority = sortedJob.priority

      // Keeps the last pending job met in this descending priority order
      if (sortedJob.state.id === RunnerJobState.PENDING) {
        jobMaxPriority = sortedJob.uuid
      }
    }
  })

  it('Should search jobs', async function () {
    const byUUID = await server.runnerJobs.list({ search: jobUUID })

    expect(byUUID.data).to.have.lengthOf(1)
    expect(byUUID.total).to.equal(1)

    expect(byUUID.data[0].uuid).to.equal(jobUUID)

    const noMatch = await server.runnerJobs.list({ search: 'toto' })

    expect(noMatch.data).to.have.lengthOf(0)
    expect(noMatch.total).to.equal(0)

    const byType = await server.runnerJobs.list({ search: 'hls' })

    expect(byType.data).to.not.have.lengthOf(0)
    expect(byType.total).to.not.equal(0)
  })
})
346
// Walks a web video transcoding job through the runner workflow:
// request -> accept -> update progress -> abort -> accept again -> success
describe('Accept/update/abort/process a job', function () {

  it('Should request available jobs', async function () {
    lastRunnerContact = new Date()

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })

    // Only optimize jobs are available
    expect(availableJobs).to.have.lengthOf(2)

    for (const available of availableJobs) {
      expect(available.uuid).to.exist
      expect(available.payload.input).to.exist
      expect((available.payload as RunnerJobVODWebVideoTranscodingPayload).output).to.exist

      // The private payload must not be exposed to runners
      expect((available as RunnerJobAdmin).privatePayload).to.not.exist
    }

    const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
    const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')

    expect(hlsJobs).to.have.lengthOf(0)
    expect(webVideoJobs).to.have.lengthOf(2)

    // This job becomes the "main job" of the following tests
    jobUUID = webVideoJobs[0].uuid
  })

  it('Should have sorted available jobs by priority', async function () {
    const { availableJobs } = await server.runnerJobs.request({ runnerToken })

    expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
  })

  it('Should have last runner contact updated', async function () {
    await wait(1000)

    const { data } = await server.runners.list({ sort: 'createdAt' })
    expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
  })

  it('Should accept a job', async function () {
    const startedAt = new Date()

    const { job: acceptedJob } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = acceptedJob.jobToken

    // Shared checks for the job returned by "accept" (runner view) and by the admin listing.
    // The runner view carries the job token but no private payload, the admin view is the opposite.
    const checkProcessingJob = (processing: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
      expect(processing.uuid).to.equal(jobUUID)

      expect(processing.type).to.equal('vod-web-video-transcoding')
      expect(processing.state.label).to.equal('Processing')
      expect(processing.state.id).to.equal(RunnerJobState.PROCESSING)

      expect(processing.runner).to.exist
      expect(processing.runner.name).to.equal('runner 1')
      expect(processing.runner.description).to.equal('my super runner 1')

      expect(processing.progress).to.be.null

      expect(processing.startedAt).to.exist
      expect(new Date(processing.startedAt)).to.be.above(startedAt)

      expect(processing.finishedAt).to.not.exist

      expect(processing.failures).to.equal(0)

      expect(processing.payload).to.exist

      if (fromAccept) {
        expect(processing.jobToken).to.exist
        expect((processing as RunnerJobAdmin).privatePayload).to.not.exist
      } else {
        expect(processing.jobToken).to.not.exist
        expect((processing as RunnerJobAdmin).privatePayload).to.exist
      }
    }

    checkProcessingJob(acceptedJob, true)

    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

    // Fail with an explicit assertion instead of a TypeError if the job is missing from the listing
    const processingJob = data.find(j => j.uuid === jobUUID)
    expect(processingJob).to.exist

    checkProcessingJob(processingJob, false)

    await checkMainJobState(RunnerJobState.PROCESSING)
  })

  it('Should update a job', async function () {
    await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })

    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

    // Only the processing job must carry the reported progress
    for (const listedJob of data) {
      if (listedJob.state.id === RunnerJobState.PROCESSING) {
        expect(listedJob.progress).to.equal(53)
      } else {
        expect(listedJob.progress).to.be.null
      }
    }
  })

  it('Should abort a job', async function () {
    await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })

    await checkMainJobState(RunnerJobState.PENDING)

    // Progress must have been reset by the abort
    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
    for (const listedJob of data) {
      expect(listedJob.progress).to.be.null
    }
  })

  it('Should accept the same job again and post a success', async function () {
    const { availableJobs } = await server.runnerJobs.request({ runnerToken })
    expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist

    const { job: acceptedJob } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = acceptedJob.jobToken

    await checkMainJobState(RunnerJobState.PROCESSING)

    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

    for (const listedJob of data) {
      expect(listedJob.progress).to.be.null
    }

    const payload = {
      videoFile: 'video_short.mp4'
    }

    await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
  })

  it('Should not have available jobs anymore', async function () {
    await checkMainJobState(RunnerJobState.COMPLETED)

    const mainJob = await getMainJob()
    expect(mainJob.finishedAt).to.exist

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })
    expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
  })
})
491
// Checks failure accounting: a job goes back to pending after an error,
// ends up errored after 5 failures, and its children then fail with it
describe('Error job', function () {

  it('Should accept another job and post an error', async function () {
    await server.runnerJobs.cancelAllJobs()
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })

    // Fail with an explicit assertion instead of a TypeError if no job is available
    expect(availableJobs[0]).to.exist
    jobUUID = availableJobs[0].uuid

    const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = job.jobToken

    await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
  })

  it('Should have job failures increased', async function () {
    const job = await getMainJob()

    // A single failure only puts the job back in the pending state
    expect(job.state.id).to.equal(RunnerJobState.PENDING)
    expect(job.failures).to.equal(1)
    expect(job.error).to.be.null
    expect(job.progress).to.be.null
    expect(job.finishedAt).to.not.exist
  })

  it('Should error a job when job attempts is too big', async function () {
    // 4 more failures on top of the previous one: 5 in total
    for (let i = 0; i < 4; i++) {
      const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
      jobToken = job.jobToken

      await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
    }

    const job = await getMainJob()
    expect(job.failures).to.equal(5)
    expect(job.state.id).to.equal(RunnerJobState.ERRORED)
    expect(job.state.label).to.equal('Errored')

    // Message of the last reported error
    expect(job.error).to.equal('Error 3')
    expect(job.progress).to.be.null
    expect(job.finishedAt).to.exist

    failedJob = job
  })

  it('Should have failed children jobs too', async function () {
    const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })

    const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
    expect(children).to.have.lengthOf(9)

    for (const child of children) {
      expect(child.parent.uuid).to.equal(failedJob.uuid)
      expect(child.parent.type).to.equal(failedJob.type)
      expect(child.parent.state.id).to.equal(failedJob.state.id)
      expect(child.parent.state.label).to.equal(failedJob.state.label)

      expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
      expect(child.state.label).to.equal('Parent job failed')
    }
  })
})
553
// Checks admin cancellation of a pending job (children get cancelled too) and of an already accepted job
describe('Cancel', function () {

  it('Should cancel a pending job', async function () {
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    {
      const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

      // Fail with an explicit assertion instead of a TypeError if no pending job is listed
      const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
      expect(pendingJob).to.exist

      jobUUID = pendingJob.uuid

      await server.runnerJobs.cancelByAdmin({ jobUUID })
    }

    {
      const job = await getMainJob()
      expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
      expect(job.state.label).to.equal('Cancelled')
    }

    {
      const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
      const children = data.filter(j => j.parent?.uuid === jobUUID)
      expect(children).to.have.lengthOf(9)

      for (const child of children) {
        expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
      }
    }
  })

  it('Should cancel an already accepted job and skip success/error', async function () {
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })

    // Fail with an explicit assertion instead of a TypeError if no job is available
    expect(availableJobs[0]).to.exist
    jobUUID = availableJobs[0].uuid

    const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = job.jobToken

    await server.runnerJobs.cancelByAdmin({ jobUUID })

    // The runner cannot act on the cancelled job anymore: the abort request is expected to return 404
    await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
  })
})
601
// Checks that an accepted job which is never updated by its runner goes back to the pending state
describe('Stalled jobs', function () {

  it('Should abort stalled jobs', async function () {
    this.timeout(60000)

    await server.videos.quickUpload({ name: 'video' })
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    const { job: activeJob } = await server.runnerJobs.autoAccept({ runnerToken })
    const { job: stalledJob } = await server.runnerJobs.autoAccept({ runnerToken })

    // Only the first job is updated (6 times, every 2 seconds): the second one receives no update
    for (let ping = 1; ping <= 6; ping++) {
      await wait(2000)

      await server.runnerJobs.update({ runnerToken, jobToken: activeJob.jobToken, jobUUID: activeJob.uuid })
    }

    const refreshedActiveJob = await server.runnerJobs.getJob({ uuid: activeJob.uuid })
    const refreshedStalledJob = await server.runnerJobs.getJob({ uuid: stalledJob.uuid })

    expect(refreshedActiveJob.state.id).to.equal(RunnerJobState.PROCESSING)
    expect(refreshedStalledJob.state.id).to.equal(RunnerJobState.PENDING)
  })
})
627
// Checks that the API rate limit applies to unknown runner tokens but not to a registered runner
describe('Rate limit', function () {

  // Restart the server with a low API rate limit (max: 10)
  before(async function () {
    this.timeout(60000)

    await server.kill()

    const rateLimitConfig = {
      rates_limit: {
        api: {
          max: 10
        }
      }
    }

    await server.run(rateLimitConfig)
  })

  it('Should rate limit an unknown runner', async function () {
    const pingRequest = {
      url: server.url,
      path: '/api/v1/ping',
      fields: { runnerToken: 'toto' }
    }

    for (let attempt = 0; attempt < 20; attempt++) {
      try {
        await makePostBodyRequest({ ...pingRequest, expectedStatus: HttpStatusCode.OK_200 })
      } catch {
        // Deliberately ignored: only the status of the request sent after this loop is asserted
      }
    }

    await makePostBodyRequest({ ...pingRequest, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
  })

  it('Should not rate limit a registered runner', async function () {
    const pingPath = '/api/v1/ping'

    // Twice the configured max: every request must still succeed
    for (let attempt = 0; attempt < 20; attempt++) {
      await makePostBodyRequest({
        url: server.url,
        path: pingPath,
        fields: { runnerToken },
        expectedStatus: HttpStatusCode.OK_200
      })
    }
  })
})
665 })
666
// Clean up the test server once the whole suite has run
after(async function () {
  const servers = [ server ]

  await cleanupTests(servers)
})
670})