1 import * as kue from 'kue'
2 import { JobType, JobState } from '../../../shared/models'
3 import { logger } from '../../helpers/logger'
4 import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5 import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6 import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7 import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
8 import { EmailPayload, processEmail } from './handlers/email'
9 import { processVideoFile, VideoFilePayload } from './handlers/video-file'
/**
 * Argument accepted by `createJob`.
 *
 * A union discriminated on `type`: each member pairs a job type name with the
 * payload type exported by the matching handler module, so narrowing on
 * `type` also narrows `payload`.
 */
type CreateJobArgument =
  | {
    type: 'activitypub-http-broadcast',
    payload: ActivitypubHttpBroadcastPayload
  }
  | {
    type: 'activitypub-http-unicast',
    payload: ActivitypubHttpUnicastPayload
  }
  | {
    type: 'activitypub-http-fetcher',
    payload: ActivitypubHttpFetcherPayload
  }
  | {
    type: 'video-file',
    payload: VideoFilePayload
  }
  | {
    type: 'email',
    payload: EmailPayload
  }
18 const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
19 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
20 'activitypub-http-unicast': processActivityPubHttpUnicast,
21 'activitypub-http-fetcher': processActivityPubHttpFetcher,
22 'video-file': processVideoFile,
28 private static instance: JobQueue
30 private jobQueue: kue.Queue
31 private initialized = false
33 private constructor () {}
36 // Already initialized
37 if (this.initialized === true) return
38 this.initialized = true
40 this.jobQueue = kue.createQueue({
41 prefix: 'q-' + CONFIG.WEBSERVER.HOST,
43 host: CONFIG.REDIS.HOSTNAME,
44 port: CONFIG.REDIS.PORT,
45 auth: CONFIG.REDIS.AUTH
49 this.jobQueue.setMaxListeners(15)
51 this.jobQueue.on('error', err => {
52 logger.error('Error in job queue.', err)
55 this.jobQueue.watchStuckJobs(5000)
57 for (const handlerName of Object.keys(handlers)) {
58 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
60 const res = await handlers[ handlerName ](job)
61 return done(null, res)
69 createJob (obj: CreateJobArgument, priority = 'normal') {
70 return new Promise((res, rej) => {
72 .create(obj.type, obj.payload)
74 .attempts(JOB_ATTEMPTS[obj.type])
75 .backoff({ delay: 60 * 1000, type: 'exponential' })
77 if (err) return rej(err)
84 listForApi (state: JobState, start: number, count: number, sort: string) {
85 return new Promise<kue.Job[]>((res, rej) => {
86 kue.Job.rangeByState(state, start, start + count, sort, (err, jobs) => {
87 if (err) return rej(err)
94 count (state: JobState) {
95 return new Promise<number>((res, rej) => {
96 this.jobQueue[state + 'Count']((err, total) => {
97 if (err) return rej(err)
105 const now = new Date().getTime()
106 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
108 logger.error('Cannot get jobs when removing old jobs.', err)
112 for (const job of jobs) {
113 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
120 static get Instance () {
121 return this.instance || (this.instance = new this())
125 // ---------------------------------------------------------------------------