diff options
author | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
commit | 9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch) | |
tree | 716078fbe1506e0b0d19936f4939e9c530b3b8ab /server/lib | |
parent | e81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff) | |
download | PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip |
Process slow followers in unicast job queue
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/send/utils.ts | 40 | ||||
-rw-r--r-- | server/lib/activitypub/videos/refresh.ts | 6 | ||||
-rw-r--r-- | server/lib/actor-follow-health-cache.ts (renamed from server/lib/files-cache/actor-follow-score-cache.ts) | 23 | ||||
-rw-r--r-- | server/lib/files-cache/index.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 16 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 6 | ||||
-rw-r--r-- | server/lib/schedulers/actor-follow-scheduler.ts | 14 |
7 files changed, 74 insertions, 33 deletions
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts index 7cd8030e1..7729703b8 100644 --- a/server/lib/activitypub/send/utils.ts +++ b/server/lib/activitypub/send/utils.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
2 | import { getServerActor } from '@server/models/application/application' | 3 | import { getServerActor } from '@server/models/application/application' |
3 | import { ContextType } from '@shared/models/activitypub/context' | 4 | import { ContextType } from '@shared/models/activitypub/context' |
4 | import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' | 5 | import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' |
@@ -119,16 +120,41 @@ async function broadcastToActors ( | |||
119 | function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { | 120 | function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { |
120 | if (uris.length === 0) return undefined | 121 | if (uris.length === 0) return undefined |
121 | 122 | ||
122 | logger.debug('Creating broadcast job.', { uris }) | 123 | const broadcastUris: string[] = [] |
124 | const unicastUris: string[] = [] | ||
123 | 125 | ||
124 | const payload = { | 126 | // Bad URIs could be slow to respond, prefer to process them in a dedicated queue |
125 | uris, | 127 | for (const uri of uris) { |
126 | signatureActorId: byActor.id, | 128 | if (ActorFollowHealthCache.Instance.isBadInbox(uri)) { |
127 | body: data, | 129 | unicastUris.push(uri) |
128 | contextType | 130 | } else { |
131 | broadcastUris.push(uri) | ||
132 | } | ||
133 | } | ||
134 | |||
135 | logger.debug('Creating broadcast job.', { broadcastUris, unicastUris }) | ||
136 | |||
137 | if (broadcastUris.length !== 0) { | ||
138 | const payload = { | ||
139 | uris: broadcastUris, | ||
140 | signatureActorId: byActor.id, | ||
141 | body: data, | ||
142 | contextType | ||
143 | } | ||
144 | |||
145 | JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | ||
129 | } | 146 | } |
130 | 147 | ||
131 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | 148 | for (const unicastUri of unicastUris) { |
149 | const payload = { | ||
150 | uri: unicastUri, | ||
151 | signatureActorId: byActor.id, | ||
152 | body: data, | ||
153 | contextType | ||
154 | } | ||
155 | |||
156 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | ||
157 | } | ||
132 | } | 158 | } |
133 | 159 | ||
134 | function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { | 160 | function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { |
diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts index 3af08acf4..9f952a218 100644 --- a/server/lib/activitypub/videos/refresh.ts +++ b/server/lib/activitypub/videos/refresh.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
2 | import { PeerTubeRequestError } from '@server/helpers/requests' | 2 | import { PeerTubeRequestError } from '@server/helpers/requests' |
3 | import { ActorFollowScoreCache } from '@server/lib/files-cache' | ||
4 | import { VideoLoadByUrlType } from '@server/lib/model-loaders' | 3 | import { VideoLoadByUrlType } from '@server/lib/model-loaders' |
5 | import { VideoModel } from '@server/models/video/video' | 4 | import { VideoModel } from '@server/models/video/video' |
6 | import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' | 5 | import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' |
7 | import { HttpStatusCode } from '@shared/models' | 6 | import { HttpStatusCode } from '@shared/models' |
7 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | ||
8 | import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' | 8 | import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' |
9 | import { APVideoUpdater } from './updater' | 9 | import { APVideoUpdater } from './updater' |
10 | 10 | ||
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: { | |||
39 | 39 | ||
40 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) | 40 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) |
41 | 41 | ||
42 | ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) | 42 | ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) |
43 | 43 | ||
44 | return video | 44 | return video |
45 | } catch (err) { | 45 | } catch (err) { |
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: { | |||
53 | 53 | ||
54 | logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) | 54 | logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) |
55 | 55 | ||
56 | ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) | 56 | ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) |
57 | 57 | ||
58 | // Don't refresh in loop | 58 | // Don't refresh in loop |
59 | await video.setAsRefreshed() | 59 | await video.setAsRefreshed() |
diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/actor-follow-health-cache.ts index 465080e72..ab8cc98df 100644 --- a/server/lib/files-cache/actor-follow-score-cache.ts +++ b/server/lib/actor-follow-health-cache.ts | |||
@@ -1,22 +1,28 @@ | |||
1 | import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants' | 1 | import { ACTOR_FOLLOW_SCORE } from '../initializers/constants' |
2 | import { logger } from '../../helpers/logger' | 2 | import { logger } from '../helpers/logger' |
3 | 3 | ||
4 | // Cache follows scores, instead of writing them too often in database | 4 | // Cache follows scores, instead of writing them too often in database |
5 | // Keep data in memory, we don't really need Redis here as we don't really care to loose some scores | 5 | // Keep data in memory, we don't really need Redis here as we don't really care to loose some scores |
6 | class ActorFollowScoreCache { | 6 | class ActorFollowHealthCache { |
7 | |||
8 | private static instance: ActorFollowHealthCache | ||
7 | 9 | ||
8 | private static instance: ActorFollowScoreCache | ||
9 | private pendingFollowsScore: { [ url: string ]: number } = {} | 10 | private pendingFollowsScore: { [ url: string ]: number } = {} |
11 | |||
10 | private pendingBadServer = new Set<number>() | 12 | private pendingBadServer = new Set<number>() |
11 | private pendingGoodServer = new Set<number>() | 13 | private pendingGoodServer = new Set<number>() |
12 | 14 | ||
15 | private badInboxes = new Set<string>() | ||
16 | |||
13 | private constructor () {} | 17 | private constructor () {} |
14 | 18 | ||
15 | static get Instance () { | 19 | static get Instance () { |
16 | return this.instance || (this.instance = new this()) | 20 | return this.instance || (this.instance = new this()) |
17 | } | 21 | } |
18 | 22 | ||
19 | updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { | 23 | updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) { |
24 | this.badInboxes.clear() | ||
25 | |||
20 | if (goodInboxes.length === 0 && badInboxes.length === 0) return | 26 | if (goodInboxes.length === 0 && badInboxes.length === 0) return |
21 | 27 | ||
22 | logger.info( | 28 | logger.info( |
@@ -34,9 +40,14 @@ class ActorFollowScoreCache { | |||
34 | if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 | 40 | if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 |
35 | 41 | ||
36 | this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY | 42 | this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY |
43 | this.badInboxes.add(badInbox) | ||
37 | } | 44 | } |
38 | } | 45 | } |
39 | 46 | ||
47 | isBadInbox (inboxUrl: string) { | ||
48 | return this.badInboxes.has(inboxUrl) | ||
49 | } | ||
50 | |||
40 | addBadServerId (serverId: number) { | 51 | addBadServerId (serverId: number) { |
41 | this.pendingBadServer.add(serverId) | 52 | this.pendingBadServer.add(serverId) |
42 | } | 53 | } |
@@ -71,5 +82,5 @@ class ActorFollowScoreCache { | |||
71 | } | 82 | } |
72 | 83 | ||
73 | export { | 84 | export { |
74 | ActorFollowScoreCache | 85 | ActorFollowHealthCache |
75 | } | 86 | } |
diff --git a/server/lib/files-cache/index.ts b/server/lib/files-cache/index.ts index e921d04a7..e5853f7d6 100644 --- a/server/lib/files-cache/index.ts +++ b/server/lib/files-cache/index.ts | |||
@@ -1,3 +1,3 @@ | |||
1 | export * from './actor-follow-score-cache' | ||
2 | export * from './videos-preview-cache' | 1 | export * from './videos-preview-cache' |
3 | export * from './videos-caption-cache' | 2 | export * from './videos-caption-cache' |
3 | export * from './videos-torrent-cache' | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 9b0bb6574..fbf01d276 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { map } from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import { Job } from 'bull' | 2 | import { Job } from 'bull' |
3 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
3 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | 4 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
5 | import { doRequest } from '../../../helpers/requests' | 6 | import { doRequest } from '../../../helpers/requests' |
6 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | 7 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' |
7 | import { ActorFollowScoreCache } from '../../files-cache' | ||
8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
@@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) { | |||
25 | const badUrls: string[] = [] | 25 | const badUrls: string[] = [] |
26 | const goodUrls: string[] = [] | 26 | const goodUrls: string[] = [] |
27 | 27 | ||
28 | await map(payload.uris, uri => { | 28 | await map(payload.uris, async uri => { |
29 | return doRequest(uri, options) | 29 | try { |
30 | .then(() => goodUrls.push(uri)) | 30 | await doRequest(uri, options) |
31 | .catch(() => badUrls.push(uri)) | 31 | goodUrls.push(uri) |
32 | } catch (err) { | ||
33 | logger.debug('HTTP broadcast to %s failed.', uri, { err }) | ||
34 | badUrls.push(uri) | ||
35 | } | ||
32 | }, { concurrency: BROADCAST_CONCURRENCY }) | 36 | }, { concurrency: BROADCAST_CONCURRENCY }) |
33 | 37 | ||
34 | return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) | 38 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) |
35 | } | 39 | } |
36 | 40 | ||
37 | // --------------------------------------------------------------------------- | 41 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 9be50837f..673583d2b 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -2,7 +2,7 @@ import { Job } from 'bull' | |||
2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | 2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
5 | import { ActorFollowScoreCache } from '../../files-cache' | 5 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' |
6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
@@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) { | |||
23 | 23 | ||
24 | try { | 24 | try { |
25 | await doRequest(uri, options) | 25 | await doRequest(uri, options) |
26 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) | 26 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) |
27 | } catch (err) { | 27 | } catch (err) { |
28 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) | 28 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) |
29 | 29 | ||
30 | throw err | 30 | throw err |
31 | } | 31 | } |
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts index 1b80316e9..a5a377573 100644 --- a/server/lib/schedulers/actor-follow-scheduler.ts +++ b/server/lib/schedulers/actor-follow-scheduler.ts | |||
@@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils' | |||
2 | import { logger } from '../../helpers/logger' | 2 | import { logger } from '../../helpers/logger' |
3 | import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | 3 | import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' |
4 | import { ActorFollowModel } from '../../models/actor/actor-follow' | 4 | import { ActorFollowModel } from '../../models/actor/actor-follow' |
5 | import { ActorFollowScoreCache } from '../files-cache' | 5 | import { ActorFollowHealthCache } from '../actor-follow-health-cache' |
6 | import { AbstractScheduler } from './abstract-scheduler' | 6 | import { AbstractScheduler } from './abstract-scheduler' |
7 | 7 | ||
8 | export class ActorFollowScheduler extends AbstractScheduler { | 8 | export class ActorFollowScheduler extends AbstractScheduler { |
@@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler { | |||
22 | } | 22 | } |
23 | 23 | ||
24 | private async processPendingScores () { | 24 | private async processPendingScores () { |
25 | const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore() | 25 | const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore() |
26 | const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds() | 26 | const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds() |
27 | const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds() | 27 | const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds() |
28 | 28 | ||
29 | ActorFollowScoreCache.Instance.clearPendingFollowsScore() | 29 | ActorFollowHealthCache.Instance.clearPendingFollowsScore() |
30 | ActorFollowScoreCache.Instance.clearBadFollowingServerIds() | 30 | ActorFollowHealthCache.Instance.clearBadFollowingServerIds() |
31 | ActorFollowScoreCache.Instance.clearGoodFollowingServerIds() | 31 | ActorFollowHealthCache.Instance.clearGoodFollowingServerIds() |
32 | 32 | ||
33 | for (const inbox of Object.keys(pendingScores)) { | 33 | for (const inbox of Object.keys(pendingScores)) { |
34 | await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) | 34 | await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) |