1 /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
3 import { expect } from 'chai'
4 import { wait } from '@shared/core-utils'
5 import { HttpStatusCode, Runner, RunnerJob, RunnerJobAdmin, RunnerJobState, RunnerRegistrationToken } from '@shared/models'
11 setAccessTokensToServers,
12 setDefaultVideoChannel,
14 } from '@shared/server-commands'
16 describe('Test runner common actions', function () {
17 let server: PeerTubeServer
18 let registrationToken: string
19 let runnerToken: string
20 let jobMaxPriority: string
22 before(async function () {
25 server = await createSingleServer(1, {
33 await setAccessTokensToServers([ server ])
34 await setDefaultVideoChannel([ server ])
36 await server.config.enableTranscoding(true, true)
37 await server.config.enableRemoteTranscoding()
40 describe('Managing runner registration tokens', function () {
41 let base: RunnerRegistrationToken[]
42 let registrationTokenToDelete: RunnerRegistrationToken
44 it('Should have a default registration token', async function () {
45 const { total, data } = await server.runnerRegistrationTokens.list()
47 expect(total).to.equal(1)
48 expect(data).to.have.lengthOf(1)
51 expect(token.id).to.exist
52 expect(token.createdAt).to.exist
53 expect(token.updatedAt).to.exist
54 expect(token.registeredRunnersCount).to.equal(0)
55 expect(token.registrationToken).to.exist
58 it('Should create other registration tokens', async function () {
59 await server.runnerRegistrationTokens.generate()
60 await server.runnerRegistrationTokens.generate()
62 const { total, data } = await server.runnerRegistrationTokens.list()
63 expect(total).to.equal(3)
64 expect(data).to.have.lengthOf(3)
67 it('Should list registration tokens', async function () {
69 const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
70 expect(total).to.equal(3)
71 expect(data).to.have.lengthOf(3)
72 expect(new Date(data[0].createdAt)).to.be.below(new Date(data[1].createdAt))
73 expect(new Date(data[1].createdAt)).to.be.below(new Date(data[2].createdAt))
77 registrationTokenToDelete = data[0]
78 registrationToken = data[1].registrationToken
82 const { total, data } = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
83 expect(total).to.equal(3)
84 expect(data).to.have.lengthOf(1)
85 expect(data[0].registrationToken).to.equal(base[0].registrationToken)
89 it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
90 await server.runners.register({ name: 'to delete 1', registrationToken: registrationTokenToDelete.registrationToken })
91 await server.runners.register({ name: 'to delete 2', registrationToken: registrationTokenToDelete.registrationToken })
93 const { data } = await server.runnerRegistrationTokens.list()
95 for (const d of data) {
96 if (d.registrationToken === registrationTokenToDelete.registrationToken) {
97 expect(d.registeredRunnersCount).to.equal(2)
99 expect(d.registeredRunnersCount).to.equal(0)
103 const { data: runners } = await server.runners.list()
104 expect(runners).to.have.lengthOf(2)
107 it('Should delete a registration token', async function () {
108 await server.runnerRegistrationTokens.delete({ id: registrationTokenToDelete.id })
110 const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
111 expect(total).to.equal(2)
112 expect(data).to.have.lengthOf(2)
114 for (const d of data) {
115 expect(d.registeredRunnersCount).to.equal(0)
116 expect(d.registrationToken).to.not.equal(registrationTokenToDelete.registrationToken)
120 it('Should have removed runners of this registration token', async function () {
121 const { data: runners } = await server.runners.list()
122 expect(runners).to.have.lengthOf(0)
126 describe('Managing runners', function () {
129 it('Should not have runners available', async function () {
130 const { total, data } = await server.runners.list()
132 expect(data).to.have.lengthOf(0)
133 expect(total).to.equal(0)
136 it('Should register runners', async function () {
137 const now = new Date()
139 const result = await server.runners.register({
141 description: 'my super runner 1',
144 expect(result.runnerToken).to.exist
145 runnerToken = result.runnerToken
147 await server.runners.register({
152 const { total, data } = await server.runners.list({ sort: 'createdAt' })
153 expect(total).to.equal(2)
154 expect(data).to.have.lengthOf(2)
156 for (const d of data) {
157 expect(d.id).to.exist
158 expect(d.createdAt).to.exist
159 expect(d.updatedAt).to.exist
160 expect(new Date(d.createdAt)).to.be.above(now)
161 expect(new Date(d.updatedAt)).to.be.above(now)
162 expect(new Date(d.lastContact)).to.be.above(now)
163 expect(d.ip).to.exist
166 expect(data[0].name).to.equal('runner 1')
167 expect(data[0].description).to.equal('my super runner 1')
169 expect(data[1].name).to.equal('runner 2')
170 expect(data[1].description).to.be.null
175 it('Should list runners', async function () {
176 const { total, data } = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })
178 expect(total).to.equal(2)
179 expect(data).to.have.lengthOf(1)
180 expect(data[0].name).to.equal('runner 1')
183 it('Should delete a runner', async function () {
184 await server.runners.delete({ id: toDelete.id })
186 const { total, data } = await server.runners.list()
188 expect(total).to.equal(1)
189 expect(data).to.have.lengthOf(1)
190 expect(data[0].name).to.equal('runner 1')
193 it('Should unregister a runner', async function () {
194 const registered = await server.runners.autoRegisterRunner()
197 const { total, data } = await server.runners.list()
198 expect(total).to.equal(2)
199 expect(data).to.have.lengthOf(2)
202 await server.runners.unregister({ runnerToken: registered })
205 const { total, data } = await server.runners.list()
206 expect(total).to.equal(1)
207 expect(data).to.have.lengthOf(1)
208 expect(data[0].name).to.equal('runner 1')
213 describe('Managing runner jobs', function () {
216 let lastRunnerContact: Date
217 let failedJob: RunnerJob
219 async function checkMainJobState (
220 mainJobState: RunnerJobState,
221 otherJobStates: RunnerJobState[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
223 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
225 for (const job of data) {
226 if (job.uuid === jobUUID) {
227 expect(job.state.id).to.equal(mainJobState)
229 expect(otherJobStates).to.include(job.state.id)
234 function getMainJob () {
235 return server.runnerJobs.getJob({ uuid: jobUUID })
238 describe('List jobs', function () {
240 it('Should not have jobs', async function () {
241 const { total, data } = await server.runnerJobs.list()
243 expect(data).to.have.lengthOf(0)
244 expect(total).to.equal(0)
247 it('Should upload a video and have available jobs', async function () {
248 await server.videos.quickUpload({ name: 'to transcode' })
249 await waitJobs([ server ])
251 const { total, data } = await server.runnerJobs.list()
253 expect(data).to.have.lengthOf(10)
254 expect(total).to.equal(10)
256 for (const job of data) {
257 expect(job.startedAt).to.not.exist
258 expect(job.finishedAt).to.not.exist
259 expect(job.payload).to.exist
260 expect(job.privatePayload).to.exist
263 const hlsJobs = data.filter(d => d.type === 'vod-hls-transcoding')
264 const webVideoJobs = data.filter(d => d.type === 'vod-web-video-transcoding')
266 expect(hlsJobs).to.have.lengthOf(5)
267 expect(webVideoJobs).to.have.lengthOf(5)
269 const pendingJobs = data.filter(d => d.state.id === RunnerJobState.PENDING)
270 const waitingJobs = data.filter(d => d.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)
272 expect(pendingJobs).to.have.lengthOf(1)
273 expect(waitingJobs).to.have.lengthOf(9)
276 it('Should upload another video and list/sort jobs', async function () {
277 await server.videos.quickUpload({ name: 'to transcode 2' })
278 await waitJobs([ server ])
281 const { total, data } = await server.runnerJobs.list({ start: 0, count: 30 })
283 expect(data).to.have.lengthOf(20)
284 expect(total).to.equal(20)
286 jobUUID = data[16].uuid
290 const { total, data } = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
291 expect(total).to.equal(20)
293 expect(data).to.have.lengthOf(1)
294 expect(data[0].uuid).to.equal(jobUUID)
298 let previousPriority = Infinity
299 const { total, data } = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
300 expect(total).to.equal(20)
302 for (const job of data) {
303 expect(job.priority).to.be.at.most(previousPriority)
304 previousPriority = job.priority
306 if (job.state.id === RunnerJobState.PENDING) {
307 jobMaxPriority = job.uuid
313 it('Should search jobs', async function () {
315 const { total, data } = await server.runnerJobs.list({ search: jobUUID })
317 expect(data).to.have.lengthOf(1)
318 expect(total).to.equal(1)
320 expect(data[0].uuid).to.equal(jobUUID)
324 const { total, data } = await server.runnerJobs.list({ search: 'toto' })
326 expect(data).to.have.lengthOf(0)
327 expect(total).to.equal(0)
331 const { total, data } = await server.runnerJobs.list({ search: 'hls' })
333 expect(data).to.not.have.lengthOf(0)
334 expect(total).to.not.equal(0)
339 describe('Accept/update/abort/process a job', function () {
341 it('Should request available jobs', async function () {
342 lastRunnerContact = new Date()
344 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
346 // Only optimize jobs are available
347 expect(availableJobs).to.have.lengthOf(2)
349 for (const job of availableJobs) {
350 expect(job.uuid).to.exist
351 expect(job.payload.input).to.exist
352 expect(job.payload.output).to.exist
354 expect((job as RunnerJobAdmin).privatePayload).to.not.exist
357 const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
358 const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')
360 expect(hlsJobs).to.have.lengthOf(0)
361 expect(webVideoJobs).to.have.lengthOf(2)
363 jobUUID = webVideoJobs[0].uuid
366 it('Should have sorted available jobs by priority', async function () {
367 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
369 expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
372 it('Should have last runner contact updated', async function () {
375 const { data } = await server.runners.list({ sort: 'createdAt' })
376 expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
379 it('Should accept a job', async function () {
380 const startedAt = new Date()
382 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
383 jobToken = job.jobToken
385 const checkProcessingJob = (job: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
386 expect(job.uuid).to.equal(jobUUID)
388 expect(job.type).to.equal('vod-web-video-transcoding')
389 expect(job.state.label).to.equal('Processing')
390 expect(job.state.id).to.equal(RunnerJobState.PROCESSING)
392 expect(job.runner).to.exist
393 expect(job.runner.name).to.equal('runner 1')
394 expect(job.runner.description).to.equal('my super runner 1')
396 expect(job.progress).to.be.null
398 expect(job.startedAt).to.exist
399 expect(new Date(job.startedAt)).to.be.above(startedAt)
401 expect(job.finishedAt).to.not.exist
403 expect(job.failures).to.equal(0)
405 expect(job.payload).to.exist
408 expect(job.jobToken).to.exist
409 expect((job as RunnerJobAdmin).privatePayload).to.not.exist
411 expect(job.jobToken).to.not.exist
412 expect((job as RunnerJobAdmin).privatePayload).to.exist
416 checkProcessingJob(job, true)
418 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
420 const processingJob = data.find(j => j.uuid === jobUUID)
421 checkProcessingJob(processingJob, false)
423 await checkMainJobState(RunnerJobState.PROCESSING)
426 it('Should update a job', async function () {
427 await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })
429 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
431 for (const job of data) {
432 if (job.state.id === RunnerJobState.PROCESSING) {
433 expect(job.progress).to.equal(53)
435 expect(job.progress).to.be.null
440 it('Should abort a job', async function () {
441 await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })
443 await checkMainJobState(RunnerJobState.PENDING)
445 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
446 for (const job of data) {
447 expect(job.progress).to.be.null
451 it('Should accept the same job again and post a success', async function () {
452 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
453 expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist
455 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
456 jobToken = job.jobToken
458 await checkMainJobState(RunnerJobState.PROCESSING)
460 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
462 for (const job of data) {
463 expect(job.progress).to.be.null
467 videoFile: 'video_short.mp4'
470 await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
473 it('Should not have available jobs anymore', async function () {
474 await checkMainJobState(RunnerJobState.COMPLETED)
476 const job = await getMainJob()
477 expect(job.finishedAt).to.exist
479 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
480 expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
484 describe('Error job', function () {
486 it('Should accept another job and post an error', async function () {
487 await server.runnerJobs.cancelAllJobs()
488 await server.videos.quickUpload({ name: 'video' })
489 await waitJobs([ server ])
491 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
492 jobUUID = availableJobs[0].uuid
494 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
495 jobToken = job.jobToken
497 await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
500 it('Should have job failures increased', async function () {
501 const job = await getMainJob()
502 expect(job.state.id).to.equal(RunnerJobState.PENDING)
503 expect(job.failures).to.equal(1)
504 expect(job.error).to.be.null
505 expect(job.progress).to.be.null
506 expect(job.finishedAt).to.not.exist
509 it('Should error a job when job attempts is too big', async function () {
510 for (let i = 0; i < 4; i++) {
511 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
512 jobToken = job.jobToken
514 await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
517 const job = await getMainJob()
518 expect(job.failures).to.equal(5)
519 expect(job.state.id).to.equal(RunnerJobState.ERRORED)
520 expect(job.state.label).to.equal('Errored')
521 expect(job.error).to.equal('Error 3')
522 expect(job.progress).to.be.null
523 expect(job.finishedAt).to.exist
528 it('Should have failed children jobs too', async function () {
529 const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })
531 const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
532 expect(children).to.have.lengthOf(9)
534 for (const child of children) {
535 expect(child.parent.uuid).to.equal(failedJob.uuid)
536 expect(child.parent.type).to.equal(failedJob.type)
537 expect(child.parent.state.id).to.equal(failedJob.state.id)
538 expect(child.parent.state.label).to.equal(failedJob.state.label)
540 expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
541 expect(child.state.label).to.equal('Parent job failed')
546 describe('Cancel', function () {
548 it('Should cancel a pending job', async function () {
549 await server.videos.quickUpload({ name: 'video' })
550 await waitJobs([ server ])
553 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
555 const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
556 jobUUID = pendingJob.uuid
558 await server.runnerJobs.cancelByAdmin({ jobUUID })
562 const job = await getMainJob()
563 expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
564 expect(job.state.label).to.equal('Cancelled')
568 const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
569 const children = data.filter(j => j.parent?.uuid === jobUUID)
570 expect(children).to.have.lengthOf(9)
572 for (const child of children) {
573 expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
578 it('Should cancel an already accepted job and skip success/error', async function () {
579 await server.videos.quickUpload({ name: 'video' })
580 await waitJobs([ server ])
582 const { availableJobs } = await server.runnerJobs.request({ runnerToken })
583 jobUUID = availableJobs[0].uuid
585 const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
586 jobToken = job.jobToken
588 await server.runnerJobs.cancelByAdmin({ jobUUID })
590 await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
594 describe('Stalled jobs', function () {
596 it('Should abort stalled jobs', async function () {
599 await server.videos.quickUpload({ name: 'video' })
600 await server.videos.quickUpload({ name: 'video' })
601 await waitJobs([ server ])
603 const { job: job1 } = await server.runnerJobs.autoAccept({ runnerToken })
604 const { job: stalledJob } = await server.runnerJobs.autoAccept({ runnerToken })
606 for (let i = 0; i < 6; i++) {
609 await server.runnerJobs.update({ runnerToken, jobToken: job1.jobToken, jobUUID: job1.uuid })
612 const refreshedJob1 = await server.runnerJobs.getJob({ uuid: job1.uuid })
613 const refreshedStalledJob = await server.runnerJobs.getJob({ uuid: stalledJob.uuid })
615 expect(refreshedJob1.state.id).to.equal(RunnerJobState.PROCESSING)
616 expect(refreshedStalledJob.state.id).to.equal(RunnerJobState.PENDING)
620 describe('Rate limit', function () {
622 before(async function () {
636 it('Should rate limit an unknown runner', async function () {
637 const path = '/api/v1/ping'
638 const fields = { runnerToken: 'toto' }
640 for (let i = 0; i < 20; i++) {
642 await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.OK_200 })
646 await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
649 it('Should not rate limit a registered runner', async function () {
650 const path = '/api/v1/ping'
652 for (let i = 0; i < 20; i++) {
653 await makePostBodyRequest({ url: server.url, path, fields: { runnerToken }, expectedStatus: HttpStatusCode.OK_200 })
659 after(async function () {
660 await cleanupTests([ server ])