diff options
author | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
commit | 9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch) | |
tree | 716078fbe1506e0b0d19936f4939e9c530b3b8ab /server | |
parent | e81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff) | |
download | PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip |
Process slow followers in unicast job queue
Diffstat (limited to 'server')
-rw-r--r-- | server/initializers/constants.ts | 8 | ||||
-rw-r--r-- | server/lib/activitypub/send/utils.ts | 40 | ||||
-rw-r--r-- | server/lib/activitypub/videos/refresh.ts | 6 | ||||
-rw-r--r-- | server/lib/actor-follow-health-cache.ts (renamed from server/lib/files-cache/actor-follow-score-cache.ts) | 23 | ||||
-rw-r--r-- | server/lib/files-cache/index.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 16 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 6 | ||||
-rw-r--r-- | server/lib/schedulers/actor-follow-scheduler.ts | 14 | ||||
-rw-r--r-- | server/tests/api/server/index.ts | 1 | ||||
-rw-r--r-- | server/tests/api/server/slow-follows.ts | 81 |
10 files changed, 160 insertions, 37 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index facd3b721..9896e1efb 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -134,9 +134,9 @@ const REMOTE_SCHEME = { | |||
134 | } | 134 | } |
135 | 135 | ||
136 | const JOB_ATTEMPTS: { [id in JobType]: number } = { | 136 | const JOB_ATTEMPTS: { [id in JobType]: number } = { |
137 | 'activitypub-http-broadcast': 5, | 137 | 'activitypub-http-broadcast': 1, |
138 | 'activitypub-http-unicast': 5, | 138 | 'activitypub-http-unicast': 1, |
139 | 'activitypub-http-fetcher': 5, | 139 | 'activitypub-http-fetcher': 2, |
140 | 'activitypub-follow': 5, | 140 | 'activitypub-follow': 5, |
141 | 'activitypub-cleaner': 1, | 141 | 'activitypub-cleaner': 1, |
142 | 'video-file-import': 1, | 142 | 'video-file-import': 1, |
@@ -153,7 +153,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
153 | // Excluded keys are jobs that can be configured by admins | 153 | // Excluded keys are jobs that can be configured by admins |
154 | const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { | 154 | const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { |
155 | 'activitypub-http-broadcast': 1, | 155 | 'activitypub-http-broadcast': 1, |
156 | 'activitypub-http-unicast': 5, | 156 | 'activitypub-http-unicast': 10, |
157 | 'activitypub-http-fetcher': 3, | 157 | 'activitypub-http-fetcher': 3, |
158 | 'activitypub-cleaner': 1, | 158 | 'activitypub-cleaner': 1, |
159 | 'activitypub-follow': 1, | 159 | 'activitypub-follow': 1, |
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts index 7cd8030e1..7729703b8 100644 --- a/server/lib/activitypub/send/utils.ts +++ b/server/lib/activitypub/send/utils.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
2 | import { getServerActor } from '@server/models/application/application' | 3 | import { getServerActor } from '@server/models/application/application' |
3 | import { ContextType } from '@shared/models/activitypub/context' | 4 | import { ContextType } from '@shared/models/activitypub/context' |
4 | import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' | 5 | import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' |
@@ -119,16 +120,41 @@ async function broadcastToActors ( | |||
119 | function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { | 120 | function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { |
120 | if (uris.length === 0) return undefined | 121 | if (uris.length === 0) return undefined |
121 | 122 | ||
122 | logger.debug('Creating broadcast job.', { uris }) | 123 | const broadcastUris: string[] = [] |
124 | const unicastUris: string[] = [] | ||
123 | 125 | ||
124 | const payload = { | 126 | // Bad URIs could be slow to respond, prefer to process them in a dedicated queue |
125 | uris, | 127 | for (const uri of uris) { |
126 | signatureActorId: byActor.id, | 128 | if (ActorFollowHealthCache.Instance.isBadInbox(uri)) { |
127 | body: data, | 129 | unicastUris.push(uri) |
128 | contextType | 130 | } else { |
131 | broadcastUris.push(uri) | ||
132 | } | ||
133 | } | ||
134 | |||
135 | logger.debug('Creating broadcast job.', { broadcastUris, unicastUris }) | ||
136 | |||
137 | if (broadcastUris.length !== 0) { | ||
138 | const payload = { | ||
139 | uris: broadcastUris, | ||
140 | signatureActorId: byActor.id, | ||
141 | body: data, | ||
142 | contextType | ||
143 | } | ||
144 | |||
145 | JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | ||
129 | } | 146 | } |
130 | 147 | ||
131 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | 148 | for (const unicastUri of unicastUris) { |
149 | const payload = { | ||
150 | uri: unicastUri, | ||
151 | signatureActorId: byActor.id, | ||
152 | body: data, | ||
153 | contextType | ||
154 | } | ||
155 | |||
156 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | ||
157 | } | ||
132 | } | 158 | } |
133 | 159 | ||
134 | function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { | 160 | function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { |
diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts index 3af08acf4..9f952a218 100644 --- a/server/lib/activitypub/videos/refresh.ts +++ b/server/lib/activitypub/videos/refresh.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
2 | import { PeerTubeRequestError } from '@server/helpers/requests' | 2 | import { PeerTubeRequestError } from '@server/helpers/requests' |
3 | import { ActorFollowScoreCache } from '@server/lib/files-cache' | ||
4 | import { VideoLoadByUrlType } from '@server/lib/model-loaders' | 3 | import { VideoLoadByUrlType } from '@server/lib/model-loaders' |
5 | import { VideoModel } from '@server/models/video/video' | 4 | import { VideoModel } from '@server/models/video/video' |
6 | import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' | 5 | import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' |
7 | import { HttpStatusCode } from '@shared/models' | 6 | import { HttpStatusCode } from '@shared/models' |
7 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | ||
8 | import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' | 8 | import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' |
9 | import { APVideoUpdater } from './updater' | 9 | import { APVideoUpdater } from './updater' |
10 | 10 | ||
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: { | |||
39 | 39 | ||
40 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) | 40 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) |
41 | 41 | ||
42 | ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) | 42 | ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) |
43 | 43 | ||
44 | return video | 44 | return video |
45 | } catch (err) { | 45 | } catch (err) { |
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: { | |||
53 | 53 | ||
54 | logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) | 54 | logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) |
55 | 55 | ||
56 | ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) | 56 | ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) |
57 | 57 | ||
58 | // Don't refresh in loop | 58 | // Don't refresh in loop |
59 | await video.setAsRefreshed() | 59 | await video.setAsRefreshed() |
diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/actor-follow-health-cache.ts index 465080e72..ab8cc98df 100644 --- a/server/lib/files-cache/actor-follow-score-cache.ts +++ b/server/lib/actor-follow-health-cache.ts | |||
@@ -1,22 +1,28 @@ | |||
1 | import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants' | 1 | import { ACTOR_FOLLOW_SCORE } from '../initializers/constants' |
2 | import { logger } from '../../helpers/logger' | 2 | import { logger } from '../helpers/logger' |
3 | 3 | ||
4 | // Cache follows scores, instead of writing them too often in database | 4 | // Cache follows scores, instead of writing them too often in database |
5 | // Keep data in memory, we don't really need Redis here as we don't really care to loose some scores | 5 | // Keep data in memory, we don't really need Redis here as we don't really care to loose some scores |
6 | class ActorFollowScoreCache { | 6 | class ActorFollowHealthCache { |
7 | |||
8 | private static instance: ActorFollowHealthCache | ||
7 | 9 | ||
8 | private static instance: ActorFollowScoreCache | ||
9 | private pendingFollowsScore: { [ url: string ]: number } = {} | 10 | private pendingFollowsScore: { [ url: string ]: number } = {} |
11 | |||
10 | private pendingBadServer = new Set<number>() | 12 | private pendingBadServer = new Set<number>() |
11 | private pendingGoodServer = new Set<number>() | 13 | private pendingGoodServer = new Set<number>() |
12 | 14 | ||
15 | private badInboxes = new Set<string>() | ||
16 | |||
13 | private constructor () {} | 17 | private constructor () {} |
14 | 18 | ||
15 | static get Instance () { | 19 | static get Instance () { |
16 | return this.instance || (this.instance = new this()) | 20 | return this.instance || (this.instance = new this()) |
17 | } | 21 | } |
18 | 22 | ||
19 | updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { | 23 | updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) { |
24 | this.badInboxes.clear() | ||
25 | |||
20 | if (goodInboxes.length === 0 && badInboxes.length === 0) return | 26 | if (goodInboxes.length === 0 && badInboxes.length === 0) return |
21 | 27 | ||
22 | logger.info( | 28 | logger.info( |
@@ -34,9 +40,14 @@ class ActorFollowScoreCache { | |||
34 | if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 | 40 | if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 |
35 | 41 | ||
36 | this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY | 42 | this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY |
43 | this.badInboxes.add(badInbox) | ||
37 | } | 44 | } |
38 | } | 45 | } |
39 | 46 | ||
47 | isBadInbox (inboxUrl: string) { | ||
48 | return this.badInboxes.has(inboxUrl) | ||
49 | } | ||
50 | |||
40 | addBadServerId (serverId: number) { | 51 | addBadServerId (serverId: number) { |
41 | this.pendingBadServer.add(serverId) | 52 | this.pendingBadServer.add(serverId) |
42 | } | 53 | } |
@@ -71,5 +82,5 @@ class ActorFollowScoreCache { | |||
71 | } | 82 | } |
72 | 83 | ||
73 | export { | 84 | export { |
74 | ActorFollowScoreCache | 85 | ActorFollowHealthCache |
75 | } | 86 | } |
diff --git a/server/lib/files-cache/index.ts b/server/lib/files-cache/index.ts index e921d04a7..e5853f7d6 100644 --- a/server/lib/files-cache/index.ts +++ b/server/lib/files-cache/index.ts | |||
@@ -1,3 +1,3 @@ | |||
1 | export * from './actor-follow-score-cache' | ||
2 | export * from './videos-preview-cache' | 1 | export * from './videos-preview-cache' |
3 | export * from './videos-caption-cache' | 2 | export * from './videos-caption-cache' |
3 | export * from './videos-torrent-cache' | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 9b0bb6574..fbf01d276 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { map } from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import { Job } from 'bull' | 2 | import { Job } from 'bull' |
3 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
3 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | 4 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | 5 | import { logger } from '../../../helpers/logger' |
5 | import { doRequest } from '../../../helpers/requests' | 6 | import { doRequest } from '../../../helpers/requests' |
6 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' | 7 | import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' |
7 | import { ActorFollowScoreCache } from '../../files-cache' | ||
8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 8 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
9 | 9 | ||
10 | async function processActivityPubHttpBroadcast (job: Job) { | 10 | async function processActivityPubHttpBroadcast (job: Job) { |
@@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) { | |||
25 | const badUrls: string[] = [] | 25 | const badUrls: string[] = [] |
26 | const goodUrls: string[] = [] | 26 | const goodUrls: string[] = [] |
27 | 27 | ||
28 | await map(payload.uris, uri => { | 28 | await map(payload.uris, async uri => { |
29 | return doRequest(uri, options) | 29 | try { |
30 | .then(() => goodUrls.push(uri)) | 30 | await doRequest(uri, options) |
31 | .catch(() => badUrls.push(uri)) | 31 | goodUrls.push(uri) |
32 | } catch (err) { | ||
33 | logger.debug('HTTP broadcast to %s failed.', uri, { err }) | ||
34 | badUrls.push(uri) | ||
35 | } | ||
32 | }, { concurrency: BROADCAST_CONCURRENCY }) | 36 | }, { concurrency: BROADCAST_CONCURRENCY }) |
33 | 37 | ||
34 | return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) | 38 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) |
35 | } | 39 | } |
36 | 40 | ||
37 | // --------------------------------------------------------------------------- | 41 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 9be50837f..673583d2b 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -2,7 +2,7 @@ import { Job } from 'bull' | |||
2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | 2 | import { ActivitypubHttpUnicastPayload } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
5 | import { ActorFollowScoreCache } from '../../files-cache' | 5 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' |
6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
7 | 7 | ||
8 | async function processActivityPubHttpUnicast (job: Job) { | 8 | async function processActivityPubHttpUnicast (job: Job) { |
@@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) { | |||
23 | 23 | ||
24 | try { | 24 | try { |
25 | await doRequest(uri, options) | 25 | await doRequest(uri, options) |
26 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) | 26 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) |
27 | } catch (err) { | 27 | } catch (err) { |
28 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) | 28 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) |
29 | 29 | ||
30 | throw err | 30 | throw err |
31 | } | 31 | } |
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts index 1b80316e9..a5a377573 100644 --- a/server/lib/schedulers/actor-follow-scheduler.ts +++ b/server/lib/schedulers/actor-follow-scheduler.ts | |||
@@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils' | |||
2 | import { logger } from '../../helpers/logger' | 2 | import { logger } from '../../helpers/logger' |
3 | import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | 3 | import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' |
4 | import { ActorFollowModel } from '../../models/actor/actor-follow' | 4 | import { ActorFollowModel } from '../../models/actor/actor-follow' |
5 | import { ActorFollowScoreCache } from '../files-cache' | 5 | import { ActorFollowHealthCache } from '../actor-follow-health-cache' |
6 | import { AbstractScheduler } from './abstract-scheduler' | 6 | import { AbstractScheduler } from './abstract-scheduler' |
7 | 7 | ||
8 | export class ActorFollowScheduler extends AbstractScheduler { | 8 | export class ActorFollowScheduler extends AbstractScheduler { |
@@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler { | |||
22 | } | 22 | } |
23 | 23 | ||
24 | private async processPendingScores () { | 24 | private async processPendingScores () { |
25 | const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore() | 25 | const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore() |
26 | const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds() | 26 | const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds() |
27 | const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds() | 27 | const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds() |
28 | 28 | ||
29 | ActorFollowScoreCache.Instance.clearPendingFollowsScore() | 29 | ActorFollowHealthCache.Instance.clearPendingFollowsScore() |
30 | ActorFollowScoreCache.Instance.clearBadFollowingServerIds() | 30 | ActorFollowHealthCache.Instance.clearBadFollowingServerIds() |
31 | ActorFollowScoreCache.Instance.clearGoodFollowingServerIds() | 31 | ActorFollowHealthCache.Instance.clearGoodFollowingServerIds() |
32 | 32 | ||
33 | for (const inbox of Object.keys(pendingScores)) { | 33 | for (const inbox of Object.keys(pendingScores)) { |
34 | await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) | 34 | await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) |
diff --git a/server/tests/api/server/index.ts b/server/tests/api/server/index.ts index b16a22ee7..8136fc3c6 100644 --- a/server/tests/api/server/index.ts +++ b/server/tests/api/server/index.ts | |||
@@ -11,6 +11,7 @@ import './jobs' | |||
11 | import './logs' | 11 | import './logs' |
12 | import './reverse-proxy' | 12 | import './reverse-proxy' |
13 | import './services' | 13 | import './services' |
14 | import './slow-follows' | ||
14 | import './stats' | 15 | import './stats' |
15 | import './tracker' | 16 | import './tracker' |
16 | import './no-client' | 17 | import './no-client' |
diff --git a/server/tests/api/server/slow-follows.ts b/server/tests/api/server/slow-follows.ts new file mode 100644 index 000000000..2bef0c9f2 --- /dev/null +++ b/server/tests/api/server/slow-follows.ts | |||
@@ -0,0 +1,81 @@ | |||
1 | /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ | ||
2 | |||
3 | import 'mocha' | ||
4 | import * as chai from 'chai' | ||
5 | import { cleanupTests, createMultipleServers, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/extra-utils' | ||
6 | import { Job } from '@shared/models' | ||
7 | |||
8 | const expect = chai.expect | ||
9 | |||
10 | describe('Test slow follows', function () { | ||
11 | let servers: PeerTubeServer[] = [] | ||
12 | |||
13 | let afterFollows: Date | ||
14 | |||
15 | before(async function () { | ||
16 | this.timeout(60000) | ||
17 | |||
18 | servers = await createMultipleServers(3) | ||
19 | |||
20 | // Get the access tokens | ||
21 | await setAccessTokensToServers(servers) | ||
22 | |||
23 | await doubleFollow(servers[0], servers[1]) | ||
24 | await doubleFollow(servers[0], servers[2]) | ||
25 | |||
26 | afterFollows = new Date() | ||
27 | |||
28 | for (let i = 0; i < 5; i++) { | ||
29 | await servers[0].videos.quickUpload({ name: 'video ' + i }) | ||
30 | } | ||
31 | |||
32 | await waitJobs(servers) | ||
33 | }) | ||
34 | |||
35 | it('Should only have broadcast jobs', async function () { | ||
36 | const { data } = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' }) | ||
37 | |||
38 | for (const job of data) { | ||
39 | expect(new Date(job.createdAt)).below(afterFollows) | ||
40 | } | ||
41 | }) | ||
42 | |||
43 | it('Should process bad follower', async function () { | ||
44 | this.timeout(30000) | ||
45 | |||
46 | await servers[1].kill() | ||
47 | |||
48 | // Set server 2 as bad follower | ||
49 | await servers[0].videos.quickUpload({ name: 'video 6' }) | ||
50 | await waitJobs(servers[0]) | ||
51 | |||
52 | afterFollows = new Date() | ||
53 | const filter = (job: Job) => new Date(job.createdAt) > afterFollows | ||
54 | |||
55 | // Resend another broadcast job | ||
56 | await servers[0].videos.quickUpload({ name: 'video 7' }) | ||
57 | await waitJobs(servers[0]) | ||
58 | |||
59 | const resBroadcast = await servers[0].jobs.list({ jobType: 'activitypub-http-broadcast', sort: '-createdAt' }) | ||
60 | const resUnicast = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' }) | ||
61 | |||
62 | const broadcast = resBroadcast.data.filter(filter) | ||
63 | const unicast = resUnicast.data.filter(filter) | ||
64 | |||
65 | expect(unicast).to.have.lengthOf(2) | ||
66 | expect(broadcast).to.have.lengthOf(2) | ||
67 | |||
68 | for (const u of unicast) { | ||
69 | expect(u.data.uri).to.equal(servers[1].url + '/inbox') | ||
70 | } | ||
71 | |||
72 | for (const b of broadcast) { | ||
73 | expect(b.data.uris).to.have.lengthOf(1) | ||
74 | expect(b.data.uris[0]).to.equal(servers[2].url + '/inbox') | ||
75 | } | ||
76 | }) | ||
77 | |||
78 | after(async function () { | ||
79 | await cleanupTests(servers) | ||
80 | }) | ||
81 | }) | ||