aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-01-25 15:05:18 +0100
committerChocobozzz <me@florianbigard.com>2018-01-25 18:41:17 +0100
commit94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 (patch)
tree32a9148e0e4567f0c4ffae0412cbed20b84e8873 /server/lib
parentd765fafc3faf0db9818eb1a07161df1cb1bc0efa (diff)
downloadPeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.gz
PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.zst
PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.zip
Move job queue to redis
We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore.
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/actor.ts59
-rw-r--r--server/lib/activitypub/fetch.ts9
-rw-r--r--server/lib/activitypub/process/process-accept.ts2
-rw-r--r--server/lib/activitypub/process/process-follow.ts2
-rw-r--r--server/lib/activitypub/send/misc.ts26
-rw-r--r--server/lib/activitypub/send/send-accept.ts5
-rw-r--r--server/lib/activitypub/send/send-announce.ts2
-rw-r--r--server/lib/activitypub/send/send-create.ts22
-rw-r--r--server/lib/activitypub/send/send-follow.ts5
-rw-r--r--server/lib/activitypub/send/send-like.ts2
-rw-r--r--server/lib/activitypub/send/send-undo.ts14
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts (renamed from server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts)32
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts (renamed from server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts)27
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts43
-rw-r--r--server/lib/job-queue/handlers/utils/activitypub-http-utils.ts39
-rw-r--r--server/lib/job-queue/handlers/video-file.ts (renamed from server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts)60
-rw-r--r--server/lib/job-queue/index.ts1
-rw-r--r--server/lib/job-queue/job-queue.ts124
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts94
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts50
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/index.ts2
-rw-r--r--server/lib/jobs/job-scheduler.ts144
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts23
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts48
-rw-r--r--server/lib/schedulers/remove-old-jobs-scheduler.ts19
27 files changed, 367 insertions, 489 deletions
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts
index c708b38ba..712de7d0d 100644
--- a/server/lib/activitypub/actor.ts
+++ b/server/lib/activitypub/actor.ts
@@ -64,7 +64,11 @@ async function getOrCreateActorAndServerAndModel (actorUrl: string, recurseIfNee
64 actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) 64 actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options)
65 } 65 }
66 66
67 return refreshActorIfNeeded(actor) 67 const options = {
68 arguments: [ actor ],
69 errorMessage: 'Cannot refresh actor if needed with many retries.'
70 }
71 return retryTransactionWrapper(refreshActorIfNeeded, options)
68} 72}
69 73
70function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { 74function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) {
@@ -325,38 +329,43 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu
325async function refreshActorIfNeeded (actor: ActorModel) { 329async function refreshActorIfNeeded (actor: ActorModel) {
326 if (!actor.isOutdated()) return actor 330 if (!actor.isOutdated()) return actor
327 331
328 const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) 332 try {
329 const result = await fetchRemoteActor(actorUrl) 333 const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost())
330 if (result === undefined) { 334 const result = await fetchRemoteActor(actorUrl)
331 logger.warn('Cannot fetch remote actor in refresh actor.') 335 if (result === undefined) {
332 return actor 336 logger.warn('Cannot fetch remote actor in refresh actor.')
333 } 337 return actor
334
335 return sequelizeTypescript.transaction(async t => {
336 updateInstanceWithAnother(actor, result.actor)
337
338 if (result.avatarName !== undefined) {
339 await updateActorAvatarInstance(actor, result.avatarName, t)
340 } 338 }
341 339
342 // Force update 340 return sequelizeTypescript.transaction(async t => {
343 actor.setDataValue('updatedAt', new Date()) 341 updateInstanceWithAnother(actor, result.actor)
344 await actor.save({ transaction: t })
345 342
346 if (actor.Account) { 343 if (result.avatarName !== undefined) {
347 await actor.save({ transaction: t }) 344 await updateActorAvatarInstance(actor, result.avatarName, t)
345 }
348 346
349 actor.Account.set('name', result.name) 347 // Force update
350 await actor.Account.save({ transaction: t }) 348 actor.setDataValue('updatedAt', new Date())
351 } else if (actor.VideoChannel) {
352 await actor.save({ transaction: t }) 349 await actor.save({ transaction: t })
353 350
354 actor.VideoChannel.set('name', result.name) 351 if (actor.Account) {
355 await actor.VideoChannel.save({ transaction: t }) 352 await actor.save({ transaction: t })
356 } 353
354 actor.Account.set('name', result.name)
355 await actor.Account.save({ transaction: t })
356 } else if (actor.VideoChannel) {
357 await actor.save({ transaction: t })
358
359 actor.VideoChannel.set('name', result.name)
360 await actor.VideoChannel.save({ transaction: t })
361 }
357 362
363 return actor
364 })
365 } catch (err) {
366 logger.warn('Cannot refresh actor.', err)
358 return actor 367 return actor
359 }) 368 }
360} 369}
361 370
362function normalizeActor (actor: any) { 371function normalizeActor (actor: any) {
diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts
index 4fc97cc38..b1b370a1a 100644
--- a/server/lib/activitypub/fetch.ts
+++ b/server/lib/activitypub/fetch.ts
@@ -1,13 +1,12 @@
1import { Transaction } from 'sequelize'
2import { ActorModel } from '../../models/activitypub/actor' 1import { ActorModel } from '../../models/activitypub/actor'
3import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler' 2import { JobQueue } from '../job-queue'
4 3
5async function addFetchOutboxJob (actor: ActorModel, t: Transaction) { 4async function addFetchOutboxJob (actor: ActorModel) {
6 const jobPayload: ActivityPubHttpPayload = { 5 const payload = {
7 uris: [ actor.outboxUrl ] 6 uris: [ actor.outboxUrl ]
8 } 7 }
9 8
10 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) 9 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
11} 10}
12 11
13export { 12export {
diff --git a/server/lib/activitypub/process/process-accept.ts b/server/lib/activitypub/process/process-accept.ts
index 551f09ea7..7db2f8ff0 100644
--- a/server/lib/activitypub/process/process-accept.ts
+++ b/server/lib/activitypub/process/process-accept.ts
@@ -26,6 +26,6 @@ async function processAccept (actor: ActorModel, targetActor: ActorModel) {
26 if (follow.state !== 'accepted') { 26 if (follow.state !== 'accepted') {
27 follow.set('state', 'accepted') 27 follow.set('state', 'accepted')
28 await follow.save() 28 await follow.save()
29 await addFetchOutboxJob(targetActor, undefined) 29 await addFetchOutboxJob(targetActor)
30 } 30 }
31} 31}
diff --git a/server/lib/activitypub/process/process-follow.ts b/server/lib/activitypub/process/process-follow.ts
index 69f5c51b5..dc1d542b5 100644
--- a/server/lib/activitypub/process/process-follow.ts
+++ b/server/lib/activitypub/process/process-follow.ts
@@ -63,7 +63,7 @@ async function follow (actor: ActorModel, targetActorURL: string) {
63 actorFollow.ActorFollowing = targetActor 63 actorFollow.ActorFollowing = targetActor
64 64
65 // Target sends to actor he accepted the follow request 65 // Target sends to actor he accepted the follow request
66 return sendAccept(actorFollow, t) 66 return sendAccept(actorFollow)
67 }) 67 })
68 68
69 logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) 69 logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url)
diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts
index dc0d3de57..7a21f0c94 100644
--- a/server/lib/activitypub/send/misc.ts
+++ b/server/lib/activitypub/send/misc.ts
@@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
7import { VideoModel } from '../../../models/video/video' 7import { VideoModel } from '../../../models/video/video'
8import { VideoCommentModel } from '../../../models/video/video-comment' 8import { VideoCommentModel } from '../../../models/video/video-comment'
9import { VideoShareModel } from '../../../models/video/video-share' 9import { VideoShareModel } from '../../../models/video/video-share'
10import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' 10import { JobQueue } from '../../job-queue'
11 11
12async function forwardActivity ( 12async function forwardActivity (
13 activity: Activity, 13 activity: Activity,
@@ -35,12 +35,11 @@ async function forwardActivity (
35 35
36 logger.debug('Creating forwarding job.', { uris }) 36 logger.debug('Creating forwarding job.', { uris })
37 37
38 const jobPayload: ActivityPubHttpPayload = { 38 const payload = {
39 uris, 39 uris,
40 body: activity 40 body: activity
41 } 41 }
42 42 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
43 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload)
44} 43}
45 44
46async function broadcastToFollowers ( 45async function broadcastToFollowers (
@@ -51,44 +50,43 @@ async function broadcastToFollowers (
51 actorsException: ActorModel[] = [] 50 actorsException: ActorModel[] = []
52) { 51) {
53 const uris = await computeFollowerUris(toActorFollowers, actorsException, t) 52 const uris = await computeFollowerUris(toActorFollowers, actorsException, t)
54 return broadcastTo(uris, data, byActor, t) 53 return broadcastTo(uris, data, byActor)
55} 54}
56 55
57async function broadcastToActors ( 56async function broadcastToActors (
58 data: any, 57 data: any,
59 byActor: ActorModel, 58 byActor: ActorModel,
60 toActors: ActorModel[], 59 toActors: ActorModel[],
61 t: Transaction,
62 actorsException: ActorModel[] = [] 60 actorsException: ActorModel[] = []
63) { 61) {
64 const uris = await computeUris(toActors, actorsException) 62 const uris = await computeUris(toActors, actorsException)
65 return broadcastTo(uris, data, byActor, t) 63 return broadcastTo(uris, data, byActor)
66} 64}
67 65
68async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { 66async function broadcastTo (uris: string[], data: any, byActor: ActorModel) {
69 if (uris.length === 0) return undefined 67 if (uris.length === 0) return undefined
70 68
71 logger.debug('Creating broadcast job.', { uris }) 69 logger.debug('Creating broadcast job.', { uris })
72 70
73 const jobPayload: ActivityPubHttpPayload = { 71 const payload = {
74 uris, 72 uris,
75 signatureActorId: byActor.id, 73 signatureActorId: byActor.id,
76 body: data 74 body: data
77 } 75 }
78 76
79 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) 77 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
80} 78}
81 79
82async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { 80async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) {
83 logger.debug('Creating unicast job.', { uri: toActorUrl }) 81 logger.debug('Creating unicast job.', { uri: toActorUrl })
84 82
85 const jobPayload: ActivityPubHttpPayload = { 83 const payload = {
86 uris: [ toActorUrl ], 84 uri: toActorUrl,
87 signatureActorId: byActor.id, 85 signatureActorId: byActor.id,
88 body: data 86 body: data
89 } 87 }
90 88
91 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) 89 return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
92} 90}
93 91
94function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { 92function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) {
diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts
index 4eaa329d9..064fd88d2 100644
--- a/server/lib/activitypub/send/send-accept.ts
+++ b/server/lib/activitypub/send/send-accept.ts
@@ -1,4 +1,3 @@
1import { Transaction } from 'sequelize'
2import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' 1import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub'
3import { ActorModel } from '../../../models/activitypub/actor' 2import { ActorModel } from '../../../models/activitypub/actor'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from
6import { unicastTo } from './misc' 5import { unicastTo } from './misc'
7import { followActivityData } from './send-follow' 6import { followActivityData } from './send-follow'
8 7
9async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { 8async function sendAccept (actorFollow: ActorFollowModel) {
10 const follower = actorFollow.ActorFollower 9 const follower = actorFollow.ActorFollower
11 const me = actorFollow.ActorFollowing 10 const me = actorFollow.ActorFollowing
12 11
@@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) {
16 const url = getActorFollowAcceptActivityPubUrl(actorFollow) 15 const url = getActorFollowAcceptActivityPubUrl(actorFollow)
17 const data = acceptActivityData(url, me, followData) 16 const data = acceptActivityData(url, me, followData)
18 17
19 return unicastTo(data, me, follower.inboxUrl, t) 18 return unicastTo(data, me, follower.inboxUrl)
20} 19}
21 20
22// --------------------------------------------------------------------------- 21// ---------------------------------------------------------------------------
diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts
index 578fbc630..93b5668d2 100644
--- a/server/lib/activitypub/send/send-announce.ts
+++ b/server/lib/activitypub/send/send-announce.ts
@@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel
42 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) 42 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
43 const data = await createActivityData(url, byActor, announcedActivity, t, audience) 43 const data = await createActivityData(url, byActor, announcedActivity, t, audience)
44 44
45 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 45 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
46} 46}
47 47
48async function announceActivityData ( 48async function announceActivityData (
diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts
index 9db663be1..b92615e9b 100644
--- a/server/lib/activitypub/send/send-create.ts
+++ b/server/lib/activitypub/send/send-create.ts
@@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse'
8import { VideoCommentModel } from '../../../models/video/video-comment' 8import { VideoCommentModel } from '../../../models/video/video-comment'
9import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' 9import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url'
10import { 10import {
11 audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, 11 audiencify,
12 getOriginVideoAudience, getOriginVideoCommentAudience, 12 broadcastToActors,
13 broadcastToFollowers,
14 getActorsInvolvedInVideo,
15 getAudience,
16 getObjectFollowersAudience,
17 getOriginVideoAudience,
18 getOriginVideoCommentAudience,
13 unicastTo 19 unicastTo
14} from './misc' 20} from './misc'
15 21
@@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel,
31 const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } 37 const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] }
32 const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) 38 const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience)
33 39
34 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 40 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
35} 41}
36 42
37async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { 43async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) {
@@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr
47 53
48 // This was a reply, send it to the parent actors 54 // This was a reply, send it to the parent actors
49 const actorsException = [ byActor ] 55 const actorsException = [ byActor ]
50 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) 56 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException)
51 57
52 // Broadcast to our followers 58 // Broadcast to our followers
53 await broadcastToFollowers(data, byActor, [ byActor ], t) 59 await broadcastToFollowers(data, byActor, [ byActor ], t)
54 60
55 // Send to origin 61 // Send to origin
56 return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t) 62 return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl)
57} 63}
58 64
59async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { 65async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) {
@@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode
69 75
70 // This was a reply, send it to the parent actors 76 // This was a reply, send it to the parent actors
71 const actorsException = [ byActor ] 77 const actorsException = [ byActor ]
72 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) 78 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException)
73 79
74 // Broadcast to our followers 80 // Broadcast to our followers
75 await broadcastToFollowers(data, byActor, [ byActor ], t) 81 await broadcastToFollowers(data, byActor, [ byActor ], t)
@@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t
86 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) 92 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
87 const data = await createActivityData(url, byActor, viewActivityData, t, audience) 93 const data = await createActivityData(url, byActor, viewActivityData, t, audience)
88 94
89 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 95 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
90} 96}
91 97
92async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 98async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel
111 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) 117 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
112 const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) 118 const data = await createActivityData(url, byActor, dislikeActivityData, t, audience)
113 119
114 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 120 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
115} 121}
116 122
117async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 123async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts
index eac60e94f..4e9865af4 100644
--- a/server/lib/activitypub/send/send-follow.ts
+++ b/server/lib/activitypub/send/send-follow.ts
@@ -1,18 +1,17 @@
1import { Transaction } from 'sequelize'
2import { ActivityFollow } from '../../../../shared/models/activitypub' 1import { ActivityFollow } from '../../../../shared/models/activitypub'
3import { ActorModel } from '../../../models/activitypub/actor' 2import { ActorModel } from '../../../models/activitypub/actor'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { getActorFollowActivityPubUrl } from '../url' 4import { getActorFollowActivityPubUrl } from '../url'
6import { unicastTo } from './misc' 5import { unicastTo } from './misc'
7 6
8function sendFollow (actorFollow: ActorFollowModel, t: Transaction) { 7function sendFollow (actorFollow: ActorFollowModel) {
9 const me = actorFollow.ActorFollower 8 const me = actorFollow.ActorFollower
10 const following = actorFollow.ActorFollowing 9 const following = actorFollow.ActorFollowing
11 10
12 const url = getActorFollowActivityPubUrl(actorFollow) 11 const url = getActorFollowActivityPubUrl(actorFollow)
13 const data = followActivityData(url, me, following) 12 const data = followActivityData(url, me, following)
14 13
15 return unicastTo(data, me, following.inboxUrl, t) 14 return unicastTo(data, me, following.inboxUrl)
16} 15}
17 16
18function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { 17function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow {
diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts
index 743646455..78ed1aaf2 100644
--- a/server/lib/activitypub/send/send-like.ts
+++ b/server/lib/activitypub/send/send-like.ts
@@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran
20 const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) 20 const audience = getOriginVideoAudience(video, accountsInvolvedInVideo)
21 const data = await likeActivityData(url, byActor, video, t, audience) 21 const data = await likeActivityData(url, byActor, video, t, audience)
22 22
23 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 23 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
24} 24}
25 25
26async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 26async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts
index 3a0597fba..4a08b5ca1 100644
--- a/server/lib/activitypub/send/send-undo.ts
+++ b/server/lib/activitypub/send/send-undo.ts
@@ -1,11 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { 2import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub'
3 ActivityAudience,
4 ActivityCreate,
5 ActivityFollow,
6 ActivityLike,
7 ActivityUndo
8} from '../../../../shared/models/activitypub'
9import { ActorModel } from '../../../models/activitypub/actor' 3import { ActorModel } from '../../../models/activitypub/actor'
10import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
11import { VideoModel } from '../../../models/video/video' 5import { VideoModel } from '../../../models/video/video'
@@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) {
33 const object = followActivityData(followUrl, me, following) 27 const object = followActivityData(followUrl, me, following)
34 const data = await undoActivityData(undoUrl, me, object, t) 28 const data = await undoActivityData(undoUrl, me, object, t)
35 29
36 return unicastTo(data, me, following.inboxUrl, t) 30 return unicastTo(data, me, following.inboxUrl)
37} 31}
38 32
39async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { 33async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t:
45 const object = await likeActivityData(likeUrl, byActor, video, t) 39 const object = await likeActivityData(likeUrl, byActor, video, t)
46 const data = await undoActivityData(undoUrl, byActor, object, t, audience) 40 const data = await undoActivityData(undoUrl, byActor, object, t, audience)
47 41
48 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 42 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
49} 43}
50 44
51async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 45async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel,
72 66
73 const data = await undoActivityData(undoUrl, byActor, object, t, audience) 67 const data = await undoActivityData(undoUrl, byActor, object, t, audience)
74 68
75 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 69 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
76} 70}
77 71
78async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 72async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 3f780e319..159856cda 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,10 +1,19 @@
1import * as kue from 'kue'
1import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' 5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
5 6
6async function process (payload: ActivityPubHttpPayload, jobId: number) { 7export type ActivitypubHttpBroadcastPayload = {
7 logger.info('Processing ActivityPub broadcast in job %d.', jobId) 8 uris: string[]
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpBroadcast (job: kue.Job) {
14 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpBroadcastPayload
8 17
9 const body = await computeBody(payload) 18 const body = await computeBody(payload)
10 const httpSignatureOptions = await buildSignedRequestOptions(payload) 19 const httpSignatureOptions = await buildSignedRequestOptions(payload)
@@ -26,28 +35,15 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
26 await doRequest(options) 35 await doRequest(options)
27 goodUrls.push(uri) 36 goodUrls.push(uri)
28 } catch (err) { 37 } catch (err) {
29 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) 38 badUrls.push(uri)
30 if (isRetryingLater === false) badUrls.push(uri)
31 } 39 }
32 } 40 }
33 41
34 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) 42 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
35} 43}
36 44
37function onError (err: Error, jobId: number) {
38 logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
39 return Promise.resolve()
40}
41
42function onSuccess (jobId: number) {
43 logger.info('Job %d is a success.', jobId)
44 return Promise.resolve()
45}
46
47// --------------------------------------------------------------------------- 45// ---------------------------------------------------------------------------
48 46
49export { 47export {
50 process, 48 processActivityPubHttpBroadcast
51 onError,
52 onSuccess
53} 49}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index a7b5aabd0..062211c85 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,11 +1,18 @@
1import * as kue from 'kue'
1import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
3import { ACTIVITY_PUB } from '../../../initializers' 4import { ACTIVITY_PUB } from '../../../initializers'
4import { processActivities } from '../../activitypub/process' 5import { processActivities } from '../../activitypub/process'
5import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' 6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
6 7
7async function process (payload: ActivityPubHttpPayload, jobId: number) { 8export type ActivitypubHttpFetcherPayload = {
8 logger.info('Processing ActivityPub fetcher in job %d.', jobId) 9 uris: string[]
10}
11
12async function processActivityPubHttpFetcher (job: kue.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14
15 const payload = job.data as ActivitypubHttpBroadcastPayload
9 16
10 const options = { 17 const options = {
11 method: 'GET', 18 method: 'GET',
@@ -49,20 +56,8 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
49 } 56 }
50} 57}
51 58
52function onError (err: Error, jobId: number) {
53 logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err)
54 return Promise.resolve()
55}
56
57function onSuccess (jobId: number) {
58 logger.info('Job %d is a success.', jobId)
59 return Promise.resolve()
60}
61
62// --------------------------------------------------------------------------- 59// ---------------------------------------------------------------------------
63 60
64export { 61export {
65 process, 62 processActivityPubHttpFetcher
66 onError,
67 onSuccess
68} 63}
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
new file mode 100644
index 000000000..9b4188c50
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -0,0 +1,43 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6
7export type ActivitypubHttpUnicastPayload = {
8 uri: string
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpUnicast (job: kue.Job) {
14 logger.info('Processing ActivityPub unicast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpUnicastPayload
17 const uri = payload.uri
18
19 const body = await computeBody(payload)
20 const httpSignatureOptions = await buildSignedRequestOptions(payload)
21
22 const options = {
23 method: 'POST',
24 uri,
25 json: body,
26 httpSignature: httpSignatureOptions
27 }
28
29 try {
30 await doRequest(options)
31 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
32 } catch (err) {
33 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
34
35 throw err
36 }
37}
38
39// ---------------------------------------------------------------------------
40
41export {
42 processActivityPubHttpUnicast
43}
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
new file mode 100644
index 000000000..c087371c6
--- /dev/null
+++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
@@ -0,0 +1,39 @@
1import { buildSignedActivity } from '../../../../helpers/activitypub'
2import { getServerActor } from '../../../../helpers/utils'
3import { ActorModel } from '../../../../models/activitypub/actor'
4
5async function computeBody (payload: { body: any, signatureActorId?: number }) {
6 let body = payload.body
7
8 if (payload.signatureActorId) {
9 const actorSignature = await ActorModel.load(payload.signatureActorId)
10 if (!actorSignature) throw new Error('Unknown signature actor id.')
11 body = await buildSignedActivity(actorSignature, payload.body)
12 }
13
14 return body
15}
16
17async function buildSignedRequestOptions (payload: { signatureActorId?: number }) {
18 let actor: ActorModel
19 if (payload.signatureActorId) {
20 actor = await ActorModel.load(payload.signatureActorId)
21 if (!actor) throw new Error('Unknown signature actor id.')
22 } else {
23 // We need to sign the request, so use the server
24 actor = await getServerActor()
25 }
26
27 const keyId = actor.getWebfingerUrl()
28 return {
29 algorithm: 'rsa-sha256',
30 authorizationHeaderName: 'Signature',
31 keyId,
32 key: actor.privateKey
33 }
34}
35
36export {
37 computeBody,
38 buildSignedRequestOptions
39}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/job-queue/handlers/video-file.ts
index f224a31b4..5294483bd 100644
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -1,38 +1,60 @@
1import * as Bluebird from 'bluebird' 1import * as kue from 'kue'
2import { VideoResolution } from '../../../../shared'
2import { VideoPrivacy } from '../../../../shared/models/videos' 3import { VideoPrivacy } from '../../../../shared/models/videos'
3import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
4import { computeResolutionsToTranscode } from '../../../helpers/utils' 5import { computeResolutionsToTranscode } from '../../../helpers/utils'
5import { sequelizeTypescript } from '../../../initializers' 6import { sequelizeTypescript } from '../../../initializers'
6import { JobModel } from '../../../models/job/job'
7import { VideoModel } from '../../../models/video/video' 7import { VideoModel } from '../../../models/video/video'
8import { shareVideoByServerAndChannel } from '../../activitypub' 8import { shareVideoByServerAndChannel } from '../../activitypub'
9import { sendCreateVideo } from '../../activitypub/send' 9import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send'
10import { JobScheduler } from '../job-scheduler' 10import { JobQueue } from '../job-queue'
11import { TranscodingJobPayload } from './transcoding-job-scheduler'
12 11
13async function process (data: TranscodingJobPayload, jobId: number) { 12export type VideoFilePayload = {
14 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) 13 videoUUID: string
14 resolution?: VideoResolution
15}
16
17async function processVideoFile (job: kue.Job) {
18 const payload = job.data as VideoFilePayload
19 logger.info('Processing video file in job %d.', job.id)
20
21 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID)
15 // No video, maybe deleted? 22 // No video, maybe deleted?
16 if (!video) { 23 if (!video) {
17 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 24 logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid })
18 return undefined 25 return undefined
19 } 26 }
20 27
21 await video.optimizeOriginalVideofile() 28 // Transcoding in other resolution
29 if (payload.resolution) {
30 await video.transcodeOriginalVideofile(payload.resolution)
31 await onVideoFileTranscoderSuccess(video)
32 } else {
33 await video.optimizeOriginalVideofile()
34 await onVideoFileOptimizerSuccess(video)
35 }
22 36
23 return video 37 return video
24} 38}
25 39
26function onError (err: Error, jobId: number) { 40async function onVideoFileTranscoderSuccess (video: VideoModel) {
27 logger.error('Error when optimized video file in job %d.', jobId, err) 41 if (video === undefined) return undefined
28 return Promise.resolve() 42
43 // Maybe the video changed in database, refresh it
44 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
45 // Video does not exist anymore
46 if (!videoDatabase) return undefined
47
48 if (video.privacy !== VideoPrivacy.PRIVATE) {
49 await sendUpdateVideo(video, undefined)
50 }
51
52 return undefined
29} 53}
30 54
31async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) { 55async function onVideoFileOptimizerSuccess (video: VideoModel) {
32 if (video === undefined) return undefined 56 if (video === undefined) return undefined
33 57
34 logger.info('Job %d is a success.', jobId)
35
36 // Maybe the video changed in database, refresh it 58 // Maybe the video changed in database, refresh it
37 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) 59 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
38 // Video does not exist anymore 60 // Video does not exist anymore
@@ -56,7 +78,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
56 if (resolutionsEnabled.length !== 0) { 78 if (resolutionsEnabled.length !== 0) {
57 try { 79 try {
58 await sequelizeTypescript.transaction(async t => { 80 await sequelizeTypescript.transaction(async t => {
59 const tasks: Bluebird<JobModel>[] = [] 81 const tasks: Promise<any>[] = []
60 82
61 for (const resolution of resolutionsEnabled) { 83 for (const resolution of resolutionsEnabled) {
62 const dataInput = { 84 const dataInput = {
@@ -64,7 +86,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
64 resolution 86 resolution
65 } 87 }
66 88
67 const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) 89 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
68 tasks.push(p) 90 tasks.push(p)
69 } 91 }
70 92
@@ -84,7 +106,5 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
84// --------------------------------------------------------------------------- 106// ---------------------------------------------------------------------------
85 107
86export { 108export {
87 process, 109 processVideoFile
88 onError,
89 onSuccess
90} 110}
diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts
new file mode 100644
index 000000000..57231e649
--- /dev/null
+++ b/server/lib/job-queue/index.ts
@@ -0,0 +1 @@
export * from './job-queue'
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
new file mode 100644
index 000000000..7a2b6c78d
--- /dev/null
+++ b/server/lib/job-queue/job-queue.ts
@@ -0,0 +1,124 @@
1import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
8import { processVideoFile, VideoFilePayload } from './handlers/video-file'
9
10type CreateJobArgument =
11 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
12 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
13 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
14 { type: 'video-file', payload: VideoFilePayload }
15
16const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
17 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
18 'activitypub-http-unicast': processActivityPubHttpUnicast,
19 'activitypub-http-fetcher': processActivityPubHttpFetcher,
20 'video-file': processVideoFile
21}
22
23class JobQueue {
24
25 private static instance: JobQueue
26
27 private jobQueue: kue.Queue
28 private initialized = false
29
30 private constructor () {}
31
32 init () {
33 // Already initialized
34 if (this.initialized === true) return
35 this.initialized = true
36
37 this.jobQueue = kue.createQueue({
38 prefix: 'q-' + CONFIG.WEBSERVER.HOST,
39 redis: {
40 host: CONFIG.REDIS.HOSTNAME,
41 port: CONFIG.REDIS.PORT,
42 auth: CONFIG.REDIS.AUTH
43 }
44 })
45
46 this.jobQueue.on('error', err => {
47 logger.error('Error in job queue.', err)
48 process.exit(-1)
49 })
50 this.jobQueue.watchStuckJobs(5000)
51
52 for (const handlerName of Object.keys(handlers)) {
53 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
54 try {
55 const res = await handlers[ handlerName ](job)
56 return done(null, res)
57 } catch (err) {
58 return done(err)
59 }
60 })
61 }
62 }
63
64 createJob (obj: CreateJobArgument, priority = 'normal') {
65 return new Promise((res, rej) => {
66 this.jobQueue
67 .create(obj.type, obj.payload)
68 .priority(priority)
69 .attempts(JOB_ATTEMPTS[obj.type])
70 .backoff({ type: 'exponential' })
71 .save(err => {
72 if (err) return rej(err)
73
74 return res()
75 })
76 })
77 }
78
79 listForApi (state: JobState, start: number, count: number, sort: string) {
80 return new Promise<kue.Job[]>((res, rej) => {
81 kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
82 if (err) return rej(err)
83
84 return res(jobs)
85 })
86 })
87 }
88
89 count (state: JobState) {
90 return new Promise<number>((res, rej) => {
91 this.jobQueue[state + 'Count']((err, total) => {
92 if (err) return rej(err)
93
94 return res(total)
95 })
96 })
97 }
98
99 removeOldJobs () {
100 const now = new Date().getTime()
101 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
102 if (err) {
103 logger.error('Cannot get jobs when removing old jobs.', err)
104 return
105 }
106
107 for (const job of jobs) {
108 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
109 job.remove()
110 }
111 }
112 })
113 }
114
115 static get Instance () {
116 return this.instance || (this.instance = new this())
117 }
118}
119
120// ---------------------------------------------------------------------------
121
122export {
123 JobQueue
124}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
deleted file mode 100644
index 4459152db..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
+++ /dev/null
@@ -1,94 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { buildSignedActivity } from '../../../helpers/activitypub'
3import { logger } from '../../../helpers/logger'
4import { getServerActor } from '../../../helpers/utils'
5import { ACTIVITY_PUB } from '../../../initializers'
6import { ActorModel } from '../../../models/activitypub/actor'
7import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
8import { JobHandler, JobScheduler } from '../job-scheduler'
9
10import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
11import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
12import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'
13
14type ActivityPubHttpPayload = {
15 uris: string[]
16 signatureActorId?: number
17 body?: any
18 attemptNumber?: number
19}
20
21const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
22 activitypubHttpBroadcastHandler,
23 activitypubHttpUnicastHandler,
24 activitypubHttpFetcherHandler
25}
26const jobCategory: JobCategory = 'activitypub-http'
27
28const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers)
29
30async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) {
31 logger.warn('Cannot make request to %s.', uri, err)
32
33 let attemptNumber = payload.attemptNumber || 1
34 attemptNumber += 1
35
36 if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) {
37 logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err)
38
39 const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined)
40 if (!actor) {
41 logger.debug('Actor %s is not a follower, do not retry the request.', uri)
42 return false
43 }
44
45 const newPayload = Object.assign(payload, {
46 uris: [ uri ],
47 attemptNumber
48 })
49 await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload)
50
51 return true
52 }
53
54 return false
55}
56
57async function computeBody (payload: ActivityPubHttpPayload) {
58 let body = payload.body
59
60 if (payload.signatureActorId) {
61 const actorSignature = await ActorModel.load(payload.signatureActorId)
62 if (!actorSignature) throw new Error('Unknown signature actor id.')
63 body = await buildSignedActivity(actorSignature, payload.body)
64 }
65
66 return body
67}
68
69async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) {
70 let actor: ActorModel
71 if (payload.signatureActorId) {
72 actor = await ActorModel.load(payload.signatureActorId)
73 if (!actor) throw new Error('Unknown signature actor id.')
74 } else {
75 // We need to sign the request, so use the server
76 actor = await getServerActor()
77 }
78
79 const keyId = actor.getWebfingerUrl()
80 return {
81 algorithm: 'rsa-sha256',
82 authorizationHeaderName: 'Signature',
83 keyId,
84 key: actor.privateKey
85 }
86}
87
88export {
89 ActivityPubHttpPayload,
90 activitypubHttpJobScheduler,
91 maybeRetryRequestLater,
92 computeBody,
93 buildSignedRequestOptions
94}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
deleted file mode 100644
index 54a7504e8..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
+++ /dev/null
@@ -1,50 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
5
6async function process (payload: ActivityPubHttpPayload, jobId: number) {
7 logger.info('Processing ActivityPub unicast in job %d.', jobId)
8
9 const uri = payload.uris[0]
10
11 const body = await computeBody(payload)
12 const httpSignatureOptions = await buildSignedRequestOptions(payload)
13
14 const options = {
15 method: 'POST',
16 uri,
17 json: body,
18 httpSignature: httpSignatureOptions
19 }
20
21 try {
22 await doRequest(options)
23 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
24 } catch (err) {
25 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
26 if (isRetryingLater === false) {
27 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
28 }
29
30 throw err
31 }
32}
33
34function onError (err: Error, jobId: number) {
35 logger.error('Error when sending ActivityPub request in job %d.', jobId, err)
36 return Promise.resolve()
37}
38
39function onSuccess (jobId: number) {
40 logger.info('Job %d is a success.', jobId)
41 return Promise.resolve()
42}
43
44// ---------------------------------------------------------------------------
45
46export {
47 process,
48 onError,
49 onSuccess
50}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts
deleted file mode 100644
index ad8f527b4..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './activitypub-http-job-scheduler'
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts
deleted file mode 100644
index 394264ec1..000000000
--- a/server/lib/jobs/index.ts
+++ /dev/null
@@ -1,2 +0,0 @@
1export * from './activitypub-http-job-scheduler'
2export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
deleted file mode 100644
index 9d55880e6..000000000
--- a/server/lib/jobs/job-scheduler.ts
+++ /dev/null
@@ -1,144 +0,0 @@
1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize'
3import { JobCategory } from '../../../shared'
4import { logger } from '../../helpers/logger'
5import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
6import { JobModel } from '../../models/job/job'
7
8export interface JobHandler<P, T> {
9 process (data: object, jobId: number): Promise<T>
10 onError (err: Error, jobId: number)
11 onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any>
12}
13type JobQueueCallback = (err: Error) => void
14
15class JobScheduler<P, T> {
16
17 constructor (
18 private jobCategory: JobCategory,
19 private jobHandlers: { [ id: string ]: JobHandler<P, T> }
20 ) {}
21
22 async activate () {
23 const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
24
25 logger.info('Jobs scheduler %s activated.', this.jobCategory)
26
27 const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))
28
29 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING
31 try {
32 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
33
34 this.enqueueJobs(jobsQueue, jobs)
35 } catch (err) {
36 logger.error('Cannot list pending jobs.', err)
37 }
38
39 forever(
40 async next => {
41 if (jobsQueue.length() !== 0) {
42 // Finish processing the queue first
43 return setTimeout(next, JOBS_FETCHING_INTERVAL)
44 }
45
46 const state = JOB_STATES.PENDING
47 try {
48 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
49
50 this.enqueueJobs(jobsQueue, jobs)
51 } catch (err) {
52 logger.error('Cannot list pending jobs.', err)
53 }
54
55 // Optimization: we could use "drain" from queue object
56 return setTimeout(next, JOBS_FETCHING_INTERVAL)
57 },
58
59 err => logger.error('Error in job scheduler queue.', err)
60 )
61 }
62
63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
64 const createQuery = {
65 state: JOB_STATES.PENDING,
66 category: this.jobCategory,
67 handlerName,
68 handlerInputData
69 }
70
71 const options = { transaction }
72
73 return JobModel.create(createQuery, options)
74 }
75
76 private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
77 jobs.forEach(job => jobsQueue.push(job))
78 }
79
80 private async processJob (job: JobModel, callback: (err: Error) => void) {
81 const jobHandler = this.jobHandlers[job.handlerName]
82 if (jobHandler === undefined) {
83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
84 logger.error(errorString)
85
86 const error = new Error(errorString)
87 await this.onJobError(jobHandler, job, error)
88 return callback(error)
89 }
90
91 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
92
93 job.state = JOB_STATES.PROCESSING
94 await job.save()
95
96 try {
97 const result: T = await jobHandler.process(job.handlerInputData, job.id)
98 await this.onJobSuccess(jobHandler, job, result)
99 } catch (err) {
100 logger.error('Error in job handler %s.', job.handlerName, err)
101
102 try {
103 await this.onJobError(jobHandler, job, err)
104 } catch (innerErr) {
105 this.cannotSaveJobError(innerErr)
106 return callback(innerErr)
107 }
108 }
109
110 return callback(null)
111 }
112
113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
114 job.state = JOB_STATES.ERROR
115
116 try {
117 await job.save()
118 if (jobHandler) await jobHandler.onError(err, job.id)
119 } catch (err) {
120 this.cannotSaveJobError(err)
121 }
122 }
123
124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
125 job.state = JOB_STATES.SUCCESS
126
127 try {
128 await job.save()
129 await jobHandler.onSuccess(job.id, jobResult, this)
130 } catch (err) {
131 this.cannotSaveJobError(err)
132 }
133 }
134
135 private cannotSaveJobError (err: Error) {
136 logger.error('Cannot save new job state.', err)
137 }
138}
139
140// ---------------------------------------------------------------------------
141
142export {
143 JobScheduler
144}
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts
deleted file mode 100644
index 73152a1be..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
deleted file mode 100644
index e5530a73c..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
+++ /dev/null
@@ -1,23 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { VideoModel } from '../../../models/video/video'
3import { JobHandler, JobScheduler } from '../job-scheduler'
4
5import * as videoFileOptimizer from './video-file-optimizer-handler'
6import * as videoFileTranscoder from './video-file-transcoder-handler'
7
8type TranscodingJobPayload = {
9 videoUUID: string
10 resolution?: number
11}
12const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = {
13 videoFileOptimizer,
14 videoFileTranscoder
15}
16const jobCategory: JobCategory = 'transcoding'
17
18const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers)
19
20export {
21 TranscodingJobPayload,
22 transcodingJobScheduler
23}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
deleted file mode 100644
index 883d3eba8..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
+++ /dev/null
@@ -1,48 +0,0 @@
1import { VideoResolution } from '../../../../shared'
2import { VideoPrivacy } from '../../../../shared/models/videos'
3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video'
5import { sendUpdateVideo } from '../../activitypub/send'
6
7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
9 // No video, maybe deleted?
10 if (!video) {
11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
12 return undefined
13 }
14
15 await video.transcodeOriginalVideofile(data.resolution)
16
17 return video
18}
19
20function onError (err: Error, jobId: number) {
21 logger.error('Error when transcoding video file in job %d.', jobId, err)
22 return Promise.resolve()
23}
24
25async function onSuccess (jobId: number, video: VideoModel) {
26 if (video === undefined) return undefined
27
28 logger.info('Job %d is a success.', jobId)
29
30 // Maybe the video changed in database, refresh it
31 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
32 // Video does not exist anymore
33 if (!videoDatabase) return undefined
34
35 if (video.privacy !== VideoPrivacy.PRIVATE) {
36 await sendUpdateVideo(video, undefined)
37 }
38
39 return undefined
40}
41
42// ---------------------------------------------------------------------------
43
44export {
45 process,
46 onError,
47 onSuccess
48}
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts
new file mode 100644
index 000000000..add5677ac
--- /dev/null
+++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts
@@ -0,0 +1,19 @@
1import { JobQueue } from '../job-queue'
2import { AbstractScheduler } from './abstract-scheduler'
3
4export class RemoveOldJobsScheduler extends AbstractScheduler {
5
6 private static instance: AbstractScheduler
7
8 private constructor () {
9 super()
10 }
11
12 async execute () {
13 JobQueue.Instance.removeOldJobs()
14 }
15
16 static get Instance () {
17 return this.instance || (this.instance = new this())
18 }
19}