diff options
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index ad5f7f6d9..2f01387e7 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { forever, queue } from 'async' | 1 | import { forever, queue } from 'async' |
2 | import * as Sequelize from 'sequelize' | ||
2 | 3 | ||
3 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
4 | import { | 5 | import { |
@@ -7,7 +8,10 @@ import { | |||
7 | JOB_STATES | 8 | JOB_STATES |
8 | } from '../../initializers' | 9 | } from '../../initializers' |
9 | import { logger } from '../../helpers' | 10 | import { logger } from '../../helpers' |
10 | import { jobHandlers } from './handlers' | 11 | import { JobInstance } from '../../models' |
12 | import { JobHandler, jobHandlers } from './handlers' | ||
13 | |||
14 | type JobQueueCallback = (err: Error) => void | ||
11 | 15 | ||
12 | class JobScheduler { | 16 | class JobScheduler { |
13 | 17 | ||
@@ -24,7 +28,7 @@ class JobScheduler { | |||
24 | 28 | ||
25 | logger.info('Jobs scheduler activated.') | 29 | logger.info('Jobs scheduler activated.') |
26 | 30 | ||
27 | const jobsQueue = queue(this.processJob.bind(this)) | 31 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) |
28 | 32 | ||
29 | // Finish processing jobs from a previous start | 33 | // Finish processing jobs from a previous start |
30 | const state = JOB_STATES.PROCESSING | 34 | const state = JOB_STATES.PROCESSING |
@@ -58,7 +62,7 @@ class JobScheduler { | |||
58 | }) | 62 | }) |
59 | } | 63 | } |
60 | 64 | ||
61 | createJob (transaction, handlerName: string, handlerInputData: object, callback) { | 65 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { |
62 | const createQuery = { | 66 | const createQuery = { |
63 | state: JOB_STATES.PENDING, | 67 | state: JOB_STATES.PENDING, |
64 | handlerName, | 68 | handlerName, |
@@ -69,7 +73,7 @@ class JobScheduler { | |||
69 | db.Job.create(createQuery, options).asCallback(callback) | 73 | db.Job.create(createQuery, options).asCallback(callback) |
70 | } | 74 | } |
71 | 75 | ||
72 | private enqueueJobs (err, jobsQueue, jobs) { | 76 | private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { |
73 | if (err) { | 77 | if (err) { |
74 | logger.error('Cannot list pending jobs.', { error: err }) | 78 | logger.error('Cannot list pending jobs.', { error: err }) |
75 | } else { | 79 | } else { |
@@ -79,7 +83,7 @@ class JobScheduler { | |||
79 | } | 83 | } |
80 | } | 84 | } |
81 | 85 | ||
82 | private processJob (job, callback) { | 86 | private processJob (job: JobInstance, callback: (err: Error) => void) { |
83 | const jobHandler = jobHandlers[job.handlerName] | 87 | const jobHandler = jobHandlers[job.handlerName] |
84 | 88 | ||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | 89 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) |
@@ -89,8 +93,8 @@ class JobScheduler { | |||
89 | if (err) return this.cannotSaveJobError(err, callback) | 93 | if (err) return this.cannotSaveJobError(err, callback) |
90 | 94 | ||
91 | if (jobHandler === undefined) { | 95 | if (jobHandler === undefined) { |
92 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | 96 | logger.error('Unknown job handler for job %s.', job.handlerName) |
93 | return callback() | 97 | return callback(null) |
94 | } | 98 | } |
95 | 99 | ||
96 | return jobHandler.process(job.handlerInputData, (err, result) => { | 100 | return jobHandler.process(job.handlerInputData, (err, result) => { |
@@ -104,7 +108,7 @@ class JobScheduler { | |||
104 | }) | 108 | }) |
105 | } | 109 | } |
106 | 110 | ||
107 | private onJobError (jobHandler, job, jobResult, callback) { | 111 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { |
108 | job.state = JOB_STATES.ERROR | 112 | job.state = JOB_STATES.ERROR |
109 | 113 | ||
110 | job.save().asCallback(err => { | 114 | job.save().asCallback(err => { |
@@ -114,7 +118,7 @@ class JobScheduler { | |||
114 | }) | 118 | }) |
115 | } | 119 | } |
116 | 120 | ||
117 | private onJobSuccess (jobHandler, job, jobResult, callback) { | 121 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { |
118 | job.state = JOB_STATES.SUCCESS | 122 | job.state = JOB_STATES.SUCCESS |
119 | 123 | ||
120 | job.save().asCallback(err => { | 124 | job.save().asCallback(err => { |
@@ -124,7 +128,7 @@ class JobScheduler { | |||
124 | }) | 128 | }) |
125 | } | 129 | } |
126 | 130 | ||
127 | private cannotSaveJobError (err, callback) { | 131 | private cannotSaveJobError (err: Error, callback: (err: Error) => void) { |
128 | logger.error('Cannot save new job state.', { error: err }) | 132 | logger.error('Cannot save new job state.', { error: err }) |
129 | return callback(err) | 133 | return callback(err) |
130 | } | 134 | } |