aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--package.json2
-rw-r--r--server/lib/activitypub/inbox-manager.ts37
-rw-r--r--server/middlewares/async.ts19
-rw-r--r--yarn.lock7
4 files changed, 27 insertions, 38 deletions
diff --git a/package.json b/package.json
index a527a1880..24924c3da 100644
--- a/package.json
+++ b/package.json
@@ -104,7 +104,6 @@
104 "@peertube/feed": "^5.0.1", 104 "@peertube/feed": "^5.0.1",
105 "@peertube/http-signature": "^1.7.0", 105 "@peertube/http-signature": "^1.7.0",
106 "@uploadx/core": "^5.1.2", 106 "@uploadx/core": "^5.1.2",
107 "async": "^3.0.1",
108 "async-lru": "^1.1.1", 107 "async-lru": "^1.1.1",
109 "bcrypt": "5.0.1", 108 "bcrypt": "5.0.1",
110 "bencode": "^2.0.2", 109 "bencode": "^2.0.2",
@@ -179,7 +178,6 @@
179 }, 178 },
180 "devDependencies": { 179 "devDependencies": {
181 "@peertube/maildev": "^1.2.0", 180 "@peertube/maildev": "^1.2.0",
182 "@types/async": "^3.0.0",
183 "@types/async-lock": "^1.1.0", 181 "@types/async-lock": "^1.1.0",
184 "@types/bcrypt": "^5.0.0", 182 "@types/bcrypt": "^5.0.0",
185 "@types/bencode": "^2.0.0", 183 "@types/bencode": "^2.0.0",
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
index f2785d6ce..27778cc9d 100644
--- a/server/lib/activitypub/inbox-manager.ts
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -1,4 +1,4 @@
1import { queue, QueueObject } from 'async' 1import PQueue from 'p-queue'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' 3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models' 4import { MActorDefault, MActorSignature } from '@server/types/models'
@@ -6,42 +6,33 @@ import { Activity } from '@shared/models'
6import { StatsManager } from '../stat-manager' 6import { StatsManager } from '../stat-manager'
7import { processActivities } from './process' 7import { processActivities } from './process'
8 8
9type QueueParam = {
10 activities: Activity[]
11 signatureActor?: MActorSignature
12 inboxActor?: MActorDefault
13}
14
15class InboxManager { 9class InboxManager {
16 10
17 private static instance: InboxManager 11 private static instance: InboxManager
18 12 private readonly inboxQueue: PQueue
19 private readonly inboxQueue: QueueObject<QueueParam>
20 13
21 private constructor () { 14 private constructor () {
22 this.inboxQueue = queue<QueueParam, Error>((task, cb) => { 15 this.inboxQueue = new PQueue({ concurrency: 1 })
23 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
24
25 processActivities(task.activities, options)
26 .then(() => cb())
27 .catch(err => {
28 logger.error('Error in process activities.', { err })
29 cb()
30 })
31 })
32 16
33 setInterval(() => { 17 setInterval(() => {
34 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) 18 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
35 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) 19 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
36 } 20 }
37 21
38 addInboxMessage (options: QueueParam) { 22 addInboxMessage (param: {
39 this.inboxQueue.push(options) 23 activities: Activity[]
40 .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) 24 signatureActor?: MActorSignature
25 inboxActor?: MActorDefault
26 }) {
27 this.inboxQueue.add(() => {
28 const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
29
30 return processActivities(param.activities, options)
31 }).catch(err => logger.error('Error with inbox queue.', { err }))
41 } 32 }
42 33
43 getActivityPubMessagesWaiting () { 34 getActivityPubMessagesWaiting () {
44 return this.inboxQueue.length() + this.inboxQueue.running() 35 return this.inboxQueue.size + this.inboxQueue.pending
45 } 36 }
46 37
47 static get Instance () { 38 static get Instance () {
diff --git a/server/middlewares/async.ts b/server/middlewares/async.ts
index 9d0193536..7e131257d 100644
--- a/server/middlewares/async.ts
+++ b/server/middlewares/async.ts
@@ -1,21 +1,26 @@
1import { eachSeries } from 'async' 1import Bluebird from 'bluebird'
2import { NextFunction, Request, RequestHandler, Response } from 'express' 2import { NextFunction, Request, RequestHandler, Response } from 'express'
3import { ValidationChain } from 'express-validator' 3import { ValidationChain } from 'express-validator'
4import { ExpressPromiseHandler } from '@server/types/express-handler' 4import { ExpressPromiseHandler } from '@server/types/express-handler'
5import { retryTransactionWrapper } from '../helpers/database-utils' 5import { retryTransactionWrapper } from '../helpers/database-utils'
6 6
7// Syntactic sugar to avoid try/catch in express controllers 7// Syntactic sugar to avoid try/catch in express controllers/middlewares
8// Thanks: https://medium.com/@Abazhenov/using-async-await-in-express-with-node-8-b8af872c0016
9 8
10export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler 9export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler
11 10
12function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) { 11function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) {
13 return (req: Request, res: Response, next: NextFunction) => { 12 return (req: Request, res: Response, next: NextFunction) => {
14 if (Array.isArray(fun) === true) { 13 if (Array.isArray(fun) === true) {
15 return eachSeries(fun as RequestHandler[], (f, cb) => { 14 return Bluebird.each(fun as RequestPromiseHandler[], f => {
16 Promise.resolve(f(req, res, err => cb(err))) 15 return new Promise<void>((resolve, reject) => {
17 .catch(err => next(err)) 16 return asyncMiddleware(f)(req, res, err => {
18 }, next) 17 if (err) return reject(err)
18
19 return resolve()
20 })
21 })
22 }).then(() => next())
23 .catch(err => next(err))
19 } 24 }
20 25
21 return Promise.resolve((fun as RequestHandler)(req, res, next)) 26 return Promise.resolve((fun as RequestHandler)(req, res, next))
diff --git a/yarn.lock b/yarn.lock
index 090abda20..db5433be5 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1925,11 +1925,6 @@
1925 resolved "https://registry.yarnpkg.com/@types/async-lock/-/async-lock-1.1.5.tgz#a82f33e09aef451d6ded7bffae73f9d254723124" 1925 resolved "https://registry.yarnpkg.com/@types/async-lock/-/async-lock-1.1.5.tgz#a82f33e09aef451d6ded7bffae73f9d254723124"
1926 integrity sha512-A9ClUfmj6wwZMLRz0NaYzb98YH1exlHdf/cdDSKBfMQJnPOdO8xlEW0Eh2QsTTntGzOFWURcEjYElkZ1IY4GCQ== 1926 integrity sha512-A9ClUfmj6wwZMLRz0NaYzb98YH1exlHdf/cdDSKBfMQJnPOdO8xlEW0Eh2QsTTntGzOFWURcEjYElkZ1IY4GCQ==
1927 1927
1928"@types/async@^3.0.0":
1929 version "3.2.15"
1930 resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.15.tgz#26d4768fdda0e466f18d6c9918ca28cc89a4e1fe"
1931 integrity sha512-PAmPfzvFA31mRoqZyTVsgJMsvbynR429UTTxhmfsUCrWGh3/fxOrzqBtaTPJsn4UtzTv4Vb0+/O7CARWb69N4g==
1932
1933"@types/bcrypt@^5.0.0": 1928"@types/bcrypt@^5.0.0":
1934 version "5.0.0" 1929 version "5.0.0"
1935 resolved "https://registry.yarnpkg.com/@types/bcrypt/-/bcrypt-5.0.0.tgz#a835afa2882d165aff5690893db314eaa98b9f20" 1930 resolved "https://registry.yarnpkg.com/@types/bcrypt/-/bcrypt-5.0.0.tgz#a835afa2882d165aff5690893db314eaa98b9f20"
@@ -2802,7 +2797,7 @@ async@3.2.3:
2802 resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9" 2797 resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9"
2803 integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g== 2798 integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g==
2804 2799
2805async@>=0.2.9, async@^3.0.1, async@^3.1.0, async@^3.2.3: 2800async@>=0.2.9, async@^3.1.0, async@^3.2.3:
2806 version "3.2.4" 2801 version "3.2.4"
2807 resolved "https://registry.yarnpkg.com/async/-/async-3.2.4.tgz#2d22e00f8cddeb5fde5dd33522b56d1cf569a81c" 2802 resolved "https://registry.yarnpkg.com/async/-/async-3.2.4.tgz#2d22e00f8cddeb5fde5dd33522b56d1cf569a81c"
2808 integrity sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ== 2803 integrity sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==