From 9db437c8155f3563a33e22ed2896072a9f1fbdb0 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 13 Oct 2021 11:47:32 +0200 Subject: Process slow followers in unicast job queue --- server/lib/activitypub/send/utils.ts | 40 +++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) (limited to 'server/lib/activitypub/send/utils.ts') diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts index 7cd8030e1..7729703b8 100644 --- a/server/lib/activitypub/send/utils.ts +++ b/server/lib/activitypub/send/utils.ts @@ -1,4 +1,5 @@ import { Transaction } from 'sequelize' +import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { getServerActor } from '@server/models/application/application' import { ContextType } from '@shared/models/activitypub/context' import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' @@ -119,16 +120,41 @@ async function broadcastToActors ( function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { if (uris.length === 0) return undefined - logger.debug('Creating broadcast job.', { uris }) + const broadcastUris: string[] = [] + const unicastUris: string[] = [] - const payload = { - uris, - signatureActorId: byActor.id, - body: data, - contextType + // Bad URIs could be slow to respond, prefer to process them in a dedicated queue + for (const uri of uris) { + if (ActorFollowHealthCache.Instance.isBadInbox(uri)) { + unicastUris.push(uri) + } else { + broadcastUris.push(uri) + } + } + + logger.debug('Creating broadcast job.', { broadcastUris, unicastUris }) + + if (broadcastUris.length !== 0) { + const payload = { + uris: broadcastUris, + signatureActorId: byActor.id, + body: data, + contextType + } + + JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } - return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) + for (const unicastUri of unicastUris) { + const payload = { + uri: unicastUri, + signatureActorId: byActor.id, + body: data, + contextType + } + + JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) + } } function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { -- cgit v1.2.3