diff options
Diffstat (limited to 'server/lib')
27 files changed, 367 insertions, 489 deletions
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index c708b38ba..712de7d0d 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts | |||
@@ -64,7 +64,11 @@ async function getOrCreateActorAndServerAndModel (actorUrl: string, recurseIfNee | |||
64 | actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) | 64 | actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) |
65 | } | 65 | } |
66 | 66 | ||
67 | return refreshActorIfNeeded(actor) | 67 | const options = { |
68 | arguments: [ actor ], | ||
69 | errorMessage: 'Cannot refresh actor if needed with many retries.' | ||
70 | } | ||
71 | return retryTransactionWrapper(refreshActorIfNeeded, options) | ||
68 | } | 72 | } |
69 | 73 | ||
70 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { | 74 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { |
@@ -325,38 +329,43 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu | |||
325 | async function refreshActorIfNeeded (actor: ActorModel) { | 329 | async function refreshActorIfNeeded (actor: ActorModel) { |
326 | if (!actor.isOutdated()) return actor | 330 | if (!actor.isOutdated()) return actor |
327 | 331 | ||
328 | const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) | 332 | try { |
329 | const result = await fetchRemoteActor(actorUrl) | 333 | const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) |
330 | if (result === undefined) { | 334 | const result = await fetchRemoteActor(actorUrl) |
331 | logger.warn('Cannot fetch remote actor in refresh actor.') | 335 | if (result === undefined) { |
332 | return actor | 336 | logger.warn('Cannot fetch remote actor in refresh actor.') |
333 | } | 337 | return actor |
334 | |||
335 | return sequelizeTypescript.transaction(async t => { | ||
336 | updateInstanceWithAnother(actor, result.actor) | ||
337 | |||
338 | if (result.avatarName !== undefined) { | ||
339 | await updateActorAvatarInstance(actor, result.avatarName, t) | ||
340 | } | 338 | } |
341 | 339 | ||
342 | // Force update | 340 | return sequelizeTypescript.transaction(async t => { |
343 | actor.setDataValue('updatedAt', new Date()) | 341 | updateInstanceWithAnother(actor, result.actor) |
344 | await actor.save({ transaction: t }) | ||
345 | 342 | ||
346 | if (actor.Account) { | 343 | if (result.avatarName !== undefined) { |
347 | await actor.save({ transaction: t }) | 344 | await updateActorAvatarInstance(actor, result.avatarName, t) |
345 | } | ||
348 | 346 | ||
349 | actor.Account.set('name', result.name) | 347 | // Force update |
350 | await actor.Account.save({ transaction: t }) | 348 | actor.setDataValue('updatedAt', new Date()) |
351 | } else if (actor.VideoChannel) { | ||
352 | await actor.save({ transaction: t }) | 349 | await actor.save({ transaction: t }) |
353 | 350 | ||
354 | actor.VideoChannel.set('name', result.name) | 351 | if (actor.Account) { |
355 | await actor.VideoChannel.save({ transaction: t }) | 352 | await actor.save({ transaction: t }) |
356 | } | 353 | |
354 | actor.Account.set('name', result.name) | ||
355 | await actor.Account.save({ transaction: t }) | ||
356 | } else if (actor.VideoChannel) { | ||
357 | await actor.save({ transaction: t }) | ||
358 | |||
359 | actor.VideoChannel.set('name', result.name) | ||
360 | await actor.VideoChannel.save({ transaction: t }) | ||
361 | } | ||
357 | 362 | ||
363 | return actor | ||
364 | }) | ||
365 | } catch (err) { | ||
366 | logger.warn('Cannot refresh actor.', err) | ||
358 | return actor | 367 | return actor |
359 | }) | 368 | } |
360 | } | 369 | } |
361 | 370 | ||
362 | function normalizeActor (actor: any) { | 371 | function normalizeActor (actor: any) { |
diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts index 4fc97cc38..b1b370a1a 100644 --- a/server/lib/activitypub/fetch.ts +++ b/server/lib/activitypub/fetch.ts | |||
@@ -1,13 +1,12 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { ActorModel } from '../../models/activitypub/actor' | 1 | import { ActorModel } from '../../models/activitypub/actor' |
3 | import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler' | 2 | import { JobQueue } from '../job-queue' |
4 | 3 | ||
5 | async function addFetchOutboxJob (actor: ActorModel, t: Transaction) { | 4 | async function addFetchOutboxJob (actor: ActorModel) { |
6 | const jobPayload: ActivityPubHttpPayload = { | 5 | const payload = { |
7 | uris: [ actor.outboxUrl ] | 6 | uris: [ actor.outboxUrl ] |
8 | } | 7 | } |
9 | 8 | ||
10 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) | 9 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
11 | } | 10 | } |
12 | 11 | ||
13 | export { | 12 | export { |
diff --git a/server/lib/activitypub/process/process-accept.ts b/server/lib/activitypub/process/process-accept.ts index 551f09ea7..7db2f8ff0 100644 --- a/server/lib/activitypub/process/process-accept.ts +++ b/server/lib/activitypub/process/process-accept.ts | |||
@@ -26,6 +26,6 @@ async function processAccept (actor: ActorModel, targetActor: ActorModel) { | |||
26 | if (follow.state !== 'accepted') { | 26 | if (follow.state !== 'accepted') { |
27 | follow.set('state', 'accepted') | 27 | follow.set('state', 'accepted') |
28 | await follow.save() | 28 | await follow.save() |
29 | await addFetchOutboxJob(targetActor, undefined) | 29 | await addFetchOutboxJob(targetActor) |
30 | } | 30 | } |
31 | } | 31 | } |
diff --git a/server/lib/activitypub/process/process-follow.ts b/server/lib/activitypub/process/process-follow.ts index 69f5c51b5..dc1d542b5 100644 --- a/server/lib/activitypub/process/process-follow.ts +++ b/server/lib/activitypub/process/process-follow.ts | |||
@@ -63,7 +63,7 @@ async function follow (actor: ActorModel, targetActorURL: string) { | |||
63 | actorFollow.ActorFollowing = targetActor | 63 | actorFollow.ActorFollowing = targetActor |
64 | 64 | ||
65 | // Target sends to actor he accepted the follow request | 65 | // Target sends to actor he accepted the follow request |
66 | return sendAccept(actorFollow, t) | 66 | return sendAccept(actorFollow) |
67 | }) | 67 | }) |
68 | 68 | ||
69 | logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) | 69 | logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) |
diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts index dc0d3de57..7a21f0c94 100644 --- a/server/lib/activitypub/send/misc.ts +++ b/server/lib/activitypub/send/misc.ts | |||
@@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | |||
7 | import { VideoModel } from '../../../models/video/video' | 7 | import { VideoModel } from '../../../models/video/video' |
8 | import { VideoCommentModel } from '../../../models/video/video-comment' | 8 | import { VideoCommentModel } from '../../../models/video/video-comment' |
9 | import { VideoShareModel } from '../../../models/video/video-share' | 9 | import { VideoShareModel } from '../../../models/video/video-share' |
10 | import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' | 10 | import { JobQueue } from '../../job-queue' |
11 | 11 | ||
12 | async function forwardActivity ( | 12 | async function forwardActivity ( |
13 | activity: Activity, | 13 | activity: Activity, |
@@ -35,12 +35,11 @@ async function forwardActivity ( | |||
35 | 35 | ||
36 | logger.debug('Creating forwarding job.', { uris }) | 36 | logger.debug('Creating forwarding job.', { uris }) |
37 | 37 | ||
38 | const jobPayload: ActivityPubHttpPayload = { | 38 | const payload = { |
39 | uris, | 39 | uris, |
40 | body: activity | 40 | body: activity |
41 | } | 41 | } |
42 | 42 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | |
43 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) | ||
44 | } | 43 | } |
45 | 44 | ||
46 | async function broadcastToFollowers ( | 45 | async function broadcastToFollowers ( |
@@ -51,44 +50,43 @@ async function broadcastToFollowers ( | |||
51 | actorsException: ActorModel[] = [] | 50 | actorsException: ActorModel[] = [] |
52 | ) { | 51 | ) { |
53 | const uris = await computeFollowerUris(toActorFollowers, actorsException, t) | 52 | const uris = await computeFollowerUris(toActorFollowers, actorsException, t) |
54 | return broadcastTo(uris, data, byActor, t) | 53 | return broadcastTo(uris, data, byActor) |
55 | } | 54 | } |
56 | 55 | ||
57 | async function broadcastToActors ( | 56 | async function broadcastToActors ( |
58 | data: any, | 57 | data: any, |
59 | byActor: ActorModel, | 58 | byActor: ActorModel, |
60 | toActors: ActorModel[], | 59 | toActors: ActorModel[], |
61 | t: Transaction, | ||
62 | actorsException: ActorModel[] = [] | 60 | actorsException: ActorModel[] = [] |
63 | ) { | 61 | ) { |
64 | const uris = await computeUris(toActors, actorsException) | 62 | const uris = await computeUris(toActors, actorsException) |
65 | return broadcastTo(uris, data, byActor, t) | 63 | return broadcastTo(uris, data, byActor) |
66 | } | 64 | } |
67 | 65 | ||
68 | async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { | 66 | async function broadcastTo (uris: string[], data: any, byActor: ActorModel) { |
69 | if (uris.length === 0) return undefined | 67 | if (uris.length === 0) return undefined |
70 | 68 | ||
71 | logger.debug('Creating broadcast job.', { uris }) | 69 | logger.debug('Creating broadcast job.', { uris }) |
72 | 70 | ||
73 | const jobPayload: ActivityPubHttpPayload = { | 71 | const payload = { |
74 | uris, | 72 | uris, |
75 | signatureActorId: byActor.id, | 73 | signatureActorId: byActor.id, |
76 | body: data | 74 | body: data |
77 | } | 75 | } |
78 | 76 | ||
79 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) | 77 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) |
80 | } | 78 | } |
81 | 79 | ||
82 | async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { | 80 | async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) { |
83 | logger.debug('Creating unicast job.', { uri: toActorUrl }) | 81 | logger.debug('Creating unicast job.', { uri: toActorUrl }) |
84 | 82 | ||
85 | const jobPayload: ActivityPubHttpPayload = { | 83 | const payload = { |
86 | uris: [ toActorUrl ], | 84 | uri: toActorUrl, |
87 | signatureActorId: byActor.id, | 85 | signatureActorId: byActor.id, |
88 | body: data | 86 | body: data |
89 | } | 87 | } |
90 | 88 | ||
91 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) | 89 | return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) |
92 | } | 90 | } |
93 | 91 | ||
94 | function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { | 92 | function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { |
diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts index 4eaa329d9..064fd88d2 100644 --- a/server/lib/activitypub/send/send-accept.ts +++ b/server/lib/activitypub/send/send-accept.ts | |||
@@ -1,4 +1,3 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' | 1 | import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' |
3 | import { ActorModel } from '../../../models/activitypub/actor' | 2 | import { ActorModel } from '../../../models/activitypub/actor' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
@@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from | |||
6 | import { unicastTo } from './misc' | 5 | import { unicastTo } from './misc' |
7 | import { followActivityData } from './send-follow' | 6 | import { followActivityData } from './send-follow' |
8 | 7 | ||
9 | async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { | 8 | async function sendAccept (actorFollow: ActorFollowModel) { |
10 | const follower = actorFollow.ActorFollower | 9 | const follower = actorFollow.ActorFollower |
11 | const me = actorFollow.ActorFollowing | 10 | const me = actorFollow.ActorFollowing |
12 | 11 | ||
@@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { | |||
16 | const url = getActorFollowAcceptActivityPubUrl(actorFollow) | 15 | const url = getActorFollowAcceptActivityPubUrl(actorFollow) |
17 | const data = acceptActivityData(url, me, followData) | 16 | const data = acceptActivityData(url, me, followData) |
18 | 17 | ||
19 | return unicastTo(data, me, follower.inboxUrl, t) | 18 | return unicastTo(data, me, follower.inboxUrl) |
20 | } | 19 | } |
21 | 20 | ||
22 | // --------------------------------------------------------------------------- | 21 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts index 578fbc630..93b5668d2 100644 --- a/server/lib/activitypub/send/send-announce.ts +++ b/server/lib/activitypub/send/send-announce.ts | |||
@@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel | |||
42 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) | 42 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) |
43 | const data = await createActivityData(url, byActor, announcedActivity, t, audience) | 43 | const data = await createActivityData(url, byActor, announcedActivity, t, audience) |
44 | 44 | ||
45 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 45 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
46 | } | 46 | } |
47 | 47 | ||
48 | async function announceActivityData ( | 48 | async function announceActivityData ( |
diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 9db663be1..b92615e9b 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts | |||
@@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse' | |||
8 | import { VideoCommentModel } from '../../../models/video/video-comment' | 8 | import { VideoCommentModel } from '../../../models/video/video-comment' |
9 | import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' | 9 | import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' |
10 | import { | 10 | import { |
11 | audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, | 11 | audiencify, |
12 | getOriginVideoAudience, getOriginVideoCommentAudience, | 12 | broadcastToActors, |
13 | broadcastToFollowers, | ||
14 | getActorsInvolvedInVideo, | ||
15 | getAudience, | ||
16 | getObjectFollowersAudience, | ||
17 | getOriginVideoAudience, | ||
18 | getOriginVideoCommentAudience, | ||
13 | unicastTo | 19 | unicastTo |
14 | } from './misc' | 20 | } from './misc' |
15 | 21 | ||
@@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, | |||
31 | const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } | 37 | const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } |
32 | const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) | 38 | const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) |
33 | 39 | ||
34 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 40 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
35 | } | 41 | } |
36 | 42 | ||
37 | async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { | 43 | async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { |
@@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr | |||
47 | 53 | ||
48 | // This was a reply, send it to the parent actors | 54 | // This was a reply, send it to the parent actors |
49 | const actorsException = [ byActor ] | 55 | const actorsException = [ byActor ] |
50 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) | 56 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) |
51 | 57 | ||
52 | // Broadcast to our followers | 58 | // Broadcast to our followers |
53 | await broadcastToFollowers(data, byActor, [ byActor ], t) | 59 | await broadcastToFollowers(data, byActor, [ byActor ], t) |
54 | 60 | ||
55 | // Send to origin | 61 | // Send to origin |
56 | return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 62 | return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl) |
57 | } | 63 | } |
58 | 64 | ||
59 | async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { | 65 | async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { |
@@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode | |||
69 | 75 | ||
70 | // This was a reply, send it to the parent actors | 76 | // This was a reply, send it to the parent actors |
71 | const actorsException = [ byActor ] | 77 | const actorsException = [ byActor ] |
72 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) | 78 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) |
73 | 79 | ||
74 | // Broadcast to our followers | 80 | // Broadcast to our followers |
75 | await broadcastToFollowers(data, byActor, [ byActor ], t) | 81 | await broadcastToFollowers(data, byActor, [ byActor ], t) |
@@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t | |||
86 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) | 92 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) |
87 | const data = await createActivityData(url, byActor, viewActivityData, t, audience) | 93 | const data = await createActivityData(url, byActor, viewActivityData, t, audience) |
88 | 94 | ||
89 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 95 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
90 | } | 96 | } |
91 | 97 | ||
92 | async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 98 | async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
@@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel | |||
111 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) | 117 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) |
112 | const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) | 118 | const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) |
113 | 119 | ||
114 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 120 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
115 | } | 121 | } |
116 | 122 | ||
117 | async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 123 | async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts index eac60e94f..4e9865af4 100644 --- a/server/lib/activitypub/send/send-follow.ts +++ b/server/lib/activitypub/send/send-follow.ts | |||
@@ -1,18 +1,17 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { ActivityFollow } from '../../../../shared/models/activitypub' | 1 | import { ActivityFollow } from '../../../../shared/models/activitypub' |
3 | import { ActorModel } from '../../../models/activitypub/actor' | 2 | import { ActorModel } from '../../../models/activitypub/actor' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
5 | import { getActorFollowActivityPubUrl } from '../url' | 4 | import { getActorFollowActivityPubUrl } from '../url' |
6 | import { unicastTo } from './misc' | 5 | import { unicastTo } from './misc' |
7 | 6 | ||
8 | function sendFollow (actorFollow: ActorFollowModel, t: Transaction) { | 7 | function sendFollow (actorFollow: ActorFollowModel) { |
9 | const me = actorFollow.ActorFollower | 8 | const me = actorFollow.ActorFollower |
10 | const following = actorFollow.ActorFollowing | 9 | const following = actorFollow.ActorFollowing |
11 | 10 | ||
12 | const url = getActorFollowActivityPubUrl(actorFollow) | 11 | const url = getActorFollowActivityPubUrl(actorFollow) |
13 | const data = followActivityData(url, me, following) | 12 | const data = followActivityData(url, me, following) |
14 | 13 | ||
15 | return unicastTo(data, me, following.inboxUrl, t) | 14 | return unicastTo(data, me, following.inboxUrl) |
16 | } | 15 | } |
17 | 16 | ||
18 | function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { | 17 | function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { |
diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts index 743646455..78ed1aaf2 100644 --- a/server/lib/activitypub/send/send-like.ts +++ b/server/lib/activitypub/send/send-like.ts | |||
@@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran | |||
20 | const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) | 20 | const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) |
21 | const data = await likeActivityData(url, byActor, video, t, audience) | 21 | const data = await likeActivityData(url, byActor, video, t, audience) |
22 | 22 | ||
23 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 23 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
24 | } | 24 | } |
25 | 25 | ||
26 | async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 26 | async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts index 3a0597fba..4a08b5ca1 100644 --- a/server/lib/activitypub/send/send-undo.ts +++ b/server/lib/activitypub/send/send-undo.ts | |||
@@ -1,11 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { | 2 | import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub' |
3 | ActivityAudience, | ||
4 | ActivityCreate, | ||
5 | ActivityFollow, | ||
6 | ActivityLike, | ||
7 | ActivityUndo | ||
8 | } from '../../../../shared/models/activitypub' | ||
9 | import { ActorModel } from '../../../models/activitypub/actor' | 3 | import { ActorModel } from '../../../models/activitypub/actor' |
10 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
11 | import { VideoModel } from '../../../models/video/video' | 5 | import { VideoModel } from '../../../models/video/video' |
@@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) { | |||
33 | const object = followActivityData(followUrl, me, following) | 27 | const object = followActivityData(followUrl, me, following) |
34 | const data = await undoActivityData(undoUrl, me, object, t) | 28 | const data = await undoActivityData(undoUrl, me, object, t) |
35 | 29 | ||
36 | return unicastTo(data, me, following.inboxUrl, t) | 30 | return unicastTo(data, me, following.inboxUrl) |
37 | } | 31 | } |
38 | 32 | ||
39 | async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { | 33 | async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { |
@@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: | |||
45 | const object = await likeActivityData(likeUrl, byActor, video, t) | 39 | const object = await likeActivityData(likeUrl, byActor, video, t) |
46 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) | 40 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) |
47 | 41 | ||
48 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 42 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
49 | } | 43 | } |
50 | 44 | ||
51 | async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 45 | async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
@@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel, | |||
72 | 66 | ||
73 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) | 67 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) |
74 | 68 | ||
75 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 69 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
76 | } | 70 | } |
77 | 71 | ||
78 | async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 72 | async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 3f780e319..159856cda 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,10 +1,19 @@ | |||
1 | import * as kue from 'kue' | ||
1 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
2 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
4 | import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' | 5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
5 | 6 | ||
6 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 7 | export type ActivitypubHttpBroadcastPayload = { |
7 | logger.info('Processing ActivityPub broadcast in job %d.', jobId) | 8 | uris: string[] |
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpBroadcast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
8 | 17 | ||
9 | const body = await computeBody(payload) | 18 | const body = await computeBody(payload) |
10 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | 19 | const httpSignatureOptions = await buildSignedRequestOptions(payload) |
@@ -26,28 +35,15 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { | |||
26 | await doRequest(options) | 35 | await doRequest(options) |
27 | goodUrls.push(uri) | 36 | goodUrls.push(uri) |
28 | } catch (err) { | 37 | } catch (err) { |
29 | const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) | 38 | badUrls.push(uri) |
30 | if (isRetryingLater === false) badUrls.push(uri) | ||
31 | } | 39 | } |
32 | } | 40 | } |
33 | 41 | ||
34 | return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) | 42 | return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) |
35 | } | 43 | } |
36 | 44 | ||
37 | function onError (err: Error, jobId: number) { | ||
38 | logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) | ||
39 | return Promise.resolve() | ||
40 | } | ||
41 | |||
42 | function onSuccess (jobId: number) { | ||
43 | logger.info('Job %d is a success.', jobId) | ||
44 | return Promise.resolve() | ||
45 | } | ||
46 | |||
47 | // --------------------------------------------------------------------------- | 45 | // --------------------------------------------------------------------------- |
48 | 46 | ||
49 | export { | 47 | export { |
50 | process, | 48 | processActivityPubHttpBroadcast |
51 | onError, | ||
52 | onSuccess | ||
53 | } | 49 | } |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index a7b5aabd0..062211c85 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,11 +1,18 @@ | |||
1 | import * as kue from 'kue' | ||
1 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
2 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
3 | import { ACTIVITY_PUB } from '../../../initializers' | 4 | import { ACTIVITY_PUB } from '../../../initializers' |
4 | import { processActivities } from '../../activitypub/process' | 5 | import { processActivities } from '../../activitypub/process' |
5 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | 6 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' |
6 | 7 | ||
7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 8 | export type ActivitypubHttpFetcherPayload = { |
8 | logger.info('Processing ActivityPub fetcher in job %d.', jobId) | 9 | uris: string[] |
10 | } | ||
11 | |||
12 | async function processActivityPubHttpFetcher (job: kue.Job) { | ||
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | ||
14 | |||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
9 | 16 | ||
10 | const options = { | 17 | const options = { |
11 | method: 'GET', | 18 | method: 'GET', |
@@ -49,20 +56,8 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { | |||
49 | } | 56 | } |
50 | } | 57 | } |
51 | 58 | ||
52 | function onError (err: Error, jobId: number) { | ||
53 | logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err) | ||
54 | return Promise.resolve() | ||
55 | } | ||
56 | |||
57 | function onSuccess (jobId: number) { | ||
58 | logger.info('Job %d is a success.', jobId) | ||
59 | return Promise.resolve() | ||
60 | } | ||
61 | |||
62 | // --------------------------------------------------------------------------- | 59 | // --------------------------------------------------------------------------- |
63 | 60 | ||
64 | export { | 61 | export { |
65 | process, | 62 | processActivityPubHttpFetcher |
66 | onError, | ||
67 | onSuccess | ||
68 | } | 63 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts new file mode 100644 index 000000000..9b4188c50 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | |||
7 | export type ActivitypubHttpUnicastPayload = { | ||
8 | uri: string | ||
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpUnicast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpUnicastPayload | ||
17 | const uri = payload.uri | ||
18 | |||
19 | const body = await computeBody(payload) | ||
20 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
21 | |||
22 | const options = { | ||
23 | method: 'POST', | ||
24 | uri, | ||
25 | json: body, | ||
26 | httpSignature: httpSignatureOptions | ||
27 | } | ||
28 | |||
29 | try { | ||
30 | await doRequest(options) | ||
31 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) | ||
32 | } catch (err) { | ||
33 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) | ||
34 | |||
35 | throw err | ||
36 | } | ||
37 | } | ||
38 | |||
39 | // --------------------------------------------------------------------------- | ||
40 | |||
41 | export { | ||
42 | processActivityPubHttpUnicast | ||
43 | } | ||
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts new file mode 100644 index 000000000..c087371c6 --- /dev/null +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -0,0 +1,39 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | ||
2 | import { getServerActor } from '../../../../helpers/utils' | ||
3 | import { ActorModel } from '../../../../models/activitypub/actor' | ||
4 | |||
5 | async function computeBody (payload: { body: any, signatureActorId?: number }) { | ||
6 | let body = payload.body | ||
7 | |||
8 | if (payload.signatureActorId) { | ||
9 | const actorSignature = await ActorModel.load(payload.signatureActorId) | ||
10 | if (!actorSignature) throw new Error('Unknown signature actor id.') | ||
11 | body = await buildSignedActivity(actorSignature, payload.body) | ||
12 | } | ||
13 | |||
14 | return body | ||
15 | } | ||
16 | |||
17 | async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { | ||
18 | let actor: ActorModel | ||
19 | if (payload.signatureActorId) { | ||
20 | actor = await ActorModel.load(payload.signatureActorId) | ||
21 | if (!actor) throw new Error('Unknown signature actor id.') | ||
22 | } else { | ||
23 | // We need to sign the request, so use the server | ||
24 | actor = await getServerActor() | ||
25 | } | ||
26 | |||
27 | const keyId = actor.getWebfingerUrl() | ||
28 | return { | ||
29 | algorithm: 'rsa-sha256', | ||
30 | authorizationHeaderName: 'Signature', | ||
31 | keyId, | ||
32 | key: actor.privateKey | ||
33 | } | ||
34 | } | ||
35 | |||
36 | export { | ||
37 | computeBody, | ||
38 | buildSignedRequestOptions | ||
39 | } | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/job-queue/handlers/video-file.ts index f224a31b4..5294483bd 100644 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -1,38 +1,60 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import * as kue from 'kue' |
2 | import { VideoResolution } from '../../../../shared' | ||
2 | import { VideoPrivacy } from '../../../../shared/models/videos' | 3 | import { VideoPrivacy } from '../../../../shared/models/videos' |
3 | import { logger } from '../../../helpers/logger' | 4 | import { logger } from '../../../helpers/logger' |
4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | 5 | import { computeResolutionsToTranscode } from '../../../helpers/utils' |
5 | import { sequelizeTypescript } from '../../../initializers' | 6 | import { sequelizeTypescript } from '../../../initializers' |
6 | import { JobModel } from '../../../models/job/job' | ||
7 | import { VideoModel } from '../../../models/video/video' | 7 | import { VideoModel } from '../../../models/video/video' |
8 | import { shareVideoByServerAndChannel } from '../../activitypub' | 8 | import { shareVideoByServerAndChannel } from '../../activitypub' |
9 | import { sendCreateVideo } from '../../activitypub/send' | 9 | import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' |
10 | import { JobScheduler } from '../job-scheduler' | 10 | import { JobQueue } from '../job-queue' |
11 | import { TranscodingJobPayload } from './transcoding-job-scheduler' | ||
12 | 11 | ||
13 | async function process (data: TranscodingJobPayload, jobId: number) { | 12 | export type VideoFilePayload = { |
14 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) | 13 | videoUUID: string |
14 | resolution?: VideoResolution | ||
15 | } | ||
16 | |||
17 | async function processVideoFile (job: kue.Job) { | ||
18 | const payload = job.data as VideoFilePayload | ||
19 | logger.info('Processing video file in job %d.', job.id) | ||
20 | |||
21 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) | ||
15 | // No video, maybe deleted? | 22 | // No video, maybe deleted? |
16 | if (!video) { | 23 | if (!video) { |
17 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 24 | logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) |
18 | return undefined | 25 | return undefined |
19 | } | 26 | } |
20 | 27 | ||
21 | await video.optimizeOriginalVideofile() | 28 | // Transcoding in other resolution |
29 | if (payload.resolution) { | ||
30 | await video.transcodeOriginalVideofile(payload.resolution) | ||
31 | await onVideoFileTranscoderSuccess(video) | ||
32 | } else { | ||
33 | await video.optimizeOriginalVideofile() | ||
34 | await onVideoFileOptimizerSuccess(video) | ||
35 | } | ||
22 | 36 | ||
23 | return video | 37 | return video |
24 | } | 38 | } |
25 | 39 | ||
26 | function onError (err: Error, jobId: number) { | 40 | async function onVideoFileTranscoderSuccess (video: VideoModel) { |
27 | logger.error('Error when optimized video file in job %d.', jobId, err) | 41 | if (video === undefined) return undefined |
28 | return Promise.resolve() | 42 | |
43 | // Maybe the video changed in database, refresh it | ||
44 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
45 | // Video does not exist anymore | ||
46 | if (!videoDatabase) return undefined | ||
47 | |||
48 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
49 | await sendUpdateVideo(video, undefined) | ||
50 | } | ||
51 | |||
52 | return undefined | ||
29 | } | 53 | } |
30 | 54 | ||
31 | async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) { | 55 | async function onVideoFileOptimizerSuccess (video: VideoModel) { |
32 | if (video === undefined) return undefined | 56 | if (video === undefined) return undefined |
33 | 57 | ||
34 | logger.info('Job %d is a success.', jobId) | ||
35 | |||
36 | // Maybe the video changed in database, refresh it | 58 | // Maybe the video changed in database, refresh it |
37 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | 59 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) |
38 | // Video does not exist anymore | 60 | // Video does not exist anymore |
@@ -56,7 +78,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch | |||
56 | if (resolutionsEnabled.length !== 0) { | 78 | if (resolutionsEnabled.length !== 0) { |
57 | try { | 79 | try { |
58 | await sequelizeTypescript.transaction(async t => { | 80 | await sequelizeTypescript.transaction(async t => { |
59 | const tasks: Bluebird<JobModel>[] = [] | 81 | const tasks: Promise<any>[] = [] |
60 | 82 | ||
61 | for (const resolution of resolutionsEnabled) { | 83 | for (const resolution of resolutionsEnabled) { |
62 | const dataInput = { | 84 | const dataInput = { |
@@ -64,7 +86,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch | |||
64 | resolution | 86 | resolution |
65 | } | 87 | } |
66 | 88 | ||
67 | const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) | 89 | const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) |
68 | tasks.push(p) | 90 | tasks.push(p) |
69 | } | 91 | } |
70 | 92 | ||
@@ -84,7 +106,5 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch | |||
84 | // --------------------------------------------------------------------------- | 106 | // --------------------------------------------------------------------------- |
85 | 107 | ||
86 | export { | 108 | export { |
87 | process, | 109 | processVideoFile |
88 | onError, | ||
89 | onSuccess | ||
90 | } | 110 | } |
diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts new file mode 100644 index 000000000..57231e649 --- /dev/null +++ b/server/lib/job-queue/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './job-queue' | |||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts new file mode 100644 index 000000000..7a2b6c78d --- /dev/null +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -0,0 +1,124 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { JobType, JobState } from '../../../shared/models' | ||
3 | import { logger } from '../../helpers/logger' | ||
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' | ||
5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | ||
6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | ||
7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | ||
8 | import { processVideoFile, VideoFilePayload } from './handlers/video-file' | ||
9 | |||
10 | type CreateJobArgument = | ||
11 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | ||
12 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | ||
13 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | ||
14 | { type: 'video-file', payload: VideoFilePayload } | ||
15 | |||
16 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | ||
17 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | ||
18 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
19 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
20 | 'video-file': processVideoFile | ||
21 | } | ||
22 | |||
23 | class JobQueue { | ||
24 | |||
25 | private static instance: JobQueue | ||
26 | |||
27 | private jobQueue: kue.Queue | ||
28 | private initialized = false | ||
29 | |||
30 | private constructor () {} | ||
31 | |||
32 | init () { | ||
33 | // Already initialized | ||
34 | if (this.initialized === true) return | ||
35 | this.initialized = true | ||
36 | |||
37 | this.jobQueue = kue.createQueue({ | ||
38 | prefix: 'q-' + CONFIG.WEBSERVER.HOST, | ||
39 | redis: { | ||
40 | host: CONFIG.REDIS.HOSTNAME, | ||
41 | port: CONFIG.REDIS.PORT, | ||
42 | auth: CONFIG.REDIS.AUTH | ||
43 | } | ||
44 | }) | ||
45 | |||
46 | this.jobQueue.on('error', err => { | ||
47 | logger.error('Error in job queue.', err) | ||
48 | process.exit(-1) | ||
49 | }) | ||
50 | this.jobQueue.watchStuckJobs(5000) | ||
51 | |||
52 | for (const handlerName of Object.keys(handlers)) { | ||
53 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | ||
54 | try { | ||
55 | const res = await handlers[ handlerName ](job) | ||
56 | return done(null, res) | ||
57 | } catch (err) { | ||
58 | return done(err) | ||
59 | } | ||
60 | }) | ||
61 | } | ||
62 | } | ||
63 | |||
64 | createJob (obj: CreateJobArgument, priority = 'normal') { | ||
65 | return new Promise((res, rej) => { | ||
66 | this.jobQueue | ||
67 | .create(obj.type, obj.payload) | ||
68 | .priority(priority) | ||
69 | .attempts(JOB_ATTEMPTS[obj.type]) | ||
70 | .backoff({ type: 'exponential' }) | ||
71 | .save(err => { | ||
72 | if (err) return rej(err) | ||
73 | |||
74 | return res() | ||
75 | }) | ||
76 | }) | ||
77 | } | ||
78 | |||
79 | listForApi (state: JobState, start: number, count: number, sort: string) { | ||
80 | return new Promise<kue.Job[]>((res, rej) => { | ||
81 | kue.Job.rangeByState(state, start, count, sort, (err, jobs) => { | ||
82 | if (err) return rej(err) | ||
83 | |||
84 | return res(jobs) | ||
85 | }) | ||
86 | }) | ||
87 | } | ||
88 | |||
89 | count (state: JobState) { | ||
90 | return new Promise<number>((res, rej) => { | ||
91 | this.jobQueue[state + 'Count']((err, total) => { | ||
92 | if (err) return rej(err) | ||
93 | |||
94 | return res(total) | ||
95 | }) | ||
96 | }) | ||
97 | } | ||
98 | |||
99 | removeOldJobs () { | ||
100 | const now = new Date().getTime() | ||
101 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
102 | if (err) { | ||
103 | logger.error('Cannot get jobs when removing old jobs.', err) | ||
104 | return | ||
105 | } | ||
106 | |||
107 | for (const job of jobs) { | ||
108 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
109 | job.remove() | ||
110 | } | ||
111 | } | ||
112 | }) | ||
113 | } | ||
114 | |||
115 | static get Instance () { | ||
116 | return this.instance || (this.instance = new this()) | ||
117 | } | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | export { | ||
123 | JobQueue | ||
124 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts deleted file mode 100644 index 4459152db..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ /dev/null | |||
@@ -1,94 +0,0 @@ | |||
1 | import { JobCategory } from '../../../../shared' | ||
2 | import { buildSignedActivity } from '../../../helpers/activitypub' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { getServerActor } from '../../../helpers/utils' | ||
5 | import { ACTIVITY_PUB } from '../../../initializers' | ||
6 | import { ActorModel } from '../../../models/activitypub/actor' | ||
7 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
8 | import { JobHandler, JobScheduler } from '../job-scheduler' | ||
9 | |||
10 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' | ||
11 | import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' | ||
12 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' | ||
13 | |||
14 | type ActivityPubHttpPayload = { | ||
15 | uris: string[] | ||
16 | signatureActorId?: number | ||
17 | body?: any | ||
18 | attemptNumber?: number | ||
19 | } | ||
20 | |||
21 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { | ||
22 | activitypubHttpBroadcastHandler, | ||
23 | activitypubHttpUnicastHandler, | ||
24 | activitypubHttpFetcherHandler | ||
25 | } | ||
26 | const jobCategory: JobCategory = 'activitypub-http' | ||
27 | |||
28 | const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
29 | |||
30 | async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) { | ||
31 | logger.warn('Cannot make request to %s.', uri, err) | ||
32 | |||
33 | let attemptNumber = payload.attemptNumber || 1 | ||
34 | attemptNumber += 1 | ||
35 | |||
36 | if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) { | ||
37 | logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err) | ||
38 | |||
39 | const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined) | ||
40 | if (!actor) { | ||
41 | logger.debug('Actor %s is not a follower, do not retry the request.', uri) | ||
42 | return false | ||
43 | } | ||
44 | |||
45 | const newPayload = Object.assign(payload, { | ||
46 | uris: [ uri ], | ||
47 | attemptNumber | ||
48 | }) | ||
49 | await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload) | ||
50 | |||
51 | return true | ||
52 | } | ||
53 | |||
54 | return false | ||
55 | } | ||
56 | |||
57 | async function computeBody (payload: ActivityPubHttpPayload) { | ||
58 | let body = payload.body | ||
59 | |||
60 | if (payload.signatureActorId) { | ||
61 | const actorSignature = await ActorModel.load(payload.signatureActorId) | ||
62 | if (!actorSignature) throw new Error('Unknown signature actor id.') | ||
63 | body = await buildSignedActivity(actorSignature, payload.body) | ||
64 | } | ||
65 | |||
66 | return body | ||
67 | } | ||
68 | |||
69 | async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) { | ||
70 | let actor: ActorModel | ||
71 | if (payload.signatureActorId) { | ||
72 | actor = await ActorModel.load(payload.signatureActorId) | ||
73 | if (!actor) throw new Error('Unknown signature actor id.') | ||
74 | } else { | ||
75 | // We need to sign the request, so use the server | ||
76 | actor = await getServerActor() | ||
77 | } | ||
78 | |||
79 | const keyId = actor.getWebfingerUrl() | ||
80 | return { | ||
81 | algorithm: 'rsa-sha256', | ||
82 | authorizationHeaderName: 'Signature', | ||
83 | keyId, | ||
84 | key: actor.privateKey | ||
85 | } | ||
86 | } | ||
87 | |||
88 | export { | ||
89 | ActivityPubHttpPayload, | ||
90 | activitypubHttpJobScheduler, | ||
91 | maybeRetryRequestLater, | ||
92 | computeBody, | ||
93 | buildSignedRequestOptions | ||
94 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts deleted file mode 100644 index 54a7504e8..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts +++ /dev/null | |||
@@ -1,50 +0,0 @@ | |||
1 | import { logger } from '../../../helpers/logger' | ||
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
4 | import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' | ||
5 | |||
6 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | ||
7 | logger.info('Processing ActivityPub unicast in job %d.', jobId) | ||
8 | |||
9 | const uri = payload.uris[0] | ||
10 | |||
11 | const body = await computeBody(payload) | ||
12 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
13 | |||
14 | const options = { | ||
15 | method: 'POST', | ||
16 | uri, | ||
17 | json: body, | ||
18 | httpSignature: httpSignatureOptions | ||
19 | } | ||
20 | |||
21 | try { | ||
22 | await doRequest(options) | ||
23 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) | ||
24 | } catch (err) { | ||
25 | const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) | ||
26 | if (isRetryingLater === false) { | ||
27 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) | ||
28 | } | ||
29 | |||
30 | throw err | ||
31 | } | ||
32 | } | ||
33 | |||
34 | function onError (err: Error, jobId: number) { | ||
35 | logger.error('Error when sending ActivityPub request in job %d.', jobId, err) | ||
36 | return Promise.resolve() | ||
37 | } | ||
38 | |||
39 | function onSuccess (jobId: number) { | ||
40 | logger.info('Job %d is a success.', jobId) | ||
41 | return Promise.resolve() | ||
42 | } | ||
43 | |||
44 | // --------------------------------------------------------------------------- | ||
45 | |||
46 | export { | ||
47 | process, | ||
48 | onError, | ||
49 | onSuccess | ||
50 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts deleted file mode 100644 index ad8f527b4..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts +++ /dev/null | |||
@@ -1 +0,0 @@ | |||
1 | export * from './activitypub-http-job-scheduler' | ||
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts deleted file mode 100644 index 394264ec1..000000000 --- a/server/lib/jobs/index.ts +++ /dev/null | |||
@@ -1,2 +0,0 @@ | |||
1 | export * from './activitypub-http-job-scheduler' | ||
2 | export * from './transcoding-job-scheduler' | ||
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts deleted file mode 100644 index 9d55880e6..000000000 --- a/server/lib/jobs/job-scheduler.ts +++ /dev/null | |||
@@ -1,144 +0,0 @@ | |||
1 | import { AsyncQueue, forever, queue } from 'async' | ||
2 | import * as Sequelize from 'sequelize' | ||
3 | import { JobCategory } from '../../../shared' | ||
4 | import { logger } from '../../helpers/logger' | ||
5 | import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' | ||
6 | import { JobModel } from '../../models/job/job' | ||
7 | |||
8 | export interface JobHandler<P, T> { | ||
9 | process (data: object, jobId: number): Promise<T> | ||
10 | onError (err: Error, jobId: number) | ||
11 | onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any> | ||
12 | } | ||
13 | type JobQueueCallback = (err: Error) => void | ||
14 | |||
15 | class JobScheduler<P, T> { | ||
16 | |||
17 | constructor ( | ||
18 | private jobCategory: JobCategory, | ||
19 | private jobHandlers: { [ id: string ]: JobHandler<P, T> } | ||
20 | ) {} | ||
21 | |||
22 | async activate () { | ||
23 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] | ||
24 | |||
25 | logger.info('Jobs scheduler %s activated.', this.jobCategory) | ||
26 | |||
27 | const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this)) | ||
28 | |||
29 | // Finish processing jobs from a previous start | ||
30 | const state = JOB_STATES.PROCESSING | ||
31 | try { | ||
32 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) | ||
33 | |||
34 | this.enqueueJobs(jobsQueue, jobs) | ||
35 | } catch (err) { | ||
36 | logger.error('Cannot list pending jobs.', err) | ||
37 | } | ||
38 | |||
39 | forever( | ||
40 | async next => { | ||
41 | if (jobsQueue.length() !== 0) { | ||
42 | // Finish processing the queue first | ||
43 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
44 | } | ||
45 | |||
46 | const state = JOB_STATES.PENDING | ||
47 | try { | ||
48 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) | ||
49 | |||
50 | this.enqueueJobs(jobsQueue, jobs) | ||
51 | } catch (err) { | ||
52 | logger.error('Cannot list pending jobs.', err) | ||
53 | } | ||
54 | |||
55 | // Optimization: we could use "drain" from queue object | ||
56 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
57 | }, | ||
58 | |||
59 | err => logger.error('Error in job scheduler queue.', err) | ||
60 | ) | ||
61 | } | ||
62 | |||
63 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) { | ||
64 | const createQuery = { | ||
65 | state: JOB_STATES.PENDING, | ||
66 | category: this.jobCategory, | ||
67 | handlerName, | ||
68 | handlerInputData | ||
69 | } | ||
70 | |||
71 | const options = { transaction } | ||
72 | |||
73 | return JobModel.create(createQuery, options) | ||
74 | } | ||
75 | |||
76 | private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) { | ||
77 | jobs.forEach(job => jobsQueue.push(job)) | ||
78 | } | ||
79 | |||
80 | private async processJob (job: JobModel, callback: (err: Error) => void) { | ||
81 | const jobHandler = this.jobHandlers[job.handlerName] | ||
82 | if (jobHandler === undefined) { | ||
83 | const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id | ||
84 | logger.error(errorString) | ||
85 | |||
86 | const error = new Error(errorString) | ||
87 | await this.onJobError(jobHandler, job, error) | ||
88 | return callback(error) | ||
89 | } | ||
90 | |||
91 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
92 | |||
93 | job.state = JOB_STATES.PROCESSING | ||
94 | await job.save() | ||
95 | |||
96 | try { | ||
97 | const result: T = await jobHandler.process(job.handlerInputData, job.id) | ||
98 | await this.onJobSuccess(jobHandler, job, result) | ||
99 | } catch (err) { | ||
100 | logger.error('Error in job handler %s.', job.handlerName, err) | ||
101 | |||
102 | try { | ||
103 | await this.onJobError(jobHandler, job, err) | ||
104 | } catch (innerErr) { | ||
105 | this.cannotSaveJobError(innerErr) | ||
106 | return callback(innerErr) | ||
107 | } | ||
108 | } | ||
109 | |||
110 | return callback(null) | ||
111 | } | ||
112 | |||
113 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) { | ||
114 | job.state = JOB_STATES.ERROR | ||
115 | |||
116 | try { | ||
117 | await job.save() | ||
118 | if (jobHandler) await jobHandler.onError(err, job.id) | ||
119 | } catch (err) { | ||
120 | this.cannotSaveJobError(err) | ||
121 | } | ||
122 | } | ||
123 | |||
124 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) { | ||
125 | job.state = JOB_STATES.SUCCESS | ||
126 | |||
127 | try { | ||
128 | await job.save() | ||
129 | await jobHandler.onSuccess(job.id, jobResult, this) | ||
130 | } catch (err) { | ||
131 | this.cannotSaveJobError(err) | ||
132 | } | ||
133 | } | ||
134 | |||
135 | private cannotSaveJobError (err: Error) { | ||
136 | logger.error('Cannot save new job state.', err) | ||
137 | } | ||
138 | } | ||
139 | |||
140 | // --------------------------------------------------------------------------- | ||
141 | |||
142 | export { | ||
143 | JobScheduler | ||
144 | } | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts deleted file mode 100644 index 73152a1be..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/index.ts +++ /dev/null | |||
@@ -1 +0,0 @@ | |||
1 | export * from './transcoding-job-scheduler' | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts deleted file mode 100644 index e5530a73c..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts +++ /dev/null | |||
@@ -1,23 +0,0 @@ | |||
1 | import { JobCategory } from '../../../../shared' | ||
2 | import { VideoModel } from '../../../models/video/video' | ||
3 | import { JobHandler, JobScheduler } from '../job-scheduler' | ||
4 | |||
5 | import * as videoFileOptimizer from './video-file-optimizer-handler' | ||
6 | import * as videoFileTranscoder from './video-file-transcoder-handler' | ||
7 | |||
8 | type TranscodingJobPayload = { | ||
9 | videoUUID: string | ||
10 | resolution?: number | ||
11 | } | ||
12 | const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = { | ||
13 | videoFileOptimizer, | ||
14 | videoFileTranscoder | ||
15 | } | ||
16 | const jobCategory: JobCategory = 'transcoding' | ||
17 | |||
18 | const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
19 | |||
20 | export { | ||
21 | TranscodingJobPayload, | ||
22 | transcodingJobScheduler | ||
23 | } | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts deleted file mode 100644 index 883d3eba8..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts +++ /dev/null | |||
@@ -1,48 +0,0 @@ | |||
1 | import { VideoResolution } from '../../../../shared' | ||
2 | import { VideoPrivacy } from '../../../../shared/models/videos' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { VideoModel } from '../../../models/video/video' | ||
5 | import { sendUpdateVideo } from '../../activitypub/send' | ||
6 | |||
7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { | ||
8 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) | ||
9 | // No video, maybe deleted? | ||
10 | if (!video) { | ||
11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | ||
12 | return undefined | ||
13 | } | ||
14 | |||
15 | await video.transcodeOriginalVideofile(data.resolution) | ||
16 | |||
17 | return video | ||
18 | } | ||
19 | |||
20 | function onError (err: Error, jobId: number) { | ||
21 | logger.error('Error when transcoding video file in job %d.', jobId, err) | ||
22 | return Promise.resolve() | ||
23 | } | ||
24 | |||
25 | async function onSuccess (jobId: number, video: VideoModel) { | ||
26 | if (video === undefined) return undefined | ||
27 | |||
28 | logger.info('Job %d is a success.', jobId) | ||
29 | |||
30 | // Maybe the video changed in database, refresh it | ||
31 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
32 | // Video does not exist anymore | ||
33 | if (!videoDatabase) return undefined | ||
34 | |||
35 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
36 | await sendUpdateVideo(video, undefined) | ||
37 | } | ||
38 | |||
39 | return undefined | ||
40 | } | ||
41 | |||
42 | // --------------------------------------------------------------------------- | ||
43 | |||
44 | export { | ||
45 | process, | ||
46 | onError, | ||
47 | onSuccess | ||
48 | } | ||
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts new file mode 100644 index 000000000..add5677ac --- /dev/null +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts | |||
@@ -0,0 +1,19 @@ | |||
1 | import { JobQueue } from '../job-queue' | ||
2 | import { AbstractScheduler } from './abstract-scheduler' | ||
3 | |||
4 | export class RemoveOldJobsScheduler extends AbstractScheduler { | ||
5 | |||
6 | private static instance: AbstractScheduler | ||
7 | |||
8 | private constructor () { | ||
9 | super() | ||
10 | } | ||
11 | |||
12 | async execute () { | ||
13 | JobQueue.Instance.removeOldJobs() | ||
14 | } | ||
15 | |||
16 | static get Instance () { | ||
17 | return this.instance || (this.instance = new this()) | ||
18 | } | ||
19 | } | ||