aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/models/runner
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/models/runner
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/models/runner')
-rw-r--r--server/models/runner/runner-job.ts347
-rw-r--r--server/models/runner/runner-registration-token.ts103
-rw-r--r--server/models/runner/runner.ts112
3 files changed, 562 insertions, 0 deletions
diff --git a/server/models/runner/runner-job.ts b/server/models/runner/runner-job.ts
new file mode 100644
index 000000000..add6f9a43
--- /dev/null
+++ b/server/models/runner/runner-job.ts
@@ -0,0 +1,347 @@
1import { FindOptions, Op, Transaction } from 'sequelize'
2import {
3 AllowNull,
4 BelongsTo,
5 Column,
6 CreatedAt,
7 DataType,
8 Default,
9 ForeignKey,
10 IsUUID,
11 Model,
12 Scopes,
13 Table,
14 UpdatedAt
15} from 'sequelize-typescript'
16import { isUUIDValid } from '@server/helpers/custom-validators/misc'
17import { CONSTRAINTS_FIELDS, RUNNER_JOB_STATES } from '@server/initializers/constants'
18import { MRunnerJob, MRunnerJobRunner, MRunnerJobRunnerParent } from '@server/types/models/runners'
19import { RunnerJob, RunnerJobAdmin, RunnerJobPayload, RunnerJobPrivatePayload, RunnerJobState, RunnerJobType } from '@shared/models'
20import { AttributesOnly } from '@shared/typescript-utils'
21import { getSort, searchAttribute } from '../shared'
22import { RunnerModel } from './runner'
23
24enum ScopeNames {
25 WITH_RUNNER = 'WITH_RUNNER',
26 WITH_PARENT = 'WITH_PARENT'
27}
28
29@Scopes(() => ({
30 [ScopeNames.WITH_RUNNER]: {
31 include: [
32 {
33 model: RunnerModel.unscoped(),
34 required: false
35 }
36 ]
37 },
38 [ScopeNames.WITH_PARENT]: {
39 include: [
40 {
41 model: RunnerJobModel.unscoped(),
42 required: false
43 }
44 ]
45 }
46}))
47@Table({
48 tableName: 'runnerJob',
49 indexes: [
50 {
51 fields: [ 'uuid' ],
52 unique: true
53 },
54 {
55 fields: [ 'processingJobToken' ],
56 unique: true
57 },
58 {
59 fields: [ 'runnerId' ]
60 }
61 ]
62})
63export class RunnerJobModel extends Model<Partial<AttributesOnly<RunnerJobModel>>> {
64
65 @AllowNull(false)
66 @IsUUID(4)
67 @Column(DataType.UUID)
68 uuid: string
69
70 @AllowNull(false)
71 @Column
72 type: RunnerJobType
73
74 @AllowNull(false)
75 @Column(DataType.JSONB)
76 payload: RunnerJobPayload
77
78 @AllowNull(false)
79 @Column(DataType.JSONB)
80 privatePayload: RunnerJobPrivatePayload
81
82 @AllowNull(false)
83 @Column
84 state: RunnerJobState
85
86 @AllowNull(false)
87 @Default(0)
88 @Column
89 failures: number
90
91 @AllowNull(true)
92 @Column(DataType.STRING(CONSTRAINTS_FIELDS.RUNNER_JOBS.ERROR_MESSAGE.max))
93 error: string
94
95 // Less has priority
96 @AllowNull(false)
97 @Column
98 priority: number
99
100 // Used to fetch the appropriate job when the runner wants to post the result
101 @AllowNull(true)
102 @Column
103 processingJobToken: string
104
105 @AllowNull(true)
106 @Column
107 progress: number
108
109 @AllowNull(true)
110 @Column
111 startedAt: Date
112
113 @AllowNull(true)
114 @Column
115 finishedAt: Date
116
117 @CreatedAt
118 createdAt: Date
119
120 @UpdatedAt
121 updatedAt: Date
122
123 @ForeignKey(() => RunnerJobModel)
124 @Column
125 dependsOnRunnerJobId: number
126
127 @BelongsTo(() => RunnerJobModel, {
128 foreignKey: {
129 name: 'dependsOnRunnerJobId',
130 allowNull: true
131 },
132 onDelete: 'cascade'
133 })
134 DependsOnRunnerJob: RunnerJobModel
135
136 @ForeignKey(() => RunnerModel)
137 @Column
138 runnerId: number
139
140 @BelongsTo(() => RunnerModel, {
141 foreignKey: {
142 name: 'runnerId',
143 allowNull: true
144 },
145 onDelete: 'SET NULL'
146 })
147 Runner: RunnerModel
148
149 // ---------------------------------------------------------------------------
150
151 static loadWithRunner (uuid: string) {
152 const query = {
153 where: { uuid }
154 }
155
156 return RunnerJobModel.scope(ScopeNames.WITH_RUNNER).findOne<MRunnerJobRunner>(query)
157 }
158
159 static loadByRunnerAndJobTokensWithRunner (options: {
160 uuid: string
161 runnerToken: string
162 jobToken: string
163 }) {
164 const { uuid, runnerToken, jobToken } = options
165
166 const query = {
167 where: {
168 uuid,
169 processingJobToken: jobToken
170 },
171 include: {
172 model: RunnerModel.unscoped(),
173 required: true,
174 where: {
175 runnerToken
176 }
177 }
178 }
179
180 return RunnerJobModel.findOne<MRunnerJobRunner>(query)
181 }
182
183 static listAvailableJobs () {
184 const query = {
185 limit: 10,
186 order: getSort('priority'),
187 where: {
188 state: RunnerJobState.PENDING
189 }
190 }
191
192 return RunnerJobModel.findAll<MRunnerJob>(query)
193 }
194
195 static listStalledJobs (options: {
196 staleTimeMS: number
197 types: RunnerJobType[]
198 }) {
199 const before = new Date(Date.now() - options.staleTimeMS)
200
201 return RunnerJobModel.findAll<MRunnerJob>({
202 where: {
203 type: {
204 [Op.in]: options.types
205 },
206 state: RunnerJobState.PROCESSING,
207 updatedAt: {
208 [Op.lt]: before
209 }
210 }
211 })
212 }
213
214 static listChildrenOf (job: MRunnerJob, transaction?: Transaction) {
215 const query = {
216 where: {
217 dependsOnRunnerJobId: job.id
218 },
219 transaction
220 }
221
222 return RunnerJobModel.findAll<MRunnerJob>(query)
223 }
224
225 static listForApi (options: {
226 start: number
227 count: number
228 sort: string
229 search?: string
230 }) {
231 const { start, count, sort, search } = options
232
233 const query: FindOptions = {
234 offset: start,
235 limit: count,
236 order: getSort(sort)
237 }
238
239 if (search) {
240 if (isUUIDValid(search)) {
241 query.where = { uuid: search }
242 } else {
243 query.where = {
244 [Op.or]: [
245 searchAttribute(search, 'type'),
246 searchAttribute(search, '$Runner.name$')
247 ]
248 }
249 }
250 }
251
252 return Promise.all([
253 RunnerJobModel.scope([ ScopeNames.WITH_RUNNER ]).count(query),
254 RunnerJobModel.scope([ ScopeNames.WITH_RUNNER, ScopeNames.WITH_PARENT ]).findAll<MRunnerJobRunnerParent>(query)
255 ]).then(([ total, data ]) => ({ total, data }))
256 }
257
258 static updateDependantJobsOf (runnerJob: MRunnerJob) {
259 const where = {
260 dependsOnRunnerJobId: runnerJob.id
261 }
262
263 return RunnerJobModel.update({ state: RunnerJobState.PENDING }, { where })
264 }
265
266 static cancelAllJobs (options: { type: RunnerJobType }) {
267 const where = {
268 type: options.type
269 }
270
271 return RunnerJobModel.update({ state: RunnerJobState.CANCELLED }, { where })
272 }
273
274 // ---------------------------------------------------------------------------
275
276 resetToPending () {
277 this.state = RunnerJobState.PENDING
278 this.processingJobToken = null
279 this.progress = null
280 this.startedAt = null
281 this.runnerId = null
282 }
283
284 setToErrorOrCancel (
285 state: RunnerJobState.PARENT_ERRORED | RunnerJobState.ERRORED | RunnerJobState.CANCELLED | RunnerJobState.PARENT_CANCELLED
286 ) {
287 this.state = state
288 this.processingJobToken = null
289 this.finishedAt = new Date()
290 }
291
292 toFormattedJSON (this: MRunnerJobRunnerParent): RunnerJob {
293 const runner = this.Runner
294 ? {
295 id: this.Runner.id,
296 name: this.Runner.name,
297 description: this.Runner.description
298 }
299 : null
300
301 const parent = this.DependsOnRunnerJob
302 ? {
303 id: this.DependsOnRunnerJob.id,
304 uuid: this.DependsOnRunnerJob.uuid,
305 type: this.DependsOnRunnerJob.type,
306 state: {
307 id: this.DependsOnRunnerJob.state,
308 label: RUNNER_JOB_STATES[this.DependsOnRunnerJob.state]
309 }
310 }
311 : undefined
312
313 return {
314 uuid: this.uuid,
315 type: this.type,
316
317 state: {
318 id: this.state,
319 label: RUNNER_JOB_STATES[this.state]
320 },
321
322 progress: this.progress,
323 priority: this.priority,
324 failures: this.failures,
325 error: this.error,
326
327 payload: this.payload,
328
329 startedAt: this.startedAt?.toISOString(),
330 finishedAt: this.finishedAt?.toISOString(),
331
332 createdAt: this.createdAt.toISOString(),
333 updatedAt: this.updatedAt.toISOString(),
334
335 parent,
336 runner
337 }
338 }
339
340 toFormattedAdminJSON (this: MRunnerJobRunnerParent): RunnerJobAdmin {
341 return {
342 ...this.toFormattedJSON(),
343
344 privatePayload: this.privatePayload
345 }
346 }
347}
diff --git a/server/models/runner/runner-registration-token.ts b/server/models/runner/runner-registration-token.ts
new file mode 100644
index 000000000..b2ae6c9eb
--- /dev/null
+++ b/server/models/runner/runner-registration-token.ts
@@ -0,0 +1,103 @@
1import { FindOptions, literal } from 'sequelize'
2import { AllowNull, Column, CreatedAt, HasMany, Model, Table, UpdatedAt } from 'sequelize-typescript'
3import { MRunnerRegistrationToken } from '@server/types/models/runners'
4import { RunnerRegistrationToken } from '@shared/models'
5import { AttributesOnly } from '@shared/typescript-utils'
6import { getSort } from '../shared'
7import { RunnerModel } from './runner'
8
9/**
10 *
11 * Tokens used by PeerTube runners to register themselves to the PeerTube instance
12 *
13 */
14
15@Table({
16 tableName: 'runnerRegistrationToken',
17 indexes: [
18 {
19 fields: [ 'registrationToken' ],
20 unique: true
21 }
22 ]
23})
24export class RunnerRegistrationTokenModel extends Model<Partial<AttributesOnly<RunnerRegistrationTokenModel>>> {
25
26 @AllowNull(false)
27 @Column
28 registrationToken: string
29
30 @CreatedAt
31 createdAt: Date
32
33 @UpdatedAt
34 updatedAt: Date
35
36 @HasMany(() => RunnerModel, {
37 foreignKey: {
38 allowNull: true
39 },
40 onDelete: 'cascade'
41 })
42 Runners: RunnerModel[]
43
44 static load (id: number) {
45 return RunnerRegistrationTokenModel.findByPk(id)
46 }
47
48 static loadByRegistrationToken (registrationToken: string) {
49 const query = {
50 where: { registrationToken }
51 }
52
53 return RunnerRegistrationTokenModel.findOne(query)
54 }
55
56 static countTotal () {
57 return RunnerRegistrationTokenModel.unscoped().count()
58 }
59
60 static listForApi (options: {
61 start: number
62 count: number
63 sort: string
64 }) {
65 const { start, count, sort } = options
66
67 const query: FindOptions = {
68 attributes: {
69 include: [
70 [
71 literal('(SELECT COUNT(*) FROM "runner" WHERE "runner"."runnerRegistrationTokenId" = "RunnerRegistrationTokenModel"."id")'),
72 'registeredRunnersCount'
73 ]
74 ]
75 },
76 offset: start,
77 limit: count,
78 order: getSort(sort)
79 }
80
81 return Promise.all([
82 RunnerRegistrationTokenModel.count(query),
83 RunnerRegistrationTokenModel.findAll<MRunnerRegistrationToken>(query)
84 ]).then(([ total, data ]) => ({ total, data }))
85 }
86
87 // ---------------------------------------------------------------------------
88
89 toFormattedJSON (this: MRunnerRegistrationToken): RunnerRegistrationToken {
90 const registeredRunnersCount = this.get('registeredRunnersCount') as number
91
92 return {
93 id: this.id,
94
95 registrationToken: this.registrationToken,
96
97 createdAt: this.createdAt,
98 updatedAt: this.updatedAt,
99
100 registeredRunnersCount
101 }
102 }
103}
diff --git a/server/models/runner/runner.ts b/server/models/runner/runner.ts
new file mode 100644
index 000000000..1ef0018b4
--- /dev/null
+++ b/server/models/runner/runner.ts
@@ -0,0 +1,112 @@
1import { FindOptions } from 'sequelize'
2import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript'
3import { MRunner } from '@server/types/models/runners'
4import { Runner } from '@shared/models'
5import { AttributesOnly } from '@shared/typescript-utils'
6import { getSort } from '../shared'
7import { RunnerRegistrationTokenModel } from './runner-registration-token'
8import { CONSTRAINTS_FIELDS } from '@server/initializers/constants'
9
10@Table({
11 tableName: 'runner',
12 indexes: [
13 {
14 fields: [ 'runnerToken' ],
15 unique: true
16 },
17 {
18 fields: [ 'runnerRegistrationTokenId' ]
19 }
20 ]
21})
22export class RunnerModel extends Model<Partial<AttributesOnly<RunnerModel>>> {
23
24 // Used to identify the appropriate runner when it uses the runner REST API
25 @AllowNull(false)
26 @Column
27 runnerToken: string
28
29 @AllowNull(false)
30 @Column
31 name: string
32
33 @AllowNull(true)
34 @Column(DataType.STRING(CONSTRAINTS_FIELDS.RUNNERS.DESCRIPTION.max))
35 description: string
36
37 @AllowNull(false)
38 @Column
39 lastContact: Date
40
41 @AllowNull(false)
42 @Column
43 ip: string
44
45 @CreatedAt
46 createdAt: Date
47
48 @UpdatedAt
49 updatedAt: Date
50
51 @ForeignKey(() => RunnerRegistrationTokenModel)
52 @Column
53 runnerRegistrationTokenId: number
54
55 @BelongsTo(() => RunnerRegistrationTokenModel, {
56 foreignKey: {
57 allowNull: false
58 },
59 onDelete: 'cascade'
60 })
61 RunnerRegistrationToken: RunnerRegistrationTokenModel
62
63 // ---------------------------------------------------------------------------
64
65 static load (id: number) {
66 return RunnerModel.findByPk(id)
67 }
68
69 static loadByToken (runnerToken: string) {
70 const query = {
71 where: { runnerToken }
72 }
73
74 return RunnerModel.findOne(query)
75 }
76
77 static listForApi (options: {
78 start: number
79 count: number
80 sort: string
81 }) {
82 const { start, count, sort } = options
83
84 const query: FindOptions = {
85 offset: start,
86 limit: count,
87 order: getSort(sort)
88 }
89
90 return Promise.all([
91 RunnerModel.count(query),
92 RunnerModel.findAll<MRunner>(query)
93 ]).then(([ total, data ]) => ({ total, data }))
94 }
95
96 // ---------------------------------------------------------------------------
97
98 toFormattedJSON (this: MRunner): Runner {
99 return {
100 id: this.id,
101
102 name: this.name,
103 description: this.description,
104
105 ip: this.ip,
106 lastContact: this.lastContact,
107
108 createdAt: this.createdAt,
109 updatedAt: this.updatedAt
110 }
111 }
112}