]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/live-manager.ts
Use a profile manager for transcoding
[github/Chocobozzz/PeerTube.git] / server / lib / live-manager.ts
1
2 import * as Bluebird from 'bluebird'
3 import * as chokidar from 'chokidar'
4 import { FfmpegCommand } from 'fluent-ffmpeg'
5 import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
6 import { createServer, Server } from 'net'
7 import { basename, join } from 'path'
8 import { isTestInstance } from '@server/helpers/core-utils'
9 import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
10 import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
11 import { logger } from '@server/helpers/logger'
12 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
13 import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
14 import { UserModel } from '@server/models/account/user'
15 import { VideoModel } from '@server/models/video/video'
16 import { VideoFileModel } from '@server/models/video/video-file'
17 import { VideoLiveModel } from '@server/models/video/video-live'
18 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
19 import { MStreamingPlaylist, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
20 import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
21 import { federateVideoIfNeeded } from './activitypub/videos'
22 import { buildSha256Segment } from './hls'
23 import { JobQueue } from './job-queue'
24 import { cleanupLive } from './job-queue/handlers/video-live-ending'
25 import { PeerTubeSocket } from './peertube-socket'
26 import { isAbleToUploadVideo } from './user'
27 import { getHLSDirectory } from './video-paths'
28 import { VideoTranscodingProfilesManager } from './video-transcoding-profiles'
29
30 import memoizee = require('memoizee')
31 const NodeRtmpSession = require('node-media-server/node_rtmp_session')
32 const context = require('node-media-server/node_core_ctx')
33 const nodeMediaServerLogger = require('node-media-server/node_core_logger')
34
35 // Disable node media server logs
36 nodeMediaServerLogger.setLogType(0)
37
38 const config = {
39 rtmp: {
40 port: CONFIG.LIVE.RTMP.PORT,
41 chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
42 gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
43 ping: VIDEO_LIVE.RTMP.PING,
44 ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
45 },
46 transcoding: {
47 ffmpeg: 'ffmpeg'
48 }
49 }
50
51 class LiveManager {
52
53 private static instance: LiveManager
54
55 private readonly transSessions = new Map<string, FfmpegCommand>()
56 private readonly videoSessions = new Map<number, string>()
57 // Values are Date().getTime()
58 private readonly watchersPerVideo = new Map<number, number[]>()
59 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
60 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
61
62 private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
63 return isAbleToUploadVideo(userId, 1000)
64 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
65
66 private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => {
67 return this.hasClientSocketsInBadHealth(sessionId)
68 }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
69
70 private rtmpServer: Server
71
72 private constructor () {
73 }
74
75 init () {
76 const events = this.getContext().nodeEvent
77 events.on('postPublish', (sessionId: string, streamPath: string) => {
78 logger.debug('RTMP received stream', { id: sessionId, streamPath })
79
80 const splittedPath = streamPath.split('/')
81 if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
82 logger.warn('Live path is incorrect.', { streamPath })
83 return this.abortSession(sessionId)
84 }
85
86 this.handleSession(sessionId, streamPath, splittedPath[2])
87 .catch(err => logger.error('Cannot handle sessions.', { err }))
88 })
89
90 events.on('donePublish', sessionId => {
91 logger.info('Live session ended.', { sessionId })
92 })
93
94 registerConfigChangedHandler(() => {
95 if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
96 this.run()
97 return
98 }
99
100 if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) {
101 this.stop()
102 }
103 })
104
105 // Cleanup broken lives, that were terminated by a server restart for example
106 this.handleBrokenLives()
107 .catch(err => logger.error('Cannot handle broken lives.', { err }))
108
109 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
110 }
111
112 run () {
113 logger.info('Running RTMP server on port %d', config.rtmp.port)
114
115 this.rtmpServer = createServer(socket => {
116 const session = new NodeRtmpSession(config, socket)
117
118 session.run()
119 })
120
121 this.rtmpServer.on('error', err => {
122 logger.error('Cannot run RTMP server.', { err })
123 })
124
125 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
126 }
127
128 stop () {
129 logger.info('Stopping RTMP server.')
130
131 this.rtmpServer.close()
132 this.rtmpServer = undefined
133
134 // Sessions is an object
135 this.getContext().sessions.forEach((session: any) => {
136 if (session instanceof NodeRtmpSession) {
137 session.stop()
138 }
139 })
140 }
141
142 isRunning () {
143 return !!this.rtmpServer
144 }
145
146 getSegmentsSha256 (videoUUID: string) {
147 return this.segmentsSha256.get(videoUUID)
148 }
149
150 stopSessionOf (videoId: number) {
151 const sessionId = this.videoSessions.get(videoId)
152 if (!sessionId) return
153
154 this.videoSessions.delete(videoId)
155 this.abortSession(sessionId)
156 }
157
158 getLiveQuotaUsedByUser (userId: number) {
159 const currentLives = this.livesPerUser.get(userId)
160 if (!currentLives) return 0
161
162 return currentLives.reduce((sum, obj) => sum + obj.size, 0)
163 }
164
165 addViewTo (videoId: number) {
166 if (this.videoSessions.has(videoId) === false) return
167
168 let watchers = this.watchersPerVideo.get(videoId)
169
170 if (!watchers) {
171 watchers = []
172 this.watchersPerVideo.set(videoId, watchers)
173 }
174
175 watchers.push(new Date().getTime())
176 }
177
178 cleanupShaSegments (videoUUID: string) {
179 this.segmentsSha256.delete(videoUUID)
180 }
181
182 addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
183 const segmentName = basename(segmentPath)
184 const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, this.buildConcatenatedName(segmentName))
185
186 return readFile(segmentPath)
187 .then(data => appendFile(dest, data))
188 .catch(err => logger.error('Cannot copy segment %s to repay directory.', segmentPath, { err }))
189 }
190
191 buildConcatenatedName (segmentOrPlaylistPath: string) {
192 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
193
194 return 'concat-' + num[1] + '.ts'
195 }
196
197 private processSegments (hlsVideoPath: string, videoUUID: string, videoLive: MVideoLive, segmentPaths: string[]) {
198 Bluebird.mapSeries(segmentPaths, async previousSegment => {
199 // Add sha hash of previous segments, because ffmpeg should have finished generating them
200 await this.addSegmentSha(videoUUID, previousSegment)
201
202 if (videoLive.saveReplay) {
203 await this.addSegmentToReplay(hlsVideoPath, previousSegment)
204 }
205 }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err }))
206 }
207
208 private getContext () {
209 return context
210 }
211
212 private abortSession (id: string) {
213 const session = this.getContext().sessions.get(id)
214 if (session) {
215 session.stop()
216 this.getContext().sessions.delete(id)
217 }
218
219 const transSession = this.transSessions.get(id)
220 if (transSession) {
221 transSession.kill('SIGINT')
222 this.transSessions.delete(id)
223 }
224 }
225
226 private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
227 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
228 if (!videoLive) {
229 logger.warn('Unknown live video with stream key %s.', streamKey)
230 return this.abortSession(sessionId)
231 }
232
233 const video = videoLive.Video
234 if (video.isBlacklisted()) {
235 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
236 return this.abortSession(sessionId)
237 }
238
239 // Cleanup old potential live files (could happen with a permanent live)
240 this.cleanupShaSegments(video.uuid)
241
242 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
243 if (oldStreamingPlaylist) {
244 await cleanupLive(video, oldStreamingPlaylist)
245 }
246
247 this.videoSessions.set(video.id, sessionId)
248
249 const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
250
251 const session = this.getContext().sessions.get(sessionId)
252 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
253
254 const [ resolutionResult, fps ] = await Promise.all([
255 getVideoFileResolution(rtmpUrl),
256 getVideoFileFPS(rtmpUrl)
257 ])
258
259 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
260 ? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live')
261 : []
262
263 const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])
264
265 logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions })
266
267 const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
268 videoId: video.id,
269 playlistUrl,
270 segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive),
271 p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions),
272 p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,
273
274 type: VideoStreamingPlaylistType.HLS
275 }, { returning: true }) as [ MStreamingPlaylist, boolean ]
276
277 return this.runMuxing({
278 sessionId,
279 videoLive,
280 playlist: videoStreamingPlaylist,
281 rtmpUrl,
282 fps,
283 allResolutions
284 })
285 }
286
287 private async runMuxing (options: {
288 sessionId: string
289 videoLive: MVideoLiveVideo
290 playlist: MStreamingPlaylist
291 rtmpUrl: string
292 fps: number
293 allResolutions: number[]
294 }) {
295 const { sessionId, videoLive, playlist, allResolutions, fps, rtmpUrl } = options
296 const startStreamDateTime = new Date().getTime()
297
298 const user = await UserModel.loadByLiveId(videoLive.id)
299 if (!this.livesPerUser.has(user.id)) {
300 this.livesPerUser.set(user.id, [])
301 }
302
303 const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
304 const livesOfUser = this.livesPerUser.get(user.id)
305 livesOfUser.push(currentUserLive)
306
307 for (let i = 0; i < allResolutions.length; i++) {
308 const resolution = allResolutions[i]
309
310 const file = new VideoFileModel({
311 resolution,
312 size: -1,
313 extname: '.ts',
314 infoHash: null,
315 fps,
316 videoStreamingPlaylistId: playlist.id
317 })
318
319 VideoFileModel.customUpsert(file, 'streaming-playlist', null)
320 .catch(err => logger.error('Cannot create file for live streaming.', { err }))
321 }
322
323 const outPath = getHLSDirectory(videoLive.Video)
324 await ensureDir(outPath)
325
326 const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
327
328 if (videoLive.saveReplay === true) {
329 await ensureDir(replayDirectory)
330 }
331
332 const videoUUID = videoLive.Video.uuid
333
334 const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
335 ? await getLiveTranscodingCommand({
336 rtmpUrl,
337 outPath,
338 resolutions: allResolutions,
339 fps,
340 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
341 profile: 'default'
342 })
343 : getLiveMuxingCommand(rtmpUrl, outPath)
344
345 logger.info('Running live muxing/transcoding for %s.', videoUUID)
346 this.transSessions.set(sessionId, ffmpegExec)
347
348 const tsWatcher = chokidar.watch(outPath + '/*.ts')
349
350 const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
351 const playlistIdMatcher = /^([\d+])-/
352
353 const addHandler = segmentPath => {
354 logger.debug('Live add handler of %s.', segmentPath)
355
356 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
357
358 const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
359 this.processSegments(outPath, videoUUID, videoLive, segmentsToProcess)
360
361 segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
362
363 if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
364 logger.error(
365 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
366 ' Stopping session of video %s.', videoUUID)
367
368 this.stopSessionOf(videoLive.videoId)
369 return
370 }
371
372 // Duration constraint check
373 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
374 logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
375
376 this.stopSessionOf(videoLive.videoId)
377 return
378 }
379
380 // Check user quota if the user enabled replay saving
381 if (videoLive.saveReplay === true) {
382 stat(segmentPath)
383 .then(segmentStat => {
384 currentUserLive.size += segmentStat.size
385 })
386 .then(() => this.isQuotaConstraintValid(user, videoLive))
387 .then(quotaValid => {
388 if (quotaValid !== true) {
389 logger.info('Stopping session of %s: user quota exceeded.', videoUUID)
390
391 this.stopSessionOf(videoLive.videoId)
392 }
393 })
394 .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
395 }
396 }
397
398 const deleteHandler = segmentPath => this.removeSegmentSha(videoUUID, segmentPath)
399
400 tsWatcher.on('add', p => addHandler(p))
401 tsWatcher.on('unlink', p => deleteHandler(p))
402
403 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
404 masterWatcher.on('add', async () => {
405 try {
406 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
407
408 video.state = VideoState.PUBLISHED
409 await video.save()
410 videoLive.Video = video
411
412 setTimeout(() => {
413 federateVideoIfNeeded(video, false)
414 .catch(err => logger.error('Cannot federate live video %s.', video.url, { err }))
415
416 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
417 }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
418
419 } catch (err) {
420 logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err })
421 } finally {
422 masterWatcher.close()
423 .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
424 }
425 })
426
427 const onFFmpegEnded = () => {
428 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
429
430 this.transSessions.delete(sessionId)
431
432 this.watchersPerVideo.delete(videoLive.videoId)
433 this.videoSessions.delete(videoLive.videoId)
434
435 const newLivesPerUser = this.livesPerUser.get(user.id)
436 .filter(o => o.liveId !== videoLive.id)
437 this.livesPerUser.set(user.id, newLivesPerUser)
438
439 setTimeout(() => {
440 // Wait latest segments generation, and close watchers
441
442 Promise.all([ tsWatcher.close(), masterWatcher.close() ])
443 .then(() => {
444 // Process remaining segments hash
445 for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
446 this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
447 }
448 })
449 .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))
450
451 this.onEndTransmuxing(videoLive.Video.id)
452 .catch(err => logger.error('Error in closed transmuxing.', { err }))
453 }, 1000)
454 }
455
456 ffmpegExec.on('error', (err, stdout, stderr) => {
457 onFFmpegEnded()
458
459 // Don't care that we killed the ffmpeg process
460 if (err?.message?.includes('Exiting normally')) return
461
462 logger.error('Live transcoding error.', { err, stdout, stderr })
463
464 this.abortSession(sessionId)
465 })
466
467 ffmpegExec.on('end', () => onFFmpegEnded())
468
469 ffmpegExec.run()
470 }
471
472 private async onEndTransmuxing (videoId: number, cleanupNow = false) {
473 try {
474 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
475 if (!fullVideo) return
476
477 const live = await VideoLiveModel.loadByVideoId(videoId)
478
479 if (!live.permanentLive) {
480 JobQueue.Instance.createJob({
481 type: 'video-live-ending',
482 payload: {
483 videoId: fullVideo.id
484 }
485 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
486
487 fullVideo.state = VideoState.LIVE_ENDED
488 } else {
489 fullVideo.state = VideoState.WAITING_FOR_LIVE
490 }
491
492 await fullVideo.save()
493
494 PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
495
496 await federateVideoIfNeeded(fullVideo, false)
497 } catch (err) {
498 logger.error('Cannot save/federate new video state of live streaming.', { err })
499 }
500 }
501
502 private async addSegmentSha (videoUUID: string, segmentPath: string) {
503 const segmentName = basename(segmentPath)
504 logger.debug('Adding live sha segment %s.', segmentPath)
505
506 const shaResult = await buildSha256Segment(segmentPath)
507
508 if (!this.segmentsSha256.has(videoUUID)) {
509 this.segmentsSha256.set(videoUUID, new Map())
510 }
511
512 const filesMap = this.segmentsSha256.get(videoUUID)
513 filesMap.set(segmentName, shaResult)
514 }
515
516 private removeSegmentSha (videoUUID: string, segmentPath: string) {
517 const segmentName = basename(segmentPath)
518
519 logger.debug('Removing live sha segment %s.', segmentPath)
520
521 const filesMap = this.segmentsSha256.get(videoUUID)
522 if (!filesMap) {
523 logger.warn('Unknown files map to remove sha for %s.', videoUUID)
524 return
525 }
526
527 if (!filesMap.has(segmentName)) {
528 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
529 return
530 }
531
532 filesMap.delete(segmentName)
533 }
534
535 private isDurationConstraintValid (streamingStartTime: number) {
536 const maxDuration = CONFIG.LIVE.MAX_DURATION
537 // No limit
538 if (maxDuration < 0) return true
539
540 const now = new Date().getTime()
541 const max = streamingStartTime + maxDuration
542
543 return now <= max
544 }
545
546 private hasClientSocketsInBadHealth (sessionId: string) {
547 const rtmpSession = this.getContext().sessions.get(sessionId)
548
549 if (!rtmpSession) {
550 logger.warn('Cannot get session %s to check players socket health.', sessionId)
551 return
552 }
553
554 for (const playerSessionId of rtmpSession.players) {
555 const playerSession = this.getContext().sessions.get(playerSessionId)
556
557 if (!playerSession) {
558 logger.error('Cannot get player session %s to check socket health.', playerSession)
559 continue
560 }
561
562 if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
563 return true
564 }
565 }
566
567 return false
568 }
569
570 private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
571 if (live.saveReplay !== true) return true
572
573 return this.isAbleToUploadVideoWithCache(user.id)
574 }
575
576 private async updateLiveViews () {
577 if (!this.isRunning()) return
578
579 if (!isTestInstance()) logger.info('Updating live video views.')
580
581 for (const videoId of this.watchersPerVideo.keys()) {
582 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
583
584 const watchers = this.watchersPerVideo.get(videoId)
585
586 const numWatchers = watchers.length
587
588 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
589 video.views = numWatchers
590 await video.save()
591
592 await federateVideoIfNeeded(video, false)
593
594 PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
595
596 // Only keep not expired watchers
597 const newWatchers = watchers.filter(w => w > notBefore)
598 this.watchersPerVideo.set(videoId, newWatchers)
599
600 logger.debug('New live video views for %s is %d.', video.url, numWatchers)
601 }
602 }
603
604 private async handleBrokenLives () {
605 const videoIds = await VideoModel.listPublishedLiveIds()
606
607 for (const id of videoIds) {
608 await this.onEndTransmuxing(id, true)
609 }
610 }
611
612 static get Instance () {
613 return this.instance || (this.instance = new this())
614 }
615 }
616
617 // ---------------------------------------------------------------------------
618
619 export {
620 LiveManager
621 }