From f5028693a896a3076dd286ac0030e3d8f78f5ebf Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 25 Oct 2017 16:03:33 +0200 Subject: Use async/await in lib and initializers --- server/lib/jobs/job-scheduler.ts | 118 +++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 55 deletions(-) (limited to 'server/lib/jobs/job-scheduler.ts') diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index c2409d20c..61d483268 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts @@ -23,7 +23,7 @@ class JobScheduler { return this.instance || (this.instance = new this()) } - activate () { + async activate () { const limit = JOBS_FETCH_LIMIT_PER_CYCLE logger.info('Jobs scheduler activated.') @@ -32,32 +32,36 @@ class JobScheduler { // Finish processing jobs from a previous start const state = JOB_STATES.PROCESSING - db.Job.listWithLimit(limit, state) - .then(jobs => { - this.enqueueJobs(jobsQueue, jobs) - - forever( - next => { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, JOBS_FETCHING_INTERVAL) - } - - const state = JOB_STATES.PENDING - db.Job.listWithLimit(limit, state) - .then(jobs => { - this.enqueueJobs(jobsQueue, jobs) - - // Optimization: we could use "drain" from queue object - return setTimeout(next, JOBS_FETCHING_INTERVAL) - }) - .catch(err => logger.error('Cannot list pending jobs.', err)) - }, - - err => logger.error('Error in job scheduler queue.', err) - ) - }) - .catch(err => logger.error('Cannot list pending jobs.', err)) + try { + const jobs = await db.Job.listWithLimit(limit, state) + + this.enqueueJobs(jobsQueue, jobs) + } catch (err) { + logger.error('Cannot list pending jobs.', err) + } + + forever( + async next => { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, JOBS_FETCHING_INTERVAL) + } + + const state = JOB_STATES.PENDING + try { + const jobs = await db.Job.listWithLimit(limit, state) + + this.enqueueJobs(jobsQueue, jobs) + } catch (err) { + logger.error('Cannot list pending jobs.', err) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, JOBS_FETCHING_INTERVAL) + }, + + err => logger.error('Error in job scheduler queue.', err) + ) } createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { @@ -75,7 +79,7 @@ class JobScheduler { jobs.forEach(job => jobsQueue.push(job)) } - private processJob (job: JobInstance, callback: (err: Error) => void) { + private async processJob (job: JobInstance, callback: (err: Error) => void) { const jobHandler = jobHandlers[job.handlerName] if (jobHandler === undefined) { logger.error('Unknown job handler for job %s.', job.handlerName) @@ -85,41 +89,45 @@ class JobScheduler { logger.info('Processing job %d with handler %s.', job.id, job.handlerName) job.state = JOB_STATES.PROCESSING - return job.save() - .then(() => { - return jobHandler.process(job.handlerInputData, job.id) - }) - .then( - result => { - return this.onJobSuccess(jobHandler, job, result) - }, - - err => { - logger.error('Error in job handler %s.', job.handlerName, err) - return this.onJobError(jobHandler, job, err) - } - ) - .then(() => callback(null)) - .catch(err => { - this.cannotSaveJobError(err) - return callback(err) - }) + await job.save() + + try { + const result = await jobHandler.process(job.handlerInputData, job.id) + await this.onJobSuccess(jobHandler, job, result) + } catch (err) { + logger.error('Error in job handler %s.', job.handlerName, err) + + try { + await this.onJobError(jobHandler, job, err) + } catch (innerErr) { + this.cannotSaveJobError(innerErr) + return callback(innerErr) + } + } + + callback(null) } - private onJobError (jobHandler: JobHandler, job: JobInstance, err: Error) { + private async onJobError (jobHandler: JobHandler, job: JobInstance, err: Error) { job.state = JOB_STATES.ERROR - return job.save() - .then(() => jobHandler.onError(err, job.id)) - .catch(err => this.cannotSaveJobError(err)) + try { + await job.save() + await jobHandler.onError(err, job.id) + } catch (err) { + this.cannotSaveJobError(err) + } } - private onJobSuccess (jobHandler: JobHandler, job: JobInstance, jobResult: any) { + private async onJobSuccess (jobHandler: JobHandler, job: JobInstance, jobResult: any) { job.state = JOB_STATES.SUCCESS - return job.save() - .then(() => jobHandler.onSuccess(job.id, jobResult)) - .catch(err => this.cannotSaveJobError(err)) + try { + await job.save() + jobHandler.onSuccess(job.id, jobResult) + } catch (err) { + this.cannotSaveJobError(err) + } } private cannotSaveJobError (err: Error) { -- cgit v1.2.3