]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/activitypub/send/utils.ts
Process slow followers in unicast job queue
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / send / utils.ts
CommitLineData
54141398 1import { Transaction } from 'sequelize'
9db437c8 2import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
7d9ba5c0
C
3import { getServerActor } from '@server/models/application/application'
4import { ContextType } from '@shared/models/activitypub/context'
a2377d15 5import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
7d9ba5c0 6import { afterCommitIfTransaction } from '../../../helpers/database-utils'
da854ddd 7import { logger } from '../../../helpers/logger'
7d9ba5c0
C
8import { ActorModel } from '../../../models/actor/actor'
9import { ActorFollowModel } from '../../../models/actor/actor-follow'
10import { MActor, MActorId, MActorLight, MActorWithInboxes, MVideoAccountLight, MVideoId, MVideoImmutable } from '../../../types/models'
94a5ff8a 11import { JobQueue } from '../../job-queue'
a2377d15 12import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getRemoteVideoAudience } from '../audience'
9588d4f4 13
/**
 * Send an activity related to a video.
 *
 * - If the video is not owned by this instance (`video.isOwned() === false`), the activity
 *   is unicast to the shared inbox of the account actor that owns the video.
 * - Otherwise it is broadcast to the followers of the actors involved in the video,
 *   excluding `byActor`.
 *
 * @param activityBuilder builds the activity from the audience computed here
 * @param options.byActor actor on behalf of which the activity is sent
 * @param options.video video the activity relates to
 * @param options.transaction optional transaction, forwarded to the lookups and used to defer job creation
 * @param options.contextType optional context type forwarded to the created jobs
 */
async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  byActor: MActorLight
  video: MVideoImmutable | MVideoAccountLight
  transaction?: Transaction
  contextType?: ContextType
}) {
  const { byActor, video, transaction, contextType } = options

  const involvedActors = await getActorsInvolvedInVideo(video, transaction)

  const isRemoteVideo = video.isOwned() === false

  // Send to followers
  if (!isRemoteVideo) {
    const followersAudience = getAudienceFromFollowersOf(involvedActors)
    const followersActivity = activityBuilder(followersAudience)

    return broadcastToFollowers(followersActivity, byActor, involvedActors, transaction, [ byActor ], contextType)
  }

  // Send to origin: reuse the account actor already attached to the video if there is one, load it otherwise
  const originActor: MActorLight = (video as MVideoAccountLight).VideoChannel?.Account?.Actor ||
    await ActorModel.loadAccountActorByVideoId(video.id, transaction)

  const originAudience = getRemoteVideoAudience(originActor, involvedActors)
  const originActivity = activityBuilder(originAudience)

  return afterCommitIfTransaction(transaction, () => {
    return unicastTo(originActivity, byActor, originActor.getSharedInbox(), contextType)
  })
}
46
9588d4f4
C
/**
 * Forward an activity about a video, adding the followers collections of every
 * actor involved in that video to the forwarding targets.
 *
 * @param activity activity to forward
 * @param t transaction used for the lookups
 * @param followersException actors whose shared inboxes must not receive the forwarded activity
 * @param video video the activity relates to
 */
async function forwardVideoRelatedActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[],
  video: MVideoId
) {
  // Mastodon does not add our announces in audience, so we forward to them manually
  const involvedActors = await getActorsInvolvedInVideo(video, t)
  const extraFollowerUrls = involvedActors.map(({ followersUrl }) => followersUrl)

  return forwardActivity(activity, t, followersException, extraFollowerUrls)
}
63c93323
C
59
/**
 * Forward an activity to the followers of the actors whose followers collection
 * appears in the activity audience (`to`/`cc` entries ending with `/followers`),
 * plus any additional followers collection given by the caller.
 *
 * @param activity activity to forward
 * @param t transaction used for the lookups and to defer the job creation
 * @param followersException actors whose shared inboxes must not receive the forwarded activity
 * @param additionalFollowerUrls extra followers collection URLs to forward to (left untouched)
 * @returns `undefined` when there is nobody to forward to, otherwise the result of `afterCommitIfTransaction`
 */
async function forwardActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[] = [],
  additionalFollowerUrls: string[] = []
) {
  logger.info('Forwarding activity %s.', activity.id)

  const to = activity.to || []
  const cc = activity.cc || []

  // Build a fresh array: pushing into additionalFollowerUrls would mutate the caller's argument
  const followersUrls = [
    ...additionalFollowerUrls,
    ...to.concat(cc).filter(dest => dest.endsWith('/followers'))
  ]

  const toActorFollowers = await ActorModel.listByFollowersUrls(followersUrls, t)
  const uris = await computeFollowerUris(toActorFollowers, followersException, t)

  if (uris.length === 0) {
    logger.info('0 followers for %s, no forwarding.', toActorFollowers.map(a => a.id).join(', '))
    return undefined
  }

  logger.debug('Creating forwarding job.', { uris })

  const payload = {
    uris,
    body: activity
  }

  return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
}
54141398 94
40ff5707
C
/**
 * Broadcast `data` to the shared inboxes of the accepted followers of `toFollowersOf`.
 *
 * @param data activity body to send
 * @param byActor actor whose id is used as the signature actor of the jobs
 * @param toFollowersOf actors whose followers are targeted
 * @param t transaction used for the lookup and to defer the job creation
 * @param actorsException actors whose shared inboxes are excluded from the targets
 * @param contextType optional context type forwarded to the created jobs
 */
async function broadcastToFollowers (
  data: any,
  byActor: MActorId,
  toFollowersOf: MActorId[],
  t: Transaction,
  actorsException: MActorWithInboxes[] = [],
  contextType?: ContextType
) {
  const followerUris = await computeFollowerUris(toFollowersOf, actorsException, t)

  const createJobs = () => broadcastTo(followerUris, data, byActor, contextType)

  return afterCommitIfTransaction(t, createJobs)
}
107
/**
 * Broadcast `data` directly to the shared inboxes of the given actors.
 *
 * @param data activity body to send
 * @param byActor actor whose id is used as the signature actor of the jobs
 * @param toActors actors to send to
 * @param t optional transaction used to defer the job creation
 * @param actorsException actors whose shared inboxes are excluded from the targets
 * @param contextType optional context type forwarded to the created jobs
 */
async function broadcastToActors (
  data: any,
  byActor: MActorId,
  toActors: MActor[],
  t?: Transaction,
  actorsException: MActorWithInboxes[] = [],
  contextType?: ContextType
) {
  const actorUris = await computeUris(toActors, actorsException)

  const createJobs = () => broadcastTo(actorUris, data, byActor, contextType)

  return afterCommitIfTransaction(t, createJobs)
}
119
/**
 * Create the delivery jobs for a list of inbox URIs.
 *
 * URIs flagged as bad by `ActorFollowHealthCache` each get their own unicast job;
 * all the others are grouped in a single broadcast job. Does nothing if `uris` is empty.
 *
 * @param uris inbox URIs to deliver to
 * @param data activity body to send
 * @param byActor actor whose id is used as the signature actor of the jobs
 * @param contextType optional context type forwarded to the created jobs
 */
function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
  if (uris.length === 0) return undefined

  const broadcastUris: string[] = []
  const unicastUris: string[] = []

  // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
  uris.forEach(uri => {
    const bucket = ActorFollowHealthCache.Instance.isBadInbox(uri)
      ? unicastUris
      : broadcastUris

    bucket.push(uri)
  })

  logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })

  // Fields shared by every job payload created below
  const commonPayload = {
    signatureActorId: byActor.id,
    body: data,
    contextType
  }

  if (broadcastUris.length !== 0) {
    JobQueue.Instance.createJob({
      type: 'activitypub-http-broadcast',
      payload: { uris: broadcastUris, ...commonPayload }
    })
  }

  for (const uri of unicastUris) {
    JobQueue.Instance.createJob({
      type: 'activitypub-http-unicast',
      payload: { uri, ...commonPayload }
    })
  }
}
159
/**
 * Create a unicast job delivering `data` to a single inbox URL.
 *
 * @param data activity body to send
 * @param byActor actor whose id is used as the signature actor of the job
 * @param toActorUrl inbox URL to deliver to
 * @param contextType optional context type forwarded to the created job
 */
function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
  logger.debug('Creating unicast job.', { uri: toActorUrl })

  JobQueue.Instance.createJob({
    type: 'activitypub-http-unicast',
    payload: {
      uri: toActorUrl,
      signatureActorId: byActor.id,
      body: data,
      contextType
    }
  })
}
172
e251f170 173// ---------------------------------------------------------------------------
54141398 174
e251f170
C
175export {
176 broadcastToFollowers,
177 unicastTo,
178 forwardActivity,
9588d4f4 179 broadcastToActors,
a2377d15
C
180 forwardVideoRelatedActivity,
181 sendVideoRelatedActivity
54141398
C
182}
183
e251f170 184// ---------------------------------------------------------------------------
e12a0092 185
/**
 * List the shared inbox URLs of the accepted followers of the given actors,
 * without the shared inboxes of the excluded actors (and of the server actor).
 *
 * @param toFollowersOf actors whose followers are looked up
 * @param actorsException actors whose shared inboxes are removed from the result
 * @param t transaction used for the lookup
 */
async function computeFollowerUris (toFollowersOf: MActorId[], actorsException: MActorWithInboxes[], t: Transaction) {
  const followedActorIds = toFollowersOf.map(({ id }) => id)

  const { data: followerInboxes } = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(followedActorIds, t)
  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return followerInboxes.filter(inbox => !excludedInboxes.includes(inbox))
}
194
/**
 * Compute the deduplicated shared inbox URLs of the given actors, skipping the
 * server actor and the shared inboxes of the excluded actors.
 *
 * @param toActors actors to deliver to
 * @param actorsException actors whose shared inboxes are removed from the result
 */
async function computeUris (toActors: MActor[], actorsException: MActorWithInboxes[] = []) {
  const serverActor = await getServerActor()

  const uniqueInboxes = new Set<string>()
  for (const actor of toActors) {
    // Don't send to ourselves
    if (actor.id === serverActor.id) continue

    uniqueInboxes.add(actor.getSharedInbox())
  }

  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return Array.from(uniqueInboxes)
    .filter(inbox => !excludedInboxes.includes(inbox))
}
06a05d5f 207
/**
 * Build the list of shared inbox URLs that must not receive an activity:
 * those of the given actors, followed by the server actor's own shared inbox.
 *
 * @param actorsException actors whose shared inboxes are excluded
 */
async function buildSharedInboxesException (actorsException: MActorWithInboxes[]) {
  const { sharedInboxUrl: serverSharedInbox } = await getServerActor()

  const excludedInboxes = actorsException.map(actor => actor.getSharedInbox())

  return [ ...excludedInboxes, serverSharedInbox ]
}