]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame_incremental - server/lib/jobs/job-scheduler.ts
Begin activitypub
[github/Chocobozzz/PeerTube.git] / server / lib / jobs / job-scheduler.ts
... / ...
CommitLineData
1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize'
3
4import {
5 database as db,
6 JOBS_FETCHING_INTERVAL,
7 JOBS_FETCH_LIMIT_PER_CYCLE,
8 JOB_STATES
9} from '../../initializers'
10import { logger } from '../../helpers'
11import { JobInstance } from '../../models'
12import { JobCategory } from '../../../shared'
13
/**
 * Contract implemented by every job handler registered with a scheduler.
 *
 * `T` is the type produced by `process` and handed back to `onSuccess`.
 */
export interface JobHandler<T> {
  /** Runs the job; receives the stored handler input data and the job id. */
  process (data: object, jobId: number): T

  /** Hook invoked with the error raised while processing the job. */
  onError (err: Error, jobId: number): any

  /** Hook invoked with the result of `process` once the job succeeded. */
  onSuccess (jobId: number, jobResult: T): any
}
// Completion callback handed to the queue worker for each processed job
type JobQueueCallback = { (err: Error): void }
20
/**
 * Schedules and runs the persisted jobs of one category.
 *
 * Jobs are listed through `db.Job.listWithLimitByCategory`, pushed on an
 * in-memory `async` queue and executed by the handler registered under the
 * job's `handlerName`. The job state is saved before and after processing.
 */
class JobScheduler<T> {

  /**
   * @param jobCategory category of jobs this scheduler is responsible for
   * @param jobHandlers handlers indexed by the `handlerName` stored on each job
   */
  constructor (
    private jobCategory: JobCategory,
    private jobHandlers: { [ id: string ]: JobHandler<T> }
  ) {}

  /**
   * Starts the scheduler: first re-enqueues the jobs still flagged as
   * processing (left over from a previous start), then polls for pending jobs
   * every `JOBS_FETCHING_INTERVAL`.
   */
  async activate () {
    const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]

    logger.info('Jobs scheduler %s activated.', this.jobCategory)

    const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))

    // Finish processing jobs from a previous start
    const state = JOB_STATES.PROCESSING
    try {
      const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)

      this.enqueueJobs(jobsQueue, jobs)
    } catch (err) {
      logger.error('Cannot list pending jobs.', err)
    }

    forever(
      // Plain (non async) iteratee on purpose: the async library does not hand
      // a "next" callback to native async functions, so the loop must be
      // driven by an explicit callback whatever the compilation target is
      next => {
        this.fetchPendingJobs(jobsQueue, limit)
          .catch(err => logger.error('Cannot list pending jobs.', err))
          // Optimization: we could use "drain" from queue object
          .then(() => setTimeout(next, JOBS_FETCHING_INTERVAL))
      },

      err => logger.error('Error in job scheduler queue.', err)
    )
  }

  /**
   * Persists a new pending job.
   *
   * @param transaction Sequelize transaction the insertion takes part in
   * @param category category of the job
   * @param handlerName key of the handler that will process the job
   * @param handlerInputData data given to the handler `process` method
   * @returns the result of `db.Job.create`
   */
  createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) {
    const createQuery = {
      state: JOB_STATES.PENDING,
      category,
      handlerName,
      handlerInputData
    }
    const options = { transaction }

    return db.Job.create(createQuery, options)
  }

  /**
   * One polling cycle: when the queue has no waiting job, lists the pending
   * jobs of this category and enqueues them. Listing errors are propagated to
   * the caller.
   */
  private async fetchPendingJobs (jobsQueue: AsyncQueue<JobInstance>, limit: number) {
    // Finish processing the queue first
    if (jobsQueue.length() !== 0) return

    const jobs = await db.Job.listWithLimitByCategory(limit, JOB_STATES.PENDING, this.jobCategory)

    this.enqueueJobs(jobsQueue, jobs)
  }

  /** Pushes every job on the queue, in order. */
  private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
    jobs.forEach(job => jobsQueue.push(job))
  }

  /**
   * Queue worker: flags the job as processing, runs its handler and records
   * the outcome. `callback` is always called exactly once so the queue can
   * move on to the next job.
   */
  private async processJob (job: JobInstance, callback: (err: Error) => void) {
    const jobHandler = this.jobHandlers[job.handlerName]
    if (jobHandler === undefined) {
      // NOTE(review): the job state is left untouched here, so the same job is
      // presumably listed again on a later cycle - confirm this is intended
      logger.error('Unknown job handler for job %s.', job.handlerName)
      return callback(null)
    }

    logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

    job.state = JOB_STATES.PROCESSING
    try {
      await job.save()
    } catch (err) {
      // Without this guard a failed save would never release the queue worker
      this.cannotSaveJobError(err)
      return callback(err)
    }

    try {
      const result = await jobHandler.process(job.handlerInputData, job.id)
      await this.onJobSuccess(jobHandler, job, result)
    } catch (err) {
      logger.error('Error in job handler %s.', job.handlerName, err)

      try {
        await this.onJobError(jobHandler, job, err)
      } catch (innerErr) {
        this.cannotSaveJobError(innerErr)
        return callback(innerErr)
      }
    }

    callback(null)
  }

  /** Saves the job in the error state, then notifies the handler. Never rejects: failures are logged. */
  private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
    job.state = JOB_STATES.ERROR

    try {
      await job.save()
      await jobHandler.onError(err, job.id)
    } catch (err) {
      this.cannotSaveJobError(err)
    }
  }

  /** Saves the job in the success state, then notifies the handler. Never rejects: failures are logged. */
  private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
    job.state = JOB_STATES.SUCCESS

    try {
      await job.save()
      // Awaited so a rejected promise returned by the hook is caught below
      await jobHandler.onSuccess(job.id, jobResult)
    } catch (err) {
      this.cannotSaveJobError(err)
    }
  }

  /** Logs a failure that happened while recording a job state change. */
  private cannotSaveJobError (err: Error) {
    logger.error('Cannot save new job state.', err)
  }
}
140
141// ---------------------------------------------------------------------------
142
// Public API of this module
export { JobScheduler }