aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-06-12 20:04:58 +0200
committerChocobozzz <me@florianbigard.com>2018-06-12 20:37:51 +0200
commit2186386cca113506791583cb07d6ccacba7af4e0 (patch)
tree3c214c0b5fbd64332624267fa6e51fd4a9cf6474 /server/lib/job-queue
parent6ccdf3a23ecec5ba2eeaf487fd1fafdc7606b4bf (diff)
downloadPeerTube-2186386cca113506791583cb07d6ccacba7af4e0.tar.gz
PeerTube-2186386cca113506791583cb07d6ccacba7af4e0.tar.zst
PeerTube-2186386cca113506791583cb07d6ccacba7af4e0.zip
Add concept of video state, and add ability to wait transcoding before
publishing a video
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-file.ts127
-rw-r--r--server/lib/job-queue/job-queue.ts1
2 files changed, 70 insertions, 58 deletions
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index 85f7dbfc2..f5ad076a6 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -1,17 +1,16 @@
1import * as kue from 'kue' 1import * as kue from 'kue'
2import { VideoResolution } from '../../../../shared' 2import { VideoResolution, VideoState } from '../../../../shared'
3import { VideoPrivacy } from '../../../../shared/models/videos'
4import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
5import { computeResolutionsToTranscode } from '../../../helpers/utils' 4import { computeResolutionsToTranscode } from '../../../helpers/utils'
6import { sequelizeTypescript } from '../../../initializers'
7import { VideoModel } from '../../../models/video/video' 5import { VideoModel } from '../../../models/video/video'
8import { shareVideoByServerAndChannel } from '../../activitypub'
9import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send'
10import { JobQueue } from '../job-queue' 6import { JobQueue } from '../job-queue'
7import { federateVideoIfNeeded } from '../../activitypub'
8import { retryTransactionWrapper } from '../../../helpers/database-utils'
9import { sequelizeTypescript } from '../../../initializers'
11 10
12export type VideoFilePayload = { 11export type VideoFilePayload = {
13 videoUUID: string 12 videoUUID: string
14 isNewVideo: boolean 13 isNewVideo?: boolean
15 resolution?: VideoResolution 14 resolution?: VideoResolution
16 isPortraitMode?: boolean 15 isPortraitMode?: boolean
17} 16}
@@ -52,10 +51,20 @@ async function processVideoFile (job: kue.Job) {
52 // Transcoding in other resolution 51 // Transcoding in other resolution
53 if (payload.resolution) { 52 if (payload.resolution) {
54 await video.transcodeOriginalVideofile(payload.resolution, payload.isPortraitMode) 53 await video.transcodeOriginalVideofile(payload.resolution, payload.isPortraitMode)
55 await onVideoFileTranscoderOrImportSuccess(video) 54
55 const options = {
56 arguments: [ video ],
57 errorMessage: 'Cannot execute onVideoFileTranscoderOrImportSuccess with many retries.'
58 }
59 await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, options)
56 } else { 60 } else {
57 await video.optimizeOriginalVideofile() 61 await video.optimizeOriginalVideofile()
58 await onVideoFileOptimizerSuccess(video, payload.isNewVideo) 62
63 const options = {
64 arguments: [ video, payload.isNewVideo ],
65 errorMessage: 'Cannot execute onVideoFileOptimizerSuccess with many retries.'
66 }
67 await retryTransactionWrapper(onVideoFileOptimizerSuccess, options)
59 } 68 }
60 69
61 return video 70 return video
@@ -64,68 +73,70 @@ async function processVideoFile (job: kue.Job) {
64async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { 73async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) {
65 if (video === undefined) return undefined 74 if (video === undefined) return undefined
66 75
67 // Maybe the video changed in database, refresh it 76 return sequelizeTypescript.transaction(async t => {
68 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) 77 // Maybe the video changed in database, refresh it
69 // Video does not exist anymore 78 let videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid, t)
70 if (!videoDatabase) return undefined 79 // Video does not exist anymore
80 if (!videoDatabase) return undefined
71 81
72 if (video.privacy !== VideoPrivacy.PRIVATE) { 82 // We transcoded the video file in another format, now we can publish it
73 await sendUpdateVideo(video, undefined) 83 const oldState = videoDatabase.state
74 } 84 videoDatabase.state = VideoState.PUBLISHED
85 videoDatabase = await videoDatabase.save({ transaction: t })
86
87 // If the video was not published, we consider it is a new one for other instances
88 const isNewVideo = oldState !== VideoState.PUBLISHED
89 await federateVideoIfNeeded(videoDatabase, isNewVideo, t)
75 90
76 return undefined 91 return undefined
92 })
77} 93}
78 94
79async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { 95async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) {
80 if (video === undefined) return undefined 96 if (video === undefined) return undefined
81 97
82 // Maybe the video changed in database, refresh it 98 // Outside the transaction (IO on disk)
83 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) 99 const { videoFileResolution } = await video.getOriginalFileResolution()
84 // Video does not exist anymore 100
85 if (!videoDatabase) return undefined 101 return sequelizeTypescript.transaction(async t => {
86 102 // Maybe the video changed in database, refresh it
87 if (video.privacy !== VideoPrivacy.PRIVATE) { 103 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid, t)
88 if (isNewVideo !== false) { 104 // Video does not exist anymore
89 // Now we'll add the video's meta data to our followers 105 if (!videoDatabase) return undefined
90 await sequelizeTypescript.transaction(async t => { 106
91 await sendCreateVideo(video, t) 107 // Create transcoding jobs if there are enabled resolutions
92 await shareVideoByServerAndChannel(video, t) 108 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution)
93 }) 109 logger.info(
94 } else { 110 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution,
95 await sendUpdateVideo(video, undefined) 111 { resolutions: resolutionsEnabled }
96 } 112 )
97 } 113
98 114 if (resolutionsEnabled.length !== 0) {
99 const { videoFileResolution } = await videoDatabase.getOriginalFileResolution() 115 const tasks: Promise<any>[] = []
100 116
101 // Create transcoding jobs if there are enabled resolutions 117 for (const resolution of resolutionsEnabled) {
102 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) 118 const dataInput = {
103 logger.info( 119 videoUUID: videoDatabase.uuid,
104 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, 120 resolution
105 { resolutions: resolutionsEnabled } 121 }
106 ) 122
123 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
124 tasks.push(p)
125 }
107 126
108 if (resolutionsEnabled.length !== 0) { 127 await Promise.all(tasks)
109 const tasks: Promise<any>[] = []
110 128
111 for (const resolution of resolutionsEnabled) { 129 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
112 const dataInput = { 130 } else {
113 videoUUID: videoDatabase.uuid, 131 // No transcoding to do, it's now published
114 resolution, 132 video.state = VideoState.PUBLISHED
115 isNewVideo 133 video = await video.save({ transaction: t })
116 }
117 134
118 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) 135 logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid)
119 tasks.push(p)
120 } 136 }
121 137
122 await Promise.all(tasks) 138 return federateVideoIfNeeded(video, isNewVideo, t)
123 139 })
124 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
125 } else {
126 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
127 return undefined
128 }
129} 140}
130 141
131// --------------------------------------------------------------------------- 142// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index bdfa19b61..695fe0eea 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -79,6 +79,7 @@ class JobQueue {
79 const res = await handlers[ handlerName ](job) 79 const res = await handlers[ handlerName ](job)
80 return done(null, res) 80 return done(null, res)
81 } catch (err) { 81 } catch (err) {
82 logger.error('Cannot execute job %d.', job.id, { err })
82 return done(err) 83 return done(err)
83 } 84 }
84 }) 85 })