aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/email.ts22
-rw-r--r--server/lib/job-queue/job-queue.ts9
2 files changed, 29 insertions, 2 deletions
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
new file mode 100644
index 000000000..9d7686116
--- /dev/null
+++ b/server/lib/job-queue/handlers/email.ts
@@ -0,0 +1,22 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { Emailer } from '../../emailer'
4
5export type EmailPayload = {
6 to: string[]
7 subject: string
8 text: string
9}
10
11async function processEmail (job: kue.Job) {
12 const payload = job.data as EmailPayload
13 logger.info('Processing email in job %d.', job.id)
14
15 return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text)
16}
17
18// ---------------------------------------------------------------------------
19
20export {
21 processEmail
22}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 7a2b6c78d..3f176f896 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -5,19 +5,22 @@ import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '.
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
8import { EmailPayload, processEmail } from './handlers/email'
8import { processVideoFile, VideoFilePayload } from './handlers/video-file' 9import { processVideoFile, VideoFilePayload } from './handlers/video-file'
9 10
10type CreateJobArgument = 11type CreateJobArgument =
11 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 12 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
12 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 13 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
13 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | 14 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
14 { type: 'video-file', payload: VideoFilePayload } 15 { type: 'video-file', payload: VideoFilePayload } |
16 { type: 'email', payload: EmailPayload }
15 17
16const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { 18const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
17 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 19 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
18 'activitypub-http-unicast': processActivityPubHttpUnicast, 20 'activitypub-http-unicast': processActivityPubHttpUnicast,
19 'activitypub-http-fetcher': processActivityPubHttpFetcher, 21 'activitypub-http-fetcher': processActivityPubHttpFetcher,
20 'video-file': processVideoFile 22 'video-file': processVideoFile,
23 'email': processEmail
21} 24}
22 25
23class JobQueue { 26class JobQueue {
@@ -43,6 +46,8 @@ class JobQueue {
43 } 46 }
44 }) 47 })
45 48
49 this.jobQueue.setMaxListeners(15)
50
46 this.jobQueue.on('error', err => { 51 this.jobQueue.on('error', err => {
47 logger.error('Error in job queue.', err) 52 logger.error('Error in job queue.', err)
48 process.exit(-1) 53 process.exit(-1)