aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub/send/utils.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-10-13 11:47:32 +0200
committerChocobozzz <me@florianbigard.com>2021-10-13 11:47:32 +0200
commit9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch)
tree716078fbe1506e0b0d19936f4939e9c530b3b8ab /server/lib/activitypub/send/utils.ts
parente81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff)
downloadPeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz
PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst
PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip
Process slow followers in unicast job queue
Diffstat (limited to 'server/lib/activitypub/send/utils.ts')
-rw-r--r--server/lib/activitypub/send/utils.ts40
1 files changed, 33 insertions, 7 deletions
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts
index 7cd8030e1..7729703b8 100644
--- a/server/lib/activitypub/send/utils.ts
+++ b/server/lib/activitypub/send/utils.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
2import { getServerActor } from '@server/models/application/application' 3import { getServerActor } from '@server/models/application/application'
3import { ContextType } from '@shared/models/activitypub/context' 4import { ContextType } from '@shared/models/activitypub/context'
4import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' 5import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
@@ -119,16 +120,41 @@ async function broadcastToActors (
119function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { 120function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
120 if (uris.length === 0) return undefined 121 if (uris.length === 0) return undefined
121 122
122 logger.debug('Creating broadcast job.', { uris }) 123 const broadcastUris: string[] = []
124 const unicastUris: string[] = []
123 125
124 const payload = { 126 // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
125 uris, 127 for (const uri of uris) {
126 signatureActorId: byActor.id, 128 if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
127 body: data, 129 unicastUris.push(uri)
128 contextType 130 } else {
131 broadcastUris.push(uri)
132 }
133 }
134
135 logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
136
137 if (broadcastUris.length !== 0) {
138 const payload = {
139 uris: broadcastUris,
140 signatureActorId: byActor.id,
141 body: data,
142 contextType
143 }
144
145 JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
129 } 146 }
130 147
131 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) 148 for (const unicastUri of unicastUris) {
149 const payload = {
150 uri: unicastUri,
151 signatureActorId: byActor.id,
152 body: data,
153 contextType
154 }
155
156 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
157 }
132} 158}
133 159
134function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { 160function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {