aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/shared/muxing-session.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-10-04 10:03:17 +0200
committerChocobozzz <me@florianbigard.com>2022-10-04 10:03:17 +0200
commitcfd57d2ca0bb058087f7dc90fcc3e8442b0288e1 (patch)
treedc899a1504ecac588e5580553e02571e0f5d7e4b /server/lib/live/shared/muxing-session.ts
parent9c0cdc5047918b959ebd5e075ddad81eb7fb93f0 (diff)
downloadPeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.gz
PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.zst
PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.zip
Live supports object storage
* Sync live files (segments, master playlist, resolution playlist, segment sha file) into object storage * Automatically delete them when the live ends * Segment sha file is now a file on disk, and not stored in memory anymore
Diffstat (limited to 'server/lib/live/shared/muxing-session.ts')
-rw-r--r--server/lib/live/shared/muxing-session.ts106
1 files changed, 91 insertions, 15 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 505717dce..4c27d5dd8 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 10import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' 11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
12import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file' 13import { VideoFileModel } from '@server/models/video/video-file'
13import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 14import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
15import { VideoStorage } from '@shared/models'
14import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' 16import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
15import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' 17import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
16import { isAbleToUploadVideo } from '../../user' 18import { isAbleToUploadVideo } from '../../user'
@@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils'
21import memoizee = require('memoizee') 23import memoizee = require('memoizee')
22 24
23interface MuxingSessionEvents { 25interface MuxingSessionEvents {
24 'master-playlist-created': (options: { videoId: number }) => void 26 'live-ready': (options: { videoId: number }) => void
25 27
26 'bad-socket-health': (options: { videoId: number }) => void 28 'bad-socket-health': (options: { videoId: number }) => void
27 'duration-exceeded': (options: { videoId: number }) => void 29 'duration-exceeded': (options: { videoId: number }) => void
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter {
68 private readonly outDirectory: string 70 private readonly outDirectory: string
69 private readonly replayDirectory: string 71 private readonly replayDirectory: string
70 72
73 private readonly liveSegmentShaStore: LiveSegmentShaStore
74
71 private readonly lTags: LoggerTagsFn 75 private readonly lTags: LoggerTagsFn
72 76
73 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 77 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
74 78
75 private tsWatcher: FSWatcher 79 private tsWatcher: FSWatcher
76 private masterWatcher: FSWatcher 80 private masterWatcher: FSWatcher
81 private m3u8Watcher: FSWatcher
82
83 private masterPlaylistCreated = false
84 private liveReady = false
77 85
78 private aborted = false 86 private aborted = false
79 87
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter {
123 this.outDirectory = getLiveDirectory(this.videoLive.Video) 131 this.outDirectory = getLiveDirectory(this.videoLive.Video)
124 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) 132 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
125 133
134 this.liveSegmentShaStore = new LiveSegmentShaStore({
135 videoUUID: this.videoLive.Video.uuid,
136 sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
137 streamingPlaylist: this.streamingPlaylist,
138 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
139 })
140
126 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) 141 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
127 } 142 }
128 143
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter {
159 174
160 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) 175 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
161 176
162 this.watchTSFiles()
163 this.watchMasterFile() 177 this.watchMasterFile()
178 this.watchTSFiles()
179 this.watchM3U8File()
164 180
165 let ffmpegShellCommand: string 181 let ffmpegShellCommand: string
166 this.ffmpegCommand.on('start', cmdline => { 182 this.ffmpegCommand.on('start', cmdline => {
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter {
219 setTimeout(() => { 235 setTimeout(() => {
220 // Wait latest segments generation, and close watchers 236 // Wait latest segments generation, and close watchers
221 237
222 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) 238 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
223 .then(() => { 239 .then(() => {
224 // Process remaining segments hash 240 // Process remaining segments hash
225 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { 241 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
@@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter {
240 private watchMasterFile () { 256 private watchMasterFile () {
241 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 257 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
242 258
243 this.masterWatcher.on('add', () => { 259 this.masterWatcher.on('add', async () => {
244 this.emit('master-playlist-created', { videoId: this.videoId }) 260 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
261 try {
262 const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
263
264 this.streamingPlaylist.playlistUrl = url
265 await this.streamingPlaylist.save()
266 } catch (err) {
267 logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
268 }
269 }
270
271 this.masterPlaylistCreated = true
245 272
246 this.masterWatcher.close() 273 this.masterWatcher.close()
247 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 274 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
248 }) 275 })
249 } 276 }
250 277
278 private watchM3U8File () {
279 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
280
281 const onChangeOrAdd = async (m3u8Path: string) => {
282 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
283
284 try {
285 await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
286 } catch (err) {
287 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
288 }
289 }
290
291 this.m3u8Watcher.on('change', onChangeOrAdd)
292 }
293
251 private watchTSFiles () { 294 private watchTSFiles () {
252 const startStreamDateTime = new Date().getTime() 295 const startStreamDateTime = new Date().getTime()
253 296
@@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter {
282 } 325 }
283 } 326 }
284 327
285 const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) 328 const deleteHandler = async (segmentPath: string) => {
329 try {
330 await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
331 } catch (err) {
332 logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
333 }
334
335 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
336 try {
337 await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
338 } catch (err) {
339 logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
340 }
341 }
342 }
286 343
287 this.tsWatcher.on('add', p => addHandler(p)) 344 this.tsWatcher.on('add', p => addHandler(p))
288 this.tsWatcher.on('unlink', p => deleteHandler(p)) 345 this.tsWatcher.on('unlink', p => deleteHandler(p))
@@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter {
315 extname: '.ts', 372 extname: '.ts',
316 infoHash: null, 373 infoHash: null,
317 fps: this.fps, 374 fps: this.fps,
375 storage: this.streamingPlaylist.storage,
318 videoStreamingPlaylistId: this.streamingPlaylist.id 376 videoStreamingPlaylistId: this.streamingPlaylist.id
319 }) 377 })
320 378
@@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter {
343 } 401 }
344 402
345 private processSegments (segmentPaths: string[]) { 403 private processSegments (segmentPaths: string[]) {
346 mapSeries(segmentPaths, async previousSegment => { 404 mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
347 // Add sha hash of previous segments, because ffmpeg should have finished generating them 405 .catch(err => {
348 await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) 406 if (this.aborted) return
407
408 logger.error('Cannot process segments', { err, ...this.lTags() })
409 })
410 }
349 411
350 if (this.saveReplay) { 412 private async processSegment (segmentPath: string) {
351 await this.addSegmentToReplay(previousSegment) 413 // Add sha hash of previous segments, because ffmpeg should have finished generating them
414 await this.liveSegmentShaStore.addSegmentSha(segmentPath)
415
416 if (this.saveReplay) {
417 await this.addSegmentToReplay(segmentPath)
418 }
419
420 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
421 try {
422 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
423 } catch (err) {
424 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
352 } 425 }
353 }).catch(err => { 426 }
354 if (this.aborted) return
355 427
356 logger.error('Cannot process segments', { err, ...this.lTags() }) 428 // Master playlist and segment JSON file are created, live is ready
357 }) 429 if (this.masterPlaylistCreated && !this.liveReady) {
430 this.liveReady = true
431
432 this.emit('live-ready', { videoId: this.videoId })
433 }
358 } 434 }
359 435
360 private hasClientSocketInBadHealth (sessionId: string) { 436 private hasClientSocketInBadHealth (sessionId: string) {