aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-10-13 11:47:32 +0200
committerChocobozzz <me@florianbigard.com>2021-10-13 11:47:32 +0200
commit9db437c8155f3563a33e22ed2896072a9f1fbdb0 (patch)
tree716078fbe1506e0b0d19936f4939e9c530b3b8ab /server/lib/activitypub
parente81f6ccf989d4573b59ec7b2bf2812fe3e9fb534 (diff)
downloadPeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.gz
PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.tar.zst
PeerTube-9db437c8155f3563a33e22ed2896072a9f1fbdb0.zip
Process slow followers in unicast job queue
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r--server/lib/activitypub/send/utils.ts40
-rw-r--r--server/lib/activitypub/videos/refresh.ts6
2 files changed, 36 insertions, 10 deletions
diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts
index 7cd8030e1..7729703b8 100644
--- a/server/lib/activitypub/send/utils.ts
+++ b/server/lib/activitypub/send/utils.ts
@@ -1,4 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
2import { getServerActor } from '@server/models/application/application' 3import { getServerActor } from '@server/models/application/application'
3import { ContextType } from '@shared/models/activitypub/context' 4import { ContextType } from '@shared/models/activitypub/context'
4import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' 5import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
@@ -119,16 +120,41 @@ async function broadcastToActors (
119function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { 120function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
120 if (uris.length === 0) return undefined 121 if (uris.length === 0) return undefined
121 122
122 logger.debug('Creating broadcast job.', { uris }) 123 const broadcastUris: string[] = []
124 const unicastUris: string[] = []
123 125
124 const payload = { 126 // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
125 uris, 127 for (const uri of uris) {
126 signatureActorId: byActor.id, 128 if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
127 body: data, 129 unicastUris.push(uri)
128 contextType 130 } else {
131 broadcastUris.push(uri)
132 }
133 }
134
135 logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
136
137 if (broadcastUris.length !== 0) {
138 const payload = {
139 uris: broadcastUris,
140 signatureActorId: byActor.id,
141 body: data,
142 contextType
143 }
144
145 JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
129 } 146 }
130 147
131 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) 148 for (const unicastUri of unicastUris) {
149 const payload = {
150 uri: unicastUri,
151 signatureActorId: byActor.id,
152 body: data,
153 contextType
154 }
155
156 JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
157 }
132} 158}
133 159
134function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { 160function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts
index 3af08acf4..9f952a218 100644
--- a/server/lib/activitypub/videos/refresh.ts
+++ b/server/lib/activitypub/videos/refresh.ts
@@ -1,10 +1,10 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger' 1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { PeerTubeRequestError } from '@server/helpers/requests' 2import { PeerTubeRequestError } from '@server/helpers/requests'
3import { ActorFollowScoreCache } from '@server/lib/files-cache'
4import { VideoLoadByUrlType } from '@server/lib/model-loaders' 3import { VideoLoadByUrlType } from '@server/lib/model-loaders'
5import { VideoModel } from '@server/models/video/video' 4import { VideoModel } from '@server/models/video/video'
6import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' 5import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
7import { HttpStatusCode } from '@shared/models' 6import { HttpStatusCode } from '@shared/models'
7import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
8import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' 8import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
9import { APVideoUpdater } from './updater' 9import { APVideoUpdater } from './updater'
10 10
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: {
39 39
40 await syncVideoExternalAttributes(video, videoObject, options.syncParam) 40 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
41 41
42 ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) 42 ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
43 43
44 return video 44 return video
45 } catch (err) { 45 } catch (err) {
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: {
53 53
54 logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) 54 logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
55 55
56 ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) 56 ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
57 57
58 // Don't refresh in loop 58 // Don't refresh in loop
59 await video.setAsRefreshed() 59 await video.setAsRefreshed()