aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-02 13:51:06 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit3a0c2a77b1a6626699514ddaf8135f4397175443 (patch)
tree60bec7ef8e9cdc008b0a4a56aa403617d036bfab /server/lib/live
parent9a3db678f56eda37d222cf2d2232ae0ef5d533d2 (diff)
downloadPeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.gz
PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.zst
PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.zip
Enable external plugins to test the PR
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/shared/muxing-session.ts40
1 files changed, 22 insertions, 18 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index f3f8fc886..ef4ecb83e 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter {
79 private streamingPlaylist: MStreamingPlaylistVideo 79 private streamingPlaylist: MStreamingPlaylistVideo
80 private liveSegmentShaStore: LiveSegmentShaStore 80 private liveSegmentShaStore: LiveSegmentShaStore
81 81
82 private tsWatcher: FSWatcher 82 private filesWatcher: FSWatcher
83 private masterWatcher: FSWatcher
84 private m3u8Watcher: FSWatcher
85 83
86 private masterPlaylistCreated = false 84 private masterPlaylistCreated = false
87 private liveReady = false 85 private liveReady = false
@@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter {
149 147
150 await this.transcodingWrapper.run() 148 await this.transcodingWrapper.run()
151 149
150 this.filesWatcher = watch(this.outDirectory, { depth: 0 })
151
152 this.watchMasterFile() 152 this.watchMasterFile()
153 this.watchTSFiles() 153 this.watchTSFiles()
154 this.watchM3U8File() 154 this.watchM3U8File()
@@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter {
168 } 168 }
169 169
170 private watchMasterFile () { 170 private watchMasterFile () {
171 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 171 this.filesWatcher.on('add', async path => {
172 if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return
173 if (this.masterPlaylistCreated === true) return
172 174
173 this.masterWatcher.on('add', async () => {
174 try { 175 try {
175 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { 176 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
176 const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) 177 const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
@@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter {
188 this.masterPlaylistCreated = true 189 this.masterPlaylistCreated = true
189 190
190 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) 191 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
191
192 this.masterWatcher.close()
193 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
194 }) 192 })
195 } 193 }
196 194
197 private watchM3U8File () { 195 private watchM3U8File () {
198 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
199
200 const sendQueues = new Map<string, PQueue>() 196 const sendQueues = new Map<string, PQueue>()
201 197
202 const onChangeOrAdd = async (m3u8Path: string) => { 198 const onChange = async (m3u8Path: string) => {
199 if (m3u8Path.endsWith('.m3u8') !== true) return
203 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return 200 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
204 201
202 logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
203
205 try { 204 try {
206 if (!sendQueues.has(m3u8Path)) { 205 if (!sendQueues.has(m3u8Path)) {
207 sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) 206 sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
@@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter {
214 } 213 }
215 } 214 }
216 215
217 this.m3u8Watcher.on('change', onChangeOrAdd) 216 this.filesWatcher.on('change', onChange)
218 } 217 }
219 218
220 private watchTSFiles () { 219 private watchTSFiles () {
221 const startStreamDateTime = new Date().getTime() 220 const startStreamDateTime = new Date().getTime()
222 221
223 this.tsWatcher = watch(this.outDirectory + '/*.ts')
224
225 const playlistIdMatcher = /^([\d+])-/ 222 const playlistIdMatcher = /^([\d+])-/
226 223
227 const addHandler = async (segmentPath: string) => { 224 const addHandler = async (segmentPath: string) => {
228 logger.debug('Live add handler of %s.', segmentPath, this.lTags()) 225 if (segmentPath.endsWith('.ts') !== true) return
226
227 logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
229 228
230 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] 229 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
231 230
@@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter {
252 } 251 }
253 252
254 const deleteHandler = async (segmentPath: string) => { 253 const deleteHandler = async (segmentPath: string) => {
254 if (segmentPath.endsWith('.ts') !== true) return
255
256 logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags())
257
255 try { 258 try {
256 await this.liveSegmentShaStore.removeSegmentSha(segmentPath) 259 await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
257 } catch (err) { 260 } catch (err) {
@@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter {
267 } 270 }
268 } 271 }
269 272
270 this.tsWatcher.on('add', p => addHandler(p)) 273 this.filesWatcher.on('add', p => addHandler(p))
271 this.tsWatcher.on('unlink', p => deleteHandler(p)) 274 this.filesWatcher.on('unlink', p => deleteHandler(p))
272 } 275 }
273 276
274 private async isQuotaExceeded (segmentPath: string) { 277 private async isQuotaExceeded (segmentPath: string) {
@@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter {
371 setTimeout(() => { 374 setTimeout(() => {
372 // Wait latest segments generation, and close watchers 375 // Wait latest segments generation, and close watchers
373 376
374 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) 377 const promise = this.filesWatcher?.close() || Promise.resolve()
378 promise
375 .then(() => { 379 .then(() => {
376 // Process remaining segments hash 380 // Process remaining segments hash
377 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { 381 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {