aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts124
1 files changed, 124 insertions, 0 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
new file mode 100644
index 000000000..7a2b6c78d
--- /dev/null
+++ b/server/lib/job-queue/job-queue.ts
@@ -0,0 +1,124 @@
1import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
8import { processVideoFile, VideoFilePayload } from './handlers/video-file'
9
10type CreateJobArgument =
11 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
12 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
13 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
14 { type: 'video-file', payload: VideoFilePayload }
15
16const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
17 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
18 'activitypub-http-unicast': processActivityPubHttpUnicast,
19 'activitypub-http-fetcher': processActivityPubHttpFetcher,
20 'video-file': processVideoFile
21}
22
23class JobQueue {
24
25 private static instance: JobQueue
26
27 private jobQueue: kue.Queue
28 private initialized = false
29
30 private constructor () {}
31
32 init () {
33 // Already initialized
34 if (this.initialized === true) return
35 this.initialized = true
36
37 this.jobQueue = kue.createQueue({
38 prefix: 'q-' + CONFIG.WEBSERVER.HOST,
39 redis: {
40 host: CONFIG.REDIS.HOSTNAME,
41 port: CONFIG.REDIS.PORT,
42 auth: CONFIG.REDIS.AUTH
43 }
44 })
45
46 this.jobQueue.on('error', err => {
47 logger.error('Error in job queue.', err)
48 process.exit(-1)
49 })
50 this.jobQueue.watchStuckJobs(5000)
51
52 for (const handlerName of Object.keys(handlers)) {
53 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
54 try {
55 const res = await handlers[ handlerName ](job)
56 return done(null, res)
57 } catch (err) {
58 return done(err)
59 }
60 })
61 }
62 }
63
64 createJob (obj: CreateJobArgument, priority = 'normal') {
65 return new Promise((res, rej) => {
66 this.jobQueue
67 .create(obj.type, obj.payload)
68 .priority(priority)
69 .attempts(JOB_ATTEMPTS[obj.type])
70 .backoff({ type: 'exponential' })
71 .save(err => {
72 if (err) return rej(err)
73
74 return res()
75 })
76 })
77 }
78
79 listForApi (state: JobState, start: number, count: number, sort: string) {
80 return new Promise<kue.Job[]>((res, rej) => {
81 kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
82 if (err) return rej(err)
83
84 return res(jobs)
85 })
86 })
87 }
88
89 count (state: JobState) {
90 return new Promise<number>((res, rej) => {
91 this.jobQueue[state + 'Count']((err, total) => {
92 if (err) return rej(err)
93
94 return res(total)
95 })
96 })
97 }
98
99 removeOldJobs () {
100 const now = new Date().getTime()
101 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
102 if (err) {
103 logger.error('Cannot get jobs when removing old jobs.', err)
104 return
105 }
106
107 for (const job of jobs) {
108 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
109 job.remove()
110 }
111 }
112 })
113 }
114
115 static get Instance () {
116 return this.instance || (this.instance = new this())
117 }
118}
119
120// ---------------------------------------------------------------------------
121
122export {
123 JobQueue
124}