}
JobQueue.Instance.init(true)
- await JobQueue.Instance.createJobWithPromise({ type: 'video-file-import', payload: dataInput })
+ await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput })
console.log('Import job for video %s created.', video.uuid)
}
const account = res.locals.account
if (account.isOutdated()) {
- JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } })
}
return res.json(account.toFormattedJSON())
followerActorId: follower.id
}
- JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
}
for (const handle of handles) {
followerActorId: follower.id
}
- JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
}
return res.status(HttpStatusCode.NO_CONTENT_204).end()
videoId: res.locals.onlyVideo.id
}
- await JobQueue.Instance.createJobWithPromise({
+ await JobQueue.Instance.createJob({
type: 'video-redundancy',
payload
})
followerActorId: user.Account.Actor.id
}
- JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
return res.status(HttpStatusCode.NO_CONTENT_204).end()
}
})
const payload = { actorId: videoChannelCreated.actorId }
- await JobQueue.Instance.createJobWithPromise({ type: 'actor-keys', payload })
+ await JobQueue.Instance.createJob({ type: 'actor-keys', payload })
auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON()))
logger.info('Video channel %s created.', videoChannelCreated.Actor.url)
const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id })
if (videoChannel.isOutdated()) {
- JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } })
}
return res.json(videoChannel.toFormattedJSON())
videoImportId: videoImport.id,
magnetUri
}
- await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload })
+ await JobQueue.Instance.createJob({ type: 'video-import', payload })
auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
videoImportId: videoImport.id,
fileExt
}
- await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload })
+ await JobQueue.Instance.createJob({ type: 'video-import', payload })
auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId })
if (video.isOutdated()) {
- JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } })
}
return res.json(video.toFormattedDetailsJSON())
tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files))
}
- JobQueue.Instance.createJob({ type: 'video-studio-edition', payload })
+ JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload })
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
import express from 'express'
import { Transaction } from 'sequelize/types'
import { changeVideoChannelShare } from '@server/lib/activitypub/share'
-import { JobQueue } from '@server/lib/job-queue'
+import { CreateJobArgument, JobQueue } from '@server/lib/job-queue'
import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
import { openapiOperationDoc } from '@server/middlewares/doc'
import { FilteredModelAttributes } from '@server/types'
import { logger, loggerTagsFactory } from '../../../helpers/logger'
import { MIMETYPES } from '../../../initializers/constants'
import { sequelizeTypescript } from '../../../initializers/database'
-import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
-import { Notifier } from '../../../lib/notifier'
import { Hooks } from '../../../lib/plugins/hooks'
import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares'
return { videoInstanceUpdated, isNewVideo }
})
- const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate)
+ Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res })
- await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t))
-
- if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
-
- Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res })
+ await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo })
} catch (err) {
// Force fields we want to update
// If the transaction is retried, sequelize will think the object has not changed
}
}
-async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) {
- if (video.isLive || !videoInfoToUpdate.name) return video
+async function addVideoJobsAfterUpdate (options: {
+ video: MVideoFullLight
+ videoInfoToUpdate: VideoUpdate
+ wasConfidentialVideo: boolean
+ isNewVideo: boolean
+}) {
+ const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options
+ const jobs: CreateJobArgument[] = []
+
+ if (!video.isLive && videoInfoToUpdate.name) {
- for (const file of (video.VideoFiles || [])) {
- const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
+ for (const file of (video.VideoFiles || [])) {
+ const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
- const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
- await JobQueue.Instance.waitJob(job)
- }
+ jobs.push({ type: 'manage-video-torrent', payload })
+ }
- const hls = video.getHLSPlaylist()
+ const hls = video.getHLSPlaylist()
- for (const file of (hls?.VideoFiles || [])) {
- const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
+ for (const file of (hls?.VideoFiles || [])) {
+ const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
- const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
- await JobQueue.Instance.waitJob(job)
+ jobs.push({ type: 'manage-video-torrent', payload })
+ }
+ }
+
+ jobs.push({
+ type: 'federate-video',
+ payload: {
+ videoUUID: video.uuid,
+ isNewVideo
+ }
+ })
+
+ if (wasConfidentialVideo) {
+ jobs.push({
+ type: 'notify',
+ payload: {
+ action: 'new-video',
+ videoUUID: video.uuid
+ }
+ })
}
- // Refresh video since files have changed
- return VideoModel.loadFull(video.id)
+ return JobQueue.Instance.createSequentialJobFlow(...jobs)
}
import { Redis } from '@server/lib/redis'
import { uploadx } from '@server/lib/uploadx'
import {
- addMoveToObjectStorageJob,
- addOptimizeOrMergeAudioJob,
buildLocalVideoFromReq,
+ buildMoveToObjectStorageJob,
+ buildOptimizeOrMergeAudioJob,
buildVideoThumbnailsFromReq,
setVideoTags
} from '@server/lib/video'
import { buildNextVideoState } from '@server/lib/video-state'
import { openapiOperationDoc } from '@server/middlewares/doc'
import { VideoSourceModel } from '@server/models/video/video-source'
-import { MVideoFile, MVideoFullLight } from '@server/types/models'
+import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
import { getLowercaseExtension } from '@shared/core-utils'
import { isAudioFile, uuidToShort } from '@shared/extra-utils'
-import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models'
+import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models'
import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
-import { retryTransactionWrapper } from '../../../helpers/database-utils'
import { createReqFiles } from '../../../helpers/express-utils'
import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg'
import { logger, loggerTagsFactory } from '../../../helpers/logger'
import { MIMETYPES } from '../../../initializers/constants'
import { sequelizeTypescript } from '../../../initializers/database'
-import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
-import { Notifier } from '../../../lib/notifier'
import { Hooks } from '../../../lib/plugins/hooks'
import { generateVideoMiniature } from '../../../lib/thumbnail'
import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
// Channel has a new content, set as updated
await videoCreated.VideoChannel.setAsUpdated()
- createTorrentFederate(videoCreated, videoFile)
- .catch(err => {
- logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })
-
- return videoCreated
- }).then(refreshedVideo => {
- if (!refreshedVideo) return
-
- if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
- return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined })
- }
-
- if (refreshedVideo.state === VideoState.TO_TRANSCODE) {
- return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user })
- }
- }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
+ addVideoJobsAfterUpload(videoCreated, videoFile, user)
+ .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res })
return videoFile
}
-async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) {
- const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
-
- const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
- await JobQueue.Instance.waitJob(job)
-
- const refreshedVideo = await VideoModel.loadFull(video.id)
- if (!refreshedVideo) return
-
- // Only federate and notify after the torrent creation
- Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
+async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) {
+ return JobQueue.Instance.createSequentialJobFlow(
+ {
+ type: 'manage-video-torrent' as 'manage-video-torrent',
+ payload: {
+ videoId: video.id,
+ videoFileId: videoFile.id,
+ action: 'create'
+ }
+ },
+ {
+ type: 'federate-video' as 'federate-video',
+ payload: {
+ videoUUID: video.uuid,
+ isNewVideo: true
+ }
+ },
- await retryTransactionWrapper(() => {
- return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t))
- })
+ video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE
+ ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined })
+ : undefined,
- return refreshedVideo
+ video.state === VideoState.TO_TRANSCODE
+ ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
+ : undefined
+ )
}
async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) {
'video-live-ending': 1,
'video-studio-edition': 1,
'manage-video-torrent': 1,
- 'move-to-object-storage': 3
+ 'move-to-object-storage': 3,
+ 'notify': 1,
+ 'federate-video': 1
}
// Excluded keys are jobs that can be configured by admins
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
'video-live-ending': 10,
'video-studio-edition': 1,
'manage-video-torrent': 1,
- 'move-to-object-storage': 1
+ 'move-to-object-storage': 1,
+ 'notify': 5,
+ 'federate-video': 3
}
const JOB_TTL: { [id in JobType]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
'video-redundancy': 1000 * 3600 * 3, // 3 hours
'video-live-ending': 1000 * 60 * 10, // 10 minutes
'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
+ 'notify': 60000 * 5, // 5 minutes
+ 'federate-video': 60000 * 5, // 5 minutes
'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
}
const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) {
if ((created === true || refreshed === true) && updateCollections === true) {
const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' }
- await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload })
+ await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
}
}
// We created a new account: fetch the playlists
if (created === true && actor.Account && accountPlaylistsUrl) {
const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' }
- await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload })
+ await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
}
}
isAutoFollow: true
}
- JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
}
}
type: 'activity' as 'activity'
}
- return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
+ return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload })
}
export {
function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) {
if (!playlist.isOutdated()) return
- JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } })
}
async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> {
body: activity,
contextType: null
}
- return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
+ return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload }))
}
// ---------------------------------------------------------------------------
contextType
}
- JobQueue.Instance.createJob({
+ JobQueue.Instance.createJobAsync({
type: parallelizable
? 'activitypub-http-broadcast-parallel'
: 'activitypub-http-broadcast',
contextType
}
- JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
}
}
contextType
}
- JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
}
// ---------------------------------------------------------------------------
return refreshVideoIfNeeded(refreshOptions)
}
- await JobQueue.Instance.createJobWithPromise({
+ await JobQueue.Instance.createJob({
type: 'activitypub-refresher',
payload: { type: 'video', url: video.url }
})
}
function createJob (payload: ActivitypubHttpFetcherPayload) {
- return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload })
+ return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
}
function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
}
}
- return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+ return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
}
addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) {
}
}
- return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+ return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
}
addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) {
}
}
- return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+ return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
}
addUserBlockJob (user: MUser, blocked: boolean, reason?: string) {
text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.`
}
- return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+ return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
}
addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) {
}
}
- return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+ return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
}
async sendMail (options: EmailPayload) {
const payload = job.data as ActivitypubFollowPayload
const host = payload.host
- logger.info('Processing ActivityPub follow in job %d.', job.id)
+ logger.info('Processing ActivityPub follow in job %s.', job.id)
let targetActor: MActorFull
if (!host || host === WEBSERVER.HOST) {
import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
async function processActivityPubHttpBroadcast (job: Job) {
- logger.info('Processing ActivityPub broadcast in job %d.', job.id)
+ logger.info('Processing ActivityPub broadcast in job %s.', job.id)
const payload = job.data as ActivitypubHttpBroadcastPayload
import { addVideoComments } from '../../activitypub/video-comments'
async function processActivityPubHttpFetcher (job: Job) {
- logger.info('Processing ActivityPub fetcher in job %d.', job.id)
+ logger.info('Processing ActivityPub fetcher in job %s.', job.id)
const payload = job.data as ActivitypubHttpFetcherPayload
import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
async function processActivityPubHttpUnicast (job: Job) {
- logger.info('Processing ActivityPub unicast in job %d.', job.id)
+ logger.info('Processing ActivityPub unicast in job %s.', job.id)
const payload = job.data as ActivitypubHttpUnicastPayload
const uri = payload.uri
async function refreshAPObject (job: Job) {
const payload = job.data as RefreshPayload
- logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
+ logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url)
if (payload.type === 'video') return refreshVideo(payload.url)
if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
async function processActorKeys (job: Job) {
const payload = job.data as ActorKeysPayload
- logger.info('Processing actor keys in job %d.', job.id)
+ logger.info('Processing actor keys in job %s.', job.id)
const actor = await ActorModel.load(payload.actorId)
async function processEmail (job: Job) {
const payload = job.data as EmailPayload
- logger.info('Processing email in job %d.', job.id)
+ logger.info('Processing email in job %s.', job.id)
return Emailer.Instance.sendMail(payload)
}
--- /dev/null
+import { Job } from 'bullmq'
+import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { sequelizeTypescript } from '@server/initializers/database'
+import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
+import { VideoModel } from '@server/models/video/video'
+import { FederateVideoPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+function processFederateVideo (job: Job) {
+ const payload = job.data as FederateVideoPayload
+
+ logger.info('Processing video federation in job %s.', job.id)
+
+ return retryTransactionWrapper(() => {
+ return sequelizeTypescript.transaction(async t => {
+ const video = await VideoModel.loadFull(payload.videoUUID, t)
+ if (!video) return
+
+ return federateVideoIfNeeded(video, payload.isNewVideo, t)
+ })
+ })
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ processFederateVideo
+}
async function processManageVideoTorrent (job: Job) {
const payload = job.data as ManageVideoTorrentPayload
- logger.info('Processing torrent in job %d.', job.id)
+ logger.info('Processing torrent in job %s.', job.id)
if (payload.action === 'create') return doCreateAction(payload)
if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
export async function processMoveToObjectStorage (job: Job) {
const payload = job.data as MoveObjectStoragePayload
- logger.info('Moving video %s in job %d.', payload.videoUUID, job.id)
+ logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
const video = await VideoModel.loadWithFiles(payload.videoUUID)
// No video, maybe deleted?
const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
if (pendingMove === 0) {
- logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags)
+ logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
}
--- /dev/null
+import { Job } from 'bullmq'
+import { Notifier } from '@server/lib/notifier'
+import { VideoModel } from '@server/models/video/video'
+import { NotifyPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+async function processNotify (job: Job) {
+ const payload = job.data as NotifyPayload
+ logger.info('Processing %s notification in job %s.', payload.action, job.id)
+
+ if (payload.action === 'new-video') return doNotifyNewVideo(payload)
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ processNotify
+}
+
+// ---------------------------------------------------------------------------
+
+async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) {
+ const refreshedVideo = await VideoModel.loadFull(payload.videoUUID)
+ if (!refreshedVideo) return
+
+ Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
+}
import { CONFIG } from '@server/initializers/config'
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
import { generateWebTorrentVideoFilename } from '@server/lib/paths'
-import { addMoveToObjectStorageJob } from '@server/lib/video'
+import { buildMoveToObjectStorageJob } from '@server/lib/video'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoFileImportPayload, VideoStorage } from '@shared/models'
import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
import { logger } from '../../../helpers/logger'
+import { JobQueue } from '../job-queue'
async function processVideoFileImport (job: Job) {
const payload = job.data as VideoFileImportPayload
- logger.info('Processing video file import in job %d.', job.id)
+ logger.info('Processing video file import in job %s.', job.id)
const video = await VideoModel.loadFull(payload.videoUUID)
// No video, maybe deleted?
await updateVideoFile(video, payload.filePath)
if (CONFIG.OBJECT_STORAGE.ENABLED) {
- await addMoveToObjectStorageJob({ video, previousVideoState: video.state })
+ await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state }))
} else {
await federateVideoIfNeeded(video, false)
}
import { Hooks } from '@server/lib/plugins/hooks'
import { ServerConfigManager } from '@server/lib/server-config-manager'
import { isAbleToUploadVideo } from '@server/lib/user'
-import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video'
+import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { buildNextVideoState } from '@server/lib/video-state'
import { ThumbnailModel } from '@server/models/video/thumbnail'
import { federateVideoIfNeeded } from '../../activitypub/videos'
import { Notifier } from '../../notifier'
import { generateVideoMiniature } from '../../thumbnail'
+import { JobQueue } from '../job-queue'
async function processVideoImport (job: Job) {
const payload = job.data as VideoImportPayload
// ---------------------------------------------------------------------------
async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
- logger.info('Processing torrent video import in job %d.', job.id)
+ logger.info('Processing torrent video import in job %s.', job.id)
const options = { type: payload.type, videoImportId: payload.videoImportId }
}
async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
- logger.info('Processing youtubeDL video import in job %d.', job.id)
+ logger.info('Processing youtubeDL video import in job %s.', job.id)
const options = { type: payload.type, videoImportId: videoImport.id }
}
if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
- return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
+ await JobQueue.Instance.createJob(
+ await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
+ )
}
// Create transcoding jobs?
if (video.state === VideoState.TO_TRANSCODE) {
- await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
+ await JobQueue.Instance.createJob(
+ await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
+ )
}
} catch (err) {
async function processVideoRedundancy (job: Job) {
const payload = job.data as VideoRedundancyPayload
- logger.info('Processing video redundancy in job %d.', job.id)
+ logger.info('Processing video redundancy in job %s.', job.id)
return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId)
}
import { generateWebTorrentVideoFilename } from '@server/lib/paths'
import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
import { isAbleToUploadVideo } from '@server/lib/user'
-import { addOptimizeOrMergeAudioJob } from '@server/lib/video'
+import { buildOptimizeOrMergeAudioJob } from '@server/lib/video'
import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
VideoStudioTaskWatermarkPayload
} from '@shared/models'
import { logger, loggerTagsFactory } from '../../../helpers/logger'
+import { JobQueue } from '../job-queue'
const lTagsBase = loggerTagsFactory('video-edition')
const payload = job.data as VideoStudioEditionPayload
const lTags = lTagsBase(payload.videoUUID)
- logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags)
+ logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags)
const video = await VideoModel.loadFull(payload.videoUUID)
await federateVideoIfNeeded(video, false, undefined)
const user = await UserModel.loadByVideoId(video.id)
- await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
+
+ await JobQueue.Instance.createJob(
+ await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
+ )
}
// ---------------------------------------------------------------------------
import {
+ FlowJob,
+ FlowProducer,
Job,
JobsOptions,
Queue,
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
-import { timeoutPromise } from '@shared/core-utils'
+import { pick, timeoutPromise } from '@shared/core-utils'
import {
ActivitypubFollowPayload,
ActivitypubHttpBroadcastPayload,
ActorKeysPayload,
DeleteResumableUploadMetaFilePayload,
EmailPayload,
+ FederateVideoPayload,
JobState,
JobType,
ManageVideoTorrentPayload,
MoveObjectStoragePayload,
+ NotifyPayload,
RefreshPayload,
VideoFileImportPayload,
VideoImportPayload,
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
import { processEmail } from './handlers/email'
+import { processFederateVideo } from './handlers/federate-video'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
+import { processNotify } from './handlers/notify'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
-type CreateJobArgument =
+export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
{ type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
{ type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
- { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
+ { type: 'notify', payload: NotifyPayload } |
+ { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
+ { type: 'federate-video', payload: FederateVideoPayload }
export type CreateJobOptions = {
delay?: number
'video-redundancy': processVideoRedundancy,
'move-to-object-storage': processMoveToObjectStorage,
'manage-video-torrent': processManageVideoTorrent,
- 'video-studio-edition': processVideoStudioEdition
+ 'notify': processNotify,
+ 'video-studio-edition': processVideoStudioEdition,
+ 'federate-video': processFederateVideo
}
const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
'video-live-ending',
'move-to-object-storage',
'manage-video-torrent',
- 'video-studio-edition'
+ 'video-studio-edition',
+ 'notify',
+ 'federate-video'
]
const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
private queueEvents: { [id in JobType]?: QueueEvents } = {}
+ private flowProducer: FlowProducer
+
private initialized = false
private jobRedisPrefix: string
this.buildQueueEvent(handlerName, produceOnly)
}
+ this.flowProducer = new FlowProducer({
+ connection: this.getRedisConnection(),
+ prefix: this.jobRedisPrefix
+ })
+
this.addRepeatableJobs()
}
}
}
+ // ---------------------------------------------------------------------------
+
async terminate () {
const promises = Object.keys(this.workers)
.map(handlerName => {
}
}
- createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
- this.createJobWithPromise(obj, options)
- .catch(err => logger.error('Cannot create job.', { err, obj }))
+ // ---------------------------------------------------------------------------
+
+ createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
+ this.createJob(options)
+ .catch(err => logger.error('Cannot create job.', { err, options }))
}
- async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
- const queue: Queue = this.queues[obj.type]
+ async createJob (options: CreateJobArgument & CreateJobOptions) {
+ const queue: Queue = this.queues[options.type]
if (queue === undefined) {
- logger.error('Unknown queue %s: cannot create job.', obj.type)
+ logger.error('Unknown queue %s: cannot create job.', options.type)
return
}
- const jobArgs: JobsOptions = {
+ const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
+
+ return queue.add('job', options.payload, jobOptions)
+ }
+
+ async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
+ let lastJob: FlowJob
+
+ for (const job of jobs) {
+ if (!job) continue
+
+ lastJob = {
+ name: 'job',
+ data: job.payload,
+ queueName: job.type,
+ opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+ children: lastJob
+ ? [ lastJob ]
+ : []
+ }
+ }
+
+ return this.flowProducer.add(lastJob)
+ }
+
+ private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
+ return {
backoff: { delay: 60 * 1000, type: 'exponential' },
- attempts: JOB_ATTEMPTS[obj.type],
+ attempts: JOB_ATTEMPTS[type],
priority: options.priority,
delay: options.delay
}
-
- return queue.add('job', obj.payload, jobArgs)
}
+ // ---------------------------------------------------------------------------
+
async listForApi (options: {
state?: JobState
start: number
return Promise.all(promises)
}
+ // ---------------------------------------------------------------------------
+
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]
await liveSession.save()
}
- JobQueue.Instance.createJob({
+ JobQueue.Instance.createJobAsync({
type: 'video-live-ending',
payload: {
videoId: fullVideo.id,
streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,
publishedAt: fullVideo.publishedAt.toISOString()
- }
- }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
+ },
+
+ delay: cleanupNow
+ ? 0
+ : VIDEO_LIVE.CLEANUP_DELAY
+ })
fullVideo.state = live.permanentLive
? VideoState.WAITING_FOR_LIVE
for (const to of toEmails) {
const payload = await object.createEmail(to)
- JobQueue.Instance.createJob({ type: 'email', payload })
+ JobQueue.Instance.createJobAsync({ type: 'email', payload })
}
}
isAutoFollow: true
}
- JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+ JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
}
}
import { Transaction } from 'sequelize'
+import { retryTransactionWrapper } from '@server/helpers/database-utils'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { sequelizeTypescript } from '@server/initializers/database'
import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models'
import { VideoState } from '@shared/models'
import { federateVideoIfNeeded } from './activitypub/videos'
+import { JobQueue } from './job-queue'
import { Notifier } from './notifier'
-import { addMoveToObjectStorageJob } from './video'
-import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { buildMoveToObjectStorageJob } from './video'
function buildNextVideoState (currentState?: VideoState) {
if (currentState === VideoState.PUBLISHED) {
logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
try {
- await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo })
+ await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }))
return true
} catch (err) {
import { UploadFiles } from 'express'
+import memoizee from 'memoizee'
import { Transaction } from 'sequelize/types'
+import { CONFIG } from '@server/initializers/config'
import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants'
import { TagModel } from '@server/models/video/tag'
import { VideoModel } from '@server/models/video/video'
import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
import { updateVideoMiniatureFromExisting } from './thumbnail'
-import { CONFIG } from '@server/initializers/config'
-import memoizee from 'memoizee'
function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
return {
// ---------------------------------------------------------------------------
-async function addOptimizeOrMergeAudioJob (options: {
+async function buildOptimizeOrMergeAudioJob (options: {
video: MVideoUUID
videoFile: MVideoFile
user: MUserId
}) {
const { video, videoFile, user, isNewVideo } = options
- let dataInput: VideoTranscodingPayload
+ let payload: VideoTranscodingPayload
if (videoFile.isAudio()) {
- dataInput = {
+ payload = {
type: 'merge-audio-to-webtorrent',
resolution: DEFAULT_AUDIO_RESOLUTION,
videoUUID: video.uuid,
isNewVideo
}
} else {
- dataInput = {
+ payload = {
type: 'optimize-to-webtorrent',
videoUUID: video.uuid,
isNewVideo
}
}
- const jobOptions = {
- priority: await getTranscodingJobPriority(user)
- }
+ await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
- return addTranscodingJob(dataInput, jobOptions)
+ return {
+ type: 'video-transcoding' as 'video-transcoding',
+ priority: await getTranscodingJobPriority(user),
+ payload
+ }
}
async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
- return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options)
+ return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
}
async function getTranscodingJobPriority (user: MUserId) {
// ---------------------------------------------------------------------------
-async function addMoveToObjectStorageJob (options: {
+async function buildMoveToObjectStorageJob (options: {
video: MVideoUUID
previousVideoState: VideoState
isNewVideo?: boolean // Default true
await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove')
- const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState }
- return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput })
+ return {
+ type: 'move-to-object-storage' as 'move-to-object-storage',
+ payload: {
+ videoUUID: video.uuid,
+ isNewVideo,
+ previousVideoState
+ }
+ }
}
// ---------------------------------------------------------------------------
buildLocalVideoFromReq,
buildVideoThumbnailsFromReq,
setVideoTags,
- addOptimizeOrMergeAudioJob,
+ buildOptimizeOrMergeAudioJob,
addTranscodingJob,
- addMoveToObjectStorageJob,
+ buildMoveToObjectStorageJob,
getTranscodingJobPriority,
getCachedVideoDuration
}
| 'manage-video-torrent'
| 'move-to-object-storage'
| 'video-studio-edition'
+ | 'notify'
+ | 'federate-video'
export interface Job {
id: number | string
videoUUID: string
tasks: VideoStudioTaskPayload[]
}
+
+// ---------------------------------------------------------------------------
+
+export type NotifyPayload =
+ {
+ action: 'new-video'
+ videoUUID: string
+ }
+
+// ---------------------------------------------------------------------------
+
+export interface FederateVideoPayload {
+ videoUUID: string
+ isNewVideo: boolean
+}