/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import { expect } from 'chai'
import { wait } from '@shared/core-utils'
import {
HttpStatusCode,
Runner,
RunnerJob,
RunnerJobAdmin,
RunnerJobState,
RunnerJobVODWebVideoTranscodingPayload,
RunnerRegistrationToken
} from '@shared/models'
import {
cleanupTests,
createSingleServer,
makePostBodyRequest,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel,
waitJobs
} from '@shared/server-commands'
describe('Test runner common actions', function () {
let server: PeerTubeServer
let registrationToken: string
let runnerToken: string
let jobMaxPriority: string
before(async function () {
this.timeout(120_000)
server = await createSingleServer(1, {
remote_runners: {
stalled_jobs: {
vod: '5 seconds'
}
}
})
await setAccessTokensToServers([ server ])
await setDefaultVideoChannel([ server ])
await server.config.enableTranscoding(true, true)
await server.config.enableRemoteTranscoding()
})
describe('Managing runner registration tokens', function () {
let base: RunnerRegistrationToken[]
let registrationTokenToDelete: RunnerRegistrationToken
it('Should have a default registration token', async function () {
const { total, data } = await server.runnerRegistrationTokens.list()
expect(total).to.equal(1)
expect(data).to.have.lengthOf(1)
const token = data[0]
expect(token.id).to.exist
expect(token.createdAt).to.exist
expect(token.updatedAt).to.exist
expect(token.registeredRunnersCount).to.equal(0)
expect(token.registrationToken).to.exist
})
it('Should create other registration tokens', async function () {
await server.runnerRegistrationTokens.generate()
await server.runnerRegistrationTokens.generate()
const { total, data } = await server.runnerRegistrationTokens.list()
expect(total).to.equal(3)
expect(data).to.have.lengthOf(3)
})
it('Should list registration tokens', async function () {
{
const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
expect(total).to.equal(3)
expect(data).to.have.lengthOf(3)
expect(new Date(data[0].createdAt)).to.be.below(new Date(data[1].createdAt))
expect(new Date(data[1].createdAt)).to.be.below(new Date(data[2].createdAt))
base = data
registrationTokenToDelete = data[0]
registrationToken = data[1].registrationToken
}
{
const { total, data } = await server.runnerRegistrationTokens.list({ sort: '-createdAt', start: 2, count: 1 })
expect(total).to.equal(3)
expect(data).to.have.lengthOf(1)
expect(data[0].registrationToken).to.equal(base[0].registrationToken)
}
})
it('Should have appropriate registeredRunnersCount for registration tokens', async function () {
await server.runners.register({ name: 'to delete 1', registrationToken: registrationTokenToDelete.registrationToken })
await server.runners.register({ name: 'to delete 2', registrationToken: registrationTokenToDelete.registrationToken })
const { data } = await server.runnerRegistrationTokens.list()
for (const d of data) {
if (d.registrationToken === registrationTokenToDelete.registrationToken) {
expect(d.registeredRunnersCount).to.equal(2)
} else {
expect(d.registeredRunnersCount).to.equal(0)
}
}
const { data: runners } = await server.runners.list()
expect(runners).to.have.lengthOf(2)
})
it('Should delete a registration token', async function () {
await server.runnerRegistrationTokens.delete({ id: registrationTokenToDelete.id })
const { total, data } = await server.runnerRegistrationTokens.list({ sort: 'createdAt' })
expect(total).to.equal(2)
expect(data).to.have.lengthOf(2)
for (const d of data) {
expect(d.registeredRunnersCount).to.equal(0)
expect(d.registrationToken).to.not.equal(registrationTokenToDelete.registrationToken)
}
})
it('Should have removed runners of this registration token', async function () {
const { data: runners } = await server.runners.list()
expect(runners).to.have.lengthOf(0)
})
})
describe('Managing runners', function () {
let toDelete: Runner
it('Should not have runners available', async function () {
const { total, data } = await server.runners.list()
expect(data).to.have.lengthOf(0)
expect(total).to.equal(0)
})
it('Should register runners', async function () {
const now = new Date()
const result = await server.runners.register({
name: 'runner 1',
description: 'my super runner 1',
registrationToken
})
expect(result.runnerToken).to.exist
runnerToken = result.runnerToken
await server.runners.register({
name: 'runner 2',
registrationToken
})
const { total, data } = await server.runners.list({ sort: 'createdAt' })
expect(total).to.equal(2)
expect(data).to.have.lengthOf(2)
for (const d of data) {
expect(d.id).to.exist
expect(d.createdAt).to.exist
expect(d.updatedAt).to.exist
expect(new Date(d.createdAt)).to.be.above(now)
expect(new Date(d.updatedAt)).to.be.above(now)
expect(new Date(d.lastContact)).to.be.above(now)
expect(d.ip).to.exist
}
expect(data[0].name).to.equal('runner 1')
expect(data[0].description).to.equal('my super runner 1')
expect(data[1].name).to.equal('runner 2')
expect(data[1].description).to.be.null
toDelete = data[1]
})
it('Should list runners', async function () {
const { total, data } = await server.runners.list({ sort: '-createdAt', start: 1, count: 1 })
expect(total).to.equal(2)
expect(data).to.have.lengthOf(1)
expect(data[0].name).to.equal('runner 1')
})
it('Should delete a runner', async function () {
await server.runners.delete({ id: toDelete.id })
const { total, data } = await server.runners.list()
expect(total).to.equal(1)
expect(data).to.have.lengthOf(1)
expect(data[0].name).to.equal('runner 1')
})
it('Should unregister a runner', async function () {
const registered = await server.runners.autoRegisterRunner()
{
const { total, data } = await server.runners.list()
expect(total).to.equal(2)
expect(data).to.have.lengthOf(2)
}
await server.runners.unregister({ runnerToken: registered })
{
const { total, data } = await server.runners.list()
expect(total).to.equal(1)
expect(data).to.have.lengthOf(1)
expect(data[0].name).to.equal('runner 1')
}
})
})
describe('Managing runner jobs', function () {
let jobUUID: string
let jobToken: string
let lastRunnerContact: Date
let failedJob: RunnerJob
async function checkMainJobState (
mainJobState: RunnerJobState,
otherJobStates: RunnerJobState[] = [ RunnerJobState.PENDING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
) {
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
for (const job of data) {
if (job.uuid === jobUUID) {
expect(job.state.id).to.equal(mainJobState)
} else {
expect(otherJobStates).to.include(job.state.id)
}
}
}
function getMainJob () {
return server.runnerJobs.getJob({ uuid: jobUUID })
}
describe('List jobs', function () {
it('Should not have jobs', async function () {
const { total, data } = await server.runnerJobs.list()
expect(data).to.have.lengthOf(0)
expect(total).to.equal(0)
})
it('Should upload a video and have available jobs', async function () {
await server.videos.quickUpload({ name: 'to transcode' })
await waitJobs([ server ])
const { total, data } = await server.runnerJobs.list()
expect(data).to.have.lengthOf(10)
expect(total).to.equal(10)
for (const job of data) {
expect(job.startedAt).to.not.exist
expect(job.finishedAt).to.not.exist
expect(job.payload).to.exist
expect(job.privatePayload).to.exist
}
const hlsJobs = data.filter(d => d.type === 'vod-hls-transcoding')
const webVideoJobs = data.filter(d => d.type === 'vod-web-video-transcoding')
expect(hlsJobs).to.have.lengthOf(5)
expect(webVideoJobs).to.have.lengthOf(5)
const pendingJobs = data.filter(d => d.state.id === RunnerJobState.PENDING)
const waitingJobs = data.filter(d => d.state.id === RunnerJobState.WAITING_FOR_PARENT_JOB)
expect(pendingJobs).to.have.lengthOf(1)
expect(waitingJobs).to.have.lengthOf(9)
})
it('Should upload another video and list/sort jobs', async function () {
await server.videos.quickUpload({ name: 'to transcode 2' })
await waitJobs([ server ])
{
const { total, data } = await server.runnerJobs.list({ start: 0, count: 30 })
expect(data).to.have.lengthOf(20)
expect(total).to.equal(20)
jobUUID = data[16].uuid
}
{
const { total, data } = await server.runnerJobs.list({ start: 3, count: 1, sort: 'createdAt' })
expect(total).to.equal(20)
expect(data).to.have.lengthOf(1)
expect(data[0].uuid).to.equal(jobUUID)
}
{
let previousPriority = Infinity
const { total, data } = await server.runnerJobs.list({ start: 0, count: 100, sort: '-priority' })
expect(total).to.equal(20)
for (const job of data) {
expect(job.priority).to.be.at.most(previousPriority)
previousPriority = job.priority
if (job.state.id === RunnerJobState.PENDING) {
jobMaxPriority = job.uuid
}
}
}
})
it('Should search jobs', async function () {
{
const { total, data } = await server.runnerJobs.list({ search: jobUUID })
expect(data).to.have.lengthOf(1)
expect(total).to.equal(1)
expect(data[0].uuid).to.equal(jobUUID)
}
{
const { total, data } = await server.runnerJobs.list({ search: 'toto' })
expect(data).to.have.lengthOf(0)
expect(total).to.equal(0)
}
{
const { total, data } = await server.runnerJobs.list({ search: 'hls' })
expect(data).to.not.have.lengthOf(0)
expect(total).to.not.equal(0)
}
})
})
describe('Accept/update/abort/process a job', function () {
it('Should request available jobs', async function () {
lastRunnerContact = new Date()
const { availableJobs } = await server.runnerJobs.request({ runnerToken })
// Only optimize jobs are available
expect(availableJobs).to.have.lengthOf(2)
for (const job of availableJobs) {
expect(job.uuid).to.exist
expect(job.payload.input).to.exist
expect((job.payload as RunnerJobVODWebVideoTranscodingPayload).output).to.exist
expect((job as RunnerJobAdmin).privatePayload).to.not.exist
}
const hlsJobs = availableJobs.filter(d => d.type === 'vod-hls-transcoding')
const webVideoJobs = availableJobs.filter(d => d.type === 'vod-web-video-transcoding')
expect(hlsJobs).to.have.lengthOf(0)
expect(webVideoJobs).to.have.lengthOf(2)
jobUUID = webVideoJobs[0].uuid
})
it('Should have sorted available jobs by priority', async function () {
const { availableJobs } = await server.runnerJobs.request({ runnerToken })
expect(availableJobs[0].uuid).to.equal(jobMaxPriority)
})
it('Should have last runner contact updated', async function () {
await wait(1000)
const { data } = await server.runners.list({ sort: 'createdAt' })
expect(new Date(data[0].lastContact)).to.be.above(lastRunnerContact)
})
it('Should accept a job', async function () {
const startedAt = new Date()
const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
jobToken = job.jobToken
const checkProcessingJob = (job: RunnerJob & { jobToken?: string }, fromAccept: boolean) => {
expect(job.uuid).to.equal(jobUUID)
expect(job.type).to.equal('vod-web-video-transcoding')
expect(job.state.label).to.equal('Processing')
expect(job.state.id).to.equal(RunnerJobState.PROCESSING)
expect(job.runner).to.exist
expect(job.runner.name).to.equal('runner 1')
expect(job.runner.description).to.equal('my super runner 1')
expect(job.progress).to.be.null
expect(job.startedAt).to.exist
expect(new Date(job.startedAt)).to.be.above(startedAt)
expect(job.finishedAt).to.not.exist
expect(job.failures).to.equal(0)
expect(job.payload).to.exist
if (fromAccept) {
expect(job.jobToken).to.exist
expect((job as RunnerJobAdmin).privatePayload).to.not.exist
} else {
expect(job.jobToken).to.not.exist
expect((job as RunnerJobAdmin).privatePayload).to.exist
}
}
checkProcessingJob(job, true)
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
const processingJob = data.find(j => j.uuid === jobUUID)
checkProcessingJob(processingJob, false)
await checkMainJobState(RunnerJobState.PROCESSING)
})
it('Should update a job', async function () {
await server.runnerJobs.update({ runnerToken, jobUUID, jobToken, progress: 53 })
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
for (const job of data) {
if (job.state.id === RunnerJobState.PROCESSING) {
expect(job.progress).to.equal(53)
} else {
expect(job.progress).to.be.null
}
}
})
it('Should abort a job', async function () {
await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'for tests' })
await checkMainJobState(RunnerJobState.PENDING)
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
for (const job of data) {
expect(job.progress).to.be.null
}
})
it('Should accept the same job again and post a success', async function () {
const { availableJobs } = await server.runnerJobs.request({ runnerToken })
expect(availableJobs.find(j => j.uuid === jobUUID)).to.exist
const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
jobToken = job.jobToken
await checkMainJobState(RunnerJobState.PROCESSING)
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
for (const job of data) {
expect(job.progress).to.be.null
}
const payload = {
videoFile: 'video_short.mp4'
}
await server.runnerJobs.success({ runnerToken, jobUUID, jobToken, payload })
})
it('Should not have available jobs anymore', async function () {
await checkMainJobState(RunnerJobState.COMPLETED)
const job = await getMainJob()
expect(job.finishedAt).to.exist
const { availableJobs } = await server.runnerJobs.request({ runnerToken })
expect(availableJobs.find(j => j.uuid === jobUUID)).to.not.exist
})
})
describe('Error job', function () {
it('Should accept another job and post an error', async function () {
await server.runnerJobs.cancelAllJobs()
await server.videos.quickUpload({ name: 'video' })
await waitJobs([ server ])
const { availableJobs } = await server.runnerJobs.request({ runnerToken })
jobUUID = availableJobs[0].uuid
const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
jobToken = job.jobToken
await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error' })
})
it('Should have job failures increased', async function () {
const job = await getMainJob()
expect(job.state.id).to.equal(RunnerJobState.PENDING)
expect(job.failures).to.equal(1)
expect(job.error).to.be.null
expect(job.progress).to.be.null
expect(job.finishedAt).to.not.exist
})
it('Should error a job when job attempts is too big', async function () {
for (let i = 0; i < 4; i++) {
const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
jobToken = job.jobToken
await server.runnerJobs.error({ runnerToken, jobUUID, jobToken, message: 'Error ' + i })
}
const job = await getMainJob()
expect(job.failures).to.equal(5)
expect(job.state.id).to.equal(RunnerJobState.ERRORED)
expect(job.state.label).to.equal('Errored')
expect(job.error).to.equal('Error 3')
expect(job.progress).to.be.null
expect(job.finishedAt).to.exist
failedJob = job
})
it('Should have failed children jobs too', async function () {
const { data } = await server.runnerJobs.list({ count: 50, sort: '-updatedAt' })
const children = data.filter(j => j.parent?.uuid === failedJob.uuid)
expect(children).to.have.lengthOf(9)
for (const child of children) {
expect(child.parent.uuid).to.equal(failedJob.uuid)
expect(child.parent.type).to.equal(failedJob.type)
expect(child.parent.state.id).to.equal(failedJob.state.id)
expect(child.parent.state.label).to.equal(failedJob.state.label)
expect(child.state.id).to.equal(RunnerJobState.PARENT_ERRORED)
expect(child.state.label).to.equal('Parent job failed')
}
})
})
describe('Cancel', function () {
it('Should cancel a pending job', async function () {
await server.videos.quickUpload({ name: 'video' })
await waitJobs([ server ])
{
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
const pendingJob = data.find(j => j.state.id === RunnerJobState.PENDING)
jobUUID = pendingJob.uuid
await server.runnerJobs.cancelByAdmin({ jobUUID })
}
{
const job = await getMainJob()
expect(job.state.id).to.equal(RunnerJobState.CANCELLED)
expect(job.state.label).to.equal('Cancelled')
}
{
const { data } = await server.runnerJobs.list({ count: 10, sort: '-updatedAt' })
const children = data.filter(j => j.parent?.uuid === jobUUID)
expect(children).to.have.lengthOf(9)
for (const child of children) {
expect(child.state.id).to.equal(RunnerJobState.PARENT_CANCELLED)
}
}
})
it('Should cancel an already accepted job and skip success/error', async function () {
await server.videos.quickUpload({ name: 'video' })
await waitJobs([ server ])
const { availableJobs } = await server.runnerJobs.request({ runnerToken })
jobUUID = availableJobs[0].uuid
const { job } = await server.runnerJobs.accept({ runnerToken, jobUUID })
jobToken = job.jobToken
await server.runnerJobs.cancelByAdmin({ jobUUID })
await server.runnerJobs.abort({ runnerToken, jobUUID, jobToken, reason: 'aborted', expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})
})
describe('Stalled jobs', function () {
it('Should abort stalled jobs', async function () {
this.timeout(60000)
await server.videos.quickUpload({ name: 'video' })
await server.videos.quickUpload({ name: 'video' })
await waitJobs([ server ])
const { job: job1 } = await server.runnerJobs.autoAccept({ runnerToken })
const { job: stalledJob } = await server.runnerJobs.autoAccept({ runnerToken })
for (let i = 0; i < 6; i++) {
await wait(2000)
await server.runnerJobs.update({ runnerToken, jobToken: job1.jobToken, jobUUID: job1.uuid })
}
const refreshedJob1 = await server.runnerJobs.getJob({ uuid: job1.uuid })
const refreshedStalledJob = await server.runnerJobs.getJob({ uuid: stalledJob.uuid })
expect(refreshedJob1.state.id).to.equal(RunnerJobState.PROCESSING)
expect(refreshedStalledJob.state.id).to.equal(RunnerJobState.PENDING)
})
})
describe('Rate limit', function () {
before(async function () {
this.timeout(60000)
await server.kill()
await server.run({
rates_limit: {
api: {
max: 10
}
}
})
})
it('Should rate limit an unknown runner', async function () {
const path = '/api/v1/ping'
const fields = { runnerToken: 'toto' }
for (let i = 0; i < 20; i++) {
try {
await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.OK_200 })
} catch {}
}
await makePostBodyRequest({ url: server.url, path, fields, expectedStatus: HttpStatusCode.TOO_MANY_REQUESTS_429 })
})
it('Should not rate limit a registered runner', async function () {
const path = '/api/v1/ping'
for (let i = 0; i < 20; i++) {
await makePostBodyRequest({ url: server.url, path, fields: { runnerToken }, expectedStatus: HttpStatusCode.OK_200 })
}
})
})
})
after(async function () {
await cleanupTests([ server ])
})
})