diff options
author | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
commit | 9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch) | |
tree | 716078fbe1506e0b0d19936f4939e9c530b3b8ab /server/lib/job-queue/handlers | |
parent | e81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff) | |
download | PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip |
Process slow followers in unicast job queue
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 16 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 6 |
2 files changed, 13 insertions, 9 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 9b0bb6574..fbf01d276 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { map } from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import { Job } from 'bull' | 2 | import { Job } from 'bull' |
3 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
3 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | 4 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
5 | import { doRequest } from '../../../helpers/requests' | 6 | import { doRequest } from '../../../helpers/requests' |
6 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | 7 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' |
7 | import { ActorFollowScoreCache } from '../../files-cache' | ||
8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
@@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) { | |||
25 | const badUrls: string[] = [] | 25 | const badUrls: string[] = [] |
26 | const goodUrls: string[] = [] | 26 | const goodUrls: string[] = [] |
27 | 27 | ||
28 | await map(payload.uris, uri => { | 28 | await map(payload.uris, async uri => { |
29 | return doRequest(uri, options) | 29 | try { |
30 | .then(() => goodUrls.push(uri)) | 30 | await doRequest(uri, options) |
31 | .catch(() => badUrls.push(uri)) | 31 | goodUrls.push(uri) |
32 | } catch (err) { | ||
33 | logger.debug('HTTP broadcast to %s failed.', uri, { err }) | ||
34 | badUrls.push(uri) | ||
35 | } | ||
32 | }, { concurrency: BROADCAST_CONCURRENCY }) | 36 | }, { concurrency: BROADCAST_CONCURRENCY }) |
33 | 37 | ||
34 | return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) | 38 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) |
35 | } | 39 | } |
36 | 40 | ||
37 | // --------------------------------------------------------------------------- | 41 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 9be50837f..673583d2b 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -2,7 +2,7 @@ import { Job } from 'bull' | |||
2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | 2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
5 | import { ActorFollowScoreCache } from '../../files-cache' | 5 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' |
6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
@@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) { | |||
23 | 23 | ||
24 | try { | 24 | try { |
25 | await doRequest(uri, options) | 25 | await doRequest(uri, options) |
26 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) | 26 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) |
27 | } catch (err) { | 27 | } catch (err) { |
28 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) | 28 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) |
29 | 29 | ||
30 | throw err | 30 | throw err |
31 | } | 31 | } |