aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-11-09 17:51:58 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-11-27 19:40:51 +0100
commite4f97babf701481b55cc10fb3448feab5f97c867 (patch)
treeaf37402a594dc5ff09f71ecb0687e8cfe4cdb471 /server/lib/jobs/job-scheduler.ts
parent343ad675f2a26c15b86150a9a3552e619d5d44f4 (diff)
downloadPeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.gz
PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.zst
PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.zip
Begin activitypub
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r--server/lib/jobs/job-scheduler.ts35
1 files changed, 19 insertions, 16 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 61d483268..89a4bca88 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -1,39 +1,41 @@
1import { AsyncQueue, forever, queue } from 'async' 1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3 3
4import { database as db } from '../../initializers/database'
5import { 4import {
5 database as db,
6 JOBS_FETCHING_INTERVAL, 6 JOBS_FETCHING_INTERVAL,
7 JOBS_FETCH_LIMIT_PER_CYCLE, 7 JOBS_FETCH_LIMIT_PER_CYCLE,
8 JOB_STATES 8 JOB_STATES
9} from '../../initializers' 9} from '../../initializers'
10import { logger } from '../../helpers' 10import { logger } from '../../helpers'
11import { JobInstance } from '../../models' 11import { JobInstance } from '../../models'
12import { JobHandler, jobHandlers } from './handlers' 12import { JobCategory } from '../../../shared'
13 13
14export interface JobHandler<T> {
15 process (data: object, jobId: number): T
16 onError (err: Error, jobId: number)
17 onSuccess (jobId: number, jobResult: T)
18}
14type JobQueueCallback = (err: Error) => void 19type JobQueueCallback = (err: Error) => void
15 20
16class JobScheduler { 21class JobScheduler<T> {
17
18 private static instance: JobScheduler
19 22
20 private constructor () { } 23 constructor (
21 24 private jobCategory: JobCategory,
22 static get Instance () { 25 private jobHandlers: { [ id: string ]: JobHandler<T> }
23 return this.instance || (this.instance = new this()) 26 ) {}
24 }
25 27
26 async activate () { 28 async activate () {
27 const limit = JOBS_FETCH_LIMIT_PER_CYCLE 29 const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
28 30
29 logger.info('Jobs scheduler activated.') 31 logger.info('Jobs scheduler %s activated.', this.jobCategory)
30 32
31 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) 33 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))
32 34
33 // Finish processing jobs from a previous start 35 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 36 const state = JOB_STATES.PROCESSING
35 try { 37 try {
36 const jobs = await db.Job.listWithLimit(limit, state) 38 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)
37 39
38 this.enqueueJobs(jobsQueue, jobs) 40 this.enqueueJobs(jobsQueue, jobs)
39 } catch (err) { 41 } catch (err) {
@@ -49,7 +51,7 @@ class JobScheduler {
49 51
50 const state = JOB_STATES.PENDING 52 const state = JOB_STATES.PENDING
51 try { 53 try {
52 const jobs = await db.Job.listWithLimit(limit, state) 54 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)
53 55
54 this.enqueueJobs(jobsQueue, jobs) 56 this.enqueueJobs(jobsQueue, jobs)
55 } catch (err) { 57 } catch (err) {
@@ -64,9 +66,10 @@ class JobScheduler {
64 ) 66 )
65 } 67 }
66 68
67 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { 69 createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) {
68 const createQuery = { 70 const createQuery = {
69 state: JOB_STATES.PENDING, 71 state: JOB_STATES.PENDING,
72 category,
70 handlerName, 73 handlerName,
71 handlerInputData 74 handlerInputData
72 } 75 }
@@ -80,7 +83,7 @@ class JobScheduler {
80 } 83 }
81 84
82 private async processJob (job: JobInstance, callback: (err: Error) => void) { 85 private async processJob (job: JobInstance, callback: (err: Error) => void) {
83 const jobHandler = jobHandlers[job.handlerName] 86 const jobHandler = this.jobHandlers[job.handlerName]
84 if (jobHandler === undefined) { 87 if (jobHandler === undefined) {
85 logger.error('Unknown job handler for job %s.', job.handlerName) 88 logger.error('Unknown job handler for job %s.', job.handlerName)
86 return callback(null) 89 return callback(null)