-import { AsyncQueue, queue } from 'async'
+import { queue, QueueObject } from 'async'
import { logger } from '@server/helpers/logger'
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
import { MActorDefault, MActorSignature } from '@server/types/models'
import { Activity } from '@shared/models'
-import { processActivities } from './process'
import { StatsManager } from '../stat-manager'
+import { processActivities } from './process'
type QueueParam = {
activities: Activity[]
private static instance: InboxManager
- private readonly inboxQueue: AsyncQueue<QueueParam>
-
- private messagesProcessed = 0
+ private readonly inboxQueue: QueueObject<QueueParam>
private constructor () {
this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
- this.messagesProcessed++
-
processActivities(task.activities, options)
.then(() => cb())
.catch(err => {
})
setInterval(() => {
- StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting())
+ StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
}, SCHEDULER_INTERVALS_MS.updateInboxStats)
}
import { processViewActivity } from './process-view'
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
import { MActorDefault, MActorSignature } from '../../../types/models'
+import { StatsManager } from '@server/lib/stat-manager'
const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = {
Create: processCreateActivity,
try {
await activityProcessor({ activity, byActor, inboxActor, fromFetch })
+
+ StatsManager.Instance.addInboxProcessedSuccess(activity.type)
} catch (err) {
logger.warn('Cannot process activity %s.', activity.type, { err })
+
+ StatsManager.Instance.addInboxProcessedError(activity.type)
}
}
}
import { VideoModel } from '@server/models/video/video'
import { VideoCommentModel } from '@server/models/video/video-comment'
import { VideoFileModel } from '@server/models/video/video-file'
-import { ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models'
+import { ActivityType, ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models'
class StatsManager {
private readonly instanceStartDate = new Date()
- private inboxMessagesProcessed = 0
- private inboxMessagesWaiting = 0
+ private inboxMessages = {
+ processed: 0,
+ errors: 0,
+ successes: 0,
+ waiting: 0,
+ errorsPerType: this.buildAPPerType(),
+ successesPerType: this.buildAPPerType()
+ }
private constructor () {}
- updateInboxStats (inboxMessagesProcessed: number, inboxMessagesWaiting: number) {
- this.inboxMessagesProcessed = inboxMessagesProcessed
- this.inboxMessagesWaiting = inboxMessagesWaiting
+ updateInboxWaiting (inboxMessagesWaiting: number) {
+ this.inboxMessages.waiting = inboxMessagesWaiting
+ }
+
+ addInboxProcessedSuccess (type: ActivityType) {
+ this.inboxMessages.processed++
+ this.inboxMessages.successes++
+ this.inboxMessages.successesPerType[type]++
+ }
+
+ addInboxProcessedError (type: ActivityType) {
+ this.inboxMessages.processed++
+ this.inboxMessages.errors++
+ this.inboxMessages.errorsPerType[type]++
}
async getStats () {
videosRedundancy: videosRedundancyStats,
- totalActivityPubMessagesProcessed: this.inboxMessagesProcessed,
- activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(),
- totalActivityPubMessagesWaiting: this.inboxMessagesWaiting
+ ...this.buildAPStats()
}
return data
const now = new Date()
const startedSeconds = (now.getTime() - this.instanceStartDate.getTime()) / 1000
- return this.inboxMessagesProcessed / startedSeconds
+ return this.inboxMessages.processed / startedSeconds
}
private buildRedundancyStats () {
)
}
+ private buildAPPerType () {
+ return {
+ Create: 0,
+ Update: 0,
+ Delete: 0,
+ Follow: 0,
+ Accept: 0,
+ Reject: 0,
+ Announce: 0,
+ Undo: 0,
+ Like: 0,
+ Dislike: 0,
+ Flag: 0,
+ View: 0
+ }
+ }
+
+ private buildAPStats () {
+ return {
+ totalActivityPubMessagesProcessed: this.inboxMessages.processed,
+
+ totalActivityPubMessagesSuccesses: this.inboxMessages.successes,
+
+ // Dirty, but simpler and with type checking
+ totalActivityPubCreateMessagesSuccesses: this.inboxMessages.successesPerType.Create,
+ totalActivityPubUpdateMessagesSuccesses: this.inboxMessages.successesPerType.Update,
+ totalActivityPubDeleteMessagesSuccesses: this.inboxMessages.successesPerType.Delete,
+ totalActivityPubFollowMessagesSuccesses: this.inboxMessages.successesPerType.Follow,
+ totalActivityPubAcceptMessagesSuccesses: this.inboxMessages.successesPerType.Accept,
+ totalActivityPubRejectMessagesSuccesses: this.inboxMessages.successesPerType.Reject,
+ totalActivityPubAnnounceMessagesSuccesses: this.inboxMessages.successesPerType.Announce,
+ totalActivityPubUndoMessagesSuccesses: this.inboxMessages.successesPerType.Undo,
+ totalActivityPubLikeMessagesSuccesses: this.inboxMessages.successesPerType.Like,
+ totalActivityPubDislikeMessagesSuccesses: this.inboxMessages.successesPerType.Dislike,
+ totalActivityPubFlagMessagesSuccesses: this.inboxMessages.successesPerType.Flag,
+ totalActivityPubViewMessagesSuccesses: this.inboxMessages.successesPerType.View,
+
+ totalActivityPubCreateMessagesErrors: this.inboxMessages.errorsPerType.Create,
+ totalActivityPubUpdateMessagesErrors: this.inboxMessages.errorsPerType.Update,
+ totalActivityPubDeleteMessagesErrors: this.inboxMessages.errorsPerType.Delete,
+ totalActivityPubFollowMessagesErrors: this.inboxMessages.errorsPerType.Follow,
+ totalActivityPubAcceptMessagesErrors: this.inboxMessages.errorsPerType.Accept,
+ totalActivityPubRejectMessagesErrors: this.inboxMessages.errorsPerType.Reject,
+ totalActivityPubAnnounceMessagesErrors: this.inboxMessages.errorsPerType.Announce,
+ totalActivityPubUndoMessagesErrors: this.inboxMessages.errorsPerType.Undo,
+ totalActivityPubLikeMessagesErrors: this.inboxMessages.errorsPerType.Like,
+ totalActivityPubDislikeMessagesErrors: this.inboxMessages.errorsPerType.Dislike,
+ totalActivityPubFlagMessagesErrors: this.inboxMessages.errorsPerType.Flag,
+ totalActivityPubViewMessagesErrors: this.inboxMessages.errorsPerType.View,
+
+ totalActivityPubMessagesErrors: this.inboxMessages.errors,
+
+ activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(),
+ totalActivityPubMessagesWaiting: this.inboxMessages.waiting
+ }
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}
import { getStats } from '../../../../shared/extra-utils/server/stats'
import { addVideoCommentThread } from '../../../../shared/extra-utils/videos/video-comments'
import { ServerStats } from '../../../../shared/models/server/server-stats.model'
+import { ActivityType } from '@shared/models'
const expect = chai.expect
const second: ServerStats = res2.body
expect(second.totalActivityPubMessagesProcessed).to.be.greaterThan(first.totalActivityPubMessagesProcessed)
+ const apTypes: ActivityType[] = [
+ 'Create', 'Update', 'Delete', 'Follow', 'Accept', 'Announce', 'Undo', 'Like', 'Reject', 'View', 'Dislike', 'Flag'
+ ]
+
+ const processed = apTypes.reduce(
+ (previous, type) => previous + second['totalActivityPub' + type + 'MessagesSuccesses'],
+ 0
+ )
+ expect(second.totalActivityPubMessagesProcessed).to.equal(processed)
+ expect(second.totalActivityPubMessagesSuccesses).to.equal(processed)
+
+ expect(second.totalActivityPubMessagesErrors).to.equal(0)
+
+ for (const apType of apTypes) {
+ expect(second['totalActivityPub' + apType + 'MessagesErrors']).to.equal(0)
+ }
await wait(6000)
import { VideoRedundancyStrategyWithManual } from '../redundancy'
-
export interface ServerStats {
totalUsers: number
totalDailyActiveUsers: number
videosRedundancy: VideosRedundancyStats[]
totalActivityPubMessagesProcessed: number
+ totalActivityPubMessagesSuccesses: number
+ totalActivityPubMessagesErrors: number
+
+ totalActivityPubCreateMessagesSuccesses: number
+ totalActivityPubUpdateMessagesSuccesses: number
+ totalActivityPubDeleteMessagesSuccesses: number
+ totalActivityPubFollowMessagesSuccesses: number
+ totalActivityPubAcceptMessagesSuccesses: number
+ totalActivityPubRejectMessagesSuccesses: number
+ totalActivityPubAnnounceMessagesSuccesses: number
+ totalActivityPubUndoMessagesSuccesses: number
+ totalActivityPubLikeMessagesSuccesses: number
+ totalActivityPubDislikeMessagesSuccesses: number
+ totalActivityPubFlagMessagesSuccesses: number
+ totalActivityPubViewMessagesSuccesses: number
+
+ totalActivityPubCreateMessagesErrors: number
+ totalActivityPubUpdateMessagesErrors: number
+ totalActivityPubDeleteMessagesErrors: number
+ totalActivityPubFollowMessagesErrors: number
+ totalActivityPubAcceptMessagesErrors: number
+ totalActivityPubRejectMessagesErrors: number
+ totalActivityPubAnnounceMessagesErrors: number
+ totalActivityPubUndoMessagesErrors: number
+ totalActivityPubLikeMessagesErrors: number
+ totalActivityPubDislikeMessagesErrors: number
+ totalActivityPubFlagMessagesErrors: number
+ totalActivityPubViewMessagesErrors: number
+
activityPubMessagesProcessedPerSecond: number
totalActivityPubMessagesWaiting: number
}