aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-06-17 14:08:13 +0200
committerChocobozzz <me@florianbigard.com>2022-06-17 14:08:13 +0200
commitf27b7a750f1876632e84d594608d3d64ce974efc (patch)
treed7f973387b4ba1776cacbf06dcffef8a9428e9fe
parent3396e6534592865f184ee2db32a75957c42cb887 (diff)
downloadPeerTube-f27b7a750f1876632e84d594608d3d64ce974efc.tar.gz
PeerTube-f27b7a750f1876632e84d594608d3d64ce974efc.tar.zst
PeerTube-f27b7a750f1876632e84d594608d3d64ce974efc.zip
Send views in a dedicated queue
-rw-r--r--client/src/app/+admin/system/jobs/jobs.component.ts1
-rw-r--r--server/initializers/constants.ts3
-rw-r--r--server/lib/activitypub/send/send-view.ts2
-rw-r--r--server/lib/activitypub/send/shared/send-utils.ts19
-rw-r--r--server/lib/job-queue/job-queue.ts3
-rw-r--r--shared/models/server/job.model.ts1
6 files changed, 24 insertions, 5 deletions
diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts
index f7e10fd04..42f503be6 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.ts
+++ b/client/src/app/+admin/system/jobs/jobs.component.ts
@@ -25,6 +25,7 @@ export class JobsComponent extends RestTable implements OnInit {
25 25
26 'activitypub-follow', 26 'activitypub-follow',
27 'activitypub-http-broadcast', 27 'activitypub-http-broadcast',
28 'activitypub-http-broadcast-parallel',
28 'activitypub-http-fetcher', 29 'activitypub-http-fetcher',
29 'activitypub-http-unicast', 30 'activitypub-http-unicast',
30 'activitypub-refresher', 31 'activitypub-refresher',
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 6034da1c6..cd2cc33d3 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -139,6 +139,7 @@ const REMOTE_SCHEME = {
139 139
140const JOB_ATTEMPTS: { [id in JobType]: number } = { 140const JOB_ATTEMPTS: { [id in JobType]: number } = {
141 'activitypub-http-broadcast': 1, 141 'activitypub-http-broadcast': 1,
142 'activitypub-http-broadcast-parallel': 1,
142 'activitypub-http-unicast': 1, 143 'activitypub-http-unicast': 1,
143 'activitypub-http-fetcher': 2, 144 'activitypub-http-fetcher': 2,
144 'activitypub-follow': 5, 145 'activitypub-follow': 5,
@@ -159,6 +160,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
159// Excluded keys are jobs that can be configured by admins 160// Excluded keys are jobs that can be configured by admins
160const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { 161const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
161 'activitypub-http-broadcast': 1, 162 'activitypub-http-broadcast': 1,
163 'activitypub-http-broadcast-parallel': 30,
162 'activitypub-http-unicast': 10, 164 'activitypub-http-unicast': 10,
163 'activitypub-http-fetcher': 3, 165 'activitypub-http-fetcher': 3,
164 'activitypub-cleaner': 1, 166 'activitypub-cleaner': 1,
@@ -176,6 +178,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
176} 178}
177const JOB_TTL: { [id in JobType]: number } = { 179const JOB_TTL: { [id in JobType]: number } = {
178 'activitypub-http-broadcast': 60000 * 10, // 10 minutes 180 'activitypub-http-broadcast': 60000 * 10, // 10 minutes
181 'activitypub-http-broadcast-parallel': 60000 * 10, // 10 minutes
179 'activitypub-http-unicast': 60000 * 10, // 10 minutes 182 'activitypub-http-unicast': 60000 * 10, // 10 minutes
180 'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours 183 'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
181 'activitypub-follow': 60000 * 10, // 10 minutes 184 'activitypub-follow': 60000 * 10, // 10 minutes
diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts
index 25a20ec6d..bf3451603 100644
--- a/server/lib/activitypub/send/send-view.ts
+++ b/server/lib/activitypub/send/send-view.ts
@@ -26,7 +26,7 @@ async function sendView (options: {
26 return buildViewActivity({ url, byActor, video, audience, type }) 26 return buildViewActivity({ url, byActor, video, audience, type })
27 } 27 }
28 28
29 return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' }) 29 return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
30} 30}
31 31
32// --------------------------------------------------------------------------- 32// ---------------------------------------------------------------------------
diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts
index 72368c762..fcec63991 100644
--- a/server/lib/activitypub/send/shared/send-utils.ts
+++ b/server/lib/activitypub/send/shared/send-utils.ts
@@ -15,9 +15,10 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
15 byActor: MActorLight 15 byActor: MActorLight
16 video: MVideoImmutable | MVideoAccountLight 16 video: MVideoImmutable | MVideoAccountLight
17 contextType: ContextType 17 contextType: ContextType
18 parallelizable?: boolean
18 transaction?: Transaction 19 transaction?: Transaction
19}) { 20}) {
20 const { byActor, video, transaction, contextType } = options 21 const { byActor, video, transaction, contextType, parallelizable } = options
21 22
22 // Send to origin 23 // Send to origin
23 if (video.isOwned() === false) { 24 if (video.isOwned() === false) {
@@ -38,6 +39,7 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
38 toFollowersOf: actorsInvolvedInVideo, 39 toFollowersOf: actorsInvolvedInVideo,
39 transaction, 40 transaction,
40 actorsException, 41 actorsException,
42 parallelizable,
41 contextType 43 contextType
42 }) 44 })
43} 45}
@@ -130,9 +132,10 @@ async function broadcastToFollowers (options: {
130 transaction: Transaction 132 transaction: Transaction
131 contextType: ContextType 133 contextType: ContextType
132 134
135 parallelizable?: boolean
133 actorsException?: MActorWithInboxes[] 136 actorsException?: MActorWithInboxes[]
134}) { 137}) {
135 const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options 138 const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options
136 139
137 const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction) 140 const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
138 141
@@ -141,6 +144,7 @@ async function broadcastToFollowers (options: {
141 uris, 144 uris,
142 data, 145 data,
143 byActor, 146 byActor,
147 parallelizable,
144 contextType 148 contextType
145 }) 149 })
146 }) 150 })
@@ -173,8 +177,9 @@ function broadcastTo (options: {
173 data: any 177 data: any
174 byActor: MActorId 178 byActor: MActorId
175 contextType: ContextType 179 contextType: ContextType
180 parallelizable?: boolean // default to false
176}) { 181}) {
177 const { uris, data, byActor, contextType } = options 182 const { uris, data, byActor, contextType, parallelizable } = options
178 183
179 if (uris.length === 0) return undefined 184 if (uris.length === 0) return undefined
180 185
@@ -200,7 +205,13 @@ function broadcastTo (options: {
200 contextType 205 contextType
201 } 206 }
202 207
203 JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) 208 JobQueue.Instance.createJob({
209 type: parallelizable
210 ? 'activitypub-http-broadcast-parallel'
211 : 'activitypub-http-broadcast',
212
213 payload
214 })
204 } 215 }
205 216
206 for (const unicastUri of unicastUris) { 217 for (const unicastUri of unicastUris) {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index d3776c3bf..f339e9135 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -43,6 +43,7 @@ import { processVideosViewsStats } from './handlers/video-views-stats'
43 43
44type CreateJobArgument = 44type CreateJobArgument =
45 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 45 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
46 { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
46 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | 47 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
47 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | 48 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
48 { type: 'activitypub-http-cleaner', payload: {} } | 49 { type: 'activitypub-http-cleaner', payload: {} } |
@@ -68,6 +69,7 @@ export type CreateJobOptions = {
68 69
69const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { 70const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
70 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 71 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
72 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
71 'activitypub-http-unicast': processActivityPubHttpUnicast, 73 'activitypub-http-unicast': processActivityPubHttpUnicast,
72 'activitypub-http-fetcher': processActivityPubHttpFetcher, 74 'activitypub-http-fetcher': processActivityPubHttpFetcher,
73 'activitypub-cleaner': processActivityPubCleaner, 75 'activitypub-cleaner': processActivityPubCleaner,
@@ -89,6 +91,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
89const jobTypes: JobType[] = [ 91const jobTypes: JobType[] = [
90 'activitypub-follow', 92 'activitypub-follow',
91 'activitypub-http-broadcast', 93 'activitypub-http-broadcast',
94 'activitypub-http-broadcast-parallel',
92 'activitypub-http-fetcher', 95 'activitypub-http-fetcher',
93 'activitypub-http-unicast', 96 'activitypub-http-unicast',
94 'activitypub-cleaner', 97 'activitypub-cleaner',
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts
index 073f15872..4633ab769 100644
--- a/shared/models/server/job.model.ts
+++ b/shared/models/server/job.model.ts
@@ -9,6 +9,7 @@ export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
9export type JobType = 9export type JobType =
10 | 'activitypub-http-unicast' 10 | 'activitypub-http-unicast'
11 | 'activitypub-http-broadcast' 11 | 'activitypub-http-broadcast'
12 | 'activitypub-http-broadcast-parallel'
12 | 'activitypub-http-fetcher' 13 | 'activitypub-http-fetcher'
13 | 'activitypub-cleaner' 14 | 'activitypub-cleaner'
14 | 'activitypub-follow' 15 | 'activitypub-follow'