aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-10-04 10:03:17 +0200
committerChocobozzz <me@florianbigard.com>2022-10-04 10:03:17 +0200
commitcfd57d2ca0bb058087f7dc90fcc3e8442b0288e1 (patch)
treedc899a1504ecac588e5580553e02571e0f5d7e4b /server/lib/live
parent9c0cdc5047918b959ebd5e075ddad81eb7fb93f0 (diff)
downloadPeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.gz
PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.zst
PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.zip
Live supports object storage
* Sync live files (segments, master playlist, resolution playlist, segment sha file) into object storage * Automatically delete them when the live ends * Segment sha file is now a file on disk, and not stored in memory anymore
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/live-manager.ts12
-rw-r--r--server/lib/live/live-segment-sha-store.ts75
-rw-r--r--server/lib/live/live-utils.ts67
-rw-r--r--server/lib/live/shared/muxing-session.ts106
4 files changed, 188 insertions, 72 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 16715862b..9470b530b 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' 22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
23import { pick, wait } from '@shared/core-utils' 23import { pick, wait } from '@shared/core-utils'
24import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' 24import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models'
25import { federateVideoIfNeeded } from '../activitypub/videos' 25import { federateVideoIfNeeded } from '../activitypub/videos'
26import { JobQueue } from '../job-queue' 26import { JobQueue } from '../job-queue'
27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' 27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
28import { PeerTubeSocket } from '../peertube-socket' 28import { PeerTubeSocket } from '../peertube-socket'
29import { Hooks } from '../plugins/hooks' 29import { Hooks } from '../plugins/hooks'
30import { LiveQuotaStore } from './live-quota-store' 30import { LiveQuotaStore } from './live-quota-store'
31import { cleanupPermanentLive } from './live-utils' 31import { cleanupAndDestroyPermanentLive } from './live-utils'
32import { MuxingSession } from './shared' 32import { MuxingSession } from './shared'
33 33
34const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') 34const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
@@ -224,7 +224,7 @@ class LiveManager {
224 if (oldStreamingPlaylist) { 224 if (oldStreamingPlaylist) {
225 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) 225 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
226 226
227 await cleanupPermanentLive(video, oldStreamingPlaylist) 227 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
228 } 228 }
229 229
230 this.videoSessions.set(video.id, sessionId) 230 this.videoSessions.set(video.id, sessionId)
@@ -301,7 +301,7 @@ class LiveManager {
301 ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) 301 ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
302 }) 302 })
303 303
304 muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) 304 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
305 305
306 muxingSession.on('bad-socket-health', ({ videoId }) => { 306 muxingSession.on('bad-socket-health', ({ videoId }) => {
307 logger.error( 307 logger.error(
@@ -485,6 +485,10 @@ class LiveManager {
485 485
486 playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) 486 playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions)
487 487
488 playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
489 ? VideoStorage.OBJECT_STORAGE
490 : VideoStorage.FILE_SYSTEM
491
488 return playlist.save() 492 return playlist.save()
489 } 493 }
490 494
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts
index 4af6f3ebf..faf03dccf 100644
--- a/server/lib/live/live-segment-sha-store.ts
+++ b/server/lib/live/live-segment-sha-store.ts
@@ -1,62 +1,73 @@
1import { writeJson } from 'fs-extra'
1import { basename } from 'path' 2import { basename } from 'path'
3import { mapToJSON } from '@server/helpers/core-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { MStreamingPlaylistVideo } from '@server/types/models'
3import { buildSha256Segment } from '../hls' 6import { buildSha256Segment } from '../hls'
7import { storeHLSFileFromPath } from '../object-storage'
4 8
5const lTags = loggerTagsFactory('live') 9const lTags = loggerTagsFactory('live')
6 10
7class LiveSegmentShaStore { 11class LiveSegmentShaStore {
8 12
9 private static instance: LiveSegmentShaStore 13 private readonly segmentsSha256 = new Map<string, string>()
10 14
11 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 15 private readonly videoUUID: string
12 16 private readonly sha256Path: string
13 private constructor () { 17 private readonly streamingPlaylist: MStreamingPlaylistVideo
18 private readonly sendToObjectStorage: boolean
19
20 constructor (options: {
21 videoUUID: string
22 sha256Path: string
23 streamingPlaylist: MStreamingPlaylistVideo
24 sendToObjectStorage: boolean
25 }) {
26 this.videoUUID = options.videoUUID
27 this.sha256Path = options.sha256Path
28 this.streamingPlaylist = options.streamingPlaylist
29 this.sendToObjectStorage = options.sendToObjectStorage
14 } 30 }
15 31
16 getSegmentsSha256 (videoUUID: string) { 32 async addSegmentSha (segmentPath: string) {
17 return this.segmentsSha256.get(videoUUID) 33 logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
18 }
19
20 async addSegmentSha (videoUUID: string, segmentPath: string) {
21 const segmentName = basename(segmentPath)
22 logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
23 34
24 const shaResult = await buildSha256Segment(segmentPath) 35 const shaResult = await buildSha256Segment(segmentPath)
25 36
26 if (!this.segmentsSha256.has(videoUUID)) { 37 const segmentName = basename(segmentPath)
27 this.segmentsSha256.set(videoUUID, new Map()) 38 this.segmentsSha256.set(segmentName, shaResult)
28 }
29 39
30 const filesMap = this.segmentsSha256.get(videoUUID) 40 await this.writeToDisk()
31 filesMap.set(segmentName, shaResult)
32 } 41 }
33 42
34 removeSegmentSha (videoUUID: string, segmentPath: string) { 43 async removeSegmentSha (segmentPath: string) {
35 const segmentName = basename(segmentPath) 44 const segmentName = basename(segmentPath)
36 45
37 logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) 46 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
38 47
39 const filesMap = this.segmentsSha256.get(videoUUID) 48 if (!this.segmentsSha256.has(segmentName)) {
40 if (!filesMap) { 49 logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID))
41 logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
42 return 50 return
43 } 51 }
44 52
45 if (!filesMap.has(segmentName)) { 53 this.segmentsSha256.delete(segmentName)
46 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
47 return
48 }
49 54
50 filesMap.delete(segmentName) 55 await this.writeToDisk()
51 } 56 }
52 57
53 cleanupShaSegments (videoUUID: string) { 58 private async writeToDisk () {
54 this.segmentsSha256.delete(videoUUID) 59 await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256))
55 }
56 60
57 static get Instance () { 61 if (this.sendToObjectStorage) {
58 return this.instance || (this.instance = new this()) 62 const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
63
64 if (this.streamingPlaylist.segmentsSha256Url !== url) {
65 this.streamingPlaylist.segmentsSha256Url = url
66 await this.streamingPlaylist.save()
67 }
68 }
59 } 69 }
70
60} 71}
61 72
62export { 73export {
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index bba876642..d2b8e3a55 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -1,9 +1,10 @@
1import { pathExists, readdir, remove } from 'fs-extra' 1import { pathExists, readdir, remove } from 'fs-extra'
2import { basename, join } from 'path' 2import { basename, join } from 'path'
3import { logger } from '@server/helpers/logger' 3import { logger } from '@server/helpers/logger'
4import { MStreamingPlaylist, MVideo } from '@server/types/models' 4import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
5import { VideoStorage } from '@shared/models'
6import { listHLSFileKeysOf, removeHLSFileObjectStorage, removeHLSObjectStorage } from '../object-storage'
5import { getLiveDirectory } from '../paths' 7import { getLiveDirectory } from '../paths'
6import { LiveSegmentShaStore } from './live-segment-sha-store'
7 8
8function buildConcatenatedName (segmentOrPlaylistPath: string) { 9function buildConcatenatedName (segmentOrPlaylistPath: string) {
9 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) 10 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
@@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
11 return 'concat-' + num[1] + '.ts' 12 return 'concat-' + num[1] + '.ts'
12} 13}
13 14
14async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 15async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
15 await cleanupTMPLiveFiles(video) 16 await cleanupTMPLiveFiles(video, streamingPlaylist)
16 17
17 await streamingPlaylist.destroy() 18 await streamingPlaylist.destroy()
18} 19}
@@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin
20async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 21async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
21 const hlsDirectory = getLiveDirectory(video) 22 const hlsDirectory = getLiveDirectory(video)
22 23
24 // We uploaded files to object storage too, remove them
25 if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
26 await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
27 }
28
23 await remove(hlsDirectory) 29 await remove(hlsDirectory)
24 30
25 await streamingPlaylist.destroy() 31 await streamingPlaylist.destroy()
32}
26 33
27 LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) 34async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
35 await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
36
37 await cleanupTMPLiveFilesFromFilesystem(video)
28} 38}
29 39
30async function cleanupTMPLiveFiles (video: MVideo) { 40export {
31 const hlsDirectory = getLiveDirectory(video) 41 cleanupAndDestroyPermanentLive,
42 cleanupUnsavedNormalLive,
43 cleanupTMPLiveFiles,
44 buildConcatenatedName
45}
46
47// ---------------------------------------------------------------------------
32 48
33 LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) 49function isTMPLiveFile (name: string) {
50 return name.endsWith('.ts') ||
51 name.endsWith('.m3u8') ||
52 name.endsWith('.json') ||
53 name.endsWith('.mpd') ||
54 name.endsWith('.m4s') ||
55 name.endsWith('.tmp')
56}
57
58async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
59 const hlsDirectory = getLiveDirectory(video)
34 60
35 if (!await pathExists(hlsDirectory)) return 61 if (!await pathExists(hlsDirectory)) return
36 62
37 logger.info('Cleanup TMP live files of %s.', hlsDirectory) 63 logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
38 64
39 const files = await readdir(hlsDirectory) 65 const files = await readdir(hlsDirectory)
40 66
41 for (const filename of files) { 67 for (const filename of files) {
42 if ( 68 if (isTMPLiveFile(filename)) {
43 filename.endsWith('.ts') ||
44 filename.endsWith('.m3u8') ||
45 filename.endsWith('.mpd') ||
46 filename.endsWith('.m4s') ||
47 filename.endsWith('.tmp')
48 ) {
49 const p = join(hlsDirectory, filename) 69 const p = join(hlsDirectory, filename)
50 70
51 remove(p) 71 remove(p)
@@ -54,9 +74,14 @@ async function cleanupTMPLiveFiles (video: MVideo) {
54 } 74 }
55} 75}
56 76
57export { 77async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
58 cleanupPermanentLive, 78 if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
59 cleanupUnsavedNormalLive, 79
60 cleanupTMPLiveFiles, 80 const keys = await listHLSFileKeysOf(streamingPlaylist)
61 buildConcatenatedName 81
82 for (const key of keys) {
83 if (isTMPLiveFile(key)) {
84 await removeHLSFileObjectStorage(streamingPlaylist, key)
85 }
86 }
62} 87}
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 505717dce..4c27d5dd8 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 10import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' 11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
12import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file' 13import { VideoFileModel } from '@server/models/video/video-file'
13import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 14import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
15import { VideoStorage } from '@shared/models'
14import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' 16import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
15import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' 17import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
16import { isAbleToUploadVideo } from '../../user' 18import { isAbleToUploadVideo } from '../../user'
@@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils'
21import memoizee = require('memoizee') 23import memoizee = require('memoizee')
22 24
23interface MuxingSessionEvents { 25interface MuxingSessionEvents {
24 'master-playlist-created': (options: { videoId: number }) => void 26 'live-ready': (options: { videoId: number }) => void
25 27
26 'bad-socket-health': (options: { videoId: number }) => void 28 'bad-socket-health': (options: { videoId: number }) => void
27 'duration-exceeded': (options: { videoId: number }) => void 29 'duration-exceeded': (options: { videoId: number }) => void
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter {
68 private readonly outDirectory: string 70 private readonly outDirectory: string
69 private readonly replayDirectory: string 71 private readonly replayDirectory: string
70 72
73 private readonly liveSegmentShaStore: LiveSegmentShaStore
74
71 private readonly lTags: LoggerTagsFn 75 private readonly lTags: LoggerTagsFn
72 76
73 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 77 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
74 78
75 private tsWatcher: FSWatcher 79 private tsWatcher: FSWatcher
76 private masterWatcher: FSWatcher 80 private masterWatcher: FSWatcher
81 private m3u8Watcher: FSWatcher
82
83 private masterPlaylistCreated = false
84 private liveReady = false
77 85
78 private aborted = false 86 private aborted = false
79 87
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter {
123 this.outDirectory = getLiveDirectory(this.videoLive.Video) 131 this.outDirectory = getLiveDirectory(this.videoLive.Video)
124 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) 132 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
125 133
134 this.liveSegmentShaStore = new LiveSegmentShaStore({
135 videoUUID: this.videoLive.Video.uuid,
136 sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
137 streamingPlaylist: this.streamingPlaylist,
138 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
139 })
140
126 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) 141 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
127 } 142 }
128 143
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter {
159 174
160 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) 175 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
161 176
162 this.watchTSFiles()
163 this.watchMasterFile() 177 this.watchMasterFile()
178 this.watchTSFiles()
179 this.watchM3U8File()
164 180
165 let ffmpegShellCommand: string 181 let ffmpegShellCommand: string
166 this.ffmpegCommand.on('start', cmdline => { 182 this.ffmpegCommand.on('start', cmdline => {
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter {
219 setTimeout(() => { 235 setTimeout(() => {
220 // Wait latest segments generation, and close watchers 236 // Wait latest segments generation, and close watchers
221 237
222 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) 238 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
223 .then(() => { 239 .then(() => {
224 // Process remaining segments hash 240 // Process remaining segments hash
225 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { 241 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
@@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter {
240 private watchMasterFile () { 256 private watchMasterFile () {
241 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 257 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
242 258
243 this.masterWatcher.on('add', () => { 259 this.masterWatcher.on('add', async () => {
244 this.emit('master-playlist-created', { videoId: this.videoId }) 260 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
261 try {
262 const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
263
264 this.streamingPlaylist.playlistUrl = url
265 await this.streamingPlaylist.save()
266 } catch (err) {
267 logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
268 }
269 }
270
271 this.masterPlaylistCreated = true
245 272
246 this.masterWatcher.close() 273 this.masterWatcher.close()
247 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 274 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
248 }) 275 })
249 } 276 }
250 277
278 private watchM3U8File () {
279 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
280
281 const onChangeOrAdd = async (m3u8Path: string) => {
282 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
283
284 try {
285 await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
286 } catch (err) {
287 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
288 }
289 }
290
291 this.m3u8Watcher.on('change', onChangeOrAdd)
292 }
293
251 private watchTSFiles () { 294 private watchTSFiles () {
252 const startStreamDateTime = new Date().getTime() 295 const startStreamDateTime = new Date().getTime()
253 296
@@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter {
282 } 325 }
283 } 326 }
284 327
285 const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) 328 const deleteHandler = async (segmentPath: string) => {
329 try {
330 await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
331 } catch (err) {
332 logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
333 }
334
335 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
336 try {
337 await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
338 } catch (err) {
339 logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
340 }
341 }
342 }
286 343
287 this.tsWatcher.on('add', p => addHandler(p)) 344 this.tsWatcher.on('add', p => addHandler(p))
288 this.tsWatcher.on('unlink', p => deleteHandler(p)) 345 this.tsWatcher.on('unlink', p => deleteHandler(p))
@@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter {
315 extname: '.ts', 372 extname: '.ts',
316 infoHash: null, 373 infoHash: null,
317 fps: this.fps, 374 fps: this.fps,
375 storage: this.streamingPlaylist.storage,
318 videoStreamingPlaylistId: this.streamingPlaylist.id 376 videoStreamingPlaylistId: this.streamingPlaylist.id
319 }) 377 })
320 378
@@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter {
343 } 401 }
344 402
345 private processSegments (segmentPaths: string[]) { 403 private processSegments (segmentPaths: string[]) {
346 mapSeries(segmentPaths, async previousSegment => { 404 mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
347 // Add sha hash of previous segments, because ffmpeg should have finished generating them 405 .catch(err => {
348 await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) 406 if (this.aborted) return
407
408 logger.error('Cannot process segments', { err, ...this.lTags() })
409 })
410 }
349 411
350 if (this.saveReplay) { 412 private async processSegment (segmentPath: string) {
351 await this.addSegmentToReplay(previousSegment) 413 // Add sha hash of previous segments, because ffmpeg should have finished generating them
414 await this.liveSegmentShaStore.addSegmentSha(segmentPath)
415
416 if (this.saveReplay) {
417 await this.addSegmentToReplay(segmentPath)
418 }
419
420 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
421 try {
422 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
423 } catch (err) {
424 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
352 } 425 }
353 }).catch(err => { 426 }
354 if (this.aborted) return
355 427
356 logger.error('Cannot process segments', { err, ...this.lTags() }) 428 // Master playlist and segment JSON file are created, live is ready
357 }) 429 if (this.masterPlaylistCreated && !this.liveReady) {
430 this.liveReady = true
431
432 this.emit('live-ready', { videoId: this.videoId })
433 }
358 } 434 }
359 435
360 private hasClientSocketInBadHealth (sessionId: string) { 436 private hasClientSocketInBadHealth (sessionId: string) {