diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-11-17 11:35:10 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-11-27 19:40:52 +0100 |
commit | afffe98839db7ccbfa9fb8b7d1413b97900fdc73 (patch) | |
tree | f0b3cbe58f73e81a5ba4bb31dabd9691994cf2ca /server/lib | |
parent | 1b3989b0961d22a5a45ad16239e3c3f58f66180c (diff) | |
download | PeerTube-afffe98839db7ccbfa9fb8b7d1413b97900fdc73.tar.gz PeerTube-afffe98839db7ccbfa9fb8b7d1413b97900fdc73.tar.zst PeerTube-afffe98839db7ccbfa9fb8b7d1413b97900fdc73.zip |
Speed up activity pub http requests
Diffstat (limited to 'server/lib')
10 files changed, 193 insertions, 179 deletions
diff --git a/server/lib/activitypub/send-request.ts b/server/lib/activitypub/send-request.ts index abc1b598d..8d013fa87 100644 --- a/server/lib/activitypub/send-request.ts +++ b/server/lib/activitypub/send-request.ts | |||
@@ -1,116 +1,124 @@ | |||
1 | import * as Sequelize from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | |||
3 | import { database as db } from '../../initializers' | ||
4 | import { | 2 | import { |
5 | AccountInstance, | 3 | ActivityAccept, |
6 | VideoInstance, | 4 | ActivityAdd, |
7 | VideoChannelInstance | 5 | ActivityCreate, |
8 | } from '../../models' | 6 | ActivityDelete, |
9 | import { httpRequestJobScheduler } from '../jobs' | 7 | ActivityFollow, |
10 | import { signObject, activityPubContextify } from '../../helpers' | 8 | ActivityUpdate |
11 | import { Activity, VideoAbuseObject } from '../../../shared' | 9 | } from '../../../shared/models/activitypub/activity' |
12 | import { VideoAbuseInstance } from '../../models/video/video-abuse-interface' | ||
13 | import { getActivityPubUrl } from '../../helpers/activitypub' | 10 | import { getActivityPubUrl } from '../../helpers/activitypub' |
14 | import { logger } from '../../helpers/logger' | 11 | import { logger } from '../../helpers/logger' |
12 | import { database as db } from '../../initializers' | ||
13 | import { AccountInstance, VideoChannelInstance, VideoInstance } from '../../models' | ||
14 | import { VideoAbuseInstance } from '../../models/video/video-abuse-interface' | ||
15 | import { activitypubHttpJobScheduler } from '../jobs' | ||
16 | |||
17 | async function sendCreateVideoChannel (videoChannel: VideoChannelInstance, t: Transaction) { | ||
18 | const byAccount = videoChannel.Account | ||
15 | 19 | ||
16 | async function sendCreateVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { | ||
17 | const videoChannelObject = videoChannel.toActivityPubObject() | 20 | const videoChannelObject = videoChannel.toActivityPubObject() |
18 | const data = await createActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) | 21 | const data = await createActivityData(videoChannel.url, byAccount, videoChannelObject) |
19 | 22 | ||
20 | return broadcastToFollowers(data, [ videoChannel.Account ], t) | 23 | return broadcastToFollowers(data, byAccount, [ byAccount ], t) |
21 | } | 24 | } |
22 | 25 | ||
23 | async function sendUpdateVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { | 26 | async function sendUpdateVideoChannel (videoChannel: VideoChannelInstance, t: Transaction) { |
27 | const byAccount = videoChannel.Account | ||
28 | |||
24 | const videoChannelObject = videoChannel.toActivityPubObject() | 29 | const videoChannelObject = videoChannel.toActivityPubObject() |
25 | const data = await updateActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) | 30 | const data = await updateActivityData(videoChannel.url, byAccount, videoChannelObject) |
26 | 31 | ||
27 | const accountsInvolved = await db.VideoChannelShare.loadAccountsByShare(videoChannel.id) | 32 | const accountsInvolved = await db.VideoChannelShare.loadAccountsByShare(videoChannel.id) |
28 | accountsInvolved.push(videoChannel.Account) | 33 | accountsInvolved.push(byAccount) |
29 | 34 | ||
30 | return broadcastToFollowers(data, accountsInvolved, t) | 35 | return broadcastToFollowers(data, byAccount, accountsInvolved, t) |
31 | } | 36 | } |
32 | 37 | ||
33 | async function sendDeleteVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { | 38 | async function sendDeleteVideoChannel (videoChannel: VideoChannelInstance, t: Transaction) { |
34 | const data = await deleteActivityData(videoChannel.url, videoChannel.Account) | 39 | const byAccount = videoChannel.Account |
40 | |||
41 | const data = await deleteActivityData(videoChannel.url, byAccount) | ||
35 | 42 | ||
36 | const accountsInvolved = await db.VideoChannelShare.loadAccountsByShare(videoChannel.id) | 43 | const accountsInvolved = await db.VideoChannelShare.loadAccountsByShare(videoChannel.id) |
37 | accountsInvolved.push(videoChannel.Account) | 44 | accountsInvolved.push(byAccount) |
38 | 45 | ||
39 | return broadcastToFollowers(data, accountsInvolved, t) | 46 | return broadcastToFollowers(data, byAccount, accountsInvolved, t) |
40 | } | 47 | } |
41 | 48 | ||
42 | async function sendAddVideo (video: VideoInstance, t: Sequelize.Transaction) { | 49 | async function sendAddVideo (video: VideoInstance, t: Transaction) { |
50 | const byAccount = video.VideoChannel.Account | ||
51 | |||
43 | const videoObject = video.toActivityPubObject() | 52 | const videoObject = video.toActivityPubObject() |
44 | const data = await addActivityData(video.url, video.VideoChannel.Account, video.VideoChannel.url, videoObject) | 53 | const data = await addActivityData(video.url, byAccount, video.VideoChannel.url, videoObject) |
45 | 54 | ||
46 | return broadcastToFollowers(data, [ video.VideoChannel.Account ], t) | 55 | return broadcastToFollowers(data, byAccount, [ byAccount ], t) |
47 | } | 56 | } |
48 | 57 | ||
49 | async function sendUpdateVideo (video: VideoInstance, t: Sequelize.Transaction) { | 58 | async function sendUpdateVideo (video: VideoInstance, t: Transaction) { |
59 | const byAccount = video.VideoChannel.Account | ||
60 | |||
50 | const videoObject = video.toActivityPubObject() | 61 | const videoObject = video.toActivityPubObject() |
51 | const data = await updateActivityData(video.url, video.VideoChannel.Account, videoObject) | 62 | const data = await updateActivityData(video.url, byAccount, videoObject) |
52 | 63 | ||
53 | const accountsInvolved = await db.VideoShare.loadAccountsByShare(video.id) | 64 | const accountsInvolved = await db.VideoShare.loadAccountsByShare(video.id) |
54 | accountsInvolved.push(video.VideoChannel.Account) | 65 | accountsInvolved.push(byAccount) |
55 | 66 | ||
56 | return broadcastToFollowers(data, accountsInvolved, t) | 67 | return broadcastToFollowers(data, byAccount, accountsInvolved, t) |
57 | } | 68 | } |
58 | 69 | ||
59 | async function sendDeleteVideo (video: VideoInstance, t: Sequelize.Transaction) { | 70 | async function sendDeleteVideo (video: VideoInstance, t: Transaction) { |
60 | const data = await deleteActivityData(video.url, video.VideoChannel.Account) | 71 | const byAccount = video.VideoChannel.Account |
72 | |||
73 | const data = await deleteActivityData(video.url, byAccount) | ||
61 | 74 | ||
62 | const accountsInvolved = await db.VideoShare.loadAccountsByShare(video.id) | 75 | const accountsInvolved = await db.VideoShare.loadAccountsByShare(video.id) |
63 | accountsInvolved.push(video.VideoChannel.Account) | 76 | accountsInvolved.push(byAccount) |
64 | 77 | ||
65 | return broadcastToFollowers(data, accountsInvolved, t) | 78 | return broadcastToFollowers(data, byAccount, accountsInvolved, t) |
66 | } | 79 | } |
67 | 80 | ||
68 | async function sendDeleteAccount (account: AccountInstance, t: Sequelize.Transaction) { | 81 | async function sendDeleteAccount (account: AccountInstance, t: Transaction) { |
69 | const data = await deleteActivityData(account.url, account) | 82 | const data = await deleteActivityData(account.url, account) |
70 | 83 | ||
71 | return broadcastToFollowers(data, [ account ], t) | 84 | return broadcastToFollowers(data, account, [ account ], t) |
72 | } | 85 | } |
73 | 86 | ||
74 | async function sendVideoChannelAnnounce (byAccount: AccountInstance, videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { | 87 | async function sendVideoChannelAnnounce (byAccount: AccountInstance, videoChannel: VideoChannelInstance, t: Transaction) { |
75 | const url = getActivityPubUrl('videoChannel', videoChannel.uuid) + '#announce' | 88 | const url = getActivityPubUrl('videoChannel', videoChannel.uuid) + '#announce' |
76 | const announcedActivity = await createActivityData(url, videoChannel.Account, videoChannel.toActivityPubObject(), true) | 89 | const announcedActivity = await createActivityData(url, videoChannel.Account, videoChannel.toActivityPubObject()) |
77 | 90 | ||
78 | const data = await announceActivityData(url, byAccount, announcedActivity) | 91 | const data = await announceActivityData(url, byAccount, announcedActivity) |
79 | return broadcastToFollowers(data, [ byAccount ], t) | 92 | return broadcastToFollowers(data, byAccount, [ byAccount ], t) |
80 | } | 93 | } |
81 | 94 | ||
82 | async function sendVideoAnnounce (byAccount: AccountInstance, video: VideoInstance, t: Sequelize.Transaction) { | 95 | async function sendVideoAnnounce (byAccount: AccountInstance, video: VideoInstance, t: Transaction) { |
83 | const url = getActivityPubUrl('video', video.uuid) + '#announce' | 96 | const url = getActivityPubUrl('video', video.uuid) + '#announce' |
84 | 97 | ||
85 | const videoChannel = video.VideoChannel | 98 | const videoChannel = video.VideoChannel |
86 | const announcedActivity = await addActivityData(url, videoChannel.Account, videoChannel.url, video.toActivityPubObject(), true) | 99 | const announcedActivity = await addActivityData(url, videoChannel.Account, videoChannel.url, video.toActivityPubObject()) |
87 | 100 | ||
88 | const data = await announceActivityData(url, byAccount, announcedActivity) | 101 | const data = await announceActivityData(url, byAccount, announcedActivity) |
89 | return broadcastToFollowers(data, [ byAccount ], t) | 102 | return broadcastToFollowers(data, byAccount, [ byAccount ], t) |
90 | } | 103 | } |
91 | 104 | ||
92 | async function sendVideoAbuse ( | 105 | async function sendVideoAbuse (byAccount: AccountInstance, videoAbuse: VideoAbuseInstance, video: VideoInstance, t: Transaction) { |
93 | fromAccount: AccountInstance, | ||
94 | videoAbuse: VideoAbuseInstance, | ||
95 | video: VideoInstance, | ||
96 | t: Sequelize.Transaction | ||
97 | ) { | ||
98 | const url = getActivityPubUrl('videoAbuse', videoAbuse.id.toString()) | 106 | const url = getActivityPubUrl('videoAbuse', videoAbuse.id.toString()) |
99 | const data = await createActivityData(url, fromAccount, videoAbuse.toActivityPubObject()) | 107 | const data = await createActivityData(url, byAccount, videoAbuse.toActivityPubObject()) |
100 | 108 | ||
101 | return unicastTo(data, video.VideoChannel.Account.sharedInboxUrl, t) | 109 | return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t) |
102 | } | 110 | } |
103 | 111 | ||
104 | async function sendAccept (fromAccount: AccountInstance, toAccount: AccountInstance, t: Sequelize.Transaction) { | 112 | async function sendAccept (byAccount: AccountInstance, toAccount: AccountInstance, t: Transaction) { |
105 | const data = await acceptActivityData(fromAccount) | 113 | const data = await acceptActivityData(byAccount) |
106 | 114 | ||
107 | return unicastTo(data, toAccount.inboxUrl, t) | 115 | return unicastTo(data, byAccount, toAccount.inboxUrl, t) |
108 | } | 116 | } |
109 | 117 | ||
110 | async function sendFollow (fromAccount: AccountInstance, toAccount: AccountInstance, t: Sequelize.Transaction) { | 118 | async function sendFollow (byAccount: AccountInstance, toAccount: AccountInstance, t: Transaction) { |
111 | const data = await followActivityData(toAccount.url, fromAccount) | 119 | const data = await followActivityData(toAccount.url, byAccount) |
112 | 120 | ||
113 | return unicastTo(data, toAccount.inboxUrl, t) | 121 | return unicastTo(data, byAccount, toAccount.inboxUrl, t) |
114 | } | 122 | } |
115 | 123 | ||
116 | // --------------------------------------------------------------------------- | 124 | // --------------------------------------------------------------------------- |
@@ -132,7 +140,7 @@ export { | |||
132 | 140 | ||
133 | // --------------------------------------------------------------------------- | 141 | // --------------------------------------------------------------------------- |
134 | 142 | ||
135 | async function broadcastToFollowers (data: any, toAccountFollowers: AccountInstance[], t: Sequelize.Transaction) { | 143 | async function broadcastToFollowers (data: any, byAccount: AccountInstance, toAccountFollowers: AccountInstance[], t: Transaction) { |
136 | const toAccountFollowerIds = toAccountFollowers.map(a => a.id) | 144 | const toAccountFollowerIds = toAccountFollowers.map(a => a.id) |
137 | const result = await db.AccountFollow.listAcceptedFollowerSharedInboxUrls(toAccountFollowerIds) | 145 | const result = await db.AccountFollow.listAcceptedFollowerSharedInboxUrls(toAccountFollowerIds) |
138 | if (result.data.length === 0) { | 146 | if (result.data.length === 0) { |
@@ -142,25 +150,21 @@ async function broadcastToFollowers (data: any, toAccountFollowers: AccountInsta | |||
142 | 150 | ||
143 | const jobPayload = { | 151 | const jobPayload = { |
144 | uris: result.data, | 152 | uris: result.data, |
153 | signatureAccountId: byAccount.id, | ||
145 | body: data | 154 | body: data |
146 | } | 155 | } |
147 | 156 | ||
148 | return httpRequestJobScheduler.createJob(t, 'httpRequestBroadcastHandler', jobPayload) | 157 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) |
149 | } | 158 | } |
150 | 159 | ||
151 | async function unicastTo (data: any, toAccountUrl: string, t: Sequelize.Transaction) { | 160 | async function unicastTo (data: any, byAccount: AccountInstance, toAccountUrl: string, t: Transaction) { |
152 | const jobPayload = { | 161 | const jobPayload = { |
153 | uris: [ toAccountUrl ], | 162 | uris: [ toAccountUrl ], |
163 | signatureAccountId: byAccount.id, | ||
154 | body: data | 164 | body: data |
155 | } | 165 | } |
156 | 166 | ||
157 | return httpRequestJobScheduler.createJob(t, 'httpRequestUnicastHandler', jobPayload) | 167 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) |
158 | } | ||
159 | |||
160 | function buildSignedActivity (byAccount: AccountInstance, data: Object) { | ||
161 | const activity = activityPubContextify(data) | ||
162 | |||
163 | return signObject(byAccount, activity) as Promise<Activity> | ||
164 | } | 168 | } |
165 | 169 | ||
166 | async function getPublicActivityTo (account: AccountInstance) { | 170 | async function getPublicActivityTo (account: AccountInstance) { |
@@ -169,9 +173,9 @@ async function getPublicActivityTo (account: AccountInstance) { | |||
169 | return inboxUrls.concat('https://www.w3.org/ns/activitystreams#Public') | 173 | return inboxUrls.concat('https://www.w3.org/ns/activitystreams#Public') |
170 | } | 174 | } |
171 | 175 | ||
172 | async function createActivityData (url: string, byAccount: AccountInstance, object: any, raw = false) { | 176 | async function createActivityData (url: string, byAccount: AccountInstance, object: any) { |
173 | const to = await getPublicActivityTo(byAccount) | 177 | const to = await getPublicActivityTo(byAccount) |
174 | const base = { | 178 | const activity: ActivityCreate = { |
175 | type: 'Create', | 179 | type: 'Create', |
176 | id: url, | 180 | id: url, |
177 | actor: byAccount.url, | 181 | actor: byAccount.url, |
@@ -179,14 +183,12 @@ async function createActivityData (url: string, byAccount: AccountInstance, obje | |||
179 | object | 183 | object |
180 | } | 184 | } |
181 | 185 | ||
182 | if (raw === true) return base | 186 | return activity |
183 | |||
184 | return buildSignedActivity(byAccount, base) | ||
185 | } | 187 | } |
186 | 188 | ||
187 | async function updateActivityData (url: string, byAccount: AccountInstance, object: any) { | 189 | async function updateActivityData (url: string, byAccount: AccountInstance, object: any) { |
188 | const to = await getPublicActivityTo(byAccount) | 190 | const to = await getPublicActivityTo(byAccount) |
189 | const base = { | 191 | const activity: ActivityUpdate = { |
190 | type: 'Update', | 192 | type: 'Update', |
191 | id: url, | 193 | id: url, |
192 | actor: byAccount.url, | 194 | actor: byAccount.url, |
@@ -194,22 +196,22 @@ async function updateActivityData (url: string, byAccount: AccountInstance, obje | |||
194 | object | 196 | object |
195 | } | 197 | } |
196 | 198 | ||
197 | return buildSignedActivity(byAccount, base) | 199 | return activity |
198 | } | 200 | } |
199 | 201 | ||
200 | async function deleteActivityData (url: string, byAccount: AccountInstance) { | 202 | async function deleteActivityData (url: string, byAccount: AccountInstance) { |
201 | const base = { | 203 | const activity: ActivityDelete = { |
202 | type: 'Delete', | 204 | type: 'Delete', |
203 | id: url, | 205 | id: url, |
204 | actor: byAccount.url | 206 | actor: byAccount.url |
205 | } | 207 | } |
206 | 208 | ||
207 | return buildSignedActivity(byAccount, base) | 209 | return activity |
208 | } | 210 | } |
209 | 211 | ||
210 | async function addActivityData (url: string, byAccount: AccountInstance, target: string, object: any, raw = false) { | 212 | async function addActivityData (url: string, byAccount: AccountInstance, target: string, object: any) { |
211 | const to = await getPublicActivityTo(byAccount) | 213 | const to = await getPublicActivityTo(byAccount) |
212 | const base = { | 214 | const activity: ActivityAdd = { |
213 | type: 'Add', | 215 | type: 'Add', |
214 | id: url, | 216 | id: url, |
215 | actor: byAccount.url, | 217 | actor: byAccount.url, |
@@ -218,39 +220,37 @@ async function addActivityData (url: string, byAccount: AccountInstance, target: | |||
218 | target | 220 | target |
219 | } | 221 | } |
220 | 222 | ||
221 | if (raw === true) return base | 223 | return activity |
222 | |||
223 | return buildSignedActivity(byAccount, base) | ||
224 | } | 224 | } |
225 | 225 | ||
226 | async function announceActivityData (url: string, byAccount: AccountInstance, object: any) { | 226 | async function announceActivityData (url: string, byAccount: AccountInstance, object: any) { |
227 | const base = { | 227 | const activity = { |
228 | type: 'Announce', | 228 | type: 'Announce', |
229 | id: url, | 229 | id: url, |
230 | actor: byAccount.url, | 230 | actor: byAccount.url, |
231 | object | 231 | object |
232 | } | 232 | } |
233 | 233 | ||
234 | return buildSignedActivity(byAccount, base) | 234 | return activity |
235 | } | 235 | } |
236 | 236 | ||
237 | async function followActivityData (url: string, byAccount: AccountInstance) { | 237 | async function followActivityData (url: string, byAccount: AccountInstance) { |
238 | const base = { | 238 | const activity: ActivityFollow = { |
239 | type: 'Follow', | 239 | type: 'Follow', |
240 | id: byAccount.url, | 240 | id: byAccount.url, |
241 | actor: byAccount.url, | 241 | actor: byAccount.url, |
242 | object: url | 242 | object: url |
243 | } | 243 | } |
244 | 244 | ||
245 | return buildSignedActivity(byAccount, base) | 245 | return activity |
246 | } | 246 | } |
247 | 247 | ||
248 | async function acceptActivityData (byAccount: AccountInstance) { | 248 | async function acceptActivityData (byAccount: AccountInstance) { |
249 | const base = { | 249 | const activity: ActivityAccept = { |
250 | type: 'Accept', | 250 | type: 'Accept', |
251 | id: byAccount.url, | 251 | id: byAccount.url, |
252 | actor: byAccount.url | 252 | actor: byAccount.url |
253 | } | 253 | } |
254 | 254 | ||
255 | return buildSignedActivity(byAccount, base) | 255 | return activity |
256 | } | 256 | } |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts new file mode 100644 index 000000000..111fc88a4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import { logger } from '../../../helpers' | ||
2 | import { buildSignedActivity } from '../../../helpers/activitypub' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { database as db } from '../../../initializers' | ||
5 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | ||
6 | |||
7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | ||
8 | logger.info('Processing ActivityPub broadcast in job %d.', jobId) | ||
9 | |||
10 | const accountSignature = await db.Account.load(payload.signatureAccountId) | ||
11 | if (!accountSignature) throw new Error('Unknown signature account id.') | ||
12 | |||
13 | const signedBody = await buildSignedActivity(accountSignature, payload.body) | ||
14 | |||
15 | const options = { | ||
16 | method: 'POST', | ||
17 | uri: '', | ||
18 | json: signedBody | ||
19 | } | ||
20 | |||
21 | for (const uri of payload.uris) { | ||
22 | options.uri = uri | ||
23 | await doRequest(options) | ||
24 | } | ||
25 | } | ||
26 | |||
27 | function onError (err: Error, jobId: number) { | ||
28 | logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) | ||
29 | return Promise.resolve() | ||
30 | } | ||
31 | |||
32 | function onSuccess (jobId: number) { | ||
33 | logger.info('Job %d is a success.', jobId) | ||
34 | return Promise.resolve() | ||
35 | } | ||
36 | |||
37 | // --------------------------------------------------------------------------- | ||
38 | |||
39 | export { | ||
40 | process, | ||
41 | onError, | ||
42 | onSuccess | ||
43 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts new file mode 100644 index 000000000..e4f6c94a5 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts | |||
@@ -0,0 +1,23 @@ | |||
1 | import { JobScheduler, JobHandler } from '../job-scheduler' | ||
2 | |||
3 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' | ||
4 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' | ||
5 | import { JobCategory } from '../../../../shared' | ||
6 | |||
7 | type ActivityPubHttpPayload = { | ||
8 | uris: string[] | ||
9 | signatureAccountId: number | ||
10 | body: any | ||
11 | } | ||
12 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { | ||
13 | activitypubHttpBroadcastHandler, | ||
14 | activitypubHttpUnicastHandler | ||
15 | } | ||
16 | const jobCategory: JobCategory = 'activitypub-http' | ||
17 | |||
18 | const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
19 | |||
20 | export { | ||
21 | ActivityPubHttpPayload, | ||
22 | activitypubHttpJobScheduler | ||
23 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts new file mode 100644 index 000000000..8d3b755ad --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts | |||
@@ -0,0 +1,40 @@ | |||
1 | import { logger } from '../../../helpers' | ||
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | ||
4 | import { database as db } from '../../../initializers/database' | ||
5 | import { buildSignedActivity } from '../../../helpers/activitypub' | ||
6 | |||
7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | ||
8 | logger.info('Processing ActivityPub unicast in job %d.', jobId) | ||
9 | |||
10 | const accountSignature = await db.Account.load(payload.signatureAccountId) | ||
11 | if (!accountSignature) throw new Error('Unknown signature account id.') | ||
12 | |||
13 | const signedBody = await buildSignedActivity(accountSignature, payload.body) | ||
14 | const uri = payload.uris[0] | ||
15 | const options = { | ||
16 | method: 'POST', | ||
17 | uri, | ||
18 | json: signedBody | ||
19 | } | ||
20 | |||
21 | await doRequest(options) | ||
22 | } | ||
23 | |||
24 | function onError (err: Error, jobId: number) { | ||
25 | logger.error('Error when sending ActivityPub request in job %d.', jobId, err) | ||
26 | return Promise.resolve() | ||
27 | } | ||
28 | |||
29 | function onSuccess (jobId: number) { | ||
30 | logger.info('Job %d is a success.', jobId) | ||
31 | return Promise.resolve() | ||
32 | } | ||
33 | |||
34 | // --------------------------------------------------------------------------- | ||
35 | |||
36 | export { | ||
37 | process, | ||
38 | onError, | ||
39 | onSuccess | ||
40 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts new file mode 100644 index 000000000..ad8f527b4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './activitypub-http-job-scheduler' | |||
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts deleted file mode 100644 index ccb008e4d..000000000 --- a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts +++ /dev/null | |||
@@ -1,36 +0,0 @@ | |||
1 | import { logger } from '../../../helpers' | ||
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { HTTPRequestPayload } from './http-request-job-scheduler' | ||
4 | |||
5 | async function process (payload: HTTPRequestPayload, jobId: number) { | ||
6 | logger.info('Processing broadcast in job %d.', jobId) | ||
7 | |||
8 | const options = { | ||
9 | method: 'POST', | ||
10 | uri: '', | ||
11 | json: payload.body | ||
12 | } | ||
13 | |||
14 | for (const uri of payload.uris) { | ||
15 | options.uri = uri | ||
16 | await doRequest(options) | ||
17 | } | ||
18 | } | ||
19 | |||
20 | function onError (err: Error, jobId: number) { | ||
21 | logger.error('Error when broadcasting request in job %d.', jobId, err) | ||
22 | return Promise.resolve() | ||
23 | } | ||
24 | |||
25 | function onSuccess (jobId: number) { | ||
26 | logger.info('Job %d is a success.', jobId) | ||
27 | return Promise.resolve() | ||
28 | } | ||
29 | |||
30 | // --------------------------------------------------------------------------- | ||
31 | |||
32 | export { | ||
33 | process, | ||
34 | onError, | ||
35 | onSuccess | ||
36 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts deleted file mode 100644 index ad3349866..000000000 --- a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts +++ /dev/null | |||
@@ -1,22 +0,0 @@ | |||
1 | import { JobScheduler, JobHandler } from '../job-scheduler' | ||
2 | |||
3 | import * as httpRequestBroadcastHandler from './http-request-broadcast-handler' | ||
4 | import * as httpRequestUnicastHandler from './http-request-unicast-handler' | ||
5 | import { JobCategory } from '../../../../shared' | ||
6 | |||
7 | type HTTPRequestPayload = { | ||
8 | uris: string[] | ||
9 | body: any | ||
10 | } | ||
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<HTTPRequestPayload, void> } = { | ||
12 | httpRequestBroadcastHandler, | ||
13 | httpRequestUnicastHandler | ||
14 | } | ||
15 | const jobCategory: JobCategory = 'http-request' | ||
16 | |||
17 | const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
18 | |||
19 | export { | ||
20 | HTTPRequestPayload, | ||
21 | httpRequestJobScheduler | ||
22 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts deleted file mode 100644 index 9e4e73891..000000000 --- a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts +++ /dev/null | |||
@@ -1,34 +0,0 @@ | |||
1 | import { logger } from '../../../helpers' | ||
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { HTTPRequestPayload } from './http-request-job-scheduler' | ||
4 | |||
5 | async function process (payload: HTTPRequestPayload, jobId: number) { | ||
6 | logger.info('Processing unicast in job %d.', jobId) | ||
7 | |||
8 | const uri = payload.uris[0] | ||
9 | const options = { | ||
10 | method: 'POST', | ||
11 | uri, | ||
12 | json: payload.body | ||
13 | } | ||
14 | |||
15 | await doRequest(options) | ||
16 | } | ||
17 | |||
18 | function onError (err: Error, jobId: number) { | ||
19 | logger.error('Error when sending request in job %d.', jobId, err) | ||
20 | return Promise.resolve() | ||
21 | } | ||
22 | |||
23 | function onSuccess (jobId: number) { | ||
24 | logger.info('Job %d is a success.', jobId) | ||
25 | return Promise.resolve() | ||
26 | } | ||
27 | |||
28 | // --------------------------------------------------------------------------- | ||
29 | |||
30 | export { | ||
31 | process, | ||
32 | onError, | ||
33 | onSuccess | ||
34 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/index.ts b/server/lib/jobs/http-request-job-scheduler/index.ts deleted file mode 100644 index 4d2573296..000000000 --- a/server/lib/jobs/http-request-job-scheduler/index.ts +++ /dev/null | |||
@@ -1 +0,0 @@ | |||
1 | export * from './http-request-job-scheduler' | ||
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts index a92743707..394264ec1 100644 --- a/server/lib/jobs/index.ts +++ b/server/lib/jobs/index.ts | |||
@@ -1,2 +1,2 @@ | |||
1 | export * from './http-request-job-scheduler' | 1 | export * from './activitypub-http-job-scheduler' |
2 | export * from './transcoding-job-scheduler' | 2 | export * from './transcoding-job-scheduler' |