From 5350fd8e5b2b2d017b16d97828893a8a4a40bd89 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 18 Apr 2018 15:32:40 +0200 Subject: Move server follow in the job queue It helps to track follow errors --- .../lib/job-queue/handlers/activitypub-follow.ts | 68 ++++++++++++++++++++++ server/lib/job-queue/job-queue.ts | 5 +- 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 server/lib/job-queue/handlers/activitypub-follow.ts (limited to 'server/lib') diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts new file mode 100644 index 000000000..6764a4037 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-follow.ts @@ -0,0 +1,68 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { getServerActor } from '../../../helpers/utils' +import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' +import { sendFollow } from '../../activitypub/send' +import { sanitizeHost } from '../../../helpers/core-utils' +import { loadActorUrlOrGetFromWebfinger } from '../../../helpers/webfinger' +import { getOrCreateActorAndServerAndModel } from '../../activitypub/actor' +import { retryTransactionWrapper } from '../../../helpers/database-utils' +import { ActorFollowModel } from '../../../models/activitypub/actor-follow' +import { ActorModel } from '../../../models/activitypub/actor' + +export type ActivitypubFollowPayload = { + host: string +} + +async function processActivityPubFollow (job: kue.Job) { + const payload = job.data as ActivitypubFollowPayload + const host = payload.host + + logger.info('Processing ActivityPub follow in job %d.', job.id) + + const sanitizedHost = sanitizeHost(host, REMOTE_SCHEME.HTTP) + + const actorUrl = await loadActorUrlOrGetFromWebfinger(SERVER_ACTOR_NAME, sanitizedHost) + const targetActor = await getOrCreateActorAndServerAndModel(actorUrl) + + const fromActor = await getServerActor() + const options = { + arguments: [ fromActor, targetActor ], + errorMessage: 'Cannot follow with many retries.' + } + + return retryTransactionWrapper(follow, options) +} +// --------------------------------------------------------------------------- + +export { + processActivityPubFollow +} + +// --------------------------------------------------------------------------- + +function follow (fromActor: ActorModel, targetActor: ActorModel) { + if (fromActor.id === targetActor.id) { + throw new Error('Follower is the same than target actor.') + } + + return sequelizeTypescript.transaction(async t => { + const [ actorFollow ] = await ActorFollowModel.findOrCreate({ + where: { + actorId: fromActor.id, + targetActorId: targetActor.id + }, + defaults: { + state: 'pending', + actorId: fromActor.id, + targetActorId: targetActor.id + }, + transaction: t + }) + actorFollow.ActorFollowing = targetActor + actorFollow.ActorFollower = fromActor + + // Send a notification to remote server + await sendFollow(actorFollow) + }) +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 1dc28755e..bf40a9206 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -8,11 +8,13 @@ import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './ import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { EmailPayload, processEmail } from './handlers/email' import { processVideoFile, VideoFilePayload } from './handlers/video-file' +import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | + { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file', payload: VideoFilePayload } | { type: 'email', payload: EmailPayload } @@ -20,6 +22,7 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'activitypub-follow': processActivityPubFollow, 'video-file': processVideoFile, 'email': processEmail } @@ -50,7 +53,7 @@ class JobQueue { } }) - this.jobQueue.setMaxListeners(15) + this.jobQueue.setMaxListeners(20) this.jobQueue.on('error', err => { logger.error('Error in job queue.', { err }) -- cgit v1.2.3 From f55e5a7bf81c2c27db1514273e3366511aabf4ae Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 18 Apr 2018 16:04:49 +0200 Subject: Process broadcast requests in parallel --- .../job-queue/handlers/activitypub-http-broadcast.ts | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'server/lib') diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 78878fc01..38b8393f4 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,8 +1,10 @@ import * as kue from 'kue' +import * as Bluebird from 'bluebird' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' +import { BROADCAST_CONCURRENCY } from '../../../initializers' export type ActivitypubHttpBroadcastPayload = { uris: string[] @@ -28,16 +30,11 @@ async function processActivityPubHttpBroadcast (job: kue.Job) { const badUrls: string[] = [] const goodUrls: string[] = [] - for (const uri of payload.uris) { - options.uri = uri - - try { - await doRequest(options) - goodUrls.push(uri) - } catch (err) { - badUrls.push(uri) - } - } + await Bluebird.map(payload.uris, uri => { + return doRequest(Object.assign({}, options, { uri })) + .then(() => goodUrls.push(uri)) + .catch(() => badUrls.push(uri)) + }, { concurrency: BROADCAST_CONCURRENCY }) return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) } -- cgit v1.2.3