'activitypub-follow',
'activitypub-http-broadcast',
+ 'activitypub-http-broadcast-parallel',
'activitypub-http-fetcher',
'activitypub-http-unicast',
'activitypub-refresher',
const JOB_ATTEMPTS: { [id in JobType]: number } = {
'activitypub-http-broadcast': 1,
+ 'activitypub-http-broadcast-parallel': 1,
'activitypub-http-unicast': 1,
'activitypub-http-fetcher': 2,
'activitypub-follow': 5,
// Excluded keys are jobs that can be configured by admins
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
'activitypub-http-broadcast': 1,
+ 'activitypub-http-broadcast-parallel': 30,
'activitypub-http-unicast': 10,
'activitypub-http-fetcher': 3,
'activitypub-cleaner': 1,
}
const JOB_TTL: { [id in JobType]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
+ 'activitypub-http-broadcast-parallel': 60000 * 10, // 10 minutes
'activitypub-http-unicast': 60000 * 10, // 10 minutes
'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
'activitypub-follow': 60000 * 10, // 10 minutes
return buildViewActivity({ url, byActor, video, audience, type })
}
- return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' })
+ return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
}
// ---------------------------------------------------------------------------
byActor: MActorLight
video: MVideoImmutable | MVideoAccountLight
contextType: ContextType
+ parallelizable?: boolean
transaction?: Transaction
}) {
- const { byActor, video, transaction, contextType } = options
+ const { byActor, video, transaction, contextType, parallelizable } = options
// Send to origin
if (video.isOwned() === false) {
toFollowersOf: actorsInvolvedInVideo,
transaction,
actorsException,
+ parallelizable,
contextType
})
}
transaction: Transaction
contextType: ContextType
+ parallelizable?: boolean
actorsException?: MActorWithInboxes[]
}) {
- const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options
+ const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options
const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
uris,
data,
byActor,
+ parallelizable,
contextType
})
})
data: any
byActor: MActorId
contextType: ContextType
+ parallelizable?: boolean // default to false
}) {
- const { uris, data, byActor, contextType } = options
+ const { uris, data, byActor, contextType, parallelizable } = options
if (uris.length === 0) return undefined
contextType
}
- JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
+ JobQueue.Instance.createJob({
+ type: parallelizable
+ ? 'activitypub-http-broadcast-parallel'
+ : 'activitypub-http-broadcast',
+
+ payload
+ })
}
for (const unicastUri of unicastUris) {
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
+ { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
{ type: 'activitypub-http-cleaner', payload: {} } |
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
+ 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
const jobTypes: JobType[] = [
'activitypub-follow',
'activitypub-http-broadcast',
+ 'activitypub-http-broadcast-parallel',
'activitypub-http-fetcher',
'activitypub-http-unicast',
'activitypub-cleaner',
export type JobType =
| 'activitypub-http-unicast'
| 'activitypub-http-broadcast'
+ | 'activitypub-http-broadcast-parallel'
| 'activitypub-http-fetcher'
| 'activitypub-cleaner'
| 'activitypub-follow'