diff options
author | Chocobozzz <me@florianbigard.com> | 2020-09-25 10:04:21 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2020-11-09 15:33:04 +0100 |
commit | a5cf76afa378aae81af2a9b0ce548e5d2582f832 (patch) | |
tree | 58da320232bee7c9656774c5d6811e82bbf6c696 /server/lib/live-manager.ts | |
parent | de6310b2fcbb8a6b79c546b23dfa1920724faaa7 (diff) | |
download | PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.gz PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.zst PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.zip |
Add watch messages if live has not started
Diffstat (limited to 'server/lib/live-manager.ts')
-rw-r--r-- | server/lib/live-manager.ts | 129 |
1 files changed, 84 insertions, 45 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index f602bfb6d..41176d197 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts | |||
@@ -2,18 +2,22 @@ | |||
2 | import { AsyncQueue, queue } from 'async' | 2 | import { AsyncQueue, queue } from 'async' |
3 | import * as chokidar from 'chokidar' | 3 | import * as chokidar from 'chokidar' |
4 | import { FfmpegCommand } from 'fluent-ffmpeg' | 4 | import { FfmpegCommand } from 'fluent-ffmpeg' |
5 | import { ensureDir, readdir, remove } from 'fs-extra' | 5 | import { ensureDir } from 'fs-extra' |
6 | import { basename, join } from 'path' | 6 | import { basename } from 'path' |
7 | import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' | 7 | import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' |
8 | import { logger } from '@server/helpers/logger' | 8 | import { logger } from '@server/helpers/logger' |
9 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 9 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
10 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' | 10 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' |
11 | import { VideoModel } from '@server/models/video/video' | ||
11 | import { VideoFileModel } from '@server/models/video/video-file' | 12 | import { VideoFileModel } from '@server/models/video/video-file' |
12 | import { VideoLiveModel } from '@server/models/video/video-live' | 13 | import { VideoLiveModel } from '@server/models/video/video-live' |
13 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 14 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
14 | import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models' | 15 | import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models' |
15 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' | 16 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' |
17 | import { federateVideoIfNeeded } from './activitypub/videos' | ||
16 | import { buildSha256Segment } from './hls' | 18 | import { buildSha256Segment } from './hls' |
19 | import { JobQueue } from './job-queue' | ||
20 | import { PeerTubeSocket } from './peertube-socket' | ||
17 | import { getHLSDirectory } from './video-paths' | 21 | import { getHLSDirectory } from './video-paths' |
18 | 22 | ||
19 | const NodeRtmpServer = require('node-media-server/node_rtmp_server') | 23 | const NodeRtmpServer = require('node-media-server/node_rtmp_server') |
@@ -47,6 +51,7 @@ class LiveManager { | |||
47 | private static instance: LiveManager | 51 | private static instance: LiveManager |
48 | 52 | ||
49 | private readonly transSessions = new Map<string, FfmpegCommand>() | 53 | private readonly transSessions = new Map<string, FfmpegCommand>() |
54 | private readonly videoSessions = new Map<number, string>() | ||
50 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() | 55 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() |
51 | 56 | ||
52 | private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> | 57 | private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> |
@@ -56,7 +61,8 @@ class LiveManager { | |||
56 | } | 61 | } |
57 | 62 | ||
58 | init () { | 63 | init () { |
59 | this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => { | 64 | const events = this.getContext().nodeEvent |
65 | events.on('postPublish', (sessionId: string, streamPath: string) => { | ||
60 | logger.debug('RTMP received stream', { id: sessionId, streamPath }) | 66 | logger.debug('RTMP received stream', { id: sessionId, streamPath }) |
61 | 67 | ||
62 | const splittedPath = streamPath.split('/') | 68 | const splittedPath = streamPath.split('/') |
@@ -69,7 +75,7 @@ class LiveManager { | |||
69 | .catch(err => logger.error('Cannot handle sessions.', { err })) | 75 | .catch(err => logger.error('Cannot handle sessions.', { err })) |
70 | }) | 76 | }) |
71 | 77 | ||
72 | this.getContext().nodeEvent.on('donePublish', sessionId => { | 78 | events.on('donePublish', sessionId => { |
73 | this.abortSession(sessionId) | 79 | this.abortSession(sessionId) |
74 | }) | 80 | }) |
75 | 81 | ||
@@ -115,6 +121,16 @@ class LiveManager { | |||
115 | return this.segmentsSha256.get(videoUUID) | 121 | return this.segmentsSha256.get(videoUUID) |
116 | } | 122 | } |
117 | 123 | ||
124 | stopSessionOf (videoId: number) { | ||
125 | const sessionId = this.videoSessions.get(videoId) | ||
126 | if (!sessionId) return | ||
127 | |||
128 | this.abortSession(sessionId) | ||
129 | |||
130 | this.onEndTransmuxing(videoId) | ||
131 | .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) | ||
132 | } | ||
133 | |||
118 | private getContext () { | 134 | private getContext () { |
119 | return context | 135 | return context |
120 | } | 136 | } |
@@ -135,6 +151,13 @@ class LiveManager { | |||
135 | } | 151 | } |
136 | 152 | ||
137 | const video = videoLive.Video | 153 | const video = videoLive.Video |
154 | if (video.isBlacklisted()) { | ||
155 | logger.warn('Video is blacklisted. Refusing stream %s.', streamKey) | ||
156 | return this.abortSession(sessionId) | ||
157 | } | ||
158 | |||
159 | this.videoSessions.set(video.id, sessionId) | ||
160 | |||
138 | const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) | 161 | const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) |
139 | 162 | ||
140 | const session = this.getContext().sessions.get(sessionId) | 163 | const session = this.getContext().sessions.get(sessionId) |
@@ -154,11 +177,6 @@ class LiveManager { | |||
154 | type: VideoStreamingPlaylistType.HLS | 177 | type: VideoStreamingPlaylistType.HLS |
155 | }, { returning: true }) as [ MStreamingPlaylist, boolean ] | 178 | }, { returning: true }) as [ MStreamingPlaylist, boolean ] |
156 | 179 | ||
157 | video.state = VideoState.PUBLISHED | ||
158 | await video.save() | ||
159 | |||
160 | // FIXME: federation? | ||
161 | |||
162 | return this.runMuxing({ | 180 | return this.runMuxing({ |
163 | sessionId, | 181 | sessionId, |
164 | videoLive, | 182 | videoLive, |
@@ -207,11 +225,46 @@ class LiveManager { | |||
207 | 225 | ||
208 | this.transSessions.set(sessionId, ffmpegExec) | 226 | this.transSessions.set(sessionId, ffmpegExec) |
209 | 227 | ||
228 | const videoUUID = videoLive.Video.uuid | ||
229 | const tsWatcher = chokidar.watch(outPath + '/*.ts') | ||
230 | |||
231 | const updateHandler = segmentPath => { | ||
232 | this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) | ||
233 | } | ||
234 | |||
235 | const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) | ||
236 | |||
237 | tsWatcher.on('add', p => updateHandler(p)) | ||
238 | tsWatcher.on('change', p => updateHandler(p)) | ||
239 | tsWatcher.on('unlink', p => deleteHandler(p)) | ||
240 | |||
241 | const masterWatcher = chokidar.watch(outPath + '/master.m3u8') | ||
242 | masterWatcher.on('add', async () => { | ||
243 | try { | ||
244 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId) | ||
245 | |||
246 | video.state = VideoState.PUBLISHED | ||
247 | await video.save() | ||
248 | videoLive.Video = video | ||
249 | |||
250 | await federateVideoIfNeeded(video, false) | ||
251 | |||
252 | PeerTubeSocket.Instance.sendVideoLiveNewState(video) | ||
253 | } catch (err) { | ||
254 | logger.error('Cannot federate video %d.', videoLive.videoId, { err }) | ||
255 | } finally { | ||
256 | masterWatcher.close() | ||
257 | .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err })) | ||
258 | } | ||
259 | }) | ||
260 | |||
210 | const onFFmpegEnded = () => { | 261 | const onFFmpegEnded = () => { |
211 | watcher.close() | 262 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath) |
212 | .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err })) | ||
213 | 263 | ||
214 | this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath) | 264 | Promise.all([ tsWatcher.close(), masterWatcher.close() ]) |
265 | .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) | ||
266 | |||
267 | this.onEndTransmuxing(videoLive.Video.id) | ||
215 | .catch(err => logger.error('Error in closed transmuxing.', { err })) | 268 | .catch(err => logger.error('Error in closed transmuxing.', { err })) |
216 | } | 269 | } |
217 | 270 | ||
@@ -225,44 +278,30 @@ class LiveManager { | |||
225 | }) | 278 | }) |
226 | 279 | ||
227 | ffmpegExec.on('end', () => onFFmpegEnded()) | 280 | ffmpegExec.on('end', () => onFFmpegEnded()) |
228 | |||
229 | const videoUUID = videoLive.Video.uuid | ||
230 | const watcher = chokidar.watch(outPath + '/*.ts') | ||
231 | |||
232 | const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) | ||
233 | const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) | ||
234 | |||
235 | watcher.on('add', p => updateHandler(p)) | ||
236 | watcher.on('change', p => updateHandler(p)) | ||
237 | watcher.on('unlink', p => deleteHandler(p)) | ||
238 | } | 281 | } |
239 | 282 | ||
240 | private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) { | 283 | private async onEndTransmuxing (videoId: number) { |
241 | logger.info('RTMP transmuxing for %s ended.', streamPath) | 284 | try { |
285 | const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
286 | if (!fullVideo) return | ||
242 | 287 | ||
243 | const files = await readdir(outPath) | 288 | JobQueue.Instance.createJob({ |
289 | type: 'video-live-ending', | ||
290 | payload: { | ||
291 | videoId: fullVideo.id | ||
292 | } | ||
293 | }, { delay: VIDEO_LIVE.CLEANUP_DELAY }) | ||
244 | 294 | ||
245 | for (const filename of files) { | 295 | // FIXME: use end |
246 | if ( | 296 | fullVideo.state = VideoState.WAITING_FOR_LIVE |
247 | filename.endsWith('.ts') || | 297 | await fullVideo.save() |
248 | filename.endsWith('.m3u8') || | ||
249 | filename.endsWith('.mpd') || | ||
250 | filename.endsWith('.m4s') || | ||
251 | filename.endsWith('.tmp') | ||
252 | ) { | ||
253 | const p = join(outPath, filename) | ||
254 | 298 | ||
255 | remove(p) | 299 | PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) |
256 | .catch(err => logger.error('Cannot remove %s.', p, { err })) | ||
257 | } | ||
258 | } | ||
259 | 300 | ||
260 | playlist.destroy() | 301 | await federateVideoIfNeeded(fullVideo, false) |
261 | .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) | 302 | } catch (err) { |
262 | 303 | logger.error('Cannot save/federate new video state of live streaming.', { err }) | |
263 | video.state = VideoState.LIVE_ENDED | 304 | } |
264 | video.save() | ||
265 | .catch(err => logger.error('Cannot save new video state of live streaming.', { err })) | ||
266 | } | 305 | } |
267 | 306 | ||
268 | private async addSegmentSha (options: SegmentSha256QueueParam) { | 307 | private async addSegmentSha (options: SegmentSha256QueueParam) { |