import express from 'express'
import { Transaction } from 'sequelize/types'
-import { updateTorrentMetadata } from '@server/helpers/webtorrent'
import { changeVideoChannelShare } from '@server/lib/activitypub/share'
+import { JobQueue } from '@server/lib/job-queue'
import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
import { openapiOperationDoc } from '@server/middlewares/doc'
import { FilteredModelAttributes } from '@server/types'
import { MVideoFullLight } from '@server/types/models'
-import { HttpStatusCode, VideoUpdate } from '@shared/models'
+import { HttpStatusCode, ManageVideoTorrentPayload, VideoUpdate } from '@shared/models'
import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
import { resetSequelizeInstance } from '../../../helpers/database-utils'
import { createReqFiles } from '../../../helpers/express-utils'
return { videoInstanceUpdated, isNewVideo }
})
- if (videoInstanceUpdated.isLive !== true && videoInfoToUpdate.name) {
- await updateTorrentsMetadata(videoInstanceUpdated)
- }
+ const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate)
- await sequelizeTypescript.transaction(t => federateVideoIfNeeded(videoInstanceUpdated, isNewVideo, t))
+ await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t))
- if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoInstanceUpdated)
+ if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
- Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res })
+ Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res })
} catch (err) {
// Force fields we want to update
// If the transaction is retried, sequelize will think the object has not changed
}
}
-async function updateTorrentsMetadata (video: MVideoFullLight) {
+async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) {
+ if (video.isLive || !videoInfoToUpdate.name) return video
+
for (const file of (video.VideoFiles || [])) {
- await updateTorrentMetadata(video, file)
+ const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
- await file.save()
+ const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
+ await job.finished()
}
const hls = video.getHLSPlaylist()
- if (!hls) return
- for (const file of (hls.VideoFiles || [])) {
- await updateTorrentMetadata(hls, file)
+ for (const file of (hls?.VideoFiles || [])) {
+ const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
- await file.save()
+ const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
+ await job.finished()
}
+
+ // Refresh video since files have changed
+ return VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
}
import { move } from 'fs-extra'
import { basename } from 'path'
import { getResumableUploadPath } from '@server/helpers/upload'
-import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
+import { JobQueue } from '@server/lib/job-queue'
import { generateWebTorrentVideoFilename } from '@server/lib/paths'
import { Redis } from '@server/lib/redis'
import { uploadx } from '@server/lib/uploadx'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { buildNextVideoState } from '@server/lib/video-state'
import { openapiOperationDoc } from '@server/middlewares/doc'
-import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
+import { MVideoFile, MVideoFullLight } from '@server/types/models'
import { getLowercaseExtension } from '@shared/core-utils'
import { isAudioFile, uuidToShort } from '@shared/extra-utils'
-import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models'
+import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models'
import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
import { retryTransactionWrapper } from '../../../helpers/database-utils'
import { createReqFiles } from '../../../helpers/express-utils'
// Channel has a new content, set as updated
await videoCreated.VideoChannel.setAsUpdated()
- createTorrentFederate(video, videoFile)
- .then(() => {
- if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
- return addMoveToObjectStorageJob(video)
+ createTorrentFederate(videoCreated, videoFile)
+ .catch(err => {
+ logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })
+
+ return videoCreated
+ }).then(refreshedVideo => {
+ if (!refreshedVideo) return
+
+ if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
+ return addMoveToObjectStorageJob(refreshedVideo)
}
- if (video.state === VideoState.TO_TRANSCODE) {
- return addOptimizeOrMergeAudioJob(videoCreated, videoFile, user)
+ if (refreshedVideo.state === VideoState.TO_TRANSCODE) {
+ return addOptimizeOrMergeAudioJob(refreshedVideo, videoFile, user)
}
- })
- .catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
+ }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res })
return videoFile
}
-async function createTorrentAndSetInfoHashAsync (video: MVideo, fileArg: MVideoFile) {
- await createTorrentAndSetInfoHash(video, fileArg)
+async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) {
+ const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
- // Refresh videoFile because the createTorrentAndSetInfoHash could be long
- const refreshedFile = await VideoFileModel.loadWithVideo(fileArg.id)
- // File does not exist anymore, remove the generated torrent
- if (!refreshedFile) return fileArg.removeTorrent()
+ const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
+ await job.finished()
- refreshedFile.infoHash = fileArg.infoHash
- refreshedFile.torrentFilename = fileArg.torrentFilename
+ const refreshedVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
+ if (!refreshedVideo) return
- return refreshedFile.save()
-}
+ // Only federate and notify after the torrent creation
+ Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
-function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) {
- // Create the torrent file in async way because it could be long
- return createTorrentAndSetInfoHashAsync(video, videoFile)
- .catch(err => logger.error('Cannot create torrent file for video %s', video.url, { err, ...lTags(video.uuid) }))
- .then(() => VideoModel.loadAndPopulateAccountAndServerAndTags(video.id))
- .then(refreshedVideo => {
- if (!refreshedVideo) return
-
- // Only federate and notify after the torrent creation
- Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
+ await retryTransactionWrapper(() => {
+ return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t))
+ })
- return retryTransactionWrapper(() => {
- return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t))
- })
- })
- .catch(err => logger.error('Cannot federate or notify video creation %s', video.url, { err, ...lTags(video.uuid) }))
+ return refreshedVideo
}
async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) {
'video-redundancy': 1,
'video-live-ending': 1,
'video-edition': 1,
+ 'manage-video-torrent': 1,
'move-to-object-storage': 3
}
// Excluded keys are jobs that can be configured by admins
'video-redundancy': 1,
'video-live-ending': 10,
'video-edition': 1,
+ 'manage-video-torrent': 1,
'move-to-object-storage': 1
}
const JOB_TTL: { [id in JobType]: number } = {
'activitypub-refresher': 60000 * 10, // 10 minutes
'video-redundancy': 1000 * 3600 * 3, // 3 hours
'video-live-ending': 1000 * 60 * 10, // 10 minutes
+ 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
}
const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
--- /dev/null
+import { Job } from 'bull'
+import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
+import { VideoModel } from '@server/models/video/video'
+import { VideoFileModel } from '@server/models/video/video-file'
+import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
+import { ManageVideoTorrentPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+async function processManageVideoTorrent (job: Job) {
+ const payload = job.data as ManageVideoTorrentPayload
+ logger.info('Processing torrent in job %d.', job.id)
+
+ if (payload.action === 'create') return doCreateAction(payload)
+ if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ processManageVideoTorrent
+}
+
+// ---------------------------------------------------------------------------
+
+async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'create' }) {
+ const [ video, file ] = await Promise.all([
+ loadVideoOrLog(payload.videoId),
+ loadFileOrLog(payload.videoFileId)
+ ])
+
+ await createTorrentAndSetInfoHash(video, file)
+
+ // Refresh videoFile because the createTorrentAndSetInfoHash could be long
+ const refreshedFile = await VideoFileModel.loadWithVideo(file.id)
+ // File does not exist anymore, remove the generated torrent
+ if (!refreshedFile) return file.removeTorrent()
+
+ refreshedFile.infoHash = file.infoHash
+ refreshedFile.torrentFilename = file.torrentFilename
+
+ return refreshedFile.save()
+}
+
+async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) {
+ const [ video, streamingPlaylist, file ] = await Promise.all([
+ loadVideoOrLog(payload.videoId),
+ loadStreamingPlaylistOrLog(payload.streamingPlaylistId),
+ loadFileOrLog(payload.videoFileId)
+ ])
+
+ await updateTorrentMetadata(video || streamingPlaylist, file)
+
+ await file.save()
+}
+
+async function loadVideoOrLog (videoId: number) {
+ if (!videoId) return undefined
+
+ const video = await VideoModel.load(videoId)
+ if (!video) {
+ logger.debug('Do not process torrent for video %d: does not exist anymore.', videoId)
+ }
+
+ return video
+}
+
+async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) {
+ if (!streamingPlaylistId) return undefined
+
+ const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId)
+ if (!streamingPlaylist) {
+ logger.debug('Do not process torrent for streaming playlist %d: does not exist anymore.', streamingPlaylistId)
+ }
+
+ return streamingPlaylist
+}
+
+async function loadFileOrLog (videoFileId: number) {
+ if (!videoFileId) return undefined
+
+ const file = await VideoFileModel.loadWithVideo(videoFileId)
+
+ if (!file) {
+ logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId)
+ }
+
+ return file
+}
EmailPayload,
JobState,
JobType,
+ ManageVideoTorrentPayload,
MoveObjectStoragePayload,
RefreshPayload,
VideoEditionPayload,
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
import { processEmail } from './handlers/email'
+import { processManageVideoTorrent } from './handlers/manage-video-torrent'
import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { processVideoEdition } from './handlers/video-edition'
import { processVideoFileImport } from './handlers/video-file-import'
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
{ type: 'video-edition', payload: VideoEditionPayload } |
+ { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
export type CreateJobOptions = {
'actor-keys': processActorKeys,
'video-redundancy': processVideoRedundancy,
'move-to-object-storage': processMoveToObjectStorage,
+ 'manage-video-torrent': processManageVideoTorrent,
'video-edition': processVideoEdition
}
'actor-keys',
'video-live-ending',
'move-to-object-storage',
+ 'manage-video-torrent',
'video-edition'
]
}
createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
- const queue = this.queues[obj.type]
+ const queue: Queue = this.queues[obj.type]
if (queue === undefined) {
logger.error('Unknown queue %s: cannot create job.', obj.type)
return
return peertubeTruncate(this.description, { length: maxLength })
}
+ getAllFiles () {
+ let files: MVideoFile[] = []
+
+ if (Array.isArray(this.VideoFiles)) {
+ files = files.concat(this.VideoFiles)
+ }
+
+ if (Array.isArray(this.VideoStreamingPlaylists)) {
+ for (const p of this.VideoStreamingPlaylists) {
+ if (Array.isArray(p.VideoFiles)) {
+ files = files.concat(p.VideoFiles)
+ }
+ }
+ }
+
+ return files
+ }
+
probeMaxQualityFile () {
const file = this.getMaxQualityFile()
const videoOrPlaylist = file.getVideoOrStreamingPlaylist()
| 'video-redundancy'
| 'video-live-ending'
| 'actor-keys'
+ | 'manage-video-torrent'
| 'move-to-object-storage'
| 'video-edition'
videoId: number
}
+export type ManageVideoTorrentPayload =
+ {
+ action: 'create'
+ videoId: number
+ videoFileId: number
+ } | {
+ action: 'update-metadata'
+
+ videoId?: number
+ streamingPlaylistId?: number
+
+ videoFileId: number
+ }
+
// Video transcoding payloads
interface BaseTranscodingPayload {