diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-11-09 17:51:58 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-11-27 19:40:51 +0100 |
commit | e4f97babf701481b55cc10fb3448feab5f97c867 (patch) | |
tree | af37402a594dc5ff09f71ecb0687e8cfe4cdb471 /server/lib/jobs/job-scheduler.ts | |
parent | 343ad675f2a26c15b86150a9a3552e619d5d44f4 (diff) | |
download | PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.gz PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.zst PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.zip |
Begin activitypub
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 35 |
1 files changed, 19 insertions, 16 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 61d483268..89a4bca88 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -1,39 +1,41 @@ | |||
1 | import { AsyncQueue, forever, queue } from 'async' | 1 | import { AsyncQueue, forever, queue } from 'async' |
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | 3 | ||
4 | import { database as db } from '../../initializers/database' | ||
5 | import { | 4 | import { |
5 | database as db, | ||
6 | JOBS_FETCHING_INTERVAL, | 6 | JOBS_FETCHING_INTERVAL, |
7 | JOBS_FETCH_LIMIT_PER_CYCLE, | 7 | JOBS_FETCH_LIMIT_PER_CYCLE, |
8 | JOB_STATES | 8 | JOB_STATES |
9 | } from '../../initializers' | 9 | } from '../../initializers' |
10 | import { logger } from '../../helpers' | 10 | import { logger } from '../../helpers' |
11 | import { JobInstance } from '../../models' | 11 | import { JobInstance } from '../../models' |
12 | import { JobHandler, jobHandlers } from './handlers' | 12 | import { JobCategory } from '../../../shared' |
13 | 13 | ||
14 | export interface JobHandler<T> { | ||
15 | process (data: object, jobId: number): T | ||
16 | onError (err: Error, jobId: number) | ||
17 | onSuccess (jobId: number, jobResult: T) | ||
18 | } | ||
14 | type JobQueueCallback = (err: Error) => void | 19 | type JobQueueCallback = (err: Error) => void |
15 | 20 | ||
16 | class JobScheduler { | 21 | class JobScheduler<T> { |
17 | |||
18 | private static instance: JobScheduler | ||
19 | 22 | ||
20 | private constructor () { } | 23 | constructor ( |
21 | 24 | private jobCategory: JobCategory, | |
22 | static get Instance () { | 25 | private jobHandlers: { [ id: string ]: JobHandler<T> } |
23 | return this.instance || (this.instance = new this()) | 26 | ) {} |
24 | } | ||
25 | 27 | ||
26 | async activate () { | 28 | async activate () { |
27 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE | 29 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] |
28 | 30 | ||
29 | logger.info('Jobs scheduler activated.') | 31 | logger.info('Jobs scheduler %s activated.', this.jobCategory) |
30 | 32 | ||
31 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) | 33 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) |
32 | 34 | ||
33 | // Finish processing jobs from a previous start | 35 | // Finish processing jobs from a previous start |
34 | const state = JOB_STATES.PROCESSING | 36 | const state = JOB_STATES.PROCESSING |
35 | try { | 37 | try { |
36 | const jobs = await db.Job.listWithLimit(limit, state) | 38 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) |
37 | 39 | ||
38 | this.enqueueJobs(jobsQueue, jobs) | 40 | this.enqueueJobs(jobsQueue, jobs) |
39 | } catch (err) { | 41 | } catch (err) { |
@@ -49,7 +51,7 @@ class JobScheduler { | |||
49 | 51 | ||
50 | const state = JOB_STATES.PENDING | 52 | const state = JOB_STATES.PENDING |
51 | try { | 53 | try { |
52 | const jobs = await db.Job.listWithLimit(limit, state) | 54 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) |
53 | 55 | ||
54 | this.enqueueJobs(jobsQueue, jobs) | 56 | this.enqueueJobs(jobsQueue, jobs) |
55 | } catch (err) { | 57 | } catch (err) { |
@@ -64,9 +66,10 @@ class JobScheduler { | |||
64 | ) | 66 | ) |
65 | } | 67 | } |
66 | 68 | ||
67 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { | 69 | createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) { |
68 | const createQuery = { | 70 | const createQuery = { |
69 | state: JOB_STATES.PENDING, | 71 | state: JOB_STATES.PENDING, |
72 | category, | ||
70 | handlerName, | 73 | handlerName, |
71 | handlerInputData | 74 | handlerInputData |
72 | } | 75 | } |
@@ -80,7 +83,7 @@ class JobScheduler { | |||
80 | } | 83 | } |
81 | 84 | ||
82 | private async processJob (job: JobInstance, callback: (err: Error) => void) { | 85 | private async processJob (job: JobInstance, callback: (err: Error) => void) { |
83 | const jobHandler = jobHandlers[job.handlerName] | 86 | const jobHandler = this.jobHandlers[job.handlerName] |
84 | if (jobHandler === undefined) { | 87 | if (jobHandler === undefined) { |
85 | logger.error('Unknown job handler for job %s.', job.handlerName) | 88 | logger.error('Unknown job handler for job %s.', job.handlerName) |
86 | return callback(null) | 89 | return callback(null) |