aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/initializers/constants.ts8
-rw-r--r--server/lib/activitypub/send/utils.ts40
-rw-r--r--server/lib/activitypub/videos/refresh.ts6
-rw-r--r--server/lib/actor-follow-health-cache.ts (renamed from server/lib/files-cache/actor-follow-score-cache.ts)23
-rw-r--r--server/lib/files-cache/index.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts16
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts6
-rw-r--r--server/lib/schedulers/actor-follow-scheduler.ts14
-rw-r--r--server/tests/api/server/index.ts1
-rw-r--r--server/tests/api/server/slow-follows.ts81
10 files changed, 160 insertions, 37 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index facd3b721..9896e1efb 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -134,9 +134,9 @@ const REMOTE_SCHEME = {
134} 134}
135 135
136const JOB_ATTEMPTS: { [id in JobType]: number } = { 136const JOB_ATTEMPTS: { [id in JobType]: number } = {
137 'activitypub-http-broadcast': 5, 137 'activitypub-http-broadcast': 1,
138 'activitypub-http-unicast': 5, 138 'activitypub-http-unicast': 1,
139 'activitypub-http-fetcher': 5, 139 'activitypub-http-fetcher': 2,
140 'activitypub-follow': 5, 140 'activitypub-follow': 5,
141 'activitypub-cleaner': 1, 141 'activitypub-cleaner': 1,
142 'video-file-import': 1, 142 'video-file-import': 1,
@@ -153,7 +153,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
153// Excluded keys are jobs that can be configured by admins 153// Excluded keys are jobs that can be configured by admins
154const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { 154const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
155 'activitypub-http-broadcast': 1, 155 'activitypub-http-broadcast': 1,
156 'activitypub-http-unicast': 5, 156 'activitypub-http-unicast': 10,
157 'activitypub-http-fetcher': 3, 157 'activitypub-http-fetcher': 3,
158 'activitypub-cleaner': 1, 158 'activitypub-cleaner': 1,
159 'activitypub-follow': 1, 159 'activitypub-follow': 1,
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts
index 7cd8030e1..7729703b8 100644
--- a/server/lib/activitypub/send/utils.ts
+++ b/server/lib/activitypub/send/utils.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
2import { getServerActor } from '@server/models/application/application' 3import { getServerActor } from '@server/models/application/application'
3import { ContextType } from '@shared/models/activitypub/context' 4import { ContextType } from '@shared/models/activitypub/context'
4import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' 5import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
@@ -119,16 +120,41 @@ async function broadcastToActors (
119function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { 120function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
120 if (uris.length === 0) return undefined 121 if (uris.length === 0) return undefined
121 122
122 logger.debug('Creating broadcast job.', { uris }) 123 const broadcastUris: string[] = []
124 const unicastUris: string[] = []
123 125
124 const payload = { 126 // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
125 uris, 127 for (const uri of uris) {
126 signatureActorId: byActor.id, 128 if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
127 body: data, 129 unicastUris.push(uri)
128 contextType 130 } else {
131 broadcastUris.push(uri)
132 }
133 }
134
135 logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
136
137 if (broadcastUris.length !== 0) {
138 const payload = {
139 uris: broadcastUris,
140 signatureActorId: byActor.id,
141 body: data,
142 contextType
143 }
144
145 JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
129 } 146 }
130 147
131 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) 148 for (const unicastUri of unicastUris) {
149 const payload = {
150 uri: unicastUri,
151 signatureActorId: byActor.id,
152 body: data,
153 contextType
154 }
155
156 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
157 }
132} 158}
133 159
134function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { 160function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts
index 3af08acf4..9f952a218 100644
--- a/server/lib/activitypub/videos/refresh.ts
+++ b/server/lib/activitypub/videos/refresh.ts
@@ -1,10 +1,10 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger' 1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { PeerTubeRequestError } from '@server/helpers/requests' 2import { PeerTubeRequestError } from '@server/helpers/requests'
3import { ActorFollowScoreCache } from '@server/lib/files-cache'
4import { VideoLoadByUrlType } from '@server/lib/model-loaders' 3import { VideoLoadByUrlType } from '@server/lib/model-loaders'
5import { VideoModel } from '@server/models/video/video' 4import { VideoModel } from '@server/models/video/video'
6import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' 5import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
7import { HttpStatusCode } from '@shared/models' 6import { HttpStatusCode } from '@shared/models'
7import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
8import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' 8import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
9import { APVideoUpdater } from './updater' 9import { APVideoUpdater } from './updater'
10 10
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: {
39 39
40 await syncVideoExternalAttributes(video, videoObject, options.syncParam) 40 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
41 41
42 ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) 42 ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
43 43
44 return video 44 return video
45 } catch (err) { 45 } catch (err) {
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: {
53 53
54 logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) 54 logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
55 55
56 ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) 56 ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
57 57
58 // Don't refresh in loop 58 // Don't refresh in loop
59 await video.setAsRefreshed() 59 await video.setAsRefreshed()
diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/actor-follow-health-cache.ts
index 465080e72..ab8cc98df 100644
--- a/server/lib/files-cache/actor-follow-score-cache.ts
+++ b/server/lib/actor-follow-health-cache.ts
@@ -1,22 +1,28 @@
1import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants' 1import { ACTOR_FOLLOW_SCORE } from '../initializers/constants'
2import { logger } from '../../helpers/logger' 2import { logger } from '../helpers/logger'
3 3
4// Cache follows scores, instead of writing them too often in database 4// Cache follows scores, instead of writing them too often in database
5// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores 5// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
6class ActorFollowScoreCache { 6class ActorFollowHealthCache {
7
8 private static instance: ActorFollowHealthCache
7 9
8 private static instance: ActorFollowScoreCache
9 private pendingFollowsScore: { [ url: string ]: number } = {} 10 private pendingFollowsScore: { [ url: string ]: number } = {}
11
10 private pendingBadServer = new Set<number>() 12 private pendingBadServer = new Set<number>()
11 private pendingGoodServer = new Set<number>() 13 private pendingGoodServer = new Set<number>()
12 14
15 private badInboxes = new Set<string>()
16
13 private constructor () {} 17 private constructor () {}
14 18
15 static get Instance () { 19 static get Instance () {
16 return this.instance || (this.instance = new this()) 20 return this.instance || (this.instance = new this())
17 } 21 }
18 22
19 updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { 23 updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) {
24 this.badInboxes.clear()
25
20 if (goodInboxes.length === 0 && badInboxes.length === 0) return 26 if (goodInboxes.length === 0 && badInboxes.length === 0) return
21 27
22 logger.info( 28 logger.info(
@@ -34,9 +40,14 @@ class ActorFollowScoreCache {
34 if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 40 if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
35 41
36 this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY 42 this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
43 this.badInboxes.add(badInbox)
37 } 44 }
38 } 45 }
39 46
47 isBadInbox (inboxUrl: string) {
48 return this.badInboxes.has(inboxUrl)
49 }
50
40 addBadServerId (serverId: number) { 51 addBadServerId (serverId: number) {
41 this.pendingBadServer.add(serverId) 52 this.pendingBadServer.add(serverId)
42 } 53 }
@@ -71,5 +82,5 @@ class ActorFollowScoreCache {
71} 82}
72 83
73export { 84export {
74 ActorFollowScoreCache 85 ActorFollowHealthCache
75} 86}
diff --git a/server/lib/files-cache/index.ts b/server/lib/files-cache/index.ts
index e921d04a7..e5853f7d6 100644
--- a/server/lib/files-cache/index.ts
+++ b/server/lib/files-cache/index.ts
@@ -1,3 +1,3 @@
1export * from './actor-follow-score-cache'
2export * from './videos-preview-cache' 1export * from './videos-preview-cache'
3export * from './videos-caption-cache' 2export * from './videos-caption-cache'
3export * from './videos-torrent-cache'
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 9b0bb6574..fbf01d276 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,10 +1,10 @@
1import { map } from 'bluebird' 1import { map } from 'bluebird'
2import { Job } from 'bull' 2import { Job } from 'bull'
3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
3import { ActivitypubHttpBroadcastPayload } from '@shared/models' 4import { ActivitypubHttpBroadcastPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 5import { logger } from '../../../helpers/logger'
5import { doRequest } from '../../../helpers/requests' 6import { doRequest } from '../../../helpers/requests'
6import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' 7import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
7import { ActorFollowScoreCache } from '../../files-cache'
8import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 8import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
9 9
10async function processActivityPubHttpBroadcast (job: Job) { 10async function processActivityPubHttpBroadcast (job: Job) {
@@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) {
25 const badUrls: string[] = [] 25 const badUrls: string[] = []
26 const goodUrls: string[] = [] 26 const goodUrls: string[] = []
27 27
28 await map(payload.uris, uri => { 28 await map(payload.uris, async uri => {
29 return doRequest(uri, options) 29 try {
30 .then(() => goodUrls.push(uri)) 30 await doRequest(uri, options)
31 .catch(() => badUrls.push(uri)) 31 goodUrls.push(uri)
32 } catch (err) {
33 logger.debug('HTTP broadcast to %s failed.', uri, { err })
34 badUrls.push(uri)
35 }
32 }, { concurrency: BROADCAST_CONCURRENCY }) 36 }, { concurrency: BROADCAST_CONCURRENCY })
33 37
34 return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) 38 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
35} 39}
36 40
37// --------------------------------------------------------------------------- 41// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 9be50837f..673583d2b 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -2,7 +2,7 @@ import { Job } from 'bull'
2import { ActivitypubHttpUnicastPayload } from '@shared/models' 2import { ActivitypubHttpUnicastPayload } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
5import { ActorFollowScoreCache } from '../../files-cache' 5import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7 7
8async function processActivityPubHttpUnicast (job: Job) { 8async function processActivityPubHttpUnicast (job: Job) {
@@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) {
23 23
24 try { 24 try {
25 await doRequest(uri, options) 25 await doRequest(uri, options)
26 ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) 26 ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
27 } catch (err) { 27 } catch (err) {
28 ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) 28 ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])
29 29
30 throw err 30 throw err
31 } 31 }
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
index 1b80316e9..a5a377573 100644
--- a/server/lib/schedulers/actor-follow-scheduler.ts
+++ b/server/lib/schedulers/actor-follow-scheduler.ts
@@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils'
2import { logger } from '../../helpers/logger' 2import { logger } from '../../helpers/logger'
3import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' 3import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { ActorFollowModel } from '../../models/actor/actor-follow' 4import { ActorFollowModel } from '../../models/actor/actor-follow'
5import { ActorFollowScoreCache } from '../files-cache' 5import { ActorFollowHealthCache } from '../actor-follow-health-cache'
6import { AbstractScheduler } from './abstract-scheduler' 6import { AbstractScheduler } from './abstract-scheduler'
7 7
8export class ActorFollowScheduler extends AbstractScheduler { 8export class ActorFollowScheduler extends AbstractScheduler {
@@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler {
22 } 22 }
23 23
24 private async processPendingScores () { 24 private async processPendingScores () {
25 const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore() 25 const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore()
26 const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds() 26 const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds()
27 const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds() 27 const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds()
28 28
29 ActorFollowScoreCache.Instance.clearPendingFollowsScore() 29 ActorFollowHealthCache.Instance.clearPendingFollowsScore()
30 ActorFollowScoreCache.Instance.clearBadFollowingServerIds() 30 ActorFollowHealthCache.Instance.clearBadFollowingServerIds()
31 ActorFollowScoreCache.Instance.clearGoodFollowingServerIds() 31 ActorFollowHealthCache.Instance.clearGoodFollowingServerIds()
32 32
33 for (const inbox of Object.keys(pendingScores)) { 33 for (const inbox of Object.keys(pendingScores)) {
34 await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) 34 await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
diff --git a/server/tests/api/server/index.ts b/server/tests/api/server/index.ts
index b16a22ee7..8136fc3c6 100644
--- a/server/tests/api/server/index.ts
+++ b/server/tests/api/server/index.ts
@@ -11,6 +11,7 @@ import './jobs'
11import './logs' 11import './logs'
12import './reverse-proxy' 12import './reverse-proxy'
13import './services' 13import './services'
14import './slow-follows'
14import './stats' 15import './stats'
15import './tracker' 16import './tracker'
16import './no-client' 17import './no-client'
diff --git a/server/tests/api/server/slow-follows.ts b/server/tests/api/server/slow-follows.ts
new file mode 100644
index 000000000..2bef0c9f2
--- /dev/null
+++ b/server/tests/api/server/slow-follows.ts
@@ -0,0 +1,81 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import 'mocha'
4import * as chai from 'chai'
5import { cleanupTests, createMultipleServers, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/extra-utils'
6import { Job } from '@shared/models'
7
8const expect = chai.expect
9
10describe('Test slow follows', function () {
11 let servers: PeerTubeServer[] = []
12
13 let afterFollows: Date
14
15 before(async function () {
16 this.timeout(60000)
17
18 servers = await createMultipleServers(3)
19
20 // Get the access tokens
21 await setAccessTokensToServers(servers)
22
23 await doubleFollow(servers[0], servers[1])
24 await doubleFollow(servers[0], servers[2])
25
26 afterFollows = new Date()
27
28 for (let i = 0; i < 5; i++) {
29 await servers[0].videos.quickUpload({ name: 'video ' + i })
30 }
31
32 await waitJobs(servers)
33 })
34
35 it('Should only have broadcast jobs', async function () {
36 const { data } = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
37
38 for (const job of data) {
39 expect(new Date(job.createdAt)).below(afterFollows)
40 }
41 })
42
43 it('Should process bad follower', async function () {
44 this.timeout(30000)
45
46 await servers[1].kill()
47
48 // Set server 2 as bad follower
49 await servers[0].videos.quickUpload({ name: 'video 6' })
50 await waitJobs(servers[0])
51
52 afterFollows = new Date()
53 const filter = (job: Job) => new Date(job.createdAt) > afterFollows
54
55 // Resend another broadcast job
56 await servers[0].videos.quickUpload({ name: 'video 7' })
57 await waitJobs(servers[0])
58
59 const resBroadcast = await servers[0].jobs.list({ jobType: 'activitypub-http-broadcast', sort: '-createdAt' })
60 const resUnicast = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
61
62 const broadcast = resBroadcast.data.filter(filter)
63 const unicast = resUnicast.data.filter(filter)
64
65 expect(unicast).to.have.lengthOf(2)
66 expect(broadcast).to.have.lengthOf(2)
67
68 for (const u of unicast) {
69 expect(u.data.uri).to.equal(servers[1].url + '/inbox')
70 }
71
72 for (const b of broadcast) {
73 expect(b.data.uris).to.have.lengthOf(1)
74 expect(b.data.uris[0]).to.equal(servers[2].url + '/inbox')
75 }
76 })
77
78 after(async function () {
79 await cleanupTests(servers)
80 })
81})