]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/jobs/job-scheduler.ts
Begin activitypub
[github/Chocobozzz/PeerTube.git] / server / lib / jobs / job-scheduler.ts
CommitLineData
bcd1c9e1 1import { AsyncQueue, forever, queue } from 'async'
69818c93 2import * as Sequelize from 'sequelize'
65fcc311 3
65fcc311 4import {
e4f97bab 5 database as db,
65fcc311
C
6 JOBS_FETCHING_INTERVAL,
7 JOBS_FETCH_LIMIT_PER_CYCLE,
8 JOB_STATES
9} from '../../initializers'
10import { logger } from '../../helpers'
69818c93 11import { JobInstance } from '../../models'
e4f97bab 12import { JobCategory } from '../../../shared'
69818c93 13
e4f97bab
C
14export interface JobHandler<T> {
15 process (data: object, jobId: number): T
16 onError (err: Error, jobId: number)
17 onSuccess (jobId: number, jobResult: T)
18}
69818c93 19type JobQueueCallback = (err: Error) => void
65fcc311 20
e4f97bab 21class JobScheduler<T> {
65fcc311 22
e4f97bab
C
23 constructor (
24 private jobCategory: JobCategory,
25 private jobHandlers: { [ id: string ]: JobHandler<T> }
26 ) {}
65fcc311 27
f5028693 28 async activate () {
e4f97bab 29 const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
65fcc311 30
e4f97bab 31 logger.info('Jobs scheduler %s activated.', this.jobCategory)
65fcc311 32
69818c93 33 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))
65fcc311
C
34
35 // Finish processing jobs from a previous start
36 const state = JOB_STATES.PROCESSING
f5028693 37 try {
e4f97bab 38 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)
f5028693
C
39
40 this.enqueueJobs(jobsQueue, jobs)
41 } catch (err) {
42 logger.error('Cannot list pending jobs.', err)
43 }
44
45 forever(
46 async next => {
47 if (jobsQueue.length() !== 0) {
48 // Finish processing the queue first
49 return setTimeout(next, JOBS_FETCHING_INTERVAL)
50 }
51
52 const state = JOB_STATES.PENDING
53 try {
e4f97bab 54 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)
f5028693
C
55
56 this.enqueueJobs(jobsQueue, jobs)
57 } catch (err) {
58 logger.error('Cannot list pending jobs.', err)
59 }
60
61 // Optimization: we could use "drain" from queue object
62 return setTimeout(next, JOBS_FETCHING_INTERVAL)
63 },
64
65 err => logger.error('Error in job scheduler queue.', err)
66 )
65fcc311
C
67 }
68
e4f97bab 69 createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) {
65fcc311
C
70 const createQuery = {
71 state: JOB_STATES.PENDING,
e4f97bab 72 category,
65fcc311
C
73 handlerName,
74 handlerInputData
75 }
76 const options = { transaction }
77
6fcd19ba 78 return db.Job.create(createQuery, options)
65fcc311
C
79 }
80
6fcd19ba
C
81 private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
82 jobs.forEach(job => jobsQueue.push(job))
65fcc311
C
83 }
84
f5028693 85 private async processJob (job: JobInstance, callback: (err: Error) => void) {
e4f97bab 86 const jobHandler = this.jobHandlers[job.handlerName]
6fcd19ba
C
87 if (jobHandler === undefined) {
88 logger.error('Unknown job handler for job %s.', job.handlerName)
89 return callback(null)
90 }
65fcc311
C
91
92 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
93
94 job.state = JOB_STATES.PROCESSING
f5028693
C
95 await job.save()
96
97 try {
98 const result = await jobHandler.process(job.handlerInputData, job.id)
99 await this.onJobSuccess(jobHandler, job, result)
100 } catch (err) {
101 logger.error('Error in job handler %s.', job.handlerName, err)
102
103 try {
104 await this.onJobError(jobHandler, job, err)
105 } catch (innerErr) {
106 this.cannotSaveJobError(innerErr)
107 return callback(innerErr)
108 }
109 }
110
111 callback(null)
65fcc311
C
112 }
113
f5028693 114 private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
65fcc311
C
115 job.state = JOB_STATES.ERROR
116
f5028693
C
117 try {
118 await job.save()
119 await jobHandler.onError(err, job.id)
120 } catch (err) {
121 this.cannotSaveJobError(err)
122 }
65fcc311
C
123 }
124
f5028693 125 private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
65fcc311
C
126 job.state = JOB_STATES.SUCCESS
127
f5028693
C
128 try {
129 await job.save()
130 jobHandler.onSuccess(job.id, jobResult)
131 } catch (err) {
132 this.cannotSaveJobError(err)
133 }
65fcc311
C
134 }
135
6fcd19ba 136 private cannotSaveJobError (err: Error) {
ad0997ad 137 logger.error('Cannot save new job state.', err)
65fcc311
C
138 }
139}
140
141// ---------------------------------------------------------------------------
142
143export {
144 JobScheduler
145}