aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-06-10 22:15:25 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-06-10 22:15:25 +0200
commit69818c9394366b954b6ba3bd697bd9d2b09f2a16 (patch)
treead199a18ec3c322460d6f9523fc383ee562554e0 /server/lib/jobs/job-scheduler.ts
parent4d4e5cd4dca78480ec7f40e747f424cd107376a4 (diff)
downloadPeerTube-69818c9394366b954b6ba3bd697bd9d2b09f2a16.tar.gz
PeerTube-69818c9394366b954b6ba3bd697bd9d2b09f2a16.tar.zst
PeerTube-69818c9394366b954b6ba3bd697bd9d2b09f2a16.zip
Type functions
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r--server/lib/jobs/job-scheduler.ts24
1 files changed, 14 insertions, 10 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index ad5f7f6d9..2f01387e7 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -1,4 +1,5 @@
1import { forever, queue } from 'async' 1import { forever, queue } from 'async'
2import * as Sequelize from 'sequelize'
2 3
3import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
4import { 5import {
@@ -7,7 +8,10 @@ import {
7 JOB_STATES 8 JOB_STATES
8} from '../../initializers' 9} from '../../initializers'
9import { logger } from '../../helpers' 10import { logger } from '../../helpers'
10import { jobHandlers } from './handlers' 11import { JobInstance } from '../../models'
12import { JobHandler, jobHandlers } from './handlers'
13
14type JobQueueCallback = (err: Error) => void
11 15
12class JobScheduler { 16class JobScheduler {
13 17
@@ -24,7 +28,7 @@ class JobScheduler {
24 28
25 logger.info('Jobs scheduler activated.') 29 logger.info('Jobs scheduler activated.')
26 30
27 const jobsQueue = queue(this.processJob.bind(this)) 31 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))
28 32
29 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
@@ -58,7 +62,7 @@ class JobScheduler {
58 }) 62 })
59 } 63 }
60 64
61 createJob (transaction, handlerName: string, handlerInputData: object, callback) { 65 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) {
62 const createQuery = { 66 const createQuery = {
63 state: JOB_STATES.PENDING, 67 state: JOB_STATES.PENDING,
64 handlerName, 68 handlerName,
@@ -69,7 +73,7 @@ class JobScheduler {
69 db.Job.create(createQuery, options).asCallback(callback) 73 db.Job.create(createQuery, options).asCallback(callback)
70 } 74 }
71 75
72 private enqueueJobs (err, jobsQueue, jobs) { 76 private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
73 if (err) { 77 if (err) {
74 logger.error('Cannot list pending jobs.', { error: err }) 78 logger.error('Cannot list pending jobs.', { error: err })
75 } else { 79 } else {
@@ -79,7 +83,7 @@ class JobScheduler {
79 } 83 }
80 } 84 }
81 85
82 private processJob (job, callback) { 86 private processJob (job: JobInstance, callback: (err: Error) => void) {
83 const jobHandler = jobHandlers[job.handlerName] 87 const jobHandler = jobHandlers[job.handlerName]
84 88
85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
@@ -89,8 +93,8 @@ class JobScheduler {
89 if (err) return this.cannotSaveJobError(err, callback) 93 if (err) return this.cannotSaveJobError(err, callback)
90 94
91 if (jobHandler === undefined) { 95 if (jobHandler === undefined) {
92 logger.error('Unknown job handler for job %s.', jobHandler.handlerName) 96 logger.error('Unknown job handler for job %s.', job.handlerName)
93 return callback() 97 return callback(null)
94 } 98 }
95 99
96 return jobHandler.process(job.handlerInputData, (err, result) => { 100 return jobHandler.process(job.handlerInputData, (err, result) => {
@@ -104,7 +108,7 @@ class JobScheduler {
104 }) 108 })
105 } 109 }
106 110
107 private onJobError (jobHandler, job, jobResult, callback) { 111 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) {
108 job.state = JOB_STATES.ERROR 112 job.state = JOB_STATES.ERROR
109 113
110 job.save().asCallback(err => { 114 job.save().asCallback(err => {
@@ -114,7 +118,7 @@ class JobScheduler {
114 }) 118 })
115 } 119 }
116 120
117 private onJobSuccess (jobHandler, job, jobResult, callback) { 121 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) {
118 job.state = JOB_STATES.SUCCESS 122 job.state = JOB_STATES.SUCCESS
119 123
120 job.save().asCallback(err => { 124 job.save().asCallback(err => {
@@ -124,7 +128,7 @@ class JobScheduler {
124 }) 128 })
125 } 129 }
126 130
127 private cannotSaveJobError (err, callback) { 131 private cannotSaveJobError (err: Error, callback: (err: Error) => void) {
128 logger.error('Cannot save new job state.', { error: err }) 132 logger.error('Cannot save new job state.', { error: err })
129 return callback(err) 133 return callback(err)
130 } 134 }