From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/models/runner/runner-job.ts | 347 ++++++++++++++++++++++ server/models/runner/runner-registration-token.ts | 103 +++++++ server/models/runner/runner.ts | 112 +++++++ 3 files changed, 562 insertions(+) create mode 100644 server/models/runner/runner-job.ts create mode 100644 server/models/runner/runner-registration-token.ts create mode 100644 server/models/runner/runner.ts (limited to 'server/models/runner') diff --git a/server/models/runner/runner-job.ts b/server/models/runner/runner-job.ts new file mode 100644 index 000000000..add6f9a43 --- /dev/null +++ b/server/models/runner/runner-job.ts @@ -0,0 +1,347 @@ +import { FindOptions, Op, Transaction } from 'sequelize' +import { + AllowNull, + BelongsTo, + Column, + CreatedAt, + DataType, + Default, + ForeignKey, + IsUUID, + Model, + Scopes, + Table, + UpdatedAt +} from 'sequelize-typescript' +import { isUUIDValid } from '@server/helpers/custom-validators/misc' +import { CONSTRAINTS_FIELDS, RUNNER_JOB_STATES } from '@server/initializers/constants' +import { MRunnerJob, MRunnerJobRunner, MRunnerJobRunnerParent } from '@server/types/models/runners' +import { RunnerJob, RunnerJobAdmin, RunnerJobPayload, RunnerJobPrivatePayload, RunnerJobState, RunnerJobType } from '@shared/models' +import { AttributesOnly } from '@shared/typescript-utils' +import { getSort, searchAttribute } from '../shared' +import { RunnerModel } from './runner' + +enum ScopeNames { + WITH_RUNNER = 'WITH_RUNNER', + WITH_PARENT = 'WITH_PARENT' +} + +@Scopes(() => ({ + [ScopeNames.WITH_RUNNER]: { + include: [ + { + model: RunnerModel.unscoped(), + required: false + } + ] + }, + [ScopeNames.WITH_PARENT]: { + include: [ + { + model: RunnerJobModel.unscoped(), + required: false + } + ] + } +})) +@Table({ + tableName: 'runnerJob', + indexes: [ + { + fields: [ 'uuid' ], + unique: true + }, + { + fields: [ 'processingJobToken' ], + unique: true + }, + { + fields: [ 'runnerId' ] + } + ] +}) +export class RunnerJobModel extends Model>> { + + @AllowNull(false) + @IsUUID(4) + @Column(DataType.UUID) + uuid: string + + @AllowNull(false) + @Column + type: RunnerJobType + + @AllowNull(false) + @Column(DataType.JSONB) + payload: RunnerJobPayload + + @AllowNull(false) + @Column(DataType.JSONB) + privatePayload: RunnerJobPrivatePayload + + @AllowNull(false) + @Column + state: RunnerJobState + + @AllowNull(false) + @Default(0) + @Column + failures: number + + @AllowNull(true) + @Column(DataType.STRING(CONSTRAINTS_FIELDS.RUNNER_JOBS.ERROR_MESSAGE.max)) + error: string + + // Less has priority + @AllowNull(false) + @Column + priority: number + + // Used to fetch the appropriate job when the runner wants to post the result + @AllowNull(true) + @Column + processingJobToken: string + + @AllowNull(true) + @Column + progress: number + + @AllowNull(true) + @Column + startedAt: Date + + @AllowNull(true) + @Column + finishedAt: Date + + @CreatedAt + createdAt: Date + + @UpdatedAt + updatedAt: Date + + @ForeignKey(() => RunnerJobModel) + @Column + dependsOnRunnerJobId: number + + @BelongsTo(() => RunnerJobModel, { + foreignKey: { + name: 'dependsOnRunnerJobId', + allowNull: true + }, + onDelete: 'cascade' + }) + DependsOnRunnerJob: RunnerJobModel + + @ForeignKey(() => RunnerModel) + @Column + runnerId: number + + @BelongsTo(() => RunnerModel, { + foreignKey: { + name: 'runnerId', + allowNull: true + }, + onDelete: 'SET NULL' + }) + Runner: RunnerModel + + // --------------------------------------------------------------------------- + + static loadWithRunner (uuid: string) { + const query = { + where: { uuid } + } + + return RunnerJobModel.scope(ScopeNames.WITH_RUNNER).findOne(query) + } + + static loadByRunnerAndJobTokensWithRunner (options: { + uuid: string + runnerToken: string + jobToken: string + }) { + const { uuid, runnerToken, jobToken } = options + + const query = { + where: { + uuid, + processingJobToken: jobToken + }, + include: { + model: RunnerModel.unscoped(), + required: true, + where: { + runnerToken + } + } + } + + return RunnerJobModel.findOne(query) + } + + static listAvailableJobs () { + const query = { + limit: 10, + order: getSort('priority'), + where: { + state: RunnerJobState.PENDING + } + } + + return RunnerJobModel.findAll(query) + } + + static listStalledJobs (options: { + staleTimeMS: number + types: RunnerJobType[] + }) { + const before = new Date(Date.now() - options.staleTimeMS) + + return RunnerJobModel.findAll({ + where: { + type: { + [Op.in]: options.types + }, + state: RunnerJobState.PROCESSING, + updatedAt: { + [Op.lt]: before + } + } + }) + } + + static listChildrenOf (job: MRunnerJob, transaction?: Transaction) { + const query = { + where: { + dependsOnRunnerJobId: job.id + }, + transaction + } + + return RunnerJobModel.findAll(query) + } + + static listForApi (options: { + start: number + count: number + sort: string + search?: string + }) { + const { start, count, sort, search } = options + + const query: FindOptions = { + offset: start, + limit: count, + order: getSort(sort) + } + + if (search) { + if (isUUIDValid(search)) { + query.where = { uuid: search } + } else { + query.where = { + [Op.or]: [ + searchAttribute(search, 'type'), + searchAttribute(search, '$Runner.name$') + ] + } + } + } + + return Promise.all([ + RunnerJobModel.scope([ ScopeNames.WITH_RUNNER ]).count(query), + RunnerJobModel.scope([ ScopeNames.WITH_RUNNER, ScopeNames.WITH_PARENT ]).findAll(query) + ]).then(([ total, data ]) => ({ total, data })) + } + + static updateDependantJobsOf (runnerJob: MRunnerJob) { + const where = { + dependsOnRunnerJobId: runnerJob.id + } + + return RunnerJobModel.update({ state: RunnerJobState.PENDING }, { where }) + } + + static cancelAllJobs (options: { type: RunnerJobType }) { + const where = { + type: options.type + } + + return RunnerJobModel.update({ state: RunnerJobState.CANCELLED }, { where }) + } + + // --------------------------------------------------------------------------- + + resetToPending () { + this.state = RunnerJobState.PENDING + this.processingJobToken = null + this.progress = null + this.startedAt = null + this.runnerId = null + } + + setToErrorOrCancel ( + state: RunnerJobState.PARENT_ERRORED | RunnerJobState.ERRORED | RunnerJobState.CANCELLED | RunnerJobState.PARENT_CANCELLED + ) { + this.state = state + this.processingJobToken = null + this.finishedAt = new Date() + } + + toFormattedJSON (this: MRunnerJobRunnerParent): RunnerJob { + const runner = this.Runner + ? { + id: this.Runner.id, + name: this.Runner.name, + description: this.Runner.description + } + : null + + const parent = this.DependsOnRunnerJob + ? { + id: this.DependsOnRunnerJob.id, + uuid: this.DependsOnRunnerJob.uuid, + type: this.DependsOnRunnerJob.type, + state: { + id: this.DependsOnRunnerJob.state, + label: RUNNER_JOB_STATES[this.DependsOnRunnerJob.state] + } + } + : undefined + + return { + uuid: this.uuid, + type: this.type, + + state: { + id: this.state, + label: RUNNER_JOB_STATES[this.state] + }, + + progress: this.progress, + priority: this.priority, + failures: this.failures, + error: this.error, + + payload: this.payload, + + startedAt: this.startedAt?.toISOString(), + finishedAt: this.finishedAt?.toISOString(), + + createdAt: this.createdAt.toISOString(), + updatedAt: this.updatedAt.toISOString(), + + parent, + runner + } + } + + toFormattedAdminJSON (this: MRunnerJobRunnerParent): RunnerJobAdmin { + return { + ...this.toFormattedJSON(), + + privatePayload: this.privatePayload + } + } +} diff --git a/server/models/runner/runner-registration-token.ts b/server/models/runner/runner-registration-token.ts new file mode 100644 index 000000000..b2ae6c9eb --- /dev/null +++ b/server/models/runner/runner-registration-token.ts @@ -0,0 +1,103 @@ +import { FindOptions, literal } from 'sequelize' +import { AllowNull, Column, CreatedAt, HasMany, Model, Table, UpdatedAt } from 'sequelize-typescript' +import { MRunnerRegistrationToken } from '@server/types/models/runners' +import { RunnerRegistrationToken } from '@shared/models' +import { AttributesOnly } from '@shared/typescript-utils' +import { getSort } from '../shared' +import { RunnerModel } from './runner' + +/** + * + * Tokens used by PeerTube runners to register themselves to the PeerTube instance + * + */ + +@Table({ + tableName: 'runnerRegistrationToken', + indexes: [ + { + fields: [ 'registrationToken' ], + unique: true + } + ] +}) +export class RunnerRegistrationTokenModel extends Model>> { + + @AllowNull(false) + @Column + registrationToken: string + + @CreatedAt + createdAt: Date + + @UpdatedAt + updatedAt: Date + + @HasMany(() => RunnerModel, { + foreignKey: { + allowNull: true + }, + onDelete: 'cascade' + }) + Runners: RunnerModel[] + + static load (id: number) { + return RunnerRegistrationTokenModel.findByPk(id) + } + + static loadByRegistrationToken (registrationToken: string) { + const query = { + where: { registrationToken } + } + + return RunnerRegistrationTokenModel.findOne(query) + } + + static countTotal () { + return RunnerRegistrationTokenModel.unscoped().count() + } + + static listForApi (options: { + start: number + count: number + sort: string + }) { + const { start, count, sort } = options + + const query: FindOptions = { + attributes: { + include: [ + [ + literal('(SELECT COUNT(*) FROM "runner" WHERE "runner"."runnerRegistrationTokenId" = "RunnerRegistrationTokenModel"."id")'), + 'registeredRunnersCount' + ] + ] + }, + offset: start, + limit: count, + order: getSort(sort) + } + + return Promise.all([ + RunnerRegistrationTokenModel.count(query), + RunnerRegistrationTokenModel.findAll(query) + ]).then(([ total, data ]) => ({ total, data })) + } + + // --------------------------------------------------------------------------- + + toFormattedJSON (this: MRunnerRegistrationToken): RunnerRegistrationToken { + const registeredRunnersCount = this.get('registeredRunnersCount') as number + + return { + id: this.id, + + registrationToken: this.registrationToken, + + createdAt: this.createdAt, + updatedAt: this.updatedAt, + + registeredRunnersCount + } + } +} diff --git a/server/models/runner/runner.ts b/server/models/runner/runner.ts new file mode 100644 index 000000000..1ef0018b4 --- /dev/null +++ b/server/models/runner/runner.ts @@ -0,0 +1,112 @@ +import { FindOptions } from 'sequelize' +import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript' +import { MRunner } from '@server/types/models/runners' +import { Runner } from '@shared/models' +import { AttributesOnly } from '@shared/typescript-utils' +import { getSort } from '../shared' +import { RunnerRegistrationTokenModel } from './runner-registration-token' +import { CONSTRAINTS_FIELDS } from '@server/initializers/constants' + +@Table({ + tableName: 'runner', + indexes: [ + { + fields: [ 'runnerToken' ], + unique: true + }, + { + fields: [ 'runnerRegistrationTokenId' ] + } + ] +}) +export class RunnerModel extends Model>> { + + // Used to identify the appropriate runner when it uses the runner REST API + @AllowNull(false) + @Column + runnerToken: string + + @AllowNull(false) + @Column + name: string + + @AllowNull(true) + @Column(DataType.STRING(CONSTRAINTS_FIELDS.RUNNERS.DESCRIPTION.max)) + description: string + + @AllowNull(false) + @Column + lastContact: Date + + @AllowNull(false) + @Column + ip: string + + @CreatedAt + createdAt: Date + + @UpdatedAt + updatedAt: Date + + @ForeignKey(() => RunnerRegistrationTokenModel) + @Column + runnerRegistrationTokenId: number + + @BelongsTo(() => RunnerRegistrationTokenModel, { + foreignKey: { + allowNull: false + }, + onDelete: 'cascade' + }) + RunnerRegistrationToken: RunnerRegistrationTokenModel + + // --------------------------------------------------------------------------- + + static load (id: number) { + return RunnerModel.findByPk(id) + } + + static loadByToken (runnerToken: string) { + const query = { + where: { runnerToken } + } + + return RunnerModel.findOne(query) + } + + static listForApi (options: { + start: number + count: number + sort: string + }) { + const { start, count, sort } = options + + const query: FindOptions = { + offset: start, + limit: count, + order: getSort(sort) + } + + return Promise.all([ + RunnerModel.count(query), + RunnerModel.findAll(query) + ]).then(([ total, data ]) => ({ total, data })) + } + + // --------------------------------------------------------------------------- + + toFormattedJSON (this: MRunner): Runner { + return { + id: this.id, + + name: this.name, + description: this.description, + + ip: this.ip, + lastContact: this.lastContact, + + createdAt: this.createdAt, + updatedAt: this.updatedAt + } + } +} -- cgit v1.2.3