aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-11-10 17:27:49 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-11-27 19:40:51 +0100
commit571389d43b8fc8aaf27e77c06f19b320b08dbbc9 (patch)
treee57173bcd0590d939c28952a29258fd02a281e35 /server/lib/jobs/job-scheduler.ts
parent38fa2065831b5f55be0d7f30f19a62c967397208 (diff)
downloadPeerTube-571389d43b8fc8aaf27e77c06f19b320b08dbbc9.tar.gz
PeerTube-571389d43b8fc8aaf27e77c06f19b320b08dbbc9.tar.zst
PeerTube-571389d43b8fc8aaf27e77c06f19b320b08dbbc9.zip
Make it compile at least
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r--server/lib/jobs/job-scheduler.ts33
1 files changed, 14 insertions, 19 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 89a4bca88..f10f745b3 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -1,28 +1,22 @@
1import { AsyncQueue, forever, queue } from 'async' 1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3 3import { JobCategory } from '../../../shared'
4import {
5 database as db,
6 JOBS_FETCHING_INTERVAL,
7 JOBS_FETCH_LIMIT_PER_CYCLE,
8 JOB_STATES
9} from '../../initializers'
10import { logger } from '../../helpers' 4import { logger } from '../../helpers'
5import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
11import { JobInstance } from '../../models' 6import { JobInstance } from '../../models'
12import { JobCategory } from '../../../shared'
13 7
14export interface JobHandler<T> { 8export interface JobHandler<P, T> {
15 process (data: object, jobId: number): T 9 process (data: object, jobId: number): Promise<T>
16 onError (err: Error, jobId: number) 10 onError (err: Error, jobId: number)
17 onSuccess (jobId: number, jobResult: T) 11 onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>)
18} 12}
19type JobQueueCallback = (err: Error) => void 13type JobQueueCallback = (err: Error) => void
20 14
21class JobScheduler<T> { 15class JobScheduler<P, T> {
22 16
23 constructor ( 17 constructor (
24 private jobCategory: JobCategory, 18 private jobCategory: JobCategory,
25 private jobHandlers: { [ id: string ]: JobHandler<T> } 19 private jobHandlers: { [ id: string ]: JobHandler<P, T> }
26 ) {} 20 ) {}
27 21
28 async activate () { 22 async activate () {
@@ -66,13 +60,14 @@ class JobScheduler<T> {
66 ) 60 )
67 } 61 }
68 62
69 createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) { 63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
70 const createQuery = { 64 const createQuery = {
71 state: JOB_STATES.PENDING, 65 state: JOB_STATES.PENDING,
72 category, 66 category: this.jobCategory,
73 handlerName, 67 handlerName,
74 handlerInputData 68 handlerInputData
75 } 69 }
70
76 const options = { transaction } 71 const options = { transaction }
77 72
78 return db.Job.create(createQuery, options) 73 return db.Job.create(createQuery, options)
@@ -95,7 +90,7 @@ class JobScheduler<T> {
95 await job.save() 90 await job.save()
96 91
97 try { 92 try {
98 const result = await jobHandler.process(job.handlerInputData, job.id) 93 const result: T = await jobHandler.process(job.handlerInputData, job.id)
99 await this.onJobSuccess(jobHandler, job, result) 94 await this.onJobSuccess(jobHandler, job, result)
100 } catch (err) { 95 } catch (err) {
101 logger.error('Error in job handler %s.', job.handlerName, err) 96 logger.error('Error in job handler %s.', job.handlerName, err)
@@ -111,7 +106,7 @@ class JobScheduler<T> {
111 callback(null) 106 callback(null)
112 } 107 }
113 108
114 private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { 109 private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) {
115 job.state = JOB_STATES.ERROR 110 job.state = JOB_STATES.ERROR
116 111
117 try { 112 try {
@@ -122,12 +117,12 @@ class JobScheduler<T> {
122 } 117 }
123 } 118 }
124 119
125 private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { 120 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) {
126 job.state = JOB_STATES.SUCCESS 121 job.state = JOB_STATES.SUCCESS
127 122
128 try { 123 try {
129 await job.save() 124 await job.save()
130 jobHandler.onSuccess(job.id, jobResult) 125 jobHandler.onSuccess(job.id, jobResult, this)
131 } catch (err) { 126 } catch (err) {
132 this.cannotSaveJobError(err) 127 this.cannotSaveJobError(err)
133 } 128 }