diff options
Diffstat (limited to 'server/lib/job-queue/handlers')
8 files changed, 39 insertions, 18 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 0ff7b44a0..7d9dd61e9 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -5,11 +5,13 @@ import { doRequest } from '../../../helpers/requests' | |||
5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
6 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' | 6 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' |
7 | import { ActorFollowScoreCache } from '../../files-cache' | 7 | import { ActorFollowScoreCache } from '../../files-cache' |
8 | import { ContextType } from '@server/helpers/activitypub' | ||
8 | 9 | ||
9 | export type ActivitypubHttpBroadcastPayload = { | 10 | export type ActivitypubHttpBroadcastPayload = { |
10 | uris: string[] | 11 | uris: string[] |
11 | signatureActorId?: number | 12 | signatureActorId?: number |
12 | body: any | 13 | body: any |
14 | contextType?: ContextType | ||
13 | } | 15 | } |
14 | 16 | ||
15 | async function processActivityPubHttpBroadcast (job: Bull.Job) { | 17 | async function processActivityPubHttpBroadcast (job: Bull.Job) { |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index c70ce3be9..6b71e2891 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -4,11 +4,13 @@ import { doRequest } from '../../../helpers/requests' | |||
4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' | 5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' |
6 | import { ActorFollowScoreCache } from '../../files-cache' | 6 | import { ActorFollowScoreCache } from '../../files-cache' |
7 | import { ContextType } from '@server/helpers/activitypub' | ||
7 | 8 | ||
8 | export type ActivitypubHttpUnicastPayload = { | 9 | export type ActivitypubHttpUnicastPayload = { |
9 | uri: string | 10 | uri: string |
10 | signatureActorId?: number | 11 | signatureActorId?: number |
11 | body: any | 12 | body: any |
13 | contextType?: ContextType | ||
12 | } | 14 | } |
13 | 15 | ||
14 | async function processActivityPubHttpUnicast (job: Bull.Job) { | 16 | async function processActivityPubHttpUnicast (job: Bull.Job) { |
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts index d3bde6e6a..54b35840d 100644 --- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -1,11 +1,11 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | 1 | import { buildSignedActivity, ContextType } from '../../../../helpers/activitypub' |
2 | import { getServerActor } from '../../../../helpers/utils' | 2 | import { getServerActor } from '../../../../helpers/utils' |
3 | import { ActorModel } from '../../../../models/activitypub/actor' | 3 | import { ActorModel } from '../../../../models/activitypub/actor' |
4 | import { sha256 } from '../../../../helpers/core-utils' | 4 | import { sha256 } from '../../../../helpers/core-utils' |
5 | import { HTTP_SIGNATURE } from '../../../../initializers/constants' | 5 | import { HTTP_SIGNATURE } from '../../../../initializers/constants' |
6 | import { MActor } from '../../../../typings/models' | 6 | import { MActor } from '../../../../typings/models' |
7 | 7 | ||
8 | type Payload = { body: any, signatureActorId?: number } | 8 | type Payload = { body: any, contextType?: ContextType, signatureActorId?: number } |
9 | 9 | ||
10 | async function computeBody (payload: Payload) { | 10 | async function computeBody (payload: Payload) { |
11 | let body = payload.body | 11 | let body = payload.body |
@@ -13,7 +13,7 @@ async function computeBody (payload: Payload) { | |||
13 | if (payload.signatureActorId) { | 13 | if (payload.signatureActorId) { |
14 | const actorSignature = await ActorModel.load(payload.signatureActorId) | 14 | const actorSignature = await ActorModel.load(payload.signatureActorId) |
15 | if (!actorSignature) throw new Error('Unknown signature actor id.') | 15 | if (!actorSignature) throw new Error('Unknown signature actor id.') |
16 | body = await buildSignedActivity(actorSignature, payload.body) | 16 | body = await buildSignedActivity(actorSignature, payload.body, payload.contextType) |
17 | } | 17 | } |
18 | 18 | ||
19 | return body | 19 | return body |
@@ -42,7 +42,7 @@ async function buildSignedRequestOptions (payload: Payload) { | |||
42 | 42 | ||
43 | function buildGlobalHeaders (body: any) { | 43 | function buildGlobalHeaders (body: any) { |
44 | return { | 44 | return { |
45 | 'Digest': buildDigest(body) | 45 | Digest: buildDigest(body) |
46 | } | 46 | } |
47 | } | 47 | } |
48 | 48 | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 99c991e72..be9e7d181 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -11,7 +11,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
11 | import { getVideoFilePath } from '@server/lib/video-paths' | 11 | import { getVideoFilePath } from '@server/lib/video-paths' |
12 | 12 | ||
13 | export type VideoFileImportPayload = { | 13 | export type VideoFileImportPayload = { |
14 | videoUUID: string, | 14 | videoUUID: string |
15 | filePath: string | 15 | filePath: string |
16 | } | 16 | } |
17 | 17 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 1fca17584..09f225cec 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -221,7 +221,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
221 | isNewVideo: true | 221 | isNewVideo: true |
222 | } | 222 | } |
223 | 223 | ||
224 | await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 224 | await JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }) |
225 | } | 225 | } |
226 | 226 | ||
227 | } catch (err) { | 227 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts new file mode 100644 index 000000000..319d7090e --- /dev/null +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -0,0 +1,20 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' | ||
4 | |||
5 | export type VideoRedundancyPayload = { | ||
6 | videoId: number | ||
7 | } | ||
8 | |||
9 | async function processVideoRedundancy (job: Bull.Job) { | ||
10 | const payload = job.data as VideoRedundancyPayload | ||
11 | logger.info('Processing video redundancy in job %d.', job.id) | ||
12 | |||
13 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | ||
14 | } | ||
15 | |||
16 | // --------------------------------------------------------------------------- | ||
17 | |||
18 | export { | ||
19 | processVideoRedundancy | ||
20 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 39b9fac98..c020057c9 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -6,7 +6,6 @@ import { JobQueue } from '../job-queue' | |||
6 | import { federateVideoIfNeeded } from '../../activitypub' | 6 | import { federateVideoIfNeeded } from '../../activitypub' |
7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
8 | import { sequelizeTypescript } from '../../../initializers' | 8 | import { sequelizeTypescript } from '../../../initializers' |
9 | import * as Bluebird from 'bluebird' | ||
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | 9 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' |
11 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' | 10 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' |
12 | import { Notifier } from '../../notifier' | 11 | import { Notifier } from '../../notifier' |
@@ -40,8 +39,11 @@ interface OptimizeTranscodingPayload extends BaseTranscodingPayload { | |||
40 | type: 'optimize' | 39 | type: 'optimize' |
41 | } | 40 | } |
42 | 41 | ||
43 | export type VideoTranscodingPayload = HLSTranscodingPayload | NewResolutionTranscodingPayload | 42 | export type VideoTranscodingPayload = |
44 | | OptimizeTranscodingPayload | MergeAudioTranscodingPayload | 43 | HLSTranscodingPayload |
44 | | NewResolutionTranscodingPayload | ||
45 | | OptimizeTranscodingPayload | ||
46 | | MergeAudioTranscodingPayload | ||
45 | 47 | ||
46 | async function processVideoTranscoding (job: Bull.Job) { | 48 | async function processVideoTranscoding (job: Bull.Job) { |
47 | const payload = job.data as VideoTranscodingPayload | 49 | const payload = job.data as VideoTranscodingPayload |
@@ -105,7 +107,7 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
105 | 107 | ||
106 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { | 108 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
107 | // Maybe the video changed in database, refresh it | 109 | // Maybe the video changed in database, refresh it |
108 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) | 110 | const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) |
109 | // Video does not exist anymore | 111 | // Video does not exist anymore |
110 | if (!videoDatabase) return undefined | 112 | if (!videoDatabase) return undefined |
111 | 113 | ||
@@ -122,8 +124,6 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
122 | await createHlsJobIfEnabled(hlsPayload) | 124 | await createHlsJobIfEnabled(hlsPayload) |
123 | 125 | ||
124 | if (resolutionsEnabled.length !== 0) { | 126 | if (resolutionsEnabled.length !== 0) { |
125 | const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = [] | ||
126 | |||
127 | for (const resolution of resolutionsEnabled) { | 127 | for (const resolution of resolutionsEnabled) { |
128 | let dataInput: VideoTranscodingPayload | 128 | let dataInput: VideoTranscodingPayload |
129 | 129 | ||
@@ -143,12 +143,9 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
143 | } | 143 | } |
144 | } | 144 | } |
145 | 145 | ||
146 | const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 146 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) |
147 | tasks.push(p) | ||
148 | } | 147 | } |
149 | 148 | ||
150 | await Promise.all(tasks) | ||
151 | |||
152 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 149 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
153 | } else { | 150 | } else { |
154 | // No transcoding to do, it's now published | 151 | // No transcoding to do, it's now published |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index 73fa5ed04..2258cd029 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -23,6 +23,8 @@ async function processVideosViews () { | |||
23 | for (const videoId of videoIds) { | 23 | for (const videoId of videoIds) { |
24 | try { | 24 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 25 | const views = await Redis.Instance.getVideoViews(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
27 | |||
26 | if (views) { | 28 | if (views) { |
27 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) |
28 | 30 | ||
@@ -52,8 +54,6 @@ async function processVideosViews () { | |||
52 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) |
53 | } | 55 | } |
54 | } | 56 | } |
55 | |||
56 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
57 | } catch (err) { | 57 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 59 | } |