diff options
-rw-r--r-- | package.json | 2 | ||||
-rw-r--r-- | server/lib/activitypub/inbox-manager.ts | 37 | ||||
-rw-r--r-- | server/middlewares/async.ts | 19 | ||||
-rw-r--r-- | yarn.lock | 7 |
4 files changed, 27 insertions, 38 deletions
diff --git a/package.json b/package.json index a527a1880..24924c3da 100644 --- a/package.json +++ b/package.json | |||
@@ -104,7 +104,6 @@ | |||
104 | "@peertube/feed": "^5.0.1", | 104 | "@peertube/feed": "^5.0.1", |
105 | "@peertube/http-signature": "^1.7.0", | 105 | "@peertube/http-signature": "^1.7.0", |
106 | "@uploadx/core": "^5.1.2", | 106 | "@uploadx/core": "^5.1.2", |
107 | "async": "^3.0.1", | ||
108 | "async-lru": "^1.1.1", | 107 | "async-lru": "^1.1.1", |
109 | "bcrypt": "5.0.1", | 108 | "bcrypt": "5.0.1", |
110 | "bencode": "^2.0.2", | 109 | "bencode": "^2.0.2", |
@@ -179,7 +178,6 @@ | |||
179 | }, | 178 | }, |
180 | "devDependencies": { | 179 | "devDependencies": { |
181 | "@peertube/maildev": "^1.2.0", | 180 | "@peertube/maildev": "^1.2.0", |
182 | "@types/async": "^3.0.0", | ||
183 | "@types/async-lock": "^1.1.0", | 181 | "@types/async-lock": "^1.1.0", |
184 | "@types/bcrypt": "^5.0.0", | 182 | "@types/bcrypt": "^5.0.0", |
185 | "@types/bencode": "^2.0.0", | 183 | "@types/bencode": "^2.0.0", |
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index f2785d6ce..27778cc9d 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { queue, QueueObject } from 'async' | 1 | import PQueue from 'p-queue' |
2 | import { logger } from '@server/helpers/logger' | 2 | import { logger } from '@server/helpers/logger' |
3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' | 3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' |
4 | import { MActorDefault, MActorSignature } from '@server/types/models' | 4 | import { MActorDefault, MActorSignature } from '@server/types/models' |
@@ -6,42 +6,33 @@ import { Activity } from '@shared/models' | |||
6 | import { StatsManager } from '../stat-manager' | 6 | import { StatsManager } from '../stat-manager' |
7 | import { processActivities } from './process' | 7 | import { processActivities } from './process' |
8 | 8 | ||
9 | type QueueParam = { | ||
10 | activities: Activity[] | ||
11 | signatureActor?: MActorSignature | ||
12 | inboxActor?: MActorDefault | ||
13 | } | ||
14 | |||
15 | class InboxManager { | 9 | class InboxManager { |
16 | 10 | ||
17 | private static instance: InboxManager | 11 | private static instance: InboxManager |
18 | 12 | private readonly inboxQueue: PQueue | |
19 | private readonly inboxQueue: QueueObject<QueueParam> | ||
20 | 13 | ||
21 | private constructor () { | 14 | private constructor () { |
22 | this.inboxQueue = queue<QueueParam, Error>((task, cb) => { | 15 | this.inboxQueue = new PQueue({ concurrency: 1 }) |
23 | const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } | ||
24 | |||
25 | processActivities(task.activities, options) | ||
26 | .then(() => cb()) | ||
27 | .catch(err => { | ||
28 | logger.error('Error in process activities.', { err }) | ||
29 | cb() | ||
30 | }) | ||
31 | }) | ||
32 | 16 | ||
33 | setInterval(() => { | 17 | setInterval(() => { |
34 | StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) | 18 | StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) |
35 | }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) | 19 | }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) |
36 | } | 20 | } |
37 | 21 | ||
38 | addInboxMessage (options: QueueParam) { | 22 | addInboxMessage (param: { |
39 | this.inboxQueue.push(options) | 23 | activities: Activity[] |
40 | .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) | 24 | signatureActor?: MActorSignature |
25 | inboxActor?: MActorDefault | ||
26 | }) { | ||
27 | this.inboxQueue.add(() => { | ||
28 | const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor } | ||
29 | |||
30 | return processActivities(param.activities, options) | ||
31 | }).catch(err => logger.error('Error with inbox queue.', { err })) | ||
41 | } | 32 | } |
42 | 33 | ||
43 | getActivityPubMessagesWaiting () { | 34 | getActivityPubMessagesWaiting () { |
44 | return this.inboxQueue.length() + this.inboxQueue.running() | 35 | return this.inboxQueue.size + this.inboxQueue.pending |
45 | } | 36 | } |
46 | 37 | ||
47 | static get Instance () { | 38 | static get Instance () { |
diff --git a/server/middlewares/async.ts b/server/middlewares/async.ts index 9d0193536..7e131257d 100644 --- a/server/middlewares/async.ts +++ b/server/middlewares/async.ts | |||
@@ -1,21 +1,26 @@ | |||
1 | import { eachSeries } from 'async' | 1 | import Bluebird from 'bluebird' |
2 | import { NextFunction, Request, RequestHandler, Response } from 'express' | 2 | import { NextFunction, Request, RequestHandler, Response } from 'express' |
3 | import { ValidationChain } from 'express-validator' | 3 | import { ValidationChain } from 'express-validator' |
4 | import { ExpressPromiseHandler } from '@server/types/express-handler' | 4 | import { ExpressPromiseHandler } from '@server/types/express-handler' |
5 | import { retryTransactionWrapper } from '../helpers/database-utils' | 5 | import { retryTransactionWrapper } from '../helpers/database-utils' |
6 | 6 | ||
7 | // Syntactic sugar to avoid try/catch in express controllers | 7 | // Syntactic sugar to avoid try/catch in express controllers/middlewares |
8 | // Thanks: https://medium.com/@Abazhenov/using-async-await-in-express-with-node-8-b8af872c0016 | ||
9 | 8 | ||
10 | export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler | 9 | export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler |
11 | 10 | ||
12 | function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) { | 11 | function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) { |
13 | return (req: Request, res: Response, next: NextFunction) => { | 12 | return (req: Request, res: Response, next: NextFunction) => { |
14 | if (Array.isArray(fun) === true) { | 13 | if (Array.isArray(fun) === true) { |
15 | return eachSeries(fun as RequestHandler[], (f, cb) => { | 14 | return Bluebird.each(fun as RequestPromiseHandler[], f => { |
16 | Promise.resolve(f(req, res, err => cb(err))) | 15 | return new Promise<void>((resolve, reject) => { |
17 | .catch(err => next(err)) | 16 | return asyncMiddleware(f)(req, res, err => { |
18 | }, next) | 17 | if (err) return reject(err) |
18 | |||
19 | return resolve() | ||
20 | }) | ||
21 | }) | ||
22 | }).then(() => next()) | ||
23 | .catch(err => next(err)) | ||
19 | } | 24 | } |
20 | 25 | ||
21 | return Promise.resolve((fun as RequestHandler)(req, res, next)) | 26 | return Promise.resolve((fun as RequestHandler)(req, res, next)) |
@@ -1925,11 +1925,6 @@ | |||
1925 | resolved "https://registry.yarnpkg.com/@types/async-lock/-/async-lock-1.1.5.tgz#a82f33e09aef451d6ded7bffae73f9d254723124" | 1925 | resolved "https://registry.yarnpkg.com/@types/async-lock/-/async-lock-1.1.5.tgz#a82f33e09aef451d6ded7bffae73f9d254723124" |
1926 | integrity sha512-A9ClUfmj6wwZMLRz0NaYzb98YH1exlHdf/cdDSKBfMQJnPOdO8xlEW0Eh2QsTTntGzOFWURcEjYElkZ1IY4GCQ== | 1926 | integrity sha512-A9ClUfmj6wwZMLRz0NaYzb98YH1exlHdf/cdDSKBfMQJnPOdO8xlEW0Eh2QsTTntGzOFWURcEjYElkZ1IY4GCQ== |
1927 | 1927 | ||
1928 | "@types/async@^3.0.0": | ||
1929 | version "3.2.15" | ||
1930 | resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.15.tgz#26d4768fdda0e466f18d6c9918ca28cc89a4e1fe" | ||
1931 | integrity sha512-PAmPfzvFA31mRoqZyTVsgJMsvbynR429UTTxhmfsUCrWGh3/fxOrzqBtaTPJsn4UtzTv4Vb0+/O7CARWb69N4g== | ||
1932 | |||
1933 | "@types/bcrypt@^5.0.0": | 1928 | "@types/bcrypt@^5.0.0": |
1934 | version "5.0.0" | 1929 | version "5.0.0" |
1935 | resolved "https://registry.yarnpkg.com/@types/bcrypt/-/bcrypt-5.0.0.tgz#a835afa2882d165aff5690893db314eaa98b9f20" | 1930 | resolved "https://registry.yarnpkg.com/@types/bcrypt/-/bcrypt-5.0.0.tgz#a835afa2882d165aff5690893db314eaa98b9f20" |
@@ -2802,7 +2797,7 @@ async@3.2.3: | |||
2802 | resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9" | 2797 | resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9" |
2803 | integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g== | 2798 | integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g== |
2804 | 2799 | ||
2805 | async@>=0.2.9, async@^3.0.1, async@^3.1.0, async@^3.2.3: | 2800 | async@>=0.2.9, async@^3.1.0, async@^3.2.3: |
2806 | version "3.2.4" | 2801 | version "3.2.4" |
2807 | resolved "https://registry.yarnpkg.com/async/-/async-3.2.4.tgz#2d22e00f8cddeb5fde5dd33522b56d1cf569a81c" | 2802 | resolved "https://registry.yarnpkg.com/async/-/async-3.2.4.tgz#2d22e00f8cddeb5fde5dd33522b56d1cf569a81c" |
2808 | integrity sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ== | 2803 | integrity sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ== |