aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2017-12-12 17:53:50 +0100
committerChocobozzz <me@florianbigard.com>2017-12-13 16:50:33 +0100
commit3fd3ab2d34d512b160a5e6084d7609be7b4f4452 (patch)
treee5ca358287fca6ecacce83defcf23af1e8e9f419 /server/lib/jobs/job-scheduler.ts
parentc893d4514e6ecbf282c7985fe5f82b8acd8a1137 (diff)
downloadPeerTube-3fd3ab2d34d512b160a5e6084d7609be7b4f4452.tar.gz
PeerTube-3fd3ab2d34d512b160a5e6084d7609be7b4f4452.tar.zst
PeerTube-3fd3ab2d34d512b160a5e6084d7609be7b4f4452.zip
Move models to typescript-sequelize
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r--server/lib/jobs/job-scheduler.ts20
1 files changed, 10 insertions, 10 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 62ce6927e..88fe8a4a3 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -2,8 +2,8 @@ import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import { JobCategory } from '../../../shared' 3import { JobCategory } from '../../../shared'
4import { logger } from '../../helpers' 4import { logger } from '../../helpers'
5import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' 5import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
6import { JobInstance } from '../../models' 6import { JobModel } from '../../models/job/job'
7 7
8export interface JobHandler<P, T> { 8export interface JobHandler<P, T> {
9 process (data: object, jobId: number): Promise<T> 9 process (data: object, jobId: number): Promise<T>
@@ -24,12 +24,12 @@ class JobScheduler<P, T> {
24 24
25 logger.info('Jobs scheduler %s activated.', this.jobCategory) 25 logger.info('Jobs scheduler %s activated.', this.jobCategory)
26 26
27 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) 27 const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))
28 28
29 // Finish processing jobs from a previous start 29 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING 30 const state = JOB_STATES.PROCESSING
31 try { 31 try {
32 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) 32 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
33 33
34 this.enqueueJobs(jobsQueue, jobs) 34 this.enqueueJobs(jobsQueue, jobs)
35 } catch (err) { 35 } catch (err) {
@@ -45,7 +45,7 @@ class JobScheduler<P, T> {
45 45
46 const state = JOB_STATES.PENDING 46 const state = JOB_STATES.PENDING
47 try { 47 try {
48 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) 48 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
49 49
50 this.enqueueJobs(jobsQueue, jobs) 50 this.enqueueJobs(jobsQueue, jobs)
51 } catch (err) { 51 } catch (err) {
@@ -70,14 +70,14 @@ class JobScheduler<P, T> {
70 70
71 const options = { transaction } 71 const options = { transaction }
72 72
73 return db.Job.create(createQuery, options) 73 return JobModel.create(createQuery, options)
74 } 74 }
75 75
76 private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { 76 private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
77 jobs.forEach(job => jobsQueue.push(job)) 77 jobs.forEach(job => jobsQueue.push(job))
78 } 78 }
79 79
80 private async processJob (job: JobInstance, callback: (err: Error) => void) { 80 private async processJob (job: JobModel, callback: (err: Error) => void) {
81 const jobHandler = this.jobHandlers[job.handlerName] 81 const jobHandler = this.jobHandlers[job.handlerName]
82 if (jobHandler === undefined) { 82 if (jobHandler === undefined) {
83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id 83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
@@ -110,7 +110,7 @@ class JobScheduler<P, T> {
110 return callback(null) 110 return callback(null)
111 } 111 }
112 112
113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) { 113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
114 job.state = JOB_STATES.ERROR 114 job.state = JOB_STATES.ERROR
115 115
116 try { 116 try {
@@ -121,7 +121,7 @@ class JobScheduler<P, T> {
121 } 121 }
122 } 122 }
123 123
124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) { 124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
125 job.state = JOB_STATES.SUCCESS 125 job.state = JOB_STATES.SUCCESS
126 126
127 try { 127 try {