aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-10-04 10:03:17 +0200
committerChocobozzz <me@florianbigard.com>2022-10-04 10:03:17 +0200
commitcfd57d2ca0bb058087f7dc90fcc3e8442b0288e1 (patch)
treedc899a1504ecac588e5580553e02571e0f5d7e4b /server/lib
parent9c0cdc5047918b959ebd5e075ddad81eb7fb93f0 (diff)
downloadPeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.gz
PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.zst
PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.zip
Live supports object storage
* Sync live files (segments, master playlist, resolution playlist, segment sha file) into object storage * Automatically delete them when the live ends * Segment sha file is now a file on disk, and not stored in memory anymore
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/hls.ts6
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts10
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts23
-rw-r--r--server/lib/live/live-manager.ts12
-rw-r--r--server/lib/live/live-segment-sha-store.ts75
-rw-r--r--server/lib/live/live-utils.ts67
-rw-r--r--server/lib/live/shared/muxing-session.ts106
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts25
-rw-r--r--server/lib/object-storage/videos.ts37
9 files changed, 263 insertions, 98 deletions
diff --git a/server/lib/hls.ts b/server/lib/hls.ts
index a0a5afc0f..a41f1ae48 100644
--- a/server/lib/hls.ts
+++ b/server/lib/hls.ts
@@ -15,7 +15,7 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, REQUEST_TIMEOUTS } from '../initializers
15import { sequelizeTypescript } from '../initializers/database' 15import { sequelizeTypescript } from '../initializers/database'
16import { VideoFileModel } from '../models/video/video-file' 16import { VideoFileModel } from '../models/video/video-file'
17import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist' 17import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist'
18import { storeHLSFile } from './object-storage' 18import { storeHLSFileFromFilename } from './object-storage'
19import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths' 19import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths'
20import { VideoPathManager } from './video-path-manager' 20import { VideoPathManager } from './video-path-manager'
21 21
@@ -95,7 +95,7 @@ function updateMasterHLSPlaylist (video: MVideo, playlistArg: MStreamingPlaylist
95 await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n') 95 await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n')
96 96
97 if (playlist.storage === VideoStorage.OBJECT_STORAGE) { 97 if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
98 playlist.playlistUrl = await storeHLSFile(playlist, playlist.playlistFilename) 98 playlist.playlistUrl = await storeHLSFileFromFilename(playlist, playlist.playlistFilename)
99 await remove(masterPlaylistPath) 99 await remove(masterPlaylistPath)
100 } 100 }
101 101
@@ -146,7 +146,7 @@ function updateSha256VODSegments (video: MVideo, playlistArg: MStreamingPlaylist
146 await outputJSON(outputPath, json) 146 await outputJSON(outputPath, json)
147 147
148 if (playlist.storage === VideoStorage.OBJECT_STORAGE) { 148 if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
149 playlist.segmentsSha256Url = await storeHLSFile(playlist, playlist.segmentsSha256Filename) 149 playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlist, playlist.segmentsSha256Filename)
150 await remove(outputPath) 150 await remove(outputPath)
151 } 151 }
152 152
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 25bdebeea..28c3d325d 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -5,7 +5,7 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { updateTorrentMetadata } from '@server/helpers/webtorrent' 5import { updateTorrentMetadata } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config' 6import { CONFIG } from '@server/initializers/config'
7import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' 7import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
8import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' 8import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
9import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' 9import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' 10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
11import { VideoModel } from '@server/models/video/video' 11import { VideoModel } from '@server/models/video/video'
@@ -88,10 +88,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
88 88
89 // Resolution playlist 89 // Resolution playlist
90 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) 90 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
91 await storeHLSFile(playlistWithVideo, playlistFilename) 91 await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
92 92
93 // Resolution fragmented file 93 // Resolution fragmented file
94 const fileUrl = await storeHLSFile(playlistWithVideo, file.filename) 94 const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
95 95
96 const oldPath = join(getHLSDirectory(video), file.filename) 96 const oldPath = join(getHLSDirectory(video), file.filename)
97 97
@@ -113,9 +113,9 @@ async function doAfterLastJob (options: {
113 const playlistWithVideo = playlist.withVideo(video) 113 const playlistWithVideo = playlist.withVideo(video)
114 114
115 // Master playlist 115 // Master playlist
116 playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename) 116 playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
117 // Sha256 segments file 117 // Sha256 segments file
118 playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename) 118 playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
119 119
120 playlist.storage = VideoStorage.OBJECT_STORAGE 120 playlist.storage = VideoStorage.OBJECT_STORAGE
121 121
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 8a3ee09a2..abfaf1cd7 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -4,7 +4,7 @@ import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' 4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' 7import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' 8import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
9import { generateVideoMiniature } from '@server/lib/thumbnail' 9import { generateVideoMiniature } from '@server/lib/thumbnail'
10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' 10import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
@@ -141,23 +141,22 @@ async function replaceLiveByReplay (options: {
141}) { 141}) {
142 const { video, liveSession, live, permanentLive, replayDirectory } = options 142 const { video, liveSession, live, permanentLive, replayDirectory } = options
143 143
144 await cleanupTMPLiveFiles(video) 144 const videoWithFiles = await VideoModel.loadFull(video.id)
145 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
146
147 await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist)
145 148
146 await live.destroy() 149 await live.destroy()
147 150
148 video.isLive = false 151 videoWithFiles.isLive = false
149 video.waitTranscoding = true 152 videoWithFiles.waitTranscoding = true
150 video.state = VideoState.TO_TRANSCODE 153 videoWithFiles.state = VideoState.TO_TRANSCODE
151 154
152 await video.save() 155 await videoWithFiles.save()
153 156
154 liveSession.replayVideoId = video.id 157 liveSession.replayVideoId = videoWithFiles.id
155 await liveSession.save() 158 await liveSession.save()
156 159
157 // Remove old HLS playlist video files
158 const videoWithFiles = await VideoModel.loadFull(video.id)
159
160 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
161 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) 160 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
162 161
163 // Reset playlist 162 // Reset playlist
@@ -234,7 +233,7 @@ async function cleanupLiveAndFederate (options: {
234 233
235 if (streamingPlaylist) { 234 if (streamingPlaylist) {
236 if (permanentLive) { 235 if (permanentLive) {
237 await cleanupPermanentLive(video, streamingPlaylist) 236 await cleanupAndDestroyPermanentLive(video, streamingPlaylist)
238 } else { 237 } else {
239 await cleanupUnsavedNormalLive(video, streamingPlaylist) 238 await cleanupUnsavedNormalLive(video, streamingPlaylist)
240 } 239 }
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 16715862b..9470b530b 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' 22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
23import { pick, wait } from '@shared/core-utils' 23import { pick, wait } from '@shared/core-utils'
24import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' 24import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models'
25import { federateVideoIfNeeded } from '../activitypub/videos' 25import { federateVideoIfNeeded } from '../activitypub/videos'
26import { JobQueue } from '../job-queue' 26import { JobQueue } from '../job-queue'
27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' 27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
28import { PeerTubeSocket } from '../peertube-socket' 28import { PeerTubeSocket } from '../peertube-socket'
29import { Hooks } from '../plugins/hooks' 29import { Hooks } from '../plugins/hooks'
30import { LiveQuotaStore } from './live-quota-store' 30import { LiveQuotaStore } from './live-quota-store'
31import { cleanupPermanentLive } from './live-utils' 31import { cleanupAndDestroyPermanentLive } from './live-utils'
32import { MuxingSession } from './shared' 32import { MuxingSession } from './shared'
33 33
34const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') 34const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
@@ -224,7 +224,7 @@ class LiveManager {
224 if (oldStreamingPlaylist) { 224 if (oldStreamingPlaylist) {
225 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) 225 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
226 226
227 await cleanupPermanentLive(video, oldStreamingPlaylist) 227 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
228 } 228 }
229 229
230 this.videoSessions.set(video.id, sessionId) 230 this.videoSessions.set(video.id, sessionId)
@@ -301,7 +301,7 @@ class LiveManager {
301 ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) 301 ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
302 }) 302 })
303 303
304 muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) 304 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
305 305
306 muxingSession.on('bad-socket-health', ({ videoId }) => { 306 muxingSession.on('bad-socket-health', ({ videoId }) => {
307 logger.error( 307 logger.error(
@@ -485,6 +485,10 @@ class LiveManager {
485 485
486 playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) 486 playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions)
487 487
488 playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
489 ? VideoStorage.OBJECT_STORAGE
490 : VideoStorage.FILE_SYSTEM
491
488 return playlist.save() 492 return playlist.save()
489 } 493 }
490 494
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts
index 4af6f3ebf..faf03dccf 100644
--- a/server/lib/live/live-segment-sha-store.ts
+++ b/server/lib/live/live-segment-sha-store.ts
@@ -1,62 +1,73 @@
1import { writeJson } from 'fs-extra'
1import { basename } from 'path' 2import { basename } from 'path'
3import { mapToJSON } from '@server/helpers/core-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { MStreamingPlaylistVideo } from '@server/types/models'
3import { buildSha256Segment } from '../hls' 6import { buildSha256Segment } from '../hls'
7import { storeHLSFileFromPath } from '../object-storage'
4 8
5const lTags = loggerTagsFactory('live') 9const lTags = loggerTagsFactory('live')
6 10
7class LiveSegmentShaStore { 11class LiveSegmentShaStore {
8 12
9 private static instance: LiveSegmentShaStore 13 private readonly segmentsSha256 = new Map<string, string>()
10 14
11 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 15 private readonly videoUUID: string
12 16 private readonly sha256Path: string
13 private constructor () { 17 private readonly streamingPlaylist: MStreamingPlaylistVideo
18 private readonly sendToObjectStorage: boolean
19
20 constructor (options: {
21 videoUUID: string
22 sha256Path: string
23 streamingPlaylist: MStreamingPlaylistVideo
24 sendToObjectStorage: boolean
25 }) {
26 this.videoUUID = options.videoUUID
27 this.sha256Path = options.sha256Path
28 this.streamingPlaylist = options.streamingPlaylist
29 this.sendToObjectStorage = options.sendToObjectStorage
14 } 30 }
15 31
16 getSegmentsSha256 (videoUUID: string) { 32 async addSegmentSha (segmentPath: string) {
17 return this.segmentsSha256.get(videoUUID) 33 logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
18 }
19
20 async addSegmentSha (videoUUID: string, segmentPath: string) {
21 const segmentName = basename(segmentPath)
22 logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
23 34
24 const shaResult = await buildSha256Segment(segmentPath) 35 const shaResult = await buildSha256Segment(segmentPath)
25 36
26 if (!this.segmentsSha256.has(videoUUID)) { 37 const segmentName = basename(segmentPath)
27 this.segmentsSha256.set(videoUUID, new Map()) 38 this.segmentsSha256.set(segmentName, shaResult)
28 }
29 39
30 const filesMap = this.segmentsSha256.get(videoUUID) 40 await this.writeToDisk()
31 filesMap.set(segmentName, shaResult)
32 } 41 }
33 42
34 removeSegmentSha (videoUUID: string, segmentPath: string) { 43 async removeSegmentSha (segmentPath: string) {
35 const segmentName = basename(segmentPath) 44 const segmentName = basename(segmentPath)
36 45
37 logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) 46 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
38 47
39 const filesMap = this.segmentsSha256.get(videoUUID) 48 if (!this.segmentsSha256.has(segmentName)) {
40 if (!filesMap) { 49 logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID))
41 logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
42 return 50 return
43 } 51 }
44 52
45 if (!filesMap.has(segmentName)) { 53 this.segmentsSha256.delete(segmentName)
46 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
47 return
48 }
49 54
50 filesMap.delete(segmentName) 55 await this.writeToDisk()
51 } 56 }
52 57
53 cleanupShaSegments (videoUUID: string) { 58 private async writeToDisk () {
54 this.segmentsSha256.delete(videoUUID) 59 await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256))
55 }
56 60
57 static get Instance () { 61 if (this.sendToObjectStorage) {
58 return this.instance || (this.instance = new this()) 62 const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
63
64 if (this.streamingPlaylist.segmentsSha256Url !== url) {
65 this.streamingPlaylist.segmentsSha256Url = url
66 await this.streamingPlaylist.save()
67 }
68 }
59 } 69 }
70
60} 71}
61 72
62export { 73export {
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index bba876642..d2b8e3a55 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -1,9 +1,10 @@
1import { pathExists, readdir, remove } from 'fs-extra' 1import { pathExists, readdir, remove } from 'fs-extra'
2import { basename, join } from 'path' 2import { basename, join } from 'path'
3import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
4import { MStreamingPlaylist, MVideo } from '@server/types/models' 4import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
5import { VideoStorage } from '@shared/models'
6import { listHLSFileKeysOf, removeHLSFileObjectStorage, removeHLSObjectStorage } from '../object-storage'
5import { getLiveDirectory } from '../paths' 7import { getLiveDirectory } from '../paths'
6import { LiveSegmentShaStore } from './live-segment-sha-store'
7 8
8function buildConcatenatedName (segmentOrPlaylistPath: string) { 9function buildConcatenatedName (segmentOrPlaylistPath: string) {
9 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) 10 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
@@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
11 return 'concat-' + num[1] + '.ts' 12 return 'concat-' + num[1] + '.ts'
12} 13}
13 14
14async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 15async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
15 await cleanupTMPLiveFiles(video) 16 await cleanupTMPLiveFiles(video, streamingPlaylist)
16 17
17 await streamingPlaylist.destroy() 18 await streamingPlaylist.destroy()
18} 19}
@@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin
20async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 21async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
21 const hlsDirectory = getLiveDirectory(video) 22 const hlsDirectory = getLiveDirectory(video)
22 23
24 // We uploaded files to object storage too, remove them
25 if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
26 await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
27 }
28
23 await remove(hlsDirectory) 29 await remove(hlsDirectory)
24 30
25 await streamingPlaylist.destroy() 31 await streamingPlaylist.destroy()
32}
26 33
27 LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) 34async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
35 await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
36
37 await cleanupTMPLiveFilesFromFilesystem(video)
28} 38}
29 39
30async function cleanupTMPLiveFiles (video: MVideo) { 40export {
31 const hlsDirectory = getLiveDirectory(video) 41 cleanupAndDestroyPermanentLive,
42 cleanupUnsavedNormalLive,
43 cleanupTMPLiveFiles,
44 buildConcatenatedName
45}
46
47// ---------------------------------------------------------------------------
32 48
33 LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) 49function isTMPLiveFile (name: string) {
50 return name.endsWith('.ts') ||
51 name.endsWith('.m3u8') ||
52 name.endsWith('.json') ||
53 name.endsWith('.mpd') ||
54 name.endsWith('.m4s') ||
55 name.endsWith('.tmp')
56}
57
58async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
59 const hlsDirectory = getLiveDirectory(video)
34 60
35 if (!await pathExists(hlsDirectory)) return 61 if (!await pathExists(hlsDirectory)) return
36 62
37 logger.info('Cleanup TMP live files of %s.', hlsDirectory) 63 logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
38 64
39 const files = await readdir(hlsDirectory) 65 const files = await readdir(hlsDirectory)
40 66
41 for (const filename of files) { 67 for (const filename of files) {
42 if ( 68 if (isTMPLiveFile(filename)) {
43 filename.endsWith('.ts') ||
44 filename.endsWith('.m3u8') ||
45 filename.endsWith('.mpd') ||
46 filename.endsWith('.m4s') ||
47 filename.endsWith('.tmp')
48 ) {
49 const p = join(hlsDirectory, filename) 69 const p = join(hlsDirectory, filename)
50 70
51 remove(p) 71 remove(p)
@@ -54,9 +74,14 @@ async function cleanupTMPLiveFiles (video: MVideo) {
54 } 74 }
55} 75}
56 76
57export { 77async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
58 cleanupPermanentLive, 78 if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
59 cleanupUnsavedNormalLive, 79
60 cleanupTMPLiveFiles, 80 const keys = await listHLSFileKeysOf(streamingPlaylist)
61 buildConcatenatedName 81
82 for (const key of keys) {
83 if (isTMPLiveFile(key)) {
84 await removeHLSFileObjectStorage(streamingPlaylist, key)
85 }
86 }
62} 87}
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 505717dce..4c27d5dd8 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 10import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' 11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
12import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file' 13import { VideoFileModel } from '@server/models/video/video-file'
13import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 14import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
15import { VideoStorage } from '@shared/models'
14import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' 16import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
15import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' 17import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
16import { isAbleToUploadVideo } from '../../user' 18import { isAbleToUploadVideo } from '../../user'
@@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils'
21import memoizee = require('memoizee') 23import memoizee = require('memoizee')
22 24
23interface MuxingSessionEvents { 25interface MuxingSessionEvents {
24 'master-playlist-created': (options: { videoId: number }) => void 26 'live-ready': (options: { videoId: number }) => void
25 27
26 'bad-socket-health': (options: { videoId: number }) => void 28 'bad-socket-health': (options: { videoId: number }) => void
27 'duration-exceeded': (options: { videoId: number }) => void 29 'duration-exceeded': (options: { videoId: number }) => void
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter {
68 private readonly outDirectory: string 70 private readonly outDirectory: string
69 private readonly replayDirectory: string 71 private readonly replayDirectory: string
70 72
73 private readonly liveSegmentShaStore: LiveSegmentShaStore
74
71 private readonly lTags: LoggerTagsFn 75 private readonly lTags: LoggerTagsFn
72 76
73 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 77 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
74 78
75 private tsWatcher: FSWatcher 79 private tsWatcher: FSWatcher
76 private masterWatcher: FSWatcher 80 private masterWatcher: FSWatcher
81 private m3u8Watcher: FSWatcher
82
83 private masterPlaylistCreated = false
84 private liveReady = false
77 85
78 private aborted = false 86 private aborted = false
79 87
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter {
123 this.outDirectory = getLiveDirectory(this.videoLive.Video) 131 this.outDirectory = getLiveDirectory(this.videoLive.Video)
124 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) 132 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
125 133
134 this.liveSegmentShaStore = new LiveSegmentShaStore({
135 videoUUID: this.videoLive.Video.uuid,
136 sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
137 streamingPlaylist: this.streamingPlaylist,
138 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
139 })
140
126 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) 141 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
127 } 142 }
128 143
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter {
159 174
160 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) 175 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
161 176
162 this.watchTSFiles()
163 this.watchMasterFile() 177 this.watchMasterFile()
178 this.watchTSFiles()
179 this.watchM3U8File()
164 180
165 let ffmpegShellCommand: string 181 let ffmpegShellCommand: string
166 this.ffmpegCommand.on('start', cmdline => { 182 this.ffmpegCommand.on('start', cmdline => {
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter {
219 setTimeout(() => { 235 setTimeout(() => {
220 // Wait latest segments generation, and close watchers 236 // Wait latest segments generation, and close watchers
221 237
222 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) 238 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
223 .then(() => { 239 .then(() => {
224 // Process remaining segments hash 240 // Process remaining segments hash
225 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { 241 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
@@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter {
240 private watchMasterFile () { 256 private watchMasterFile () {
241 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 257 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
242 258
243 this.masterWatcher.on('add', () => { 259 this.masterWatcher.on('add', async () => {
244 this.emit('master-playlist-created', { videoId: this.videoId }) 260 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
261 try {
262 const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
263
264 this.streamingPlaylist.playlistUrl = url
265 await this.streamingPlaylist.save()
266 } catch (err) {
267 logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
268 }
269 }
270
271 this.masterPlaylistCreated = true
245 272
246 this.masterWatcher.close() 273 this.masterWatcher.close()
247 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 274 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
248 }) 275 })
249 } 276 }
250 277
278 private watchM3U8File () {
279 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
280
281 const onChangeOrAdd = async (m3u8Path: string) => {
282 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
283
284 try {
285 await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
286 } catch (err) {
287 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
288 }
289 }
290
291 this.m3u8Watcher.on('change', onChangeOrAdd)
292 }
293
251 private watchTSFiles () { 294 private watchTSFiles () {
252 const startStreamDateTime = new Date().getTime() 295 const startStreamDateTime = new Date().getTime()
253 296
@@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter {
282 } 325 }
283 } 326 }
284 327
285 const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) 328 const deleteHandler = async (segmentPath: string) => {
329 try {
330 await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
331 } catch (err) {
332 logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
333 }
334
335 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
336 try {
337 await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
338 } catch (err) {
339 logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
340 }
341 }
342 }
286 343
287 this.tsWatcher.on('add', p => addHandler(p)) 344 this.tsWatcher.on('add', p => addHandler(p))
288 this.tsWatcher.on('unlink', p => deleteHandler(p)) 345 this.tsWatcher.on('unlink', p => deleteHandler(p))
@@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter {
315 extname: '.ts', 372 extname: '.ts',
316 infoHash: null, 373 infoHash: null,
317 fps: this.fps, 374 fps: this.fps,
375 storage: this.streamingPlaylist.storage,
318 videoStreamingPlaylistId: this.streamingPlaylist.id 376 videoStreamingPlaylistId: this.streamingPlaylist.id
319 }) 377 })
320 378
@@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter {
343 } 401 }
344 402
345 private processSegments (segmentPaths: string[]) { 403 private processSegments (segmentPaths: string[]) {
346 mapSeries(segmentPaths, async previousSegment => { 404 mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
347 // Add sha hash of previous segments, because ffmpeg should have finished generating them 405 .catch(err => {
348 await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) 406 if (this.aborted) return
407
408 logger.error('Cannot process segments', { err, ...this.lTags() })
409 })
410 }
349 411
350 if (this.saveReplay) { 412 private async processSegment (segmentPath: string) {
351 await this.addSegmentToReplay(previousSegment) 413 // Add sha hash of previous segments, because ffmpeg should have finished generating them
414 await this.liveSegmentShaStore.addSegmentSha(segmentPath)
415
416 if (this.saveReplay) {
417 await this.addSegmentToReplay(segmentPath)
418 }
419
420 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
421 try {
422 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
423 } catch (err) {
424 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
352 } 425 }
353 }).catch(err => { 426 }
354 if (this.aborted) return
355 427
356 logger.error('Cannot process segments', { err, ...this.lTags() }) 428 // Master playlist and segment JSON file are created, live is ready
357 }) 429 if (this.masterPlaylistCreated && !this.liveReady) {
430 this.liveReady = true
431
432 this.emit('live-ready', { videoId: this.videoId })
433 }
358 } 434 }
359 435
360 private hasClientSocketInBadHealth (sessionId: string) { 436 private hasClientSocketInBadHealth (sessionId: string) {
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index 16161362c..c131977e8 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -22,6 +22,24 @@ type BucketInfo = {
22 PREFIX?: string 22 PREFIX?: string
23} 23}
24 24
25async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
26 const s3Client = getClient()
27
28 const commandPrefix = bucketInfo.PREFIX + prefix
29 const listCommand = new ListObjectsV2Command({
30 Bucket: bucketInfo.BUCKET_NAME,
31 Prefix: commandPrefix
32 })
33
34 const listedObjects = await s3Client.send(listCommand)
35
36 if (isArray(listedObjects.Contents) !== true) return []
37
38 return listedObjects.Contents.map(c => c.Key)
39}
40
41// ---------------------------------------------------------------------------
42
25async function storeObject (options: { 43async function storeObject (options: {
26 inputPath: string 44 inputPath: string
27 objectStorageKey: string 45 objectStorageKey: string
@@ -36,6 +54,8 @@ async function storeObject (options: {
36 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo }) 54 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
37} 55}
38 56
57// ---------------------------------------------------------------------------
58
39async function removeObject (filename: string, bucketInfo: BucketInfo) { 59async function removeObject (filename: string, bucketInfo: BucketInfo) {
40 const command = new DeleteObjectCommand({ 60 const command = new DeleteObjectCommand({
41 Bucket: bucketInfo.BUCKET_NAME, 61 Bucket: bucketInfo.BUCKET_NAME,
@@ -89,6 +109,8 @@ async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
89 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) 109 if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
90} 110}
91 111
112// ---------------------------------------------------------------------------
113
92async function makeAvailable (options: { 114async function makeAvailable (options: {
93 key: string 115 key: string
94 destination: string 116 destination: string
@@ -122,7 +144,8 @@ export {
122 storeObject, 144 storeObject,
123 removeObject, 145 removeObject,
124 removePrefix, 146 removePrefix,
125 makeAvailable 147 makeAvailable,
148 listKeysOfPrefix
126} 149}
127 150
128// --------------------------------------------------------------------------- 151// ---------------------------------------------------------------------------
diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts
index 66e738200..62aae248b 100644
--- a/server/lib/object-storage/videos.ts
+++ b/server/lib/object-storage/videos.ts
@@ -1,19 +1,35 @@
1import { join } from 'path' 1import { basename, join } from 'path'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { CONFIG } from '@server/initializers/config' 3import { CONFIG } from '@server/initializers/config'
4import { MStreamingPlaylistVideo, MVideoFile } from '@server/types/models' 4import { MStreamingPlaylistVideo, MVideoFile } from '@server/types/models'
5import { getHLSDirectory } from '../paths' 5import { getHLSDirectory } from '../paths'
6import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys' 6import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys'
7import { lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared' 7import { listKeysOfPrefix, lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'
8 8
9function storeHLSFile (playlist: MStreamingPlaylistVideo, filename: string, path?: string) { 9function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) {
10 return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
11}
12
13// ---------------------------------------------------------------------------
14
15function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename: string) {
10 return storeObject({ 16 return storeObject({
11 inputPath: path ?? join(getHLSDirectory(playlist.Video), filename), 17 inputPath: join(getHLSDirectory(playlist.Video), filename),
12 objectStorageKey: generateHLSObjectStorageKey(playlist, filename), 18 objectStorageKey: generateHLSObjectStorageKey(playlist, filename),
13 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS 19 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
14 }) 20 })
15} 21}
16 22
23function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) {
24 return storeObject({
25 inputPath: path,
26 objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
27 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
28 })
29}
30
31// ---------------------------------------------------------------------------
32
17function storeWebTorrentFile (filename: string) { 33function storeWebTorrentFile (filename: string) {
18 return storeObject({ 34 return storeObject({
19 inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename), 35 inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename),
@@ -22,6 +38,8 @@ function storeWebTorrentFile (filename: string) {
22 }) 38 })
23} 39}
24 40
41// ---------------------------------------------------------------------------
42
25function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) { 43function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) {
26 return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) 44 return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
27} 45}
@@ -30,10 +48,14 @@ function removeHLSFileObjectStorage (playlist: MStreamingPlaylistVideo, filename
30 return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) 48 return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
31} 49}
32 50
51// ---------------------------------------------------------------------------
52
33function removeWebTorrentObjectStorage (videoFile: MVideoFile) { 53function removeWebTorrentObjectStorage (videoFile: MVideoFile) {
34 return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS) 54 return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS)
35} 55}
36 56
57// ---------------------------------------------------------------------------
58
37async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) { 59async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) {
38 const key = generateHLSObjectStorageKey(playlist, filename) 60 const key = generateHLSObjectStorageKey(playlist, filename)
39 61
@@ -62,9 +84,14 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin
62 return destination 84 return destination
63} 85}
64 86
87// ---------------------------------------------------------------------------
88
65export { 89export {
90 listHLSFileKeysOf,
91
66 storeWebTorrentFile, 92 storeWebTorrentFile,
67 storeHLSFile, 93 storeHLSFileFromFilename,
94 storeHLSFileFromPath,
68 95
69 removeHLSObjectStorage, 96 removeHLSObjectStorage,
70 removeHLSFileObjectStorage, 97 removeHLSFileObjectStorage,