]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Don't stuck on active jobs
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
94a5ff8a
C
1import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
ecb4e35f 8import { EmailPayload, processEmail } from './handlers/email'
94a5ff8a
C
9import { processVideoFile, VideoFilePayload } from './handlers/video-file'
10
// Argument accepted when creating a job: every supported job type is paired
// with the payload type exported by its handler module.
type CreateJobArgument =
  | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload }
  | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload }
  | { type: 'video-file', payload: VideoFilePayload }
  | { type: 'email', payload: EmailPayload }
94a5ff8a
C
17
// Signature shared by every job processor
type JobHandler = (job: kue.Job) => Promise<any>

// Associates each job type with the function that processes it.
// Key order is kept as-is: processors are registered by iterating over the keys.
const handlers: Record<JobType, JobHandler> = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'video-file': processVideoFile,
  'email': processEmail
}
25
/**
 * Singleton wrapper around a kue queue.
 *
 * It registers one processor per entry of `handlers`, creates jobs, and offers
 * helpers to list, count and clean up jobs. Use `JobQueue.Instance` to get the
 * shared instance (the constructor is private).
 */
class JobQueue {

  private static instance: JobQueue

  private jobQueue: kue.Queue
  private initialized = false

  private constructor () {}

  /**
   * Creates the underlying kue queue, moves jobs still flagged as active back
   * to the inactive state, then registers a processor for every job type.
   *
   * Only the first call does any work: later calls return immediately.
   * Rejects if the active jobs could not be listed or reset.
   */
  async init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobQueue = kue.createQueue({
      prefix: 'q-' + CONFIG.WEBSERVER.HOST,
      redis: {
        host: CONFIG.REDIS.HOSTNAME,
        port: CONFIG.REDIS.PORT,
        auth: CONFIG.REDIS.AUTH
      }
    })

    this.jobQueue.setMaxListeners(15)

    // A queue-level error is considered fatal: log it and stop the process
    this.jobQueue.on('error', err => {
      logger.error('Error in job queue.', err)
      process.exit(-1)
    })
    this.jobQueue.watchStuckJobs(5000)

    // Must complete before processors are registered, so that jobs left in the
    // active state are back in the inactive state when processing starts
    await this.reactiveStuckJobs()

    for (const handlerName of Object.keys(handlers)) {
      this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
        try {
          const res = await handlers[ handlerName ](job)
          return done(null, res)
        } catch (err) {
          return done(err)
        }
      })
    }
  }

  /**
   * Saves a new job in the queue.
   *
   * @param obj job type and its payload
   * @param priority kue priority name, 'normal' by default
   * @returns a promise resolved once the job is saved, rejected if saving failed
   */
  createJob (obj: CreateJobArgument, priority = 'normal') {
    return new Promise((res, rej) => {
      this.jobQueue
        .create(obj.type, obj.payload)
        .priority(priority)
        .attempts(JOB_ATTEMPTS[obj.type])
        .backoff({ delay: 60 * 1000, type: 'exponential' })
        .save(err => {
          if (err) return rej(err)

          return res()
        })
    })
  }

  /**
   * Lists the jobs that are in the given state.
   *
   * @param state job state to filter on
   * @param start index of the first job to return
   * @param count maximum number of jobs to return
   * @param sort sort order forwarded to kue
   */
  listForApi (state: JobState, start: number, count: number, sort: string) {
    return new Promise<kue.Job[]>((res, rej) => {
      // NOTE(review): "start + count - 1" presumes the upper bound is inclusive
      kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => {
        if (err) return rej(err)

        return res(jobs)
      })
    })
  }

  /**
   * Counts the jobs that are in the given state, using the queue method named
   * `<state>Count`.
   */
  count (state: JobState) {
    return new Promise<number>((res, rej) => {
      this.jobQueue[state + 'Count']((err, total) => {
        if (err) return rej(err)

        return res(total)
      })
    })
  }

  /**
   * Removes the completed jobs created more than JOB_COMPLETED_LIFETIME ago.
   *
   * Fire and forget: nothing is returned, and a failure to list the jobs is
   * only logged.
   */
  removeOldJobs () {
    const now = new Date().getTime()
    kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
      if (err) {
        logger.error('Cannot get jobs when removing old jobs.', err)
        return
      }

      for (const job of jobs) {
        if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
          job.remove()
        }
      }
    })
  }

  /**
   * Moves every job currently flagged as active back to the inactive state.
   *
   * The returned promise settles only after all jobs have been updated, and
   * rejects if listing, fetching or updating a job failed. Everything happens
   * inside the kue callbacks, so the promise has to wrap the whole callback
   * chain: building an array of promises outside of it would leave the array
   * empty when it is handed to Promise.all.
   */
  private reactiveStuckJobs () {
    return new Promise<void>((res, rej) => {
      this.jobQueue.active((err, ids) => {
        if (err) return rej(err)

        const promises: Promise<void>[] = []
        for (const id of ids) {
          promises.push(this.setJobInactive(id))
        }

        Promise.all(promises).then(() => res(), rej)
      })
    })
  }

  /** Fetches the job with the given id and switches it to the inactive state. */
  private setJobInactive (id: number) {
    return new Promise<void>((res, rej) => {
      kue.Job.get(id, (err, job) => {
        if (err) return rej(err)

        job.inactive(err => {
          if (err) return rej(err)

          return res()
        })
      })
    })
  }

  /** Shared instance, created on first access. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
151
152// ---------------------------------------------------------------------------
153
154export {
155 JobQueue
156}