From 7a9e420a02434e4f16c99e7d58da9075dff25d15 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 2 Aug 2022 14:41:44 +0200 Subject: Remove uneeded async --- package.json | 2 -- server/lib/activitypub/inbox-manager.ts | 37 +++++++++++++-------------------- server/middlewares/async.ts | 19 ++++++++++------- yarn.lock | 7 +------ 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/package.json b/package.json index a527a1880..24924c3da 100644 --- a/package.json +++ b/package.json @@ -104,7 +104,6 @@ "@peertube/feed": "^5.0.1", "@peertube/http-signature": "^1.7.0", "@uploadx/core": "^5.1.2", - "async": "^3.0.1", "async-lru": "^1.1.1", "bcrypt": "5.0.1", "bencode": "^2.0.2", @@ -179,7 +178,6 @@ }, "devDependencies": { "@peertube/maildev": "^1.2.0", - "@types/async": "^3.0.0", "@types/async-lock": "^1.1.0", "@types/bcrypt": "^5.0.0", "@types/bencode": "^2.0.0", diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index f2785d6ce..27778cc9d 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts @@ -1,4 +1,4 @@ -import { queue, QueueObject } from 'async' +import PQueue from 'p-queue' import { logger } from '@server/helpers/logger' import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' import { MActorDefault, MActorSignature } from '@server/types/models' @@ -6,42 +6,33 @@ import { Activity } from '@shared/models' import { StatsManager } from '../stat-manager' import { processActivities } from './process' -type QueueParam = { - activities: Activity[] - signatureActor?: MActorSignature - inboxActor?: MActorDefault -} - class InboxManager { private static instance: InboxManager - - private readonly inboxQueue: QueueObject + private readonly inboxQueue: PQueue private constructor () { - this.inboxQueue = queue((task, cb) => { - const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } - - processActivities(task.activities, options) - .then(() => cb()) - .catch(err => { - logger.error('Error in process activities.', { err }) - cb() - }) - }) + this.inboxQueue = new PQueue({ concurrency: 1 }) setInterval(() => { StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) } - addInboxMessage (options: QueueParam) { - this.inboxQueue.push(options) - .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) + addInboxMessage (param: { + activities: Activity[] + signatureActor?: MActorSignature + inboxActor?: MActorDefault + }) { + this.inboxQueue.add(() => { + const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor } + + return processActivities(param.activities, options) + }).catch(err => logger.error('Error with inbox queue.', { err })) } getActivityPubMessagesWaiting () { - return this.inboxQueue.length() + this.inboxQueue.running() + return this.inboxQueue.size + this.inboxQueue.pending } static get Instance () { diff --git a/server/middlewares/async.ts b/server/middlewares/async.ts index 9d0193536..7e131257d 100644 --- a/server/middlewares/async.ts +++ b/server/middlewares/async.ts @@ -1,21 +1,26 @@ -import { eachSeries } from 'async' +import Bluebird from 'bluebird' import { NextFunction, Request, RequestHandler, Response } from 'express' import { ValidationChain } from 'express-validator' import { ExpressPromiseHandler } from '@server/types/express-handler' import { retryTransactionWrapper } from '../helpers/database-utils' -// Syntactic sugar to avoid try/catch in express controllers -// Thanks: https://medium.com/@Abazhenov/using-async-await-in-express-with-node-8-b8af872c0016 +// Syntactic sugar to avoid try/catch in express controllers/middlewares export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) { return (req: Request, res: Response, next: NextFunction) => { if (Array.isArray(fun) === true) { - return eachSeries(fun as RequestHandler[], (f, cb) => { - Promise.resolve(f(req, res, err => cb(err))) - .catch(err => next(err)) - }, next) + return Bluebird.each(fun as RequestPromiseHandler[], f => { + return new Promise((resolve, reject) => { + return asyncMiddleware(f)(req, res, err => { + if (err) return reject(err) + + return resolve() + }) + }) + }).then(() => next()) + .catch(err => next(err)) } return Promise.resolve((fun as RequestHandler)(req, res, next)) diff --git a/yarn.lock b/yarn.lock index 090abda20..db5433be5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1925,11 +1925,6 @@ resolved "https://registry.yarnpkg.com/@types/async-lock/-/async-lock-1.1.5.tgz#a82f33e09aef451d6ded7bffae73f9d254723124" integrity sha512-A9ClUfmj6wwZMLRz0NaYzb98YH1exlHdf/cdDSKBfMQJnPOdO8xlEW0Eh2QsTTntGzOFWURcEjYElkZ1IY4GCQ== -"@types/async@^3.0.0": - version "3.2.15" - resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.15.tgz#26d4768fdda0e466f18d6c9918ca28cc89a4e1fe" - integrity sha512-PAmPfzvFA31mRoqZyTVsgJMsvbynR429UTTxhmfsUCrWGh3/fxOrzqBtaTPJsn4UtzTv4Vb0+/O7CARWb69N4g== - "@types/bcrypt@^5.0.0": version "5.0.0" resolved "https://registry.yarnpkg.com/@types/bcrypt/-/bcrypt-5.0.0.tgz#a835afa2882d165aff5690893db314eaa98b9f20" @@ -2802,7 +2797,7 @@ async@3.2.3: resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9" integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g== -async@>=0.2.9, async@^3.0.1, async@^3.1.0, async@^3.2.3: +async@>=0.2.9, async@^3.1.0, async@^3.2.3: version "3.2.4" resolved "https://registry.yarnpkg.com/async/-/async-3.2.4.tgz#2d22e00f8cddeb5fde5dd33522b56d1cf569a81c" integrity sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ== -- cgit v1.2.3