diff options
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/live-manager.ts | 6 | ||||
-rw-r--r-- | server/lib/live/live-utils.ts | 40 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 41 |
3 files changed, 60 insertions, 27 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index e04ae9fef..0f14a6851 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -28,7 +28,7 @@ import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, g | |||
28 | import { PeerTubeSocket } from '../peertube-socket' | 28 | import { PeerTubeSocket } from '../peertube-socket' |
29 | import { LiveQuotaStore } from './live-quota-store' | 29 | import { LiveQuotaStore } from './live-quota-store' |
30 | import { LiveSegmentShaStore } from './live-segment-sha-store' | 30 | import { LiveSegmentShaStore } from './live-segment-sha-store' |
31 | import { cleanupLive } from './live-utils' | 31 | import { cleanupPermanentLive } from './live-utils' |
32 | import { MuxingSession } from './shared' | 32 | import { MuxingSession } from './shared' |
33 | 33 | ||
34 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') | 34 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') |
@@ -224,7 +224,9 @@ class LiveManager { | |||
224 | 224 | ||
225 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | 225 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) |
226 | if (oldStreamingPlaylist) { | 226 | if (oldStreamingPlaylist) { |
227 | await cleanupLive(video, oldStreamingPlaylist) | 227 | if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) |
228 | |||
229 | await cleanupPermanentLive(video, oldStreamingPlaylist) | ||
228 | } | 230 | } |
229 | 231 | ||
230 | this.videoSessions.set(video.id, sessionId) | 232 | this.videoSessions.set(video.id, sessionId) |
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index 46c7fd2f8..6365e23db 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts | |||
@@ -1,5 +1,6 @@ | |||
1 | import { remove } from 'fs-extra' | 1 | import { pathExists, readdir, remove } from 'fs-extra' |
2 | import { basename } from 'path' | 2 | import { basename, join } from 'path' |
3 | import { logger } from '@server/helpers/logger' | ||
3 | import { MStreamingPlaylist, MVideo } from '@server/types/models' | 4 | import { MStreamingPlaylist, MVideo } from '@server/types/models' |
4 | import { getLiveDirectory } from '../paths' | 5 | import { getLiveDirectory } from '../paths' |
5 | 6 | ||
@@ -9,7 +10,15 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { | |||
9 | return 'concat-' + num[1] + '.ts' | 10 | return 'concat-' + num[1] + '.ts' |
10 | } | 11 | } |
11 | 12 | ||
12 | async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { | 13 | async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { |
14 | const hlsDirectory = getLiveDirectory(video) | ||
15 | |||
16 | await cleanupTMPLiveFiles(hlsDirectory) | ||
17 | |||
18 | if (streamingPlaylist) await streamingPlaylist.destroy() | ||
19 | } | ||
20 | |||
21 | async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { | ||
13 | const hlsDirectory = getLiveDirectory(video) | 22 | const hlsDirectory = getLiveDirectory(video) |
14 | 23 | ||
15 | await remove(hlsDirectory) | 24 | await remove(hlsDirectory) |
@@ -17,7 +26,30 @@ async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylis | |||
17 | if (streamingPlaylist) await streamingPlaylist.destroy() | 26 | if (streamingPlaylist) await streamingPlaylist.destroy() |
18 | } | 27 | } |
19 | 28 | ||
29 | async function cleanupTMPLiveFiles (hlsDirectory: string) { | ||
30 | if (!await pathExists(hlsDirectory)) return | ||
31 | |||
32 | const files = await readdir(hlsDirectory) | ||
33 | |||
34 | for (const filename of files) { | ||
35 | if ( | ||
36 | filename.endsWith('.ts') || | ||
37 | filename.endsWith('.m3u8') || | ||
38 | filename.endsWith('.mpd') || | ||
39 | filename.endsWith('.m4s') || | ||
40 | filename.endsWith('.tmp') | ||
41 | ) { | ||
42 | const p = join(hlsDirectory, filename) | ||
43 | |||
44 | remove(p) | ||
45 | .catch(err => logger.error('Cannot remove %s.', p, { err })) | ||
46 | } | ||
47 | } | ||
48 | } | ||
49 | |||
20 | export { | 50 | export { |
21 | cleanupLive, | 51 | cleanupPermanentLive, |
52 | cleanupNormalLive, | ||
53 | cleanupTMPLiveFiles, | ||
22 | buildConcatenatedName | 54 | buildConcatenatedName |
23 | } | 55 | } |
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 1ee9b430f..98a7b2613 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -150,8 +150,8 @@ class MuxingSession extends EventEmitter { | |||
150 | 150 | ||
151 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | 151 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) |
152 | 152 | ||
153 | this.watchTSFiles(this.outDirectory) | 153 | this.watchTSFiles() |
154 | this.watchMasterFile(this.outDirectory) | 154 | this.watchMasterFile() |
155 | 155 | ||
156 | let ffmpegShellCommand: string | 156 | let ffmpegShellCommand: string |
157 | this.ffmpegCommand.on('start', cmdline => { | 157 | this.ffmpegCommand.on('start', cmdline => { |
@@ -161,13 +161,13 @@ class MuxingSession extends EventEmitter { | |||
161 | }) | 161 | }) |
162 | 162 | ||
163 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | 163 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { |
164 | this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) | 164 | this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) |
165 | }) | 165 | }) |
166 | 166 | ||
167 | this.ffmpegCommand.on('end', () => { | 167 | this.ffmpegCommand.on('end', () => { |
168 | this.emit('ffmpeg-end', ({ videoId: this.videoId })) | 168 | this.emit('ffmpeg-end', ({ videoId: this.videoId })) |
169 | 169 | ||
170 | this.onFFmpegEnded(this.outDirectory) | 170 | this.onFFmpegEnded() |
171 | }) | 171 | }) |
172 | 172 | ||
173 | this.ffmpegCommand.run() | 173 | this.ffmpegCommand.run() |
@@ -189,12 +189,11 @@ class MuxingSession extends EventEmitter { | |||
189 | err: any | 189 | err: any |
190 | stdout: string | 190 | stdout: string |
191 | stderr: string | 191 | stderr: string |
192 | outPath: string | ||
193 | ffmpegShellCommand: string | 192 | ffmpegShellCommand: string |
194 | }) { | 193 | }) { |
195 | const { err, stdout, stderr, outPath, ffmpegShellCommand } = options | 194 | const { err, stdout, stderr, ffmpegShellCommand } = options |
196 | 195 | ||
197 | this.onFFmpegEnded(outPath) | 196 | this.onFFmpegEnded() |
198 | 197 | ||
199 | // Don't care that we killed the ffmpeg process | 198 | // Don't care that we killed the ffmpeg process |
200 | if (err?.message?.includes('Exiting normally')) return | 199 | if (err?.message?.includes('Exiting normally')) return |
@@ -204,7 +203,7 @@ class MuxingSession extends EventEmitter { | |||
204 | this.emit('ffmpeg-error', ({ videoId: this.videoId })) | 203 | this.emit('ffmpeg-error', ({ videoId: this.videoId })) |
205 | } | 204 | } |
206 | 205 | ||
207 | private onFFmpegEnded (outPath: string) { | 206 | private onFFmpegEnded () { |
208 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) | 207 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) |
209 | 208 | ||
210 | setTimeout(() => { | 209 | setTimeout(() => { |
@@ -214,12 +213,12 @@ class MuxingSession extends EventEmitter { | |||
214 | .then(() => { | 213 | .then(() => { |
215 | // Process remaining segments hash | 214 | // Process remaining segments hash |
216 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 215 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |
217 | this.processSegments(outPath, this.segmentsToProcessPerPlaylist[key]) | 216 | this.processSegments(this.segmentsToProcessPerPlaylist[key]) |
218 | } | 217 | } |
219 | }) | 218 | }) |
220 | .catch(err => { | 219 | .catch(err => { |
221 | logger.error( | 220 | logger.error( |
222 | 'Cannot close watchers of %s or process remaining hash segments.', outPath, | 221 | 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, |
223 | { err, ...this.lTags() } | 222 | { err, ...this.lTags() } |
224 | ) | 223 | ) |
225 | }) | 224 | }) |
@@ -228,21 +227,21 @@ class MuxingSession extends EventEmitter { | |||
228 | }, 1000) | 227 | }, 1000) |
229 | } | 228 | } |
230 | 229 | ||
231 | private watchMasterFile (outPath: string) { | 230 | private watchMasterFile () { |
232 | this.masterWatcher = watch(outPath + '/' + this.streamingPlaylist.playlistFilename) | 231 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) |
233 | 232 | ||
234 | this.masterWatcher.on('add', () => { | 233 | this.masterWatcher.on('add', () => { |
235 | this.emit('master-playlist-created', { videoId: this.videoId }) | 234 | this.emit('master-playlist-created', { videoId: this.videoId }) |
236 | 235 | ||
237 | this.masterWatcher.close() | 236 | this.masterWatcher.close() |
238 | .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags() })) | 237 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) |
239 | }) | 238 | }) |
240 | } | 239 | } |
241 | 240 | ||
242 | private watchTSFiles (outPath: string) { | 241 | private watchTSFiles () { |
243 | const startStreamDateTime = new Date().getTime() | 242 | const startStreamDateTime = new Date().getTime() |
244 | 243 | ||
245 | this.tsWatcher = watch(outPath + '/*.ts') | 244 | this.tsWatcher = watch(this.outDirectory + '/*.ts') |
246 | 245 | ||
247 | const playlistIdMatcher = /^([\d+])-/ | 246 | const playlistIdMatcher = /^([\d+])-/ |
248 | 247 | ||
@@ -252,7 +251,7 @@ class MuxingSession extends EventEmitter { | |||
252 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 251 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] |
253 | 252 | ||
254 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] | 253 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] |
255 | this.processSegments(outPath, segmentsToProcess) | 254 | this.processSegments(segmentsToProcess) |
256 | 255 | ||
257 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] | 256 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] |
258 | 257 | ||
@@ -273,7 +272,7 @@ class MuxingSession extends EventEmitter { | |||
273 | } | 272 | } |
274 | } | 273 | } |
275 | 274 | ||
276 | const deleteHandler = segmentPath => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) | 275 | const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) |
277 | 276 | ||
278 | this.tsWatcher.on('add', p => addHandler(p)) | 277 | this.tsWatcher.on('add', p => addHandler(p)) |
279 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 278 | this.tsWatcher.on('unlink', p => deleteHandler(p)) |
@@ -332,15 +331,15 @@ class MuxingSession extends EventEmitter { | |||
332 | return now <= max | 331 | return now <= max |
333 | } | 332 | } |
334 | 333 | ||
335 | private processSegments (hlsVideoPath: string, segmentPaths: string[]) { | 334 | private processSegments (segmentPaths: string[]) { |
336 | mapSeries(segmentPaths, async previousSegment => { | 335 | mapSeries(segmentPaths, async previousSegment => { |
337 | // Add sha hash of previous segments, because ffmpeg should have finished generating them | 336 | // Add sha hash of previous segments, because ffmpeg should have finished generating them |
338 | await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) | 337 | await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) |
339 | 338 | ||
340 | if (this.saveReplay) { | 339 | if (this.saveReplay) { |
341 | await this.addSegmentToReplay(hlsVideoPath, previousSegment) | 340 | await this.addSegmentToReplay(previousSegment) |
342 | } | 341 | } |
343 | }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags() })) | 342 | }).catch(err => logger.error('Cannot process segments', { err, ...this.lTags() })) |
344 | } | 343 | } |
345 | 344 | ||
346 | private hasClientSocketInBadHealth (sessionId: string) { | 345 | private hasClientSocketInBadHealth (sessionId: string) { |
@@ -367,7 +366,7 @@ class MuxingSession extends EventEmitter { | |||
367 | return false | 366 | return false |
368 | } | 367 | } |
369 | 368 | ||
370 | private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { | 369 | private async addSegmentToReplay (segmentPath: string) { |
371 | const segmentName = basename(segmentPath) | 370 | const segmentName = basename(segmentPath) |
372 | const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) | 371 | const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) |
373 | 372 | ||