aboutsummaryrefslogblamecommitdiffhomepage
path: root/server/tests/api/runners/runner-common.ts
blob: 5540241908fc8fa0d9998457172ef0c77c8abb1a (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13



                                                                                              








                                         

























































































































































































































































































































































                                                                                                                            
                                                                                         





















































































































































































































































































































                                                                                                                                          
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */

import { expect } from 'chai'
import { wait } from '@shared/core-utils'
import {
  HttpStatusCode,
  Runner,
  RunnerJob,
  RunnerJobAdmin,
  RunnerJobState,
  RunnerJobVODWebVideoTranscodingPayload,
  RunnerRegistrationToken
} from '@shared/models'
import {
  cleanupTests,
  createSingleServer,
  makePostBodyRequest,
  PeerTubeServer,
  setAccessTokensToServers,
  setDefaultVideoChannel,
  waitJobs
} from '@shared/server-commands'

describe('Test runner common actions', function () {
  let server: PeerTubeServer
  let registrationToken: string
  let runnerToken: string
  let jobMaxPriority: string

  before(async function () {
    this.timeout(120_000)

    server = await createSingleServer(1, {
      remote_runners: {
        stalled_jobs: {
          vod: '5 seconds'
        }
      }
    })

    await setAccessTokensToServers([ server ])
    await setDefaultVideoChannel([ server ])

    await server.config.enableTranscoding(true, true)
    await server.config.enableRemoteTranscoding()
  })

  describe('Managing runner registration tokens', function () {
    let base: RunnerRegistrationToken[]
    let registrationTokenToDelete: RunnerRegistrationToken

    it('Should have a default registration token', async function () {
      const { total, data } = await server.runnerRegistrationTokens.list()

      expect(total).to.equal(1)
      expect(data).to.have.lengthOf(1)

      const token = data[0]
      expect(token.id).to.exist
      expect(token.createdAt).to.exist
      expect(token.updatedAt).to.exist
      expect(token.registeredRunnersCount).to.equal(0)
      expect(token.registrationToken).to.exist
    })

    it('Should create other registration tokens', async function () {
      await server.runnerRegistrationTokens.generate()
      await server.runnerRegistrationTokens.generate()

      const { total, data } = await server.runnerRegistrationTokens.list()
      expect(total).to.equal(3)
      expect(data).to.have.lengthOf(3)
    })

    it('Should list registration tokens', async function () {
      {
        const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
        expect(total).to.equal(3)
        expect(data).to.have.lengthOf(3)
        expect(new Date(data[0].createdAt)).to.be.below(new Date(data[1].createdAt))
        expect(new Date(data[1].createdAt)).to.be.below(new Date(data[2].createdAt))

        base = data

        registrationTokenToDelete = data[0]
        registrationToken = data[1].registrationToken
      }

      {
        const { total, data } = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
        expect(total).to.equal(3)
        expect(data).to.have.lengthOf(1)
        expect(data[0].registrationToken).to.equal(base[0].registrationToken)
      }
    })

    it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
      await server.runners.register({ name: 'to delete 1', registrationToken: registrationTokenToDelete.registrationToken })
      await server.runners.register({ name: 'to delete 2', registrationToken: registrationTokenToDelete.registrationToken })

      const { data } = await server.runnerRegistrationTokens.list()

      for (const d of data) {
        if (d.registrationToken === registrationTokenToDelete.registrationToken) {
          expect(d.registeredRunnersCount).to.equal(2)
        } else {
          expect(d.registeredRunnersCount).to.equal(0)
        }
      }

      const { data: runners } = await server.runners.list()
      expect(runners).to.have.lengthOf(2)
    })

    it('Should delete a registration token', async function () {
      await server.runnerRegistrationTokens.delete({ id: registrationTokenToDelete.id })

      const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
      expect(total).to.equal(2)
      expect(data).to.have.lengthOf(2)

      for (const d of data) {
        expect(d.registeredRunnersCount).to.equal(0)
        expect(d.registrationToken).to.not.equal(registrationTokenToDelete.registrationToken)
      }
    })

    it('Should have removed runners of this registration token', async function () {
      const { data: runners } = await server.runners.list()
      expect(runners).to.have.lengthOf(0)
    })
  })

  describe('Managing runners', function () {
    let toDelete: Runner

    it('Should not have runners available', async function () {
      const { total, data } = await server.runners.list()

      expect(data).to.have.lengthOf(0)
      expect(total).to.equal(0)
    })

    it('Should register runners', async function () {
      const now = new Date()

      const result = await server.runners.register({
        name: 'runner 1',
        description: 'my super runner 1',
        registrationToken
      })
      expect(result.runnerToken).to.exist
      runnerToken = result.runnerToken

      await server.runners.register({
        name: 'runner 2',
        registrationToken
      })

      const { total, data } = await server.runners.list({ sort: 'createdAt' })
      expect(total).to.equal(2)
      expect(data).to.have.lengthOf(2)

      for (const d of data) {
        expect(d.id).to.exist
        expect(d.createdAt).to.exist
        expect(d.updatedAt).to.exist
        expect(new Date(d.createdAt)).to.be.above(now)
        expect(new Date(d.updatedAt)).to.be.above(now)
        expect(new Date(d.lastContact)).to.be.above(now)
        expect(d.ip).to.exist
      }

      expect(data[0].name).to.equal('runner 1')
      expect(data[0].description).to.equal('my super runner 1')

      expect(data[1].name).to.equal('runner 2')
      expect(data[1].description).to.be.null

      toDelete = data[1]
    })

    it('Should list runners', async function () {
      const { total, data } = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })

      expect(total).to.equal(2)
      expect(data).to.have.lengthOf(1)
      expect(data[0].name).to.equal('runner 1')
    })

    it('Should delete a runner', async function () {
      await server.runners.delete({ id: toDelete.id })

      const { total, data } = await server.runners.list()

      expect(total).to.equal(1)
      expect(data).to.have.lengthOf(1)
      expect(data[0].name).to.equal('runner 1')
    })

    it('Should unregister a runner', async function () {
      const registered = await server.runners.autoRegisterRunner()

      {
        const { total, data } = await server.runners.list()
        expect(total).to.equal(2)
        expect(data).to.have.lengthOf(2)
      }

      await server.runners.unregister({ runnerToken: registered })

      {
        const { total, data } = await server.runners.list()
        expect(total).to.equal(1)
        expect(data).to.have.lengthOf(1)
        expect(data[0].name).to.equal('runner 1')
      }
    })
  })

  describe('Managing runner jobs', function () {
    let jobUUID: string
    let jobToken: string
    let lastRunnerContact: Date
    let failedJob: RunnerJob

    async function checkMainJobState (
      mainJobState: RunnerJobState,
      otherJobStates: RunnerJobState[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
    ) {
      const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

      for (const job of data) {
        if (job.uuid === jobUUID) {
          expect(job.state.id).to.equal(mainJobState)
        } else {
          expect(otherJobStates).to.include(job.state.id)
        }
      }
    }

    function getMainJob () {
      return server.runnerJobs.getJob({ uuid: jobUUID })
    }

    describe('List jobs', function () {

      it('Should not have jobs', async function () {
        const { total, data } = await server.runnerJobs.list()

        expect(data).to.have.lengthOf(0)
        expect(total).to.equal(0)
      })

      it('Should upload a video and have available jobs', async function () {
        await server.videos.quickUpload({ name: 'to transcode' })
        await waitJobs([ server ])

        const { total, data } = await server.runnerJobs.list()

        expect(data).to.have.lengthOf(10)
        expect(total).to.equal(10)

        for (const job of data) {
          expect(job.startedAt).to.not.exist
          expect(job.finishedAt).to.not.exist
          expect(job.payload).to.exist
          expect(job.privatePayload).to.exist
        }

        const hlsJobs = data.filter(d => d.type === 'vod-hls-transcoding')
        const webVideoJobs = data.filter(d => d.type === 'vod-web-video-transcoding')

        expect(hlsJobs).to.have.lengthOf(5)
        expect(webVideoJobs).to.have.lengthOf(5)

        const pendingJobs = data.filter(d => d.state.id === RunnerJobState.PENDING)
        const waitingJobs = data.filter(d => d.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)

        expect(pendingJobs).to.have.lengthOf(1)
        expect(waitingJobs).to.have.lengthOf(9)
      })

      it('Should upload another video and list/sort jobs', async function () {
        await server.videos.quickUpload({ name: 'to transcode 2' })
        await waitJobs([ server ])

        {
          const { total, data } = await server.runnerJobs.list({ start: 0, count: 30 })

          expect(data).to.have.lengthOf(20)
          expect(total).to.equal(20)

          jobUUID = data[16].uuid
        }

        {
          const { total, data } = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
          expect(total).to.equal(20)

          expect(data).to.have.lengthOf(1)
          expect(data[0].uuid).to.equal(jobUUID)
        }

        {
          let previousPriority = Infinity
          const { total, data } = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
          expect(total).to.equal(20)

          for (const job of data) {
            expect(job.priority).to.be.at.most(previousPriority)
            previousPriority = job.priority

            if (job.state.id === RunnerJobState.PENDING) {
              jobMaxPriority = job.uuid
            }
          }
        }
      })

      it('Should search jobs', async function () {
        {
          const { total, data } = await server.runnerJobs.list({ search: jobUUID })

          expect(data).to.have.lengthOf(1)
          expect(total).to.equal(1)

          expect(data[0].uuid).to.equal(jobUUID)
        }

        {
          const { total, data } = await server.runnerJobs.list({ search: 'toto' })

          expect(data).to.have.lengthOf(0)
          expect(total).to.equal(0)
        }

        {
          const { total, data } = await server.runnerJobs.list({ search: 'hls' })

          expect(data).to.not.have.lengthOf(0)
          expect(total).to.not.equal(0)
        }
      })
    })

    describe('Accept/update/abort/process a job', function () {

      it('Should request available jobs', async function () {
        lastRunnerContact = new Date()

        const { availableJobs } = await server.runnerJobs.request({ runnerToken })

        // Only optimize jobs are available
        expect(availableJobs).to.have.lengthOf(2)

        for (const job of availableJobs) {
          expect(job.uuid).to.exist
          expect(job.payload.input).to.exist
          expect((job.payload as RunnerJobVODWebVideoTranscodingPayload).output).to.exist

          expect((job as RunnerJobAdmin).privatePayload).to.not.exist
        }

        const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
        const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')

        expect(hlsJobs).to.have.lengthOf(0)
        expect(webVideoJobs).to.have.lengthOf(2)

        jobUUID = webVideoJobs[0].uuid
      })

      it('Should have sorted available jobs by priority', async function () {
        const { availableJobs } = await server.runnerJobs.request({ runnerToken })

        expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
      })

      it('Should have last runner contact updated', async function () {
        await wait(1000)

        const { data } = await server.runners.list({ sort: 'createdAt' })
        expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
      })

      it('Should accept a job', async function () {
        const startedAt = new Date()

        const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
        jobToken = job.jobToken

        const checkProcessingJob = (job: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
          expect(job.uuid).to.equal(jobUUID)

          expect(job.type).to.equal('vod-web-video-transcoding')
          expect(job.state.label).to.equal('Processing')
          expect(job.state.id).to.equal(RunnerJobState.PROCESSING)

          expect(job.runner).to.exist
          expect(job.runner.name).to.equal('runner 1')
          expect(job.runner.description).to.equal('my super runner 1')

          expect(job.progress).to.be.null

          expect(job.startedAt).to.exist
          expect(new Date(job.startedAt)).to.be.above(startedAt)

          expect(job.finishedAt).to.not.exist

          expect(job.failures).to.equal(0)

          expect(job.payload).to.exist

          if (fromAccept) {
            expect(job.jobToken).to.exist
            expect((job as RunnerJobAdmin).privatePayload).to.not.exist
          } else {
            expect(job.jobToken).to.not.exist
            expect((job as RunnerJobAdmin).privatePayload).to.exist
          }
        }

        checkProcessingJob(job, true)

        const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

        const processingJob = data.find(j => j.uuid === jobUUID)
        checkProcessingJob(processingJob, false)

        await checkMainJobState(RunnerJobState.PROCESSING)
      })

      it('Should update a job', async function () {
        await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })

        const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

        for (const job of data) {
          if (job.state.id === RunnerJobState.PROCESSING) {
            expect(job.progress).to.equal(53)
          } else {
            expect(job.progress).to.be.null
          }
        }
      })

      it('Should abort a job', async function () {
        await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })

        await checkMainJobState(RunnerJobState.PENDING)

        const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
        for (const job of data) {
          expect(job.progress).to.be.null
        }
      })

      it('Should accept the same job again and post a success', async function () {
        const { availableJobs } = await server.runnerJobs.request({ runnerToken })
        expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist

        const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
        jobToken = job.jobToken

        await checkMainJobState(RunnerJobState.PROCESSING)

        const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

        for (const job of data) {
          expect(job.progress).to.be.null
        }

        const payload = {
          videoFile: 'video_short.mp4'
        }

        await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
      })

      it('Should not have available jobs anymore', async function () {
        await checkMainJobState(RunnerJobState.COMPLETED)

        const job = await getMainJob()
        expect(job.finishedAt).to.exist

        const { availableJobs } = await server.runnerJobs.request({ runnerToken })
        expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
      })
    })

    describe('Error job', function () {

      it('Should accept another job and post an error', async function () {
        await server.runnerJobs.cancelAllJobs()
        await server.videos.quickUpload({ name: 'video' })
        await waitJobs([ server ])

        const { availableJobs } = await server.runnerJobs.request({ runnerToken })
        jobUUID = availableJobs[0].uuid

        const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
        jobToken = job.jobToken

        await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
      })

      it('Should have job failures increased', async function () {
        const job = await getMainJob()
        expect(job.state.id).to.equal(RunnerJobState.PENDING)
        expect(job.failures).to.equal(1)
        expect(job.error).to.be.null
        expect(job.progress).to.be.null
        expect(job.finishedAt).to.not.exist
      })

      it('Should error a job when job attempts is too big', async function () {
        for (let i = 0; i < 4; i++) {
          const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
          jobToken = job.jobToken

          await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
        }

        const job = await getMainJob()
        expect(job.failures).to.equal(5)
        expect(job.state.id).to.equal(RunnerJobState.ERRORED)
        expect(job.state.label).to.equal('Errored')
        expect(job.error).to.equal('Error 3')
        expect(job.progress).to.be.null
        expect(job.finishedAt).to.exist

        failedJob = job
      })

      it('Should have failed children jobs too', async function () {
        const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })

        const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
        expect(children).to.have.lengthOf(9)

        for (const child of children) {
          expect(child.parent.uuid).to.equal(failedJob.uuid)
          expect(child.parent.type).to.equal(failedJob.type)
          expect(child.parent.state.id).to.equal(failedJob.state.id)
          expect(child.parent.state.label).to.equal(failedJob.state.label)

          expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
          expect(child.state.label).to.equal('Parent job failed')
        }
      })
    })

    describe('Cancel', function () {

      it('Should cancel a pending job', async function () {
        await server.videos.quickUpload({ name: 'video' })
        await waitJobs([ server ])

        {
          const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })

          const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
          jobUUID = pendingJob.uuid

          await server.runnerJobs.cancelByAdmin({ jobUUID })
        }

        {
          const job = await getMainJob()
          expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
          expect(job.state.label).to.equal('Cancelled')
        }

        {
          const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
          const children = data.filter(j => j.parent?.uuid === jobUUID)
          expect(children).to.have.lengthOf(9)

          for (const child of children) {
            expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
          }
        }
      })

      it('Should cancel an already accepted job and skip success/error', async function () {
        await server.videos.quickUpload({ name: 'video' })
        await waitJobs([ server ])

        const { availableJobs } = await server.runnerJobs.request({ runnerToken })
        jobUUID = availableJobs[0].uuid

        const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
        jobToken = job.jobToken

        await server.runnerJobs.cancelByAdmin({ jobUUID })

        await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
      })
    })

    describe('Stalled jobs', function () {

      it('Should abort stalled jobs', async function () {
        this.timeout(60000)

        await server.videos.quickUpload({ name: 'video' })
        await server.videos.quickUpload({ name: 'video' })
        await waitJobs([ server ])

        const { job: job1 } = await server.runnerJobs.autoAccept({ runnerToken })
        const { job: stalledJob } = await server.runnerJobs.autoAccept({ runnerToken })

        for (let i = 0; i < 6; i++) {
          await wait(2000)

          await server.runnerJobs.update({ runnerToken, jobToken: job1.jobToken, jobUUID: job1.uuid })
        }

        const refreshedJob1 = await server.runnerJobs.getJob({ uuid: job1.uuid })
        const refreshedStalledJob = await server.runnerJobs.getJob({ uuid: stalledJob.uuid })

        expect(refreshedJob1.state.id).to.equal(RunnerJobState.PROCESSING)
        expect(refreshedStalledJob.state.id).to.equal(RunnerJobState.PENDING)
      })
    })

    describe('Rate limit', function () {

      before(async function () {
        this.timeout(60000)

        await server.kill()

        await server.run({
          rates_limit: {
            api: {
              max: 10
            }
          }
        })
      })

      it('Should rate limit an unknown runner', async function () {
        const path = '/api/v1/ping'
        const fields = { runnerToken: 'toto' }

        for (let i = 0; i < 20; i++) {
          try {
            await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.OK_200 })
          } catch {}
        }

        await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
      })

      it('Should not rate limit a registered runner', async function () {
        const path = '/api/v1/ping'

        for (let i = 0; i < 20; i++) {
          await makePostBodyRequest({ url: server.url, path, fields: { runnerToken }, expectedStatus: HttpStatusCode.OK_200 })
        }
      })
    })
  })

  after(async function () {
    await cleanupTests([ server ])
  })
})