aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts2
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts37
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts22
-rw-r--r--server/lib/job-queue/handlers/video-channel-import.ts23
-rw-r--r--server/lib/job-queue/handlers/video-import.ts228
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts49
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts100
-rw-r--r--server/lib/job-queue/job-queue.ts21
8 files changed, 272 insertions, 210 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index 84c0a2de2..a25f00b0a 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -88,7 +88,7 @@ async function updateObjectIfNeeded <T> (options: {
88 const { body } = await doJSONRequest<any>(url, { activityPub: true }) 88 const { body } = await doJSONRequest<any>(url, { activityPub: true })
89 89
90 // If not same id, check same host and update 90 // If not same id, check same host and update
91 if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) 91 if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
92 92
93 if (body.type === 'Tombstone') { 93 if (body.type === 'Tombstone') {
94 return on404OrTombstone() 94 return on404OrTombstone()
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index 03aa414c9..cef93afda 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -1,5 +1,7 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { extractVideo } from '@server/helpers/video'
2import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' 3import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
4import { VideoPathManager } from '@server/lib/video-path-manager'
3import { VideoModel } from '@server/models/video/video' 5import { VideoModel } from '@server/models/video/video'
4import { VideoFileModel } from '@server/models/video/video-file' 6import { VideoFileModel } from '@server/models/video/video-file'
5import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 7import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
@@ -30,17 +32,23 @@ async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'c
30 32
31 if (!video || !file) return 33 if (!video || !file) return
32 34
33 await createTorrentAndSetInfoHash(video, file) 35 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
34 36
35 // Refresh videoFile because the createTorrentAndSetInfoHash could be long 37 try {
36 const refreshedFile = await VideoFileModel.loadWithVideo(file.id) 38 await createTorrentAndSetInfoHash(video, file)
37 // File does not exist anymore, remove the generated torrent
38 if (!refreshedFile) return file.removeTorrent()
39 39
40 refreshedFile.infoHash = file.infoHash 40 // Refresh videoFile because the createTorrentAndSetInfoHash could be long
41 refreshedFile.torrentFilename = file.torrentFilename 41 const refreshedFile = await VideoFileModel.loadWithVideo(file.id)
42 // File does not exist anymore, remove the generated torrent
43 if (!refreshedFile) return file.removeTorrent()
42 44
43 return refreshedFile.save() 45 refreshedFile.infoHash = file.infoHash
46 refreshedFile.torrentFilename = file.torrentFilename
47
48 await refreshedFile.save()
49 } finally {
50 fileMutexReleaser()
51 }
44} 52}
45 53
46async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { 54async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) {
@@ -52,9 +60,16 @@ async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { ac
52 60
53 if ((!video && !streamingPlaylist) || !file) return 61 if ((!video && !streamingPlaylist) || !file) return
54 62
55 await updateTorrentMetadata(video || streamingPlaylist, file) 63 const extractedVideo = extractVideo(video || streamingPlaylist)
64 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(extractedVideo.uuid)
56 65
57 await file.save() 66 try {
67 await updateTorrentMetadata(video || streamingPlaylist, file)
68
69 await file.save()
70 } finally {
71 fileMutexReleaser()
72 }
58} 73}
59 74
60async function loadVideoOrLog (videoId: number) { 75async function loadVideoOrLog (videoId: number) {
@@ -82,7 +97,7 @@ async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) {
82async function loadFileOrLog (videoFileId: number) { 97async function loadFileOrLog (videoFileId: number) {
83 if (!videoFileId) return undefined 98 if (!videoFileId) return undefined
84 99
85 const file = await VideoFileModel.loadWithVideo(videoFileId) 100 const file = await VideoFileModel.load(videoFileId)
86 101
87 if (!file) { 102 if (!file) {
88 logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId) 103 logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId)
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 25bdebeea..a1530cc57 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -3,10 +3,10 @@ import { remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { updateTorrentMetadata } from '@server/helpers/webtorrent' 5import { updateTorrentMetadata } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config'
7import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' 6import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
8import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' 7import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
9import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' 8import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
9import { VideoPathManager } from '@server/lib/video-path-manager'
10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' 10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
11import { VideoModel } from '@server/models/video/video' 11import { VideoModel } from '@server/models/video/video'
12import { VideoJobInfoModel } from '@server/models/video/video-job-info' 12import { VideoJobInfoModel } from '@server/models/video/video-job-info'
@@ -28,6 +28,8 @@ export async function processMoveToObjectStorage (job: Job) {
28 28
29 const lTags = lTagsBase(video.uuid, video.url) 29 const lTags = lTagsBase(video.uuid, video.url)
30 30
31 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
32
31 try { 33 try {
32 if (video.VideoFiles) { 34 if (video.VideoFiles) {
33 logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags) 35 logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags)
@@ -49,6 +51,10 @@ export async function processMoveToObjectStorage (job: Job) {
49 } 51 }
50 } catch (err) { 52 } catch (err) {
51 await onMoveToObjectStorageFailure(job, err) 53 await onMoveToObjectStorageFailure(job, err)
54
55 throw err
56 } finally {
57 fileMutexReleaser()
52 } 58 }
53 59
54 return payload.videoUUID 60 return payload.videoUUID
@@ -72,9 +78,9 @@ async function moveWebTorrentFiles (video: MVideoWithAllFiles) {
72 for (const file of video.VideoFiles) { 78 for (const file of video.VideoFiles) {
73 if (file.storage !== VideoStorage.FILE_SYSTEM) continue 79 if (file.storage !== VideoStorage.FILE_SYSTEM) continue
74 80
75 const fileUrl = await storeWebTorrentFile(file.filename) 81 const fileUrl = await storeWebTorrentFile(video, file)
76 82
77 const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename) 83 const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file)
78 await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath }) 84 await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath })
79 } 85 }
80} 86}
@@ -88,10 +94,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
88 94
89 // Resolution playlist 95 // Resolution playlist
90 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) 96 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
91 await storeHLSFile(playlistWithVideo, playlistFilename) 97 await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
92 98
93 // Resolution fragmented file 99 // Resolution fragmented file
94 const fileUrl = await storeHLSFile(playlistWithVideo, file.filename) 100 const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
95 101
96 const oldPath = join(getHLSDirectory(video), file.filename) 102 const oldPath = join(getHLSDirectory(video), file.filename)
97 103
@@ -113,9 +119,9 @@ async function doAfterLastJob (options: {
113 const playlistWithVideo = playlist.withVideo(video) 119 const playlistWithVideo = playlist.withVideo(video)
114 120
115 // Master playlist 121 // Master playlist
116 playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename) 122 playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
117 // Sha256 segments file 123 // Sha256 segments file
118 playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename) 124 playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
119 125
120 playlist.storage = VideoStorage.OBJECT_STORAGE 126 playlist.storage = VideoStorage.OBJECT_STORAGE
121 127
diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts
index 600292844..035f88e96 100644
--- a/server/lib/job-queue/handlers/video-channel-import.ts
+++ b/server/lib/job-queue/handlers/video-channel-import.ts
@@ -5,7 +5,7 @@ import { synchronizeChannel } from '@server/lib/sync-channel'
5import { VideoChannelModel } from '@server/models/video/video-channel' 5import { VideoChannelModel } from '@server/models/video/video-channel'
6import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' 6import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync'
7import { MChannelSync } from '@server/types/models' 7import { MChannelSync } from '@server/types/models'
8import { VideoChannelImportPayload, VideoChannelSyncState } from '@shared/models' 8import { VideoChannelImportPayload } from '@shared/models'
9 9
10export async function processVideoChannelImport (job: Job) { 10export async function processVideoChannelImport (job: Job) {
11 const payload = job.data as VideoChannelImportPayload 11 const payload = job.data as VideoChannelImportPayload
@@ -32,17 +32,12 @@ export async function processVideoChannelImport (job: Job) {
32 32
33 const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) 33 const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId)
34 34
35 try { 35 logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `)
36 logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) 36
37 37 await synchronizeChannel({
38 await synchronizeChannel({ 38 channel: videoChannel,
39 channel: videoChannel, 39 externalChannelUrl: payload.externalChannelUrl,
40 externalChannelUrl: payload.externalChannelUrl, 40 channelSync,
41 channelSync 41 videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.FULL_SYNC_VIDEOS_LIMIT
42 }) 42 })
43 } catch (err) {
44 logger.error(`Failed to import channel ${videoChannel.name}`, { err })
45 channelSync.state = VideoChannelSyncState.FAILED
46 await channelSync.save()
47 }
48} 43}
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 9901b878c..4d361c7b9 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -12,7 +12,8 @@ import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@serv
12import { VideoPathManager } from '@server/lib/video-path-manager' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 13import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
15import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 15import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
16import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
16import { getLowercaseExtension } from '@shared/core-utils' 17import { getLowercaseExtension } from '@shared/core-utils'
17import { isAudioFile } from '@shared/extra-utils' 18import { isAudioFile } from '@shared/extra-utils'
18import { 19import {
@@ -36,7 +37,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
36import { VideoModel } from '../../../models/video/video' 37import { VideoModel } from '../../../models/video/video'
37import { VideoFileModel } from '../../../models/video/video-file' 38import { VideoFileModel } from '../../../models/video/video-file'
38import { VideoImportModel } from '../../../models/video/video-import' 39import { VideoImportModel } from '../../../models/video/video-import'
39import { MThumbnail } from '../../../types/models/video/thumbnail'
40import { federateVideoIfNeeded } from '../../activitypub/videos' 40import { federateVideoIfNeeded } from '../../activitypub/videos'
41import { Notifier } from '../../notifier' 41import { Notifier } from '../../notifier'
42import { generateVideoMiniature } from '../../thumbnail' 42import { generateVideoMiniature } from '../../thumbnail'
@@ -107,7 +107,7 @@ async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefaul
107 107
108async function getVideoImportOrDie (payload: VideoImportPayload) { 108async function getVideoImportOrDie (payload: VideoImportPayload) {
109 const videoImport = await VideoImportModel.loadAndPopulateVideo(payload.videoImportId) 109 const videoImport = await VideoImportModel.loadAndPopulateVideo(payload.videoImportId)
110 if (!videoImport || !videoImport.Video) { 110 if (!videoImport?.Video) {
111 throw new Error(`Cannot import video ${payload.videoImportId}: the video import or video linked to this import does not exist anymore.`) 111 throw new Error(`Cannot import video ${payload.videoImportId}: the video import or video linked to this import does not exist anymore.`)
112 } 112 }
113 113
@@ -178,125 +178,159 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
178 } 178 }
179 179
180 // Video is accepted, resuming preparation 180 // Video is accepted, resuming preparation
181 const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) 181 const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid)
182 // To clean files if the import fails
183 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles })
184
185 // Move file
186 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
187 await move(tempVideoPath, videoDestFile)
188 tempVideoPath = null // This path is not used anymore
189
190 // Generate miniature if the import did not created it
191 let thumbnailModel: MThumbnail
192 let thumbnailSave: object
193 if (!videoImportWithFiles.Video.getMiniature()) {
194 thumbnailModel = await generateVideoMiniature({
195 video: videoImportWithFiles.Video,
196 videoFile,
197 type: ThumbnailType.MINIATURE
198 })
199 thumbnailSave = thumbnailModel.toJSON()
200 }
201 182
202 // Generate preview if the import did not created it 183 try {
203 let previewModel: MThumbnail 184 const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile)
204 let previewSave: object
205 if (!videoImportWithFiles.Video.getPreview()) {
206 previewModel = await generateVideoMiniature({
207 video: videoImportWithFiles.Video,
208 videoFile,
209 type: ThumbnailType.PREVIEW
210 })
211 previewSave = previewModel.toJSON()
212 }
213 185
214 // Create torrent 186 // Move file
215 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) 187 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
188 await move(tempVideoPath, videoDestFile)
216 189
217 const videoFileSave = videoFile.toJSON() 190 tempVideoPath = null // This path is not used anymore
218 191
219 const { videoImportUpdated, video } = await retryTransactionWrapper(() => { 192 let {
220 return sequelizeTypescript.transaction(async t => { 193 miniatureModel: thumbnailModel,
221 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo 194 miniatureJSONSave: thumbnailSave
195 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE)
222 196
223 // Refresh video 197 let {
224 const video = await VideoModel.load(videoImportToUpdate.videoId, t) 198 miniatureModel: previewModel,
225 if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') 199 miniatureJSONSave: previewSave
200 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW)
226 201
227 const videoFileCreated = await videoFile.save({ transaction: t }) 202 // Create torrent
203 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile)
228 204
229 // Update video DB object 205 const videoFileSave = videoFile.toJSON()
230 video.duration = duration
231 video.state = buildNextVideoState(video.state)
232 await video.save({ transaction: t })
233 206
234 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) 207 const { videoImportUpdated, video } = await retryTransactionWrapper(() => {
235 if (previewModel) await video.addAndSaveThumbnail(previewModel, t) 208 return sequelizeTypescript.transaction(async t => {
209 // Refresh video
210 const video = await VideoModel.load(videoImportWithFiles.videoId, t)
211 if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.')
236 212
237 // Now we can federate the video (reload from database, we need more attributes) 213 await videoFile.save({ transaction: t })
238 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
239 await federateVideoIfNeeded(videoForFederation, true, t)
240 214
241 // Update video import object 215 // Update video DB object
242 videoImportToUpdate.state = VideoImportState.SUCCESS 216 video.duration = duration
243 const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo 217 video.state = buildNextVideoState(video.state)
244 videoImportUpdated.Video = video 218 await video.save({ transaction: t })
245 219
246 videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) 220 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
221 if (previewModel) await video.addAndSaveThumbnail(previewModel, t)
247 222
248 logger.info('Video %s imported.', video.uuid) 223 // Now we can federate the video (reload from database, we need more attributes)
224 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
225 await federateVideoIfNeeded(videoForFederation, true, t)
249 226
250 return { videoImportUpdated, video: videoForFederation } 227 // Update video import object
251 }).catch(err => { 228 videoImportWithFiles.state = VideoImportState.SUCCESS
252 // Reset fields 229 const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport
253 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
254 if (previewModel) previewModel = new ThumbnailModel(previewSave)
255 230
256 videoFile = new VideoFileModel(videoFileSave) 231 logger.info('Video %s imported.', video.uuid)
257 232
258 throw err 233 return { videoImportUpdated, video: videoForFederation }
259 }) 234 }).catch(err => {
260 }) 235 // Reset fields
236 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
237 if (previewModel) previewModel = new ThumbnailModel(previewSave)
261 238
262 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: videoImportUpdated, success: true }) 239 videoFile = new VideoFileModel(videoFileSave)
263 240
264 if (video.isBlacklisted()) { 241 throw err
265 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) 242 })
243 })
266 244
267 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) 245 await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User })
268 } else { 246 } finally {
269 Notifier.Instance.notifyOnNewVideoIfNeeded(video) 247 videoFileLockReleaser()
270 } 248 }
249 } catch (err) {
250 await onImportError(err, tempVideoPath, videoImport)
271 251
272 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 252 throw err
273 await JobQueue.Instance.createJob( 253 }
274 await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) 254}
275 )
276 }
277 255
278 // Create transcoding jobs? 256async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> {
279 if (video.state === VideoState.TO_TRANSCODE) { 257 // Refresh video, privacy may have changed
280 await JobQueue.Instance.createJob( 258 const video = await videoImport.Video.reload()
281 await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) 259 const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] })
282 )
283 }
284 260
285 } catch (err) { 261 return Object.assign(videoImport, { Video: videoWithFiles })
286 try { 262}
287 if (tempVideoPath) await remove(tempVideoPath)
288 } catch (errUnlink) {
289 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
290 }
291 263
292 videoImport.error = err.message 264async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) {
293 if (videoImport.state !== VideoImportState.REJECTED) { 265 // Generate miniature if the import did not created it
294 videoImport.state = VideoImportState.FAILED 266 const needsMiniature = thumbnailType === ThumbnailType.MINIATURE
267 ? !videoImportWithFiles.Video.getMiniature()
268 : !videoImportWithFiles.Video.getPreview()
269
270 if (!needsMiniature) {
271 return {
272 miniatureModel: null,
273 miniatureJSONSave: null
295 } 274 }
296 await videoImport.save() 275 }
297 276
298 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) 277 const miniatureModel = await generateVideoMiniature({
278 video: videoImportWithFiles.Video,
279 videoFile,
280 type: thumbnailType
281 })
282 const miniatureJSONSave = miniatureModel.toJSON()
299 283
300 throw err 284 return {
285 miniatureModel,
286 miniatureJSONSave
287 }
288}
289
290async function afterImportSuccess (options: {
291 videoImport: MVideoImport
292 video: MVideoFullLight
293 videoFile: MVideoFile
294 user: MUserId
295}) {
296 const { video, videoFile, videoImport, user } = options
297
298 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true })
299
300 if (video.isBlacklisted()) {
301 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video })
302
303 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist)
304 } else {
305 Notifier.Instance.notifyOnNewVideoIfNeeded(video)
301 } 306 }
307
308 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
309 await JobQueue.Instance.createJob(
310 await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT })
311 )
312 return
313 }
314
315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs?
316 await JobQueue.Instance.createJob(
317 await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
318 )
319 }
320}
321
322async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) {
323 try {
324 if (tempVideoPath) await remove(tempVideoPath)
325 } catch (errUnlink) {
326 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
327 }
328
329 videoImport.error = err.message
330 if (videoImport.state !== VideoImportState.REJECTED) {
331 videoImport.state = VideoImportState.FAILED
332 }
333 await videoImport.save()
334
335 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false })
302} 336}
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 8a3ee09a2..c6263f55a 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -4,7 +4,7 @@ import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' 4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' 7import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' 8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
9import { generateVideoMiniature } from '@server/lib/thumbnail' 9import { generateVideoMiniature } from '@server/lib/thumbnail'
10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' 10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
@@ -18,6 +18,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin
18import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' 18import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
19import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' 19import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
20import { logger, loggerTagsFactory } from '../../../helpers/logger' 20import { logger, loggerTagsFactory } from '../../../helpers/logger'
21import { VideoPathManager } from '@server/lib/video-path-manager'
21 22
22const lTags = loggerTagsFactory('live', 'job') 23const lTags = loggerTagsFactory('live', 'job')
23 24
@@ -34,13 +35,13 @@ async function processVideoLiveEnding (job: Job) {
34 const live = await VideoLiveModel.loadByVideoId(payload.videoId) 35 const live = await VideoLiveModel.loadByVideoId(payload.videoId)
35 const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId) 36 const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId)
36 37
37 const permanentLive = live.permanentLive
38
39 if (!video || !live || !liveSession) { 38 if (!video || !live || !liveSession) {
40 logError() 39 logError()
41 return 40 return
42 } 41 }
43 42
43 const permanentLive = live.permanentLive
44
44 liveSession.endingProcessed = true 45 liveSession.endingProcessed = true
45 await liveSession.save() 46 await liveSession.save()
46 47
@@ -141,23 +142,22 @@ async function replaceLiveByReplay (options: {
141}) { 142}) {
142 const { video, liveSession, live, permanentLive, replayDirectory } = options 143 const { video, liveSession, live, permanentLive, replayDirectory } = options
143 144
144 await cleanupTMPLiveFiles(video) 145 const videoWithFiles = await VideoModel.loadFull(video.id)
146 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
147
148 await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist)
145 149
146 await live.destroy() 150 await live.destroy()
147 151
148 video.isLive = false 152 videoWithFiles.isLive = false
149 video.waitTranscoding = true 153 videoWithFiles.waitTranscoding = true
150 video.state = VideoState.TO_TRANSCODE 154 videoWithFiles.state = VideoState.TO_TRANSCODE
151 155
152 await video.save() 156 await videoWithFiles.save()
153 157
154 liveSession.replayVideoId = video.id 158 liveSession.replayVideoId = videoWithFiles.id
155 await liveSession.save() 159 await liveSession.save()
156 160
157 // Remove old HLS playlist video files
158 const videoWithFiles = await VideoModel.loadFull(video.id)
159
160 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
161 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) 161 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
162 162
163 // Reset playlist 163 // Reset playlist
@@ -206,18 +206,27 @@ async function assignReplayFilesToVideo (options: {
206 const concatenatedTsFiles = await readdir(replayDirectory) 206 const concatenatedTsFiles = await readdir(replayDirectory)
207 207
208 for (const concatenatedTsFile of concatenatedTsFiles) { 208 for (const concatenatedTsFile of concatenatedTsFiles) {
209 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
210
209 const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) 211 const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile)
210 212
211 const probe = await ffprobePromise(concatenatedTsFilePath) 213 const probe = await ffprobePromise(concatenatedTsFilePath)
212 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) 214 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe)
213 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) 215 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
214 216
215 await generateHlsPlaylistResolutionFromTS({ 217 try {
216 video, 218 await generateHlsPlaylistResolutionFromTS({
217 concatenatedTsFilePath, 219 video,
218 resolution, 220 inputFileMutexReleaser,
219 isAAC: audioStream?.codec_name === 'aac' 221 concatenatedTsFilePath,
220 }) 222 resolution,
223 isAAC: audioStream?.codec_name === 'aac'
224 })
225 } catch (err) {
226 logger.error('Cannot generate HLS playlist resolution from TS files.', { err })
227 }
228
229 inputFileMutexReleaser()
221 } 230 }
222 231
223 return video 232 return video
@@ -234,7 +243,7 @@ async function cleanupLiveAndFederate (options: {
234 243
235 if (streamingPlaylist) { 244 if (streamingPlaylist) {
236 if (permanentLive) { 245 if (permanentLive) {
237 await cleanupPermanentLive(video, streamingPlaylist) 246 await cleanupAndDestroyPermanentLive(video, streamingPlaylist)
238 } else { 247 } else {
239 await cleanupUnsavedNormalLive(video, streamingPlaylist) 248 await cleanupUnsavedNormalLive(video, streamingPlaylist)
240 } 249 }
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index b0e92acf7..3e6d23363 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -13,7 +13,6 @@ import {
13 MergeAudioTranscodingPayload, 13 MergeAudioTranscodingPayload,
14 NewWebTorrentResolutionTranscodingPayload, 14 NewWebTorrentResolutionTranscodingPayload,
15 OptimizeTranscodingPayload, 15 OptimizeTranscodingPayload,
16 VideoResolution,
17 VideoTranscodingPayload 16 VideoTranscodingPayload
18} from '@shared/models' 17} from '@shared/models'
19import { retryTransactionWrapper } from '../../../helpers/database-utils' 18import { retryTransactionWrapper } from '../../../helpers/database-utils'
@@ -94,15 +93,24 @@ async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MV
94 93
95 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() 94 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
96 95
97 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { 96 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
98 return generateHlsPlaylistResolution({ 97
99 video, 98 try {
100 videoInputPath, 99 await videoFileInput.getVideo().reload()
101 resolution: payload.resolution, 100
102 copyCodecs: payload.copyCodecs, 101 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
103 job 102 return generateHlsPlaylistResolution({
103 video,
104 videoInputPath,
105 inputFileMutexReleaser,
106 resolution: payload.resolution,
107 copyCodecs: payload.copyCodecs,
108 job
109 })
104 }) 110 })
105 }) 111 } finally {
112 inputFileMutexReleaser()
113 }
106 114
107 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) 115 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
108 116
@@ -177,38 +185,44 @@ async function onVideoFirstWebTorrentTranscoding (
177 transcodeType: TranscodeVODOptionsType, 185 transcodeType: TranscodeVODOptionsType,
178 user: MUserId 186 user: MUserId
179) { 187) {
180 const { resolution, audioStream } = await videoArg.probeMaxQualityFile() 188 const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid)
181 189
182 // Maybe the video changed in database, refresh it 190 try {
183 const videoDatabase = await VideoModel.loadFull(videoArg.uuid) 191 // Maybe the video changed in database, refresh it
184 // Video does not exist anymore 192 const videoDatabase = await VideoModel.loadFull(videoArg.uuid)
185 if (!videoDatabase) return undefined 193 // Video does not exist anymore
186 194 if (!videoDatabase) return undefined
187 // Generate HLS version of the original file 195
188 const originalFileHLSPayload = { 196 const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile()
189 ...payload, 197
190 198 // Generate HLS version of the original file
191 hasAudio: !!audioStream, 199 const originalFileHLSPayload = {
192 resolution: videoDatabase.getMaxQualityFile().resolution, 200 ...payload,
193 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 201
194 copyCodecs: transcodeType !== 'quick-transcode', 202 hasAudio: !!audioStream,
195 isMaxQuality: true 203 resolution: videoDatabase.getMaxQualityFile().resolution,
196 } 204 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
197 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) 205 copyCodecs: transcodeType !== 'quick-transcode',
198 const hasNewResolutions = await createLowerResolutionsJobs({ 206 isMaxQuality: true
199 video: videoDatabase, 207 }
200 user, 208 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
201 videoFileResolution: resolution, 209 const hasNewResolutions = await createLowerResolutionsJobs({
202 hasAudio: !!audioStream, 210 video: videoDatabase,
203 type: 'webtorrent', 211 user,
204 isNewVideo: payload.isNewVideo ?? true 212 videoFileResolution: resolution,
205 }) 213 hasAudio: !!audioStream,
206 214 type: 'webtorrent',
207 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') 215 isNewVideo: payload.isNewVideo ?? true
208 216 })
209 // Move to next state if there are no other resolutions to generate 217
210 if (!hasHls && !hasNewResolutions) { 218 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
211 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) 219
220 // Move to next state if there are no other resolutions to generate
221 if (!hasHls && !hasNewResolutions) {
222 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo })
223 }
224 } finally {
225 mutexReleaser()
212 } 226 }
213} 227}
214 228
@@ -266,7 +280,7 @@ async function createLowerResolutionsJobs (options: {
266 280
267 // Create transcoding jobs if there are enabled resolutions 281 // Create transcoding jobs if there are enabled resolutions
268 const resolutionsEnabled = await Hooks.wrapObject( 282 const resolutionsEnabled = await Hooks.wrapObject(
269 computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true }), 283 computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }),
270 'filter:transcoding.auto.resolutions-to-transcode.result', 284 'filter:transcoding.auto.resolutions-to-transcode.result',
271 options 285 options
272 ) 286 )
@@ -274,8 +288,6 @@ async function createLowerResolutionsJobs (options: {
274 const resolutionCreated: string[] = [] 288 const resolutionCreated: string[] = []
275 289
276 for (const resolution of resolutionsEnabled) { 290 for (const resolution of resolutionsEnabled) {
277 if (resolution === VideoResolution.H_NOVIDEO && hasAudio === false) continue
278
279 let dataInput: VideoTranscodingPayload 291 let dataInput: VideoTranscodingPayload
280 292
281 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { 293 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 655be6568..6bc59732f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
63import { processVideoStudioEdition } from './handlers/video-studio-edition' 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
64import { processVideoTranscoding } from './handlers/video-transcoding' 64import { processVideoTranscoding } from './handlers/video-transcoding'
65import { processVideosViewsStats } from './handlers/video-views-stats' 65import { processVideosViewsStats } from './handlers/video-views-stats'
66import { Redis } from '../redis'
66 67
67export type CreateJobArgument = 68export type CreateJobArgument =
68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 69 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -183,7 +184,7 @@ class JobQueue {
183 } 184 }
184 185
185 this.flowProducer = new FlowProducer({ 186 this.flowProducer = new FlowProducer({
186 connection: this.getRedisConnection(), 187 connection: Redis.getRedisClientOptions('FlowProducer'),
187 prefix: this.jobRedisPrefix 188 prefix: this.jobRedisPrefix
188 }) 189 })
189 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) 190 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
@@ -196,7 +197,7 @@ class JobQueue {
196 autorun: false, 197 autorun: false,
197 concurrency: this.getJobConcurrency(handlerName), 198 concurrency: this.getJobConcurrency(handlerName),
198 prefix: this.jobRedisPrefix, 199 prefix: this.jobRedisPrefix,
199 connection: this.getRedisConnection() 200 connection: Redis.getRedisClientOptions('Worker')
200 } 201 }
201 202
202 const handler = function (job: Job) { 203 const handler = function (job: Job) {
@@ -236,7 +237,7 @@ class JobQueue {
236 237
237 private buildQueue (handlerName: JobType) { 238 private buildQueue (handlerName: JobType) {
238 const queueOptions: QueueOptions = { 239 const queueOptions: QueueOptions = {
239 connection: this.getRedisConnection(), 240 connection: Redis.getRedisClientOptions('Queue'),
240 prefix: this.jobRedisPrefix 241 prefix: this.jobRedisPrefix
241 } 242 }
242 243
@@ -249,7 +250,7 @@ class JobQueue {
249 private buildQueueScheduler (handlerName: JobType) { 250 private buildQueueScheduler (handlerName: JobType) {
250 const queueSchedulerOptions: QueueSchedulerOptions = { 251 const queueSchedulerOptions: QueueSchedulerOptions = {
251 autorun: false, 252 autorun: false,
252 connection: this.getRedisConnection(), 253 connection: Redis.getRedisClientOptions('QueueScheduler'),
253 prefix: this.jobRedisPrefix, 254 prefix: this.jobRedisPrefix,
254 maxStalledCount: 10 255 maxStalledCount: 10
255 } 256 }
@@ -263,7 +264,7 @@ class JobQueue {
263 private buildQueueEvent (handlerName: JobType) { 264 private buildQueueEvent (handlerName: JobType) {
264 const queueEventsOptions: QueueEventsOptions = { 265 const queueEventsOptions: QueueEventsOptions = {
265 autorun: false, 266 autorun: false,
266 connection: this.getRedisConnection(), 267 connection: Redis.getRedisClientOptions('QueueEvent'),
267 prefix: this.jobRedisPrefix 268 prefix: this.jobRedisPrefix
268 } 269 }
269 270
@@ -273,16 +274,6 @@ class JobQueue {
273 this.queueEvents[handlerName] = queueEvents 274 this.queueEvents[handlerName] = queueEvents
274 } 275 }
275 276
276 private getRedisConnection () {
277 return {
278 password: CONFIG.REDIS.AUTH,
279 db: CONFIG.REDIS.DB,
280 host: CONFIG.REDIS.HOSTNAME,
281 port: CONFIG.REDIS.PORT,
282 path: CONFIG.REDIS.SOCKET
283 }
284 }
285
286 // --------------------------------------------------------------------------- 277 // ---------------------------------------------------------------------------
287 278
288 async terminate () { 279 async terminate () {