aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts49
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts63
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts43
-rw-r--r--server/lib/job-queue/handlers/utils/activitypub-http-utils.ts39
-rw-r--r--server/lib/job-queue/handlers/video-file.ts110
5 files changed, 304 insertions, 0 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
new file mode 100644
index 000000000..159856cda
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -0,0 +1,49 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6
7export type ActivitypubHttpBroadcastPayload = {
8 uris: string[]
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpBroadcast (job: kue.Job) {
14 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpBroadcastPayload
17
18 const body = await computeBody(payload)
19 const httpSignatureOptions = await buildSignedRequestOptions(payload)
20
21 const options = {
22 method: 'POST',
23 uri: '',
24 json: body,
25 httpSignature: httpSignatureOptions
26 }
27
28 const badUrls: string[] = []
29 const goodUrls: string[] = []
30
31 for (const uri of payload.uris) {
32 options.uri = uri
33
34 try {
35 await doRequest(options)
36 goodUrls.push(uri)
37 } catch (err) {
38 badUrls.push(uri)
39 }
40 }
41
42 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
43}
44
45// ---------------------------------------------------------------------------
46
47export {
48 processActivityPubHttpBroadcast
49}
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
new file mode 100644
index 000000000..062211c85
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -0,0 +1,63 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ACTIVITY_PUB } from '../../../initializers'
5import { processActivities } from '../../activitypub/process'
6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
7
8export type ActivitypubHttpFetcherPayload = {
9 uris: string[]
10}
11
12async function processActivityPubHttpFetcher (job: kue.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14
15 const payload = job.data as ActivitypubHttpBroadcastPayload
16
17 const options = {
18 method: 'GET',
19 uri: '',
20 json: true,
21 activityPub: true
22 }
23
24 for (const uri of payload.uris) {
25 options.uri = uri
26 logger.info('Fetching ActivityPub data on %s.', uri)
27
28 const response = await doRequest(options)
29 const firstBody = response.body
30
31 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
32 const activities = firstBody.first.orderedItems
33
34 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
35
36 await processActivities(activities)
37 }
38
39 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
40 let i = 0
41 let nextLink = firstBody.first.next
42 while (nextLink && i < limit) {
43 options.uri = nextLink
44
45 const { body } = await doRequest(options)
46 nextLink = body.next
47 i++
48
49 if (Array.isArray(body.orderedItems)) {
50 const activities = body.orderedItems
51 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
52
53 await processActivities(activities)
54 }
55 }
56 }
57}
58
59// ---------------------------------------------------------------------------
60
61export {
62 processActivityPubHttpFetcher
63}
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
new file mode 100644
index 000000000..9b4188c50
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -0,0 +1,43 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6
7export type ActivitypubHttpUnicastPayload = {
8 uri: string
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpUnicast (job: kue.Job) {
14 logger.info('Processing ActivityPub unicast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpUnicastPayload
17 const uri = payload.uri
18
19 const body = await computeBody(payload)
20 const httpSignatureOptions = await buildSignedRequestOptions(payload)
21
22 const options = {
23 method: 'POST',
24 uri,
25 json: body,
26 httpSignature: httpSignatureOptions
27 }
28
29 try {
30 await doRequest(options)
31 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
32 } catch (err) {
33 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
34
35 throw err
36 }
37}
38
39// ---------------------------------------------------------------------------
40
41export {
42 processActivityPubHttpUnicast
43}
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
new file mode 100644
index 000000000..c087371c6
--- /dev/null
+++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
@@ -0,0 +1,39 @@
1import { buildSignedActivity } from '../../../../helpers/activitypub'
2import { getServerActor } from '../../../../helpers/utils'
3import { ActorModel } from '../../../../models/activitypub/actor'
4
5async function computeBody (payload: { body: any, signatureActorId?: number }) {
6 let body = payload.body
7
8 if (payload.signatureActorId) {
9 const actorSignature = await ActorModel.load(payload.signatureActorId)
10 if (!actorSignature) throw new Error('Unknown signature actor id.')
11 body = await buildSignedActivity(actorSignature, payload.body)
12 }
13
14 return body
15}
16
17async function buildSignedRequestOptions (payload: { signatureActorId?: number }) {
18 let actor: ActorModel
19 if (payload.signatureActorId) {
20 actor = await ActorModel.load(payload.signatureActorId)
21 if (!actor) throw new Error('Unknown signature actor id.')
22 } else {
23 // We need to sign the request, so use the server
24 actor = await getServerActor()
25 }
26
27 const keyId = actor.getWebfingerUrl()
28 return {
29 algorithm: 'rsa-sha256',
30 authorizationHeaderName: 'Signature',
31 keyId,
32 key: actor.privateKey
33 }
34}
35
36export {
37 computeBody,
38 buildSignedRequestOptions
39}
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
new file mode 100644
index 000000000..5294483bd
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -0,0 +1,110 @@
1import * as kue from 'kue'
2import { VideoResolution } from '../../../../shared'
3import { VideoPrivacy } from '../../../../shared/models/videos'
4import { logger } from '../../../helpers/logger'
5import { computeResolutionsToTranscode } from '../../../helpers/utils'
6import { sequelizeTypescript } from '../../../initializers'
7import { VideoModel } from '../../../models/video/video'
8import { shareVideoByServerAndChannel } from '../../activitypub'
9import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send'
10import { JobQueue } from '../job-queue'
11
12export type VideoFilePayload = {
13 videoUUID: string
14 resolution?: VideoResolution
15}
16
17async function processVideoFile (job: kue.Job) {
18 const payload = job.data as VideoFilePayload
19 logger.info('Processing video file in job %d.', job.id)
20
21 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID)
22 // No video, maybe deleted?
23 if (!video) {
24 logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid })
25 return undefined
26 }
27
28 // Transcoding in other resolution
29 if (payload.resolution) {
30 await video.transcodeOriginalVideofile(payload.resolution)
31 await onVideoFileTranscoderSuccess(video)
32 } else {
33 await video.optimizeOriginalVideofile()
34 await onVideoFileOptimizerSuccess(video)
35 }
36
37 return video
38}
39
40async function onVideoFileTranscoderSuccess (video: VideoModel) {
41 if (video === undefined) return undefined
42
43 // Maybe the video changed in database, refresh it
44 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
45 // Video does not exist anymore
46 if (!videoDatabase) return undefined
47
48 if (video.privacy !== VideoPrivacy.PRIVATE) {
49 await sendUpdateVideo(video, undefined)
50 }
51
52 return undefined
53}
54
55async function onVideoFileOptimizerSuccess (video: VideoModel) {
56 if (video === undefined) return undefined
57
58 // Maybe the video changed in database, refresh it
59 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
60 // Video does not exist anymore
61 if (!videoDatabase) return undefined
62
63 if (video.privacy !== VideoPrivacy.PRIVATE) {
64 // Now we'll add the video's meta data to our followers
65 await sendCreateVideo(video, undefined)
66 await shareVideoByServerAndChannel(video, undefined)
67 }
68
69 const originalFileHeight = await videoDatabase.getOriginalFileHeight()
70
71 // Create transcoding jobs if there are enabled resolutions
72 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
73 logger.info(
74 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight,
75 { resolutions: resolutionsEnabled }
76 )
77
78 if (resolutionsEnabled.length !== 0) {
79 try {
80 await sequelizeTypescript.transaction(async t => {
81 const tasks: Promise<any>[] = []
82
83 for (const resolution of resolutionsEnabled) {
84 const dataInput = {
85 videoUUID: videoDatabase.uuid,
86 resolution
87 }
88
89 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
90 tasks.push(p)
91 }
92
93 await Promise.all(tasks)
94 })
95
96 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
97 } catch (err) {
98 logger.warn('Cannot transcode the video.', err)
99 }
100 } else {
101 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
102 return undefined
103 }
104}
105
106// ---------------------------------------------------------------------------
107
108export {
109 processVideoFile
110}