]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Send views in a dedicated queue
authorChocobozzz <me@florianbigard.com>
Fri, 17 Jun 2022 12:08:13 +0000 (14:08 +0200)
committerChocobozzz <me@florianbigard.com>
Fri, 17 Jun 2022 12:08:13 +0000 (14:08 +0200)
client/src/app/+admin/system/jobs/jobs.component.ts
server/initializers/constants.ts
server/lib/activitypub/send/send-view.ts
server/lib/activitypub/send/shared/send-utils.ts
server/lib/job-queue/job-queue.ts
shared/models/server/job.model.ts

index f7e10fd042a968dcff62753849590c992ede41ba..42f503be6c90ee1559ae552df2a9c6d9b362cf6b 100644 (file)
@@ -25,6 +25,7 @@ export class JobsComponent extends RestTable implements OnInit {
 
     'activitypub-follow',
     'activitypub-http-broadcast',
+    'activitypub-http-broadcast-parallel',
     'activitypub-http-fetcher',
     'activitypub-http-unicast',
     'activitypub-refresher',
index 6034da1c6a95da46a1d5487827b99cc418cd842d..cd2cc33d312703fd086eef6ae9218b6690b06d82 100644 (file)
@@ -139,6 +139,7 @@ const REMOTE_SCHEME = {
 
 const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'activitypub-http-broadcast': 1,
+  'activitypub-http-broadcast-parallel': 1,
   'activitypub-http-unicast': 1,
   'activitypub-http-fetcher': 2,
   'activitypub-follow': 5,
@@ -159,6 +160,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
 // Excluded keys are jobs that can be configured by admins
 const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
   'activitypub-http-broadcast': 1,
+  'activitypub-http-broadcast-parallel': 30,
   'activitypub-http-unicast': 10,
   'activitypub-http-fetcher': 3,
   'activitypub-cleaner': 1,
@@ -176,6 +178,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
 }
 const JOB_TTL: { [id in JobType]: number } = {
   'activitypub-http-broadcast': 60000 * 10, // 10 minutes
+  'activitypub-http-broadcast-parallel': 60000 * 10, // 10 minutes
   'activitypub-http-unicast': 60000 * 10, // 10 minutes
   'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
   'activitypub-follow': 60000 * 10, // 10 minutes
index 25a20ec6decad3a31d7cf7514bd3cc6c619ccf95..bf3451603d8c7a3f83a703de48828a27578b3e5e 100644 (file)
@@ -26,7 +26,7 @@ async function sendView (options: {
     return buildViewActivity({ url, byActor, video, audience, type })
   }
 
-  return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' })
+  return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
 }
 
 // ---------------------------------------------------------------------------
index 72368c7624e1fea7cb869bdb4d178fa57c59bf03..fcec63991a9a62f3eaf1ecb77ad8ec3bc405331f 100644 (file)
@@ -15,9 +15,10 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
   byActor: MActorLight
   video: MVideoImmutable | MVideoAccountLight
   contextType: ContextType
+  parallelizable?: boolean
   transaction?: Transaction
 }) {
-  const { byActor, video, transaction, contextType } = options
+  const { byActor, video, transaction, contextType, parallelizable } = options
 
   // Send to origin
   if (video.isOwned() === false) {
@@ -38,6 +39,7 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
     toFollowersOf: actorsInvolvedInVideo,
     transaction,
     actorsException,
+    parallelizable,
     contextType
   })
 }
@@ -130,9 +132,10 @@ async function broadcastToFollowers (options: {
   transaction: Transaction
   contextType: ContextType
 
+  parallelizable?: boolean
   actorsException?: MActorWithInboxes[]
 }) {
-  const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options
+  const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options
 
   const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
 
@@ -141,6 +144,7 @@ async function broadcastToFollowers (options: {
       uris,
       data,
       byActor,
+      parallelizable,
       contextType
     })
   })
@@ -173,8 +177,9 @@ function broadcastTo (options: {
   data: any
   byActor: MActorId
   contextType: ContextType
+  parallelizable?: boolean // default to false
 }) {
-  const { uris, data, byActor, contextType } = options
+  const { uris, data, byActor, contextType, parallelizable } = options
 
   if (uris.length === 0) return undefined
 
@@ -200,7 +205,13 @@ function broadcastTo (options: {
       contextType
     }
 
-    JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
+    JobQueue.Instance.createJob({
+      type: parallelizable
+        ? 'activitypub-http-broadcast-parallel'
+        : 'activitypub-http-broadcast',
+
+      payload
+    })
   }
 
   for (const unicastUri of unicastUris) {
index d3776c3bfd6b72d373fd5c97e88e7f87cc0853f9..f339e913500e6ff67fc72ee6a33faaa789cf2b1c 100644 (file)
@@ -43,6 +43,7 @@ import { processVideosViewsStats } from './handlers/video-views-stats'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
+  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
   { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
   { type: 'activitypub-http-cleaner', payload: {} } |
@@ -68,6 +69,7 @@ export type CreateJobOptions = {
 
 const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
+  'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
   'activitypub-http-fetcher': processActivityPubHttpFetcher,
   'activitypub-cleaner': processActivityPubCleaner,
@@ -89,6 +91,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
 const jobTypes: JobType[] = [
   'activitypub-follow',
   'activitypub-http-broadcast',
+  'activitypub-http-broadcast-parallel',
   'activitypub-http-fetcher',
   'activitypub-http-unicast',
   'activitypub-cleaner',
index 073f1587291e068333ac38dd5da69f731a7abc3e..4633ab7696fa2ec11ad486b948865856c62ae023 100644 (file)
@@ -9,6 +9,7 @@ export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
 export type JobType =
   | 'activitypub-http-unicast'
   | 'activitypub-http-broadcast'
+  | 'activitypub-http-broadcast-parallel'
   | 'activitypub-http-fetcher'
   | 'activitypub-cleaner'
   | 'activitypub-follow'