diff options
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 62ce6927e..88fe8a4a3 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -2,8 +2,8 @@ import { AsyncQueue, forever, queue } from 'async' | |||
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | import { JobCategory } from '../../../shared' | 3 | import { JobCategory } from '../../../shared' |
4 | import { logger } from '../../helpers' | 4 | import { logger } from '../../helpers' |
5 | import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' | 5 | import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' |
6 | import { JobInstance } from '../../models' | 6 | import { JobModel } from '../../models/job/job' |
7 | 7 | ||
8 | export interface JobHandler<P, T> { | 8 | export interface JobHandler<P, T> { |
9 | process (data: object, jobId: number): Promise<T> | 9 | process (data: object, jobId: number): Promise<T> |
@@ -24,12 +24,12 @@ class JobScheduler<P, T> { | |||
24 | 24 | ||
25 | logger.info('Jobs scheduler %s activated.', this.jobCategory) | 25 | logger.info('Jobs scheduler %s activated.', this.jobCategory) |
26 | 26 | ||
27 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) | 27 | const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this)) |
28 | 28 | ||
29 | // Finish processing jobs from a previous start | 29 | // Finish processing jobs from a previous start |
30 | const state = JOB_STATES.PROCESSING | 30 | const state = JOB_STATES.PROCESSING |
31 | try { | 31 | try { |
32 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) | 32 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) |
33 | 33 | ||
34 | this.enqueueJobs(jobsQueue, jobs) | 34 | this.enqueueJobs(jobsQueue, jobs) |
35 | } catch (err) { | 35 | } catch (err) { |
@@ -45,7 +45,7 @@ class JobScheduler<P, T> { | |||
45 | 45 | ||
46 | const state = JOB_STATES.PENDING | 46 | const state = JOB_STATES.PENDING |
47 | try { | 47 | try { |
48 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) | 48 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) |
49 | 49 | ||
50 | this.enqueueJobs(jobsQueue, jobs) | 50 | this.enqueueJobs(jobsQueue, jobs) |
51 | } catch (err) { | 51 | } catch (err) { |
@@ -70,14 +70,14 @@ class JobScheduler<P, T> { | |||
70 | 70 | ||
71 | const options = { transaction } | 71 | const options = { transaction } |
72 | 72 | ||
73 | return db.Job.create(createQuery, options) | 73 | return JobModel.create(createQuery, options) |
74 | } | 74 | } |
75 | 75 | ||
76 | private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { | 76 | private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) { |
77 | jobs.forEach(job => jobsQueue.push(job)) | 77 | jobs.forEach(job => jobsQueue.push(job)) |
78 | } | 78 | } |
79 | 79 | ||
80 | private async processJob (job: JobInstance, callback: (err: Error) => void) { | 80 | private async processJob (job: JobModel, callback: (err: Error) => void) { |
81 | const jobHandler = this.jobHandlers[job.handlerName] | 81 | const jobHandler = this.jobHandlers[job.handlerName] |
82 | if (jobHandler === undefined) { | 82 | if (jobHandler === undefined) { |
83 | const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id | 83 | const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id |
@@ -110,7 +110,7 @@ class JobScheduler<P, T> { | |||
110 | return callback(null) | 110 | return callback(null) |
111 | } | 111 | } |
112 | 112 | ||
113 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) { | 113 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) { |
114 | job.state = JOB_STATES.ERROR | 114 | job.state = JOB_STATES.ERROR |
115 | 115 | ||
116 | try { | 116 | try { |
@@ -121,7 +121,7 @@ class JobScheduler<P, T> { | |||
121 | } | 121 | } |
122 | } | 122 | } |
123 | 123 | ||
124 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) { | 124 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) { |
125 | job.state = JOB_STATES.SUCCESS | 125 | job.state = JOB_STATES.SUCCESS |
126 | 126 | ||
127 | try { | 127 | try { |