aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2019-11-15 15:06:03 +0100
committerChocobozzz <me@florianbigard.com>2019-11-25 10:59:43 +0100
commitd7a25329f9e607894d29ab342b9cb66638b56dc0 (patch)
tree6cd6bc4f2689f78944238b313c93427423a932ac /server/lib/job-queue
parent14981d7331da3f63fe6cfaf020ccb7c910006eaf (diff)
downloadPeerTube-d7a25329f9e607894d29ab342b9cb66638b56dc0.tar.gz
PeerTube-d7a25329f9e607894d29ab342b9cb66638b56dc0.tar.zst
PeerTube-d7a25329f9e607894d29ab342b9cb66638b56dc0.zip
Add ability to disable webtorrent
In favour of HLS
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts6
-rw-r--r--server/lib/job-queue/handlers/video-import.ts12
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts119
3 files changed, 74 insertions, 63 deletions
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 5c5b7dccb..99c991e72 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -7,6 +7,8 @@ import { copy, stat } from 'fs-extra'
7import { VideoFileModel } from '../../../models/video/video-file' 7import { VideoFileModel } from '../../../models/video/video-file'
8import { extname } from 'path' 8import { extname } from 'path'
9import { MVideoFile, MVideoWithFile } from '@server/typings/models' 9import { MVideoFile, MVideoWithFile } from '@server/typings/models'
10import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
11import { getVideoFilePath } from '@server/lib/video-paths'
10 12
11export type VideoFileImportPayload = { 13export type VideoFileImportPayload = {
12 videoUUID: string, 14 videoUUID: string,
@@ -68,10 +70,10 @@ async function updateVideoFile (video: MVideoWithFile, inputFilePath: string) {
68 updatedVideoFile = currentVideoFile 70 updatedVideoFile = currentVideoFile
69 } 71 }
70 72
71 const outputPath = video.getVideoFilePath(updatedVideoFile) 73 const outputPath = getVideoFilePath(video, updatedVideoFile)
72 await copy(inputFilePath, outputPath) 74 await copy(inputFilePath, outputPath)
73 75
74 await video.createTorrentAndSetInfoHash(updatedVideoFile) 76 await createTorrentAndSetInfoHash(video, updatedVideoFile)
75 77
76 await updatedVideoFile.save() 78 await updatedVideoFile.save()
77 79
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 93a3e9d90..1fca17584 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -4,14 +4,14 @@ import { downloadYoutubeDLVideo } from '../../../helpers/youtube-dl'
4import { VideoImportModel } from '../../../models/video/video-import' 4import { VideoImportModel } from '../../../models/video/video-import'
5import { VideoImportState } from '../../../../shared/models/videos' 5import { VideoImportState } from '../../../../shared/models/videos'
6import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils' 6import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils'
7import { extname, join } from 'path' 7import { extname } from 'path'
8import { VideoFileModel } from '../../../models/video/video-file' 8import { VideoFileModel } from '../../../models/video/video-file'
9import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants' 9import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants'
10import { VideoState } from '../../../../shared' 10import { VideoState } from '../../../../shared'
11import { JobQueue } from '../index' 11import { JobQueue } from '../index'
12import { federateVideoIfNeeded } from '../../activitypub' 12import { federateVideoIfNeeded } from '../../activitypub'
13import { VideoModel } from '../../../models/video/video' 13import { VideoModel } from '../../../models/video/video'
14import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' 14import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent'
15import { getSecureTorrentName } from '../../../helpers/utils' 15import { getSecureTorrentName } from '../../../helpers/utils'
16import { move, remove, stat } from 'fs-extra' 16import { move, remove, stat } from 'fs-extra'
17import { Notifier } from '../../notifier' 17import { Notifier } from '../../notifier'
@@ -21,7 +21,7 @@ import { createVideoMiniatureFromUrl, generateVideoMiniature } from '../../thumb
21import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type' 21import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type'
22import { MThumbnail } from '../../../typings/models/video/thumbnail' 22import { MThumbnail } from '../../../typings/models/video/thumbnail'
23import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/typings/models/video/video-import' 23import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/typings/models/video/video-import'
24import { MVideoBlacklistVideo, MVideoBlacklist } from '@server/typings/models' 24import { getVideoFilePath } from '@server/lib/video-paths'
25 25
26type VideoImportYoutubeDLPayload = { 26type VideoImportYoutubeDLPayload = {
27 type: 'youtube-dl' 27 type: 'youtube-dl'
@@ -142,12 +142,12 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
142 } 142 }
143 videoFile = new VideoFileModel(videoFileData) 143 videoFile = new VideoFileModel(videoFileData)
144 144
145 const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ] }) 145 const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] })
146 // To clean files if the import fails 146 // To clean files if the import fails
147 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles }) 147 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles })
148 148
149 // Move file 149 // Move file
150 videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImportWithFiles.Video.getVideoFilename(videoFile)) 150 videoDestFile = getVideoFilePath(videoImportWithFiles.Video, videoFile)
151 await move(tempVideoPath, videoDestFile) 151 await move(tempVideoPath, videoDestFile)
152 tempVideoPath = null // This path is not used anymore 152 tempVideoPath = null // This path is not used anymore
153 153
@@ -168,7 +168,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
168 } 168 }
169 169
170 // Create torrent 170 // Create torrent
171 await videoImportWithFiles.Video.createTorrentAndSetInfoHash(videoFile) 171 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile)
172 172
173 const { videoImportUpdated, video } = await sequelizeTypescript.transaction(async t => { 173 const { videoImportUpdated, video } = await sequelizeTypescript.transaction(async t => {
174 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo 174 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 2ebe15bcb..39b9fac98 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,5 +1,5 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { VideoResolution, VideoState } from '../../../../shared' 2import { VideoResolution } from '../../../../shared'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video' 4import { VideoModel } from '../../../models/video/video'
5import { JobQueue } from '../job-queue' 5import { JobQueue } from '../job-queue'
@@ -8,10 +8,10 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils'
8import { sequelizeTypescript } from '../../../initializers' 8import { sequelizeTypescript } from '../../../initializers'
9import * as Bluebird from 'bluebird' 9import * as Bluebird from 'bluebird'
10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' 10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
11import { generateHlsPlaylist, optimizeVideofile, transcodeOriginalVideofile, mergeAudioVideofile } from '../../video-transcoding' 11import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding'
12import { Notifier } from '../../notifier' 12import { Notifier } from '../../notifier'
13import { CONFIG } from '../../../initializers/config' 13import { CONFIG } from '../../../initializers/config'
14import { MVideoUUID, MVideoWithFile } from '@server/typings/models' 14import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/typings/models'
15 15
16interface BaseTranscodingPayload { 16interface BaseTranscodingPayload {
17 videoUUID: string 17 videoUUID: string
@@ -22,6 +22,7 @@ interface HLSTranscodingPayload extends BaseTranscodingPayload {
22 type: 'hls' 22 type: 'hls'
23 isPortraitMode?: boolean 23 isPortraitMode?: boolean
24 resolution: VideoResolution 24 resolution: VideoResolution
25 copyCodecs: boolean
25} 26}
26 27
27interface NewResolutionTranscodingPayload extends BaseTranscodingPayload { 28interface NewResolutionTranscodingPayload extends BaseTranscodingPayload {
@@ -54,11 +55,11 @@ async function processVideoTranscoding (job: Bull.Job) {
54 } 55 }
55 56
56 if (payload.type === 'hls') { 57 if (payload.type === 'hls') {
57 await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false) 58 await generateHlsPlaylist(video, payload.resolution, payload.copyCodecs, payload.isPortraitMode || false)
58 59
59 await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) 60 await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
60 } else if (payload.type === 'new-resolution') { 61 } else if (payload.type === 'new-resolution') {
61 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) 62 await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false)
62 63
63 await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) 64 await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
64 } else if (payload.type === 'merge-audio') { 65 } else if (payload.type === 'merge-audio') {
@@ -66,7 +67,7 @@ async function processVideoTranscoding (job: Bull.Job) {
66 67
67 await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) 68 await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
68 } else { 69 } else {
69 await optimizeVideofile(video) 70 await optimizeOriginalVideofile(video)
70 71
71 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) 72 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
72 } 73 }
@@ -74,48 +75,24 @@ async function processVideoTranscoding (job: Bull.Job) {
74 return video 75 return video
75} 76}
76 77
77async function onHlsPlaylistGenerationSuccess (video: MVideoUUID) { 78async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) {
78 if (video === undefined) return undefined 79 if (video === undefined) return undefined
79 80
80 await sequelizeTypescript.transaction(async t => { 81 // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
81 // Maybe the video changed in database, refresh it 82 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
82 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 83 for (const file of video.VideoFiles) {
83 // Video does not exist anymore 84 await video.removeFile(file)
84 if (!videoDatabase) return undefined 85 await file.destroy()
85
86 // If the video was not published, we consider it is a new one for other instances
87 await federateVideoIfNeeded(videoDatabase, false, t)
88 })
89}
90
91async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) {
92 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
93 // Maybe the video changed in database, refresh it
94 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
95 // Video does not exist anymore
96 if (!videoDatabase) return undefined
97
98 let videoPublished = false
99
100 // We transcoded the video file in another format, now we can publish it
101 if (videoDatabase.state !== VideoState.PUBLISHED) {
102 videoPublished = true
103
104 videoDatabase.state = VideoState.PUBLISHED
105 videoDatabase.publishedAt = new Date()
106 videoDatabase = await videoDatabase.save({ transaction: t })
107 } 86 }
108 87
109 // If the video was not published, we consider it is a new one for other instances 88 video.VideoFiles = []
110 await federateVideoIfNeeded(videoDatabase, videoPublished, t) 89 }
111 90
112 return { videoDatabase, videoPublished } 91 return publishAndFederateIfNeeded(video)
113 }) 92}
114 93
115 if (videoPublished) { 94async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) {
116 Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) 95 await publishAndFederateIfNeeded(video)
117 Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
118 }
119 96
120 await createHlsJobIfEnabled(payload) 97 await createHlsJobIfEnabled(payload)
121} 98}
@@ -124,7 +101,7 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O
124 if (videoArg === undefined) return undefined 101 if (videoArg === undefined) return undefined
125 102
126 // Outside the transaction (IO on disk) 103 // Outside the transaction (IO on disk)
127 const { videoFileResolution } = await videoArg.getOriginalFileResolution() 104 const { videoFileResolution } = await videoArg.getMaxQualityResolution()
128 105
129 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { 106 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
130 // Maybe the video changed in database, refresh it 107 // Maybe the video changed in database, refresh it
@@ -141,14 +118,29 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O
141 118
142 let videoPublished = false 119 let videoPublished = false
143 120
121 const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getMaxQualityFile().resolution })
122 await createHlsJobIfEnabled(hlsPayload)
123
144 if (resolutionsEnabled.length !== 0) { 124 if (resolutionsEnabled.length !== 0) {
145 const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = [] 125 const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = []
146 126
147 for (const resolution of resolutionsEnabled) { 127 for (const resolution of resolutionsEnabled) {
148 const dataInput = { 128 let dataInput: VideoTranscodingPayload
149 type: 'new-resolution' as 'new-resolution', 129
150 videoUUID: videoDatabase.uuid, 130 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
151 resolution 131 dataInput = {
132 type: 'new-resolution' as 'new-resolution',
133 videoUUID: videoDatabase.uuid,
134 resolution
135 }
136 } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
137 dataInput = {
138 type: 'hls',
139 videoUUID: videoDatabase.uuid,
140 resolution,
141 isPortraitMode: false,
142 copyCodecs: false
143 }
152 } 144 }
153 145
154 const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) 146 const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
@@ -159,11 +151,8 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O
159 151
160 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) 152 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
161 } else { 153 } else {
162 videoPublished = true
163
164 // No transcoding to do, it's now published 154 // No transcoding to do, it's now published
165 videoDatabase.state = VideoState.PUBLISHED 155 videoPublished = await videoDatabase.publishIfNeededAndSave(t)
166 videoDatabase = await videoDatabase.save({ transaction: t })
167 156
168 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) 157 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
169 } 158 }
@@ -175,9 +164,6 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O
175 164
176 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) 165 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
177 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) 166 if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
178
179 const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution })
180 await createHlsJobIfEnabled(hlsPayload)
181} 167}
182 168
183// --------------------------------------------------------------------------- 169// ---------------------------------------------------------------------------
@@ -196,9 +182,32 @@ function createHlsJobIfEnabled (payload?: { videoUUID: string, resolution: numbe
196 type: 'hls' as 'hls', 182 type: 'hls' as 'hls',
197 videoUUID: payload.videoUUID, 183 videoUUID: payload.videoUUID,
198 resolution: payload.resolution, 184 resolution: payload.resolution,
199 isPortraitMode: payload.isPortraitMode 185 isPortraitMode: payload.isPortraitMode,
186 copyCodecs: true
200 } 187 }
201 188
202 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) 189 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
203 } 190 }
204} 191}
192
193async function publishAndFederateIfNeeded (video: MVideoUUID) {
194 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
195 // Maybe the video changed in database, refresh it
196 const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
197 // Video does not exist anymore
198 if (!videoDatabase) return undefined
199
200 // We transcoded the video file in another format, now we can publish it
201 const videoPublished = await videoDatabase.publishIfNeededAndSave(t)
202
203 // If the video was not published, we consider it is a new one for other instances
204 await federateVideoIfNeeded(videoDatabase, videoPublished, t)
205
206 return { videoDatabase, videoPublished }
207 })
208
209 if (videoPublished) {
210 Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
211 Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
212 }
213}