aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/lib/activitypub/inbox-manager.ts37
-rw-r--r--server/middlewares/async.ts19
2 files changed, 26 insertions, 30 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
index f2785d6ce..27778cc9d 100644
--- a/server/lib/activitypub/inbox-manager.ts
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -1,4 +1,4 @@
1import { queue, QueueObject } from 'async' 1import PQueue from 'p-queue'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' 3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models' 4import { MActorDefault, MActorSignature } from '@server/types/models'
@@ -6,42 +6,33 @@ import { Activity } from '@shared/models'
6import { StatsManager } from '../stat-manager' 6import { StatsManager } from '../stat-manager'
7import { processActivities } from './process' 7import { processActivities } from './process'
8 8
9type QueueParam = {
10 activities: Activity[]
11 signatureActor?: MActorSignature
12 inboxActor?: MActorDefault
13}
14
15class InboxManager { 9class InboxManager {
16 10
17 private static instance: InboxManager 11 private static instance: InboxManager
18 12 private readonly inboxQueue: PQueue
19 private readonly inboxQueue: QueueObject<QueueParam>
20 13
21 private constructor () { 14 private constructor () {
22 this.inboxQueue = queue<QueueParam, Error>((task, cb) => { 15 this.inboxQueue = new PQueue({ concurrency: 1 })
23 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
24
25 processActivities(task.activities, options)
26 .then(() => cb())
27 .catch(err => {
28 logger.error('Error in process activities.', { err })
29 cb()
30 })
31 })
32 16
33 setInterval(() => { 17 setInterval(() => {
34 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) 18 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
35 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) 19 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
36 } 20 }
37 21
38 addInboxMessage (options: QueueParam) { 22 addInboxMessage (param: {
39 this.inboxQueue.push(options) 23 activities: Activity[]
40 .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) 24 signatureActor?: MActorSignature
25 inboxActor?: MActorDefault
26 }) {
27 this.inboxQueue.add(() => {
28 const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
29
30 return processActivities(param.activities, options)
31 }).catch(err => logger.error('Error with inbox queue.', { err }))
41 } 32 }
42 33
43 getActivityPubMessagesWaiting () { 34 getActivityPubMessagesWaiting () {
44 return this.inboxQueue.length() + this.inboxQueue.running() 35 return this.inboxQueue.size + this.inboxQueue.pending
45 } 36 }
46 37
47 static get Instance () { 38 static get Instance () {
diff --git a/server/middlewares/async.ts b/server/middlewares/async.ts
index 9d0193536..7e131257d 100644
--- a/server/middlewares/async.ts
+++ b/server/middlewares/async.ts
@@ -1,21 +1,26 @@
1import { eachSeries } from 'async' 1import Bluebird from 'bluebird'
2import { NextFunction, Request, RequestHandler, Response } from 'express' 2import { NextFunction, Request, RequestHandler, Response } from 'express'
3import { ValidationChain } from 'express-validator' 3import { ValidationChain } from 'express-validator'
4import { ExpressPromiseHandler } from '@server/types/express-handler' 4import { ExpressPromiseHandler } from '@server/types/express-handler'
5import { retryTransactionWrapper } from '../helpers/database-utils' 5import { retryTransactionWrapper } from '../helpers/database-utils'
6 6
7// Syntactic sugar to avoid try/catch in express controllers 7// Syntactic sugar to avoid try/catch in express controllers/middlewares
8// Thanks: https://medium.com/@Abazhenov/using-async-await-in-express-with-node-8-b8af872c0016
9 8
10export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler 9export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler
11 10
12function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) { 11function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) {
13 return (req: Request, res: Response, next: NextFunction) => { 12 return (req: Request, res: Response, next: NextFunction) => {
14 if (Array.isArray(fun) === true) { 13 if (Array.isArray(fun) === true) {
15 return eachSeries(fun as RequestHandler[], (f, cb) => { 14 return Bluebird.each(fun as RequestPromiseHandler[], f => {
16 Promise.resolve(f(req, res, err => cb(err))) 15 return new Promise<void>((resolve, reject) => {
17 .catch(err => next(err)) 16 return asyncMiddleware(f)(req, res, err => {
18 }, next) 17 if (err) return reject(err)
18
19 return resolve()
20 })
21 })
22 }).then(() => next())
23 .catch(err => next(err))
19 } 24 }
20 25
21 return Promise.resolve((fun as RequestHandler)(req, res, next)) 26 return Promise.resolve((fun as RequestHandler)(req, res, next))