1 import { Transaction } from 'sequelize'
2 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
3 import { getServerActor } from '@server/models/application/application'
4 import { Activity, ActivityAudience, ActivitypubHttpBroadcastPayload } from '@shared/models'
5 import { ContextType } from '@shared/models/activitypub/context'
6 import { afterCommitIfTransaction } from '../../../../helpers/database-utils'
7 import { logger } from '../../../../helpers/logger'
8 import { ActorModel } from '../../../../models/actor/actor'
9 import { ActorFollowModel } from '../../../../models/actor/actor-follow'
10 import { MActor, MActorId, MActorLight, MActorWithInboxes, MVideoAccountLight, MVideoId, MVideoImmutable } from '../../../../types/models'
11 import { JobQueue } from '../../../job-queue'
12 import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getOriginVideoAudience } from './audience-utils'
/**
 * Send an activity about a video, picking the delivery path from the video ownership.
 *
 * When `video.isOwned()` is `false`, the call is delegated with the same arguments to
 * `sendVideoActivityToOrigin()`. Otherwise the actors involved in the video are loaded,
 * the audience is derived from them with `getAudienceFromFollowersOf()`, the activity is
 * built for that audience and the result of `broadcastToFollowers()` is returned.
 *
 * @param activityBuilder - callback that produces the activity for a given audience
 * @param options - `byActor`, `video`, `transaction`, `contextType` and `parallelizable`
 *   are read from this object
 */
14 async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAudience) => Activity, options: {
16 video: MVideoImmutable | MVideoAccountLight
17 contextType: ContextType
18 parallelizable?: boolean
19 transaction?: Transaction
21 const { byActor, video, transaction, contextType, parallelizable } = options
// Not our video: the same builder and options are handed to the origin path
24 if (video.isOwned() === false) {
25 return sendVideoActivityToOrigin(activityBuilder, options)
// Loaded with the caller's transaction (may be undefined, the option is optional)
28 const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction)
31 const audience = getAudienceFromFollowersOf(actorsInvolvedInVideo)
32 const activity = activityBuilder(audience)
// The sending actor is the only entry of the exception list
34 const actorsException = [ byActor ]
36 return broadcastToFollowers({
39 toFollowersOf: actorsInvolvedInVideo,
/**
 * Send an activity about a video we do not own to the account actor of that video.
 *
 * The account actor is taken from `video.VideoChannel.Account.Actor` when that chain is
 * populated, and loaded with `ActorModel.loadAccountActorByVideoId()` otherwise. The audience
 * comes from `getOriginVideoAudience()`, and the delivery callback is handed to
 * `afterCommitIfTransaction()` together with the optional transaction.
 *
 * @param activityBuilder - callback that produces the activity for a given audience
 * @param options - `byActor`, `video`, `actorsInvolvedInVideo`, `transaction` and `contextType`
 *   are read from this object
 * @throws Error when `video.isOwned()` is truthy
 */
47 async function sendVideoActivityToOrigin (activityBuilder: (audience: ActivityAudience) => Activity, options: {
49 video: MVideoImmutable | MVideoAccountLight
50 contextType: ContextType
52 actorsInvolvedInVideo?: MActorLight[]
53 transaction?: Transaction
55 const { byActor, video, actorsInvolvedInVideo, transaction, contextType } = options
57 if (video.isOwned()) throw new Error('Cannot send activity to owned video origin ' + video.url)
// Use the actor already attached to the video object if there is one, else query it by video id
59 let accountActor: MActorLight = (video as MVideoAccountLight).VideoChannel?.Account?.Actor
60 if (!accountActor) accountActor = await ActorModel.loadAccountActorByVideoId(video.id, transaction)
62 const audience = getOriginVideoAudience(accountActor, actorsInvolvedInVideo)
63 const activity = activityBuilder(audience)
// NOTE(review): presumably runs the callback once the transaction is committed (or right away
// without transaction) — confirm in helpers/database-utils
65 return afterCommitIfTransaction(transaction, () => {
// The delivery target is the shared inbox of the account actor
69 toActorUrl: accountActor.getSharedInbox(),
75 // ---------------------------------------------------------------------------
/**
 * Forward an activity related to a video.
 *
 * The followers collections of every actor involved in the video are added as extra
 * recipients, then the work is delegated to `forwardActivity()`.
 *
 * @param followersException - actors forwarded as-is to `forwardActivity()` as its exception list
 */
77 async function forwardVideoRelatedActivity (
80 followersException: MActorWithInboxes[],
83 // Mastodon does not add our announces in audience, so we forward to them manually
84 const additionalActors = await getActorsInvolvedInVideo(video, t)
// Freshly created array, so forwardActivity() may append to it safely
85 const additionalFollowerUrls = additionalActors.map(a => a.followersUrl)
87 return forwardActivity(activity, t, followersException, additionalFollowerUrls)
/**
 * Forward an activity to the followers it is addressed to.
 *
 * Every `to`/`cc` recipient ending with `/followers` is collected (on top of
 * `additionalFollowerUrls`), the matching actors are loaded with
 * `ActorModel.listByFollowersUrls()`, and the target URIs are resolved by
 * `computeFollowerUris()`. An `activitypub-http-broadcast` job creation is then handed to
 * `afterCommitIfTransaction()`.
 *
 * @param followersException - actors passed to `computeFollowerUris()` as exceptions (defaults to none)
 * @param additionalFollowerUrls - extra followers collection URLs to target (defaults to none)
 */
90 async function forwardActivity (
93 followersException: MActorWithInboxes[] = [],
94 additionalFollowerUrls: string[] = []
96 logger.info('Forwarding activity %s.', activity.id)
// A missing `to` or `cc` is treated as an empty recipient list
98 const to = activity.to || []
99 const cc = activity.cc || []
// NOTE(review): this is an alias, not a copy — the pushes below also grow the array
// supplied by the caller through `additionalFollowerUrls`
101 const followersUrls = additionalFollowerUrls
102 for (const dest of to.concat(cc)) {
// Only recipients whose URL ends with '/followers' are kept
103 if (dest.endsWith('/followers')) {
104 followersUrls.push(dest)
108 const toActorFollowers = await ActorModel.listByFollowersUrls(followersUrls, t)
109 const uris = await computeFollowerUris(toActorFollowers, followersException, t)
// Nothing to deliver: only log the ids of the actors whose followers were looked up
111 if (uris.length === 0) {
112 logger.info('0 followers for %s, no forwarding.', toActorFollowers.map(a => a.id).join(', '))
116 logger.debug('Creating forwarding job.', { uris })
118 const payload: ActivitypubHttpBroadcastPayload = {
// NOTE(review): presumably delays the job creation until `t` is committed — confirm in helpers/database-utils
123 return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
126 // ---------------------------------------------------------------------------
/**
 * Broadcast data to the followers of a list of actors.
 *
 * The target URIs are resolved by `computeFollowerUris()` inside the given transaction,
 * then the delivery callback is handed to `afterCommitIfTransaction()`.
 *
 * @param options - `data`, `byActor`, `toFollowersOf`, `transaction`, `contextType`,
 *   `actorsException` (defaults to an empty array) and `parallelizable` are read from this object
 */
128 async function broadcastToFollowers (options: {
131 toFollowersOf: MActorId[]
132 transaction: Transaction
133 contextType: ContextType
135 parallelizable?: boolean
136 actorsException?: MActorWithInboxes[]
138 const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options
// Resolved before the callback is registered, using the caller's transaction
140 const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
// NOTE(review): presumably defers the delivery until the transaction is committed — confirm in helpers/database-utils
142 return afterCommitIfTransaction(transaction, () => {
/**
 * Broadcast data directly to a list of actors.
 *
 * The target URIs are resolved by `computeUris()`, then the delivery callback is handed to
 * `afterCommitIfTransaction()`.
 *
 * @param options - `data`, `byActor`, `toActors`, `transaction`, `contextType` and
 *   `actorsException` (defaults to an empty array) are read from this object
 */
153 async function broadcastToActors (options: {
157 transaction: Transaction
158 contextType: ContextType
159 actorsException?: MActorWithInboxes[]
161 const { data, byActor, toActors, transaction, contextType, actorsException = [] } = options
// Unlike computeFollowerUris(), this lookup does not receive the transaction
163 const uris = await computeUris(toActors, actorsException)
// NOTE(review): presumably defers the delivery until the transaction is committed — confirm in helpers/database-utils
165 return afterCommitIfTransaction(transaction, () => {
/**
 * Create the delivery jobs for a list of inbox URIs.
 *
 * Returns `undefined` without creating any job when `uris` is empty. Otherwise the URIs are
 * split in two lists with `ActorFollowHealthCache.Instance.isBadInbox()`: one broadcast job
 * is created when the broadcast list is not empty, and one `activitypub-http-unicast` job is
 * created per URI of the unicast list. Payloads are signed with `byActor.id`.
 *
 * @param options - `uris`, `data`, `byActor`, `contextType` and `parallelizable` are read
 *   from this object
 */
175 function broadcastTo (options: {
179 contextType: ContextType
180 parallelizable?: boolean // default to false
182 const { uris, data, byActor, contextType, parallelizable } = options
184 if (uris.length === 0) return undefined
186 const broadcastUris: string[] = []
187 const unicastUris: string[] = []
189 // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
190 for (const uri of uris) {
191 if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
192 unicastUris.push(uri)
// NOTE(review): presumably the branch taken by inboxes that are not flagged as bad
194 broadcastUris.push(uri)
198 logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
// Skip the broadcast job when every inbox ended up in the unicast list
200 if (broadcastUris.length !== 0) {
203 signatureActorId: byActor.id,
208 JobQueue.Instance.createJob({
// NOTE(review): the job type is picked by a ternary, presumably on `parallelizable` — confirm
210 ? 'activitypub-http-broadcast-parallel'
211 : 'activitypub-http-broadcast',
// One dedicated unicast job per URI of the unicast list
217 for (const unicastUri of unicastUris) {
220 signatureActorId: byActor.id,
225 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
/**
 * Create a single `activitypub-http-unicast` job.
 *
 * The target URL is logged at debug level and the payload is signed with `byActor.id`.
 *
 * @param options - `data`, `byActor`, `toActorUrl` and `contextType` are read from this object
 */
229 function unicastTo (options: {
233 contextType: ContextType
235 const { data, byActor, toActorUrl, contextType } = options
237 logger.debug('Creating unicast job.', { uri: toActorUrl })
241 signatureActorId: byActor.id,
246 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
249 // ---------------------------------------------------------------------------
252 broadcastToFollowers,
256 sendVideoActivityToOrigin,
257 forwardVideoRelatedActivity,
258 sendVideoRelatedActivity
261 // ---------------------------------------------------------------------------
/**
 * Resolve the shared inbox URLs of the accepted followers of the given actors.
 *
 * Inboxes returned by `buildSharedInboxesException()` for `actorsException` are removed
 * from the result.
 *
 * @param toFollowersOf - actors whose followers must be reached
 * @param actorsException - actors whose shared inbox must not be targeted
 * @param t - transaction forwarded to the followers query
 * @returns the follower shared inbox URLs, exceptions excluded
 */
async function computeFollowerUris (toFollowersOf: MActorId[], actorsException: MActorWithInboxes[], t: Transaction) {
  const followedActorIds = toFollowersOf.map(actor => actor.id)

  const { data: followerInboxes } = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(followedActorIds, t)

  // Set membership uses the same equality as Array.prototype.includes
  const excludedInboxes = new Set(await buildSharedInboxesException(actorsException))

  return followerInboxes.filter(inbox => !excludedInboxes.has(inbox))
}
/**
 * Resolve the distinct shared inbox URLs of the given actors.
 *
 * The server actor is skipped, duplicated inboxes are collapsed, and the inboxes returned by
 * `buildSharedInboxesException()` for `actorsException` are removed from the result.
 *
 * @param toActors - actors to reach
 * @param actorsException - actors whose shared inbox must not be targeted (defaults to none)
 * @returns the deduplicated shared inbox URLs, exceptions excluded
 */
async function computeUris (toActors: MActor[], actorsException: MActorWithInboxes[] = []) {
  const ownActor = await getServerActor()

  // Don't send to ourselves
  const remoteActors = toActors.filter(actor => actor.id !== ownActor.id)

  // Several actors can share one inbox: keep each URL once, in first-seen order
  const distinctInboxes = new Set(remoteActors.map(actor => actor.getSharedInbox()))

  const excludedInboxes = new Set(await buildSharedInboxesException(actorsException))

  return Array.from(distinctInboxes).filter(inbox => !excludedInboxes.has(inbox))
}
/**
 * Build the list of shared inbox URLs that must never be targeted.
 *
 * @param actorsException - actors whose shared inbox is excluded
 * @returns the shared inbox of every given actor, followed by the server actor's `sharedInboxUrl`
 */
async function buildSharedInboxesException (actorsException: MActorWithInboxes[]) {
  const ownActor = await getServerActor()

  const excludedInboxes = actorsException.map(actor => actor.getSharedInbox())

  // Our own shared inbox always closes the list
  return [ ...excludedInboxes, ownActor.sharedInboxUrl ]
}