diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/manage-video-torrent.ts | 37 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/move-to-object-storage.ts | 22 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-channel-import.ts | 23 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 226 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-live-ending.ts | 49 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 100 |
6 files changed, 264 insertions, 193 deletions
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index 03aa414c9..cef93afda 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts | |||
@@ -1,5 +1,7 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { extractVideo } from '@server/helpers/video' | ||
2 | import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' | 3 | import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' |
4 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
3 | import { VideoModel } from '@server/models/video/video' | 5 | import { VideoModel } from '@server/models/video/video' |
4 | import { VideoFileModel } from '@server/models/video/video-file' | 6 | import { VideoFileModel } from '@server/models/video/video-file' |
5 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 7 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
@@ -30,17 +32,23 @@ async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'c | |||
30 | 32 | ||
31 | if (!video || !file) return | 33 | if (!video || !file) return |
32 | 34 | ||
33 | await createTorrentAndSetInfoHash(video, file) | 35 | const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) |
34 | 36 | ||
35 | // Refresh videoFile because the createTorrentAndSetInfoHash could be long | 37 | try { |
36 | const refreshedFile = await VideoFileModel.loadWithVideo(file.id) | 38 | await createTorrentAndSetInfoHash(video, file) |
37 | // File does not exist anymore, remove the generated torrent | ||
38 | if (!refreshedFile) return file.removeTorrent() | ||
39 | 39 | ||
40 | refreshedFile.infoHash = file.infoHash | 40 | // Refresh videoFile because the createTorrentAndSetInfoHash could be long |
41 | refreshedFile.torrentFilename = file.torrentFilename | 41 | const refreshedFile = await VideoFileModel.loadWithVideo(file.id) |
42 | // File does not exist anymore, remove the generated torrent | ||
43 | if (!refreshedFile) return file.removeTorrent() | ||
42 | 44 | ||
43 | return refreshedFile.save() | 45 | refreshedFile.infoHash = file.infoHash |
46 | refreshedFile.torrentFilename = file.torrentFilename | ||
47 | |||
48 | await refreshedFile.save() | ||
49 | } finally { | ||
50 | fileMutexReleaser() | ||
51 | } | ||
44 | } | 52 | } |
45 | 53 | ||
46 | async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { | 54 | async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { |
@@ -52,9 +60,16 @@ async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { ac | |||
52 | 60 | ||
53 | if ((!video && !streamingPlaylist) || !file) return | 61 | if ((!video && !streamingPlaylist) || !file) return |
54 | 62 | ||
55 | await updateTorrentMetadata(video || streamingPlaylist, file) | 63 | const extractedVideo = extractVideo(video || streamingPlaylist) |
64 | const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(extractedVideo.uuid) | ||
56 | 65 | ||
57 | await file.save() | 66 | try { |
67 | await updateTorrentMetadata(video || streamingPlaylist, file) | ||
68 | |||
69 | await file.save() | ||
70 | } finally { | ||
71 | fileMutexReleaser() | ||
72 | } | ||
58 | } | 73 | } |
59 | 74 | ||
60 | async function loadVideoOrLog (videoId: number) { | 75 | async function loadVideoOrLog (videoId: number) { |
@@ -82,7 +97,7 @@ async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) { | |||
82 | async function loadFileOrLog (videoFileId: number) { | 97 | async function loadFileOrLog (videoFileId: number) { |
83 | if (!videoFileId) return undefined | 98 | if (!videoFileId) return undefined |
84 | 99 | ||
85 | const file = await VideoFileModel.loadWithVideo(videoFileId) | 100 | const file = await VideoFileModel.load(videoFileId) |
86 | 101 | ||
87 | if (!file) { | 102 | if (!file) { |
88 | logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId) | 103 | logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId) |
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 25bdebeea..a1530cc57 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -3,10 +3,10 @@ import { remove } from 'fs-extra' | |||
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
5 | import { updateTorrentMetadata } from '@server/helpers/webtorrent' | 5 | import { updateTorrentMetadata } from '@server/helpers/webtorrent' |
6 | import { CONFIG } from '@server/initializers/config' | ||
7 | import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' | 6 | import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' |
8 | import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' | 7 | import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage' |
9 | import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' | 8 | import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' |
9 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
10 | import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' | 10 | import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' |
11 | import { VideoModel } from '@server/models/video/video' | 11 | import { VideoModel } from '@server/models/video/video' |
12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
@@ -28,6 +28,8 @@ export async function processMoveToObjectStorage (job: Job) { | |||
28 | 28 | ||
29 | const lTags = lTagsBase(video.uuid, video.url) | 29 | const lTags = lTagsBase(video.uuid, video.url) |
30 | 30 | ||
31 | const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
32 | |||
31 | try { | 33 | try { |
32 | if (video.VideoFiles) { | 34 | if (video.VideoFiles) { |
33 | logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags) | 35 | logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags) |
@@ -49,6 +51,10 @@ export async function processMoveToObjectStorage (job: Job) { | |||
49 | } | 51 | } |
50 | } catch (err) { | 52 | } catch (err) { |
51 | await onMoveToObjectStorageFailure(job, err) | 53 | await onMoveToObjectStorageFailure(job, err) |
54 | |||
55 | throw err | ||
56 | } finally { | ||
57 | fileMutexReleaser() | ||
52 | } | 58 | } |
53 | 59 | ||
54 | return payload.videoUUID | 60 | return payload.videoUUID |
@@ -72,9 +78,9 @@ async function moveWebTorrentFiles (video: MVideoWithAllFiles) { | |||
72 | for (const file of video.VideoFiles) { | 78 | for (const file of video.VideoFiles) { |
73 | if (file.storage !== VideoStorage.FILE_SYSTEM) continue | 79 | if (file.storage !== VideoStorage.FILE_SYSTEM) continue |
74 | 80 | ||
75 | const fileUrl = await storeWebTorrentFile(file.filename) | 81 | const fileUrl = await storeWebTorrentFile(video, file) |
76 | 82 | ||
77 | const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename) | 83 | const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file) |
78 | await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath }) | 84 | await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath }) |
79 | } | 85 | } |
80 | } | 86 | } |
@@ -88,10 +94,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) { | |||
88 | 94 | ||
89 | // Resolution playlist | 95 | // Resolution playlist |
90 | const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) | 96 | const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) |
91 | await storeHLSFile(playlistWithVideo, playlistFilename) | 97 | await storeHLSFileFromFilename(playlistWithVideo, playlistFilename) |
92 | 98 | ||
93 | // Resolution fragmented file | 99 | // Resolution fragmented file |
94 | const fileUrl = await storeHLSFile(playlistWithVideo, file.filename) | 100 | const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename) |
95 | 101 | ||
96 | const oldPath = join(getHLSDirectory(video), file.filename) | 102 | const oldPath = join(getHLSDirectory(video), file.filename) |
97 | 103 | ||
@@ -113,9 +119,9 @@ async function doAfterLastJob (options: { | |||
113 | const playlistWithVideo = playlist.withVideo(video) | 119 | const playlistWithVideo = playlist.withVideo(video) |
114 | 120 | ||
115 | // Master playlist | 121 | // Master playlist |
116 | playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename) | 122 | playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename) |
117 | // Sha256 segments file | 123 | // Sha256 segments file |
118 | playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename) | 124 | playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename) |
119 | 125 | ||
120 | playlist.storage = VideoStorage.OBJECT_STORAGE | 126 | playlist.storage = VideoStorage.OBJECT_STORAGE |
121 | 127 | ||
diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts index 600292844..035f88e96 100644 --- a/server/lib/job-queue/handlers/video-channel-import.ts +++ b/server/lib/job-queue/handlers/video-channel-import.ts | |||
@@ -5,7 +5,7 @@ import { synchronizeChannel } from '@server/lib/sync-channel' | |||
5 | import { VideoChannelModel } from '@server/models/video/video-channel' | 5 | import { VideoChannelModel } from '@server/models/video/video-channel' |
6 | import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' | 6 | import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' |
7 | import { MChannelSync } from '@server/types/models' | 7 | import { MChannelSync } from '@server/types/models' |
8 | import { VideoChannelImportPayload, VideoChannelSyncState } from '@shared/models' | 8 | import { VideoChannelImportPayload } from '@shared/models' |
9 | 9 | ||
10 | export async function processVideoChannelImport (job: Job) { | 10 | export async function processVideoChannelImport (job: Job) { |
11 | const payload = job.data as VideoChannelImportPayload | 11 | const payload = job.data as VideoChannelImportPayload |
@@ -32,17 +32,12 @@ export async function processVideoChannelImport (job: Job) { | |||
32 | 32 | ||
33 | const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) | 33 | const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) |
34 | 34 | ||
35 | try { | 35 | logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) |
36 | logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) | 36 | |
37 | 37 | await synchronizeChannel({ | |
38 | await synchronizeChannel({ | 38 | channel: videoChannel, |
39 | channel: videoChannel, | 39 | externalChannelUrl: payload.externalChannelUrl, |
40 | externalChannelUrl: payload.externalChannelUrl, | 40 | channelSync, |
41 | channelSync | 41 | videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.FULL_SYNC_VIDEOS_LIMIT |
42 | }) | 42 | }) |
43 | } catch (err) { | ||
44 | logger.error(`Failed to import channel ${videoChannel.name}`, { err }) | ||
45 | channelSync.state = VideoChannelSyncState.FAILED | ||
46 | await channelSync.save() | ||
47 | } | ||
48 | } | 43 | } |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 9901b878c..83d582cb4 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -12,7 +12,8 @@ import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@serv | |||
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 12 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 13 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 14 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
15 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 15 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' |
16 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | ||
16 | import { getLowercaseExtension } from '@shared/core-utils' | 17 | import { getLowercaseExtension } from '@shared/core-utils' |
17 | import { isAudioFile } from '@shared/extra-utils' | 18 | import { isAudioFile } from '@shared/extra-utils' |
18 | import { | 19 | import { |
@@ -36,7 +37,6 @@ import { sequelizeTypescript } from '../../../initializers/database' | |||
36 | import { VideoModel } from '../../../models/video/video' | 37 | import { VideoModel } from '../../../models/video/video' |
37 | import { VideoFileModel } from '../../../models/video/video-file' | 38 | import { VideoFileModel } from '../../../models/video/video-file' |
38 | import { VideoImportModel } from '../../../models/video/video-import' | 39 | import { VideoImportModel } from '../../../models/video/video-import' |
39 | import { MThumbnail } from '../../../types/models/video/thumbnail' | ||
40 | import { federateVideoIfNeeded } from '../../activitypub/videos' | 40 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
41 | import { Notifier } from '../../notifier' | 41 | import { Notifier } from '../../notifier' |
42 | import { generateVideoMiniature } from '../../thumbnail' | 42 | import { generateVideoMiniature } from '../../thumbnail' |
@@ -178,125 +178,159 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
178 | } | 178 | } |
179 | 179 | ||
180 | // Video is accepted, resuming preparation | 180 | // Video is accepted, resuming preparation |
181 | const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) | 181 | const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid) |
182 | // To clean files if the import fails | ||
183 | const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles }) | ||
184 | |||
185 | // Move file | ||
186 | const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) | ||
187 | await move(tempVideoPath, videoDestFile) | ||
188 | tempVideoPath = null // This path is not used anymore | ||
189 | |||
190 | // Generate miniature if the import did not created it | ||
191 | let thumbnailModel: MThumbnail | ||
192 | let thumbnailSave: object | ||
193 | if (!videoImportWithFiles.Video.getMiniature()) { | ||
194 | thumbnailModel = await generateVideoMiniature({ | ||
195 | video: videoImportWithFiles.Video, | ||
196 | videoFile, | ||
197 | type: ThumbnailType.MINIATURE | ||
198 | }) | ||
199 | thumbnailSave = thumbnailModel.toJSON() | ||
200 | } | ||
201 | 182 | ||
202 | // Generate preview if the import did not created it | 183 | try { |
203 | let previewModel: MThumbnail | 184 | const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile) |
204 | let previewSave: object | ||
205 | if (!videoImportWithFiles.Video.getPreview()) { | ||
206 | previewModel = await generateVideoMiniature({ | ||
207 | video: videoImportWithFiles.Video, | ||
208 | videoFile, | ||
209 | type: ThumbnailType.PREVIEW | ||
210 | }) | ||
211 | previewSave = previewModel.toJSON() | ||
212 | } | ||
213 | 185 | ||
214 | // Create torrent | 186 | // Move file |
215 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | 187 | const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) |
188 | await move(tempVideoPath, videoDestFile) | ||
216 | 189 | ||
217 | const videoFileSave = videoFile.toJSON() | 190 | tempVideoPath = null // This path is not used anymore |
218 | 191 | ||
219 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { | 192 | let { |
220 | return sequelizeTypescript.transaction(async t => { | 193 | miniatureModel: thumbnailModel, |
221 | const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo | 194 | miniatureJSONSave: thumbnailSave |
195 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE) | ||
222 | 196 | ||
223 | // Refresh video | 197 | let { |
224 | const video = await VideoModel.load(videoImportToUpdate.videoId, t) | 198 | miniatureModel: previewModel, |
225 | if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') | 199 | miniatureJSONSave: previewSave |
200 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW) | ||
226 | 201 | ||
227 | const videoFileCreated = await videoFile.save({ transaction: t }) | 202 | // Create torrent |
203 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | ||
228 | 204 | ||
229 | // Update video DB object | 205 | const videoFileSave = videoFile.toJSON() |
230 | video.duration = duration | ||
231 | video.state = buildNextVideoState(video.state) | ||
232 | await video.save({ transaction: t }) | ||
233 | 206 | ||
234 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) | 207 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { |
235 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | 208 | return sequelizeTypescript.transaction(async t => { |
209 | // Refresh video | ||
210 | const video = await VideoModel.load(videoImportWithFiles.videoId, t) | ||
211 | if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.') | ||
236 | 212 | ||
237 | // Now we can federate the video (reload from database, we need more attributes) | 213 | await videoFile.save({ transaction: t }) |
238 | const videoForFederation = await VideoModel.loadFull(video.uuid, t) | ||
239 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
240 | 214 | ||
241 | // Update video import object | 215 | // Update video DB object |
242 | videoImportToUpdate.state = VideoImportState.SUCCESS | 216 | video.duration = duration |
243 | const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo | 217 | video.state = buildNextVideoState(video.state) |
244 | videoImportUpdated.Video = video | 218 | await video.save({ transaction: t }) |
245 | 219 | ||
246 | videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) | 220 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) |
221 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | ||
247 | 222 | ||
248 | logger.info('Video %s imported.', video.uuid) | 223 | // Now we can federate the video (reload from database, we need more attributes) |
224 | const videoForFederation = await VideoModel.loadFull(video.uuid, t) | ||
225 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
249 | 226 | ||
250 | return { videoImportUpdated, video: videoForFederation } | 227 | // Update video import object |
251 | }).catch(err => { | 228 | videoImportWithFiles.state = VideoImportState.SUCCESS |
252 | // Reset fields | 229 | const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport |
253 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
254 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
255 | 230 | ||
256 | videoFile = new VideoFileModel(videoFileSave) | 231 | logger.info('Video %s imported.', video.uuid) |
257 | 232 | ||
258 | throw err | 233 | return { videoImportUpdated, video: videoForFederation } |
259 | }) | 234 | }).catch(err => { |
260 | }) | 235 | // Reset fields |
236 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
237 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
261 | 238 | ||
262 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: videoImportUpdated, success: true }) | 239 | videoFile = new VideoFileModel(videoFileSave) |
263 | 240 | ||
264 | if (video.isBlacklisted()) { | 241 | throw err |
265 | const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) | 242 | }) |
243 | }) | ||
266 | 244 | ||
267 | Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) | 245 | await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User }) |
268 | } else { | 246 | } finally { |
269 | Notifier.Instance.notifyOnNewVideoIfNeeded(video) | 247 | videoFileLockReleaser() |
270 | } | 248 | } |
249 | } catch (err) { | ||
250 | await onImportError(err, tempVideoPath, videoImport) | ||
271 | 251 | ||
272 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | 252 | throw err |
273 | await JobQueue.Instance.createJob( | 253 | } |
274 | await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | 254 | } |
275 | ) | ||
276 | } | ||
277 | 255 | ||
278 | // Create transcoding jobs? | 256 | async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> { |
279 | if (video.state === VideoState.TO_TRANSCODE) { | 257 | // Refresh video, privacy may have changed |
280 | await JobQueue.Instance.createJob( | 258 | const video = await videoImport.Video.reload() |
281 | await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | 259 | const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) |
282 | ) | ||
283 | } | ||
284 | 260 | ||
285 | } catch (err) { | 261 | return Object.assign(videoImport, { Video: videoWithFiles }) |
286 | try { | 262 | } |
287 | if (tempVideoPath) await remove(tempVideoPath) | ||
288 | } catch (errUnlink) { | ||
289 | logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink }) | ||
290 | } | ||
291 | 263 | ||
292 | videoImport.error = err.message | 264 | async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) { |
293 | if (videoImport.state !== VideoImportState.REJECTED) { | 265 | // Generate miniature if the import did not created it |
294 | videoImport.state = VideoImportState.FAILED | 266 | const needsMiniature = thumbnailType === ThumbnailType.MINIATURE |
267 | ? !videoImportWithFiles.Video.getMiniature() | ||
268 | : !videoImportWithFiles.Video.getPreview() | ||
269 | |||
270 | if (!needsMiniature) { | ||
271 | return { | ||
272 | miniatureModel: null, | ||
273 | miniatureJSONSave: null | ||
295 | } | 274 | } |
296 | await videoImport.save() | 275 | } |
297 | 276 | ||
298 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) | 277 | const miniatureModel = await generateVideoMiniature({ |
278 | video: videoImportWithFiles.Video, | ||
279 | videoFile, | ||
280 | type: thumbnailType | ||
281 | }) | ||
282 | const miniatureJSONSave = miniatureModel.toJSON() | ||
299 | 283 | ||
300 | throw err | 284 | return { |
285 | miniatureModel, | ||
286 | miniatureJSONSave | ||
287 | } | ||
288 | } | ||
289 | |||
290 | async function afterImportSuccess (options: { | ||
291 | videoImport: MVideoImport | ||
292 | video: MVideoFullLight | ||
293 | videoFile: MVideoFile | ||
294 | user: MUserId | ||
295 | }) { | ||
296 | const { video, videoFile, videoImport, user } = options | ||
297 | |||
298 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true }) | ||
299 | |||
300 | if (video.isBlacklisted()) { | ||
301 | const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) | ||
302 | |||
303 | Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) | ||
304 | } else { | ||
305 | Notifier.Instance.notifyOnNewVideoIfNeeded(video) | ||
301 | } | 306 | } |
307 | |||
308 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
309 | await JobQueue.Instance.createJob( | ||
310 | await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT }) | ||
311 | ) | ||
312 | return | ||
313 | } | ||
314 | |||
315 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? | ||
316 | await JobQueue.Instance.createJob( | ||
317 | await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) | ||
318 | ) | ||
319 | } | ||
320 | } | ||
321 | |||
322 | async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) { | ||
323 | try { | ||
324 | if (tempVideoPath) await remove(tempVideoPath) | ||
325 | } catch (errUnlink) { | ||
326 | logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink }) | ||
327 | } | ||
328 | |||
329 | videoImport.error = err.message | ||
330 | if (videoImport.state !== VideoImportState.REJECTED) { | ||
331 | videoImport.state = VideoImportState.FAILED | ||
332 | } | ||
333 | await videoImport.save() | ||
334 | |||
335 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) | ||
302 | } | 336 | } |
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 8a3ee09a2..c6263f55a 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -4,7 +4,7 @@ import { join } from 'path' | |||
4 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' | 4 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' |
5 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' | 5 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' |
6 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 6 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
7 | import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' | 7 | import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' |
8 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' | 8 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' |
9 | import { generateVideoMiniature } from '@server/lib/thumbnail' | 9 | import { generateVideoMiniature } from '@server/lib/thumbnail' |
10 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' | 10 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' |
@@ -18,6 +18,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin | |||
18 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' | 18 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' |
19 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' | 19 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' |
20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
21 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
21 | 22 | ||
22 | const lTags = loggerTagsFactory('live', 'job') | 23 | const lTags = loggerTagsFactory('live', 'job') |
23 | 24 | ||
@@ -34,13 +35,13 @@ async function processVideoLiveEnding (job: Job) { | |||
34 | const live = await VideoLiveModel.loadByVideoId(payload.videoId) | 35 | const live = await VideoLiveModel.loadByVideoId(payload.videoId) |
35 | const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId) | 36 | const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId) |
36 | 37 | ||
37 | const permanentLive = live.permanentLive | ||
38 | |||
39 | if (!video || !live || !liveSession) { | 38 | if (!video || !live || !liveSession) { |
40 | logError() | 39 | logError() |
41 | return | 40 | return |
42 | } | 41 | } |
43 | 42 | ||
43 | const permanentLive = live.permanentLive | ||
44 | |||
44 | liveSession.endingProcessed = true | 45 | liveSession.endingProcessed = true |
45 | await liveSession.save() | 46 | await liveSession.save() |
46 | 47 | ||
@@ -141,23 +142,22 @@ async function replaceLiveByReplay (options: { | |||
141 | }) { | 142 | }) { |
142 | const { video, liveSession, live, permanentLive, replayDirectory } = options | 143 | const { video, liveSession, live, permanentLive, replayDirectory } = options |
143 | 144 | ||
144 | await cleanupTMPLiveFiles(video) | 145 | const videoWithFiles = await VideoModel.loadFull(video.id) |
146 | const hlsPlaylist = videoWithFiles.getHLSPlaylist() | ||
147 | |||
148 | await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist) | ||
145 | 149 | ||
146 | await live.destroy() | 150 | await live.destroy() |
147 | 151 | ||
148 | video.isLive = false | 152 | videoWithFiles.isLive = false |
149 | video.waitTranscoding = true | 153 | videoWithFiles.waitTranscoding = true |
150 | video.state = VideoState.TO_TRANSCODE | 154 | videoWithFiles.state = VideoState.TO_TRANSCODE |
151 | 155 | ||
152 | await video.save() | 156 | await videoWithFiles.save() |
153 | 157 | ||
154 | liveSession.replayVideoId = video.id | 158 | liveSession.replayVideoId = videoWithFiles.id |
155 | await liveSession.save() | 159 | await liveSession.save() |
156 | 160 | ||
157 | // Remove old HLS playlist video files | ||
158 | const videoWithFiles = await VideoModel.loadFull(video.id) | ||
159 | |||
160 | const hlsPlaylist = videoWithFiles.getHLSPlaylist() | ||
161 | await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) | 161 | await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) |
162 | 162 | ||
163 | // Reset playlist | 163 | // Reset playlist |
@@ -206,18 +206,27 @@ async function assignReplayFilesToVideo (options: { | |||
206 | const concatenatedTsFiles = await readdir(replayDirectory) | 206 | const concatenatedTsFiles = await readdir(replayDirectory) |
207 | 207 | ||
208 | for (const concatenatedTsFile of concatenatedTsFiles) { | 208 | for (const concatenatedTsFile of concatenatedTsFiles) { |
209 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
210 | |||
209 | const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) | 211 | const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) |
210 | 212 | ||
211 | const probe = await ffprobePromise(concatenatedTsFilePath) | 213 | const probe = await ffprobePromise(concatenatedTsFilePath) |
212 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) | 214 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) |
213 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) | 215 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) |
214 | 216 | ||
215 | await generateHlsPlaylistResolutionFromTS({ | 217 | try { |
216 | video, | 218 | await generateHlsPlaylistResolutionFromTS({ |
217 | concatenatedTsFilePath, | 219 | video, |
218 | resolution, | 220 | inputFileMutexReleaser, |
219 | isAAC: audioStream?.codec_name === 'aac' | 221 | concatenatedTsFilePath, |
220 | }) | 222 | resolution, |
223 | isAAC: audioStream?.codec_name === 'aac' | ||
224 | }) | ||
225 | } catch (err) { | ||
226 | logger.error('Cannot generate HLS playlist resolution from TS files.', { err }) | ||
227 | } | ||
228 | |||
229 | inputFileMutexReleaser() | ||
221 | } | 230 | } |
222 | 231 | ||
223 | return video | 232 | return video |
@@ -234,7 +243,7 @@ async function cleanupLiveAndFederate (options: { | |||
234 | 243 | ||
235 | if (streamingPlaylist) { | 244 | if (streamingPlaylist) { |
236 | if (permanentLive) { | 245 | if (permanentLive) { |
237 | await cleanupPermanentLive(video, streamingPlaylist) | 246 | await cleanupAndDestroyPermanentLive(video, streamingPlaylist) |
238 | } else { | 247 | } else { |
239 | await cleanupUnsavedNormalLive(video, streamingPlaylist) | 248 | await cleanupUnsavedNormalLive(video, streamingPlaylist) |
240 | } | 249 | } |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index b0e92acf7..3e6d23363 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -13,7 +13,6 @@ import { | |||
13 | MergeAudioTranscodingPayload, | 13 | MergeAudioTranscodingPayload, |
14 | NewWebTorrentResolutionTranscodingPayload, | 14 | NewWebTorrentResolutionTranscodingPayload, |
15 | OptimizeTranscodingPayload, | 15 | OptimizeTranscodingPayload, |
16 | VideoResolution, | ||
17 | VideoTranscodingPayload | 16 | VideoTranscodingPayload |
18 | } from '@shared/models' | 17 | } from '@shared/models' |
19 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 18 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
@@ -94,15 +93,24 @@ async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MV | |||
94 | 93 | ||
95 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | 94 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() |
96 | 95 | ||
97 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { | 96 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) |
98 | return generateHlsPlaylistResolution({ | 97 | |
99 | video, | 98 | try { |
100 | videoInputPath, | 99 | await videoFileInput.getVideo().reload() |
101 | resolution: payload.resolution, | 100 | |
102 | copyCodecs: payload.copyCodecs, | 101 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { |
103 | job | 102 | return generateHlsPlaylistResolution({ |
103 | video, | ||
104 | videoInputPath, | ||
105 | inputFileMutexReleaser, | ||
106 | resolution: payload.resolution, | ||
107 | copyCodecs: payload.copyCodecs, | ||
108 | job | ||
109 | }) | ||
104 | }) | 110 | }) |
105 | }) | 111 | } finally { |
112 | inputFileMutexReleaser() | ||
113 | } | ||
106 | 114 | ||
107 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 115 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
108 | 116 | ||
@@ -177,38 +185,44 @@ async function onVideoFirstWebTorrentTranscoding ( | |||
177 | transcodeType: TranscodeVODOptionsType, | 185 | transcodeType: TranscodeVODOptionsType, |
178 | user: MUserId | 186 | user: MUserId |
179 | ) { | 187 | ) { |
180 | const { resolution, audioStream } = await videoArg.probeMaxQualityFile() | 188 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) |
181 | 189 | ||
182 | // Maybe the video changed in database, refresh it | 190 | try { |
183 | const videoDatabase = await VideoModel.loadFull(videoArg.uuid) | 191 | // Maybe the video changed in database, refresh it |
184 | // Video does not exist anymore | 192 | const videoDatabase = await VideoModel.loadFull(videoArg.uuid) |
185 | if (!videoDatabase) return undefined | 193 | // Video does not exist anymore |
186 | 194 | if (!videoDatabase) return undefined | |
187 | // Generate HLS version of the original file | 195 | |
188 | const originalFileHLSPayload = { | 196 | const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile() |
189 | ...payload, | 197 | |
190 | 198 | // Generate HLS version of the original file | |
191 | hasAudio: !!audioStream, | 199 | const originalFileHLSPayload = { |
192 | resolution: videoDatabase.getMaxQualityFile().resolution, | 200 | ...payload, |
193 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | 201 | |
194 | copyCodecs: transcodeType !== 'quick-transcode', | 202 | hasAudio: !!audioStream, |
195 | isMaxQuality: true | 203 | resolution: videoDatabase.getMaxQualityFile().resolution, |
196 | } | 204 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues |
197 | const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) | 205 | copyCodecs: transcodeType !== 'quick-transcode', |
198 | const hasNewResolutions = await createLowerResolutionsJobs({ | 206 | isMaxQuality: true |
199 | video: videoDatabase, | 207 | } |
200 | user, | 208 | const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) |
201 | videoFileResolution: resolution, | 209 | const hasNewResolutions = await createLowerResolutionsJobs({ |
202 | hasAudio: !!audioStream, | 210 | video: videoDatabase, |
203 | type: 'webtorrent', | 211 | user, |
204 | isNewVideo: payload.isNewVideo ?? true | 212 | videoFileResolution: resolution, |
205 | }) | 213 | hasAudio: !!audioStream, |
206 | 214 | type: 'webtorrent', | |
207 | await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') | 215 | isNewVideo: payload.isNewVideo ?? true |
208 | 216 | }) | |
209 | // Move to next state if there are no other resolutions to generate | 217 | |
210 | if (!hasHls && !hasNewResolutions) { | 218 | await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') |
211 | await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) | 219 | |
220 | // Move to next state if there are no other resolutions to generate | ||
221 | if (!hasHls && !hasNewResolutions) { | ||
222 | await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) | ||
223 | } | ||
224 | } finally { | ||
225 | mutexReleaser() | ||
212 | } | 226 | } |
213 | } | 227 | } |
214 | 228 | ||
@@ -266,7 +280,7 @@ async function createLowerResolutionsJobs (options: { | |||
266 | 280 | ||
267 | // Create transcoding jobs if there are enabled resolutions | 281 | // Create transcoding jobs if there are enabled resolutions |
268 | const resolutionsEnabled = await Hooks.wrapObject( | 282 | const resolutionsEnabled = await Hooks.wrapObject( |
269 | computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true }), | 283 | computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), |
270 | 'filter:transcoding.auto.resolutions-to-transcode.result', | 284 | 'filter:transcoding.auto.resolutions-to-transcode.result', |
271 | options | 285 | options |
272 | ) | 286 | ) |
@@ -274,8 +288,6 @@ async function createLowerResolutionsJobs (options: { | |||
274 | const resolutionCreated: string[] = [] | 288 | const resolutionCreated: string[] = [] |
275 | 289 | ||
276 | for (const resolution of resolutionsEnabled) { | 290 | for (const resolution of resolutionsEnabled) { |
277 | if (resolution === VideoResolution.H_NOVIDEO && hasAudio === false) continue | ||
278 | |||
279 | let dataInput: VideoTranscodingPayload | 291 | let dataInput: VideoTranscodingPayload |
280 | 292 | ||
281 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { | 293 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { |