]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/activitypub/send/shared/send-utils.ts
Bumped to version v5.2.1
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / send / shared / send-utils.ts
1 import { Transaction } from 'sequelize'
2 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
3 import { getServerActor } from '@server/models/application/application'
4 import { Activity, ActivityAudience, ActivitypubHttpBroadcastPayload } from '@shared/models'
5 import { ContextType } from '@shared/models/activitypub/context'
6 import { afterCommitIfTransaction } from '../../../../helpers/database-utils'
7 import { logger } from '../../../../helpers/logger'
8 import { ActorModel } from '../../../../models/actor/actor'
9 import { ActorFollowModel } from '../../../../models/actor/actor-follow'
10 import { MActor, MActorId, MActorLight, MActorWithInboxes, MVideoAccountLight, MVideoId, MVideoImmutable } from '../../../../types/models'
11 import { JobQueue } from '../../../job-queue'
12 import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getOriginVideoAudience } from './audience-utils'
13
/**
 * Send an activity related to a video.
 *
 * - If the video is not owned by this instance, the activity is delegated to
 *   `sendVideoActivityToOrigin`.
 * - Otherwise the activity is built for the followers of the actors involved in
 *   the video and broadcast to them, with `byActor` listed as an exception.
 *
 * @param activityBuilder builds the activity from the computed audience
 * @param options sender, video, context type and optional transaction/parallelization flag
 */
async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  byActor: MActorLight
  video: MVideoImmutable | MVideoAccountLight
  contextType: ContextType
  parallelizable?: boolean
  transaction?: Transaction
}) {
  const { byActor: sender, video, transaction, contextType, parallelizable } = options

  // Remote video: the origin server is the only recipient
  if (video.isOwned() === false) {
    return sendVideoActivityToOrigin(activityBuilder, options)
  }

  // Owned video: address the followers of every actor involved in it
  const involvedActors = await getActorsInvolvedInVideo(video, transaction)
  const followersAudience = getAudienceFromFollowersOf(involvedActors)

  return broadcastToFollowers({
    data: activityBuilder(followersAudience),
    byActor: sender,
    toFollowersOf: involvedActors,
    transaction,
    // The sender is passed as an exception so its shared inbox is filtered out
    actorsException: [ sender ],
    parallelizable,
    contextType
  })
}
46
/**
 * Unicast an activity to the shared inbox of the account actor that owns a
 * remote video.
 *
 * The account actor is taken from the video's preloaded channel/account when
 * available, otherwise it is loaded by video id.
 *
 * @param activityBuilder builds the activity from the computed origin audience
 * @param options sender, video, context type, optional involved actors and transaction
 * @throws Error when the video is owned by this instance
 */
async function sendVideoActivityToOrigin (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  byActor: MActorLight
  video: MVideoImmutable | MVideoAccountLight
  contextType: ContextType

  actorsInvolvedInVideo?: MActorLight[]
  transaction?: Transaction
}) {
  const { byActor, video, actorsInvolvedInVideo, transaction, contextType } = options

  if (video.isOwned()) {
    throw new Error('Cannot send activity to owned video origin ' + video.url)
  }

  // Prefer the actor already attached to the video, fall back to a database lookup
  const preloadedActor: MActorLight = (video as MVideoAccountLight).VideoChannel?.Account?.Actor
  const originActor: MActorLight = preloadedActor || await ActorModel.loadAccountActorByVideoId(video.id, transaction)

  const originAudience = getOriginVideoAudience(originActor, actorsInvolvedInVideo)
  const activity = activityBuilder(originAudience)

  // Job creation goes through afterCommitIfTransaction (presumably deferred until the transaction commits)
  const sendToOrigin = () => unicastTo({
    data: activity,
    byActor,
    toActorUrl: originActor.getSharedInbox(),
    contextType
  })

  return afterCommitIfTransaction(transaction, sendToOrigin)
}
74
75 // ---------------------------------------------------------------------------
76
/**
 * Forward an activity related to a video, adding the followers URLs of every
 * actor involved in that video to the forwarding targets.
 *
 * @param activity activity to forward
 * @param t transaction used for the lookups
 * @param followersException actors passed to `forwardActivity` as forwarding exceptions
 * @param video video the activity relates to
 */
async function forwardVideoRelatedActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[],
  video: MVideoId
) {
  // Mastodon does not put our announces in the audience, so these followers are added manually
  const involvedActors = await getActorsInvolvedInVideo(video, t)

  const extraFollowersUrls: string[] = []
  for (const actor of involvedActors) {
    extraFollowersUrls.push(actor.followersUrl)
  }

  return forwardActivity(activity, t, followersException, extraFollowersUrls)
}
89
/**
 * Forward a received activity to the followers it is addressed to.
 *
 * Targets are every `to`/`cc` entry ending with `/followers`, plus
 * `additionalFollowerUrls`. The matching actors are loaded, their followers'
 * shared inboxes are computed (minus `followersException`), and a broadcast job
 * is created through `afterCommitIfTransaction`.
 *
 * @param activity activity to forward (sent as the job body)
 * @param t transaction used for the lookups and for scheduling the job
 * @param followersException actors whose shared inboxes must not receive the activity
 * @param additionalFollowerUrls extra followers URLs to forward to; this array is not modified
 * @returns `undefined` when there is nobody to forward to, otherwise the result of `afterCommitIfTransaction`
 */
async function forwardActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[] = [],
  additionalFollowerUrls: string[] = []
) {
  logger.info('Forwarding activity %s.', activity.id)

  const to = activity.to || []
  const cc = activity.cc || []

  // Build a fresh array: pushing into `additionalFollowerUrls` directly would mutate the caller's array
  const followersUrls = [
    ...additionalFollowerUrls,
    ...to.concat(cc).filter(dest => dest.endsWith('/followers'))
  ]

  const toActorFollowers = await ActorModel.listByFollowersUrls(followersUrls, t)
  const uris = await computeFollowerUris(toActorFollowers, followersException, t)

  if (uris.length === 0) {
    logger.info('0 followers for %s, no forwarding.', toActorFollowers.map(a => a.id).join(', '))
    return undefined
  }

  logger.debug('Creating forwarding job.', { uris })

  // No signatureActorId is set and contextType is null: the activity is passed on as the job body unchanged
  const payload: ActivitypubHttpBroadcastPayload = {
    uris,
    body: activity,
    contextType: null
  }

  return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload }))
}
125
126 // ---------------------------------------------------------------------------
127
/**
 * Broadcast data to the shared inboxes of the accepted followers of the given
 * actors, skipping the inboxes of `actorsException`.
 *
 * Inboxes are resolved immediately; the broadcast itself is handed to
 * `afterCommitIfTransaction`.
 */
async function broadcastToFollowers (options: {
  data: any
  byActor: MActorId
  toFollowersOf: MActorId[]
  transaction: Transaction
  contextType: ContextType

  parallelizable?: boolean
  actorsException?: MActorWithInboxes[]
}) {
  const {
    data,
    byActor,
    toFollowersOf: followedActors,
    transaction,
    contextType,
    actorsException: excludedActors = [],
    parallelizable
  } = options

  const uris = await computeFollowerUris(followedActors, excludedActors, transaction)

  const broadcast = () => broadcastTo({ uris, data, byActor, parallelizable, contextType })

  return afterCommitIfTransaction(transaction, broadcast)
}
152
/**
 * Broadcast data directly to the shared inboxes of the given actors, skipping
 * the inboxes of `actorsException`.
 *
 * Inboxes are resolved immediately; the broadcast itself is handed to
 * `afterCommitIfTransaction`.
 */
async function broadcastToActors (options: {
  data: any
  byActor: MActorId
  toActors: MActor[]
  transaction: Transaction
  contextType: ContextType
  actorsException?: MActorWithInboxes[]
}) {
  const {
    data,
    byActor,
    toActors: recipients,
    transaction,
    contextType,
    actorsException: excludedActors = []
  } = options

  const uris = await computeUris(recipients, excludedActors)

  const broadcast = () => broadcastTo({ uris, data, byActor, contextType })

  return afterCommitIfTransaction(transaction, broadcast)
}
174
/**
 * Create the delivery jobs for a list of inbox URIs.
 *
 * URIs not flagged as bad by `ActorFollowHealthCache` share a single broadcast
 * job (the parallel variant when `parallelizable` is truthy). Each URI flagged
 * as bad gets its own unicast job.
 *
 * Does nothing when `uris` is empty.
 */
function broadcastTo (options: {
  uris: string[]
  data: any
  byActor: MActorId
  contextType: ContextType
  parallelizable?: boolean // default to false
}) {
  const { uris, data, byActor, contextType, parallelizable } = options

  if (uris.length === 0) return undefined

  const broadcastUris: string[] = []
  const unicastUris: string[] = []

  // Bad inboxes could be slow to respond: keep them out of the shared broadcast job
  uris.forEach(uri => {
    const bucket = ActorFollowHealthCache.Instance.isBadInbox(uri)
      ? unicastUris
      : broadcastUris

    bucket.push(uri)
  })

  logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })

  if (broadcastUris.length !== 0) {
    const jobType = parallelizable
      ? 'activitypub-http-broadcast-parallel'
      : 'activitypub-http-broadcast'

    const payload = {
      uris: broadcastUris,
      signatureActorId: byActor.id,
      body: data,
      contextType
    }

    JobQueue.Instance.createJobAsync({ type: jobType, payload })
  }

  // One dedicated job per bad inbox
  unicastUris.forEach(uri => {
    const payload = {
      uri,
      signatureActorId: byActor.id,
      body: data,
      contextType
    }

    JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
  })
}
228
/**
 * Create a unicast job that delivers `data` to a single inbox URL, with
 * `byActor.id` set as the job's `signatureActorId`.
 */
function unicastTo (options: {
  data: any
  byActor: MActorId
  toActorUrl: string
  contextType: ContextType
}) {
  const { data: body, byActor: sender, toActorUrl: uri, contextType } = options

  logger.debug('Creating unicast job.', { uri })

  const payload = {
    uri,
    signatureActorId: sender.id,
    body,
    contextType
  }

  JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
}
248
249 // ---------------------------------------------------------------------------
250
251 export {
252 broadcastToFollowers,
253 unicastTo,
254 forwardActivity,
255 broadcastToActors,
256 sendVideoActivityToOrigin,
257 forwardVideoRelatedActivity,
258 sendVideoRelatedActivity
259 }
260
261 // ---------------------------------------------------------------------------
262
/**
 * List the shared inbox URLs of the accepted followers of the given actors,
 * without the inboxes returned by `buildSharedInboxesException`.
 */
async function computeFollowerUris (toFollowersOf: MActorId[], actorsException: MActorWithInboxes[], t: Transaction) {
  const followedActorIds = toFollowersOf.map(actor => actor.id)

  const { data: followerInboxes } = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(followedActorIds, t)
  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return followerInboxes.filter(inbox => !excludedInboxes.includes(inbox))
}
271
/**
 * Compute the deduplicated shared inbox URLs of the given actors, leaving out
 * the server actor and the inboxes returned by `buildSharedInboxesException`.
 */
async function computeUris (toActors: MActor[], actorsException: MActorWithInboxes[] = []) {
  const serverActor = await getServerActor()

  // A Set collapses actors that share the same inbox
  const uniqueInboxes = new Set<string>()
  for (const actor of toActors) {
    // Don't send to ourselves
    if (actor.id === serverActor.id) continue

    uniqueInboxes.add(actor.getSharedInbox())
  }

  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return Array.from(uniqueInboxes).filter(inbox => !excludedInboxes.includes(inbox))
}
284
/**
 * Build the list of shared inbox URLs that must not be targeted: the shared
 * inbox of every excepted actor, followed by the server actor's own shared inbox.
 */
async function buildSharedInboxesException (actorsException: MActorWithInboxes[]) {
  const serverActor = await getServerActor()

  const excludedInboxes = actorsException.map(actor => actor.getSharedInbox())
  excludedInboxes.push(serverActor.sharedInboxUrl)

  return excludedInboxes
}
291 }