From 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 25 Jan 2018 15:05:18 +0100 Subject: Move job queue to redis We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore. --- server/lib/activitypub/send/misc.ts | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) (limited to 'server/lib/activitypub/send/misc.ts') diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts index dc0d3de57..7a21f0c94 100644 --- a/server/lib/activitypub/send/misc.ts +++ b/server/lib/activitypub/send/misc.ts @@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { VideoModel } from '../../../models/video/video' import { VideoCommentModel } from '../../../models/video/video-comment' import { VideoShareModel } from '../../../models/video/video-share' -import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' +import { JobQueue } from '../../job-queue' async function forwardActivity ( activity: Activity, @@ -35,12 +35,11 @@ async function forwardActivity ( logger.debug('Creating forwarding job.', { uris }) - const jobPayload: ActivityPubHttpPayload = { + const payload = { uris, body: activity } - - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } async function broadcastToFollowers ( @@ -51,44 +50,43 @@ async function broadcastToFollowers ( actorsException: ActorModel[] = [] ) { const uris = await computeFollowerUris(toActorFollowers, actorsException, t) - return broadcastTo(uris, data, byActor, t) + return broadcastTo(uris, data, byActor) } async function broadcastToActors ( data: any, byActor: ActorModel, toActors: ActorModel[], - t: Transaction, actorsException: ActorModel[] = [] ) { const uris = await computeUris(toActors, actorsException) - return broadcastTo(uris, data, byActor, t) + return broadcastTo(uris, data, byActor) } -async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { +async function broadcastTo (uris: string[], data: any, byActor: ActorModel) { if (uris.length === 0) return undefined logger.debug('Creating broadcast job.', { uris }) - const jobPayload: ActivityPubHttpPayload = { + const payload = { uris, signatureActorId: byActor.id, body: data } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } -async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { +async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) { logger.debug('Creating unicast job.', { uri: toActorUrl }) - const jobPayload: ActivityPubHttpPayload = { - uris: [ toActorUrl ], + const payload = { + uri: toActorUrl, signatureActorId: byActor.id, body: data } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) } function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { -- cgit v1.2.3