]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/activitypub/send/shared/send-utils.ts
Optimize broadcast job creation
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / send / shared / send-utils.ts
CommitLineData
54141398 1import { Transaction } from 'sequelize'
9db437c8 2import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
7d9ba5c0 3import { getServerActor } from '@server/models/application/application'
a219c910 4import { Activity, ActivityAudience, ActivitypubHttpBroadcastPayload } from '@shared/models'
7d9ba5c0 5import { ContextType } from '@shared/models/activitypub/context'
57e4e1c1
C
6import { afterCommitIfTransaction } from '../../../../helpers/database-utils'
7import { logger } from '../../../../helpers/logger'
8import { ActorModel } from '../../../../models/actor/actor'
9import { ActorFollowModel } from '../../../../models/actor/actor-follow'
10import { MActor, MActorId, MActorLight, MActorWithInboxes, MVideoAccountLight, MVideoId, MVideoImmutable } from '../../../../types/models'
11import { JobQueue } from '../../../job-queue'
12import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getOriginVideoAudience } from './audience-utils'
9588d4f4 13
/**
 * Send an activity that concerns a video.
 *
 * If the video is not owned (`video.isOwned()` returns `false`), the call is delegated to
 * `sendVideoActivityToOrigin` with the same builder and options.
 * Otherwise the activity is built for the audience made of the followers of the actors
 * involved in the video, then broadcast to these followers with `byActor` listed as an exception.
 *
 * @param activityBuilder builds the activity from the computed audience
 * @param options sender actor, video, context type and optional transaction
 * @returns the result of `sendVideoActivityToOrigin` or of `broadcastToFollowers`
 */
async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  byActor: MActorLight
  video: MVideoImmutable | MVideoAccountLight
  contextType: ContextType
  transaction?: Transaction
}) {
  // Not an owned video: hand over to the "send to origin" path
  if (options.video.isOwned() === false) {
    return sendVideoActivityToOrigin(activityBuilder, options)
  }

  const involvedActors = await getActorsInvolvedInVideo(options.video, options.transaction)

  // Owned video: the recipients are the followers of every involved actor
  const followersAudience = getAudienceFromFollowersOf(involvedActors)

  return broadcastToFollowers({
    data: activityBuilder(followersAudience),
    byActor: options.byActor,
    toFollowersOf: involvedActors,
    transaction: options.transaction,
    actorsException: [ options.byActor ],
    contextType: options.contextType
  })
}
44
57e4e1c1
C
/**
 * Send an activity to the origin of a video that is not owned.
 *
 * The account actor is read from `video.VideoChannel.Account.Actor` when that chain is present,
 * and loaded with `ActorModel.loadAccountActorByVideoId` otherwise. The activity is built for
 * the audience returned by `getOriginVideoAudience` and unicast to the account actor's shared inbox
 * from the callback handed to `afterCommitIfTransaction`.
 *
 * @param activityBuilder builds the activity from the computed audience
 * @param options sender actor, video, context type, optional involved actors and transaction
 * @throws Error if `video.isOwned()` is truthy
 */
async function sendVideoActivityToOrigin (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  byActor: MActorLight
  video: MVideoImmutable | MVideoAccountLight
  contextType: ContextType

  actorsInvolvedInVideo?: MActorLight[]
  transaction?: Transaction
}) {
  const { byActor, video, actorsInvolvedInVideo, transaction, contextType } = options

  if (video.isOwned()) {
    throw new Error('Cannot send activity to owned video origin ' + video.url)
  }

  // Use the actor already attached to the video if any, else fetch it by video id
  const preloadedActor: MActorLight = (video as MVideoAccountLight).VideoChannel?.Account?.Actor
  const originActor: MActorLight = preloadedActor || await ActorModel.loadAccountActorByVideoId(video.id, transaction)

  const originAudience = getOriginVideoAudience(originActor, actorsInvolvedInVideo)
  const activity = activityBuilder(originAudience)

  const sendToOrigin = () => unicastTo({
    data: activity,
    byActor,
    toActorUrl: originActor.getSharedInbox(),
    contextType
  })

  return afterCommitIfTransaction(transaction, sendToOrigin)
}
72
73// ---------------------------------------------------------------------------
74
9588d4f4
C
/**
 * Forward an activity related to a video, adding the followers URLs of the actors
 * involved in the video to the forwarding targets.
 *
 * @param activity activity to forward
 * @param t transaction passed to the lookups and to `forwardActivity`
 * @param followersException actors passed to `forwardActivity` as exceptions
 * @param video video whose involved actors are looked up
 */
async function forwardVideoRelatedActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[],
  video: MVideoId
) {
  // Mastodon does not add our announces in audience, so we forward to them manually
  const involvedActors = await getActorsInvolvedInVideo(video, t)

  return forwardActivity(
    activity,
    t,
    followersException,
    involvedActors.map(actor => actor.followersUrl)
  )
}
63c93323
C
87
/**
 * Forward an activity to the followers targeted by its audience.
 *
 * The followers URLs are `additionalFollowerUrls` followed by every `to`/`cc` entry of the
 * activity that ends with `/followers`. The matching actors are loaded, their follower URIs are
 * computed (minus `followersException`), and an `activitypub-http-broadcast` job is created from
 * the callback handed to `afterCommitIfTransaction`.
 *
 * @param activity activity to forward; missing `to`/`cc` are treated as empty lists
 * @param t transaction passed to the database lookups and to `afterCommitIfTransaction`
 * @param followersException actors passed to `computeFollowerUris` as exceptions
 * @param additionalFollowerUrls extra followers URLs to target; this array is not modified
 * @returns `undefined` when there is no URI to forward to, else the result of `afterCommitIfTransaction`
 */
async function forwardActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[] = [],
  additionalFollowerUrls: string[] = []
) {
  logger.info('Forwarding activity %s.', activity.id)

  const to = activity.to || []
  const cc = activity.cc || []

  // Build a new array: pushing into `additionalFollowerUrls` would mutate the caller's argument
  const followersUrls = [
    ...additionalFollowerUrls,
    ...to.concat(cc).filter(dest => dest.endsWith('/followers'))
  ]

  const toActorFollowers = await ActorModel.listByFollowersUrls(followersUrls, t)
  const uris = await computeFollowerUris(toActorFollowers, followersException, t)

  if (uris.length === 0) {
    logger.info('0 followers for %s, no forwarding.', toActorFollowers.map(a => a.id).join(', '))
    return undefined
  }

  logger.debug('Creating forwarding job.', { uris })

  const payload: ActivitypubHttpBroadcastPayload = {
    uris,
    body: activity,
    // NOTE(review): no context type is set here, presumably because the body is forwarded as received; confirm
    contextType: null
  }

  return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
}
54141398 123
57e4e1c1
C
124// ---------------------------------------------------------------------------
125
a219c910
C
/**
 * Broadcast `data` to the followers of the given actors.
 *
 * The target URIs come from `computeFollowerUris`, which removes the shared inboxes of
 * `actorsException`. The broadcast itself is run from the callback handed to
 * `afterCommitIfTransaction`.
 *
 * @param options payload, signing actor, actors whose followers are targeted, transaction,
 * context type and optional actors to exclude (defaults to none)
 */
async function broadcastToFollowers (options: {
  data: any
  byActor: MActorId
  toFollowersOf: MActorId[]
  transaction: Transaction
  contextType: ContextType

  actorsException?: MActorWithInboxes[]
}) {
  const { data, byActor, contextType, transaction } = options
  const excludedActors = options.actorsException === undefined ? [] : options.actorsException

  const uris = await computeFollowerUris(options.toFollowersOf, excludedActors, transaction)

  const runBroadcast = () => broadcastTo({ uris, data, byActor, contextType })

  return afterCommitIfTransaction(transaction, runBroadcast)
}
148
a219c910
C
/**
 * Broadcast `data` directly to a list of actors.
 *
 * The target URIs come from `computeUris`, which removes the shared inboxes of
 * `actorsException`. The broadcast itself is run from the callback handed to
 * `afterCommitIfTransaction`.
 *
 * @param options payload, signing actor, target actors, transaction, context type
 * and optional actors to exclude (defaults to none)
 */
async function broadcastToActors (options: {
  data: any
  byActor: MActorId
  toActors: MActor[]
  transaction: Transaction
  contextType: ContextType
  actorsException?: MActorWithInboxes[]
}) {
  const { data, byActor, contextType, transaction } = options
  const excludedActors = options.actorsException === undefined ? [] : options.actorsException

  const uris = await computeUris(options.toActors, excludedActors)

  const runBroadcast = () => broadcastTo({ uris, data, byActor, contextType })

  return afterCommitIfTransaction(transaction, runBroadcast)
}
170
a219c910
C
/**
 * Create the jobs that deliver `data` to the given inbox URIs.
 *
 * URIs flagged by `ActorFollowHealthCache.Instance.isBadInbox` each get their own
 * `activitypub-http-unicast` job; all the other URIs share a single
 * `activitypub-http-broadcast` job. Nothing is created when `uris` is empty.
 *
 * @param options target URIs, payload, signing actor and context type
 */
function broadcastTo (options: {
  uris: string[]
  data: any
  byActor: MActorId
  contextType: ContextType
}) {
  const { uris, data, byActor, contextType } = options

  if (uris.length === 0) return undefined

  const healthyUris: string[] = []
  const badUris: string[] = []

  // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
  for (const uri of uris) {
    const bucket = ActorFollowHealthCache.Instance.isBadInbox(uri)
      ? badUris
      : healthyUris

    bucket.push(uri)
  }

  logger.debug('Creating broadcast job.', { broadcastUris: healthyUris, unicastUris: badUris })

  // One broadcast job for all the healthy inboxes
  if (healthyUris.length !== 0) {
    JobQueue.Instance.createJob({
      type: 'activitypub-http-broadcast',
      payload: {
        uris: healthyUris,
        signatureActorId: byActor.id,
        body: data,
        contextType
      }
    })
  }

  // One unicast job per bad inbox
  for (const uri of badUris) {
    JobQueue.Instance.createJob({
      type: 'activitypub-http-unicast',
      payload: {
        uri,
        signatureActorId: byActor.id,
        body: data,
        contextType
      }
    })
  }
}
217
a219c910
C
/**
 * Create an `activitypub-http-unicast` job that delivers `data` to a single URL.
 *
 * @param options payload, signing actor, destination URL and context type
 */
function unicastTo (options: {
  data: any
  byActor: MActorId
  toActorUrl: string
  contextType: ContextType
}) {
  const uri = options.toActorUrl

  logger.debug('Creating unicast job.', { uri })

  JobQueue.Instance.createJob({
    type: 'activitypub-http-unicast',
    payload: {
      uri,
      signatureActorId: options.byActor.id,
      body: options.data,
      contextType: options.contextType
    }
  })
}
237
e251f170 238// ---------------------------------------------------------------------------
54141398 239
e251f170
C
240export {
241 broadcastToFollowers,
242 unicastTo,
243 forwardActivity,
9588d4f4 244 broadcastToActors,
57e4e1c1 245 sendVideoActivityToOrigin,
a2377d15
C
246 forwardVideoRelatedActivity,
247 sendVideoRelatedActivity
54141398
C
248}
249
e251f170 250// ---------------------------------------------------------------------------
e12a0092 251
/**
 * List the shared inbox URLs of the accepted followers of the given actors,
 * without the shared inboxes returned by `buildSharedInboxesException`.
 *
 * @param toFollowersOf actors whose followers are targeted
 * @param actorsException actors whose shared inbox must be excluded
 * @param t transaction passed to the followers lookup
 */
async function computeFollowerUris (toFollowersOf: MActorId[], actorsException: MActorWithInboxes[], t: Transaction) {
  const actorIds = toFollowersOf.map(actor => actor.id)

  const followerInboxes = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(actorIds, t)
  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return followerInboxes.data.filter(inbox => !excludedInboxes.includes(inbox))
}
260
/**
 * Compute the deduplicated shared inbox URLs of the given actors, skipping the server actor
 * and the shared inboxes returned by `buildSharedInboxesException`.
 *
 * @param toActors actors to deliver to
 * @param actorsException actors whose shared inbox must be excluded (defaults to none)
 */
async function computeUris (toActors: MActor[], actorsException: MActorWithInboxes[] = []) {
  const serverActor = await getServerActor()

  // Don't send to ourselves
  const remoteActors = toActors.filter(actor => actor.id !== serverActor.id)

  // The Set removes duplicated shared inboxes
  const uniqueInboxes = new Set(remoteActors.map(actor => actor.getSharedInbox()))

  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return Array.from(uniqueInboxes).filter(inbox => !excludedInboxes.includes(inbox))
}
06a05d5f 273
/**
 * Build the list of shared inboxes to exclude: the shared inbox of each given actor,
 * followed by the `sharedInboxUrl` of the server actor.
 *
 * @param actorsException actors whose shared inbox must be excluded
 */
async function buildSharedInboxesException (actorsException: MActorWithInboxes[]) {
  const serverActor = await getServerActor()

  const excludedInboxes = actorsException.map(actor => actor.getSharedInbox())

  return [ ...excludedInboxes, serverActor.sharedInboxUrl ]
}