1 import { Transaction } from 'sequelize'
2 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
3 import { getServerActor } from '@server/models/application/application'
4 import { ContextType } from '@shared/models/activitypub/context'
5 import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
6 import { afterCommitIfTransaction } from '../../../helpers/database-utils'
7 import { logger } from '../../../helpers/logger'
8 import { ActorModel } from '../../../models/actor/actor'
9 import { ActorFollowModel } from '../../../models/actor/actor-follow'
10 import { MActor, MActorId, MActorLight, MActorWithInboxes, MVideoAccountLight, MVideoId, MVideoImmutable } from '../../../types/models'
11 import { JobQueue } from '../../job-queue'
12 import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getRemoteVideoAudience } from '../audience'
/**
 * Build an activity about a video and hand it to the right delivery helper.
 *
 * - When `video.isOwned()` is `false`, the audience comes from `getRemoteVideoAudience` and the activity
 *   is unicast to the shared inbox of the account actor owning the video.
 * - Otherwise the audience comes from `getAudienceFromFollowersOf` and the activity is broadcast to the
 *   followers of the actors involved in the video, with `byActor` passed as the only exception.
 *
 * @param activityBuilder callback turning the computed audience into the activity to send
 * @param options.byActor actor the activity is sent by
 * @param options.video video the activity relates to
 * @param options.transaction optional transaction forwarded to the lookups and to the delivery helpers
 * @param options.contextType optional context type forwarded to the delivery helpers
 *
 * NOTE(review): the `byActor` option type is assumed to be `MActorLight` — confirm against callers.
 */
async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  byActor: MActorLight
  video: MVideoImmutable | MVideoAccountLight
  transaction?: Transaction
  contextType?: ContextType
}) {
  const { byActor, video, transaction, contextType } = options

  const involvedActors = await getActorsInvolvedInVideo(video, transaction)

  // Anything but an explicit `false` is handled as one of our own videos: notify the followers
  if (video.isOwned() !== false) {
    const followersAudience = getAudienceFromFollowersOf(involvedActors)
    const followersActivity = activityBuilder(followersAudience)

    return broadcastToFollowers(followersActivity, byActor, involvedActors, transaction, [ byActor ], contextType)
  }

  // Remote video: the activity goes to the account actor owning it.
  // Use the actor already attached to the video when there is one, load it otherwise.
  const attachedActor: MActorLight = (video as MVideoAccountLight).VideoChannel?.Account?.Actor
  const originActor: MActorLight = attachedActor || await ActorModel.loadAccountActorByVideoId(video.id, transaction)

  const originAudience = getRemoteVideoAudience(originActor, involvedActors)
  const originActivity = activityBuilder(originAudience)

  return afterCommitIfTransaction(
    transaction,
    () => unicastTo(originActivity, byActor, originActor.getSharedInbox(), contextType)
  )
}
/**
 * Forward an activity about a video, adding the followers collections of every actor involved in
 * that video to the recipients.
 *
 * @param activity activity to forward
 * @param t transaction used for the lookups
 * @param followersException actors that must not receive the forwarded activity
 * @param video video the activity relates to
 *
 * NOTE(review): the `activity`, `t` and `video` parameter types are assumed from how they are used
 * below — confirm against callers.
 */
async function forwardVideoRelatedActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[],
  video: MVideoId
) {
  // Mastodon does not put our announces in the audience, so these followers have to be targeted explicitly
  const involvedActors = await getActorsInvolvedInVideo(video, t)
  const involvedFollowersUrls = involvedActors.map(actor => actor.followersUrl)

  return forwardActivity(activity, t, followersException, involvedFollowersUrls)
}
/**
 * Forward an activity to the followers of the actors whose followers collection it targets.
 *
 * Followers collection URLs are the `to`/`cc` entries of the activity ending with `/followers`,
 * appended to `additionalFollowerUrls`.
 *
 * @param activity activity to forward; it is used as the body of the broadcast job
 * @param t transaction used for the lookups and handed to `afterCommitIfTransaction`
 * @param followersException actors whose shared inbox must not receive the activity
 * @param additionalFollowerUrls extra followers collection URLs to target; this array is left untouched
 * @returns `undefined` when there is no inbox to deliver to, the result of `afterCommitIfTransaction` otherwise
 */
async function forwardActivity (
  activity: Activity,
  t: Transaction,
  followersException: MActorWithInboxes[] = [],
  additionalFollowerUrls: string[] = []
) {
  logger.info('Forwarding activity %s.', activity.id)

  const to = activity.to || []
  const cc = activity.cc || []

  // Build a new array: pushing into `additionalFollowerUrls` itself would modify the caller's array
  const followersUrls = [
    ...additionalFollowerUrls,
    ...to.concat(cc).filter(dest => dest.endsWith('/followers'))
  ]

  const toActorFollowers = await ActorModel.listByFollowersUrls(followersUrls, t)
  const uris = await computeFollowerUris(toActorFollowers, followersException, t)

  if (uris.length === 0) {
    logger.info('0 followers for %s, no forwarding.', toActorFollowers.map(a => a.id).join(', '))
    return undefined
  }

  logger.debug('Creating forwarding job.', { uris })

  const payload = {
    uris,
    body: activity
  }

  return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
}
/**
 * Broadcast `data` to the shared inboxes of the accepted followers of the given actors.
 *
 * @param data body of the activity to send
 * @param byActor actor whose id is used as `signatureActorId` of the created jobs
 * @param toFollowersOf actors whose followers are targeted
 * @param t transaction used for the follower lookup and handed to `afterCommitIfTransaction`
 * @param actorsException actors whose shared inbox must be skipped
 * @param contextType optional context type stored in the job payloads
 */
async function broadcastToFollowers (
  data: any,
  byActor: MActorId,
  toFollowersOf: MActorId[],
  t: Transaction,
  actorsException: MActorWithInboxes[] = [],
  contextType?: ContextType
) {
  const followerInboxes = await computeFollowerUris(toFollowersOf, actorsException, t)

  return afterCommitIfTransaction(t, () => {
    return broadcastTo(followerInboxes, data, byActor, contextType)
  })
}
/**
 * Broadcast `data` to the shared inboxes of the given actors.
 *
 * @param data body of the activity to send
 * @param byActor actor whose id is used as `signatureActorId` of the created jobs
 * @param toActors actors to deliver to; the server actor itself is skipped by `computeUris`
 * @param t transaction handed to `afterCommitIfTransaction`
 * @param actorsException actors whose shared inbox must be skipped
 * @param contextType optional context type stored in the job payloads
 *
 * NOTE(review): `t` is declared optional here — confirm against the callers of this function.
 */
async function broadcastToActors (
  data: any,
  byActor: MActorId,
  toActors: MActor[],
  t?: Transaction,
  actorsException: MActorWithInboxes[] = [],
  contextType?: ContextType
) {
  const actorInboxes = await computeUris(toActors, actorsException)

  return afterCommitIfTransaction(t, () => {
    return broadcastTo(actorInboxes, data, byActor, contextType)
  })
}
/**
 * Create the delivery jobs for a list of inbox URIs.
 *
 * URIs flagged by `ActorFollowHealthCache.Instance.isBadInbox` each get their own
 * `activitypub-http-unicast` job; all the other ones share a single `activitypub-http-broadcast` job.
 * Nothing is created when `uris` is empty.
 *
 * @param uris inbox URIs to deliver to
 * @param data body of the activity to send
 * @param byActor actor whose id is used as `signatureActorId`
 * @param contextType optional context type stored in the job payloads
 */
function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
  if (uris.length === 0) return undefined

  const healthyInboxes: string[] = []
  const badInboxes: string[] = []

  // Bad inboxes could be slow to respond, so they are kept apart and processed in a dedicated queue
  for (const uri of uris) {
    const bucket = ActorFollowHealthCache.Instance.isBadInbox(uri)
      ? badInboxes
      : healthyInboxes

    bucket.push(uri)
  }

  logger.debug('Creating broadcast job.', { broadcastUris: healthyInboxes, unicastUris: badInboxes })

  if (healthyInboxes.length > 0) {
    const broadcastPayload = {
      uris: healthyInboxes,
      signatureActorId: byActor.id,
      body: data,
      contextType
    }

    JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload: broadcastPayload })
  }

  badInboxes.forEach(badInbox => {
    const unicastPayload = {
      uri: badInbox,
      signatureActorId: byActor.id,
      body: data,
      contextType
    }

    JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload: unicastPayload })
  })
}
/**
 * Create an `activitypub-http-unicast` job delivering `data` to a single inbox.
 *
 * @param data body of the activity to send
 * @param byActor actor whose id is used as `signatureActorId`
 * @param toActorUrl inbox URL the job targets
 * @param contextType optional context type stored in the job payload
 */
function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
  logger.debug('Creating unicast job.', { uri: toActorUrl })

  const unicastPayload = { uri: toActorUrl, signatureActorId: byActor.id, body: data, contextType }

  JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload: unicastPayload })
}
173 // ---------------------------------------------------------------------------
176 broadcastToFollowers,
180 forwardVideoRelatedActivity,
181 sendVideoRelatedActivity
184 // ---------------------------------------------------------------------------
/**
 * List the shared inbox URLs of the accepted followers of the given actors, without the excluded ones.
 *
 * @param toFollowersOf actors whose followers are looked up
 * @param actorsException actors whose shared inbox must be left out
 * @param t transaction used for the follower lookup
 * @returns the remaining shared inbox URLs, in the order returned by the lookup
 */
async function computeFollowerUris (toFollowersOf: MActorId[], actorsException: MActorWithInboxes[], t: Transaction) {
  const followedActorIds = toFollowersOf.map(actor => actor.id)

  const followers = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(followedActorIds, t)
  const excludedInboxes = new Set(await buildSharedInboxesException(actorsException))

  return followers.data.filter(inbox => !excludedInboxes.has(inbox))
}
/**
 * Compute the deduplicated shared inbox URLs of the given actors, without the excluded ones.
 *
 * @param toActors actors to deliver to; the actor returned by `getServerActor()` is skipped
 * @param actorsException actors whose shared inbox must be left out
 * @returns the remaining shared inbox URLs, in first-seen order
 */
async function computeUris (toActors: MActor[], actorsException: MActorWithInboxes[] = []) {
  const ourActor = await getServerActor()

  // A Set keeps a single entry per shared inbox, in first-seen order
  const uniqueInboxes = new Set<string>()
  for (const actor of toActors) {
    if (actor.id === ourActor.id) continue // Don't send to ourselves

    uniqueInboxes.add(actor.getSharedInbox())
  }

  const excludedInboxes = await buildSharedInboxesException(actorsException)

  return [ ...uniqueInboxes ].filter(inbox => !excludedInboxes.includes(inbox))
}
/**
 * Build the list of shared inbox URLs that must not receive an activity: the shared inbox of each
 * excepted actor, followed by the `sharedInboxUrl` of the actor returned by `getServerActor()`.
 *
 * @param actorsException actors to exclude from a delivery
 */
async function buildSharedInboxesException (actorsException: MActorWithInboxes[]) {
  const ourActor = await getServerActor()

  const excludedInboxes = actorsException.map(actor => actor.getSharedInbox())

  return [ ...excludedInboxes, ourActor.sharedInboxUrl ]
}