-import { AsyncQueue, queue } from 'async'
+import PQueue from 'p-queue'
import { logger } from '@server/helpers/logger'
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
import { MActorDefault, MActorSignature } from '@server/types/models'
import { Activity } from '@shared/models'
-import { processActivities } from './process'
import { StatsManager } from '../stat-manager'
-
-type QueueParam = {
- activities: Activity[]
- signatureActor?: MActorSignature
- inboxActor?: MActorDefault
-}
+import { processActivities } from './process'
class InboxManager {
private static instance: InboxManager
-
- private readonly inboxQueue: AsyncQueue<QueueParam>
-
- private messagesProcessed = 0
+ private readonly inboxQueue: PQueue
private constructor () {
- this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
- const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
-
- this.messagesProcessed++
-
- processActivities(task.activities, options)
- .then(() => cb())
- .catch(err => {
- logger.error('Error in process activities.', { err })
- cb()
- })
- })
+ this.inboxQueue = new PQueue({ concurrency: 1 })
setInterval(() => {
- StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting())
- }, SCHEDULER_INTERVALS_MS.updateInboxStats)
+ StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
+ }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
}
- addInboxMessage (options: QueueParam) {
- this.inboxQueue.push(options)
- .catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
+ addInboxMessage (param: {
+ activities: Activity[]
+ signatureActor?: MActorSignature
+ inboxActor?: MActorDefault
+ }) {
+ this.inboxQueue.add(() => {
+ const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
+
+ return processActivities(param.activities, options)
+ }).catch(err => logger.error('Error with inbox queue.', { err }))
}
getActivityPubMessagesWaiting () {
- return this.inboxQueue.length() + this.inboxQueue.running()
+ return this.inboxQueue.size + this.inboxQueue.pending
}
static get Instance () {