aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/process/process-create.ts2
-rw-r--r--server/lib/activitypub/video-comments.ts35
-rw-r--r--server/lib/auth/oauth.ts36
-rw-r--r--server/lib/hls.ts6
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts37
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts22
-rw-r--r--server/lib/job-queue/handlers/video-channel-import.ts23
-rw-r--r--server/lib/job-queue/handlers/video-import.ts226
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts49
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts100
-rw-r--r--server/lib/live/live-manager.ts18
-rw-r--r--server/lib/live/live-segment-sha-store.ts82
-rw-r--r--server/lib/live/live-utils.ts69
-rw-r--r--server/lib/live/shared/muxing-session.ts115
-rw-r--r--server/lib/moderation.ts42
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts223
-rw-r--r--server/lib/object-storage/urls.ts29
-rw-r--r--server/lib/object-storage/videos.ts138
-rw-r--r--server/lib/paths.ts17
-rw-r--r--server/lib/plugins/plugin-helpers-builder.ts28
-rw-r--r--server/lib/plugins/plugin-manager.ts31
-rw-r--r--server/lib/plugins/register-helpers.ts21
-rw-r--r--server/lib/redis.ts25
-rw-r--r--server/lib/schedulers/update-videos-scheduler.ts68
-rw-r--r--server/lib/schedulers/video-channel-sync-latest-scheduler.ts35
-rw-r--r--server/lib/schedulers/videos-redundancy-scheduler.ts25
-rw-r--r--server/lib/sync-channel.ts88
-rw-r--r--server/lib/transcoding/transcoding.ts386
-rw-r--r--server/lib/uploadx.ts14
-rw-r--r--server/lib/video-path-manager.ts51
-rw-r--r--server/lib/video-pre-import.ts (renamed from server/lib/video-import.ts)0
-rw-r--r--server/lib/video-privacy.ts127
-rw-r--r--server/lib/video-tokens-manager.ts49
-rw-r--r--server/lib/video.ts61
34 files changed, 1644 insertions, 634 deletions
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts
index 76ed37aae..1e6e8956c 100644
--- a/server/lib/activitypub/process/process-create.ts
+++ b/server/lib/activitypub/process/process-create.ts
@@ -109,8 +109,10 @@ async function processCreateVideoComment (activity: ActivityCreate, byActor: MAc
109 let video: MVideoAccountLightBlacklistAllFiles 109 let video: MVideoAccountLightBlacklistAllFiles
110 let created: boolean 110 let created: boolean
111 let comment: MCommentOwnerVideo 111 let comment: MCommentOwnerVideo
112
112 try { 113 try {
113 const resolveThreadResult = await resolveThread({ url: commentObject.id, isVideo: false }) 114 const resolveThreadResult = await resolveThread({ url: commentObject.id, isVideo: false })
115 if (!resolveThreadResult) return // Comment not accepted
114 116
115 video = resolveThreadResult.video 117 video = resolveThreadResult.video
116 created = resolveThreadResult.commentCreated 118 created = resolveThreadResult.commentCreated
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts
index 911c7cd30..b65baf0e9 100644
--- a/server/lib/activitypub/video-comments.ts
+++ b/server/lib/activitypub/video-comments.ts
@@ -4,7 +4,9 @@ import { logger } from '../../helpers/logger'
4import { doJSONRequest } from '../../helpers/requests' 4import { doJSONRequest } from '../../helpers/requests'
5import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers/constants' 5import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers/constants'
6import { VideoCommentModel } from '../../models/video/video-comment' 6import { VideoCommentModel } from '../../models/video/video-comment'
7import { MCommentOwner, MCommentOwnerVideo, MVideoAccountLightBlacklistAllFiles } from '../../types/models/video' 7import { MComment, MCommentOwner, MCommentOwnerVideo, MVideoAccountLightBlacklistAllFiles } from '../../types/models/video'
8import { isRemoteVideoCommentAccepted } from '../moderation'
9import { Hooks } from '../plugins/hooks'
8import { getOrCreateAPActor } from './actors' 10import { getOrCreateAPActor } from './actors'
9import { checkUrlsSameHost } from './url' 11import { checkUrlsSameHost } from './url'
10import { getOrCreateAPVideo } from './videos' 12import { getOrCreateAPVideo } from './videos'
@@ -103,6 +105,10 @@ async function tryToResolveThreadFromVideo (params: ResolveThreadParams) {
103 firstReply.changed('updatedAt', true) 105 firstReply.changed('updatedAt', true)
104 firstReply.Video = video 106 firstReply.Video = video
105 107
108 if (await isRemoteCommentAccepted(firstReply) !== true) {
109 return undefined
110 }
111
106 comments[comments.length - 1] = await firstReply.save() 112 comments[comments.length - 1] = await firstReply.save()
107 113
108 for (let i = comments.length - 2; i >= 0; i--) { 114 for (let i = comments.length - 2; i >= 0; i--) {
@@ -113,6 +119,10 @@ async function tryToResolveThreadFromVideo (params: ResolveThreadParams) {
113 comment.changed('updatedAt', true) 119 comment.changed('updatedAt', true)
114 comment.Video = video 120 comment.Video = video
115 121
122 if (await isRemoteCommentAccepted(comment) !== true) {
123 return undefined
124 }
125
116 comments[i] = await comment.save() 126 comments[i] = await comment.save()
117 } 127 }
118 128
@@ -169,3 +179,26 @@ async function resolveRemoteParentComment (params: ResolveThreadParams) {
169 commentCreated: true 179 commentCreated: true
170 }) 180 })
171} 181}
182
183async function isRemoteCommentAccepted (comment: MComment) {
184 // Already created
185 if (comment.id) return true
186
187 const acceptParameters = {
188 comment
189 }
190
191 const acceptedResult = await Hooks.wrapFun(
192 isRemoteVideoCommentAccepted,
193 acceptParameters,
194 'filter:activity-pub.remote-video-comment.create.accept.result'
195 )
196
197 if (!acceptedResult || acceptedResult.accepted !== true) {
198 logger.info('Refused to create a remote comment.', { acceptedResult, acceptParameters })
199
200 return false
201 }
202
203 return true
204}
diff --git a/server/lib/auth/oauth.ts b/server/lib/auth/oauth.ts
index fa1887315..bc0d4301f 100644
--- a/server/lib/auth/oauth.ts
+++ b/server/lib/auth/oauth.ts
@@ -9,11 +9,23 @@ import OAuth2Server, {
9 UnsupportedGrantTypeError 9 UnsupportedGrantTypeError
10} from '@node-oauth/oauth2-server' 10} from '@node-oauth/oauth2-server'
11import { randomBytesPromise } from '@server/helpers/core-utils' 11import { randomBytesPromise } from '@server/helpers/core-utils'
12import { isOTPValid } from '@server/helpers/otp'
12import { MOAuthClient } from '@server/types/models' 13import { MOAuthClient } from '@server/types/models'
13import { sha1 } from '@shared/extra-utils' 14import { sha1 } from '@shared/extra-utils'
14import { OAUTH_LIFETIME } from '../../initializers/constants' 15import { HttpStatusCode } from '@shared/models'
16import { OAUTH_LIFETIME, OTP } from '../../initializers/constants'
15import { BypassLogin, getClient, getRefreshToken, getUser, revokeToken, saveToken } from './oauth-model' 17import { BypassLogin, getClient, getRefreshToken, getUser, revokeToken, saveToken } from './oauth-model'
16 18
19class MissingTwoFactorError extends Error {
20 code = HttpStatusCode.UNAUTHORIZED_401
21 name = 'missing_two_factor'
22}
23
24class InvalidTwoFactorError extends Error {
25 code = HttpStatusCode.BAD_REQUEST_400
26 name = 'invalid_two_factor'
27}
28
17/** 29/**
18 * 30 *
19 * Reimplement some functions of OAuth2Server to inject external auth methods 31 * Reimplement some functions of OAuth2Server to inject external auth methods
@@ -83,17 +95,15 @@ async function handleOAuthToken (req: express.Request, options: { refreshTokenAu
83 95
84function handleOAuthAuthenticate ( 96function handleOAuthAuthenticate (
85 req: express.Request, 97 req: express.Request,
86 res: express.Response, 98 res: express.Response
87 authenticateInQuery = false
88) { 99) {
89 const options = authenticateInQuery 100 return oAuthServer.authenticate(new Request(req), new Response(res))
90 ? { allowBearerTokensInQueryString: true }
91 : {}
92
93 return oAuthServer.authenticate(new Request(req), new Response(res), options)
94} 101}
95 102
96export { 103export {
104 MissingTwoFactorError,
105 InvalidTwoFactorError,
106
97 handleOAuthToken, 107 handleOAuthToken,
98 handleOAuthAuthenticate 108 handleOAuthAuthenticate
99} 109}
@@ -118,6 +128,16 @@ async function handlePasswordGrant (options: {
118 const user = await getUser(request.body.username, request.body.password, bypassLogin) 128 const user = await getUser(request.body.username, request.body.password, bypassLogin)
119 if (!user) throw new InvalidGrantError('Invalid grant: user credentials are invalid') 129 if (!user) throw new InvalidGrantError('Invalid grant: user credentials are invalid')
120 130
131 if (user.otpSecret) {
132 if (!request.headers[OTP.HEADER_NAME]) {
133 throw new MissingTwoFactorError('Missing two factor header')
134 }
135
136 if (await isOTPValid({ encryptedSecret: user.otpSecret, token: request.headers[OTP.HEADER_NAME] }) !== true) {
137 throw new InvalidTwoFactorError('Invalid two factor header')
138 }
139 }
140
121 const token = await buildToken() 141 const token = await buildToken()
122 142
123 return saveToken(token, client, user, { bypassLogin }) 143 return saveToken(token, client, user, { bypassLogin })
diff --git a/server/lib/hls.ts b/server/lib/hls.ts
index a0a5afc0f..a41f1ae48 100644
--- a/server/lib/hls.ts
+++ b/server/lib/hls.ts
@@ -15,7 +15,7 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, REQUEST_TIMEOUTS } from '../initializers
15import { sequelizeTypescript } from '../initializers/database' 15import { sequelizeTypescript } from '../initializers/database'
16import { VideoFileModel } from '../models/video/video-file' 16import { VideoFileModel } from '../models/video/video-file'
17import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist' 17import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist'
18import { storeHLSFile } from './object-storage' 18import { storeHLSFileFromFilename } from './object-storage'
19import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths' 19import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths'
20import { VideoPathManager } from './video-path-manager' 20import { VideoPathManager } from './video-path-manager'
21 21
@@ -95,7 +95,7 @@ function updateMasterHLSPlaylist (video: MVideo, playlistArg: MStreamingPlaylist
95 await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n') 95 await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n')
96 96
97 if (playlist.storage === VideoStorage.OBJECT_STORAGE) { 97 if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
98 playlist.playlistUrl = await storeHLSFile(playlist, playlist.playlistFilename) 98 playlist.playlistUrl = await storeHLSFileFromFilename(playlist, playlist.playlistFilename)
99 await remove(masterPlaylistPath) 99 await remove(masterPlaylistPath)
100 } 100 }
101 101
@@ -146,7 +146,7 @@ function updateSha256VODSegments (video: MVideo, playlistArg: MStreamingPlaylist
146 await outputJSON(outputPath, json) 146 await outputJSON(outputPath, json)
147 147
148 if (playlist.storage === VideoStorage.OBJECT_STORAGE) { 148 if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
149 playlist.segmentsSha256Url = await storeHLSFile(playlist, playlist.segmentsSha256Filename) 149 playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlist, playlist.segmentsSha256Filename)
150 await remove(outputPath) 150 await remove(outputPath)
151 } 151 }
152 152
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index 03aa414c9..cef93afda 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -1,5 +1,7 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { extractVideo } from '@server/helpers/video'
2import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' 3import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
4import { VideoPathManager } from '@server/lib/video-path-manager'
3import { VideoModel } from '@server/models/video/video' 5import { VideoModel } from '@server/models/video/video'
4import { VideoFileModel } from '@server/models/video/video-file' 6import { VideoFileModel } from '@server/models/video/video-file'
5import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 7import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
@@ -30,17 +32,23 @@ async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'c
30 32
31 if (!video || !file) return 33 if (!video || !file) return
32 34
33 await createTorrentAndSetInfoHash(video, file) 35 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
34 36
35 // Refresh videoFile because the createTorrentAndSetInfoHash could be long 37 try {
36 const refreshedFile = await VideoFileModel.loadWithVideo(file.id) 38 await createTorrentAndSetInfoHash(video, file)
37 // File does not exist anymore, remove the generated torrent
38 if (!refreshedFile) return file.removeTorrent()
39 39
40 refreshedFile.infoHash = file.infoHash 40 // Refresh videoFile because the createTorrentAndSetInfoHash could be long
41 refreshedFile.torrentFilename = file.torrentFilename 41 const refreshedFile = await VideoFileModel.loadWithVideo(file.id)
42 // File does not exist anymore, remove the generated torrent
43 if (!refreshedFile) return file.removeTorrent()
42 44
43 return refreshedFile.save() 45 refreshedFile.infoHash = file.infoHash
46 refreshedFile.torrentFilename = file.torrentFilename
47
48 await refreshedFile.save()
49 } finally {
50 fileMutexReleaser()
51 }
44} 52}
45 53
46async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { 54async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) {
@@ -52,9 +60,16 @@ async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { ac
52 60
53 if ((!video && !streamingPlaylist) || !file) return 61 if ((!video && !streamingPlaylist) || !file) return
54 62
55 await updateTorrentMetadata(video || streamingPlaylist, file) 63 const extractedVideo = extractVideo(video || streamingPlaylist)
64 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(extractedVideo.uuid)
56 65
57 await file.save() 66 try {
67 await updateTorrentMetadata(video || streamingPlaylist, file)
68
69 await file.save()
70 } finally {
71 fileMutexReleaser()
72 }
58} 73}
59 74
60async function loadVideoOrLog (videoId: number) { 75async function loadVideoOrLog (videoId: number) {
@@ -82,7 +97,7 @@ async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) {
82async function loadFileOrLog (videoFileId: number) { 97async function loadFileOrLog (videoFileId: number) {
83 if (!videoFileId) return undefined 98 if (!videoFileId) return undefined
84 99
85 const file = await VideoFileModel.loadWithVideo(videoFileId) 100 const file = await VideoFileModel.load(videoFileId)
86 101
87 if (!file) { 102 if (!file) {
88 logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId) 103 logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId)
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 25bdebeea..a1530cc57 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -3,10 +3,10 @@ import { remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { updateTorrentMetadata } from '@server/helpers/webtorrent' 5import { updateTorrentMetadata } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config'
7import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' 6import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
8import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' 7import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
9import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' 8import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
9import { VideoPathManager } from '@server/lib/video-path-manager'
10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' 10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
11import { VideoModel } from '@server/models/video/video' 11import { VideoModel } from '@server/models/video/video'
12import { VideoJobInfoModel } from '@server/models/video/video-job-info' 12import { VideoJobInfoModel } from '@server/models/video/video-job-info'
@@ -28,6 +28,8 @@ export async function processMoveToObjectStorage (job: Job) {
28 28
29 const lTags = lTagsBase(video.uuid, video.url) 29 const lTags = lTagsBase(video.uuid, video.url)
30 30
31 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
32
31 try { 33 try {
32 if (video.VideoFiles) { 34 if (video.VideoFiles) {
33 logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags) 35 logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags)
@@ -49,6 +51,10 @@ export async function processMoveToObjectStorage (job: Job) {
49 } 51 }
50 } catch (err) { 52 } catch (err) {
51 await onMoveToObjectStorageFailure(job, err) 53 await onMoveToObjectStorageFailure(job, err)
54
55 throw err
56 } finally {
57 fileMutexReleaser()
52 } 58 }
53 59
54 return payload.videoUUID 60 return payload.videoUUID
@@ -72,9 +78,9 @@ async function moveWebTorrentFiles (video: MVideoWithAllFiles) {
72 for (const file of video.VideoFiles) { 78 for (const file of video.VideoFiles) {
73 if (file.storage !== VideoStorage.FILE_SYSTEM) continue 79 if (file.storage !== VideoStorage.FILE_SYSTEM) continue
74 80
75 const fileUrl = await storeWebTorrentFile(file.filename) 81 const fileUrl = await storeWebTorrentFile(video, file)
76 82
77 const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename) 83 const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file)
78 await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath }) 84 await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath })
79 } 85 }
80} 86}
@@ -88,10 +94,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
88 94
89 // Resolution playlist 95 // Resolution playlist
90 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) 96 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
91 await storeHLSFile(playlistWithVideo, playlistFilename) 97 await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
92 98
93 // Resolution fragmented file 99 // Resolution fragmented file
94 const fileUrl = await storeHLSFile(playlistWithVideo, file.filename) 100 const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
95 101
96 const oldPath = join(getHLSDirectory(video), file.filename) 102 const oldPath = join(getHLSDirectory(video), file.filename)
97 103
@@ -113,9 +119,9 @@ async function doAfterLastJob (options: {
113 const playlistWithVideo = playlist.withVideo(video) 119 const playlistWithVideo = playlist.withVideo(video)
114 120
115 // Master playlist 121 // Master playlist
116 playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename) 122 playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
117 // Sha256 segments file 123 // Sha256 segments file
118 playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename) 124 playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
119 125
120 playlist.storage = VideoStorage.OBJECT_STORAGE 126 playlist.storage = VideoStorage.OBJECT_STORAGE
121 127
diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts
index 600292844..035f88e96 100644
--- a/server/lib/job-queue/handlers/video-channel-import.ts
+++ b/server/lib/job-queue/handlers/video-channel-import.ts
@@ -5,7 +5,7 @@ import { synchronizeChannel } from '@server/lib/sync-channel'
5import { VideoChannelModel } from '@server/models/video/video-channel' 5import { VideoChannelModel } from '@server/models/video/video-channel'
6import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' 6import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync'
7import { MChannelSync } from '@server/types/models' 7import { MChannelSync } from '@server/types/models'
8import { VideoChannelImportPayload, VideoChannelSyncState } from '@shared/models' 8import { VideoChannelImportPayload } from '@shared/models'
9 9
10export async function processVideoChannelImport (job: Job) { 10export async function processVideoChannelImport (job: Job) {
11 const payload = job.data as VideoChannelImportPayload 11 const payload = job.data as VideoChannelImportPayload
@@ -32,17 +32,12 @@ export async function processVideoChannelImport (job: Job) {
32 32
33 const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) 33 const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId)
34 34
35 try { 35 logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `)
36 logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) 36
37 37 await synchronizeChannel({
38 await synchronizeChannel({ 38 channel: videoChannel,
39 channel: videoChannel, 39 externalChannelUrl: payload.externalChannelUrl,
40 externalChannelUrl: payload.externalChannelUrl, 40 channelSync,
41 channelSync 41 videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.FULL_SYNC_VIDEOS_LIMIT
42 }) 42 })
43 } catch (err) {
44 logger.error(`Failed to import channel ${videoChannel.name}`, { err })
45 channelSync.state = VideoChannelSyncState.FAILED
46 await channelSync.save()
47 }
48} 43}
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 9901b878c..83d582cb4 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -12,7 +12,8 @@ import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@serv
12import { VideoPathManager } from '@server/lib/video-path-manager' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 13import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
15import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 15import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
16import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
16import { getLowercaseExtension } from '@shared/core-utils' 17import { getLowercaseExtension } from '@shared/core-utils'
17import { isAudioFile } from '@shared/extra-utils' 18import { isAudioFile } from '@shared/extra-utils'
18import { 19import {
@@ -36,7 +37,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
36import { VideoModel } from '../../../models/video/video' 37import { VideoModel } from '../../../models/video/video'
37import { VideoFileModel } from '../../../models/video/video-file' 38import { VideoFileModel } from '../../../models/video/video-file'
38import { VideoImportModel } from '../../../models/video/video-import' 39import { VideoImportModel } from '../../../models/video/video-import'
39import { MThumbnail } from '../../../types/models/video/thumbnail'
40import { federateVideoIfNeeded } from '../../activitypub/videos' 40import { federateVideoIfNeeded } from '../../activitypub/videos'
41import { Notifier } from '../../notifier' 41import { Notifier } from '../../notifier'
42import { generateVideoMiniature } from '../../thumbnail' 42import { generateVideoMiniature } from '../../thumbnail'
@@ -178,125 +178,159 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
178 } 178 }
179 179
180 // Video is accepted, resuming preparation 180 // Video is accepted, resuming preparation
181 const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) 181 const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid)
182 // To clean files if the import fails
183 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles })
184
185 // Move file
186 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
187 await move(tempVideoPath, videoDestFile)
188 tempVideoPath = null // This path is not used anymore
189
190 // Generate miniature if the import did not created it
191 let thumbnailModel: MThumbnail
192 let thumbnailSave: object
193 if (!videoImportWithFiles.Video.getMiniature()) {
194 thumbnailModel = await generateVideoMiniature({
195 video: videoImportWithFiles.Video,
196 videoFile,
197 type: ThumbnailType.MINIATURE
198 })
199 thumbnailSave = thumbnailModel.toJSON()
200 }
201 182
202 // Generate preview if the import did not created it 183 try {
203 let previewModel: MThumbnail 184 const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile)
204 let previewSave: object
205 if (!videoImportWithFiles.Video.getPreview()) {
206 previewModel = await generateVideoMiniature({
207 video: videoImportWithFiles.Video,
208 videoFile,
209 type: ThumbnailType.PREVIEW
210 })
211 previewSave = previewModel.toJSON()
212 }
213 185
214 // Create torrent 186 // Move file
215 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) 187 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
188 await move(tempVideoPath, videoDestFile)
216 189
217 const videoFileSave = videoFile.toJSON() 190 tempVideoPath = null // This path is not used anymore
218 191
219 const { videoImportUpdated, video } = await retryTransactionWrapper(() => { 192 let {
220 return sequelizeTypescript.transaction(async t => { 193 miniatureModel: thumbnailModel,
221 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo 194 miniatureJSONSave: thumbnailSave
195 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE)
222 196
223 // Refresh video 197 let {
224 const video = await VideoModel.load(videoImportToUpdate.videoId, t) 198 miniatureModel: previewModel,
225 if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') 199 miniatureJSONSave: previewSave
200 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW)
226 201
227 const videoFileCreated = await videoFile.save({ transaction: t }) 202 // Create torrent
203 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile)
228 204
229 // Update video DB object 205 const videoFileSave = videoFile.toJSON()
230 video.duration = duration
231 video.state = buildNextVideoState(video.state)
232 await video.save({ transaction: t })
233 206
234 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) 207 const { videoImportUpdated, video } = await retryTransactionWrapper(() => {
235 if (previewModel) await video.addAndSaveThumbnail(previewModel, t) 208 return sequelizeTypescript.transaction(async t => {
209 // Refresh video
210 const video = await VideoModel.load(videoImportWithFiles.videoId, t)
211 if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.')
236 212
237 // Now we can federate the video (reload from database, we need more attributes) 213 await videoFile.save({ transaction: t })
238 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
239 await federateVideoIfNeeded(videoForFederation, true, t)
240 214
241 // Update video import object 215 // Update video DB object
242 videoImportToUpdate.state = VideoImportState.SUCCESS 216 video.duration = duration
243 const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo 217 video.state = buildNextVideoState(video.state)
244 videoImportUpdated.Video = video 218 await video.save({ transaction: t })
245 219
246 videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) 220 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
221 if (previewModel) await video.addAndSaveThumbnail(previewModel, t)
247 222
248 logger.info('Video %s imported.', video.uuid) 223 // Now we can federate the video (reload from database, we need more attributes)
224 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
225 await federateVideoIfNeeded(videoForFederation, true, t)
249 226
250 return { videoImportUpdated, video: videoForFederation } 227 // Update video import object
251 }).catch(err => { 228 videoImportWithFiles.state = VideoImportState.SUCCESS
252 // Reset fields 229 const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport
253 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
254 if (previewModel) previewModel = new ThumbnailModel(previewSave)
255 230
256 videoFile = new VideoFileModel(videoFileSave) 231 logger.info('Video %s imported.', video.uuid)
257 232
258 throw err 233 return { videoImportUpdated, video: videoForFederation }
259 }) 234 }).catch(err => {
260 }) 235 // Reset fields
236 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
237 if (previewModel) previewModel = new ThumbnailModel(previewSave)
261 238
262 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: videoImportUpdated, success: true }) 239 videoFile = new VideoFileModel(videoFileSave)
263 240
264 if (video.isBlacklisted()) { 241 throw err
265 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) 242 })
243 })
266 244
267 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) 245 await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User })
268 } else { 246 } finally {
269 Notifier.Instance.notifyOnNewVideoIfNeeded(video) 247 videoFileLockReleaser()
270 } 248 }
249 } catch (err) {
250 await onImportError(err, tempVideoPath, videoImport)
271 251
272 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 252 throw err
273 await JobQueue.Instance.createJob( 253 }
274 await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) 254}
275 )
276 }
277 255
278 // Create transcoding jobs? 256async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> {
279 if (video.state === VideoState.TO_TRANSCODE) { 257 // Refresh video, privacy may have changed
280 await JobQueue.Instance.createJob( 258 const video = await videoImport.Video.reload()
281 await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) 259 const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] })
282 )
283 }
284 260
285 } catch (err) { 261 return Object.assign(videoImport, { Video: videoWithFiles })
286 try { 262}
287 if (tempVideoPath) await remove(tempVideoPath)
288 } catch (errUnlink) {
289 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
290 }
291 263
292 videoImport.error = err.message 264async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) {
293 if (videoImport.state !== VideoImportState.REJECTED) { 265 // Generate miniature if the import did not created it
294 videoImport.state = VideoImportState.FAILED 266 const needsMiniature = thumbnailType === ThumbnailType.MINIATURE
267 ? !videoImportWithFiles.Video.getMiniature()
268 : !videoImportWithFiles.Video.getPreview()
269
270 if (!needsMiniature) {
271 return {
272 miniatureModel: null,
273 miniatureJSONSave: null
295 } 274 }
296 await videoImport.save() 275 }
297 276
298 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) 277 const miniatureModel = await generateVideoMiniature({
278 video: videoImportWithFiles.Video,
279 videoFile,
280 type: thumbnailType
281 })
282 const miniatureJSONSave = miniatureModel.toJSON()
299 283
300 throw err 284 return {
285 miniatureModel,
286 miniatureJSONSave
287 }
288}
289
290async function afterImportSuccess (options: {
291 videoImport: MVideoImport
292 video: MVideoFullLight
293 videoFile: MVideoFile
294 user: MUserId
295}) {
296 const { video, videoFile, videoImport, user } = options
297
298 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true })
299
300 if (video.isBlacklisted()) {
301 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video })
302
303 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist)
304 } else {
305 Notifier.Instance.notifyOnNewVideoIfNeeded(video)
301 } 306 }
307
308 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
309 await JobQueue.Instance.createJob(
310 await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT })
311 )
312 return
313 }
314
315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs?
316 await JobQueue.Instance.createJob(
317 await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
318 )
319 }
320}
321
322async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) {
323 try {
324 if (tempVideoPath) await remove(tempVideoPath)
325 } catch (errUnlink) {
326 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
327 }
328
329 videoImport.error = err.message
330 if (videoImport.state !== VideoImportState.REJECTED) {
331 videoImport.state = VideoImportState.FAILED
332 }
333 await videoImport.save()
334
335 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false })
302} 336}
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 8a3ee09a2..c6263f55a 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -4,7 +4,7 @@ import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' 4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' 7import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' 8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
9import { generateVideoMiniature } from '@server/lib/thumbnail' 9import { generateVideoMiniature } from '@server/lib/thumbnail'
10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' 10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
@@ -18,6 +18,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin
18import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' 18import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
19import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' 19import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
20import { logger, loggerTagsFactory } from '../../../helpers/logger' 20import { logger, loggerTagsFactory } from '../../../helpers/logger'
21import { VideoPathManager } from '@server/lib/video-path-manager'
21 22
22const lTags = loggerTagsFactory('live', 'job') 23const lTags = loggerTagsFactory('live', 'job')
23 24
@@ -34,13 +35,13 @@ async function processVideoLiveEnding (job: Job) {
34 const live = await VideoLiveModel.loadByVideoId(payload.videoId) 35 const live = await VideoLiveModel.loadByVideoId(payload.videoId)
35 const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId) 36 const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId)
36 37
37 const permanentLive = live.permanentLive
38
39 if (!video || !live || !liveSession) { 38 if (!video || !live || !liveSession) {
40 logError() 39 logError()
41 return 40 return
42 } 41 }
43 42
43 const permanentLive = live.permanentLive
44
44 liveSession.endingProcessed = true 45 liveSession.endingProcessed = true
45 await liveSession.save() 46 await liveSession.save()
46 47
@@ -141,23 +142,22 @@ async function replaceLiveByReplay (options: {
141}) { 142}) {
142 const { video, liveSession, live, permanentLive, replayDirectory } = options 143 const { video, liveSession, live, permanentLive, replayDirectory } = options
143 144
144 await cleanupTMPLiveFiles(video) 145 const videoWithFiles = await VideoModel.loadFull(video.id)
146 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
147
148 await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist)
145 149
146 await live.destroy() 150 await live.destroy()
147 151
148 video.isLive = false 152 videoWithFiles.isLive = false
149 video.waitTranscoding = true 153 videoWithFiles.waitTranscoding = true
150 video.state = VideoState.TO_TRANSCODE 154 videoWithFiles.state = VideoState.TO_TRANSCODE
151 155
152 await video.save() 156 await videoWithFiles.save()
153 157
154 liveSession.replayVideoId = video.id 158 liveSession.replayVideoId = videoWithFiles.id
155 await liveSession.save() 159 await liveSession.save()
156 160
157 // Remove old HLS playlist video files
158 const videoWithFiles = await VideoModel.loadFull(video.id)
159
160 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
161 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) 161 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
162 162
163 // Reset playlist 163 // Reset playlist
@@ -206,18 +206,27 @@ async function assignReplayFilesToVideo (options: {
206 const concatenatedTsFiles = await readdir(replayDirectory) 206 const concatenatedTsFiles = await readdir(replayDirectory)
207 207
208 for (const concatenatedTsFile of concatenatedTsFiles) { 208 for (const concatenatedTsFile of concatenatedTsFiles) {
209 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
210
209 const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) 211 const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile)
210 212
211 const probe = await ffprobePromise(concatenatedTsFilePath) 213 const probe = await ffprobePromise(concatenatedTsFilePath)
212 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) 214 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe)
213 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) 215 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
214 216
215 await generateHlsPlaylistResolutionFromTS({ 217 try {
216 video, 218 await generateHlsPlaylistResolutionFromTS({
217 concatenatedTsFilePath, 219 video,
218 resolution, 220 inputFileMutexReleaser,
219 isAAC: audioStream?.codec_name === 'aac' 221 concatenatedTsFilePath,
220 }) 222 resolution,
223 isAAC: audioStream?.codec_name === 'aac'
224 })
225 } catch (err) {
226 logger.error('Cannot generate HLS playlist resolution from TS files.', { err })
227 }
228
229 inputFileMutexReleaser()
221 } 230 }
222 231
223 return video 232 return video
@@ -234,7 +243,7 @@ async function cleanupLiveAndFederate (options: {
234 243
235 if (streamingPlaylist) { 244 if (streamingPlaylist) {
236 if (permanentLive) { 245 if (permanentLive) {
237 await cleanupPermanentLive(video, streamingPlaylist) 246 await cleanupAndDestroyPermanentLive(video, streamingPlaylist)
238 } else { 247 } else {
239 await cleanupUnsavedNormalLive(video, streamingPlaylist) 248 await cleanupUnsavedNormalLive(video, streamingPlaylist)
240 } 249 }
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index b0e92acf7..3e6d23363 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -13,7 +13,6 @@ import {
13 MergeAudioTranscodingPayload, 13 MergeAudioTranscodingPayload,
14 NewWebTorrentResolutionTranscodingPayload, 14 NewWebTorrentResolutionTranscodingPayload,
15 OptimizeTranscodingPayload, 15 OptimizeTranscodingPayload,
16 VideoResolution,
17 VideoTranscodingPayload 16 VideoTranscodingPayload
18} from '@shared/models' 17} from '@shared/models'
19import { retryTransactionWrapper } from '../../../helpers/database-utils' 18import { retryTransactionWrapper } from '../../../helpers/database-utils'
@@ -94,15 +93,24 @@ async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MV
94 93
95 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() 94 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
96 95
97 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { 96 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
98 return generateHlsPlaylistResolution({ 97
99 video, 98 try {
100 videoInputPath, 99 await videoFileInput.getVideo().reload()
101 resolution: payload.resolution, 100
102 copyCodecs: payload.copyCodecs, 101 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
103 job 102 return generateHlsPlaylistResolution({
103 video,
104 videoInputPath,
105 inputFileMutexReleaser,
106 resolution: payload.resolution,
107 copyCodecs: payload.copyCodecs,
108 job
109 })
104 }) 110 })
105 }) 111 } finally {
112 inputFileMutexReleaser()
113 }
106 114
107 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) 115 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
108 116
@@ -177,38 +185,44 @@ async function onVideoFirstWebTorrentTranscoding (
177 transcodeType: TranscodeVODOptionsType, 185 transcodeType: TranscodeVODOptionsType,
178 user: MUserId 186 user: MUserId
179) { 187) {
180 const { resolution, audioStream } = await videoArg.probeMaxQualityFile() 188 const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid)
181 189
182 // Maybe the video changed in database, refresh it 190 try {
183 const videoDatabase = await VideoModel.loadFull(videoArg.uuid) 191 // Maybe the video changed in database, refresh it
184 // Video does not exist anymore 192 const videoDatabase = await VideoModel.loadFull(videoArg.uuid)
185 if (!videoDatabase) return undefined 193 // Video does not exist anymore
186 194 if (!videoDatabase) return undefined
187 // Generate HLS version of the original file 195
188 const originalFileHLSPayload = { 196 const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile()
189 ...payload, 197
190 198 // Generate HLS version of the original file
191 hasAudio: !!audioStream, 199 const originalFileHLSPayload = {
192 resolution: videoDatabase.getMaxQualityFile().resolution, 200 ...payload,
193 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 201
194 copyCodecs: transcodeType !== 'quick-transcode', 202 hasAudio: !!audioStream,
195 isMaxQuality: true 203 resolution: videoDatabase.getMaxQualityFile().resolution,
196 } 204 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
197 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) 205 copyCodecs: transcodeType !== 'quick-transcode',
198 const hasNewResolutions = await createLowerResolutionsJobs({ 206 isMaxQuality: true
199 video: videoDatabase, 207 }
200 user, 208 const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
201 videoFileResolution: resolution, 209 const hasNewResolutions = await createLowerResolutionsJobs({
202 hasAudio: !!audioStream, 210 video: videoDatabase,
203 type: 'webtorrent', 211 user,
204 isNewVideo: payload.isNewVideo ?? true 212 videoFileResolution: resolution,
205 }) 213 hasAudio: !!audioStream,
206 214 type: 'webtorrent',
207 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') 215 isNewVideo: payload.isNewVideo ?? true
208 216 })
209 // Move to next state if there are no other resolutions to generate 217
210 if (!hasHls && !hasNewResolutions) { 218 await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
211 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) 219
220 // Move to next state if there are no other resolutions to generate
221 if (!hasHls && !hasNewResolutions) {
222 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo })
223 }
224 } finally {
225 mutexReleaser()
212 } 226 }
213} 227}
214 228
@@ -266,7 +280,7 @@ async function createLowerResolutionsJobs (options: {
266 280
267 // Create transcoding jobs if there are enabled resolutions 281 // Create transcoding jobs if there are enabled resolutions
268 const resolutionsEnabled = await Hooks.wrapObject( 282 const resolutionsEnabled = await Hooks.wrapObject(
269 computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true }), 283 computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }),
270 'filter:transcoding.auto.resolutions-to-transcode.result', 284 'filter:transcoding.auto.resolutions-to-transcode.result',
271 options 285 options
272 ) 286 )
@@ -274,8 +288,6 @@ async function createLowerResolutionsJobs (options: {
274 const resolutionCreated: string[] = [] 288 const resolutionCreated: string[] = []
275 289
276 for (const resolution of resolutionsEnabled) { 290 for (const resolution of resolutionsEnabled) {
277 if (resolution === VideoResolution.H_NOVIDEO && hasAudio === false) continue
278
279 let dataInput: VideoTranscodingPayload 291 let dataInput: VideoTranscodingPayload
280 292
281 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { 293 if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 16715862b..5e459f3c3 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' 22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
23import { pick, wait } from '@shared/core-utils' 23import { pick, wait } from '@shared/core-utils'
24import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' 24import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models'
25import { federateVideoIfNeeded } from '../activitypub/videos' 25import { federateVideoIfNeeded } from '../activitypub/videos'
26import { JobQueue } from '../job-queue' 26import { JobQueue } from '../job-queue'
27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' 27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
28import { PeerTubeSocket } from '../peertube-socket' 28import { PeerTubeSocket } from '../peertube-socket'
29import { Hooks } from '../plugins/hooks' 29import { Hooks } from '../plugins/hooks'
30import { LiveQuotaStore } from './live-quota-store' 30import { LiveQuotaStore } from './live-quota-store'
31import { cleanupPermanentLive } from './live-utils' 31import { cleanupAndDestroyPermanentLive } from './live-utils'
32import { MuxingSession } from './shared' 32import { MuxingSession } from './shared'
33 33
34const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') 34const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
@@ -224,7 +224,7 @@ class LiveManager {
224 if (oldStreamingPlaylist) { 224 if (oldStreamingPlaylist) {
225 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) 225 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
226 226
227 await cleanupPermanentLive(video, oldStreamingPlaylist) 227 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
228 } 228 }
229 229
230 this.videoSessions.set(video.id, sessionId) 230 this.videoSessions.set(video.id, sessionId)
@@ -245,7 +245,7 @@ class LiveManager {
245 ) 245 )
246 246
247 const allResolutions = await Hooks.wrapObject( 247 const allResolutions = await Hooks.wrapObject(
248 this.buildAllResolutionsToTranscode(resolution), 248 this.buildAllResolutionsToTranscode(resolution, hasAudio),
249 'filter:transcoding.auto.resolutions-to-transcode.result', 249 'filter:transcoding.auto.resolutions-to-transcode.result',
250 { video } 250 { video }
251 ) 251 )
@@ -301,7 +301,7 @@ class LiveManager {
301 ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) 301 ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
302 }) 302 })
303 303
304 muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) 304 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
305 305
306 muxingSession.on('bad-socket-health', ({ videoId }) => { 306 muxingSession.on('bad-socket-health', ({ videoId }) => {
307 logger.error( 307 logger.error(
@@ -460,11 +460,11 @@ class LiveManager {
460 return join(directory, files.sort().reverse()[0]) 460 return join(directory, files.sort().reverse()[0])
461 } 461 }
462 462
463 private buildAllResolutionsToTranscode (originResolution: number) { 463 private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) {
464 const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION 464 const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION
465 465
466 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED 466 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
467 ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false }) 467 ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio })
468 : [] 468 : []
469 469
470 if (resolutionsEnabled.length === 0) { 470 if (resolutionsEnabled.length === 0) {
@@ -485,6 +485,10 @@ class LiveManager {
485 485
486 playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) 486 playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions)
487 487
488 playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
489 ? VideoStorage.OBJECT_STORAGE
490 : VideoStorage.FILE_SYSTEM
491
488 return playlist.save() 492 return playlist.save()
489 } 493 }
490 494
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts
index 4af6f3ebf..4d03754a9 100644
--- a/server/lib/live/live-segment-sha-store.ts
+++ b/server/lib/live/live-segment-sha-store.ts
@@ -1,61 +1,79 @@
1import { writeJson } from 'fs-extra'
1import { basename } from 'path' 2import { basename } from 'path'
3import { mapToJSON } from '@server/helpers/core-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { MStreamingPlaylistVideo } from '@server/types/models'
3import { buildSha256Segment } from '../hls' 6import { buildSha256Segment } from '../hls'
7import { storeHLSFileFromPath } from '../object-storage'
8import PQueue from 'p-queue'
4 9
5const lTags = loggerTagsFactory('live') 10const lTags = loggerTagsFactory('live')
6 11
7class LiveSegmentShaStore { 12class LiveSegmentShaStore {
8 13
9 private static instance: LiveSegmentShaStore 14 private readonly segmentsSha256 = new Map<string, string>()
10 15
11 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 16 private readonly videoUUID: string
12 17 private readonly sha256Path: string
13 private constructor () { 18 private readonly streamingPlaylist: MStreamingPlaylistVideo
14 } 19 private readonly sendToObjectStorage: boolean
15 20 private readonly writeQueue = new PQueue({ concurrency: 1 })
16 getSegmentsSha256 (videoUUID: string) { 21
17 return this.segmentsSha256.get(videoUUID) 22 constructor (options: {
23 videoUUID: string
24 sha256Path: string
25 streamingPlaylist: MStreamingPlaylistVideo
26 sendToObjectStorage: boolean
27 }) {
28 this.videoUUID = options.videoUUID
29 this.sha256Path = options.sha256Path
30 this.streamingPlaylist = options.streamingPlaylist
31 this.sendToObjectStorage = options.sendToObjectStorage
18 } 32 }
19 33
20 async addSegmentSha (videoUUID: string, segmentPath: string) { 34 async addSegmentSha (segmentPath: string) {
21 const segmentName = basename(segmentPath) 35 logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
22 logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
23 36
24 const shaResult = await buildSha256Segment(segmentPath) 37 const shaResult = await buildSha256Segment(segmentPath)
25 38
26 if (!this.segmentsSha256.has(videoUUID)) { 39 const segmentName = basename(segmentPath)
27 this.segmentsSha256.set(videoUUID, new Map()) 40 this.segmentsSha256.set(segmentName, shaResult)
28 }
29 41
30 const filesMap = this.segmentsSha256.get(videoUUID) 42 try {
31 filesMap.set(segmentName, shaResult) 43 await this.writeToDisk()
44 } catch (err) {
45 logger.error('Cannot write sha segments to disk.', { err })
46 }
32 } 47 }
33 48
34 removeSegmentSha (videoUUID: string, segmentPath: string) { 49 async removeSegmentSha (segmentPath: string) {
35 const segmentName = basename(segmentPath) 50 const segmentName = basename(segmentPath)
36 51
37 logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) 52 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
38 53
39 const filesMap = this.segmentsSha256.get(videoUUID) 54 if (!this.segmentsSha256.has(segmentName)) {
40 if (!filesMap) { 55 logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID))
41 logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
42 return 56 return
43 } 57 }
44 58
45 if (!filesMap.has(segmentName)) { 59 this.segmentsSha256.delete(segmentName)
46 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
47 return
48 }
49 60
50 filesMap.delete(segmentName) 61 await this.writeToDisk()
51 } 62 }
52 63
53 cleanupShaSegments (videoUUID: string) { 64 private writeToDisk () {
54 this.segmentsSha256.delete(videoUUID) 65 return this.writeQueue.add(async () => {
55 } 66 await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256))
67
68 if (this.sendToObjectStorage) {
69 const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
56 70
57 static get Instance () { 71 if (this.streamingPlaylist.segmentsSha256Url !== url) {
58 return this.instance || (this.instance = new this()) 72 this.streamingPlaylist.segmentsSha256Url = url
73 await this.streamingPlaylist.save()
74 }
75 }
76 })
59 } 77 }
60} 78}
61 79
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index bba876642..c0dec9829 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -1,9 +1,10 @@
1import { pathExists, readdir, remove } from 'fs-extra' 1import { pathExists, readdir, remove } from 'fs-extra'
2import { basename, join } from 'path' 2import { basename, join } from 'path'
3import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
4import { MStreamingPlaylist, MVideo } from '@server/types/models' 4import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
5import { VideoStorage } from '@shared/models'
6import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage'
5import { getLiveDirectory } from '../paths' 7import { getLiveDirectory } from '../paths'
6import { LiveSegmentShaStore } from './live-segment-sha-store'
7 8
8function buildConcatenatedName (segmentOrPlaylistPath: string) { 9function buildConcatenatedName (segmentOrPlaylistPath: string) {
9 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) 10 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
@@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
11 return 'concat-' + num[1] + '.ts' 12 return 'concat-' + num[1] + '.ts'
12} 13}
13 14
14async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 15async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
15 await cleanupTMPLiveFiles(video) 16 await cleanupTMPLiveFiles(video, streamingPlaylist)
16 17
17 await streamingPlaylist.destroy() 18 await streamingPlaylist.destroy()
18} 19}
@@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin
20async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 21async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
21 const hlsDirectory = getLiveDirectory(video) 22 const hlsDirectory = getLiveDirectory(video)
22 23
24 // We uploaded files to object storage too, remove them
25 if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
26 await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
27 }
28
23 await remove(hlsDirectory) 29 await remove(hlsDirectory)
24 30
25 await streamingPlaylist.destroy() 31 await streamingPlaylist.destroy()
32}
26 33
27 LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) 34async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
35 await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
36
37 await cleanupTMPLiveFilesFromFilesystem(video)
28} 38}
29 39
30async function cleanupTMPLiveFiles (video: MVideo) { 40export {
31 const hlsDirectory = getLiveDirectory(video) 41 cleanupAndDestroyPermanentLive,
42 cleanupUnsavedNormalLive,
43 cleanupTMPLiveFiles,
44 buildConcatenatedName
45}
46
47// ---------------------------------------------------------------------------
32 48
33 LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) 49function isTMPLiveFile (name: string) {
50 return name.endsWith('.ts') ||
51 name.endsWith('.m3u8') ||
52 name.endsWith('.json') ||
53 name.endsWith('.mpd') ||
54 name.endsWith('.m4s') ||
55 name.endsWith('.tmp')
56}
57
58async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
59 const hlsDirectory = getLiveDirectory(video)
34 60
35 if (!await pathExists(hlsDirectory)) return 61 if (!await pathExists(hlsDirectory)) return
36 62
37 logger.info('Cleanup TMP live files of %s.', hlsDirectory) 63 logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
38 64
39 const files = await readdir(hlsDirectory) 65 const files = await readdir(hlsDirectory)
40 66
41 for (const filename of files) { 67 for (const filename of files) {
42 if ( 68 if (isTMPLiveFile(filename)) {
43 filename.endsWith('.ts') ||
44 filename.endsWith('.m3u8') ||
45 filename.endsWith('.mpd') ||
46 filename.endsWith('.m4s') ||
47 filename.endsWith('.tmp')
48 ) {
49 const p = join(hlsDirectory, filename) 69 const p = join(hlsDirectory, filename)
50 70
51 remove(p) 71 remove(p)
@@ -54,9 +74,16 @@ async function cleanupTMPLiveFiles (video: MVideo) {
54 } 74 }
55} 75}
56 76
57export { 77async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
58 cleanupPermanentLive, 78 if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
59 cleanupUnsavedNormalLive, 79
60 cleanupTMPLiveFiles, 80 logger.info('Cleanup TMP live files from object storage for %s.', streamingPlaylist.Video.uuid)
61 buildConcatenatedName 81
82 const keys = await listHLSFileKeysOf(streamingPlaylist)
83
84 for (const key of keys) {
85 if (isTMPLiveFile(key)) {
86 await removeHLSFileObjectStorageByFullKey(key)
87 }
88 }
62} 89}
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 505717dce..6ec126955 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -3,14 +3,17 @@ import { mapSeries } from 'bluebird'
3import { FSWatcher, watch } from 'chokidar' 3import { FSWatcher, watch } from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 5import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
6import PQueue from 'p-queue'
6import { basename, join } from 'path' 7import { basename, join } from 'path'
7import { EventEmitter } from 'stream' 8import { EventEmitter } from 'stream'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' 9import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 10import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 11import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' 12import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
13import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file' 14import { VideoFileModel } from '@server/models/video/video-file'
13import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 15import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
16import { VideoStorage } from '@shared/models'
14import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' 17import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
15import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' 18import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
16import { isAbleToUploadVideo } from '../../user' 19import { isAbleToUploadVideo } from '../../user'
@@ -19,9 +22,8 @@ import { LiveSegmentShaStore } from '../live-segment-sha-store'
19import { buildConcatenatedName } from '../live-utils' 22import { buildConcatenatedName } from '../live-utils'
20 23
21import memoizee = require('memoizee') 24import memoizee = require('memoizee')
22
23interface MuxingSessionEvents { 25interface MuxingSessionEvents {
24 'master-playlist-created': (options: { videoId: number }) => void 26 'live-ready': (options: { videoId: number }) => void
25 27
26 'bad-socket-health': (options: { videoId: number }) => void 28 'bad-socket-health': (options: { videoId: number }) => void
27 'duration-exceeded': (options: { videoId: number }) => void 29 'duration-exceeded': (options: { videoId: number }) => void
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter {
68 private readonly outDirectory: string 70 private readonly outDirectory: string
69 private readonly replayDirectory: string 71 private readonly replayDirectory: string
70 72
73 private readonly liveSegmentShaStore: LiveSegmentShaStore
74
71 private readonly lTags: LoggerTagsFn 75 private readonly lTags: LoggerTagsFn
72 76
73 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 77 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
74 78
75 private tsWatcher: FSWatcher 79 private tsWatcher: FSWatcher
76 private masterWatcher: FSWatcher 80 private masterWatcher: FSWatcher
81 private m3u8Watcher: FSWatcher
82
83 private masterPlaylistCreated = false
84 private liveReady = false
77 85
78 private aborted = false 86 private aborted = false
79 87
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter {
123 this.outDirectory = getLiveDirectory(this.videoLive.Video) 131 this.outDirectory = getLiveDirectory(this.videoLive.Video)
124 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) 132 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
125 133
134 this.liveSegmentShaStore = new LiveSegmentShaStore({
135 videoUUID: this.videoLive.Video.uuid,
136 sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
137 streamingPlaylist: this.streamingPlaylist,
138 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
139 })
140
126 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) 141 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
127 } 142 }
128 143
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter {
159 174
160 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) 175 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
161 176
162 this.watchTSFiles()
163 this.watchMasterFile() 177 this.watchMasterFile()
178 this.watchTSFiles()
179 this.watchM3U8File()
164 180
165 let ffmpegShellCommand: string 181 let ffmpegShellCommand: string
166 this.ffmpegCommand.on('start', cmdline => { 182 this.ffmpegCommand.on('start', cmdline => {
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter {
219 setTimeout(() => { 235 setTimeout(() => {
220 // Wait latest segments generation, and close watchers 236 // Wait latest segments generation, and close watchers
221 237
222 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) 238 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
223 .then(() => { 239 .then(() => {
224 // Process remaining segments hash 240 // Process remaining segments hash
225 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { 241 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
@@ -240,14 +256,48 @@ class MuxingSession extends EventEmitter {
240 private watchMasterFile () { 256 private watchMasterFile () {
241 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 257 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
242 258
243 this.masterWatcher.on('add', () => { 259 this.masterWatcher.on('add', async () => {
244 this.emit('master-playlist-created', { videoId: this.videoId }) 260 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
261 try {
262 const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
263
264 this.streamingPlaylist.playlistUrl = url
265 await this.streamingPlaylist.save()
266 } catch (err) {
267 logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
268 }
269 }
270
271 this.masterPlaylistCreated = true
245 272
246 this.masterWatcher.close() 273 this.masterWatcher.close()
247 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 274 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
248 }) 275 })
249 } 276 }
250 277
278 private watchM3U8File () {
279 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
280
281 const sendQueues = new Map<string, PQueue>()
282
283 const onChangeOrAdd = async (m3u8Path: string) => {
284 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
285
286 try {
287 if (!sendQueues.has(m3u8Path)) {
288 sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
289 }
290
291 const queue = sendQueues.get(m3u8Path)
292 await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
293 } catch (err) {
294 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
295 }
296 }
297
298 this.m3u8Watcher.on('change', onChangeOrAdd)
299 }
300
251 private watchTSFiles () { 301 private watchTSFiles () {
252 const startStreamDateTime = new Date().getTime() 302 const startStreamDateTime = new Date().getTime()
253 303
@@ -282,7 +332,21 @@ class MuxingSession extends EventEmitter {
282 } 332 }
283 } 333 }
284 334
285 const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) 335 const deleteHandler = async (segmentPath: string) => {
336 try {
337 await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
338 } catch (err) {
339 logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
340 }
341
342 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
343 try {
344 await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
345 } catch (err) {
346 logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
347 }
348 }
349 }
286 350
287 this.tsWatcher.on('add', p => addHandler(p)) 351 this.tsWatcher.on('add', p => addHandler(p))
288 this.tsWatcher.on('unlink', p => deleteHandler(p)) 352 this.tsWatcher.on('unlink', p => deleteHandler(p))
@@ -315,6 +379,7 @@ class MuxingSession extends EventEmitter {
315 extname: '.ts', 379 extname: '.ts',
316 infoHash: null, 380 infoHash: null,
317 fps: this.fps, 381 fps: this.fps,
382 storage: this.streamingPlaylist.storage,
318 videoStreamingPlaylistId: this.streamingPlaylist.id 383 videoStreamingPlaylistId: this.streamingPlaylist.id
319 }) 384 })
320 385
@@ -343,18 +408,36 @@ class MuxingSession extends EventEmitter {
343 } 408 }
344 409
345 private processSegments (segmentPaths: string[]) { 410 private processSegments (segmentPaths: string[]) {
346 mapSeries(segmentPaths, async previousSegment => { 411 mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
347 // Add sha hash of previous segments, because ffmpeg should have finished generating them 412 .catch(err => {
348 await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) 413 if (this.aborted) return
414
415 logger.error('Cannot process segments', { err, ...this.lTags() })
416 })
417 }
418
419 private async processSegment (segmentPath: string) {
420 // Add sha hash of previous segments, because ffmpeg should have finished generating them
421 await this.liveSegmentShaStore.addSegmentSha(segmentPath)
422
423 if (this.saveReplay) {
424 await this.addSegmentToReplay(segmentPath)
425 }
349 426
350 if (this.saveReplay) { 427 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
351 await this.addSegmentToReplay(previousSegment) 428 try {
429 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
430 } catch (err) {
431 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
352 } 432 }
353 }).catch(err => { 433 }
354 if (this.aborted) return
355 434
356 logger.error('Cannot process segments', { err, ...this.lTags() }) 435 // Master playlist and segment JSON file are created, live is ready
357 }) 436 if (this.masterPlaylistCreated && !this.liveReady) {
437 this.liveReady = true
438
439 this.emit('live-ready', { videoId: this.videoId })
440 }
358 } 441 }
359 442
360 private hasClientSocketInBadHealth (sessionId: string) { 443 private hasClientSocketInBadHealth (sessionId: string) {
diff --git a/server/lib/moderation.ts b/server/lib/moderation.ts
index c23f5b6a6..3cc92ca30 100644
--- a/server/lib/moderation.ts
+++ b/server/lib/moderation.ts
@@ -1,4 +1,4 @@
1import { VideoUploadFile } from 'express' 1import express, { VideoUploadFile } from 'express'
2import { PathLike } from 'fs-extra' 2import { PathLike } from 'fs-extra'
3import { Transaction } from 'sequelize/types' 3import { Transaction } from 'sequelize/types'
4import { AbuseAuditView, auditLoggerFactory } from '@server/helpers/audit-logger' 4import { AbuseAuditView, auditLoggerFactory } from '@server/helpers/audit-logger'
@@ -13,18 +13,15 @@ import {
13 MAbuseFull, 13 MAbuseFull,
14 MAccountDefault, 14 MAccountDefault,
15 MAccountLight, 15 MAccountLight,
16 MComment,
16 MCommentAbuseAccountVideo, 17 MCommentAbuseAccountVideo,
17 MCommentOwnerVideo, 18 MCommentOwnerVideo,
18 MUser, 19 MUser,
19 MVideoAbuseVideoFull, 20 MVideoAbuseVideoFull,
20 MVideoAccountLightBlacklistAllFiles 21 MVideoAccountLightBlacklistAllFiles
21} from '@server/types/models' 22} from '@server/types/models'
22import { ActivityCreate } from '../../shared/models/activitypub'
23import { VideoObject } from '../../shared/models/activitypub/objects'
24import { VideoCommentObject } from '../../shared/models/activitypub/objects/video-comment-object'
25import { LiveVideoCreate, VideoCreate, VideoImportCreate } from '../../shared/models/videos' 23import { LiveVideoCreate, VideoCreate, VideoImportCreate } from '../../shared/models/videos'
26import { VideoCommentCreate } from '../../shared/models/videos/comment' 24import { VideoCommentCreate } from '../../shared/models/videos/comment'
27import { ActorModel } from '../models/actor/actor'
28import { UserModel } from '../models/user/user' 25import { UserModel } from '../models/user/user'
29import { VideoModel } from '../models/video/video' 26import { VideoModel } from '../models/video/video'
30import { VideoCommentModel } from '../models/video/video-comment' 27import { VideoCommentModel } from '../models/video/video-comment'
@@ -36,7 +33,9 @@ export type AcceptResult = {
36 errorMessage?: string 33 errorMessage?: string
37} 34}
38 35
39// Can be filtered by plugins 36// ---------------------------------------------------------------------------
37
38// Stub function that can be filtered by plugins
40function isLocalVideoAccepted (object: { 39function isLocalVideoAccepted (object: {
41 videoBody: VideoCreate 40 videoBody: VideoCreate
42 videoFile: VideoUploadFile 41 videoFile: VideoUploadFile
@@ -45,6 +44,9 @@ function isLocalVideoAccepted (object: {
45 return { accepted: true } 44 return { accepted: true }
46} 45}
47 46
47// ---------------------------------------------------------------------------
48
49// Stub function that can be filtered by plugins
48function isLocalLiveVideoAccepted (object: { 50function isLocalLiveVideoAccepted (object: {
49 liveVideoBody: LiveVideoCreate 51 liveVideoBody: LiveVideoCreate
50 user: UserModel 52 user: UserModel
@@ -52,7 +54,11 @@ function isLocalLiveVideoAccepted (object: {
52 return { accepted: true } 54 return { accepted: true }
53} 55}
54 56
57// ---------------------------------------------------------------------------
58
59// Stub function that can be filtered by plugins
55function isLocalVideoThreadAccepted (_object: { 60function isLocalVideoThreadAccepted (_object: {
61 req: express.Request
56 commentBody: VideoCommentCreate 62 commentBody: VideoCommentCreate
57 video: VideoModel 63 video: VideoModel
58 user: UserModel 64 user: UserModel
@@ -60,7 +66,9 @@ function isLocalVideoThreadAccepted (_object: {
60 return { accepted: true } 66 return { accepted: true }
61} 67}
62 68
69// Stub function that can be filtered by plugins
63function isLocalVideoCommentReplyAccepted (_object: { 70function isLocalVideoCommentReplyAccepted (_object: {
71 req: express.Request
64 commentBody: VideoCommentCreate 72 commentBody: VideoCommentCreate
65 parentComment: VideoCommentModel 73 parentComment: VideoCommentModel
66 video: VideoModel 74 video: VideoModel
@@ -69,22 +77,18 @@ function isLocalVideoCommentReplyAccepted (_object: {
69 return { accepted: true } 77 return { accepted: true }
70} 78}
71 79
72function isRemoteVideoAccepted (_object: { 80// ---------------------------------------------------------------------------
73 activity: ActivityCreate
74 videoAP: VideoObject
75 byActor: ActorModel
76}): AcceptResult {
77 return { accepted: true }
78}
79 81
82// Stub function that can be filtered by plugins
80function isRemoteVideoCommentAccepted (_object: { 83function isRemoteVideoCommentAccepted (_object: {
81 activity: ActivityCreate 84 comment: MComment
82 commentAP: VideoCommentObject
83 byActor: ActorModel
84}): AcceptResult { 85}): AcceptResult {
85 return { accepted: true } 86 return { accepted: true }
86} 87}
87 88
89// ---------------------------------------------------------------------------
90
91// Stub function that can be filtered by plugins
88function isPreImportVideoAccepted (object: { 92function isPreImportVideoAccepted (object: {
89 videoImportBody: VideoImportCreate 93 videoImportBody: VideoImportCreate
90 user: MUser 94 user: MUser
@@ -92,6 +96,7 @@ function isPreImportVideoAccepted (object: {
92 return { accepted: true } 96 return { accepted: true }
93} 97}
94 98
99// Stub function that can be filtered by plugins
95function isPostImportVideoAccepted (object: { 100function isPostImportVideoAccepted (object: {
96 videoFilePath: PathLike 101 videoFilePath: PathLike
97 videoFile: VideoFileModel 102 videoFile: VideoFileModel
@@ -100,6 +105,8 @@ function isPostImportVideoAccepted (object: {
100 return { accepted: true } 105 return { accepted: true }
101} 106}
102 107
108// ---------------------------------------------------------------------------
109
103async function createVideoAbuse (options: { 110async function createVideoAbuse (options: {
104 baseAbuse: FilteredModelAttributes<AbuseModel> 111 baseAbuse: FilteredModelAttributes<AbuseModel>
105 videoInstance: MVideoAccountLightBlacklistAllFiles 112 videoInstance: MVideoAccountLightBlacklistAllFiles
@@ -189,12 +196,13 @@ function createAccountAbuse (options: {
189 }) 196 })
190} 197}
191 198
199// ---------------------------------------------------------------------------
200
192export { 201export {
193 isLocalLiveVideoAccepted, 202 isLocalLiveVideoAccepted,
194 203
195 isLocalVideoAccepted, 204 isLocalVideoAccepted,
196 isLocalVideoThreadAccepted, 205 isLocalVideoThreadAccepted,
197 isRemoteVideoAccepted,
198 isRemoteVideoCommentAccepted, 206 isRemoteVideoCommentAccepted,
199 isLocalVideoCommentReplyAccepted, 207 isLocalVideoCommentReplyAccepted,
200 isPreImportVideoAccepted, 208 isPreImportVideoAccepted,
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index 16161362c..3046d76bc 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -1,19 +1,23 @@
1import { map } from 'bluebird'
1import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra' 2import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
2import { dirname } from 'path' 3import { dirname } from 'path'
3import { Readable } from 'stream' 4import { Readable } from 'stream'
4import { 5import {
6 _Object,
5 CompleteMultipartUploadCommandOutput, 7 CompleteMultipartUploadCommandOutput,
6 DeleteObjectCommand, 8 DeleteObjectCommand,
7 GetObjectCommand, 9 GetObjectCommand,
8 ListObjectsV2Command, 10 ListObjectsV2Command,
9 PutObjectCommandInput 11 PutObjectAclCommand,
12 PutObjectCommandInput,
13 S3Client
10} from '@aws-sdk/client-s3' 14} from '@aws-sdk/client-s3'
11import { Upload } from '@aws-sdk/lib-storage' 15import { Upload } from '@aws-sdk/lib-storage'
12import { pipelinePromise } from '@server/helpers/core-utils' 16import { pipelinePromise } from '@server/helpers/core-utils'
13import { isArray } from '@server/helpers/custom-validators/misc' 17import { isArray } from '@server/helpers/custom-validators/misc'
14import { logger } from '@server/helpers/logger' 18import { logger } from '@server/helpers/logger'
15import { CONFIG } from '@server/initializers/config' 19import { CONFIG } from '@server/initializers/config'
16import { getPrivateUrl } from '../urls' 20import { getInternalUrl } from '../urls'
17import { getClient } from './client' 21import { getClient } from './client'
18import { lTags } from './logger' 22import { lTags } from './logger'
19 23
@@ -22,73 +26,125 @@ type BucketInfo = {
22 PREFIX?: string 26 PREFIX?: string
23} 27}
24 28
29async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
30 const s3Client = getClient()
31
32 const commandPrefix = bucketInfo.PREFIX + prefix
33 const listCommand = new ListObjectsV2Command({
34 Bucket: bucketInfo.BUCKET_NAME,
35 Prefix: commandPrefix
36 })
37
38 const listedObjects = await s3Client.send(listCommand)
39
40 if (isArray(listedObjects.Contents) !== true) return []
41
42 return listedObjects.Contents.map(c => c.Key)
43}
44
45// ---------------------------------------------------------------------------
46
25async function storeObject (options: { 47async function storeObject (options: {
26 inputPath: string 48 inputPath: string
27 objectStorageKey: string 49 objectStorageKey: string
28 bucketInfo: BucketInfo 50 bucketInfo: BucketInfo
51 isPrivate: boolean
29}): Promise<string> { 52}): Promise<string> {
30 const { inputPath, objectStorageKey, bucketInfo } = options 53 const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options
31 54
32 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) 55 logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
33 56
34 const fileStream = createReadStream(inputPath) 57 const fileStream = createReadStream(inputPath)
35 58
36 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo }) 59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
37} 60}
38 61
39async function removeObject (filename: string, bucketInfo: BucketInfo) { 62// ---------------------------------------------------------------------------
40 const command = new DeleteObjectCommand({ 63
64function updateObjectACL (options: {
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68}) {
69 const { objectStorageKey, bucketInfo, isPrivate } = options
70
71 const key = buildKey(objectStorageKey, bucketInfo)
72
73 logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags())
74
75 const command = new PutObjectAclCommand({
41 Bucket: bucketInfo.BUCKET_NAME, 76 Bucket: bucketInfo.BUCKET_NAME,
42 Key: buildKey(filename, bucketInfo) 77 Key: key,
78 ACL: getACL(isPrivate)
43 }) 79 })
44 80
45 return getClient().send(command) 81 return getClient().send(command)
46} 82}
47 83
48async function removePrefix (prefix: string, bucketInfo: BucketInfo) { 84function updatePrefixACL (options: {
49 const s3Client = getClient() 85 prefix: string
50 86 bucketInfo: BucketInfo
51 const commandPrefix = bucketInfo.PREFIX + prefix 87 isPrivate: boolean
52 const listCommand = new ListObjectsV2Command({ 88}) {
53 Bucket: bucketInfo.BUCKET_NAME, 89 const { prefix, bucketInfo, isPrivate } = options
54 Prefix: commandPrefix 90
91 logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
92
93 return applyOnPrefix({
94 prefix,
95 bucketInfo,
96 commandBuilder: obj => {
97 logger.debug('Updating ACL of %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
98
99 return new PutObjectAclCommand({
100 Bucket: bucketInfo.BUCKET_NAME,
101 Key: obj.Key,
102 ACL: getACL(isPrivate)
103 })
104 }
55 }) 105 })
106}
56 107
57 const listedObjects = await s3Client.send(listCommand) 108// ---------------------------------------------------------------------------
58
59 // FIXME: use bulk delete when s3ninja will support this operation
60 // const deleteParams = {
61 // Bucket: bucketInfo.BUCKET_NAME,
62 // Delete: { Objects: [] }
63 // }
64 109
65 if (isArray(listedObjects.Contents) !== true) { 110function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) {
66 const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` 111 const key = buildKey(objectStorageKey, bucketInfo)
67 112
68 logger.error(message, { response: listedObjects, ...lTags() }) 113 return removeObjectByFullKey(key, bucketInfo)
69 throw new Error(message) 114}
70 }
71 115
72 for (const object of listedObjects.Contents) { 116function removeObjectByFullKey (fullKey: string, bucketInfo: BucketInfo) {
73 const command = new DeleteObjectCommand({ 117 logger.debug('Removing file %s in bucket %s', fullKey, bucketInfo.BUCKET_NAME, lTags())
74 Bucket: bucketInfo.BUCKET_NAME,
75 Key: object.Key
76 })
77 118
78 await s3Client.send(command) 119 const command = new DeleteObjectCommand({
120 Bucket: bucketInfo.BUCKET_NAME,
121 Key: fullKey
122 })
79 123
80 // FIXME: use bulk delete when s3ninja will support this operation 124 return getClient().send(command)
81 // deleteParams.Delete.Objects.push({ Key: object.Key }) 125}
82 }
83 126
127async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
84 // FIXME: use bulk delete when s3ninja will support this operation 128 // FIXME: use bulk delete when s3ninja will support this operation
85 // const deleteCommand = new DeleteObjectsCommand(deleteParams)
86 // await s3Client.send(deleteCommand)
87 129
88 // Repeat if not all objects could be listed at once (limit of 1000?) 130 logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags())
89 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) 131
132 return applyOnPrefix({
133 prefix,
134 bucketInfo,
135 commandBuilder: obj => {
136 logger.debug('Removing %s inside prefix %s in bucket %s', obj.Key, prefix, bucketInfo.BUCKET_NAME, lTags())
137
138 return new DeleteObjectCommand({
139 Bucket: bucketInfo.BUCKET_NAME,
140 Key: obj.Key
141 })
142 }
143 })
90} 144}
91 145
146// ---------------------------------------------------------------------------
147
92async function makeAvailable (options: { 148async function makeAvailable (options: {
93 key: string 149 key: string
94 destination: string 150 destination: string
@@ -116,13 +172,43 @@ function buildKey (key: string, bucketInfo: BucketInfo) {
116 172
117// --------------------------------------------------------------------------- 173// ---------------------------------------------------------------------------
118 174
175async function createObjectReadStream (options: {
176 key: string
177 bucketInfo: BucketInfo
178 rangeHeader: string
179}) {
180 const { key, bucketInfo, rangeHeader } = options
181
182 const command = new GetObjectCommand({
183 Bucket: bucketInfo.BUCKET_NAME,
184 Key: buildKey(key, bucketInfo),
185 Range: rangeHeader
186 })
187
188 const response = await getClient().send(command)
189
190 return response.Body as Readable
191}
192
193// ---------------------------------------------------------------------------
194
119export { 195export {
120 BucketInfo, 196 BucketInfo,
121 buildKey, 197 buildKey,
198
122 storeObject, 199 storeObject,
200
123 removeObject, 201 removeObject,
202 removeObjectByFullKey,
124 removePrefix, 203 removePrefix,
125 makeAvailable 204
205 makeAvailable,
206
207 updateObjectACL,
208 updatePrefixACL,
209
210 listKeysOfPrefix,
211 createObjectReadStream
126} 212}
127 213
128// --------------------------------------------------------------------------- 214// ---------------------------------------------------------------------------
@@ -131,17 +217,15 @@ async function uploadToStorage (options: {
131 content: ReadStream 217 content: ReadStream
132 objectStorageKey: string 218 objectStorageKey: string
133 bucketInfo: BucketInfo 219 bucketInfo: BucketInfo
220 isPrivate: boolean
134}) { 221}) {
135 const { content, objectStorageKey, bucketInfo } = options 222 const { content, objectStorageKey, bucketInfo, isPrivate } = options
136 223
137 const input: PutObjectCommandInput = { 224 const input: PutObjectCommandInput = {
138 Body: content, 225 Body: content,
139 Bucket: bucketInfo.BUCKET_NAME, 226 Bucket: bucketInfo.BUCKET_NAME,
140 Key: buildKey(objectStorageKey, bucketInfo) 227 Key: buildKey(objectStorageKey, bucketInfo),
141 } 228 ACL: getACL(isPrivate)
142
143 if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
144 input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
145 } 229 }
146 230
147 const parallelUploads3 = new Upload({ 231 const parallelUploads3 = new Upload({
@@ -171,5 +255,50 @@ async function uploadToStorage (options: {
171 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() 255 bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
172 ) 256 )
173 257
174 return getPrivateUrl(bucketInfo, objectStorageKey) 258 return getInternalUrl(bucketInfo, objectStorageKey)
259}
260
261async function applyOnPrefix (options: {
262 prefix: string
263 bucketInfo: BucketInfo
264 commandBuilder: (obj: _Object) => Parameters<S3Client['send']>[0]
265
266 continuationToken?: string
267}) {
268 const { prefix, bucketInfo, commandBuilder, continuationToken } = options
269
270 const s3Client = getClient()
271
272 const commandPrefix = buildKey(prefix, bucketInfo)
273 const listCommand = new ListObjectsV2Command({
274 Bucket: bucketInfo.BUCKET_NAME,
275 Prefix: commandPrefix,
276 ContinuationToken: continuationToken
277 })
278
279 const listedObjects = await s3Client.send(listCommand)
280
281 if (isArray(listedObjects.Contents) !== true) {
282 const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`
283
284 logger.error(message, { response: listedObjects, ...lTags() })
285 throw new Error(message)
286 }
287
288 await map(listedObjects.Contents, object => {
289 const command = commandBuilder(object)
290
291 return s3Client.send(command)
292 }, { concurrency: 10 })
293
294 // Repeat if not all objects could be listed at once (limit of 1000?)
295 if (listedObjects.IsTruncated) {
296 await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken })
297 }
298}
299
300function getACL (isPrivate: boolean) {
301 return isPrivate
302 ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE
303 : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC
175} 304}
diff --git a/server/lib/object-storage/urls.ts b/server/lib/object-storage/urls.ts
index 2a889190b..a47a98b98 100644
--- a/server/lib/object-storage/urls.ts
+++ b/server/lib/object-storage/urls.ts
@@ -1,10 +1,14 @@
1import { CONFIG } from '@server/initializers/config' 1import { CONFIG } from '@server/initializers/config'
2import { OBJECT_STORAGE_PROXY_PATHS, WEBSERVER } from '@server/initializers/constants'
3import { MVideoUUID } from '@server/types/models'
2import { BucketInfo, buildKey, getEndpointParsed } from './shared' 4import { BucketInfo, buildKey, getEndpointParsed } from './shared'
3 5
4function getPrivateUrl (config: BucketInfo, keyWithoutPrefix: string) { 6function getInternalUrl (config: BucketInfo, keyWithoutPrefix: string) {
5 return getBaseUrl(config) + buildKey(keyWithoutPrefix, config) 7 return getBaseUrl(config) + buildKey(keyWithoutPrefix, config)
6} 8}
7 9
10// ---------------------------------------------------------------------------
11
8function getWebTorrentPublicFileUrl (fileUrl: string) { 12function getWebTorrentPublicFileUrl (fileUrl: string) {
9 const baseUrl = CONFIG.OBJECT_STORAGE.VIDEOS.BASE_URL 13 const baseUrl = CONFIG.OBJECT_STORAGE.VIDEOS.BASE_URL
10 if (!baseUrl) return fileUrl 14 if (!baseUrl) return fileUrl
@@ -19,11 +23,28 @@ function getHLSPublicFileUrl (fileUrl: string) {
19 return replaceByBaseUrl(fileUrl, baseUrl) 23 return replaceByBaseUrl(fileUrl, baseUrl)
20} 24}
21 25
26// ---------------------------------------------------------------------------
27
28function getHLSPrivateFileUrl (video: MVideoUUID, filename: string) {
29 return WEBSERVER.URL + OBJECT_STORAGE_PROXY_PATHS.STREAMING_PLAYLISTS.PRIVATE_HLS + video.uuid + `/${filename}`
30}
31
32function getWebTorrentPrivateFileUrl (filename: string) {
33 return WEBSERVER.URL + OBJECT_STORAGE_PROXY_PATHS.PRIVATE_WEBSEED + filename
34}
35
36// ---------------------------------------------------------------------------
37
22export { 38export {
23 getPrivateUrl, 39 getInternalUrl,
40
24 getWebTorrentPublicFileUrl, 41 getWebTorrentPublicFileUrl,
25 replaceByBaseUrl, 42 getHLSPublicFileUrl,
26 getHLSPublicFileUrl 43
44 getHLSPrivateFileUrl,
45 getWebTorrentPrivateFileUrl,
46
47 replaceByBaseUrl
27} 48}
28 49
29// --------------------------------------------------------------------------- 50// ---------------------------------------------------------------------------
diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts
index 66e738200..b764e4b22 100644
--- a/server/lib/object-storage/videos.ts
+++ b/server/lib/object-storage/videos.ts
@@ -1,39 +1,102 @@
1import { join } from 'path' 1import { basename, join } from 'path'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { CONFIG } from '@server/initializers/config' 3import { CONFIG } from '@server/initializers/config'
4import { MStreamingPlaylistVideo, MVideoFile } from '@server/types/models' 4import { MStreamingPlaylistVideo, MVideo, MVideoFile } from '@server/types/models'
5import { getHLSDirectory } from '../paths' 5import { getHLSDirectory } from '../paths'
6import { VideoPathManager } from '../video-path-manager'
6import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys' 7import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys'
7import { lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared' 8import {
9 createObjectReadStream,
10 listKeysOfPrefix,
11 lTags,
12 makeAvailable,
13 removeObject,
14 removeObjectByFullKey,
15 removePrefix,
16 storeObject,
17 updateObjectACL,
18 updatePrefixACL
19} from './shared'
20
21function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) {
22 return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
23}
24
25// ---------------------------------------------------------------------------
8 26
9function storeHLSFile (playlist: MStreamingPlaylistVideo, filename: string, path?: string) { 27function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename: string) {
10 return storeObject({ 28 return storeObject({
11 inputPath: path ?? join(getHLSDirectory(playlist.Video), filename), 29 inputPath: join(getHLSDirectory(playlist.Video), filename),
12 objectStorageKey: generateHLSObjectStorageKey(playlist, filename), 30 objectStorageKey: generateHLSObjectStorageKey(playlist, filename),
13 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS 31 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
32 isPrivate: playlist.Video.hasPrivateStaticPath()
14 }) 33 })
15} 34}
16 35
17function storeWebTorrentFile (filename: string) { 36function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) {
18 return storeObject({ 37 return storeObject({
19 inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename), 38 inputPath: path,
20 objectStorageKey: generateWebTorrentObjectStorageKey(filename), 39 objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
21 bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS 40 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
41 isPrivate: playlist.Video.hasPrivateStaticPath()
42 })
43}
44
45// ---------------------------------------------------------------------------
46
47function storeWebTorrentFile (video: MVideo, file: MVideoFile) {
48 return storeObject({
49 inputPath: VideoPathManager.Instance.getFSVideoFileOutputPath(video, file),
50 objectStorageKey: generateWebTorrentObjectStorageKey(file.filename),
51 bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS,
52 isPrivate: video.hasPrivateStaticPath()
53 })
54}
55
56// ---------------------------------------------------------------------------
57
58function updateWebTorrentFileACL (video: MVideo, file: MVideoFile) {
59 return updateObjectACL({
60 objectStorageKey: generateWebTorrentObjectStorageKey(file.filename),
61 bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS,
62 isPrivate: video.hasPrivateStaticPath()
63 })
64}
65
66function updateHLSFilesACL (playlist: MStreamingPlaylistVideo) {
67 return updatePrefixACL({
68 prefix: generateHLSObjectBaseStorageKey(playlist),
69 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
70 isPrivate: playlist.Video.hasPrivateStaticPath()
22 }) 71 })
23} 72}
24 73
74// ---------------------------------------------------------------------------
75
25function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) { 76function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) {
26 return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) 77 return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
27} 78}
28 79
29function removeHLSFileObjectStorage (playlist: MStreamingPlaylistVideo, filename: string) { 80function removeHLSFileObjectStorageByFilename (playlist: MStreamingPlaylistVideo, filename: string) {
30 return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) 81 return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
31} 82}
32 83
84function removeHLSFileObjectStorageByPath (playlist: MStreamingPlaylistVideo, path: string) {
85 return removeObject(generateHLSObjectStorageKey(playlist, basename(path)), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
86}
87
88function removeHLSFileObjectStorageByFullKey (key: string) {
89 return removeObjectByFullKey(key, CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
90}
91
92// ---------------------------------------------------------------------------
93
33function removeWebTorrentObjectStorage (videoFile: MVideoFile) { 94function removeWebTorrentObjectStorage (videoFile: MVideoFile) {
34 return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS) 95 return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS)
35} 96}
36 97
98// ---------------------------------------------------------------------------
99
37async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) { 100async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) {
38 const key = generateHLSObjectStorageKey(playlist, filename) 101 const key = generateHLSObjectStorageKey(playlist, filename)
39 102
@@ -62,14 +125,61 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin
62 return destination 125 return destination
63} 126}
64 127
128// ---------------------------------------------------------------------------
129
130function getWebTorrentFileReadStream (options: {
131 filename: string
132 rangeHeader: string
133}) {
134 const { filename, rangeHeader } = options
135
136 const key = generateWebTorrentObjectStorageKey(filename)
137
138 return createObjectReadStream({
139 key,
140 bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS,
141 rangeHeader
142 })
143}
144
145function getHLSFileReadStream (options: {
146 playlist: MStreamingPlaylistVideo
147 filename: string
148 rangeHeader: string
149}) {
150 const { playlist, filename, rangeHeader } = options
151
152 const key = generateHLSObjectStorageKey(playlist, filename)
153
154 return createObjectReadStream({
155 key,
156 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
157 rangeHeader
158 })
159}
160
161// ---------------------------------------------------------------------------
162
65export { 163export {
164 listHLSFileKeysOf,
165
66 storeWebTorrentFile, 166 storeWebTorrentFile,
67 storeHLSFile, 167 storeHLSFileFromFilename,
168 storeHLSFileFromPath,
169
170 updateWebTorrentFileACL,
171 updateHLSFilesACL,
68 172
69 removeHLSObjectStorage, 173 removeHLSObjectStorage,
70 removeHLSFileObjectStorage, 174 removeHLSFileObjectStorageByFilename,
175 removeHLSFileObjectStorageByPath,
176 removeHLSFileObjectStorageByFullKey,
177
71 removeWebTorrentObjectStorage, 178 removeWebTorrentObjectStorage,
72 179
73 makeWebTorrentFileAvailable, 180 makeWebTorrentFileAvailable,
74 makeHLSFileAvailable 181 makeHLSFileAvailable,
182
183 getWebTorrentFileReadStream,
184 getHLSFileReadStream
75} 185}
diff --git a/server/lib/paths.ts b/server/lib/paths.ts
index b29854700..470970f55 100644
--- a/server/lib/paths.ts
+++ b/server/lib/paths.ts
@@ -1,9 +1,10 @@
1import { join } from 'path' 1import { join } from 'path'
2import { CONFIG } from '@server/initializers/config' 2import { CONFIG } from '@server/initializers/config'
3import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY, VIDEO_LIVE } from '@server/initializers/constants' 3import { DIRECTORIES, VIDEO_LIVE } from '@server/initializers/constants'
4import { isStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoUUID } from '@server/types/models' 4import { isStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoUUID } from '@server/types/models'
5import { removeFragmentedMP4Ext } from '@shared/core-utils' 5import { removeFragmentedMP4Ext } from '@shared/core-utils'
6import { buildUUID } from '@shared/extra-utils' 6import { buildUUID } from '@shared/extra-utils'
7import { isVideoInPrivateDirectory } from './video-privacy'
7 8
8// ################## Video file name ################## 9// ################## Video file name ##################
9 10
@@ -17,20 +18,24 @@ function generateHLSVideoFilename (resolution: number) {
17 18
18// ################## Streaming playlist ################## 19// ################## Streaming playlist ##################
19 20
20function getLiveDirectory (video: MVideoUUID) { 21function getLiveDirectory (video: MVideo) {
21 return getHLSDirectory(video) 22 return getHLSDirectory(video)
22} 23}
23 24
24function getLiveReplayBaseDirectory (video: MVideoUUID) { 25function getLiveReplayBaseDirectory (video: MVideo) {
25 return join(getLiveDirectory(video), VIDEO_LIVE.REPLAY_DIRECTORY) 26 return join(getLiveDirectory(video), VIDEO_LIVE.REPLAY_DIRECTORY)
26} 27}
27 28
28function getHLSDirectory (video: MVideoUUID) { 29function getHLSDirectory (video: MVideo) {
29 return join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid) 30 if (isVideoInPrivateDirectory(video.privacy)) {
31 return join(DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE, video.uuid)
32 }
33
34 return join(DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC, video.uuid)
30} 35}
31 36
32function getHLSRedundancyDirectory (video: MVideoUUID) { 37function getHLSRedundancyDirectory (video: MVideoUUID) {
33 return join(HLS_REDUNDANCY_DIRECTORY, video.uuid) 38 return join(DIRECTORIES.HLS_REDUNDANCY, video.uuid)
34} 39}
35 40
36function getHlsResolutionPlaylistFilename (videoFilename: string) { 41function getHlsResolutionPlaylistFilename (videoFilename: string) {
diff --git a/server/lib/plugins/plugin-helpers-builder.ts b/server/lib/plugins/plugin-helpers-builder.ts
index 4e799b3d4..7b1def6e3 100644
--- a/server/lib/plugins/plugin-helpers-builder.ts
+++ b/server/lib/plugins/plugin-helpers-builder.ts
@@ -1,4 +1,5 @@
1import express from 'express' 1import express from 'express'
2import { Server } from 'http'
2import { join } from 'path' 3import { join } from 'path'
3import { ffprobePromise } from '@server/helpers/ffmpeg/ffprobe-utils' 4import { ffprobePromise } from '@server/helpers/ffmpeg/ffprobe-utils'
4import { buildLogger } from '@server/helpers/logger' 5import { buildLogger } from '@server/helpers/logger'
@@ -13,15 +14,16 @@ import { ServerBlocklistModel } from '@server/models/server/server-blocklist'
13import { UserModel } from '@server/models/user/user' 14import { UserModel } from '@server/models/user/user'
14import { VideoModel } from '@server/models/video/video' 15import { VideoModel } from '@server/models/video/video'
15import { VideoBlacklistModel } from '@server/models/video/video-blacklist' 16import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
16import { MPlugin } from '@server/types/models' 17import { MPlugin, MVideo, UserNotificationModelForApi } from '@server/types/models'
17import { PeerTubeHelpers } from '@server/types/plugins' 18import { PeerTubeHelpers } from '@server/types/plugins'
18import { VideoBlacklistCreate, VideoStorage } from '@shared/models' 19import { VideoBlacklistCreate, VideoStorage } from '@shared/models'
19import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist' 20import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../blocklist'
21import { PeerTubeSocket } from '../peertube-socket'
20import { ServerConfigManager } from '../server-config-manager' 22import { ServerConfigManager } from '../server-config-manager'
21import { blacklistVideo, unblacklistVideo } from '../video-blacklist' 23import { blacklistVideo, unblacklistVideo } from '../video-blacklist'
22import { VideoPathManager } from '../video-path-manager' 24import { VideoPathManager } from '../video-path-manager'
23 25
24function buildPluginHelpers (pluginModel: MPlugin, npmName: string): PeerTubeHelpers { 26function buildPluginHelpers (httpServer: Server, pluginModel: MPlugin, npmName: string): PeerTubeHelpers {
25 const logger = buildPluginLogger(npmName) 27 const logger = buildPluginLogger(npmName)
26 28
27 const database = buildDatabaseHelpers() 29 const database = buildDatabaseHelpers()
@@ -29,12 +31,14 @@ function buildPluginHelpers (pluginModel: MPlugin, npmName: string): PeerTubeHel
29 31
30 const config = buildConfigHelpers() 32 const config = buildConfigHelpers()
31 33
32 const server = buildServerHelpers() 34 const server = buildServerHelpers(httpServer)
33 35
34 const moderation = buildModerationHelpers() 36 const moderation = buildModerationHelpers()
35 37
36 const plugin = buildPluginRelatedHelpers(pluginModel, npmName) 38 const plugin = buildPluginRelatedHelpers(pluginModel, npmName)
37 39
40 const socket = buildSocketHelpers()
41
38 const user = buildUserHelpers() 42 const user = buildUserHelpers()
39 43
40 return { 44 return {
@@ -45,6 +49,7 @@ function buildPluginHelpers (pluginModel: MPlugin, npmName: string): PeerTubeHel
45 moderation, 49 moderation,
46 plugin, 50 plugin,
47 server, 51 server,
52 socket,
48 user 53 user
49 } 54 }
50} 55}
@@ -65,8 +70,10 @@ function buildDatabaseHelpers () {
65 } 70 }
66} 71}
67 72
68function buildServerHelpers () { 73function buildServerHelpers (httpServer: Server) {
69 return { 74 return {
75 getHTTPServer: () => httpServer,
76
70 getServerActor: () => getServerActor() 77 getServerActor: () => getServerActor()
71 } 78 }
72} 79}
@@ -214,10 +221,23 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) {
214 221
215 getBaseRouterRoute: () => `/plugins/${plugin.name}/${plugin.version}/router/`, 222 getBaseRouterRoute: () => `/plugins/${plugin.name}/${plugin.version}/router/`,
216 223
224 getBaseWebSocketRoute: () => `/plugins/${plugin.name}/${plugin.version}/ws/`,
225
217 getDataDirectoryPath: () => join(CONFIG.STORAGE.PLUGINS_DIR, 'data', npmName) 226 getDataDirectoryPath: () => join(CONFIG.STORAGE.PLUGINS_DIR, 'data', npmName)
218 } 227 }
219} 228}
220 229
230function buildSocketHelpers () {
231 return {
232 sendNotification: (userId: number, notification: UserNotificationModelForApi) => {
233 PeerTubeSocket.Instance.sendNotification(userId, notification)
234 },
235 sendVideoLiveNewState: (video: MVideo) => {
236 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
237 }
238 }
239}
240
221function buildUserHelpers () { 241function buildUserHelpers () {
222 return { 242 return {
223 loadById: (id: number) => { 243 loadById: (id: number) => {
diff --git a/server/lib/plugins/plugin-manager.ts b/server/lib/plugins/plugin-manager.ts
index a46b97fa4..c4d9b6574 100644
--- a/server/lib/plugins/plugin-manager.ts
+++ b/server/lib/plugins/plugin-manager.ts
@@ -1,6 +1,7 @@
1import express from 'express' 1import express from 'express'
2import { createReadStream, createWriteStream } from 'fs' 2import { createReadStream, createWriteStream } from 'fs'
3import { ensureDir, outputFile, readJSON } from 'fs-extra' 3import { ensureDir, outputFile, readJSON } from 'fs-extra'
4import { Server } from 'http'
4import { basename, join } from 'path' 5import { basename, join } from 'path'
5import { decachePlugin } from '@server/helpers/decache' 6import { decachePlugin } from '@server/helpers/decache'
6import { ApplicationModel } from '@server/models/application/application' 7import { ApplicationModel } from '@server/models/application/application'
@@ -67,9 +68,37 @@ export class PluginManager implements ServerHook {
67 private hooks: { [name: string]: HookInformationValue[] } = {} 68 private hooks: { [name: string]: HookInformationValue[] } = {}
68 private translations: PluginLocalesTranslations = {} 69 private translations: PluginLocalesTranslations = {}
69 70
71 private server: Server
72
70 private constructor () { 73 private constructor () {
71 } 74 }
72 75
76 init (server: Server) {
77 this.server = server
78 }
79
80 registerWebSocketRouter () {
81 this.server.on('upgrade', (request, socket, head) => {
82 const url = request.url
83
84 const matched = url.match(`/plugins/([^/]+)/([^/]+/)?ws(/.*)`)
85 if (!matched) return
86
87 const npmName = PluginModel.buildNpmName(matched[1], PluginType.PLUGIN)
88 const subRoute = matched[3]
89
90 const result = this.getRegisteredPluginOrTheme(npmName)
91 if (!result) return
92
93 const routes = result.registerHelpers.getWebSocketRoutes()
94
95 const wss = routes.find(r => r.route.startsWith(subRoute))
96 if (!wss) return
97
98 wss.handler(request, socket, head)
99 })
100 }
101
73 // ###################### Getters ###################### 102 // ###################### Getters ######################
74 103
75 isRegistered (npmName: string) { 104 isRegistered (npmName: string) {
@@ -581,7 +610,7 @@ export class PluginManager implements ServerHook {
581 }) 610 })
582 } 611 }
583 612
584 const registerHelpers = new RegisterHelpers(npmName, plugin, onHookAdded.bind(this)) 613 const registerHelpers = new RegisterHelpers(npmName, plugin, this.server, onHookAdded.bind(this))
585 614
586 return { 615 return {
587 registerStore: registerHelpers, 616 registerStore: registerHelpers,
diff --git a/server/lib/plugins/register-helpers.ts b/server/lib/plugins/register-helpers.ts
index f4d405676..1aaef3606 100644
--- a/server/lib/plugins/register-helpers.ts
+++ b/server/lib/plugins/register-helpers.ts
@@ -1,4 +1,5 @@
1import express from 'express' 1import express from 'express'
2import { Server } from 'http'
2import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
3import { onExternalUserAuthenticated } from '@server/lib/auth/external-auth' 4import { onExternalUserAuthenticated } from '@server/lib/auth/external-auth'
4import { VideoConstantManagerFactory } from '@server/lib/plugins/video-constant-manager-factory' 5import { VideoConstantManagerFactory } from '@server/lib/plugins/video-constant-manager-factory'
@@ -8,7 +9,8 @@ import {
8 RegisterServerAuthExternalResult, 9 RegisterServerAuthExternalResult,
9 RegisterServerAuthPassOptions, 10 RegisterServerAuthPassOptions,
10 RegisterServerExternalAuthenticatedResult, 11 RegisterServerExternalAuthenticatedResult,
11 RegisterServerOptions 12 RegisterServerOptions,
13 RegisterServerWebSocketRouteOptions
12} from '@server/types/plugins' 14} from '@server/types/plugins'
13import { 15import {
14 EncoderOptionsBuilder, 16 EncoderOptionsBuilder,
@@ -49,12 +51,15 @@ export class RegisterHelpers {
49 51
50 private readonly onSettingsChangeCallbacks: SettingsChangeCallback[] = [] 52 private readonly onSettingsChangeCallbacks: SettingsChangeCallback[] = []
51 53
54 private readonly webSocketRoutes: RegisterServerWebSocketRouteOptions[] = []
55
52 private readonly router: express.Router 56 private readonly router: express.Router
53 private readonly videoConstantManagerFactory: VideoConstantManagerFactory 57 private readonly videoConstantManagerFactory: VideoConstantManagerFactory
54 58
55 constructor ( 59 constructor (
56 private readonly npmName: string, 60 private readonly npmName: string,
57 private readonly plugin: PluginModel, 61 private readonly plugin: PluginModel,
62 private readonly server: Server,
58 private readonly onHookAdded: (options: RegisterServerHookOptions) => void 63 private readonly onHookAdded: (options: RegisterServerHookOptions) => void
59 ) { 64 ) {
60 this.router = express.Router() 65 this.router = express.Router()
@@ -66,6 +71,7 @@ export class RegisterHelpers {
66 const registerSetting = this.buildRegisterSetting() 71 const registerSetting = this.buildRegisterSetting()
67 72
68 const getRouter = this.buildGetRouter() 73 const getRouter = this.buildGetRouter()
74 const registerWebSocketRoute = this.buildRegisterWebSocketRoute()
69 75
70 const settingsManager = this.buildSettingsManager() 76 const settingsManager = this.buildSettingsManager()
71 const storageManager = this.buildStorageManager() 77 const storageManager = this.buildStorageManager()
@@ -85,13 +91,14 @@ export class RegisterHelpers {
85 const unregisterIdAndPassAuth = this.buildUnregisterIdAndPassAuth() 91 const unregisterIdAndPassAuth = this.buildUnregisterIdAndPassAuth()
86 const unregisterExternalAuth = this.buildUnregisterExternalAuth() 92 const unregisterExternalAuth = this.buildUnregisterExternalAuth()
87 93
88 const peertubeHelpers = buildPluginHelpers(this.plugin, this.npmName) 94 const peertubeHelpers = buildPluginHelpers(this.server, this.plugin, this.npmName)
89 95
90 return { 96 return {
91 registerHook, 97 registerHook,
92 registerSetting, 98 registerSetting,
93 99
94 getRouter, 100 getRouter,
101 registerWebSocketRoute,
95 102
96 settingsManager, 103 settingsManager,
97 storageManager, 104 storageManager,
@@ -180,10 +187,20 @@ export class RegisterHelpers {
180 return this.onSettingsChangeCallbacks 187 return this.onSettingsChangeCallbacks
181 } 188 }
182 189
190 getWebSocketRoutes () {
191 return this.webSocketRoutes
192 }
193
183 private buildGetRouter () { 194 private buildGetRouter () {
184 return () => this.router 195 return () => this.router
185 } 196 }
186 197
198 private buildRegisterWebSocketRoute () {
199 return (options: RegisterServerWebSocketRouteOptions) => {
200 this.webSocketRoutes.push(options)
201 }
202 }
203
187 private buildRegisterSetting () { 204 private buildRegisterSetting () {
188 return (options: RegisterServerSettingOptions) => { 205 return (options: RegisterServerSettingOptions) => {
189 this.settings.push(options) 206 this.settings.push(options)
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 9b3c72300..b7523492a 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -9,6 +9,7 @@ import {
9 CONTACT_FORM_LIFETIME, 9 CONTACT_FORM_LIFETIME,
10 RESUMABLE_UPLOAD_SESSION_LIFETIME, 10 RESUMABLE_UPLOAD_SESSION_LIFETIME,
11 TRACKER_RATE_LIMITS, 11 TRACKER_RATE_LIMITS,
12 TWO_FACTOR_AUTH_REQUEST_TOKEN_LIFETIME,
12 USER_EMAIL_VERIFY_LIFETIME, 13 USER_EMAIL_VERIFY_LIFETIME,
13 USER_PASSWORD_CREATE_LIFETIME, 14 USER_PASSWORD_CREATE_LIFETIME,
14 USER_PASSWORD_RESET_LIFETIME, 15 USER_PASSWORD_RESET_LIFETIME,
@@ -108,10 +109,24 @@ class Redis {
108 return this.removeValue(this.generateResetPasswordKey(userId)) 109 return this.removeValue(this.generateResetPasswordKey(userId))
109 } 110 }
110 111
111 async getResetPasswordLink (userId: number) { 112 async getResetPasswordVerificationString (userId: number) {
112 return this.getValue(this.generateResetPasswordKey(userId)) 113 return this.getValue(this.generateResetPasswordKey(userId))
113 } 114 }
114 115
116 /* ************ Two factor auth request ************ */
117
118 async setTwoFactorRequest (userId: number, otpSecret: string) {
119 const requestToken = await generateRandomString(32)
120
121 await this.setValue(this.generateTwoFactorRequestKey(userId, requestToken), otpSecret, TWO_FACTOR_AUTH_REQUEST_TOKEN_LIFETIME)
122
123 return requestToken
124 }
125
126 async getTwoFactorRequestToken (userId: number, requestToken: string) {
127 return this.getValue(this.generateTwoFactorRequestKey(userId, requestToken))
128 }
129
115 /* ************ Email verification ************ */ 130 /* ************ Email verification ************ */
116 131
117 async setVerifyEmailVerificationString (userId: number) { 132 async setVerifyEmailVerificationString (userId: number) {
@@ -342,6 +357,10 @@ class Redis {
342 return 'reset-password-' + userId 357 return 'reset-password-' + userId
343 } 358 }
344 359
360 private generateTwoFactorRequestKey (userId: number, token: string) {
361 return 'two-factor-request-' + userId + '-' + token
362 }
363
345 private generateVerifyEmailKey (userId: number) { 364 private generateVerifyEmailKey (userId: number) {
346 return 'verify-email-' + userId 365 return 'verify-email-' + userId
347 } 366 }
@@ -391,8 +410,8 @@ class Redis {
391 return JSON.parse(value) 410 return JSON.parse(value)
392 } 411 }
393 412
394 private setObject (key: string, value: { [ id: string ]: number | string }) { 413 private setObject (key: string, value: { [ id: string ]: number | string }, expirationMilliseconds?: number) {
395 return this.setValue(key, JSON.stringify(value)) 414 return this.setValue(key, JSON.stringify(value), expirationMilliseconds)
396 } 415 }
397 416
398 private async setValue (key: string, value: string, expirationMilliseconds?: number) { 417 private async setValue (key: string, value: string, expirationMilliseconds?: number) {
diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts
index 5bfbc3cd2..e38685c04 100644
--- a/server/lib/schedulers/update-videos-scheduler.ts
+++ b/server/lib/schedulers/update-videos-scheduler.ts
@@ -1,11 +1,14 @@
1import { VideoModel } from '@server/models/video/video' 1import { VideoModel } from '@server/models/video/video'
2import { MVideoFullLight } from '@server/types/models' 2import { MScheduleVideoUpdate } from '@server/types/models'
3import { VideoPrivacy, VideoState } from '@shared/models'
3import { logger } from '../../helpers/logger' 4import { logger } from '../../helpers/logger'
4import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' 5import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
5import { sequelizeTypescript } from '../../initializers/database' 6import { sequelizeTypescript } from '../../initializers/database'
6import { ScheduleVideoUpdateModel } from '../../models/video/schedule-video-update' 7import { ScheduleVideoUpdateModel } from '../../models/video/schedule-video-update'
7import { federateVideoIfNeeded } from '../activitypub/videos'
8import { Notifier } from '../notifier' 8import { Notifier } from '../notifier'
9import { addVideoJobsAfterUpdate } from '../video'
10import { VideoPathManager } from '../video-path-manager'
11import { setVideoPrivacy } from '../video-privacy'
9import { AbstractScheduler } from './abstract-scheduler' 12import { AbstractScheduler } from './abstract-scheduler'
10 13
11export class UpdateVideosScheduler extends AbstractScheduler { 14export class UpdateVideosScheduler extends AbstractScheduler {
@@ -26,35 +29,58 @@ export class UpdateVideosScheduler extends AbstractScheduler {
26 if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined 29 if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined
27 30
28 const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate() 31 const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate()
29 const publishedVideos: MVideoFullLight[] = []
30 32
31 for (const schedule of schedules) { 33 for (const schedule of schedules) {
32 await sequelizeTypescript.transaction(async t => { 34 const videoOnly = await VideoModel.load(schedule.videoId)
33 const video = await VideoModel.loadFull(schedule.videoId, t) 35 const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoOnly.uuid)
34 36
35 logger.info('Executing scheduled video update on %s.', video.uuid) 37 try {
38 const { video, published } = await this.updateAVideo(schedule)
36 39
37 if (schedule.privacy) { 40 if (published) Notifier.Instance.notifyOnVideoPublishedAfterScheduledUpdate(video)
38 const wasConfidentialVideo = video.isConfidential() 41 } catch (err) {
39 const isNewVideo = video.isNewVideo(schedule.privacy) 42 logger.error('Cannot update video', { err })
43 }
40 44
41 video.setPrivacy(schedule.privacy) 45 mutexReleaser()
42 await video.save({ transaction: t }) 46 }
43 await federateVideoIfNeeded(video, isNewVideo, t) 47 }
48
49 private async updateAVideo (schedule: MScheduleVideoUpdate) {
50 let oldPrivacy: VideoPrivacy
51 let isNewVideo: boolean
52 let published = false
53
54 const video = await sequelizeTypescript.transaction(async t => {
55 const video = await VideoModel.loadFull(schedule.videoId, t)
56 if (video.state === VideoState.TO_TRANSCODE) return null
57
58 logger.info('Executing scheduled video update on %s.', video.uuid)
59
60 if (schedule.privacy) {
61 isNewVideo = video.isNewVideo(schedule.privacy)
62 oldPrivacy = video.privacy
44 63
45 if (wasConfidentialVideo) { 64 setVideoPrivacy(video, schedule.privacy)
46 publishedVideos.push(video) 65 await video.save({ transaction: t })
47 } 66
67 if (oldPrivacy === VideoPrivacy.PRIVATE) {
68 published = true
48 } 69 }
70 }
49 71
50 await schedule.destroy({ transaction: t }) 72 await schedule.destroy({ transaction: t })
51 }) 73
52 } 74 return video
75 })
53 76
54 for (const v of publishedVideos) { 77 if (!video) {
55 Notifier.Instance.notifyOnNewVideoIfNeeded(v) 78 return { video, published: false }
56 Notifier.Instance.notifyOnVideoPublishedAfterScheduledUpdate(v)
57 } 79 }
80
81 await addVideoJobsAfterUpdate({ video, oldPrivacy, isNewVideo, nameChanged: false })
82
83 return { video, published }
58 } 84 }
59 85
60 static get Instance () { 86 static get Instance () {
diff --git a/server/lib/schedulers/video-channel-sync-latest-scheduler.ts b/server/lib/schedulers/video-channel-sync-latest-scheduler.ts
index a527f68b5..efb957fac 100644
--- a/server/lib/schedulers/video-channel-sync-latest-scheduler.ts
+++ b/server/lib/schedulers/video-channel-sync-latest-scheduler.ts
@@ -2,7 +2,6 @@ import { logger } from '@server/helpers/logger'
2import { CONFIG } from '@server/initializers/config' 2import { CONFIG } from '@server/initializers/config'
3import { VideoChannelModel } from '@server/models/video/video-channel' 3import { VideoChannelModel } from '@server/models/video/video-channel'
4import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' 4import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync'
5import { VideoChannelSyncState } from '@shared/models'
6import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' 5import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
7import { synchronizeChannel } from '../sync-channel' 6import { synchronizeChannel } from '../sync-channel'
8import { AbstractScheduler } from './abstract-scheduler' 7import { AbstractScheduler } from './abstract-scheduler'
@@ -28,26 +27,20 @@ export class VideoChannelSyncLatestScheduler extends AbstractScheduler {
28 for (const sync of channelSyncs) { 27 for (const sync of channelSyncs) {
29 const channel = await VideoChannelModel.loadAndPopulateAccount(sync.videoChannelId) 28 const channel = await VideoChannelModel.loadAndPopulateAccount(sync.videoChannelId)
30 29
31 try { 30 logger.info(
32 logger.info( 31 'Creating video import jobs for "%s" sync with external channel "%s"',
33 'Creating video import jobs for "%s" sync with external channel "%s"', 32 channel.Actor.preferredUsername, sync.externalChannelUrl
34 channel.Actor.preferredUsername, sync.externalChannelUrl 33 )
35 ) 34
36 35 const onlyAfter = sync.lastSyncAt || sync.createdAt
37 const onlyAfter = sync.lastSyncAt || sync.createdAt 36
38 37 await synchronizeChannel({
39 await synchronizeChannel({ 38 channel,
40 channel, 39 externalChannelUrl: sync.externalChannelUrl,
41 externalChannelUrl: sync.externalChannelUrl, 40 videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.VIDEOS_LIMIT_PER_SYNCHRONIZATION,
42 videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.VIDEOS_LIMIT_PER_SYNCHRONIZATION, 41 channelSync: sync,
43 channelSync: sync, 42 onlyAfter
44 onlyAfter 43 })
45 })
46 } catch (err) {
47 logger.error(`Failed to synchronize channel ${channel.Actor.preferredUsername}`, { err })
48 sync.state = VideoChannelSyncState.FAILED
49 await sync.save()
50 }
51 } 44 }
52 } 45 }
53 46
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts
index 91c217615..dc450c338 100644
--- a/server/lib/schedulers/videos-redundancy-scheduler.ts
+++ b/server/lib/schedulers/videos-redundancy-scheduler.ts
@@ -16,7 +16,7 @@ import { VideosRedundancyStrategy } from '../../../shared/models/redundancy'
16import { logger, loggerTagsFactory } from '../../helpers/logger' 16import { logger, loggerTagsFactory } from '../../helpers/logger'
17import { downloadWebTorrentVideo } from '../../helpers/webtorrent' 17import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
18import { CONFIG } from '../../initializers/config' 18import { CONFIG } from '../../initializers/config'
19import { HLS_REDUNDANCY_DIRECTORY, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants' 19import { DIRECTORIES, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants'
20import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' 20import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
21import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' 21import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
22import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url' 22import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url'
@@ -115,16 +115,29 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
115 for (const redundancyModel of expired) { 115 for (const redundancyModel of expired) {
116 try { 116 try {
117 const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) 117 const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
118
119 // If the admin disabled the redundancy, remove this redundancy instead of extending it
120 if (!redundancyConfig) {
121 logger.info(
122 'Destroying redundancy %s because the redundancy %s does not exist anymore.',
123 redundancyModel.url, redundancyModel.strategy
124 )
125
126 await removeVideoRedundancy(redundancyModel)
127 continue
128 }
129
118 const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy) 130 const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy)
119 131
120 // If the administrator disabled the redundancy or decreased the cache size, remove this redundancy instead of extending it 132 // If the admin decreased the cache size, remove this redundancy instead of extending it
121 if (!redundancyConfig || totalUsed > redundancyConfig.size) { 133 if (totalUsed > redundancyConfig.size) {
122 logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy) 134 logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)
123 135
124 await removeVideoRedundancy(redundancyModel) 136 await removeVideoRedundancy(redundancyModel)
125 } else { 137 continue
126 await this.extendsRedundancy(redundancyModel)
127 } 138 }
139
140 await this.extendsRedundancy(redundancyModel)
128 } catch (err) { 141 } catch (err) {
129 logger.error( 142 logger.error(
130 'Cannot extend or remove expiration of %s video from our redundancy system.', 143 'Cannot extend or remove expiration of %s video from our redundancy system.',
@@ -262,7 +275,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
262 275
263 logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid)) 276 logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid))
264 277
265 const destDirectory = join(HLS_REDUNDANCY_DIRECTORY, video.uuid) 278 const destDirectory = join(DIRECTORIES.HLS_REDUNDANCY, video.uuid)
266 const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video) 279 const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video)
267 280
268 const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000 281 const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000
diff --git a/server/lib/sync-channel.ts b/server/lib/sync-channel.ts
index f91599c14..10167ee38 100644
--- a/server/lib/sync-channel.ts
+++ b/server/lib/sync-channel.ts
@@ -1,7 +1,7 @@
1import { logger } from '@server/helpers/logger' 1import { logger } from '@server/helpers/logger'
2import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' 2import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'
3import { CONFIG } from '@server/initializers/config' 3import { CONFIG } from '@server/initializers/config'
4import { buildYoutubeDLImport } from '@server/lib/video-import' 4import { buildYoutubeDLImport } from '@server/lib/video-pre-import'
5import { UserModel } from '@server/models/user/user' 5import { UserModel } from '@server/models/user/user'
6import { VideoImportModel } from '@server/models/video/video-import' 6import { VideoImportModel } from '@server/models/video/video-import'
7import { MChannel, MChannelAccountDefault, MChannelSync } from '@server/types/models' 7import { MChannel, MChannelAccountDefault, MChannelSync } from '@server/types/models'
@@ -12,8 +12,8 @@ import { ServerConfigManager } from './server-config-manager'
12export async function synchronizeChannel (options: { 12export async function synchronizeChannel (options: {
13 channel: MChannelAccountDefault 13 channel: MChannelAccountDefault
14 externalChannelUrl: string 14 externalChannelUrl: string
15 videosCountLimit: number
15 channelSync?: MChannelSync 16 channelSync?: MChannelSync
16 videosCountLimit?: number
17 onlyAfter?: Date 17 onlyAfter?: Date
18}) { 18}) {
19 const { channel, externalChannelUrl, videosCountLimit, onlyAfter, channelSync } = options 19 const { channel, externalChannelUrl, videosCountLimit, onlyAfter, channelSync } = options
@@ -24,56 +24,62 @@ export async function synchronizeChannel (options: {
24 await channelSync.save() 24 await channelSync.save()
25 } 25 }
26 26
27 const user = await UserModel.loadByChannelActorId(channel.actorId) 27 try {
28 const youtubeDL = new YoutubeDLWrapper( 28 const user = await UserModel.loadByChannelActorId(channel.actorId)
29 externalChannelUrl, 29 const youtubeDL = new YoutubeDLWrapper(
30 ServerConfigManager.Instance.getEnabledResolutions('vod'), 30 externalChannelUrl,
31 CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION 31 ServerConfigManager.Instance.getEnabledResolutions('vod'),
32 ) 32 CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION
33 )
33 34
34 const targetUrls = await youtubeDL.getInfoForListImport({ latestVideosCount: videosCountLimit }) 35 const targetUrls = await youtubeDL.getInfoForListImport({ latestVideosCount: videosCountLimit })
35 36
36 logger.info( 37 logger.info(
37 'Fetched %d candidate URLs for sync channel %s.', 38 'Fetched %d candidate URLs for sync channel %s.',
38 targetUrls.length, channel.Actor.preferredUsername, { targetUrls } 39 targetUrls.length, channel.Actor.preferredUsername, { targetUrls }
39 ) 40 )
40 41
41 if (targetUrls.length === 0) { 42 if (targetUrls.length === 0) {
42 if (channelSync) { 43 if (channelSync) {
43 channelSync.state = VideoChannelSyncState.SYNCED 44 channelSync.state = VideoChannelSyncState.SYNCED
44 await channelSync.save() 45 await channelSync.save()
46 }
47
48 return
45 } 49 }
46 50
47 return 51 const children: CreateJobArgument[] = []
48 }
49 52
50 const children: CreateJobArgument[] = [] 53 for (const targetUrl of targetUrls) {
54 if (await skipImport(channel, targetUrl, onlyAfter)) continue
51 55
52 for (const targetUrl of targetUrls) { 56 const { job } = await buildYoutubeDLImport({
53 if (await skipImport(channel, targetUrl, onlyAfter)) continue 57 user,
58 channel,
59 targetUrl,
60 channelSync,
61 importDataOverride: {
62 privacy: VideoPrivacy.PUBLIC
63 }
64 })
54 65
55 const { job } = await buildYoutubeDLImport({ 66 children.push(job)
56 user, 67 }
57 channel,
58 targetUrl,
59 channelSync,
60 importDataOverride: {
61 privacy: VideoPrivacy.PUBLIC
62 }
63 })
64
65 children.push(job)
66 }
67 68
68 // Will update the channel sync status 69 // Will update the channel sync status
69 const parent: CreateJobArgument = { 70 const parent: CreateJobArgument = {
70 type: 'after-video-channel-import', 71 type: 'after-video-channel-import',
71 payload: { 72 payload: {
72 channelSyncId: channelSync?.id 73 channelSyncId: channelSync?.id
74 }
73 } 75 }
74 }
75 76
76 await JobQueue.Instance.createJobWithChildren(parent, children) 77 await JobQueue.Instance.createJobWithChildren(parent, children)
78 } catch (err) {
79 logger.error(`Failed to import channel ${channel.name}`, { err })
80 channelSync.state = VideoChannelSyncState.FAILED
81 await channelSync.save()
82 }
77} 83}
78 84
79// --------------------------------------------------------------------------- 85// ---------------------------------------------------------------------------
diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts
index 44e26754d..c7b61e9ba 100644
--- a/server/lib/transcoding/transcoding.ts
+++ b/server/lib/transcoding/transcoding.ts
@@ -1,3 +1,4 @@
1import { MutexInterface } from 'async-mutex'
1import { Job } from 'bullmq' 2import { Job } from 'bullmq'
2import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' 3import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
3import { basename, extname as extnameUtil, join } from 'path' 4import { basename, extname as extnameUtil, join } from 'path'
@@ -6,11 +7,13 @@ import { retryTransactionWrapper } from '@server/helpers/database-utils'
6import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 7import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
7import { sequelizeTypescript } from '@server/initializers/database' 8import { sequelizeTypescript } from '@server/initializers/database'
8import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' 9import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
10import { pick } from '@shared/core-utils'
9import { VideoResolution, VideoStorage } from '../../../shared/models/videos' 11import { VideoResolution, VideoStorage } from '../../../shared/models/videos'
10import { 12import {
11 buildFileMetadata, 13 buildFileMetadata,
12 canDoQuickTranscode, 14 canDoQuickTranscode,
13 computeResolutionsToTranscode, 15 computeResolutionsToTranscode,
16 ffprobePromise,
14 getVideoStreamDuration, 17 getVideoStreamDuration,
15 getVideoStreamFPS, 18 getVideoStreamFPS,
16 transcodeVOD, 19 transcodeVOD,
@@ -33,7 +36,7 @@ import { VideoTranscodingProfilesManager } from './default-transcoding-profiles'
33 */ 36 */
34 37
35// Optimize the original video file and replace it. The resolution is not changed. 38// Optimize the original video file and replace it. The resolution is not changed.
36function optimizeOriginalVideofile (options: { 39async function optimizeOriginalVideofile (options: {
37 video: MVideoFullLight 40 video: MVideoFullLight
38 inputVideoFile: MVideoFile 41 inputVideoFile: MVideoFile
39 job: Job 42 job: Job
@@ -43,49 +46,62 @@ function optimizeOriginalVideofile (options: {
43 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR 46 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
44 const newExtname = '.mp4' 47 const newExtname = '.mp4'
45 48
46 return VideoPathManager.Instance.makeAvailableVideoFile(inputVideoFile.withVideoOrPlaylist(video), async videoInputPath => { 49 // Will be released by our transcodeVOD function once ffmpeg is ran
47 const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) 50 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
48 51
49 const transcodeType: TranscodeVODOptionsType = await canDoQuickTranscode(videoInputPath) 52 try {
50 ? 'quick-transcode' 53 await video.reload()
51 : 'video'
52 54
53 const resolution = buildOriginalFileResolution(inputVideoFile.resolution) 55 const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video)
54 56
55 const transcodeOptions: TranscodeVODOptions = { 57 const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async videoInputPath => {
56 type: transcodeType, 58 const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
57 59
58 inputPath: videoInputPath, 60 const transcodeType: TranscodeVODOptionsType = await canDoQuickTranscode(videoInputPath)
59 outputPath: videoTranscodedPath, 61 ? 'quick-transcode'
62 : 'video'
60 63
61 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 64 const resolution = buildOriginalFileResolution(inputVideoFile.resolution)
62 profile: CONFIG.TRANSCODING.PROFILE,
63 65
64 resolution, 66 const transcodeOptions: TranscodeVODOptions = {
67 type: transcodeType,
65 68
66 job 69 inputPath: videoInputPath,
67 } 70 outputPath: videoTranscodedPath,
68 71
69 // Could be very long! 72 inputFileMutexReleaser,
70 await transcodeVOD(transcodeOptions)
71 73
72 // Important to do this before getVideoFilename() to take in account the new filename 74 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
73 inputVideoFile.resolution = resolution 75 profile: CONFIG.TRANSCODING.PROFILE,
74 inputVideoFile.extname = newExtname
75 inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname)
76 inputVideoFile.storage = VideoStorage.FILE_SYSTEM
77 76
78 const videoOutputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, inputVideoFile) 77 resolution,
79 78
80 const { videoFile } = await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) 79 job
81 await remove(videoInputPath) 80 }
82 81
83 return { transcodeType, videoFile } 82 // Could be very long!
84 }) 83 await transcodeVOD(transcodeOptions)
84
85 // Important to do this before getVideoFilename() to take in account the new filename
86 inputVideoFile.resolution = resolution
87 inputVideoFile.extname = newExtname
88 inputVideoFile.filename = generateWebTorrentVideoFilename(resolution, newExtname)
89 inputVideoFile.storage = VideoStorage.FILE_SYSTEM
90
91 const { videoFile } = await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile)
92 await remove(videoInputPath)
93
94 return { transcodeType, videoFile }
95 })
96
97 return result
98 } finally {
99 inputFileMutexReleaser()
100 }
85} 101}
86 102
87// Transcode the original video file to a lower resolution compatible with WebTorrent 103// Transcode the original video file to a lower resolution compatible with WebTorrent
88function transcodeNewWebTorrentResolution (options: { 104async function transcodeNewWebTorrentResolution (options: {
89 video: MVideoFullLight 105 video: MVideoFullLight
90 resolution: VideoResolution 106 resolution: VideoResolution
91 job: Job 107 job: Job
@@ -95,53 +111,68 @@ function transcodeNewWebTorrentResolution (options: {
95 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR 111 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
96 const newExtname = '.mp4' 112 const newExtname = '.mp4'
97 113
98 return VideoPathManager.Instance.makeAvailableVideoFile(video.getMaxQualityFile().withVideoOrPlaylist(video), async videoInputPath => { 114 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
99 const newVideoFile = new VideoFileModel({
100 resolution,
101 extname: newExtname,
102 filename: generateWebTorrentVideoFilename(resolution, newExtname),
103 size: 0,
104 videoId: video.id
105 })
106 115
107 const videoOutputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile) 116 try {
108 const videoTranscodedPath = join(transcodeDirectory, newVideoFile.filename) 117 await video.reload()
109 118
110 const transcodeOptions = resolution === VideoResolution.H_NOVIDEO 119 const file = video.getMaxQualityFile().withVideoOrPlaylist(video)
111 ? {
112 type: 'only-audio' as 'only-audio',
113 120
114 inputPath: videoInputPath, 121 const result = await VideoPathManager.Instance.makeAvailableVideoFile(file, async videoInputPath => {
115 outputPath: videoTranscodedPath, 122 const newVideoFile = new VideoFileModel({
123 resolution,
124 extname: newExtname,
125 filename: generateWebTorrentVideoFilename(resolution, newExtname),
126 size: 0,
127 videoId: video.id
128 })
116 129
117 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 130 const videoTranscodedPath = join(transcodeDirectory, newVideoFile.filename)
118 profile: CONFIG.TRANSCODING.PROFILE,
119 131
120 resolution, 132 const transcodeOptions = resolution === VideoResolution.H_NOVIDEO
133 ? {
134 type: 'only-audio' as 'only-audio',
121 135
122 job 136 inputPath: videoInputPath,
123 } 137 outputPath: videoTranscodedPath,
124 : {
125 type: 'video' as 'video',
126 inputPath: videoInputPath,
127 outputPath: videoTranscodedPath,
128 138
129 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 139 inputFileMutexReleaser,
130 profile: CONFIG.TRANSCODING.PROFILE,
131 140
132 resolution, 141 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
142 profile: CONFIG.TRANSCODING.PROFILE,
133 143
134 job 144 resolution,
135 }
136 145
137 await transcodeVOD(transcodeOptions) 146 job
147 }
148 : {
149 type: 'video' as 'video',
150 inputPath: videoInputPath,
151 outputPath: videoTranscodedPath,
138 152
139 return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, videoOutputPath) 153 inputFileMutexReleaser,
140 }) 154
155 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
156 profile: CONFIG.TRANSCODING.PROFILE,
157
158 resolution,
159
160 job
161 }
162
163 await transcodeVOD(transcodeOptions)
164
165 return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, newVideoFile)
166 })
167
168 return result
169 } finally {
170 inputFileMutexReleaser()
171 }
141} 172}
142 173
143// Merge an image with an audio file to create a video 174// Merge an image with an audio file to create a video
144function mergeAudioVideofile (options: { 175async function mergeAudioVideofile (options: {
145 video: MVideoFullLight 176 video: MVideoFullLight
146 resolution: VideoResolution 177 resolution: VideoResolution
147 job: Job 178 job: Job
@@ -151,54 +182,67 @@ function mergeAudioVideofile (options: {
151 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR 182 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
152 const newExtname = '.mp4' 183 const newExtname = '.mp4'
153 184
154 const inputVideoFile = video.getMinQualityFile() 185 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
155 186
156 return VideoPathManager.Instance.makeAvailableVideoFile(inputVideoFile.withVideoOrPlaylist(video), async audioInputPath => { 187 try {
157 const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname) 188 await video.reload()
158 189
159 // If the user updates the video preview during transcoding 190 const inputVideoFile = video.getMinQualityFile()
160 const previewPath = video.getPreview().getPath()
161 const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath))
162 await copyFile(previewPath, tmpPreviewPath)
163 191
164 const transcodeOptions = { 192 const fileWithVideoOrPlaylist = inputVideoFile.withVideoOrPlaylist(video)
165 type: 'merge-audio' as 'merge-audio',
166 193
167 inputPath: tmpPreviewPath, 194 const result = await VideoPathManager.Instance.makeAvailableVideoFile(fileWithVideoOrPlaylist, async audioInputPath => {
168 outputPath: videoTranscodedPath, 195 const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
169 196
170 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), 197 // If the user updates the video preview during transcoding
171 profile: CONFIG.TRANSCODING.PROFILE, 198 const previewPath = video.getPreview().getPath()
199 const tmpPreviewPath = join(CONFIG.STORAGE.TMP_DIR, basename(previewPath))
200 await copyFile(previewPath, tmpPreviewPath)
172 201
173 audioPath: audioInputPath, 202 const transcodeOptions = {
174 resolution, 203 type: 'merge-audio' as 'merge-audio',
175 204
176 job 205 inputPath: tmpPreviewPath,
177 } 206 outputPath: videoTranscodedPath,
178 207
179 try { 208 inputFileMutexReleaser,
180 await transcodeVOD(transcodeOptions)
181 209
182 await remove(audioInputPath) 210 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
183 await remove(tmpPreviewPath) 211 profile: CONFIG.TRANSCODING.PROFILE,
184 } catch (err) {
185 await remove(tmpPreviewPath)
186 throw err
187 }
188 212
189 // Important to do this before getVideoFilename() to take in account the new file extension 213 audioPath: audioInputPath,
190 inputVideoFile.extname = newExtname 214 resolution,
191 inputVideoFile.resolution = resolution
192 inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname)
193 215
194 const videoOutputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, inputVideoFile) 216 job
195 // ffmpeg generated a new video file, so update the video duration 217 }
196 // See https://trac.ffmpeg.org/ticket/5456
197 video.duration = await getVideoStreamDuration(videoTranscodedPath)
198 await video.save()
199 218
200 return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) 219 try {
201 }) 220 await transcodeVOD(transcodeOptions)
221
222 await remove(audioInputPath)
223 await remove(tmpPreviewPath)
224 } catch (err) {
225 await remove(tmpPreviewPath)
226 throw err
227 }
228
229 // Important to do this before getVideoFilename() to take in account the new file extension
230 inputVideoFile.extname = newExtname
231 inputVideoFile.resolution = resolution
232 inputVideoFile.filename = generateWebTorrentVideoFilename(inputVideoFile.resolution, newExtname)
233
234 // ffmpeg generated a new video file, so update the video duration
235 // See https://trac.ffmpeg.org/ticket/5456
236 video.duration = await getVideoStreamDuration(videoTranscodedPath)
237 await video.save()
238
239 return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, inputVideoFile)
240 })
241
242 return result
243 } finally {
244 inputFileMutexReleaser()
245 }
202} 246}
203 247
204// Concat TS segments from a live video to a fragmented mp4 HLS playlist 248// Concat TS segments from a live video to a fragmented mp4 HLS playlist
@@ -207,13 +251,13 @@ async function generateHlsPlaylistResolutionFromTS (options: {
207 concatenatedTsFilePath: string 251 concatenatedTsFilePath: string
208 resolution: VideoResolution 252 resolution: VideoResolution
209 isAAC: boolean 253 isAAC: boolean
254 inputFileMutexReleaser: MutexInterface.Releaser
210}) { 255}) {
211 return generateHlsPlaylistCommon({ 256 return generateHlsPlaylistCommon({
212 video: options.video,
213 resolution: options.resolution,
214 inputPath: options.concatenatedTsFilePath,
215 type: 'hls-from-ts' as 'hls-from-ts', 257 type: 'hls-from-ts' as 'hls-from-ts',
216 isAAC: options.isAAC 258 inputPath: options.concatenatedTsFilePath,
259
260 ...pick(options, [ 'video', 'resolution', 'inputFileMutexReleaser', 'isAAC' ])
217 }) 261 })
218} 262}
219 263
@@ -223,15 +267,14 @@ function generateHlsPlaylistResolution (options: {
223 videoInputPath: string 267 videoInputPath: string
224 resolution: VideoResolution 268 resolution: VideoResolution
225 copyCodecs: boolean 269 copyCodecs: boolean
270 inputFileMutexReleaser: MutexInterface.Releaser
226 job?: Job 271 job?: Job
227}) { 272}) {
228 return generateHlsPlaylistCommon({ 273 return generateHlsPlaylistCommon({
229 video: options.video,
230 resolution: options.resolution,
231 copyCodecs: options.copyCodecs,
232 inputPath: options.videoInputPath,
233 type: 'hls' as 'hls', 274 type: 'hls' as 'hls',
234 job: options.job 275 inputPath: options.videoInputPath,
276
277 ...pick(options, [ 'video', 'resolution', 'copyCodecs', 'inputFileMutexReleaser', 'job' ])
235 }) 278 })
236} 279}
237 280
@@ -251,27 +294,39 @@ async function onWebTorrentVideoFileTranscoding (
251 video: MVideoFullLight, 294 video: MVideoFullLight,
252 videoFile: MVideoFile, 295 videoFile: MVideoFile,
253 transcodingPath: string, 296 transcodingPath: string,
254 outputPath: string 297 newVideoFile: MVideoFile
255) { 298) {
256 const stats = await stat(transcodingPath) 299 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
257 const fps = await getVideoStreamFPS(transcodingPath)
258 const metadata = await buildFileMetadata(transcodingPath)
259 300
260 await move(transcodingPath, outputPath, { overwrite: true }) 301 try {
302 await video.reload()
261 303
262 videoFile.size = stats.size 304 const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile)
263 videoFile.fps = fps
264 videoFile.metadata = metadata
265 305
266 await createTorrentAndSetInfoHash(video, videoFile) 306 const stats = await stat(transcodingPath)
267 307
268 const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution }) 308 const probe = await ffprobePromise(transcodingPath)
269 if (oldFile) await video.removeWebTorrentFile(oldFile) 309 const fps = await getVideoStreamFPS(transcodingPath, probe)
310 const metadata = await buildFileMetadata(transcodingPath, probe)
270 311
271 await VideoFileModel.customUpsert(videoFile, 'video', undefined) 312 await move(transcodingPath, outputPath, { overwrite: true })
272 video.VideoFiles = await video.$get('VideoFiles')
273 313
274 return { video, videoFile } 314 videoFile.size = stats.size
315 videoFile.fps = fps
316 videoFile.metadata = metadata
317
318 await createTorrentAndSetInfoHash(video, videoFile)
319
320 const oldFile = await VideoFileModel.loadWebTorrentFile({ videoId: video.id, fps: videoFile.fps, resolution: videoFile.resolution })
321 if (oldFile) await video.removeWebTorrentFile(oldFile)
322
323 await VideoFileModel.customUpsert(videoFile, 'video', undefined)
324 video.VideoFiles = await video.$get('VideoFiles')
325
326 return { video, videoFile }
327 } finally {
328 mutexReleaser()
329 }
275} 330}
276 331
277async function generateHlsPlaylistCommon (options: { 332async function generateHlsPlaylistCommon (options: {
@@ -279,12 +334,15 @@ async function generateHlsPlaylistCommon (options: {
279 video: MVideo 334 video: MVideo
280 inputPath: string 335 inputPath: string
281 resolution: VideoResolution 336 resolution: VideoResolution
337
338 inputFileMutexReleaser: MutexInterface.Releaser
339
282 copyCodecs?: boolean 340 copyCodecs?: boolean
283 isAAC?: boolean 341 isAAC?: boolean
284 342
285 job?: Job 343 job?: Job
286}) { 344}) {
287 const { type, video, inputPath, resolution, copyCodecs, isAAC, job } = options 345 const { type, video, inputPath, resolution, copyCodecs, isAAC, job, inputFileMutexReleaser } = options
288 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR 346 const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
289 347
290 const videoTranscodedBasePath = join(transcodeDirectory, type) 348 const videoTranscodedBasePath = join(transcodeDirectory, type)
@@ -308,6 +366,8 @@ async function generateHlsPlaylistCommon (options: {
308 366
309 isAAC, 367 isAAC,
310 368
369 inputFileMutexReleaser,
370
311 hlsPlaylist: { 371 hlsPlaylist: {
312 videoFilename 372 videoFilename
313 }, 373 },
@@ -333,47 +393,73 @@ async function generateHlsPlaylistCommon (options: {
333 videoStreamingPlaylistId: playlist.id 393 videoStreamingPlaylistId: playlist.id
334 }) 394 })
335 395
336 const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, newVideoFile) 396 const mutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
337 await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video))
338 397
339 // Move playlist file 398 try {
340 const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, resolutionPlaylistFilename) 399 // VOD transcoding is a long task, refresh video attributes
341 await move(resolutionPlaylistFileTranscodePath, resolutionPlaylistPath, { overwrite: true }) 400 await video.reload()
342 // Move video file
343 await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true })
344 401
345 // Update video duration if it was not set (in case of a live for example) 402 const videoFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(playlist, newVideoFile)
346 if (!video.duration) { 403 await ensureDir(VideoPathManager.Instance.getFSHLSOutputPath(video))
347 video.duration = await getVideoStreamDuration(videoFilePath)
348 await video.save()
349 }
350 404
351 const stats = await stat(videoFilePath) 405 // Move playlist file
406 const resolutionPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, resolutionPlaylistFilename)
407 await move(resolutionPlaylistFileTranscodePath, resolutionPlaylistPath, { overwrite: true })
408 // Move video file
409 await move(join(videoTranscodedBasePath, videoFilename), videoFilePath, { overwrite: true })
352 410
353 newVideoFile.size = stats.size 411 // Update video duration if it was not set (in case of a live for example)
354 newVideoFile.fps = await getVideoStreamFPS(videoFilePath) 412 if (!video.duration) {
355 newVideoFile.metadata = await buildFileMetadata(videoFilePath) 413 video.duration = await getVideoStreamDuration(videoFilePath)
414 await video.save()
415 }
356 416
357 await createTorrentAndSetInfoHash(playlist, newVideoFile) 417 const stats = await stat(videoFilePath)
358 418
359 const oldFile = await VideoFileModel.loadHLSFile({ playlistId: playlist.id, fps: newVideoFile.fps, resolution: newVideoFile.resolution }) 419 newVideoFile.size = stats.size
360 if (oldFile) { 420 newVideoFile.fps = await getVideoStreamFPS(videoFilePath)
361 await video.removeStreamingPlaylistVideoFile(playlist, oldFile) 421 newVideoFile.metadata = await buildFileMetadata(videoFilePath)
362 await oldFile.destroy()
363 }
364 422
365 const savedVideoFile = await VideoFileModel.customUpsert(newVideoFile, 'streaming-playlist', undefined) 423 await createTorrentAndSetInfoHash(playlist, newVideoFile)
366 424
367 await updatePlaylistAfterFileChange(video, playlist) 425 const oldFile = await VideoFileModel.loadHLSFile({
426 playlistId: playlist.id,
427 fps: newVideoFile.fps,
428 resolution: newVideoFile.resolution
429 })
430
431 if (oldFile) {
432 await video.removeStreamingPlaylistVideoFile(playlist, oldFile)
433 await oldFile.destroy()
434 }
435
436 const savedVideoFile = await VideoFileModel.customUpsert(newVideoFile, 'streaming-playlist', undefined)
437
438 await updatePlaylistAfterFileChange(video, playlist)
368 439
369 return { resolutionPlaylistPath, videoFile: savedVideoFile } 440 return { resolutionPlaylistPath, videoFile: savedVideoFile }
441 } finally {
442 mutexReleaser()
443 }
370} 444}
371 445
372function buildOriginalFileResolution (inputResolution: number) { 446function buildOriginalFileResolution (inputResolution: number) {
373 if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) return toEven(inputResolution) 447 if (CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION === true) {
448 return toEven(inputResolution)
449 }
374 450
375 const resolutions = computeResolutionsToTranscode({ input: inputResolution, type: 'vod', includeInput: false, strictLower: false }) 451 const resolutions = computeResolutionsToTranscode({
376 if (resolutions.length === 0) return toEven(inputResolution) 452 input: inputResolution,
453 type: 'vod',
454 includeInput: false,
455 strictLower: false,
456 // We don't really care about the audio resolution in this context
457 hasAudio: true
458 })
459
460 if (resolutions.length === 0) {
461 return toEven(inputResolution)
462 }
377 463
378 return Math.max(...resolutions) 464 return Math.max(...resolutions)
379} 465}
diff --git a/server/lib/uploadx.ts b/server/lib/uploadx.ts
index 9484eff75..58040cb6d 100644
--- a/server/lib/uploadx.ts
+++ b/server/lib/uploadx.ts
@@ -1,6 +1,10 @@
1import express from 'express' 1import express from 'express'
2import { buildLogger } from '@server/helpers/logger'
2import { getResumableUploadPath } from '@server/helpers/upload' 3import { getResumableUploadPath } from '@server/helpers/upload'
3import { Uploadx } from '@uploadx/core' 4import { CONFIG } from '@server/initializers/config'
5import { LogLevel, Uploadx } from '@uploadx/core'
6
7const logger = buildLogger('uploadx')
4 8
5const uploadx = new Uploadx({ 9const uploadx = new Uploadx({
6 directory: getResumableUploadPath(), 10 directory: getResumableUploadPath(),
@@ -10,6 +14,14 @@ const uploadx = new Uploadx({
10 // Could be big with thumbnails/previews 14 // Could be big with thumbnails/previews
11 maxMetadataSize: '10MB', 15 maxMetadataSize: '10MB',
12 16
17 logger: {
18 logLevel: CONFIG.LOG.LEVEL as LogLevel,
19 debug: logger.debug.bind(logger),
20 info: logger.info.bind(logger),
21 warn: logger.warn.bind(logger),
22 error: logger.error.bind(logger)
23 },
24
13 userIdentifier: (_, res: express.Response) => { 25 userIdentifier: (_, res: express.Response) => {
14 if (!res.locals.oauth) return undefined 26 if (!res.locals.oauth) return undefined
15 27
diff --git a/server/lib/video-path-manager.ts b/server/lib/video-path-manager.ts
index c3f55fd95..9953cae5d 100644
--- a/server/lib/video-path-manager.ts
+++ b/server/lib/video-path-manager.ts
@@ -1,29 +1,31 @@
1import { Mutex } from 'async-mutex'
1import { remove } from 'fs-extra' 2import { remove } from 'fs-extra'
2import { extname, join } from 'path' 3import { extname, join } from 'path'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { extractVideo } from '@server/helpers/video' 5import { extractVideo } from '@server/helpers/video'
4import { CONFIG } from '@server/initializers/config' 6import { CONFIG } from '@server/initializers/config'
5import { 7import { DIRECTORIES } from '@server/initializers/constants'
6 MStreamingPlaylistVideo, 8import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoFileStreamingPlaylistVideo, MVideoFileVideo } from '@server/types/models'
7 MVideo,
8 MVideoFile,
9 MVideoFileStreamingPlaylistVideo,
10 MVideoFileVideo,
11 MVideoUUID
12} from '@server/types/models'
13import { buildUUID } from '@shared/extra-utils' 9import { buildUUID } from '@shared/extra-utils'
14import { VideoStorage } from '@shared/models' 10import { VideoStorage } from '@shared/models'
15import { makeHLSFileAvailable, makeWebTorrentFileAvailable } from './object-storage' 11import { makeHLSFileAvailable, makeWebTorrentFileAvailable } from './object-storage'
16import { getHLSDirectory, getHLSRedundancyDirectory, getHlsResolutionPlaylistFilename } from './paths' 12import { getHLSDirectory, getHLSRedundancyDirectory, getHlsResolutionPlaylistFilename } from './paths'
13import { isVideoInPrivateDirectory } from './video-privacy'
17 14
18type MakeAvailableCB <T> = (path: string) => Promise<T> | T 15type MakeAvailableCB <T> = (path: string) => Promise<T> | T
19 16
17const lTags = loggerTagsFactory('video-path-manager')
18
20class VideoPathManager { 19class VideoPathManager {
21 20
22 private static instance: VideoPathManager 21 private static instance: VideoPathManager
23 22
23 // Key is a video UUID
24 private readonly videoFileMutexStore = new Map<string, Mutex>()
25
24 private constructor () {} 26 private constructor () {}
25 27
26 getFSHLSOutputPath (video: MVideoUUID, filename?: string) { 28 getFSHLSOutputPath (video: MVideo, filename?: string) {
27 const base = getHLSDirectory(video) 29 const base = getHLSDirectory(video)
28 if (!filename) return base 30 if (!filename) return base
29 31
@@ -41,13 +43,17 @@ class VideoPathManager {
41 } 43 }
42 44
43 getFSVideoFileOutputPath (videoOrPlaylist: MVideo | MStreamingPlaylistVideo, videoFile: MVideoFile) { 45 getFSVideoFileOutputPath (videoOrPlaylist: MVideo | MStreamingPlaylistVideo, videoFile: MVideoFile) {
44 if (videoFile.isHLS()) { 46 const video = extractVideo(videoOrPlaylist)
45 const video = extractVideo(videoOrPlaylist)
46 47
48 if (videoFile.isHLS()) {
47 return join(getHLSDirectory(video), videoFile.filename) 49 return join(getHLSDirectory(video), videoFile.filename)
48 } 50 }
49 51
50 return join(CONFIG.STORAGE.VIDEOS_DIR, videoFile.filename) 52 if (isVideoInPrivateDirectory(video.privacy)) {
53 return join(DIRECTORIES.VIDEOS.PRIVATE, videoFile.filename)
54 }
55
56 return join(DIRECTORIES.VIDEOS.PUBLIC, videoFile.filename)
51 } 57 }
52 58
53 async makeAvailableVideoFile <T> (videoFile: MVideoFileVideo | MVideoFileStreamingPlaylistVideo, cb: MakeAvailableCB<T>) { 59 async makeAvailableVideoFile <T> (videoFile: MVideoFileVideo | MVideoFileStreamingPlaylistVideo, cb: MakeAvailableCB<T>) {
@@ -113,6 +119,27 @@ class VideoPathManager {
113 ) 119 )
114 } 120 }
115 121
122 async lockFiles (videoUUID: string) {
123 if (!this.videoFileMutexStore.has(videoUUID)) {
124 this.videoFileMutexStore.set(videoUUID, new Mutex())
125 }
126
127 const mutex = this.videoFileMutexStore.get(videoUUID)
128 const releaser = await mutex.acquire()
129
130 logger.debug('Locked files of %s.', videoUUID, lTags(videoUUID))
131
132 return releaser
133 }
134
135 unlockFiles (videoUUID: string) {
136 const mutex = this.videoFileMutexStore.get(videoUUID)
137
138 mutex.release()
139
140 logger.debug('Released lockfiles of %s.', videoUUID, lTags(videoUUID))
141 }
142
116 private async makeAvailableFactory <T> (method: () => Promise<string> | string, clean: boolean, cb: MakeAvailableCB<T>) { 143 private async makeAvailableFactory <T> (method: () => Promise<string> | string, clean: boolean, cb: MakeAvailableCB<T>) {
117 let result: T 144 let result: T
118 145
diff --git a/server/lib/video-import.ts b/server/lib/video-pre-import.ts
index 796079875..796079875 100644
--- a/server/lib/video-import.ts
+++ b/server/lib/video-pre-import.ts
diff --git a/server/lib/video-privacy.ts b/server/lib/video-privacy.ts
new file mode 100644
index 000000000..41f9d62b3
--- /dev/null
+++ b/server/lib/video-privacy.ts
@@ -0,0 +1,127 @@
1import { move } from 'fs-extra'
2import { join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { DIRECTORIES } from '@server/initializers/constants'
5import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
6import { VideoPrivacy, VideoStorage } from '@shared/models'
7import { updateHLSFilesACL, updateWebTorrentFileACL } from './object-storage'
8
9function setVideoPrivacy (video: MVideo, newPrivacy: VideoPrivacy) {
10 if (video.privacy === VideoPrivacy.PRIVATE && newPrivacy !== VideoPrivacy.PRIVATE) {
11 video.publishedAt = new Date()
12 }
13
14 video.privacy = newPrivacy
15}
16
17function isVideoInPrivateDirectory (privacy: VideoPrivacy) {
18 return privacy === VideoPrivacy.PRIVATE || privacy === VideoPrivacy.INTERNAL
19}
20
21function isVideoInPublicDirectory (privacy: VideoPrivacy) {
22 return !isVideoInPrivateDirectory(privacy)
23}
24
25async function moveFilesIfPrivacyChanged (video: MVideoFullLight, oldPrivacy: VideoPrivacy) {
26 // Now public, previously private
27 if (isVideoInPublicDirectory(video.privacy) && isVideoInPrivateDirectory(oldPrivacy)) {
28 await moveFiles({ type: 'private-to-public', video })
29
30 return true
31 }
32
33 // Now private, previously public
34 if (isVideoInPrivateDirectory(video.privacy) && isVideoInPublicDirectory(oldPrivacy)) {
35 await moveFiles({ type: 'public-to-private', video })
36
37 return true
38 }
39
40 return false
41}
42
43export {
44 setVideoPrivacy,
45
46 isVideoInPrivateDirectory,
47 isVideoInPublicDirectory,
48
49 moveFilesIfPrivacyChanged
50}
51
52// ---------------------------------------------------------------------------
53
54type MoveType = 'private-to-public' | 'public-to-private'
55
56async function moveFiles (options: {
57 type: MoveType
58 video: MVideoFullLight
59}) {
60 const { type, video } = options
61
62 for (const file of video.VideoFiles) {
63 if (file.storage === VideoStorage.FILE_SYSTEM) {
64 await moveWebTorrentFileOnFS(type, video, file)
65 } else {
66 await updateWebTorrentFileACL(video, file)
67 }
68 }
69
70 const hls = video.getHLSPlaylist()
71
72 if (hls) {
73 if (hls.storage === VideoStorage.FILE_SYSTEM) {
74 await moveHLSFilesOnFS(type, video)
75 } else {
76 await updateHLSFilesACL(hls)
77 }
78 }
79}
80
81async function moveWebTorrentFileOnFS (type: MoveType, video: MVideo, file: MVideoFile) {
82 const directories = getWebTorrentDirectories(type)
83
84 const source = join(directories.old, file.filename)
85 const destination = join(directories.new, file.filename)
86
87 try {
88 logger.info('Moving WebTorrent files of %s after privacy change (%s -> %s).', video.uuid, source, destination)
89
90 await move(source, destination)
91 } catch (err) {
92 logger.error('Cannot move webtorrent file %s to %s after privacy change', source, destination, { err })
93 }
94}
95
96function getWebTorrentDirectories (moveType: MoveType) {
97 if (moveType === 'private-to-public') {
98 return { old: DIRECTORIES.VIDEOS.PRIVATE, new: DIRECTORIES.VIDEOS.PUBLIC }
99 }
100
101 return { old: DIRECTORIES.VIDEOS.PUBLIC, new: DIRECTORIES.VIDEOS.PRIVATE }
102}
103
104// ---------------------------------------------------------------------------
105
106async function moveHLSFilesOnFS (type: MoveType, video: MVideo) {
107 const directories = getHLSDirectories(type)
108
109 const source = join(directories.old, video.uuid)
110 const destination = join(directories.new, video.uuid)
111
112 try {
113 logger.info('Moving HLS files of %s after privacy change (%s -> %s).', video.uuid, source, destination)
114
115 await move(source, destination)
116 } catch (err) {
117 logger.error('Cannot move HLS file %s to %s after privacy change', source, destination, { err })
118 }
119}
120
121function getHLSDirectories (moveType: MoveType) {
122 if (moveType === 'private-to-public') {
123 return { old: DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE, new: DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC }
124 }
125
126 return { old: DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC, new: DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE }
127}
diff --git a/server/lib/video-tokens-manager.ts b/server/lib/video-tokens-manager.ts
new file mode 100644
index 000000000..c43085d16
--- /dev/null
+++ b/server/lib/video-tokens-manager.ts
@@ -0,0 +1,49 @@
1import LRUCache from 'lru-cache'
2import { LRU_CACHE } from '@server/initializers/constants'
3import { buildUUID } from '@shared/extra-utils'
4
5// ---------------------------------------------------------------------------
6// Create temporary tokens that can be used as URL query parameters to access video static files
7// ---------------------------------------------------------------------------
8
9class VideoTokensManager {
10
11 private static instance: VideoTokensManager
12
13 private readonly lruCache = new LRUCache<string, string>({
14 max: LRU_CACHE.VIDEO_TOKENS.MAX_SIZE,
15 ttl: LRU_CACHE.VIDEO_TOKENS.TTL
16 })
17
18 private constructor () {}
19
20 create (videoUUID: string) {
21 const token = buildUUID()
22
23 const expires = new Date(new Date().getTime() + LRU_CACHE.VIDEO_TOKENS.TTL)
24
25 this.lruCache.set(token, videoUUID)
26
27 return { token, expires }
28 }
29
30 hasToken (options: {
31 token: string
32 videoUUID: string
33 }) {
34 const value = this.lruCache.get(options.token)
35 if (!value) return false
36
37 return value === options.videoUUID
38 }
39
40 static get Instance () {
41 return this.instance || (this.instance = new this())
42 }
43}
44
45// ---------------------------------------------------------------------------
46
47export {
48 VideoTokensManager
49}
diff --git a/server/lib/video.ts b/server/lib/video.ts
index 6c4f3ce7b..aacc41a7a 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -7,10 +7,11 @@ import { TagModel } from '@server/models/video/tag'
7import { VideoModel } from '@server/models/video/video' 7import { VideoModel } from '@server/models/video/video'
8import { VideoJobInfoModel } from '@server/models/video/video-job-info' 8import { VideoJobInfoModel } from '@server/models/video/video-job-info'
9import { FilteredModelAttributes } from '@server/types' 9import { FilteredModelAttributes } from '@server/types'
10import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' 10import { MThumbnail, MUserId, MVideoFile, MVideoFullLight, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 11import { ManageVideoTorrentPayload, ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
12import { CreateJobOptions } from './job-queue/job-queue' 12import { CreateJobArgument, CreateJobOptions, JobQueue } from './job-queue/job-queue'
13import { updateVideoMiniatureFromExisting } from './thumbnail' 13import { updateVideoMiniatureFromExisting } from './thumbnail'
14import { moveFilesIfPrivacyChanged } from './video-privacy'
14 15
15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { 16function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
16 return { 17 return {
@@ -177,6 +178,59 @@ const getCachedVideoDuration = memoizee(getVideoDuration, {
177 178
178// --------------------------------------------------------------------------- 179// ---------------------------------------------------------------------------
179 180
181async function addVideoJobsAfterUpdate (options: {
182 video: MVideoFullLight
183 isNewVideo: boolean
184
185 nameChanged: boolean
186 oldPrivacy: VideoPrivacy
187}) {
188 const { video, nameChanged, oldPrivacy, isNewVideo } = options
189 const jobs: CreateJobArgument[] = []
190
191 const filePathChanged = await moveFilesIfPrivacyChanged(video, oldPrivacy)
192
193 if (!video.isLive && (nameChanged || filePathChanged)) {
194 for (const file of (video.VideoFiles || [])) {
195 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
196
197 jobs.push({ type: 'manage-video-torrent', payload })
198 }
199
200 const hls = video.getHLSPlaylist()
201
202 for (const file of (hls?.VideoFiles || [])) {
203 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
204
205 jobs.push({ type: 'manage-video-torrent', payload })
206 }
207 }
208
209 jobs.push({
210 type: 'federate-video',
211 payload: {
212 videoUUID: video.uuid,
213 isNewVideo
214 }
215 })
216
217 const wasConfidentialVideo = new Set([ VideoPrivacy.PRIVATE, VideoPrivacy.UNLISTED, VideoPrivacy.INTERNAL ]).has(oldPrivacy)
218
219 if (wasConfidentialVideo) {
220 jobs.push({
221 type: 'notify',
222 payload: {
223 action: 'new-video',
224 videoUUID: video.uuid
225 }
226 })
227 }
228
229 return JobQueue.Instance.createSequentialJobFlow(...jobs)
230}
231
232// ---------------------------------------------------------------------------
233
180export { 234export {
181 buildLocalVideoFromReq, 235 buildLocalVideoFromReq,
182 buildVideoThumbnailsFromReq, 236 buildVideoThumbnailsFromReq,
@@ -185,5 +239,6 @@ export {
185 buildTranscodingJob, 239 buildTranscodingJob,
186 buildMoveToObjectStorageJob, 240 buildMoveToObjectStorageJob,
187 getTranscodingJobPriority, 241 getTranscodingJobPriority,
242 addVideoJobsAfterUpdate,
188 getCachedVideoDuration 243 getCachedVideoDuration
189} 244}