-import { AsyncQueue, queue } from 'async'
+import { queue, QueueObject } from 'async'
import { logger } from '@server/helpers/logger'
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
import { MActorDefault, MActorSignature } from '@server/types/models'
import { Activity } from '@shared/models'
-import { processActivities } from './process'
import { StatsManager } from '../stat-manager'
+import { processActivities } from './process'
type QueueParam = {
activities: Activity[]
private static instance: InboxManager
- private readonly inboxQueue: AsyncQueue<QueueParam>
-
- private messagesProcessed = 0
+ private readonly inboxQueue: QueueObject<QueueParam>
private constructor () {
this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
- this.messagesProcessed++
-
processActivities(task.activities, options)
.then(() => cb())
.catch(err => {
})
setInterval(() => {
- StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting())
- }, SCHEDULER_INTERVALS_MS.updateInboxStats)
+ StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
+ }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
}
addInboxMessage (options: QueueParam) {