aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-10-13 11:47:32 +0200
committerChocobozzz <me@florianbigard.com>2021-10-13 11:47:32 +0200
commit9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch)
tree716078fbe1506e0b0d19936f4939e9c530b3b8ab /server/lib
parente81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff)
downloadPeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz
PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst
PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip
Process slow followers in unicast job queue
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/send/utils.ts40
-rw-r--r--server/lib/activitypub/videos/refresh.ts6
-rw-r--r--server/lib/actor-follow-health-cache.ts (renamed from server/lib/files-cache/actor-follow-score-cache.ts)23
-rw-r--r--server/lib/files-cache/index.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts16
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts6
-rw-r--r--server/lib/schedulers/actor-follow-scheduler.ts14
7 files changed, 74 insertions, 33 deletions
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts
index 7cd8030e1..7729703b8 100644
--- a/server/lib/activitypub/send/utils.ts
+++ b/server/lib/activitypub/send/utils.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
2import { getServerActor } from '@server/models/application/application' 3import { getServerActor } from '@server/models/application/application'
3import { ContextType } from '@shared/models/activitypub/context' 4import { ContextType } from '@shared/models/activitypub/context'
4import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' 5import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
@@ -119,16 +120,41 @@ async function broadcastToActors (
119function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { 120function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
120 if (uris.length === 0) return undefined 121 if (uris.length === 0) return undefined
121 122
122 logger.debug('Creating broadcast job.', { uris }) 123 const broadcastUris: string[] = []
124 const unicastUris: string[] = []
123 125
124 const payload = { 126 // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
125 uris, 127 for (const uri of uris) {
126 signatureActorId: byActor.id, 128 if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
127 body: data, 129 unicastUris.push(uri)
128 contextType 130 } else {
131 broadcastUris.push(uri)
132 }
133 }
134
135 logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
136
137 if (broadcastUris.length !== 0) {
138 const payload = {
139 uris: broadcastUris,
140 signatureActorId: byActor.id,
141 body: data,
142 contextType
143 }
144
145 JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
129 } 146 }
130 147
131 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) 148 for (const unicastUri of unicastUris) {
149 const payload = {
150 uri: unicastUri,
151 signatureActorId: byActor.id,
152 body: data,
153 contextType
154 }
155
156 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
157 }
132} 158}
133 159
134function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { 160function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts
index 3af08acf4..9f952a218 100644
--- a/server/lib/activitypub/videos/refresh.ts
+++ b/server/lib/activitypub/videos/refresh.ts
@@ -1,10 +1,10 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger' 1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { PeerTubeRequestError } from '@server/helpers/requests' 2import { PeerTubeRequestError } from '@server/helpers/requests'
3import { ActorFollowScoreCache } from '@server/lib/files-cache'
4import { VideoLoadByUrlType } from '@server/lib/model-loaders' 3import { VideoLoadByUrlType } from '@server/lib/model-loaders'
5import { VideoModel } from '@server/models/video/video' 4import { VideoModel } from '@server/models/video/video'
6import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' 5import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
7import { HttpStatusCode } from '@shared/models' 6import { HttpStatusCode } from '@shared/models'
7import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
8import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' 8import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
9import { APVideoUpdater } from './updater' 9import { APVideoUpdater } from './updater'
10 10
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: {
39 39
40 await syncVideoExternalAttributes(video, videoObject, options.syncParam) 40 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
41 41
42 ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) 42 ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
43 43
44 return video 44 return video
45 } catch (err) { 45 } catch (err) {
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: {
53 53
54 logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) 54 logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
55 55
56 ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) 56 ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
57 57
58 // Don't refresh in loop 58 // Don't refresh in loop
59 await video.setAsRefreshed() 59 await video.setAsRefreshed()
diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/actor-follow-health-cache.ts
index 465080e72..ab8cc98df 100644
--- a/server/lib/files-cache/actor-follow-score-cache.ts
+++ b/server/lib/actor-follow-health-cache.ts
@@ -1,22 +1,28 @@
1import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants' 1import { ACTOR_FOLLOW_SCORE } from '../initializers/constants'
2import { logger } from '../../helpers/logger' 2import { logger } from '../helpers/logger'
3 3
4// Cache follows scores, instead of writing them too often in database 4// Cache follows scores, instead of writing them too often in database
5// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores 5// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
6class ActorFollowScoreCache { 6class ActorFollowHealthCache {
7
8 private static instance: ActorFollowHealthCache
7 9
8 private static instance: ActorFollowScoreCache
9 private pendingFollowsScore: { [ url: string ]: number } = {} 10 private pendingFollowsScore: { [ url: string ]: number } = {}
11
10 private pendingBadServer = new Set<number>() 12 private pendingBadServer = new Set<number>()
11 private pendingGoodServer = new Set<number>() 13 private pendingGoodServer = new Set<number>()
12 14
15 private badInboxes = new Set<string>()
16
13 private constructor () {} 17 private constructor () {}
14 18
15 static get Instance () { 19 static get Instance () {
16 return this.instance || (this.instance = new this()) 20 return this.instance || (this.instance = new this())
17 } 21 }
18 22
19 updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { 23 updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) {
24 this.badInboxes.clear()
25
20 if (goodInboxes.length === 0 && badInboxes.length === 0) return 26 if (goodInboxes.length === 0 && badInboxes.length === 0) return
21 27
22 logger.info( 28 logger.info(
@@ -34,9 +40,14 @@ class ActorFollowScoreCache {
34 if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 40 if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
35 41
36 this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY 42 this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
43 this.badInboxes.add(badInbox)
37 } 44 }
38 } 45 }
39 46
47 isBadInbox (inboxUrl: string) {
48 return this.badInboxes.has(inboxUrl)
49 }
50
40 addBadServerId (serverId: number) { 51 addBadServerId (serverId: number) {
41 this.pendingBadServer.add(serverId) 52 this.pendingBadServer.add(serverId)
42 } 53 }
@@ -71,5 +82,5 @@ class ActorFollowScoreCache {
71} 82}
72 83
73export { 84export {
74 ActorFollowScoreCache 85 ActorFollowHealthCache
75} 86}
diff --git a/server/lib/files-cache/index.ts b/server/lib/files-cache/index.ts
index e921d04a7..e5853f7d6 100644
--- a/server/lib/files-cache/index.ts
+++ b/server/lib/files-cache/index.ts
@@ -1,3 +1,3 @@
1export * from './actor-follow-score-cache'
2export * from './videos-preview-cache' 1export * from './videos-preview-cache'
3export * from './videos-caption-cache' 2export * from './videos-caption-cache'
3export * from './videos-torrent-cache'
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 9b0bb6574..fbf01d276 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,10 +1,10 @@
1import { map } from 'bluebird' 1import { map } from 'bluebird'
2import { Job } from 'bull' 2import { Job } from 'bull'
3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
3import { ActivitypubHttpBroadcastPayload } from '@shared/models' 4import { ActivitypubHttpBroadcastPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
5import { doRequest } from '../../../helpers/requests' 6import { doRequest } from '../../../helpers/requests'
6import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' 7import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
7import { ActorFollowScoreCache } from '../../files-cache'
8import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 8import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
9 9
10async function processActivityPubHttpBroadcast (job: Job) { 10async function processActivityPubHttpBroadcast (job: Job) {
@@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) {
25 const badUrls: string[] = [] 25 const badUrls: string[] = []
26 const goodUrls: string[] = [] 26 const goodUrls: string[] = []
27 27
28 await map(payload.uris, uri => { 28 await map(payload.uris, async uri => {
29 return doRequest(uri, options) 29 try {
30 .then(() => goodUrls.push(uri)) 30 await doRequest(uri, options)
31 .catch(() => badUrls.push(uri)) 31 goodUrls.push(uri)
32 } catch (err) {
33 logger.debug('HTTP broadcast to %s failed.', uri, { err })
34 badUrls.push(uri)
35 }
32 }, { concurrency: BROADCAST_CONCURRENCY }) 36 }, { concurrency: BROADCAST_CONCURRENCY })
33 37
34 return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) 38 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
35} 39}
36 40
37// --------------------------------------------------------------------------- 41// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 9be50837f..673583d2b 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -2,7 +2,7 @@ import { Job } from 'bull'
2import { ActivitypubHttpUnicastPayload } from '@shared/models' 2import { ActivitypubHttpUnicastPayload } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
5import { ActorFollowScoreCache } from '../../files-cache' 5import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7 7
8async function processActivityPubHttpUnicast (job: Job) { 8async function processActivityPubHttpUnicast (job: Job) {
@@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) {
23 23
24 try { 24 try {
25 await doRequest(uri, options) 25 await doRequest(uri, options)
26 ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) 26 ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
27 } catch (err) { 27 } catch (err) {
28 ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) 28 ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])
29 29
30 throw err 30 throw err
31 } 31 }
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
index 1b80316e9..a5a377573 100644
--- a/server/lib/schedulers/actor-follow-scheduler.ts
+++ b/server/lib/schedulers/actor-follow-scheduler.ts
@@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils'
2import { logger } from '../../helpers/logger' 2import { logger } from '../../helpers/logger'
3import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' 3import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { ActorFollowModel } from '../../models/actor/actor-follow' 4import { ActorFollowModel } from '../../models/actor/actor-follow'
5import { ActorFollowScoreCache } from '../files-cache' 5import { ActorFollowHealthCache } from '../actor-follow-health-cache'
6import { AbstractScheduler } from './abstract-scheduler' 6import { AbstractScheduler } from './abstract-scheduler'
7 7
8export class ActorFollowScheduler extends AbstractScheduler { 8export class ActorFollowScheduler extends AbstractScheduler {
@@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler {
22 } 22 }
23 23
24 private async processPendingScores () { 24 private async processPendingScores () {
25 const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore() 25 const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore()
26 const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds() 26 const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds()
27 const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds() 27 const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds()
28 28
29 ActorFollowScoreCache.Instance.clearPendingFollowsScore() 29 ActorFollowHealthCache.Instance.clearPendingFollowsScore()
30 ActorFollowScoreCache.Instance.clearBadFollowingServerIds() 30 ActorFollowHealthCache.Instance.clearBadFollowingServerIds()
31 ActorFollowScoreCache.Instance.clearGoodFollowingServerIds() 31 ActorFollowHealthCache.Instance.clearGoodFollowingServerIds()
32 32
33 for (const inbox of Object.keys(pendingScores)) { 33 for (const inbox of Object.keys(pendingScores)) {
34 await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) 34 await ActorFollowModel.updateScore(inbox, pendingScores[inbox])