diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-02 13:51:06 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 3a0c2a77b1a6626699514ddaf8135f4397175443 (patch) | |
tree | 60bec7ef8e9cdc008b0a4a56aa403617d036bfab /server/lib/live | |
parent | 9a3db678f56eda37d222cf2d2232ae0ef5d533d2 (diff) | |
download | PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.gz PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.zst PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.zip |
Enable external plugins to test the PR
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index f3f8fc886..ef4ecb83e 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter { | |||
79 | private streamingPlaylist: MStreamingPlaylistVideo | 79 | private streamingPlaylist: MStreamingPlaylistVideo |
80 | private liveSegmentShaStore: LiveSegmentShaStore | 80 | private liveSegmentShaStore: LiveSegmentShaStore |
81 | 81 | ||
82 | private tsWatcher: FSWatcher | 82 | private filesWatcher: FSWatcher |
83 | private masterWatcher: FSWatcher | ||
84 | private m3u8Watcher: FSWatcher | ||
85 | 83 | ||
86 | private masterPlaylistCreated = false | 84 | private masterPlaylistCreated = false |
87 | private liveReady = false | 85 | private liveReady = false |
@@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter { | |||
149 | 147 | ||
150 | await this.transcodingWrapper.run() | 148 | await this.transcodingWrapper.run() |
151 | 149 | ||
150 | this.filesWatcher = watch(this.outDirectory, { depth: 0 }) | ||
151 | |||
152 | this.watchMasterFile() | 152 | this.watchMasterFile() |
153 | this.watchTSFiles() | 153 | this.watchTSFiles() |
154 | this.watchM3U8File() | 154 | this.watchM3U8File() |
@@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter { | |||
168 | } | 168 | } |
169 | 169 | ||
170 | private watchMasterFile () { | 170 | private watchMasterFile () { |
171 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) | 171 | this.filesWatcher.on('add', async path => { |
172 | if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return | ||
173 | if (this.masterPlaylistCreated === true) return | ||
172 | 174 | ||
173 | this.masterWatcher.on('add', async () => { | ||
174 | try { | 175 | try { |
175 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | 176 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
176 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) | 177 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) |
@@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter { | |||
188 | this.masterPlaylistCreated = true | 189 | this.masterPlaylistCreated = true |
189 | 190 | ||
190 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) | 191 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) |
191 | |||
192 | this.masterWatcher.close() | ||
193 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) | ||
194 | }) | 192 | }) |
195 | } | 193 | } |
196 | 194 | ||
197 | private watchM3U8File () { | 195 | private watchM3U8File () { |
198 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') | ||
199 | |||
200 | const sendQueues = new Map<string, PQueue>() | 196 | const sendQueues = new Map<string, PQueue>() |
201 | 197 | ||
202 | const onChangeOrAdd = async (m3u8Path: string) => { | 198 | const onChange = async (m3u8Path: string) => { |
199 | if (m3u8Path.endsWith('.m3u8') !== true) return | ||
203 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | 200 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return |
204 | 201 | ||
202 | logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) | ||
203 | |||
205 | try { | 204 | try { |
206 | if (!sendQueues.has(m3u8Path)) { | 205 | if (!sendQueues.has(m3u8Path)) { |
207 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | 206 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) |
@@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter { | |||
214 | } | 213 | } |
215 | } | 214 | } |
216 | 215 | ||
217 | this.m3u8Watcher.on('change', onChangeOrAdd) | 216 | this.filesWatcher.on('change', onChange) |
218 | } | 217 | } |
219 | 218 | ||
220 | private watchTSFiles () { | 219 | private watchTSFiles () { |
221 | const startStreamDateTime = new Date().getTime() | 220 | const startStreamDateTime = new Date().getTime() |
222 | 221 | ||
223 | this.tsWatcher = watch(this.outDirectory + '/*.ts') | ||
224 | |||
225 | const playlistIdMatcher = /^([\d+])-/ | 222 | const playlistIdMatcher = /^([\d+])-/ |
226 | 223 | ||
227 | const addHandler = async (segmentPath: string) => { | 224 | const addHandler = async (segmentPath: string) => { |
228 | logger.debug('Live add handler of %s.', segmentPath, this.lTags()) | 225 | if (segmentPath.endsWith('.ts') !== true) return |
226 | |||
227 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) | ||
229 | 228 | ||
230 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 229 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] |
231 | 230 | ||
@@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter { | |||
252 | } | 251 | } |
253 | 252 | ||
254 | const deleteHandler = async (segmentPath: string) => { | 253 | const deleteHandler = async (segmentPath: string) => { |
254 | if (segmentPath.endsWith('.ts') !== true) return | ||
255 | |||
256 | logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags()) | ||
257 | |||
255 | try { | 258 | try { |
256 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) | 259 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) |
257 | } catch (err) { | 260 | } catch (err) { |
@@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter { | |||
267 | } | 270 | } |
268 | } | 271 | } |
269 | 272 | ||
270 | this.tsWatcher.on('add', p => addHandler(p)) | 273 | this.filesWatcher.on('add', p => addHandler(p)) |
271 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 274 | this.filesWatcher.on('unlink', p => deleteHandler(p)) |
272 | } | 275 | } |
273 | 276 | ||
274 | private async isQuotaExceeded (segmentPath: string) { | 277 | private async isQuotaExceeded (segmentPath: string) { |
@@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter { | |||
371 | setTimeout(() => { | 374 | setTimeout(() => { |
372 | // Wait latest segments generation, and close watchers | 375 | // Wait latest segments generation, and close watchers |
373 | 376 | ||
374 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) | 377 | const promise = this.filesWatcher?.close() || Promise.resolve() |
378 | promise | ||
375 | .then(() => { | 379 | .then(() => { |
376 | // Process remaining segments hash | 380 | // Process remaining segments hash |
377 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 381 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |