From d102de1b38f2877463529c3b27bd35ffef4fd8bf Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 15:00:01 +0200 Subject: Add runner server tests --- shared/server-commands/index.ts | 2 +- shared/server-commands/miscs/index.ts | 2 - shared/server-commands/miscs/sql-command.ts | 146 ----------- shared/server-commands/miscs/webtorrent.ts | 46 ---- shared/server-commands/requests/requests.ts | 37 ++- shared/server-commands/runners/index.ts | 3 + .../server-commands/runners/runner-jobs-command.ts | 279 +++++++++++++++++++++ .../runners/runner-registration-tokens-command.ts | 55 ++++ shared/server-commands/runners/runners-command.ts | 77 ++++++ shared/server-commands/server/config-command.ts | 34 ++- shared/server-commands/server/jobs.ts | 26 +- shared/server-commands/server/server.ts | 20 +- shared/server-commands/server/servers.ts | 2 +- shared/server-commands/shared/abstract-command.ts | 4 +- shared/server-commands/socket/socket-io-command.ts | 9 + shared/server-commands/videos/live-command.ts | 2 +- .../videos/streaming-playlists-command.ts | 4 +- 17 files changed, 527 insertions(+), 221 deletions(-) delete mode 100644 shared/server-commands/miscs/index.ts delete mode 100644 shared/server-commands/miscs/sql-command.ts delete mode 100644 shared/server-commands/miscs/webtorrent.ts create mode 100644 shared/server-commands/runners/index.ts create mode 100644 shared/server-commands/runners/runner-jobs-command.ts create mode 100644 shared/server-commands/runners/runner-registration-tokens-command.ts create mode 100644 shared/server-commands/runners/runners-command.ts (limited to 'shared') diff --git a/shared/server-commands/index.ts b/shared/server-commands/index.ts index c24ebb2df..a4581dbc0 100644 --- a/shared/server-commands/index.ts +++ b/shared/server-commands/index.ts @@ -3,10 +3,10 @@ export * from './cli' export * from './custom-pages' export * from './feeds' export * from './logs' -export * from './miscs' export * from './moderation' export * from './overviews' export * from './requests' +export * from './runners' export * from './search' export * from './server' export * from './socket' diff --git a/shared/server-commands/miscs/index.ts b/shared/server-commands/miscs/index.ts deleted file mode 100644 index a1d14e998..000000000 --- a/shared/server-commands/miscs/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './sql-command' -export * from './webtorrent' diff --git a/shared/server-commands/miscs/sql-command.ts b/shared/server-commands/miscs/sql-command.ts deleted file mode 100644 index 35cc2253f..000000000 --- a/shared/server-commands/miscs/sql-command.ts +++ /dev/null @@ -1,146 +0,0 @@ -import { QueryTypes, Sequelize } from 'sequelize' -import { forceNumber } from '@shared/core-utils' -import { AbstractCommand } from '../shared' - -export class SQLCommand extends AbstractCommand { - private sequelize: Sequelize - - deleteAll (table: string) { - const seq = this.getSequelize() - - const options = { type: QueryTypes.DELETE } - - return seq.query(`DELETE FROM "${table}"`, options) - } - - async getVideoShareCount () { - const [ { total } ] = await this.selectQuery<{ total: string }>(`SELECT COUNT(*) as total FROM "videoShare"`) - if (total === null) return 0 - - return parseInt(total, 10) - } - - async getInternalFileUrl (fileId: number) { - return this.selectQuery<{ fileUrl: string }>(`SELECT "fileUrl" FROM "videoFile" WHERE id = :fileId`, { fileId }) - .then(rows => rows[0].fileUrl) - } - - setActorField (to: string, field: string, value: string) { - return this.updateQuery(`UPDATE actor SET ${this.escapeColumnName(field)} = :value WHERE url = :to`, { value, to }) - } - - setVideoField (uuid: string, field: string, value: string) { - return this.updateQuery(`UPDATE video SET ${this.escapeColumnName(field)} = :value WHERE uuid = :uuid`, { value, uuid }) - } - - setPlaylistField (uuid: string, field: string, value: string) { - return this.updateQuery(`UPDATE "videoPlaylist" SET ${this.escapeColumnName(field)} = :value WHERE uuid = :uuid`, { value, uuid }) - } - - async countVideoViewsOf (uuid: string) { - const query = 'SELECT SUM("videoView"."views") AS "total" FROM "videoView" ' + - `INNER JOIN "video" ON "video"."id" = "videoView"."videoId" WHERE "video"."uuid" = :uuid` - - const [ { total } ] = await this.selectQuery<{ total: number }>(query, { uuid }) - if (!total) return 0 - - return forceNumber(total) - } - - getActorImage (filename: string) { - return this.selectQuery<{ width: number, height: number }>(`SELECT * FROM "actorImage" WHERE filename = :filename`, { filename }) - .then(rows => rows[0]) - } - - // --------------------------------------------------------------------------- - - setPluginVersion (pluginName: string, newVersion: string) { - return this.setPluginField(pluginName, 'version', newVersion) - } - - setPluginLatestVersion (pluginName: string, newVersion: string) { - return this.setPluginField(pluginName, 'latestVersion', newVersion) - } - - setPluginField (pluginName: string, field: string, value: string) { - return this.updateQuery( - `UPDATE "plugin" SET ${this.escapeColumnName(field)} = :value WHERE "name" = :pluginName`, - { pluginName, value } - ) - } - - // --------------------------------------------------------------------------- - - selectQuery (query: string, replacements: { [id: string]: string | number } = {}) { - const seq = this.getSequelize() - const options = { - type: QueryTypes.SELECT as QueryTypes.SELECT, - replacements - } - - return seq.query(query, options) - } - - updateQuery (query: string, replacements: { [id: string]: string | number } = {}) { - const seq = this.getSequelize() - const options = { type: QueryTypes.UPDATE as QueryTypes.UPDATE, replacements } - - return seq.query(query, options) - } - - // --------------------------------------------------------------------------- - - async getPlaylistInfohash (playlistId: number) { - const query = 'SELECT "p2pMediaLoaderInfohashes" FROM "videoStreamingPlaylist" WHERE id = :playlistId' - - const result = await this.selectQuery<{ p2pMediaLoaderInfohashes: string }>(query, { playlistId }) - if (!result || result.length === 0) return [] - - return result[0].p2pMediaLoaderInfohashes - } - - // --------------------------------------------------------------------------- - - setActorFollowScores (newScore: number) { - return this.updateQuery(`UPDATE "actorFollow" SET "score" = :newScore`, { newScore }) - } - - setTokenField (accessToken: string, field: string, value: string) { - return this.updateQuery( - `UPDATE "oAuthToken" SET ${this.escapeColumnName(field)} = :value WHERE "accessToken" = :accessToken`, - { value, accessToken } - ) - } - - async cleanup () { - if (!this.sequelize) return - - await this.sequelize.close() - this.sequelize = undefined - } - - private getSequelize () { - if (this.sequelize) return this.sequelize - - const dbname = 'peertube_test' + this.server.internalServerNumber - const username = 'peertube' - const password = 'peertube' - const host = '127.0.0.1' - const port = 5432 - - this.sequelize = new Sequelize(dbname, username, password, { - dialect: 'postgres', - host, - port, - logging: false - }) - - return this.sequelize - } - - private escapeColumnName (columnName: string) { - return this.getSequelize().escape(columnName) - .replace(/^'/, '"') - .replace(/'$/, '"') - } -} diff --git a/shared/server-commands/miscs/webtorrent.ts b/shared/server-commands/miscs/webtorrent.ts deleted file mode 100644 index 0683f8893..000000000 --- a/shared/server-commands/miscs/webtorrent.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { readFile } from 'fs-extra' -import parseTorrent from 'parse-torrent' -import { basename, join } from 'path' -import * as WebTorrent from 'webtorrent' -import { VideoFile } from '@shared/models' -import { PeerTubeServer } from '../server' - -let webtorrent: WebTorrent.Instance - -function webtorrentAdd (torrentId: string, refreshWebTorrent = false) { - const WebTorrent = require('webtorrent') - - if (webtorrent && refreshWebTorrent) webtorrent.destroy() - if (!webtorrent || refreshWebTorrent) webtorrent = new WebTorrent() - - webtorrent.on('error', err => console.error('Error in webtorrent', err)) - - return new Promise(res => { - const torrent = webtorrent.add(torrentId, res) - - torrent.on('error', err => console.error('Error in webtorrent torrent', err)) - torrent.on('warning', warn => { - const msg = typeof warn === 'string' - ? warn - : warn.message - - if (msg.includes('Unsupported')) return - - console.error('Warning in webtorrent torrent', warn) - }) - }) -} - -async function parseTorrentVideo (server: PeerTubeServer, file: VideoFile) { - const torrentName = basename(file.torrentUrl) - const torrentPath = server.servers.buildDirectory(join('torrents', torrentName)) - - const data = await readFile(torrentPath) - - return parseTorrent(data) -} - -export { - webtorrentAdd, - parseTorrentVideo -} diff --git a/shared/server-commands/requests/requests.ts b/shared/server-commands/requests/requests.ts index cb0e1a5fb..96f67b4c7 100644 --- a/shared/server-commands/requests/requests.ts +++ b/shared/server-commands/requests/requests.ts @@ -10,6 +10,7 @@ export type CommonRequestParams = { url: string path?: string contentType?: string + responseType?: string range?: string redirects?: number accept?: string @@ -27,16 +28,23 @@ function makeRawRequest (options: { expectedStatus?: HttpStatusCode range?: string query?: { [ id: string ]: string } + method?: 'GET' | 'POST' }) { const { host, protocol, pathname } = new URL(options.url) - return makeGetRequest({ + const reqOptions = { url: `${protocol}//${host}`, path: pathname, contentType: undefined, ...pick(options, [ 'expectedStatus', 'range', 'token', 'query' ]) - }) + } + + if (options.method === 'POST') { + return makePostBodyRequest(reqOptions) + } + + return makeGetRequest(reqOptions) } function makeGetRequest (options: CommonRequestParams & { @@ -135,6 +143,8 @@ function decodeQueryString (path: string) { return decode(path.split('?')[1]) } +// --------------------------------------------------------------------------- + function unwrapBody (test: request.Test): Promise { return test.then(res => res.body) } @@ -149,7 +159,16 @@ function unwrapBodyOrDecodeToJSON (test: request.Test): Promise { try { return JSON.parse(new TextDecoder().decode(res.body)) } catch (err) { - console.error('Cannot decode JSON.', res.body) + console.error('Cannot decode JSON.', res.body instanceof Buffer ? res.body.toString() : res.body) + throw err + } + } + + if (res.text) { + try { + return JSON.parse(res.text) + } catch (err) { + console.error('Cannot decode json', res.text) throw err } } @@ -184,6 +203,7 @@ export { function buildRequest (req: request.Test, options: CommonRequestParams) { if (options.contentType) req.set('Accept', options.contentType) + if (options.responseType) req.responseType(options.responseType) if (options.token) req.set('Authorization', 'Bearer ' + options.token) if (options.range) req.set('Range', options.range) if (options.accept) req.set('Accept', options.accept) @@ -196,13 +216,18 @@ function buildRequest (req: request.Test, options: CommonRequestParams) { req.set(name, options.headers[name]) }) - return req.expect((res) => { + return req.expect(res => { if (options.expectedStatus && res.status !== options.expectedStatus) { - throw new Error(`Expected status ${options.expectedStatus}, got ${res.status}. ` + + const err = new Error(`Expected status ${options.expectedStatus}, got ${res.status}. ` + `\nThe server responded: "${res.body?.error ?? res.text}".\n` + 'You may take a closer look at the logs. To see how to do so, check out this page: ' + - 'https://github.com/Chocobozzz/PeerTube/blob/develop/support/doc/development/tests.md#debug-server-logs') + 'https://github.com/Chocobozzz/PeerTube/blob/develop/support/doc/development/tests.md#debug-server-logs'); + + (err as any).res = res + + throw err } + return res }) } diff --git a/shared/server-commands/runners/index.ts b/shared/server-commands/runners/index.ts new file mode 100644 index 000000000..9e8e1baf2 --- /dev/null +++ b/shared/server-commands/runners/index.ts @@ -0,0 +1,3 @@ +export * from './runner-jobs-command' +export * from './runner-registration-tokens-command' +export * from './runners-command' diff --git a/shared/server-commands/runners/runner-jobs-command.ts b/shared/server-commands/runners/runner-jobs-command.ts new file mode 100644 index 000000000..3b0f84b9d --- /dev/null +++ b/shared/server-commands/runners/runner-jobs-command.ts @@ -0,0 +1,279 @@ +import { omit, pick, wait } from '@shared/core-utils' +import { + AbortRunnerJobBody, + AcceptRunnerJobBody, + AcceptRunnerJobResult, + ErrorRunnerJobBody, + HttpStatusCode, + isHLSTranscodingPayloadSuccess, + isLiveRTMPHLSTranscodingUpdatePayload, + isWebVideoOrAudioMergeTranscodingPayloadSuccess, + RequestRunnerJobBody, + RequestRunnerJobResult, + ResultList, + RunnerJobAdmin, + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobPayload, + RunnerJobState, + RunnerJobSuccessBody, + RunnerJobSuccessPayload, + RunnerJobType, + RunnerJobUpdateBody, + RunnerJobVODPayload +} from '@shared/models' +import { unwrapBody } from '../requests' +import { waitJobs } from '../server' +import { AbstractCommand, OverrideCommandOptions } from '../shared' + +export class RunnerJobsCommand extends AbstractCommand { + + list (options: OverrideCommandOptions & { + start?: number + count?: number + sort?: string + search?: string + } = {}) { + const path = '/api/v1/runners/jobs' + + return this.getRequestBody>({ + ...options, + + path, + query: pick(options, [ 'start', 'count', 'sort', 'search' ]), + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.OK_200 + }) + } + + cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) { + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel' + + return this.postBodyRequest({ + ...options, + + path, + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + // --------------------------------------------------------------------------- + + request (options: OverrideCommandOptions & RequestRunnerJobBody) { + const path = '/api/v1/runners/jobs/request' + + return unwrapBody(this.postBodyRequest({ + ...options, + + path, + fields: pick(options, [ 'runnerToken' ]), + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.OK_200 + })) + } + + async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) { + const vodTypes = new Set([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]) + + const { availableJobs } = await this.request(options) + + return { + availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) + } as RequestRunnerJobResult + } + + async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) { + const vodTypes = new Set([ 'live-rtmp-hls-transcoding' ]) + + const { availableJobs } = await this.request(options) + + return { + availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) + } as RequestRunnerJobResult + } + + // --------------------------------------------------------------------------- + + accept (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) { + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept' + + return unwrapBody>(this.postBodyRequest({ + ...options, + + path, + fields: pick(options, [ 'runnerToken' ]), + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.OK_200 + })) + } + + abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) { + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort' + + return this.postBodyRequest({ + ...options, + + path, + fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]), + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) { + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update' + + const { payload } = options + const attaches: { [id: string]: any } = {} + let payloadWithoutFiles = payload + + if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) { + if (payload.masterPlaylistFile) { + attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile + } + + attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile + attaches[`payload[videoChunkFile]`] = payload.videoChunkFile + + payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ]) + } + + return this.postUploadRequest({ + ...options, + + path, + fields: { + ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]), + + payload: payloadWithoutFiles + }, + attaches, + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) { + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error' + + return this.postBodyRequest({ + ...options, + + path, + fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]), + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) { + const { payload } = options + + const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success' + const attaches: { [id: string]: any } = {} + let payloadWithoutFiles = payload + + if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) { + attaches[`payload[videoFile]`] = payload.videoFile + + payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ]) + } + + if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) { + attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile + + payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ]) + } + + return this.postUploadRequest({ + ...options, + + path, + attaches, + fields: { + ...pick(options, [ 'jobToken', 'runnerToken' ]), + + payload: payloadWithoutFiles + }, + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + getInputFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) { + const { host, protocol, pathname } = new URL(options.url) + + return this.postBodyRequest({ + url: `${protocol}//${host}`, + path: pathname, + + fields: pick(options, [ 'jobToken', 'runnerToken' ]), + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.OK_200 + }) + } + + // --------------------------------------------------------------------------- + + async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) { + const { availableJobs } = await this.request(options) + + const job = options.type + ? availableJobs.find(j => j.type === options.type) + : availableJobs[0] + + return this.accept({ ...options, jobUUID: job.uuid }) + } + + async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) { + let jobUUID = jobUUIDToProcess + + if (!jobUUID) { + const { availableJobs } = await this.request({ runnerToken }) + jobUUID = availableJobs[0].uuid + } + + const { job } = await this.accept({ runnerToken, jobUUID }) + const jobToken = job.jobToken + + const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' } + await this.success({ runnerToken, jobUUID, jobToken, payload }) + + await waitJobs([ this.server ]) + + return job + } + + async cancelAllJobs (options: { state?: RunnerJobState } = {}) { + const { state } = options + + const { data } = await this.list({ count: 100 }) + + for (const job of data) { + if (state && job.state.id !== state) continue + + await this.cancelByAdmin({ jobUUID: job.uuid }) + } + } + + async getJob (options: OverrideCommandOptions & { uuid: string }) { + const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' }) + + return data.find(j => j.uuid === options.uuid) + } + + async requestLiveJob (runnerToken: string) { + let availableJobs: RequestRunnerJobResult['availableJobs'] = [] + + while (availableJobs.length === 0) { + const result = await this.requestLive({ runnerToken }) + availableJobs = result.availableJobs + + if (availableJobs.length === 1) break + + await wait(150) + } + + return availableJobs[0] + } +} diff --git a/shared/server-commands/runners/runner-registration-tokens-command.ts b/shared/server-commands/runners/runner-registration-tokens-command.ts new file mode 100644 index 000000000..e4f2e3d95 --- /dev/null +++ b/shared/server-commands/runners/runner-registration-tokens-command.ts @@ -0,0 +1,55 @@ +import { pick } from '@shared/core-utils' +import { HttpStatusCode, ResultList, RunnerRegistrationToken } from '@shared/models' +import { AbstractCommand, OverrideCommandOptions } from '../shared' + +export class RunnerRegistrationTokensCommand extends AbstractCommand { + + list (options: OverrideCommandOptions & { + start?: number + count?: number + sort?: string + } = {}) { + const path = '/api/v1/runners/registration-tokens' + + return this.getRequestBody>({ + ...options, + + path, + query: pick(options, [ 'start', 'count', 'sort' ]), + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.OK_200 + }) + } + + generate (options: OverrideCommandOptions = {}) { + const path = '/api/v1/runners/registration-tokens/generate' + + return this.postBodyRequest({ + ...options, + + path, + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + delete (options: OverrideCommandOptions & { + id: number + }) { + const path = '/api/v1/runners/registration-tokens/' + options.id + + return this.deleteRequest({ + ...options, + + path, + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + async getFirstRegistrationToken (options: OverrideCommandOptions = {}) { + const { data } = await this.list(options) + + return data[0].registrationToken + } +} diff --git a/shared/server-commands/runners/runners-command.ts b/shared/server-commands/runners/runners-command.ts new file mode 100644 index 000000000..ca9a1d7a3 --- /dev/null +++ b/shared/server-commands/runners/runners-command.ts @@ -0,0 +1,77 @@ +import { pick } from '@shared/core-utils' +import { HttpStatusCode, RegisterRunnerBody, RegisterRunnerResult, ResultList, Runner, UnregisterRunnerBody } from '@shared/models' +import { unwrapBody } from '../requests' +import { AbstractCommand, OverrideCommandOptions } from '../shared' + +export class RunnersCommand extends AbstractCommand { + + list (options: OverrideCommandOptions & { + start?: number + count?: number + sort?: string + } = {}) { + const path = '/api/v1/runners' + + return this.getRequestBody>({ + ...options, + + path, + query: pick(options, [ 'start', 'count', 'sort' ]), + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.OK_200 + }) + } + + register (options: OverrideCommandOptions & RegisterRunnerBody) { + const path = '/api/v1/runners/register' + + return unwrapBody(this.postBodyRequest({ + ...options, + + path, + fields: pick(options, [ 'name', 'registrationToken', 'description' ]), + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.OK_200 + })) + } + + unregister (options: OverrideCommandOptions & UnregisterRunnerBody) { + const path = '/api/v1/runners/unregister' + + return this.postBodyRequest({ + ...options, + + path, + fields: pick(options, [ 'runnerToken' ]), + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + delete (options: OverrideCommandOptions & { + id: number + }) { + const path = '/api/v1/runners/' + options.id + + return this.deleteRequest({ + ...options, + + path, + implicitToken: true, + defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204 + }) + } + + // --------------------------------------------------------------------------- + + async autoRegisterRunner () { + const { data } = await this.server.runnerRegistrationTokens.list({ sort: 'createdAt' }) + + const { runnerToken } = await this.register({ + name: 'runner', + registrationToken: data[0].registrationToken + }) + + return runnerToken + } +} diff --git a/shared/server-commands/server/config-command.ts b/shared/server-commands/server/config-command.ts index 303fcab88..9a6e413f2 100644 --- a/shared/server-commands/server/config-command.ts +++ b/shared/server-commands/server/config-command.ts @@ -5,8 +5,9 @@ import { AbstractCommand, OverrideCommandOptions } from '../shared/abstract-comm export class ConfigCommand extends AbstractCommand { - static getCustomConfigResolutions (enabled: boolean) { + static getCustomConfigResolutions (enabled: boolean, with0p = false) { return { + '0p': enabled && with0p, '144p': enabled, '240p': enabled, '360p': enabled, @@ -129,7 +130,8 @@ export class ConfigCommand extends AbstractCommand { }) } - enableTranscoding (webtorrent = true, hls = true) { + // TODO: convert args to object + enableTranscoding (webtorrent = true, hls = true, with0p = false) { return this.updateExistingSubConfig({ newConfig: { transcoding: { @@ -138,7 +140,7 @@ export class ConfigCommand extends AbstractCommand { allowAudioFiles: true, allowAdditionalExtensions: true, - resolutions: ConfigCommand.getCustomConfigResolutions(true), + resolutions: ConfigCommand.getCustomConfigResolutions(true, with0p), webtorrent: { enabled: webtorrent @@ -151,6 +153,7 @@ export class ConfigCommand extends AbstractCommand { }) } + // TODO: convert args to object enableMinimumTranscoding (webtorrent = true, hls = true) { return this.updateExistingSubConfig({ newConfig: { @@ -173,6 +176,25 @@ export class ConfigCommand extends AbstractCommand { }) } + enableRemoteTranscoding () { + return this.updateExistingSubConfig({ + newConfig: { + transcoding: { + remoteRunners: { + enabled: true + } + }, + live: { + transcoding: { + remoteRunners: { + enabled: true + } + } + } + } + }) + } + // --------------------------------------------------------------------------- enableStudio () { @@ -363,6 +385,9 @@ export class ConfigCommand extends AbstractCommand { }, transcoding: { enabled: true, + remoteRunners: { + enabled: false + }, allowAdditionalExtensions: true, allowAudioFiles: true, threads: 1, @@ -398,6 +423,9 @@ export class ConfigCommand extends AbstractCommand { maxUserLives: 50, transcoding: { enabled: true, + remoteRunners: { + enabled: false + }, threads: 4, profile: 'default', resolutions: { diff --git a/shared/server-commands/server/jobs.ts b/shared/server-commands/server/jobs.ts index e1d6cdff4..ff3098063 100644 --- a/shared/server-commands/server/jobs.ts +++ b/shared/server-commands/server/jobs.ts @@ -1,16 +1,17 @@ import { expect } from 'chai' import { wait } from '@shared/core-utils' -import { JobState, JobType } from '../../models' +import { JobState, JobType, RunnerJobState } from '../../models' import { PeerTubeServer } from './server' async function waitJobs ( serversArg: PeerTubeServer[] | PeerTubeServer, options: { skipDelayed?: boolean // default false + runnerJobs?: boolean // default false } = {} ) { - const { skipDelayed = false } = options + const { skipDelayed = false, runnerJobs = false } = options const pendingJobWait = process.env.NODE_PENDING_JOB_WAIT ? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10) @@ -33,7 +34,8 @@ async function waitJobs ( // Check if each server has pending request for (const server of servers) { for (const state of states) { - const p = server.jobs.list({ + + const jobPromise = server.jobs.list({ state, start: 0, count: 10, @@ -46,17 +48,29 @@ async function waitJobs ( } }) - tasks.push(p) + tasks.push(jobPromise) } - const p = server.debug.getDebug() + const debugPromise = server.debug.getDebug() .then(obj => { if (obj.activityPubMessagesWaiting !== 0) { pendingRequests = true } }) + tasks.push(debugPromise) + + if (runnerJobs) { + const runnerJobsPromise = server.runnerJobs.list({ count: 100 }) + .then(({ data }) => { + for (const job of data) { + if (job.state.id !== RunnerJobState.COMPLETED) { + pendingRequests = true + } + } + }) + tasks.push(runnerJobsPromise) + } - tasks.push(p) } return tasks diff --git a/shared/server-commands/server/server.ts b/shared/server-commands/server/server.ts index d7e751581..f68b81367 100644 --- a/shared/server-commands/server/server.ts +++ b/shared/server-commands/server/server.ts @@ -8,9 +8,9 @@ import { CLICommand } from '../cli' import { CustomPagesCommand } from '../custom-pages' import { FeedCommand } from '../feeds' import { LogsCommand } from '../logs' -import { SQLCommand } from '../miscs' import { AbusesCommand } from '../moderation' import { OverviewsCommand } from '../overviews' +import { RunnerJobsCommand, RunnerRegistrationTokensCommand, RunnersCommand } from '../runners' import { SearchCommand } from '../search' import { SocketIOCommand } from '../socket' import { @@ -136,7 +136,6 @@ export class PeerTubeServer { streamingPlaylists?: StreamingPlaylistsCommand channels?: ChannelsCommand comments?: CommentsCommand - sql?: SQLCommand notifications?: NotificationsCommand servers?: ServersCommand login?: LoginCommand @@ -150,6 +149,10 @@ export class PeerTubeServer { videoToken?: VideoTokenCommand registrations?: RegistrationsCommand + runners?: RunnersCommand + runnerRegistrationTokens?: RunnerRegistrationTokensCommand + runnerJobs?: RunnerJobsCommand + constructor (options: { serverNumber: number } | { url: string }) { if ((options as any).url) { this.setUrl((options as any).url) @@ -311,14 +314,14 @@ export class PeerTubeServer { }) } - async kill () { - if (!this.app) return - - await this.sql.cleanup() + kill () { + if (!this.app) return Promise.resolve() process.kill(-this.app.pid) this.app = null + + return Promise.resolve() } private randomServer () { @@ -420,7 +423,6 @@ export class PeerTubeServer { this.streamingPlaylists = new StreamingPlaylistsCommand(this) this.channels = new ChannelsCommand(this) this.comments = new CommentsCommand(this) - this.sql = new SQLCommand(this) this.notifications = new NotificationsCommand(this) this.servers = new ServersCommand(this) this.login = new LoginCommand(this) @@ -433,5 +435,9 @@ export class PeerTubeServer { this.twoFactor = new TwoFactorCommand(this) this.videoToken = new VideoTokenCommand(this) this.registrations = new RegistrationsCommand(this) + + this.runners = new RunnersCommand(this) + this.runnerRegistrationTokens = new RunnerRegistrationTokensCommand(this) + this.runnerJobs = new RunnerJobsCommand(this) } } diff --git a/shared/server-commands/server/servers.ts b/shared/server-commands/server/servers.ts index b2b61adb3..fe9da9e63 100644 --- a/shared/server-commands/server/servers.ts +++ b/shared/server-commands/server/servers.ts @@ -20,7 +20,7 @@ function createMultipleServers (totalServers: number, configOverride?: object, o return Promise.all(serverPromises) } -async function killallServers (servers: PeerTubeServer[]) { +function killallServers (servers: PeerTubeServer[]) { return Promise.all(servers.map(s => s.kill())) } diff --git a/shared/server-commands/shared/abstract-command.ts b/shared/server-commands/shared/abstract-command.ts index 1b53a5330..ca4ffada9 100644 --- a/shared/server-commands/shared/abstract-command.ts +++ b/shared/server-commands/shared/abstract-command.ts @@ -33,6 +33,7 @@ interface InternalCommonCommandOptions extends OverrideCommandOptions { host?: string headers?: { [ name: string ]: string } requestType?: string + responseType?: string xForwardedFor?: string } @@ -169,7 +170,7 @@ abstract class AbstractCommand { } protected buildCommonRequestOptions (options: InternalCommonCommandOptions) { - const { url, path, redirects, contentType, accept, range, host, headers, requestType, xForwardedFor } = options + const { url, path, redirects, contentType, accept, range, host, headers, requestType, xForwardedFor, responseType } = options return { url: url ?? this.server.url, @@ -185,6 +186,7 @@ abstract class AbstractCommand { accept, headers, type: requestType, + responseType, xForwardedFor } } diff --git a/shared/server-commands/socket/socket-io-command.ts b/shared/server-commands/socket/socket-io-command.ts index c277ead28..c28a86366 100644 --- a/shared/server-commands/socket/socket-io-command.ts +++ b/shared/server-commands/socket/socket-io-command.ts @@ -12,4 +12,13 @@ export class SocketIOCommand extends AbstractCommand { getLiveNotificationSocket () { return io(this.server.url + '/live-videos') } + + getRunnersSocket (options: { + runnerToken: string + }) { + return io(this.server.url + '/runners', { + reconnection: false, + auth: { runnerToken: options.runnerToken } + }) + } } diff --git a/shared/server-commands/videos/live-command.ts b/shared/server-commands/videos/live-command.ts index 3273e3a8f..dc3c5a86e 100644 --- a/shared/server-commands/videos/live-command.ts +++ b/shared/server-commands/videos/live-command.ts @@ -121,7 +121,7 @@ export class LiveCommand extends AbstractCommand { permanentLive: boolean privacy?: VideoPrivacy }) { - const { saveReplay, permanentLive, privacy } = options + const { saveReplay, permanentLive, privacy = VideoPrivacy.PUBLIC } = options const { uuid } = await this.create({ ...options, diff --git a/shared/server-commands/videos/streaming-playlists-command.ts b/shared/server-commands/videos/streaming-playlists-command.ts index 26ab2735f..7b92dcc0a 100644 --- a/shared/server-commands/videos/streaming-playlists-command.ts +++ b/shared/server-commands/videos/streaming-playlists-command.ts @@ -13,7 +13,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand { withRetry?: boolean // default false currentRetry?: number - }) { + }): Promise { const { videoFileToken, reinjectVideoFileToken, withRetry, currentRetry = 1 } = options try { @@ -54,6 +54,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand { url: options.url, range: options.range, implicitToken: false, + responseType: 'application/octet-stream', defaultExpectedStatus: HttpStatusCode.OK_200 })) } @@ -65,6 +66,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand { ...options, url: options.url, + contentType: 'application/json', implicitToken: false, defaultExpectedStatus: HttpStatusCode.OK_200 })) -- cgit v1.2.3