diff options
author | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-10-13 11:47:32 +0200 |
commit | 9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch) | |
tree | 716078fbe1506e0b0d19936f4939e9c530b3b8ab /server/lib/activitypub | |
parent | e81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff) | |
download | PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip |
Process slow followers in unicast job queue
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r-- | server/lib/activitypub/send/utils.ts | 40 | ||||
-rw-r--r-- | server/lib/activitypub/videos/refresh.ts | 6 |
2 files changed, 36 insertions, 10 deletions
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts index 7cd8030e1..7729703b8 100644 --- a/server/lib/activitypub/send/utils.ts +++ b/server/lib/activitypub/send/utils.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
2 | import { getServerActor } from '@server/models/application/application' | 3 | import { getServerActor } from '@server/models/application/application' |
3 | import { ContextType } from '@shared/models/activitypub/context' | 4 | import { ContextType } from '@shared/models/activitypub/context' |
4 | import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' | 5 | import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' |
@@ -119,16 +120,41 @@ async function broadcastToActors ( | |||
119 | function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { | 120 | function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { |
120 | if (uris.length === 0) return undefined | 121 | if (uris.length === 0) return undefined |
121 | 122 | ||
122 | logger.debug('Creating broadcast job.', { uris }) | 123 | const broadcastUris: string[] = [] |
124 | const unicastUris: string[] = [] | ||
123 | 125 | ||
124 | const payload = { | 126 | // Bad URIs could be slow to respond, prefer to process them in a dedicated queue |
125 | uris, | 127 | for (const uri of uris) { |
126 | signatureActorId: byActor.id, | 128 | if (ActorFollowHealthCache.Instance.isBadInbox(uri)) { |
127 | body: data, | 129 | unicastUris.push(uri) |
128 | contextType | 130 | } else { |
131 | broadcastUris.push(uri) | ||
132 | } | ||
133 | } | ||
134 | |||
135 | logger.debug('Creating broadcast job.', { broadcastUris, unicastUris }) | ||
136 | |||
137 | if (broadcastUris.length !== 0) { | ||
138 | const payload = { | ||
139 | uris: broadcastUris, | ||
140 | signatureActorId: byActor.id, | ||
141 | body: data, | ||
142 | contextType | ||
143 | } | ||
144 | |||
145 | JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | ||
129 | } | 146 | } |
130 | 147 | ||
131 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | 148 | for (const unicastUri of unicastUris) { |
149 | const payload = { | ||
150 | uri: unicastUri, | ||
151 | signatureActorId: byActor.id, | ||
152 | body: data, | ||
153 | contextType | ||
154 | } | ||
155 | |||
156 | JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) | ||
157 | } | ||
132 | } | 158 | } |
133 | 159 | ||
134 | function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { | 160 | function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { |
diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts index 3af08acf4..9f952a218 100644 --- a/server/lib/activitypub/videos/refresh.ts +++ b/server/lib/activitypub/videos/refresh.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
2 | import { PeerTubeRequestError } from '@server/helpers/requests' | 2 | import { PeerTubeRequestError } from '@server/helpers/requests' |
3 | import { ActorFollowScoreCache } from '@server/lib/files-cache' | ||
4 | import { VideoLoadByUrlType } from '@server/lib/model-loaders' | 3 | import { VideoLoadByUrlType } from '@server/lib/model-loaders' |
5 | import { VideoModel } from '@server/models/video/video' | 4 | import { VideoModel } from '@server/models/video/video' |
6 | import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' | 5 | import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' |
7 | import { HttpStatusCode } from '@shared/models' | 6 | import { HttpStatusCode } from '@shared/models' |
7 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | ||
8 | import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' | 8 | import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' |
9 | import { APVideoUpdater } from './updater' | 9 | import { APVideoUpdater } from './updater' |
10 | 10 | ||
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: { | |||
39 | 39 | ||
40 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) | 40 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) |
41 | 41 | ||
42 | ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) | 42 | ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) |
43 | 43 | ||
44 | return video | 44 | return video |
45 | } catch (err) { | 45 | } catch (err) { |
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: { | |||
53 | 53 | ||
54 | logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) | 54 | logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) |
55 | 55 | ||
56 | ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) | 56 | ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) |
57 | 57 | ||
58 | // Don't refresh in loop | 58 | // Don't refresh in loop |
59 | await video.setAsRefreshed() | 59 | await video.setAsRefreshed() |