1 /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
3 import { expect } from 'chai'
4 import { wait } from '@shared/core-utils'
11 RunnerJobVODWebVideoTranscodingPayload,
12 RunnerRegistrationToken
13 } from '@shared/models'
19 setAccessTokensToServers,
20 setDefaultVideoChannel,
22 } from '@shared/server-commands'
// Top-level suite: exercises runner registration tokens, runners and runner jobs against a single server.
24 describe('Test runner common actions', function () {
25 let server: PeerTubeServer
// Registration token string picked from the token list (assigned in the "list registration tokens" test)
26 let registrationToken: string
// Token returned when registering the runner described as 'my super runner 1'; sent with the runner job requests below
27 let runnerToken: string
// UUID (not a priority value) of a PENDING job recorded while iterating jobs sorted by '-priority'
28 let jobMaxPriority: string
// One-time setup: create the server, set access tokens and default channel, then enable transcoding and remote transcoding
30 before(async function () {
33 server = await createSingleServer(1, {
41 await setAccessTokensToServers([ server ])
42 await setDefaultVideoChannel([ server ])
44 await server.config.enableTranscoding(true, true)
45 await server.config.enableRemoteTranscoding()
// Registration token lifecycle: default token, generation, listing/pagination, registered runner counts and deletion.
48 describe('Managing runner registration tokens', function () {
// Token list compared against in the pagination check (its assignment is on a line not shown in this excerpt)
49 let base: RunnerRegistrationToken[]
// Token used to register two runners and then deleted
50 let registrationTokenToDelete: RunnerRegistrationToken
52 it('Should have a default registration token', async function () {
53 const { total, data } = await server.runnerRegistrationTokens.list()
55 expect(total).to.equal(1)
56 expect(data).to.have.lengthOf(1)
// NOTE(review): `token` is declared on a line not shown here — presumably the single listed entry
59 expect(token.id).to.exist
60 expect(token.createdAt).to.exist
61 expect(token.updatedAt).to.exist
62 expect(token.registeredRunnersCount).to.equal(0)
63 expect(token.registrationToken).to.exist
66 it('Should create other registration tokens', async function () {
// Two generated tokens on top of the default one: 3 in total
67 await server.runnerRegistrationTokens.generate()
68 await server.runnerRegistrationTokens.generate()
70 const { total, data } = await server.runnerRegistrationTokens.list()
71 expect(total).to.equal(3)
72 expect(data).to.have.lengthOf(3)
75 it('Should list registration tokens', async function () {
// Ascending 'createdAt' sort: creation dates must be strictly increasing
77 const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
78 expect(total).to.equal(3)
79 expect(data).to.have.lengthOf(3)
80 expect(new Date(data[0].createdAt)).to.be.below(new Date(data[1].createdAt))
81 expect(new Date(data[1].createdAt)).to.be.below(new Date(data[2].createdAt))
// Keep the first token for the deletion tests and the second one's token string for later use
85 registrationTokenToDelete = data[0]
86 registrationToken = data[1].registrationToken
// Pagination: a page of 1 at offset 2 in '-createdAt' order must be base[0]
90 const { total, data } = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
91 expect(total).to.equal(3)
92 expect(data).to.have.lengthOf(1)
93 expect(data[0].registrationToken).to.equal(base[0].registrationToken)
97 it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
98 await server.runners.register({ name: 'to delete 1', registrationToken: registrationTokenToDelete.registrationToken })
99 await server.runners.register({ name: 'to delete 2', registrationToken: registrationTokenToDelete.registrationToken })
101 const { data } = await server.runnerRegistrationTokens.list()
// Only the token used for the two registrations above counts runners (the `else` branch line is not shown here)
103 for (const d of data) {
104 if (d.registrationToken === registrationTokenToDelete.registrationToken) {
105 expect(d.registeredRunnersCount).to.equal(2)
107 expect(d.registeredRunnersCount).to.equal(0)
111 const { data: runners } = await server.runners.list()
112 expect(runners).to.have.lengthOf(2)
115 it('Should delete a registration token', async function () {
116 await server.runnerRegistrationTokens.delete({ id: registrationTokenToDelete.id })
118 const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
119 expect(total).to.equal(2)
120 expect(data).to.have.lengthOf(2)
// The deleted token must be gone and the remaining ones must have no registered runner
122 for (const d of data) {
123 expect(d.registeredRunnersCount).to.equal(0)
124 expect(d.registrationToken).to.not.equal(registrationTokenToDelete.registrationToken)
128 it('Should have removed runners of this registration token', async function () {
// The two runners registered with the deleted token are expected to be removed too
129 const { data: runners } = await server.runners.list()
130 expect(runners).to.have.lengthOf(0)
// Runner lifecycle: register, list with pagination, delete and unregister.
134 describe('Managing runners', function () {
137 it('Should not have runners available', async function () {
138 const { total, data } = await server.runners.list()
140 expect(data).to.have.lengthOf(0)
141 expect(total).to.equal(0)
144 it('Should register runners', async function () {
// Reference time: every timestamp of the newly registered runners must be later than this
145 const now = new Date()
147 const result = await server.runners.register({
149 description: 'my super runner 1',
// Keep the runner token: it is sent with the runner job requests of the later suites
152 expect(result.runnerToken).to.exist
153 runnerToken = result.runnerToken
155 await server.runners.register({
160 const { total, data } = await server.runners.list({ sort: 'createdAt' })
161 expect(total).to.equal(2)
162 expect(data).to.have.lengthOf(2)
164 for (const d of data) {
165 expect(d.id).to.exist
166 expect(d.createdAt).to.exist
167 expect(d.updatedAt).to.exist
168 expect(new Date(d.createdAt)).to.be.above(now)
169 expect(new Date(d.updatedAt)).to.be.above(now)
170 expect(new Date(d.lastContact)).to.be.above(now)
171 expect(d.ip).to.exist
174 expect(data[0].name).to.equal('runner 1')
175 expect(data[0].description).to.equal('my super runner 1')
// The second runner is expected to have a null description
177 expect(data[1].name).to.equal('runner 2')
178 expect(data[1].description).to.be.null
183 it('Should list runners', async function () {
// Descending 'createdAt' sort with offset 1: the oldest runner ('runner 1') is expected
184 const { total, data } = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })
186 expect(total).to.equal(2)
187 expect(data).to.have.lengthOf(1)
188 expect(data[0].name).to.equal('runner 1')
191 it('Should delete a runner', async function () {
// NOTE(review): `toDelete` is assigned on a line not shown here — presumably the 'runner 2' entry, since only 'runner 1' remains below
192 await server.runners.delete({ id: toDelete.id })
194 const { total, data } = await server.runners.list()
196 expect(total).to.equal(1)
197 expect(data).to.have.lengthOf(1)
198 expect(data[0].name).to.equal('runner 1')
201 it('Should unregister a runner', async function () {
// Register an extra runner, then unregister it with the value returned by autoRegisterRunner()
202 const registered = await server.runners.autoRegisterRunner()
205 const { total, data } = await server.runners.list()
206 expect(total).to.equal(2)
207 expect(data).to.have.lengthOf(2)
210 await server.runners.unregister({ runnerToken: registered })
// Back to a single runner after unregistering
213 const { total, data } = await server.runners.list()
214 expect(total).to.equal(1)
215 expect(data).to.have.lengthOf(1)
216 expect(data[0].name).to.equal('runner 1')
// Runner job scenarios. `jobUUID` and `jobToken` used below are declared on lines not shown in this excerpt.
221 describe('Managing runner jobs', function () {
// Timestamp taken before requesting jobs, later compared with the runner's lastContact
224 let lastRunnerContact: Date
// Job whose children are inspected in the "Error job" suite (its assignment is on a line not shown here)
225 let failedJob: RunnerJob
// Lists the 10 most recently updated jobs and asserts the job matching `jobUUID` is in `mainJobState`;
// the other assertion checks a job state against `otherJobStates` (the `else` line is not shown here,
// presumably it applies to every other listed job).
227 async function checkMainJobState (
228 mainJobState: RunnerJobState,
229 otherJobStates: RunnerJobState[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
231 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
233 for (const job of data) {
234 if (job.uuid === jobUUID) {
235 expect(job.state.id).to.equal(mainJobState)
237 expect(otherJobStates).to.include(job.state.id)
// Fetches the job currently tracked by `jobUUID`
242 function getMainJob () {
243 return server.runnerJobs.getJob({ uuid: jobUUID })
// Listing, sorting and searching runner jobs.
246 describe('List jobs', function () {
248 it('Should not have jobs', async function () {
249 const { total, data } = await server.runnerJobs.list()
251 expect(data).to.have.lengthOf(0)
252 expect(total).to.equal(0)
255 it('Should upload a video and have available jobs', async function () {
256 await server.videos.quickUpload({ name: 'to transcode' })
257 await waitJobs([ server ])
259 const { total, data } = await server.runnerJobs.list()
// A single upload is expected to create 10 jobs
261 expect(data).to.have.lengthOf(10)
262 expect(total).to.equal(10)
// None of them has started yet; both payloads are visible through this listing
264 for (const job of data) {
265 expect(job.startedAt).to.not.exist
266 expect(job.finishedAt).to.not.exist
267 expect(job.payload).to.exist
268 expect(job.privatePayload).to.exist
// Split by type: 5 HLS jobs and 5 web video jobs
271 const hlsJobs = data.filter(d => d.type === 'vod-hls-transcoding')
272 const webVideoJobs = data.filter(d => d.type === 'vod-web-video-transcoding')
274 expect(hlsJobs).to.have.lengthOf(5)
275 expect(webVideoJobs).to.have.lengthOf(5)
// Split by state: 1 pending job, the 9 others wait for their parent job
277 const pendingJobs = data.filter(d => d.state.id === RunnerJobState.PENDING)
278 const waitingJobs = data.filter(d => d.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)
280 expect(pendingJobs).to.have.lengthOf(1)
281 expect(waitingJobs).to.have.lengthOf(9)
284 it('Should upload another video and list/sort jobs', async function () {
285 await server.videos.quickUpload({ name: 'to transcode 2' })
286 await waitJobs([ server ])
// A second upload doubles the number of jobs
289 const { total, data } = await server.runnerJobs.list({ start: 0, count: 30 })
291 expect(data).to.have.lengthOf(20)
292 expect(total).to.equal(20)
// Track the job at index 16 of this listing; the next check expects it at offset 3 when sorted by 'createdAt'
294 jobUUID = data[16].uuid
298 const { total, data } = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
299 expect(total).to.equal(20)
301 expect(data).to.have.lengthOf(1)
302 expect(data[0].uuid).to.equal(jobUUID)
// Walk jobs by descending priority and check the sequence never increases
306 let previousPriority = Infinity
307 const { total, data } = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
308 expect(total).to.equal(20)
310 for (const job of data) {
311 expect(job.priority).to.be.at.most(previousPriority)
312 previousPriority = job.priority
// Remember the UUID of a pending job: the priority test of the next suite compares against it
314 if (job.state.id === RunnerJobState.PENDING) {
315 jobMaxPriority = job.uuid
321 it('Should search jobs', async function () {
// Search by the tracked job UUID: exactly one match
323 const { total, data } = await server.runnerJobs.list({ search: jobUUID })
325 expect(data).to.have.lengthOf(1)
326 expect(total).to.equal(1)
328 expect(data[0].uuid).to.equal(jobUUID)
// Search term that matches nothing
332 const { total, data } = await server.runnerJobs.list({ search: 'toto' })
334 expect(data).to.have.lengthOf(0)
335 expect(total).to.equal(0)
// NOTE(review): 'hls' presumably matches on the job type — only a non-empty result is asserted
339 const { total, data } = await server.runnerJobs.list({ search: 'hls' })
341 expect(data).to.not.have.lengthOf(0)
342 expect(total).to.not.equal(0)
// Main job workflow from the runner side: request, accept, update progress, abort, re-accept and post a success.
347 describe('Accept/update/abort/process a job', function () {
349 it('Should request available jobs', async function () {
// Reference time for the "last runner contact" test below
350 lastRunnerContact = new Date()
352 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
// Only optimize jobs are available: the 2 web video jobs and no HLS job (asserted below)
355 expect(availableJobs).to.have.lengthOf(2)
357 for (const job of availableJobs) {
358 expect(job.uuid).to.exist
359 expect(job.payload.input).to.exist
360 expect((job.payload as RunnerJobVODWebVideoTranscodingPayload).output).to.exist
// The private payload must not be part of the jobs returned to a runner
362 expect((job as RunnerJobAdmin).privatePayload).to.not.exist
365 const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
366 const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')
368 expect(hlsJobs).to.have.lengthOf(0)
369 expect(webVideoJobs).to.have.lengthOf(2)
// From now on the tracked job is the first available web video job
371 jobUUID = webVideoJobs[0].uuid
374 it('Should have sorted available jobs by priority', async function () {
375 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
// `jobMaxPriority` holds a job UUID recorded in the "List jobs" suite
377 expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
380 it('Should have last runner contact updated', async function () {
// The job requests above are expected to have refreshed the runner's lastContact
383 const { data } = await server.runners.list({ sort: 'createdAt' })
384 expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
387 it('Should accept a job', async function () {
388 const startedAt = new Date()
390 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
391 jobToken = job.jobToken
// Shared assertions, run on both the accept response (fromAccept = true) and the job found in the listing (false)
393 const checkProcessingJob = (job: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
394 expect(job.uuid).to.equal(jobUUID)
396 expect(job.type).to.equal('vod-web-video-transcoding')
397 expect(job.state.label).to.equal('Processing')
398 expect(job.state.id).to.equal(RunnerJobState.PROCESSING)
400 expect(job.runner).to.exist
401 expect(job.runner.name).to.equal('runner 1')
402 expect(job.runner.description).to.equal('my super runner 1')
404 expect(job.progress).to.be.null
406 expect(job.startedAt).to.exist
407 expect(new Date(job.startedAt)).to.be.above(startedAt)
409 expect(job.finishedAt).to.not.exist
411 expect(job.failures).to.equal(0)
413 expect(job.payload).to.exist
// NOTE(review): the branch lines are not shown here — presumably this pair applies when `fromAccept` is true
416 expect(job.jobToken).to.exist
417 expect((job as RunnerJobAdmin).privatePayload).to.not.exist
// ...and this pair to the job found in the listing: no job token, private payload present
419 expect(job.jobToken).to.not.exist
420 expect((job as RunnerJobAdmin).privatePayload).to.exist
424 checkProcessingJob(job, true)
426 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
428 const processingJob = data.find(j => j.uuid === jobUUID)
429 checkProcessingJob(processingJob, false)
431 await checkMainJobState(RunnerJobState.PROCESSING)
434 it('Should update a job', async function () {
435 await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })
437 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
// Only the processing job carries the reported progress (the `else` branch line is not shown here)
439 for (const job of data) {
440 if (job.state.id === RunnerJobState.PROCESSING) {
441 expect(job.progress).to.equal(53)
443 expect(job.progress).to.be.null
448 it('Should abort a job', async function () {
449 await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })
// After the abort the job is PENDING again and no listed job reports progress
451 await checkMainJobState(RunnerJobState.PENDING)
453 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
454 for (const job of data) {
455 expect(job.progress).to.be.null
459 it('Should accept the same job again and post a success', async function () {
// The aborted job must be offered to the runner again
460 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
461 expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist
// Accepting again yields a new job token, stored for the success call
463 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
464 jobToken = job.jobToken
466 await checkMainJobState(RunnerJobState.PROCESSING)
468 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
470 for (const job of data) {
471 expect(job.progress).to.be.null
// Part of the success `payload` object (its declaration is on a line not shown here)
475 videoFile: 'video_short.mp4'
478 await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
481 it('Should not have available jobs anymore', async function () {
482 await checkMainJobState(RunnerJobState.COMPLETED)
484 const job = await getMainJob()
485 expect(job.finishedAt).to.exist
// A completed job must no longer be offered to the runner
487 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
488 expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
// Failure handling: failure counter, ERRORED state after repeated failures, and the resulting state of child jobs.
492 describe('Error job', function () {
494 it('Should accept another job and post an error', async function () {
// Cancel all existing jobs, then create fresh ones with a new upload
495 await server.runnerJobs.cancelAllJobs()
496 await server.videos.quickUpload({ name: 'video' })
497 await waitJobs([ server ])
499 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
500 jobUUID = availableJobs[0].uuid
502 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
503 jobToken = job.jobToken
505 await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
508 it('Should have job failures increased', async function () {
// After one reported error the job is PENDING again: failures is 1 and no error message is stored
509 const job = await getMainJob()
510 expect(job.state.id).to.equal(RunnerJobState.PENDING)
511 expect(job.failures).to.equal(1)
512 expect(job.error).to.be.null
513 expect(job.progress).to.be.null
514 expect(job.finishedAt).to.not.exist
517 it('Should error a job when job attempts is too big', async function () {
// 4 more accept/error rounds on top of the first failure: 5 failures in total
518 for (let i = 0; i < 4; i++) {
519 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
520 jobToken = job.jobToken
522 await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
// The job is now ERRORED and keeps the message of the last round ('Error 3')
525 const job = await getMainJob()
526 expect(job.failures).to.equal(5)
527 expect(job.state.id).to.equal(RunnerJobState.ERRORED)
528 expect(job.state.label).to.equal('Errored')
529 expect(job.error).to.equal('Error 3')
530 expect(job.progress).to.be.null
531 expect(job.finishedAt).to.exist
536 it('Should have failed children jobs too', async function () {
537 const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })
// NOTE(review): `failedJob` is assigned on a line not shown here — presumably the errored job fetched above
539 const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
540 expect(children).to.have.lengthOf(9)
// Each child exposes its parent's identity and state, and is itself marked PARENT_ERRORED
542 for (const child of children) {
543 expect(child.parent.uuid).to.equal(failedJob.uuid)
544 expect(child.parent.type).to.equal(failedJob.type)
545 expect(child.parent.state.id).to.equal(failedJob.state.id)
546 expect(child.parent.state.label).to.equal(failedJob.state.label)
548 expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
549 expect(child.state.label).to.equal('Parent job failed')
// Admin cancellation of a pending job and of an already accepted job.
554 describe('Cancel', function () {
556 it('Should cancel a pending job', async function () {
557 await server.videos.quickUpload({ name: 'video' })
558 await waitJobs([ server ])
561 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
// Track the pending job found among the 10 most recently updated jobs, then cancel it as admin
563 const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
564 jobUUID = pendingJob.uuid
566 await server.runnerJobs.cancelByAdmin({ jobUUID })
570 const job = await getMainJob()
571 expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
572 expect(job.state.label).to.equal('Cancelled')
// The 9 children of the cancelled job must be marked PARENT_CANCELLED
576 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
577 const children = data.filter(j => j.parent?.uuid === jobUUID)
578 expect(children).to.have.lengthOf(9)
580 for (const child of children) {
581 expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
586 it('Should cancel an already accepted job and skip success/error', async function () {
587 await server.videos.quickUpload({ name: 'video' })
588 await waitJobs([ server ])
590 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
591 jobUUID = availableJobs[0].uuid
593 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
594 jobToken = job.jobToken
596 await server.runnerJobs.cancelByAdmin({ jobUUID })
// Once the job is cancelled by the admin, the runner's abort request is expected to get a 404
598 await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
// Stalled job handling: an accepted job that receives no update is expected to go back to PENDING.
602 describe('Stalled jobs', function () {
604 it('Should abort stalled jobs', async function () {
607 await server.videos.quickUpload({ name: 'video' })
608 await server.videos.quickUpload({ name: 'video' })
609 await waitJobs([ server ])
// Accept two jobs; only job1 is updated in the loop below
611 const { job: job1 } = await server.runnerJobs.autoAccept({ runnerToken })
612 const { job: stalledJob } = await server.runnerJobs.autoAccept({ runnerToken })
// NOTE(review): 6 update rounds for job1 — the pause between rounds is presumably on a line not shown here
614 for (let i = 0; i < 6; i++) {
617 await server.runnerJobs.update({ runnerToken, jobToken: job1.jobToken, jobUUID: job1.uuid })
620 const refreshedJob1 = await server.runnerJobs.getJob({ uuid: job1.uuid })
621 const refreshedStalledJob = await server.runnerJobs.getJob({ uuid: stalledJob.uuid })
// The updated job is still processing, the silent one went back to PENDING
623 expect(refreshedJob1.state.id).to.equal(RunnerJobState.PROCESSING)
624 expect(refreshedStalledJob.state.id).to.equal(RunnerJobState.PENDING)
// Rate limiting of requests carrying a runner token.
628 describe('Rate limit', function () {
// Suite setup (its body is on lines not shown in this excerpt)
630 before(async function () {
644 it('Should rate limit an unknown runner', async function () {
645 const path = '/api/v1/ping'
646 const fields = { runnerToken: 'toto' }
// Send 20 requests with an unknown runner token...
648 for (let i = 0; i < 20; i++) {
650 await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.OK_200 })
// ...then the next one must be rejected with 429
654 await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
657 it('Should not rate limit a registered runner', async function () {
658 const path = '/api/v1/ping'
// The same number of requests with the registered runner token must all succeed
660 for (let i = 0; i < 20; i++) {
661 await makePostBodyRequest({ url: server.url, path, fields: { runnerToken }, expectedStatus: HttpStatusCode.OK_200 })
// Global teardown: clean up the server used by the whole suite
667 after(async function () {
668 await cleanupTests([ server ])