From afffe98839db7ccbfa9fb8b7d1413b97900fdc73 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 17 Nov 2017 11:35:10 +0100 Subject: Speed up activity pub http requests --- server/lib/activitypub/send-request.ts | 170 ++++++++++----------- .../activitypub-http-broadcast-handler.ts | 43 ++++++ .../activitypub-http-job-scheduler.ts | 23 +++ .../activitypub-http-unicast-handler.ts | 40 +++++ .../jobs/activitypub-http-job-scheduler/index.ts | 1 + .../http-request-broadcast-handler.ts | 36 ----- .../http-request-job-scheduler.ts | 22 --- .../http-request-unicast-handler.ts | 34 ----- .../lib/jobs/http-request-job-scheduler/index.ts | 1 - server/lib/jobs/index.ts | 2 +- 10 files changed, 193 insertions(+), 179 deletions(-) create mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts create mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts create mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts create mode 100644 server/lib/jobs/activitypub-http-job-scheduler/index.ts delete mode 100644 server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts delete mode 100644 server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts delete mode 100644 server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts delete mode 100644 server/lib/jobs/http-request-job-scheduler/index.ts (limited to 'server/lib') diff --git a/server/lib/activitypub/send-request.ts b/server/lib/activitypub/send-request.ts index abc1b598d..8d013fa87 100644 --- a/server/lib/activitypub/send-request.ts +++ b/server/lib/activitypub/send-request.ts @@ -1,116 +1,124 @@ -import * as Sequelize from 'sequelize' - -import { database as db } from '../../initializers' +import { Transaction } from 'sequelize' import { - AccountInstance, - VideoInstance, - VideoChannelInstance -} from '../../models' -import { httpRequestJobScheduler } from '../jobs' -import { signObject, activityPubContextify } from '../../helpers' -import { Activity, VideoAbuseObject } from '../../../shared' -import { VideoAbuseInstance } from '../../models/video/video-abuse-interface' + ActivityAccept, + ActivityAdd, + ActivityCreate, + ActivityDelete, + ActivityFollow, + ActivityUpdate +} from '../../../shared/models/activitypub/activity' import { getActivityPubUrl } from '../../helpers/activitypub' import { logger } from '../../helpers/logger' +import { database as db } from '../../initializers' +import { AccountInstance, VideoChannelInstance, VideoInstance } from '../../models' +import { VideoAbuseInstance } from '../../models/video/video-abuse-interface' +import { activitypubHttpJobScheduler } from '../jobs' + +async function sendCreateVideoChannel (videoChannel: VideoChannelInstance, t: Transaction) { + const byAccount = videoChannel.Account -async function sendCreateVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { const videoChannelObject = videoChannel.toActivityPubObject() - const data = await createActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) + const data = await createActivityData(videoChannel.url, byAccount, videoChannelObject) - return broadcastToFollowers(data, [ videoChannel.Account ], t) + return broadcastToFollowers(data, byAccount, [ byAccount ], t) } -async function sendUpdateVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { +async function sendUpdateVideoChannel (videoChannel: VideoChannelInstance, t: Transaction) { + const byAccount = videoChannel.Account + const videoChannelObject = videoChannel.toActivityPubObject() - const data = await updateActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) + const data = await updateActivityData(videoChannel.url, byAccount, videoChannelObject) const accountsInvolved = await db.VideoChannelShare.loadAccountsByShare(videoChannel.id) - accountsInvolved.push(videoChannel.Account) + accountsInvolved.push(byAccount) - return broadcastToFollowers(data, accountsInvolved, t) + return broadcastToFollowers(data, byAccount, accountsInvolved, t) } -async function sendDeleteVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { - const data = await deleteActivityData(videoChannel.url, videoChannel.Account) +async function sendDeleteVideoChannel (videoChannel: VideoChannelInstance, t: Transaction) { + const byAccount = videoChannel.Account + + const data = await deleteActivityData(videoChannel.url, byAccount) const accountsInvolved = await db.VideoChannelShare.loadAccountsByShare(videoChannel.id) - accountsInvolved.push(videoChannel.Account) + accountsInvolved.push(byAccount) - return broadcastToFollowers(data, accountsInvolved, t) + return broadcastToFollowers(data, byAccount, accountsInvolved, t) } -async function sendAddVideo (video: VideoInstance, t: Sequelize.Transaction) { +async function sendAddVideo (video: VideoInstance, t: Transaction) { + const byAccount = video.VideoChannel.Account + const videoObject = video.toActivityPubObject() - const data = await addActivityData(video.url, video.VideoChannel.Account, video.VideoChannel.url, videoObject) + const data = await addActivityData(video.url, byAccount, video.VideoChannel.url, videoObject) - return broadcastToFollowers(data, [ video.VideoChannel.Account ], t) + return broadcastToFollowers(data, byAccount, [ byAccount ], t) } -async function sendUpdateVideo (video: VideoInstance, t: Sequelize.Transaction) { +async function sendUpdateVideo (video: VideoInstance, t: Transaction) { + const byAccount = video.VideoChannel.Account + const videoObject = video.toActivityPubObject() - const data = await updateActivityData(video.url, video.VideoChannel.Account, videoObject) + const data = await updateActivityData(video.url, byAccount, videoObject) const accountsInvolved = await db.VideoShare.loadAccountsByShare(video.id) - accountsInvolved.push(video.VideoChannel.Account) + accountsInvolved.push(byAccount) - return broadcastToFollowers(data, accountsInvolved, t) + return broadcastToFollowers(data, byAccount, accountsInvolved, t) } -async function sendDeleteVideo (video: VideoInstance, t: Sequelize.Transaction) { - const data = await deleteActivityData(video.url, video.VideoChannel.Account) +async function sendDeleteVideo (video: VideoInstance, t: Transaction) { + const byAccount = video.VideoChannel.Account + + const data = await deleteActivityData(video.url, byAccount) const accountsInvolved = await db.VideoShare.loadAccountsByShare(video.id) - accountsInvolved.push(video.VideoChannel.Account) + accountsInvolved.push(byAccount) - return broadcastToFollowers(data, accountsInvolved, t) + return broadcastToFollowers(data, byAccount, accountsInvolved, t) } -async function sendDeleteAccount (account: AccountInstance, t: Sequelize.Transaction) { +async function sendDeleteAccount (account: AccountInstance, t: Transaction) { const data = await deleteActivityData(account.url, account) - return broadcastToFollowers(data, [ account ], t) + return broadcastToFollowers(data, account, [ account ], t) } -async function sendVideoChannelAnnounce (byAccount: AccountInstance, videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { +async function sendVideoChannelAnnounce (byAccount: AccountInstance, videoChannel: VideoChannelInstance, t: Transaction) { const url = getActivityPubUrl('videoChannel', videoChannel.uuid) + '#announce' - const announcedActivity = await createActivityData(url, videoChannel.Account, videoChannel.toActivityPubObject(), true) + const announcedActivity = await createActivityData(url, videoChannel.Account, videoChannel.toActivityPubObject()) const data = await announceActivityData(url, byAccount, announcedActivity) - return broadcastToFollowers(data, [ byAccount ], t) + return broadcastToFollowers(data, byAccount, [ byAccount ], t) } -async function sendVideoAnnounce (byAccount: AccountInstance, video: VideoInstance, t: Sequelize.Transaction) { +async function sendVideoAnnounce (byAccount: AccountInstance, video: VideoInstance, t: Transaction) { const url = getActivityPubUrl('video', video.uuid) + '#announce' const videoChannel = video.VideoChannel - const announcedActivity = await addActivityData(url, videoChannel.Account, videoChannel.url, video.toActivityPubObject(), true) + const announcedActivity = await addActivityData(url, videoChannel.Account, videoChannel.url, video.toActivityPubObject()) const data = await announceActivityData(url, byAccount, announcedActivity) - return broadcastToFollowers(data, [ byAccount ], t) + return broadcastToFollowers(data, byAccount, [ byAccount ], t) } -async function sendVideoAbuse ( - fromAccount: AccountInstance, - videoAbuse: VideoAbuseInstance, - video: VideoInstance, - t: Sequelize.Transaction -) { +async function sendVideoAbuse (byAccount: AccountInstance, videoAbuse: VideoAbuseInstance, video: VideoInstance, t: Transaction) { const url = getActivityPubUrl('videoAbuse', videoAbuse.id.toString()) - const data = await createActivityData(url, fromAccount, videoAbuse.toActivityPubObject()) + const data = await createActivityData(url, byAccount, videoAbuse.toActivityPubObject()) - return unicastTo(data, video.VideoChannel.Account.sharedInboxUrl, t) + return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t) } -async function sendAccept (fromAccount: AccountInstance, toAccount: AccountInstance, t: Sequelize.Transaction) { - const data = await acceptActivityData(fromAccount) +async function sendAccept (byAccount: AccountInstance, toAccount: AccountInstance, t: Transaction) { + const data = await acceptActivityData(byAccount) - return unicastTo(data, toAccount.inboxUrl, t) + return unicastTo(data, byAccount, toAccount.inboxUrl, t) } -async function sendFollow (fromAccount: AccountInstance, toAccount: AccountInstance, t: Sequelize.Transaction) { - const data = await followActivityData(toAccount.url, fromAccount) +async function sendFollow (byAccount: AccountInstance, toAccount: AccountInstance, t: Transaction) { + const data = await followActivityData(toAccount.url, byAccount) - return unicastTo(data, toAccount.inboxUrl, t) + return unicastTo(data, byAccount, toAccount.inboxUrl, t) } // --------------------------------------------------------------------------- @@ -132,7 +140,7 @@ export { // --------------------------------------------------------------------------- -async function broadcastToFollowers (data: any, toAccountFollowers: AccountInstance[], t: Sequelize.Transaction) { +async function broadcastToFollowers (data: any, byAccount: AccountInstance, toAccountFollowers: AccountInstance[], t: Transaction) { const toAccountFollowerIds = toAccountFollowers.map(a => a.id) const result = await db.AccountFollow.listAcceptedFollowerSharedInboxUrls(toAccountFollowerIds) if (result.data.length === 0) { @@ -142,25 +150,21 @@ async function broadcastToFollowers (data: any, toAccountFollowers: AccountInsta const jobPayload = { uris: result.data, + signatureAccountId: byAccount.id, body: data } - return httpRequestJobScheduler.createJob(t, 'httpRequestBroadcastHandler', jobPayload) + return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) } -async function unicastTo (data: any, toAccountUrl: string, t: Sequelize.Transaction) { +async function unicastTo (data: any, byAccount: AccountInstance, toAccountUrl: string, t: Transaction) { const jobPayload = { uris: [ toAccountUrl ], + signatureAccountId: byAccount.id, body: data } - return httpRequestJobScheduler.createJob(t, 'httpRequestUnicastHandler', jobPayload) -} - -function buildSignedActivity (byAccount: AccountInstance, data: Object) { - const activity = activityPubContextify(data) - - return signObject(byAccount, activity) as Promise + return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) } async function getPublicActivityTo (account: AccountInstance) { @@ -169,9 +173,9 @@ async function getPublicActivityTo (account: AccountInstance) { return inboxUrls.concat('https://www.w3.org/ns/activitystreams#Public') } -async function createActivityData (url: string, byAccount: AccountInstance, object: any, raw = false) { +async function createActivityData (url: string, byAccount: AccountInstance, object: any) { const to = await getPublicActivityTo(byAccount) - const base = { + const activity: ActivityCreate = { type: 'Create', id: url, actor: byAccount.url, @@ -179,14 +183,12 @@ async function createActivityData (url: string, byAccount: AccountInstance, obje object } - if (raw === true) return base - - return buildSignedActivity(byAccount, base) + return activity } async function updateActivityData (url: string, byAccount: AccountInstance, object: any) { const to = await getPublicActivityTo(byAccount) - const base = { + const activity: ActivityUpdate = { type: 'Update', id: url, actor: byAccount.url, @@ -194,22 +196,22 @@ async function updateActivityData (url: string, byAccount: AccountInstance, obje object } - return buildSignedActivity(byAccount, base) + return activity } async function deleteActivityData (url: string, byAccount: AccountInstance) { - const base = { + const activity: ActivityDelete = { type: 'Delete', id: url, actor: byAccount.url } - return buildSignedActivity(byAccount, base) + return activity } -async function addActivityData (url: string, byAccount: AccountInstance, target: string, object: any, raw = false) { +async function addActivityData (url: string, byAccount: AccountInstance, target: string, object: any) { const to = await getPublicActivityTo(byAccount) - const base = { + const activity: ActivityAdd = { type: 'Add', id: url, actor: byAccount.url, @@ -218,39 +220,37 @@ async function addActivityData (url: string, byAccount: AccountInstance, target: target } - if (raw === true) return base - - return buildSignedActivity(byAccount, base) + return activity } async function announceActivityData (url: string, byAccount: AccountInstance, object: any) { - const base = { + const activity = { type: 'Announce', id: url, actor: byAccount.url, object } - return buildSignedActivity(byAccount, base) + return activity } async function followActivityData (url: string, byAccount: AccountInstance) { - const base = { + const activity: ActivityFollow = { type: 'Follow', id: byAccount.url, actor: byAccount.url, object: url } - return buildSignedActivity(byAccount, base) + return activity } async function acceptActivityData (byAccount: AccountInstance) { - const base = { + const activity: ActivityAccept = { type: 'Accept', id: byAccount.url, actor: byAccount.url } - return buildSignedActivity(byAccount, base) + return activity } diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts new file mode 100644 index 000000000..111fc88a4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts @@ -0,0 +1,43 @@ +import { logger } from '../../../helpers' +import { buildSignedActivity } from '../../../helpers/activitypub' +import { doRequest } from '../../../helpers/requests' +import { database as db } from '../../../initializers' +import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' + +async function process (payload: ActivityPubHttpPayload, jobId: number) { + logger.info('Processing ActivityPub broadcast in job %d.', jobId) + + const accountSignature = await db.Account.load(payload.signatureAccountId) + if (!accountSignature) throw new Error('Unknown signature account id.') + + const signedBody = await buildSignedActivity(accountSignature, payload.body) + + const options = { + method: 'POST', + uri: '', + json: signedBody + } + + for (const uri of payload.uris) { + options.uri = uri + await doRequest(options) + } +} + +function onError (err: Error, jobId: number) { + logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) + return Promise.resolve() +} + +function onSuccess (jobId: number) { + logger.info('Job %d is a success.', jobId) + return Promise.resolve() +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts new file mode 100644 index 000000000..e4f6c94a5 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts @@ -0,0 +1,23 @@ +import { JobScheduler, JobHandler } from '../job-scheduler' + +import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' +import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' +import { JobCategory } from '../../../../shared' + +type ActivityPubHttpPayload = { + uris: string[] + signatureAccountId: number + body: any +} +const jobHandlers: { [ handlerName: string ]: JobHandler } = { + activitypubHttpBroadcastHandler, + activitypubHttpUnicastHandler +} +const jobCategory: JobCategory = 'activitypub-http' + +const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) + +export { + ActivityPubHttpPayload, + activitypubHttpJobScheduler +} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts new file mode 100644 index 000000000..8d3b755ad --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts @@ -0,0 +1,40 @@ +import { logger } from '../../../helpers' +import { doRequest } from '../../../helpers/requests' +import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' +import { database as db } from '../../../initializers/database' +import { buildSignedActivity } from '../../../helpers/activitypub' + +async function process (payload: ActivityPubHttpPayload, jobId: number) { + logger.info('Processing ActivityPub unicast in job %d.', jobId) + + const accountSignature = await db.Account.load(payload.signatureAccountId) + if (!accountSignature) throw new Error('Unknown signature account id.') + + const signedBody = await buildSignedActivity(accountSignature, payload.body) + const uri = payload.uris[0] + const options = { + method: 'POST', + uri, + json: signedBody + } + + await doRequest(options) +} + +function onError (err: Error, jobId: number) { + logger.error('Error when sending ActivityPub request in job %d.', jobId, err) + return Promise.resolve() +} + +function onSuccess (jobId: number) { + logger.info('Job %d is a success.', jobId) + return Promise.resolve() +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts new file mode 100644 index 000000000..ad8f527b4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/index.ts @@ -0,0 +1 @@ +export * from './activitypub-http-job-scheduler' diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts deleted file mode 100644 index ccb008e4d..000000000 --- a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { logger } from '../../../helpers' -import { doRequest } from '../../../helpers/requests' -import { HTTPRequestPayload } from './http-request-job-scheduler' - -async function process (payload: HTTPRequestPayload, jobId: number) { - logger.info('Processing broadcast in job %d.', jobId) - - const options = { - method: 'POST', - uri: '', - json: payload.body - } - - for (const uri of payload.uris) { - options.uri = uri - await doRequest(options) - } -} - -function onError (err: Error, jobId: number) { - logger.error('Error when broadcasting request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts deleted file mode 100644 index ad3349866..000000000 --- a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { JobScheduler, JobHandler } from '../job-scheduler' - -import * as httpRequestBroadcastHandler from './http-request-broadcast-handler' -import * as httpRequestUnicastHandler from './http-request-unicast-handler' -import { JobCategory } from '../../../../shared' - -type HTTPRequestPayload = { - uris: string[] - body: any -} -const jobHandlers: { [ handlerName: string ]: JobHandler } = { - httpRequestBroadcastHandler, - httpRequestUnicastHandler -} -const jobCategory: JobCategory = 'http-request' - -const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) - -export { - HTTPRequestPayload, - httpRequestJobScheduler -} diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts deleted file mode 100644 index 9e4e73891..000000000 --- a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { logger } from '../../../helpers' -import { doRequest } from '../../../helpers/requests' -import { HTTPRequestPayload } from './http-request-job-scheduler' - -async function process (payload: HTTPRequestPayload, jobId: number) { - logger.info('Processing unicast in job %d.', jobId) - - const uri = payload.uris[0] - const options = { - method: 'POST', - uri, - json: payload.body - } - - await doRequest(options) -} - -function onError (err: Error, jobId: number) { - logger.error('Error when sending request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/http-request-job-scheduler/index.ts b/server/lib/jobs/http-request-job-scheduler/index.ts deleted file mode 100644 index 4d2573296..000000000 --- a/server/lib/jobs/http-request-job-scheduler/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './http-request-job-scheduler' diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts index a92743707..394264ec1 100644 --- a/server/lib/jobs/index.ts +++ b/server/lib/jobs/index.ts @@ -1,2 +1,2 @@ -export * from './http-request-job-scheduler' +export * from './activitypub-http-job-scheduler' export * from './transcoding-job-scheduler' -- cgit v1.2.3