aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts24
1 files changed, 18 insertions, 6 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index bf40a9206..acc69ef24 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,7 +1,7 @@
1import * as kue from 'kue' 1import * as kue from 'kue'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' 4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
5import { Redis } from '../redis' 5import { Redis } from '../redis'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
@@ -27,6 +27,13 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
27 'email': processEmail 27 'email': processEmail
28} 28}
29 29
30const jobsWithTLL: JobType[] = [
31 'activitypub-http-broadcast',
32 'activitypub-http-unicast',
33 'activitypub-http-fetcher',
34 'activitypub-follow'
35]
36
30class JobQueue { 37class JobQueue {
31 38
32 private static instance: JobQueue 39 private static instance: JobQueue
@@ -77,16 +84,21 @@ class JobQueue {
77 84
78 createJob (obj: CreateJobArgument, priority = 'normal') { 85 createJob (obj: CreateJobArgument, priority = 'normal') {
79 return new Promise((res, rej) => { 86 return new Promise((res, rej) => {
80 this.jobQueue 87 let job = this.jobQueue
81 .create(obj.type, obj.payload) 88 .create(obj.type, obj.payload)
82 .priority(priority) 89 .priority(priority)
83 .attempts(JOB_ATTEMPTS[obj.type]) 90 .attempts(JOB_ATTEMPTS[obj.type])
84 .backoff({ delay: 60 * 1000, type: 'exponential' }) 91 .backoff({ delay: 60 * 1000, type: 'exponential' })
85 .save(err => {
86 if (err) return rej(err)
87 92
88 return res() 93 if (jobsWithTLL.indexOf(obj.type) !== -1) {
89 }) 94 job = job.ttl(JOB_REQUEST_TTL)
95 }
96
97 return job.save(err => {
98 if (err) return rej(err)
99
100 return res()
101 })
90 }) 102 })
91 } 103 }
92 104