git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/tests/api/runners/runner-common.ts
Add runner server tests
[github/Chocobozzz/PeerTube.git] / server / tests / api / runners / runner-common.ts
1 /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3 import { expect } from 'chai'
4 import { wait } from '@shared/core-utils'
5 import { HttpStatusCode, Runner, RunnerJob, RunnerJobAdmin, RunnerJobState, RunnerRegistrationToken } from '@shared/models'
6 import {
7 cleanupTests,
8 createSingleServer,
9 makePostBodyRequest,
10 PeerTubeServer,
11 setAccessTokensToServers,
12 setDefaultVideoChannel,
13 waitJobs
14 } from '@shared/server-commands'
15
16 describe('Test runner common actions', function () {
17 let server: PeerTubeServer
18 let registrationToken: string
19 let runnerToken: string
20 let jobMaxPriority: string
21
// Suite setup: start one server with a custom stalled-job delay, then enable local and remote transcoding
before(async function () {
  this.timeout(120_000)

  // Presumably the delay after which a VOD runner job without update is considered stalled
  const configOverride = {
    remote_runners: {
      stalled_jobs: { vod: '5 seconds' }
    }
  }

  server = await createSingleServer(1, configOverride)

  await setAccessTokensToServers([ server ])
  await setDefaultVideoChannel([ server ])

  const { config } = server
  await config.enableTranscoding(true, true)
  await config.enableRemoteTranscoding()
})
39
// Registration token lifecycle: default token, generation, listing with sort/pagination and deletion
describe('Managing runner registration tokens', function () {
  // Tokens in ascending creation order, filled by the listing test
  let tokensByCreationDate: RunnerRegistrationToken[]
  // Oldest token: temporary runners are registered with it before it gets deleted
  let tokenToRemove: RunnerRegistrationToken

  it('Should have a default registration token', async function () {
    const listing = await server.runnerRegistrationTokens.list()

    expect(listing.total).to.equal(1)
    expect(listing.data).to.have.lengthOf(1)

    const defaultToken = listing.data[0]
    expect(defaultToken.id).to.exist
    expect(defaultToken.createdAt).to.exist
    expect(defaultToken.updatedAt).to.exist
    expect(defaultToken.registeredRunnersCount).to.equal(0)
    expect(defaultToken.registrationToken).to.exist
  })

  it('Should create other registration tokens', async function () {
    // Two more tokens in addition to the default one
    for (let generated = 0; generated < 2; generated++) {
      await server.runnerRegistrationTokens.generate()
    }

    const listing = await server.runnerRegistrationTokens.list()
    expect(listing.total).to.equal(3)
    expect(listing.data).to.have.lengthOf(3)
  })

  it('Should list registration tokens', async function () {
    // Ascending sort: creation dates must strictly increase
    {
      const { total, data: ascending } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
      expect(total).to.equal(3)
      expect(ascending).to.have.lengthOf(3)

      const oldest = ascending[0]
      const middle = ascending[1]
      const newest = ascending[2]

      expect(new Date(oldest.createdAt)).to.be.below(new Date(middle.createdAt))
      expect(new Date(middle.createdAt)).to.be.below(new Date(newest.createdAt))

      tokensByCreationDate = ascending

      tokenToRemove = oldest
      // Shared with the following suites to register runners
      registrationToken = middle.registrationToken
    }

    // Descending sort, third entry only: must be the oldest token
    {
      const page = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
      expect(page.total).to.equal(3)
      expect(page.data).to.have.lengthOf(1)
      expect(page.data[0].registrationToken).to.equal(tokensByCreationDate[0].registrationToken)
    }
  })

  it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
    const doomedRegistrationToken = tokenToRemove.registrationToken

    for (const name of [ 'to delete 1', 'to delete 2' ]) {
      await server.runners.register({ name, registrationToken: doomedRegistrationToken })
    }

    const { data: tokens } = await server.runnerRegistrationTokens.list()

    // Only the token used above owns runners
    for (const token of tokens) {
      const expectedCount = token.registrationToken === doomedRegistrationToken
        ? 2
        : 0

      expect(token.registeredRunnersCount).to.equal(expectedCount)
    }

    const runnerListing = await server.runners.list()
    expect(runnerListing.data).to.have.lengthOf(2)
  })

  it('Should delete a registration token', async function () {
    await server.runnerRegistrationTokens.delete({ id: tokenToRemove.id })

    const remaining = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
    expect(remaining.total).to.equal(2)
    expect(remaining.data).to.have.lengthOf(2)

    for (const token of remaining.data) {
      expect(token.registeredRunnersCount).to.equal(0)
      expect(token.registrationToken).to.not.equal(tokenToRemove.registrationToken)
    }
  })

  it('Should have removed runners of this registration token', async function () {
    const runnerListing = await server.runners.list()
    expect(runnerListing.data).to.have.lengthOf(0)
  })
})
125
// Runner lifecycle: registration, listing, deletion and unregistration
describe('Managing runners', function () {
  // Second registered runner, removed by the deletion test
  let runnerToRemove: Runner

  it('Should not have runners available', async function () {
    const listing = await server.runners.list()

    expect(listing.data).to.have.lengthOf(0)
    expect(listing.total).to.equal(0)
  })

  it('Should register runners', async function () {
    const beforeRegistration = new Date()

    const registration = await server.runners.register({
      name: 'runner 1',
      description: 'my super runner 1',
      registrationToken
    })
    expect(registration.runnerToken).to.exist
    // Kept for the runner job tests
    runnerToken = registration.runnerToken

    await server.runners.register({ name: 'runner 2', registrationToken })

    const { total, data: runners } = await server.runners.list({ sort: 'createdAt' })
    expect(total).to.equal(2)
    expect(runners).to.have.lengthOf(2)

    for (const runner of runners) {
      expect(runner.id).to.exist
      expect(runner.createdAt).to.exist
      expect(runner.updatedAt).to.exist

      // Every date must be posterior to the start of this test
      for (const date of [ runner.createdAt, runner.updatedAt, runner.lastContact ]) {
        expect(new Date(date)).to.be.above(beforeRegistration)
      }

      expect(runner.ip).to.exist
    }

    const first = runners[0]
    const second = runners[1]

    expect(first.name).to.equal('runner 1')
    expect(first.description).to.equal('my super runner 1')

    expect(second.name).to.equal('runner 2')
    expect(second.description).to.be.null

    runnerToRemove = second
  })

  it('Should list runners', async function () {
    // Descending sort, second entry only: must be the first registered runner
    const page = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })

    expect(page.total).to.equal(2)
    expect(page.data).to.have.lengthOf(1)
    expect(page.data[0].name).to.equal('runner 1')
  })

  it('Should delete a runner', async function () {
    await server.runners.delete({ id: runnerToRemove.id })

    const listing = await server.runners.list()

    expect(listing.total).to.equal(1)
    expect(listing.data).to.have.lengthOf(1)
    expect(listing.data[0].name).to.equal('runner 1')
  })

  it('Should unregister a runner', async function () {
    const temporaryRunnerToken = await server.runners.autoRegisterRunner()

    // The freshly registered runner is listed next to 'runner 1'
    {
      const listing = await server.runners.list()
      expect(listing.total).to.equal(2)
      expect(listing.data).to.have.lengthOf(2)
    }

    await server.runners.unregister({ runnerToken: temporaryRunnerToken })

    // Only 'runner 1' remains
    {
      const listing = await server.runners.list()
      expect(listing.total).to.equal(1)
      expect(listing.data).to.have.lengthOf(1)
      expect(listing.data[0].name).to.equal('runner 1')
    }
  })
})
212
213 describe('Managing runner jobs', function () {
214 let jobUUID: string
215 let jobToken: string
216 let lastRunnerContact: Date
217 let failedJob: RunnerJob
218
/**
 * Lists the 10 most recently updated runner jobs and checks their states:
 * the job matching `jobUUID` must be in `mainJobState`, every other listed job
 * must be in one of `otherJobStates`.
 *
 * NOTE(review): nothing here asserts that the main job is part of the listed page,
 * so the check on `mainJobState` is skipped if it is not among these 10 jobs.
 */
async function checkMainJobState (
  mainJobState: RunnerJobState,
  otherJobStates: RunnerJobState[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
) {
  const { data: recentJobs } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

  for (const listedJob of recentJobs) {
    const stateId = listedJob.state.id

    if (listedJob.uuid !== jobUUID) {
      expect(otherJobStates).to.include(stateId)
      continue
    }

    expect(stateId).to.equal(mainJobState)
  }
}
233
// Fetches the job currently referenced by `jobUUID`
function getMainJob () {
  const uuid = jobUUID

  return server.runnerJobs.getJob({ uuid })
}
237
// Admin listing of runner jobs: counts, states, pagination, sort and search
describe('List jobs', function () {

  it('Should not have jobs', async function () {
    const listing = await server.runnerJobs.list()

    expect(listing.data).to.have.lengthOf(0)
    expect(listing.total).to.equal(0)
  })

  it('Should upload a video and have available jobs', async function () {
    await server.videos.quickUpload({ name: 'to transcode' })
    await waitJobs([ server ])

    const { total, data: jobs } = await server.runnerJobs.list()

    expect(jobs).to.have.lengthOf(10)
    expect(total).to.equal(10)

    // No job has been started or finished yet, and admins can see both payloads
    for (const listedJob of jobs) {
      expect(listedJob.startedAt).to.not.exist
      expect(listedJob.finishedAt).to.not.exist
      expect(listedJob.payload).to.exist
      expect(listedJob.privatePayload).to.exist
    }

    // Jobs are split evenly between both transcoding types
    const hls = jobs.filter(j => j.type === 'vod-hls-transcoding')
    const webVideo = jobs.filter(j => j.type === 'vod-web-video-transcoding')

    expect(hls).to.have.lengthOf(5)
    expect(webVideo).to.have.lengthOf(5)

    // A single job is pending, the 9 others are waiting for a parent job
    const pending = jobs.filter(j => j.state.id === RunnerJobState.PENDING)
    const waiting = jobs.filter(j => j.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)

    expect(pending).to.have.lengthOf(1)
    expect(waiting).to.have.lengthOf(9)
  })

  it('Should upload another video and list/sort jobs', async function () {
    await server.videos.quickUpload({ name: 'to transcode 2' })
    await waitJobs([ server ])

    // Default sort: pick a reference job in the full list
    {
      const listing = await server.runnerJobs.list({ start: 0, count: 30 })

      expect(listing.data).to.have.lengthOf(20)
      expect(listing.total).to.equal(20)

      jobUUID = listing.data[16].uuid
    }

    // Ascending creation date: the reference job is expected at index 3
    {
      const page = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
      expect(page.total).to.equal(20)

      expect(page.data).to.have.lengthOf(1)
      expect(page.data[0].uuid).to.equal(jobUUID)
    }

    // Descending priority: values must never increase along the list
    {
      const { total, data: byPriority } = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
      expect(total).to.equal(20)

      let ceiling = Infinity

      for (const listedJob of byPriority) {
        expect(listedJob.priority).to.be.at.most(ceiling)
        ceiling = listedJob.priority

        if (listedJob.state.id !== RunnerJobState.PENDING) continue

        // The last pending job of this sort is kept for the "sorted by priority" test
        jobMaxPriority = listedJob.uuid
      }
    }
  })

  it('Should search jobs', async function () {
    // Search by UUID: exactly one match
    {
      const found = await server.runnerJobs.list({ search: jobUUID })

      expect(found.data).to.have.lengthOf(1)
      expect(found.total).to.equal(1)

      expect(found.data[0].uuid).to.equal(jobUUID)
    }

    // Unknown term: no match
    {
      const found = await server.runnerJobs.list({ search: 'toto' })

      expect(found.data).to.have.lengthOf(0)
      expect(found.total).to.equal(0)
    }

    // 'hls' term: at least one match
    {
      const found = await server.runnerJobs.list({ search: 'hls' })

      expect(found.data).to.not.have.lengthOf(0)
      expect(found.total).to.not.equal(0)
    }
  })
})
338
// Runner side of a job lifecycle: request, accept, update, abort, accept again and success
describe('Accept/update/abort/process a job', function () {

  it('Should request available jobs', async function () {
    // Reference date for the "last runner contact" test below
    lastRunnerContact = new Date()

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })

    // Only optimize jobs are available
    expect(availableJobs).to.have.lengthOf(2)

    for (const availableJob of availableJobs) {
      expect(availableJob.uuid).to.exist
      expect(availableJob.payload.input).to.exist
      expect(availableJob.payload.output).to.exist

      // The private payload must not be part of what a runner receives
      expect((availableJob as RunnerJobAdmin).privatePayload).to.not.exist
    }

    const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
    const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')

    expect(hlsJobs).to.have.lengthOf(0)
    expect(webVideoJobs).to.have.lengthOf(2)

    // From now on the main job is the first available web video job
    jobUUID = webVideoJobs[0].uuid
  })

  it('Should have sorted available jobs by priority', async function () {
    const { availableJobs } = await server.runnerJobs.request({ runnerToken })

    expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
  })

  it('Should have last runner contact updated', async function () {
    await wait(1000)

    const { data } = await server.runners.list({ sort: 'createdAt' })
    expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
  })

  it('Should accept a job', async function () {
    const startedAt = new Date()

    const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = job.jobToken

    // Checks the accepted job, seen either from the accept response or from the admin list
    const checkProcessingJob = (toCheck: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
      expect(toCheck.uuid).to.equal(jobUUID)

      expect(toCheck.type).to.equal('vod-web-video-transcoding')
      expect(toCheck.state.label).to.equal('Processing')
      expect(toCheck.state.id).to.equal(RunnerJobState.PROCESSING)

      expect(toCheck.runner).to.exist
      expect(toCheck.runner.name).to.equal('runner 1')
      expect(toCheck.runner.description).to.equal('my super runner 1')

      expect(toCheck.progress).to.be.null

      expect(toCheck.startedAt).to.exist
      expect(new Date(toCheck.startedAt)).to.be.above(startedAt)

      expect(toCheck.finishedAt).to.not.exist

      expect(toCheck.failures).to.equal(0)

      expect(toCheck.payload).to.exist

      if (fromAccept) {
        // Accept response: job token present, private payload hidden
        expect(toCheck.jobToken).to.exist
        expect((toCheck as RunnerJobAdmin).privatePayload).to.not.exist
      } else {
        // Admin list: no job token, private payload visible
        expect(toCheck.jobToken).to.not.exist
        expect((toCheck as RunnerJobAdmin).privatePayload).to.exist
      }
    }

    checkProcessingJob(job, true)

    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

    const processingJob = data.find(j => j.uuid === jobUUID)
    // Fail with an explicit assertion instead of a TypeError if the job is not listed
    expect(processingJob, 'accepted job should be part of the admin job list').to.exist
    checkProcessingJob(processingJob, false)

    await checkMainJobState(RunnerJobState.PROCESSING)
  })

  it('Should update a job', async function () {
    await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })

    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

    // Only the processing job carries the reported progress
    for (const listedJob of data) {
      if (listedJob.state.id === RunnerJobState.PROCESSING) {
        expect(listedJob.progress).to.equal(53)
      } else {
        expect(listedJob.progress).to.be.null
      }
    }
  })

  it('Should abort a job', async function () {
    await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })

    await checkMainJobState(RunnerJobState.PENDING)

    // The progress reported before the abort must have been reset
    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
    for (const listedJob of data) {
      expect(listedJob.progress).to.be.null
    }
  })

  it('Should accept the same job again and post a success', async function () {
    const { availableJobs } = await server.runnerJobs.request({ runnerToken })
    expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist

    const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = job.jobToken

    await checkMainJobState(RunnerJobState.PROCESSING)

    const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

    for (const listedJob of data) {
      expect(listedJob.progress).to.be.null
    }

    const payload = {
      videoFile: 'video_short.mp4'
    }

    await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
  })

  it('Should not have available jobs anymore', async function () {
    await checkMainJobState(RunnerJobState.COMPLETED)

    const job = await getMainJob()
    expect(job.finishedAt).to.exist

    // A completed job must not be offered to runners again
    const { availableJobs } = await server.runnerJobs.request({ runnerToken })
    expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
  })
})
483
// Error reporting by a runner: failure counter, final errored state and propagation to children jobs
describe('Error job', function () {

  it('Should accept another job and post an error', async function () {
    // Start from a clean state: cancel existing jobs, then create new ones with a fresh upload
    await server.runnerJobs.cancelAllJobs()
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })
    // Fail with an explicit assertion instead of a TypeError if no job is available
    expect(availableJobs).to.not.have.lengthOf(0)
    jobUUID = availableJobs[0].uuid

    const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = job.jobToken

    await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
  })

  it('Should have job failures increased', async function () {
    // After a first error the job is pending again, with one failure and no stored error
    const job = await getMainJob()
    expect(job.state.id).to.equal(RunnerJobState.PENDING)
    expect(job.failures).to.equal(1)
    expect(job.error).to.be.null
    expect(job.progress).to.be.null
    expect(job.finishedAt).to.not.exist
  })

  it('Should error a job when job attempts is too big', async function () {
    // 4 more errors on top of the previous one: 5 failures in total
    for (let i = 0; i < 4; i++) {
      const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
      jobToken = job.jobToken

      await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
    }

    const job = await getMainJob()
    expect(job.failures).to.equal(5)
    expect(job.state.id).to.equal(RunnerJobState.ERRORED)
    expect(job.state.label).to.equal('Errored')
    // Message of the last posted error
    expect(job.error).to.equal('Error 3')
    expect(job.progress).to.be.null
    expect(job.finishedAt).to.exist

    // Kept to check the children jobs in the next test
    failedJob = job
  })

  it('Should have failed children jobs too', async function () {
    const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })

    const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
    expect(children).to.have.lengthOf(9)

    for (const child of children) {
      // The embedded parent must mirror the failed job
      expect(child.parent.uuid).to.equal(failedJob.uuid)
      expect(child.parent.type).to.equal(failedJob.type)
      expect(child.parent.state.id).to.equal(failedJob.state.id)
      expect(child.parent.state.label).to.equal(failedJob.state.label)

      expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
      expect(child.state.label).to.equal('Parent job failed')
    }
  })
})
545
// Job cancellation by an admin, for a pending job and for a job already accepted by a runner
describe('Cancel', function () {

  it('Should cancel a pending job', async function () {
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    // Cancel the first pending job found among the most recently updated ones
    {
      const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

      const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
      // Fail with an explicit assertion instead of a TypeError if no pending job is listed
      expect(pendingJob, 'a pending job should be listed after the upload').to.exist
      jobUUID = pendingJob.uuid

      await server.runnerJobs.cancelByAdmin({ jobUUID })
    }

    {
      const job = await getMainJob()
      expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
      expect(job.state.label).to.equal('Cancelled')
    }

    // The cancellation must be reflected on the 9 children jobs
    {
      const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
      const children = data.filter(j => j.parent?.uuid === jobUUID)
      expect(children).to.have.lengthOf(9)

      for (const child of children) {
        expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
      }
    }
  })

  it('Should cancel an already accepted job and skip success/error', async function () {
    await server.videos.quickUpload({ name: 'video' })
    await waitJobs([ server ])

    const { availableJobs } = await server.runnerJobs.request({ runnerToken })
    // Fail with an explicit assertion instead of a TypeError if no job is available
    expect(availableJobs).to.not.have.lengthOf(0)
    jobUUID = availableJobs[0].uuid

    const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
    jobToken = job.jobToken

    await server.runnerJobs.cancelByAdmin({ jobUUID })

    // Once cancelled by the admin, the runner's abort request is expected to get a 404
    await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
  })
})
593
// A job that stops receiving runner updates is expected to go back to the pending state
describe('Stalled jobs', function () {

  it('Should abort stalled jobs', async function () {
    this.timeout(60000)

    // Two uploads so that two jobs can be accepted
    for (let uploaded = 0; uploaded < 2; uploaded++) {
      await server.videos.quickUpload({ name: 'video' })
    }
    await waitJobs([ server ])

    const { job: keptAliveJob } = await server.runnerJobs.autoAccept({ runnerToken })
    const { job: silentJob } = await server.runnerJobs.autoAccept({ runnerToken })

    // Only the first job is refreshed, every 2 seconds during 12 seconds
    let remainingUpdates = 6
    while (remainingUpdates > 0) {
      await wait(2000)

      await server.runnerJobs.update({ runnerToken, jobToken: keptAliveJob.jobToken, jobUUID: keptAliveJob.uuid })

      remainingUpdates--
    }

    const keptAliveState = await server.runnerJobs.getJob({ uuid: keptAliveJob.uuid })
    const silentState = await server.runnerJobs.getJob({ uuid: silentJob.uuid })

    expect(keptAliveState.state.id).to.equal(RunnerJobState.PROCESSING)
    expect(silentState.state.id).to.equal(RunnerJobState.PENDING)
  })
})
619
// API rate limit on the ping endpoint, for an unknown runner token and for a registered one
describe('Rate limit', function () {

  // Restart the server with an API rate limit max of 10
  before(async function () {
    this.timeout(60000)

    await server.kill()

    const rateLimitConfig = {
      rates_limit: {
        api: { max: 10 }
      }
    }

    await server.run(rateLimitConfig)
  })

  it('Should rate limit an unknown runner', async function () {
    const path = '/api/v1/ping'
    const fields = { runnerToken: 'toto' }

    let attempt = 0
    while (attempt < 20) {
      try {
        await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.OK_200 })
      } catch {
        // Failures are deliberately ignored here: only the final request below is checked
      }

      attempt++
    }

    await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
  })

  it('Should not rate limit a registered runner', async function () {
    const path = '/api/v1/ping'
    const fields = { runnerToken }

    // Every request must succeed, even past the configured max
    let attempt = 0
    while (attempt < 20) {
      await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.OK_200 })

      attempt++
    }
  })
})
657 })
658
// Suite teardown: hand the test server over to cleanupTests
after(async function () {
  const servers = [ server ]

  await cleanupTests(servers)
})
662 })