diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/after-video-channel-import.ts | 37 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-channel-import.ts | 36 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 20 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 19 |
4 files changed, 103 insertions, 9 deletions
diff --git a/server/lib/job-queue/handlers/after-video-channel-import.ts b/server/lib/job-queue/handlers/after-video-channel-import.ts new file mode 100644 index 000000000..ffdd8c5b5 --- /dev/null +++ b/server/lib/job-queue/handlers/after-video-channel-import.ts | |||
@@ -0,0 +1,37 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' | ||
4 | import { AfterVideoChannelImportPayload, VideoChannelSyncState, VideoImportPreventExceptionResult } from '@shared/models' | ||
5 | |||
6 | export async function processAfterVideoChannelImport (job: Job) { | ||
7 | const payload = job.data as AfterVideoChannelImportPayload | ||
8 | if (!payload.channelSyncId) return | ||
9 | |||
10 | logger.info('Processing after video channel import in job %s.', job.id) | ||
11 | |||
12 | const sync = await VideoChannelSyncModel.loadWithChannel(payload.channelSyncId) | ||
13 | if (!sync) { | ||
14 | logger.error('Unknown sync id %d.', payload.channelSyncId) | ||
15 | return | ||
16 | } | ||
17 | |||
18 | const childrenValues = await job.getChildrenValues<VideoImportPreventExceptionResult>() | ||
19 | |||
20 | let errors = 0 | ||
21 | let successes = 0 | ||
22 | |||
23 | for (const value of Object.values(childrenValues)) { | ||
24 | if (value.resultType === 'success') successes++ | ||
25 | else if (value.resultType === 'error') errors++ | ||
26 | } | ||
27 | |||
28 | if (errors > 0) { | ||
29 | sync.state = VideoChannelSyncState.FAILED | ||
30 | logger.error(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" with failures.`, { errors, successes }) | ||
31 | } else { | ||
32 | sync.state = VideoChannelSyncState.SYNCED | ||
33 | logger.info(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" successfully.`, { successes }) | ||
34 | } | ||
35 | |||
36 | await sync.save() | ||
37 | } | ||
diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts new file mode 100644 index 000000000..9bdb2d269 --- /dev/null +++ b/server/lib/job-queue/handlers/video-channel-import.ts | |||
@@ -0,0 +1,36 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { CONFIG } from '@server/initializers/config' | ||
4 | import { synchronizeChannel } from '@server/lib/sync-channel' | ||
5 | import { VideoChannelModel } from '@server/models/video/video-channel' | ||
6 | import { VideoChannelImportPayload } from '@shared/models' | ||
7 | |||
8 | export async function processVideoChannelImport (job: Job) { | ||
9 | const payload = job.data as VideoChannelImportPayload | ||
10 | |||
11 | logger.info('Processing video channel import in job %s.', job.id) | ||
12 | |||
13 | // Channel import requires only http upload to be allowed | ||
14 | if (!CONFIG.IMPORT.VIDEOS.HTTP.ENABLED) { | ||
15 | logger.error('Cannot import channel as the HTTP upload is disabled') | ||
16 | return | ||
17 | } | ||
18 | |||
19 | if (!CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.ENABLED) { | ||
20 | logger.error('Cannot import channel as the synchronization is disabled') | ||
21 | return | ||
22 | } | ||
23 | |||
24 | const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) | ||
25 | |||
26 | try { | ||
27 | logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) | ||
28 | |||
29 | await synchronizeChannel({ | ||
30 | channel: videoChannel, | ||
31 | externalChannelUrl: payload.externalChannelUrl | ||
32 | }) | ||
33 | } catch (err) { | ||
34 | logger.error(`Failed to import channel ${videoChannel.name}`, { err }) | ||
35 | } | ||
36 | } | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index f4629159c..9901b878c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
8 | import { Hooks } from '@server/lib/plugins/hooks' | 8 | import { Hooks } from '@server/lib/plugins/hooks' |
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | 9 | import { ServerConfigManager } from '@server/lib/server-config-manager' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video' | 11 | import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' |
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 12 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 13 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 14 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
@@ -18,6 +18,7 @@ import { isAudioFile } from '@shared/extra-utils' | |||
18 | import { | 18 | import { |
19 | ThumbnailType, | 19 | ThumbnailType, |
20 | VideoImportPayload, | 20 | VideoImportPayload, |
21 | VideoImportPreventExceptionResult, | ||
21 | VideoImportState, | 22 | VideoImportState, |
22 | VideoImportTorrentPayload, | 23 | VideoImportTorrentPayload, |
23 | VideoImportTorrentPayloadType, | 24 | VideoImportTorrentPayloadType, |
@@ -41,20 +42,29 @@ import { Notifier } from '../../notifier' | |||
41 | import { generateVideoMiniature } from '../../thumbnail' | 42 | import { generateVideoMiniature } from '../../thumbnail' |
42 | import { JobQueue } from '../job-queue' | 43 | import { JobQueue } from '../job-queue' |
43 | 44 | ||
44 | async function processVideoImport (job: Job) { | 45 | async function processVideoImport (job: Job): Promise<VideoImportPreventExceptionResult> { |
45 | const payload = job.data as VideoImportPayload | 46 | const payload = job.data as VideoImportPayload |
46 | 47 | ||
47 | const videoImport = await getVideoImportOrDie(payload) | 48 | const videoImport = await getVideoImportOrDie(payload) |
48 | if (videoImport.state === VideoImportState.CANCELLED) { | 49 | if (videoImport.state === VideoImportState.CANCELLED) { |
49 | logger.info('Do not process import since it has been cancelled', { payload }) | 50 | logger.info('Do not process import since it has been cancelled', { payload }) |
50 | return | 51 | return { resultType: 'success' } |
51 | } | 52 | } |
52 | 53 | ||
53 | videoImport.state = VideoImportState.PROCESSING | 54 | videoImport.state = VideoImportState.PROCESSING |
54 | await videoImport.save() | 55 | await videoImport.save() |
55 | 56 | ||
56 | if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, videoImport, payload) | 57 | try { |
57 | if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') return processTorrentImport(job, videoImport, payload) | 58 | if (payload.type === 'youtube-dl') await processYoutubeDLImport(job, videoImport, payload) |
59 | if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') await processTorrentImport(job, videoImport, payload) | ||
60 | |||
61 | return { resultType: 'success' } | ||
62 | } catch (err) { | ||
63 | if (!payload.preventException) throw err | ||
64 | |||
65 | logger.warn('Catch error in video import to send value to parent job.', { payload, err }) | ||
66 | return { resultType: 'error' } | ||
67 | } | ||
58 | } | 68 | } |
59 | 69 | ||
60 | // --------------------------------------------------------------------------- | 70 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 281e2e51a..3970d48b7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -22,6 +22,7 @@ import { | |||
22 | ActivitypubHttpFetcherPayload, | 22 | ActivitypubHttpFetcherPayload, |
23 | ActivitypubHttpUnicastPayload, | 23 | ActivitypubHttpUnicastPayload, |
24 | ActorKeysPayload, | 24 | ActorKeysPayload, |
25 | AfterVideoChannelImportPayload, | ||
25 | DeleteResumableUploadMetaFilePayload, | 26 | DeleteResumableUploadMetaFilePayload, |
26 | EmailPayload, | 27 | EmailPayload, |
27 | FederateVideoPayload, | 28 | FederateVideoPayload, |
@@ -31,6 +32,7 @@ import { | |||
31 | MoveObjectStoragePayload, | 32 | MoveObjectStoragePayload, |
32 | NotifyPayload, | 33 | NotifyPayload, |
33 | RefreshPayload, | 34 | RefreshPayload, |
35 | VideoChannelImportPayload, | ||
34 | VideoFileImportPayload, | 36 | VideoFileImportPayload, |
35 | VideoImportPayload, | 37 | VideoImportPayload, |
36 | VideoLiveEndingPayload, | 38 | VideoLiveEndingPayload, |
@@ -53,12 +55,14 @@ import { processFederateVideo } from './handlers/federate-video' | |||
53 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 55 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
54 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 56 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
55 | import { processNotify } from './handlers/notify' | 57 | import { processNotify } from './handlers/notify' |
58 | import { processVideoChannelImport } from './handlers/video-channel-import' | ||
56 | import { processVideoFileImport } from './handlers/video-file-import' | 59 | import { processVideoFileImport } from './handlers/video-file-import' |
57 | import { processVideoImport } from './handlers/video-import' | 60 | import { processVideoImport } from './handlers/video-import' |
58 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 61 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
59 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 62 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
60 | import { processVideoTranscoding } from './handlers/video-transcoding' | 63 | import { processVideoTranscoding } from './handlers/video-transcoding' |
61 | import { processVideosViewsStats } from './handlers/video-views-stats' | 64 | import { processVideosViewsStats } from './handlers/video-views-stats' |
65 | import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' | ||
62 | 66 | ||
63 | export type CreateJobArgument = | 67 | export type CreateJobArgument = |
64 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -79,6 +83,9 @@ export type CreateJobArgument = | |||
79 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | | 83 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | |
80 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | | 84 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | |
81 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | | 85 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | |
86 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
87 | { type: 'video-channel-import', payload: VideoChannelImportPayload } | | ||
88 | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } | | ||
82 | { type: 'notify', payload: NotifyPayload } | | 89 | { type: 'notify', payload: NotifyPayload } | |
83 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | 90 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | |
84 | { type: 'federate-video', payload: FederateVideoPayload } | 91 | { type: 'federate-video', payload: FederateVideoPayload } |
@@ -106,8 +113,10 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
106 | 'video-redundancy': processVideoRedundancy, | 113 | 'video-redundancy': processVideoRedundancy, |
107 | 'move-to-object-storage': processMoveToObjectStorage, | 114 | 'move-to-object-storage': processMoveToObjectStorage, |
108 | 'manage-video-torrent': processManageVideoTorrent, | 115 | 'manage-video-torrent': processManageVideoTorrent, |
109 | 'notify': processNotify, | ||
110 | 'video-studio-edition': processVideoStudioEdition, | 116 | 'video-studio-edition': processVideoStudioEdition, |
117 | 'video-channel-import': processVideoChannelImport, | ||
118 | 'after-video-channel-import': processAfterVideoChannelImport, | ||
119 | 'notify': processNotify, | ||
111 | 'federate-video': processFederateVideo | 120 | 'federate-video': processFederateVideo |
112 | } | 121 | } |
113 | 122 | ||
@@ -134,6 +143,8 @@ const jobTypes: JobType[] = [ | |||
134 | 'move-to-object-storage', | 143 | 'move-to-object-storage', |
135 | 'manage-video-torrent', | 144 | 'manage-video-torrent', |
136 | 'video-studio-edition', | 145 | 'video-studio-edition', |
146 | 'video-channel-import', | ||
147 | 'after-video-channel-import', | ||
137 | 'notify', | 148 | 'notify', |
138 | 'federate-video' | 149 | 'federate-video' |
139 | ] | 150 | ] |
@@ -306,7 +317,7 @@ class JobQueue { | |||
306 | .catch(err => logger.error('Cannot create job.', { err, options })) | 317 | .catch(err => logger.error('Cannot create job.', { err, options })) |
307 | } | 318 | } |
308 | 319 | ||
309 | async createJob (options: CreateJobArgument & CreateJobOptions) { | 320 | createJob (options: CreateJobArgument & CreateJobOptions) { |
310 | const queue: Queue = this.queues[options.type] | 321 | const queue: Queue = this.queues[options.type] |
311 | if (queue === undefined) { | 322 | if (queue === undefined) { |
312 | logger.error('Unknown queue %s: cannot create job.', options.type) | 323 | logger.error('Unknown queue %s: cannot create job.', options.type) |
@@ -318,7 +329,7 @@ class JobQueue { | |||
318 | return queue.add('job', options.payload, jobOptions) | 329 | return queue.add('job', options.payload, jobOptions) |
319 | } | 330 | } |
320 | 331 | ||
321 | async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { | 332 | createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { |
322 | let lastJob: FlowJob | 333 | let lastJob: FlowJob |
323 | 334 | ||
324 | for (const job of jobs) { | 335 | for (const job of jobs) { |
@@ -336,7 +347,7 @@ class JobQueue { | |||
336 | return this.flowProducer.add(lastJob) | 347 | return this.flowProducer.add(lastJob) |
337 | } | 348 | } |
338 | 349 | ||
339 | async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { | 350 | createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { |
340 | return this.flowProducer.add({ | 351 | return this.flowProducer.add({ |
341 | ...this.buildJobFlowOption(parent), | 352 | ...this.buildJobFlowOption(parent), |
342 | 353 | ||