From 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 25 Jan 2018 15:05:18 +0100 Subject: Move job queue to redis We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore. --- server/lib/activitypub/send/misc.ts | 26 ++++++++++++-------------- server/lib/activitypub/send/send-accept.ts | 5 ++--- server/lib/activitypub/send/send-announce.ts | 2 +- server/lib/activitypub/send/send-create.ts | 22 ++++++++++++++-------- server/lib/activitypub/send/send-follow.ts | 5 ++--- server/lib/activitypub/send/send-like.ts | 2 +- server/lib/activitypub/send/send-undo.ts | 14 ++++---------- 7 files changed, 36 insertions(+), 40 deletions(-) (limited to 'server/lib/activitypub/send') diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts index dc0d3de57..7a21f0c94 100644 --- a/server/lib/activitypub/send/misc.ts +++ b/server/lib/activitypub/send/misc.ts @@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { VideoModel } from '../../../models/video/video' import { VideoCommentModel } from '../../../models/video/video-comment' import { VideoShareModel } from '../../../models/video/video-share' -import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' +import { JobQueue } from '../../job-queue' async function forwardActivity ( activity: Activity, @@ -35,12 +35,11 @@ async function forwardActivity ( logger.debug('Creating forwarding job.', { uris }) - const jobPayload: ActivityPubHttpPayload = { + const payload = { uris, body: activity } - - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } async function broadcastToFollowers ( @@ -51,44 +50,43 @@ async function broadcastToFollowers ( actorsException: ActorModel[] = [] ) { const uris = await computeFollowerUris(toActorFollowers, actorsException, t) - return broadcastTo(uris, data, byActor, t) + return broadcastTo(uris, data, byActor) } async function broadcastToActors ( data: any, byActor: ActorModel, toActors: ActorModel[], - t: Transaction, actorsException: ActorModel[] = [] ) { const uris = await computeUris(toActors, actorsException) - return broadcastTo(uris, data, byActor, t) + return broadcastTo(uris, data, byActor) } -async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { +async function broadcastTo (uris: string[], data: any, byActor: ActorModel) { if (uris.length === 0) return undefined logger.debug('Creating broadcast job.', { uris }) - const jobPayload: ActivityPubHttpPayload = { + const payload = { uris, signatureActorId: byActor.id, body: data } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } -async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { +async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) { logger.debug('Creating unicast job.', { uri: toActorUrl }) - const jobPayload: ActivityPubHttpPayload = { - uris: [ toActorUrl ], + const payload = { + uri: toActorUrl, signatureActorId: byActor.id, body: data } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) } function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts index 4eaa329d9..064fd88d2 100644 --- a/server/lib/activitypub/send/send-accept.ts +++ b/server/lib/activitypub/send/send-accept.ts @@ -1,4 +1,3 @@ -import { Transaction } from 'sequelize' import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' @@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from import { unicastTo } from './misc' import { followActivityData } from './send-follow' -async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { +async function sendAccept (actorFollow: ActorFollowModel) { const follower = actorFollow.ActorFollower const me = actorFollow.ActorFollowing @@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { const url = getActorFollowAcceptActivityPubUrl(actorFollow) const data = acceptActivityData(url, me, followData) - return unicastTo(data, me, follower.inboxUrl, t) + return unicastTo(data, me, follower.inboxUrl) } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts index 578fbc630..93b5668d2 100644 --- a/server/lib/activitypub/send/send-announce.ts +++ b/server/lib/activitypub/send/send-announce.ts @@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) const data = await createActivityData(url, byActor, announcedActivity, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function announceActivityData ( diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 9db663be1..b92615e9b 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts @@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse' import { VideoCommentModel } from '../../../models/video/video-comment' import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' import { - audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, - getOriginVideoAudience, getOriginVideoCommentAudience, + audiencify, + broadcastToActors, + broadcastToFollowers, + getActorsInvolvedInVideo, + getAudience, + getObjectFollowersAudience, + getOriginVideoAudience, + getOriginVideoCommentAudience, unicastTo } from './misc' @@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { @@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr // This was a reply, send it to the parent actors const actorsException = [ byActor ] - await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) + await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) // Broadcast to our followers await broadcastToFollowers(data, byActor, [ byActor ], t) // Send to origin - return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { @@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode // This was a reply, send it to the parent actors const actorsException = [ byActor ] - await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) + await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) // Broadcast to our followers await broadcastToFollowers(data, byActor, [ byActor ], t) @@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) const data = await createActivityData(url, byActor, viewActivityData, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts index eac60e94f..4e9865af4 100644 --- a/server/lib/activitypub/send/send-follow.ts +++ b/server/lib/activitypub/send/send-follow.ts @@ -1,18 +1,17 @@ -import { Transaction } from 'sequelize' import { ActivityFollow } from '../../../../shared/models/activitypub' import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { getActorFollowActivityPubUrl } from '../url' import { unicastTo } from './misc' -function sendFollow (actorFollow: ActorFollowModel, t: Transaction) { +function sendFollow (actorFollow: ActorFollowModel) { const me = actorFollow.ActorFollower const following = actorFollow.ActorFollowing const url = getActorFollowActivityPubUrl(actorFollow) const data = followActivityData(url, me, following) - return unicastTo(data, me, following.inboxUrl, t) + return unicastTo(data, me, following.inboxUrl) } function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts index 743646455..78ed1aaf2 100644 --- a/server/lib/activitypub/send/send-like.ts +++ b/server/lib/activitypub/send/send-like.ts @@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) const data = await likeActivityData(url, byActor, video, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts index 3a0597fba..4a08b5ca1 100644 --- a/server/lib/activitypub/send/send-undo.ts +++ b/server/lib/activitypub/send/send-undo.ts @@ -1,11 +1,5 @@ import { Transaction } from 'sequelize' -import { - ActivityAudience, - ActivityCreate, - ActivityFollow, - ActivityLike, - ActivityUndo -} from '../../../../shared/models/activitypub' +import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub' import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { VideoModel } from '../../../models/video/video' @@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) { const object = followActivityData(followUrl, me, following) const data = await undoActivityData(undoUrl, me, object, t) - return unicastTo(data, me, following.inboxUrl, t) + return unicastTo(data, me, following.inboxUrl) } async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: const object = await likeActivityData(likeUrl, byActor, video, t) const data = await undoActivityData(undoUrl, byActor, object, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel, const data = await undoActivityData(undoUrl, byActor, object, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { -- cgit v1.2.3