From f012319a644fe8d9d33f2f567fa828442a3b39fd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 16 Mar 2022 15:34:21 +0100 Subject: Process video torrents in order Prevent update before video torrent generation for example --- .../lib/job-queue/handlers/manage-video-torrent.ts | 88 ++++++++++++++++++++++ server/lib/job-queue/job-queue.ts | 7 +- 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 server/lib/job-queue/handlers/manage-video-torrent.ts (limited to 'server/lib') diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts new file mode 100644 index 000000000..5cb4287e1 --- /dev/null +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts @@ -0,0 +1,88 @@ +import { Job } from 'bull' +import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' +import { VideoModel } from '@server/models/video/video' +import { VideoFileModel } from '@server/models/video/video-file' +import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' +import { ManageVideoTorrentPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' + +async function processManageVideoTorrent (job: Job) { + const payload = job.data as ManageVideoTorrentPayload + logger.info('Processing torrent in job %d.', job.id) + + if (payload.action === 'create') return doCreateAction(payload) + if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) +} + +// --------------------------------------------------------------------------- + +export { + processManageVideoTorrent +} + +// --------------------------------------------------------------------------- + +async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'create' }) { + const [ video, file ] = await Promise.all([ + loadVideoOrLog(payload.videoId), + loadFileOrLog(payload.videoFileId) + ]) + + await createTorrentAndSetInfoHash(video, file) + + // Refresh videoFile because the createTorrentAndSetInfoHash could be long + const refreshedFile = await VideoFileModel.loadWithVideo(file.id) + // File does not exist anymore, remove the generated torrent + if (!refreshedFile) return file.removeTorrent() + + refreshedFile.infoHash = file.infoHash + refreshedFile.torrentFilename = file.torrentFilename + + return refreshedFile.save() +} + +async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { + const [ video, streamingPlaylist, file ] = await Promise.all([ + loadVideoOrLog(payload.videoId), + loadStreamingPlaylistOrLog(payload.streamingPlaylistId), + loadFileOrLog(payload.videoFileId) + ]) + + await updateTorrentMetadata(video || streamingPlaylist, file) + + await file.save() +} + +async function loadVideoOrLog (videoId: number) { + if (!videoId) return undefined + + const video = await VideoModel.load(videoId) + if (!video) { + logger.debug('Do not process torrent for video %d: does not exist anymore.', videoId) + } + + return video +} + +async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) { + if (!streamingPlaylistId) return undefined + + const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId) + if (!streamingPlaylist) { + logger.debug('Do not process torrent for streaming playlist %d: does not exist anymore.', streamingPlaylistId) + } + + return streamingPlaylist +} + +async function loadFileOrLog (videoFileId: number) { + if (!videoFileId) return undefined + + const file = await VideoFileModel.loadWithVideo(videoFileId) + + if (!file) { + logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId) + } + + return file +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e10a3bab5..3224abcc3 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -12,6 +12,7 @@ import { EmailPayload, JobState, JobType, + ManageVideoTorrentPayload, MoveObjectStoragePayload, RefreshPayload, VideoEditionPayload, @@ -31,6 +32,7 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' import { processEmail } from './handlers/email' +import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { processMoveToObjectStorage } from './handlers/move-to-object-storage' import { processVideoEdition } from './handlers/video-edition' import { processVideoFileImport } from './handlers/video-file-import' @@ -56,6 +58,7 @@ type CreateJobArgument = { type: 'video-redundancy', payload: VideoRedundancyPayload } | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'video-edition', payload: VideoEditionPayload } | + { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } export type CreateJobOptions = { @@ -79,6 +82,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'actor-keys': processActorKeys, 'video-redundancy': processVideoRedundancy, 'move-to-object-storage': processMoveToObjectStorage, + 'manage-video-torrent': processManageVideoTorrent, 'video-edition': processVideoEdition } @@ -98,6 +102,7 @@ const jobTypes: JobType[] = [ 'actor-keys', 'video-live-ending', 'move-to-object-storage', + 'manage-video-torrent', 'video-edition' ] @@ -185,7 +190,7 @@ class JobQueue { } createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { - const queue = this.queues[obj.type] + const queue: Queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) return -- cgit v1.2.3