From 2a491182e483b97afb1b65c908b23cb48d591807 Mon Sep 17 00:00:00 2001 From: Florent Date: Wed, 10 Aug 2022 09:53:39 +0200 Subject: Channel sync (#5135) * Add external channel URL for channel update / creation (#754) * Disallow synchronisation if user has no video quota (#754) * More constraints serverside (#754) * Disable sync if server configuration does not allow HTTP import (#754) * Working version synchronizing videos with a job (#754) TODO: refactoring, too much code duplication * More logs and try/catch (#754) * Fix eslint error (#754) * WIP: support synchronization time change (#754) * New frontend #754 * WIP: Create sync front (#754) * Enhance UI, sync creation form (#754) * Warning message when HTTP upload is disallowed * More consistent names (#754) * Binding Front with API (#754) * Add a /me API (#754) * Improve list UI (#754) * Implement creation and deletion routes (#754) * Lint (#754) * Lint again (#754) * WIP: UI for triggering import existing videos (#754) * Implement jobs for syncing and importing channels * Don't sync videos before sync creation + avoid concurrency issue (#754) * Cleanup (#754) * Cleanup: OpenAPI + API rework (#754) * Remove dead code (#754) * Eslint (#754) * Revert the mess with whitespaces in constants.ts (#754) * Some fixes after rebase (#754) * Several fixes after PR remarks (#754) * Front + API: Rename video-channels-sync to video-channel-syncs (#754) * Allow enabling channel sync through UI (#754) * getChannelInfo (#754) * Minor fixes: openapi + model + sql (#754) * Simplified API validators (#754) * Rename MChannelSync to MChannelSyncChannel (#754) * Add command for VideoChannelSync (#754) * Use synchronization.enabled config (#754) * Check parameters test + some fixes (#754) * Fix conflict mistake (#754) * Restrict access to video channel sync list API (#754) * Start adding unit test for synchronization (#754) * Continue testing (#754) * Tests finished + convertion of job to scheduler (#754) * Add lastSyncAt field (#754) * Fix externalRemoteUrl sort + creation date not well formatted (#754) * Small fix (#754) * Factorize addYoutubeDLImport and buildVideo (#754) * Check duplicates on channel not on users (#754) * factorize thumbnail generation (#754) * Fetch error should return status 400 (#754) * Separate video-channel-import and video-channel-sync-latest (#754) * Bump DB migration version after rebase (#754) * Prettier states in UI table (#754) * Add DefaultScope in VideoChannelSyncModel (#754) * Fix audit logs (#754) * Ensure user can upload when importing channel + minor fixes (#754) * Mark synchronization as failed on exception + typos (#754) * Change REST API for importing videos into channel (#754) * Add option for fully synchronize a chnanel (#754) * Return a whole sync object on creation to avoid tricks in Front (#754) * Various remarks (#754) * Single quotes by default (#754) * Rename synchronization to video_channel_synchronization * Add check.latest_videos_count and max_per_user options (#754) * Better channel rendering in list #754 * Allow sorting with channel name and state (#754) * Add missing tests for channel imports (#754) * Prefer using a parent job for channel sync * Styling * Client styling Co-authored-by: Chocobozzz --- .../handlers/after-video-channel-import.ts | 37 ++++++++++++++++++++++ .../lib/job-queue/handlers/video-channel-import.ts | 36 +++++++++++++++++++++ server/lib/job-queue/handlers/video-import.ts | 20 +++++++++--- server/lib/job-queue/job-queue.ts | 19 ++++++++--- 4 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 server/lib/job-queue/handlers/after-video-channel-import.ts create mode 100644 server/lib/job-queue/handlers/video-channel-import.ts (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/after-video-channel-import.ts b/server/lib/job-queue/handlers/after-video-channel-import.ts new file mode 100644 index 000000000..ffdd8c5b5 --- /dev/null +++ b/server/lib/job-queue/handlers/after-video-channel-import.ts @@ -0,0 +1,37 @@ +import { Job } from 'bullmq' +import { logger } from '@server/helpers/logger' +import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' +import { AfterVideoChannelImportPayload, VideoChannelSyncState, VideoImportPreventExceptionResult } from '@shared/models' + +export async function processAfterVideoChannelImport (job: Job) { + const payload = job.data as AfterVideoChannelImportPayload + if (!payload.channelSyncId) return + + logger.info('Processing after video channel import in job %s.', job.id) + + const sync = await VideoChannelSyncModel.loadWithChannel(payload.channelSyncId) + if (!sync) { + logger.error('Unknown sync id %d.', payload.channelSyncId) + return + } + + const childrenValues = await job.getChildrenValues() + + let errors = 0 + let successes = 0 + + for (const value of Object.values(childrenValues)) { + if (value.resultType === 'success') successes++ + else if (value.resultType === 'error') errors++ + } + + if (errors > 0) { + sync.state = VideoChannelSyncState.FAILED + logger.error(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" with failures.`, { errors, successes }) + } else { + sync.state = VideoChannelSyncState.SYNCED + logger.info(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" successfully.`, { successes }) + } + + await sync.save() +} diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts new file mode 100644 index 000000000..9bdb2d269 --- /dev/null +++ b/server/lib/job-queue/handlers/video-channel-import.ts @@ -0,0 +1,36 @@ +import { Job } from 'bullmq' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { synchronizeChannel } from '@server/lib/sync-channel' +import { VideoChannelModel } from '@server/models/video/video-channel' +import { VideoChannelImportPayload } from '@shared/models' + +export async function processVideoChannelImport (job: Job) { + const payload = job.data as VideoChannelImportPayload + + logger.info('Processing video channel import in job %s.', job.id) + + // Channel import requires only http upload to be allowed + if (!CONFIG.IMPORT.VIDEOS.HTTP.ENABLED) { + logger.error('Cannot import channel as the HTTP upload is disabled') + return + } + + if (!CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.ENABLED) { + logger.error('Cannot import channel as the synchronization is disabled') + return + } + + const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) + + try { + logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) + + await synchronizeChannel({ + channel: videoChannel, + externalChannelUrl: payload.externalChannelUrl + }) + } catch (err) { + logger.error(`Failed to import channel ${videoChannel.name}`, { err }) + } +} diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index f4629159c..9901b878c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { Hooks } from '@server/lib/plugins/hooks' import { ServerConfigManager } from '@server/lib/server-config-manager' import { isAbleToUploadVideo } from '@server/lib/user' -import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video' +import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { ThumbnailModel } from '@server/models/video/thumbnail' @@ -18,6 +18,7 @@ import { isAudioFile } from '@shared/extra-utils' import { ThumbnailType, VideoImportPayload, + VideoImportPreventExceptionResult, VideoImportState, VideoImportTorrentPayload, VideoImportTorrentPayloadType, @@ -41,20 +42,29 @@ import { Notifier } from '../../notifier' import { generateVideoMiniature } from '../../thumbnail' import { JobQueue } from '../job-queue' -async function processVideoImport (job: Job) { +async function processVideoImport (job: Job): Promise { const payload = job.data as VideoImportPayload const videoImport = await getVideoImportOrDie(payload) if (videoImport.state === VideoImportState.CANCELLED) { logger.info('Do not process import since it has been cancelled', { payload }) - return + return { resultType: 'success' } } videoImport.state = VideoImportState.PROCESSING await videoImport.save() - if (payload.type === 'youtube-dl') return processYoutubeDLImport(job, videoImport, payload) - if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') return processTorrentImport(job, videoImport, payload) + try { + if (payload.type === 'youtube-dl') await processYoutubeDLImport(job, videoImport, payload) + if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') await processTorrentImport(job, videoImport, payload) + + return { resultType: 'success' } + } catch (err) { + if (!payload.preventException) throw err + + logger.warn('Catch error in video import to send value to parent job.', { payload, err }) + return { resultType: 'error' } + } } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 281e2e51a..3970d48b7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -22,6 +22,7 @@ import { ActivitypubHttpFetcherPayload, ActivitypubHttpUnicastPayload, ActorKeysPayload, + AfterVideoChannelImportPayload, DeleteResumableUploadMetaFilePayload, EmailPayload, FederateVideoPayload, @@ -31,6 +32,7 @@ import { MoveObjectStoragePayload, NotifyPayload, RefreshPayload, + VideoChannelImportPayload, VideoFileImportPayload, VideoImportPayload, VideoLiveEndingPayload, @@ -53,12 +55,14 @@ import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processNotify } from './handlers/notify' +import { processVideoChannelImport } from './handlers/video-channel-import' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' +import { processAfterVideoChannelImport } from './handlers/after-video-channel-import' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -79,6 +83,9 @@ export type CreateJobArgument = { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | + { type: 'video-channel-import', payload: VideoChannelImportPayload } | + { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } | { type: 'notify', payload: NotifyPayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | { type: 'federate-video', payload: FederateVideoPayload } @@ -106,8 +113,10 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-redundancy': processVideoRedundancy, 'move-to-object-storage': processMoveToObjectStorage, 'manage-video-torrent': processManageVideoTorrent, - 'notify': processNotify, 'video-studio-edition': processVideoStudioEdition, + 'video-channel-import': processVideoChannelImport, + 'after-video-channel-import': processAfterVideoChannelImport, + 'notify': processNotify, 'federate-video': processFederateVideo } @@ -134,6 +143,8 @@ const jobTypes: JobType[] = [ 'move-to-object-storage', 'manage-video-torrent', 'video-studio-edition', + 'video-channel-import', + 'after-video-channel-import', 'notify', 'federate-video' ] @@ -306,7 +317,7 @@ class JobQueue { .catch(err => logger.error('Cannot create job.', { err, options })) } - async createJob (options: CreateJobArgument & CreateJobOptions) { + createJob (options: CreateJobArgument & CreateJobOptions) { const queue: Queue = this.queues[options.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', options.type) @@ -318,7 +329,7 @@ class JobQueue { return queue.add('job', options.payload, jobOptions) } - async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { + createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { let lastJob: FlowJob for (const job of jobs) { @@ -336,7 +347,7 @@ class JobQueue { return this.flowProducer.add(lastJob) } - async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { + createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { return this.flowProducer.add({ ...this.buildJobFlowOption(parent), -- cgit v1.2.3