aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live-manager.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-09-25 10:04:21 +0200
committerChocobozzz <chocobozzz@cpy.re>2020-11-09 15:33:04 +0100
commita5cf76afa378aae81af2a9b0ce548e5d2582f832 (patch)
tree58da320232bee7c9656774c5d6811e82bbf6c696 /server/lib/live-manager.ts
parentde6310b2fcbb8a6b79c546b23dfa1920724faaa7 (diff)
downloadPeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.gz
PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.zst
PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.zip
Add watch messages if live has not started
Diffstat (limited to 'server/lib/live-manager.ts')
-rw-r--r--server/lib/live-manager.ts129
1 files changed, 84 insertions, 45 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index f602bfb6d..41176d197 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -2,18 +2,22 @@
2import { AsyncQueue, queue } from 'async' 2import { AsyncQueue, queue } from 'async'
3import * as chokidar from 'chokidar' 3import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { ensureDir, readdir, remove } from 'fs-extra' 5import { ensureDir } from 'fs-extra'
6import { basename, join } from 'path' 6import { basename } from 'path'
7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' 7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
8import { logger } from '@server/helpers/logger' 8import { logger } from '@server/helpers/logger'
9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
10import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' 10import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
11import { VideoModel } from '@server/models/video/video'
11import { VideoFileModel } from '@server/models/video/video-file' 12import { VideoFileModel } from '@server/models/video/video-file'
12import { VideoLiveModel } from '@server/models/video/video-live' 13import { VideoLiveModel } from '@server/models/video/video-live'
13import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
14import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models' 15import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models'
15import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 16import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
17import { federateVideoIfNeeded } from './activitypub/videos'
16import { buildSha256Segment } from './hls' 18import { buildSha256Segment } from './hls'
19import { JobQueue } from './job-queue'
20import { PeerTubeSocket } from './peertube-socket'
17import { getHLSDirectory } from './video-paths' 21import { getHLSDirectory } from './video-paths'
18 22
19const NodeRtmpServer = require('node-media-server/node_rtmp_server') 23const NodeRtmpServer = require('node-media-server/node_rtmp_server')
@@ -47,6 +51,7 @@ class LiveManager {
47 private static instance: LiveManager 51 private static instance: LiveManager
48 52
49 private readonly transSessions = new Map<string, FfmpegCommand>() 53 private readonly transSessions = new Map<string, FfmpegCommand>()
54 private readonly videoSessions = new Map<number, string>()
50 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 55 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
51 56
52 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> 57 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
@@ -56,7 +61,8 @@ class LiveManager {
56 } 61 }
57 62
58 init () { 63 init () {
59 this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => { 64 const events = this.getContext().nodeEvent
65 events.on('postPublish', (sessionId: string, streamPath: string) => {
60 logger.debug('RTMP received stream', { id: sessionId, streamPath }) 66 logger.debug('RTMP received stream', { id: sessionId, streamPath })
61 67
62 const splittedPath = streamPath.split('/') 68 const splittedPath = streamPath.split('/')
@@ -69,7 +75,7 @@ class LiveManager {
69 .catch(err => logger.error('Cannot handle sessions.', { err })) 75 .catch(err => logger.error('Cannot handle sessions.', { err }))
70 }) 76 })
71 77
72 this.getContext().nodeEvent.on('donePublish', sessionId => { 78 events.on('donePublish', sessionId => {
73 this.abortSession(sessionId) 79 this.abortSession(sessionId)
74 }) 80 })
75 81
@@ -115,6 +121,16 @@ class LiveManager {
115 return this.segmentsSha256.get(videoUUID) 121 return this.segmentsSha256.get(videoUUID)
116 } 122 }
117 123
124 stopSessionOf (videoId: number) {
125 const sessionId = this.videoSessions.get(videoId)
126 if (!sessionId) return
127
128 this.abortSession(sessionId)
129
130 this.onEndTransmuxing(videoId)
131 .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
132 }
133
118 private getContext () { 134 private getContext () {
119 return context 135 return context
120 } 136 }
@@ -135,6 +151,13 @@ class LiveManager {
135 } 151 }
136 152
137 const video = videoLive.Video 153 const video = videoLive.Video
154 if (video.isBlacklisted()) {
155 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
156 return this.abortSession(sessionId)
157 }
158
159 this.videoSessions.set(video.id, sessionId)
160
138 const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) 161 const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
139 162
140 const session = this.getContext().sessions.get(sessionId) 163 const session = this.getContext().sessions.get(sessionId)
@@ -154,11 +177,6 @@ class LiveManager {
154 type: VideoStreamingPlaylistType.HLS 177 type: VideoStreamingPlaylistType.HLS
155 }, { returning: true }) as [ MStreamingPlaylist, boolean ] 178 }, { returning: true }) as [ MStreamingPlaylist, boolean ]
156 179
157 video.state = VideoState.PUBLISHED
158 await video.save()
159
160 // FIXME: federation?
161
162 return this.runMuxing({ 180 return this.runMuxing({
163 sessionId, 181 sessionId,
164 videoLive, 182 videoLive,
@@ -207,11 +225,46 @@ class LiveManager {
207 225
208 this.transSessions.set(sessionId, ffmpegExec) 226 this.transSessions.set(sessionId, ffmpegExec)
209 227
228 const videoUUID = videoLive.Video.uuid
229 const tsWatcher = chokidar.watch(outPath + '/*.ts')
230
231 const updateHandler = segmentPath => {
232 this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
233 }
234
235 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
236
237 tsWatcher.on('add', p => updateHandler(p))
238 tsWatcher.on('change', p => updateHandler(p))
239 tsWatcher.on('unlink', p => deleteHandler(p))
240
241 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
242 masterWatcher.on('add', async () => {
243 try {
244 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
245
246 video.state = VideoState.PUBLISHED
247 await video.save()
248 videoLive.Video = video
249
250 await federateVideoIfNeeded(video, false)
251
252 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
253 } catch (err) {
254 logger.error('Cannot federate video %d.', videoLive.videoId, { err })
255 } finally {
256 masterWatcher.close()
257 .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
258 }
259 })
260
210 const onFFmpegEnded = () => { 261 const onFFmpegEnded = () => {
211 watcher.close() 262 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath)
212 .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err }))
213 263
214 this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath) 264 Promise.all([ tsWatcher.close(), masterWatcher.close() ])
265 .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
266
267 this.onEndTransmuxing(videoLive.Video.id)
215 .catch(err => logger.error('Error in closed transmuxing.', { err })) 268 .catch(err => logger.error('Error in closed transmuxing.', { err }))
216 } 269 }
217 270
@@ -225,44 +278,30 @@ class LiveManager {
225 }) 278 })
226 279
227 ffmpegExec.on('end', () => onFFmpegEnded()) 280 ffmpegExec.on('end', () => onFFmpegEnded())
228
229 const videoUUID = videoLive.Video.uuid
230 const watcher = chokidar.watch(outPath + '/*.ts')
231
232 const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
233 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
234
235 watcher.on('add', p => updateHandler(p))
236 watcher.on('change', p => updateHandler(p))
237 watcher.on('unlink', p => deleteHandler(p))
238 } 281 }
239 282
240 private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) { 283 private async onEndTransmuxing (videoId: number) {
241 logger.info('RTMP transmuxing for %s ended.', streamPath) 284 try {
285 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
286 if (!fullVideo) return
242 287
243 const files = await readdir(outPath) 288 JobQueue.Instance.createJob({
289 type: 'video-live-ending',
290 payload: {
291 videoId: fullVideo.id
292 }
293 }, { delay: VIDEO_LIVE.CLEANUP_DELAY })
244 294
245 for (const filename of files) { 295 // FIXME: use end
246 if ( 296 fullVideo.state = VideoState.WAITING_FOR_LIVE
247 filename.endsWith('.ts') || 297 await fullVideo.save()
248 filename.endsWith('.m3u8') ||
249 filename.endsWith('.mpd') ||
250 filename.endsWith('.m4s') ||
251 filename.endsWith('.tmp')
252 ) {
253 const p = join(outPath, filename)
254 298
255 remove(p) 299 PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
256 .catch(err => logger.error('Cannot remove %s.', p, { err }))
257 }
258 }
259 300
260 playlist.destroy() 301 await federateVideoIfNeeded(fullVideo, false)
261 .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) 302 } catch (err) {
262 303 logger.error('Cannot save/federate new video state of live streaming.', { err })
263 video.state = VideoState.LIVE_ENDED 304 }
264 video.save()
265 .catch(err => logger.error('Cannot save new video state of live streaming.', { err }))
266 } 305 }
267 306
268 private async addSegmentSha (options: SegmentSha256QueueParam) { 307 private async addSegmentSha (options: SegmentSha256QueueParam) {