aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/index.ts4
-rw-r--r--server/lib/live/live-manager.ts552
-rw-r--r--server/lib/live/live-quota-store.ts48
-rw-r--r--server/lib/live/live-segment-sha-store.ts95
-rw-r--r--server/lib/live/live-utils.ts99
-rw-r--r--server/lib/live/shared/index.ts1
-rw-r--r--server/lib/live/shared/muxing-session.ts518
-rw-r--r--server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts110
-rw-r--r--server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts107
-rw-r--r--server/lib/live/shared/transcoding-wrapper/index.ts3
-rw-r--r--server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts21
11 files changed, 0 insertions, 1558 deletions
diff --git a/server/lib/live/index.ts b/server/lib/live/index.ts
deleted file mode 100644
index 8b46800da..000000000
--- a/server/lib/live/index.ts
+++ /dev/null
@@ -1,4 +0,0 @@
1export * from './live-manager'
2export * from './live-quota-store'
3export * from './live-segment-sha-store'
4export * from './live-utils'
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
deleted file mode 100644
index acb7af274..000000000
--- a/server/lib/live/live-manager.ts
+++ /dev/null
@@ -1,552 +0,0 @@
1import { readdir, readFile } from 'fs-extra'
2import { createServer, Server } from 'net'
3import { join } from 'path'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { logger, loggerTagsFactory } from '@server/helpers/logger'
6import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
7import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
8import { sequelizeTypescript } from '@server/initializers/database'
9import { RunnerJobModel } from '@server/models/runner/runner-job'
10import { UserModel } from '@server/models/user/user'
11import { VideoModel } from '@server/models/video/video'
12import { VideoLiveModel } from '@server/models/video/video-live'
13import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
14import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
15import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
16import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models'
17import { pick, wait } from '@shared/core-utils'
18import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg'
19import { LiveVideoError, VideoState } from '@shared/models'
20import { federateVideoIfNeeded } from '../activitypub/videos'
21import { JobQueue } from '../job-queue'
22import { getLiveReplayBaseDirectory } from '../paths'
23import { PeerTubeSocket } from '../peertube-socket'
24import { Hooks } from '../plugins/hooks'
25import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions'
26import { LiveQuotaStore } from './live-quota-store'
27import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils'
28import { MuxingSession } from './shared'
29
30const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
31const context = require('node-media-server/src/node_core_ctx')
32const nodeMediaServerLogger = require('node-media-server/src/node_core_logger')
33
34// Disable node media server logs
35nodeMediaServerLogger.setLogType(0)
36
37const config = {
38 rtmp: {
39 port: CONFIG.LIVE.RTMP.PORT,
40 chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
41 gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
42 ping: VIDEO_LIVE.RTMP.PING,
43 ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
44 }
45}
46
47const lTags = loggerTagsFactory('live')
48
49class LiveManager {
50
51 private static instance: LiveManager
52
53 private readonly muxingSessions = new Map<string, MuxingSession>()
54 private readonly videoSessions = new Map<string, string>()
55
56 private rtmpServer: Server
57 private rtmpsServer: ServerTLS
58
59 private running = false
60
61 private constructor () {
62 }
63
64 init () {
65 const events = this.getContext().nodeEvent
66 events.on('postPublish', (sessionId: string, streamPath: string) => {
67 logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
68
69 const splittedPath = streamPath.split('/')
70 if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
71 logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
72 return this.abortSession(sessionId)
73 }
74
75 const session = this.getContext().sessions.get(sessionId)
76 const inputLocalUrl = session.inputOriginLocalUrl + streamPath
77 const inputPublicUrl = session.inputOriginPublicUrl + streamPath
78
79 this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] })
80 .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
81 })
82
83 events.on('donePublish', sessionId => {
84 logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
85
86 // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU)
87 setTimeout(() => this.abortSession(sessionId), 2000)
88 })
89
90 registerConfigChangedHandler(() => {
91 if (!this.running && CONFIG.LIVE.ENABLED === true) {
92 this.run().catch(err => logger.error('Cannot run live server.', { err }))
93 return
94 }
95
96 if (this.running && CONFIG.LIVE.ENABLED === false) {
97 this.stop()
98 }
99 })
100
101 // Cleanup broken lives, that were terminated by a server restart for example
102 this.handleBrokenLives()
103 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
104 }
105
106 async run () {
107 this.running = true
108
109 if (CONFIG.LIVE.RTMP.ENABLED) {
110 logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags())
111
112 this.rtmpServer = createServer(socket => {
113 const session = new NodeRtmpSession(config, socket)
114
115 session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
116 session.inputOriginPublicUrl = WEBSERVER.RTMP_URL
117 session.run()
118 })
119
120 this.rtmpServer.on('error', err => {
121 logger.error('Cannot run RTMP server.', { err, ...lTags() })
122 })
123
124 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME)
125 }
126
127 if (CONFIG.LIVE.RTMPS.ENABLED) {
128 logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags())
129
130 const [ key, cert ] = await Promise.all([
131 readFile(CONFIG.LIVE.RTMPS.KEY_FILE),
132 readFile(CONFIG.LIVE.RTMPS.CERT_FILE)
133 ])
134 const serverOptions = { key, cert }
135
136 this.rtmpsServer = createServerTLS(serverOptions, socket => {
137 const session = new NodeRtmpSession(config, socket)
138
139 session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
140 session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL
141 session.run()
142 })
143
144 this.rtmpsServer.on('error', err => {
145 logger.error('Cannot run RTMPS server.', { err, ...lTags() })
146 })
147
148 this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME)
149 }
150 }
151
152 stop () {
153 this.running = false
154
155 if (this.rtmpServer) {
156 logger.info('Stopping RTMP server.', lTags())
157
158 this.rtmpServer.close()
159 this.rtmpServer = undefined
160 }
161
162 if (this.rtmpsServer) {
163 logger.info('Stopping RTMPS server.', lTags())
164
165 this.rtmpsServer.close()
166 this.rtmpsServer = undefined
167 }
168
169 // Sessions is an object
170 this.getContext().sessions.forEach((session: any) => {
171 if (session instanceof NodeRtmpSession) {
172 session.stop()
173 }
174 })
175 }
176
177 isRunning () {
178 return !!this.rtmpServer
179 }
180
181 hasSession (sessionId: string) {
182 return this.getContext().sessions.has(sessionId)
183 }
184
185 stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
186 const sessionId = this.videoSessions.get(videoUUID)
187 if (!sessionId) {
188 logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID))
189 return
190 }
191
192 logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) })
193
194 this.saveEndingSession(videoUUID, error)
195 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) }))
196
197 this.videoSessions.delete(videoUUID)
198 this.abortSession(sessionId)
199 }
200
201 private getContext () {
202 return context
203 }
204
205 private abortSession (sessionId: string) {
206 const session = this.getContext().sessions.get(sessionId)
207 if (session) {
208 session.stop()
209 this.getContext().sessions.delete(sessionId)
210 }
211
212 const muxingSession = this.muxingSessions.get(sessionId)
213 if (muxingSession) {
214 // Muxing session will fire and event so we correctly cleanup the session
215 muxingSession.abort()
216
217 this.muxingSessions.delete(sessionId)
218 }
219 }
220
221 private async handleSession (options: {
222 sessionId: string
223 inputLocalUrl: string
224 inputPublicUrl: string
225 streamKey: string
226 }) {
227 const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options
228
229 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
230 if (!videoLive) {
231 logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
232 return this.abortSession(sessionId)
233 }
234
235 const video = videoLive.Video
236 if (video.isBlacklisted()) {
237 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
238 return this.abortSession(sessionId)
239 }
240
241 if (this.videoSessions.has(video.uuid)) {
242 logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
243 return this.abortSession(sessionId)
244 }
245
246 // Cleanup old potential live (could happen with a permanent live)
247 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
248 if (oldStreamingPlaylist) {
249 if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
250
251 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
252 }
253
254 this.videoSessions.set(video.uuid, sessionId)
255
256 const now = Date.now()
257 const probe = await ffprobePromise(inputLocalUrl)
258
259 const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([
260 getVideoStreamDimensionsInfo(inputLocalUrl, probe),
261 getVideoStreamFPS(inputLocalUrl, probe),
262 getVideoStreamBitrate(inputLocalUrl, probe),
263 hasAudioStream(inputLocalUrl, probe)
264 ])
265
266 logger.info(
267 '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
268 inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
269 )
270
271 const allResolutions = await Hooks.wrapObject(
272 this.buildAllResolutionsToTranscode(resolution, hasAudio),
273 'filter:transcoding.auto.resolutions-to-transcode.result',
274 { video }
275 )
276
277 logger.info(
278 'Handling live video of original resolution %d.', resolution,
279 { allResolutions, ...lTags(sessionId, video.uuid) }
280 )
281
282 return this.runMuxingSession({
283 sessionId,
284 videoLive,
285
286 inputLocalUrl,
287 inputPublicUrl,
288 fps,
289 bitrate,
290 ratio,
291 allResolutions,
292 hasAudio
293 })
294 }
295
296 private async runMuxingSession (options: {
297 sessionId: string
298 videoLive: MVideoLiveVideoWithSetting
299
300 inputLocalUrl: string
301 inputPublicUrl: string
302
303 fps: number
304 bitrate: number
305 ratio: number
306 allResolutions: number[]
307 hasAudio: boolean
308 }) {
309 const { sessionId, videoLive } = options
310 const videoUUID = videoLive.Video.uuid
311 const localLTags = lTags(sessionId, videoUUID)
312
313 const liveSession = await this.saveStartingSession(videoLive)
314
315 const user = await UserModel.loadByLiveId(videoLive.id)
316 LiveQuotaStore.Instance.addNewLive(user.id, sessionId)
317
318 const muxingSession = new MuxingSession({
319 context: this.getContext(),
320 sessionId,
321 videoLive,
322 user,
323
324 ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
325 })
326
327 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
328
329 muxingSession.on('bad-socket-health', ({ videoUUID }) => {
330 logger.error(
331 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
332 ' Stopping session of video %s.', videoUUID,
333 localLTags
334 )
335
336 this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH)
337 })
338
339 muxingSession.on('duration-exceeded', ({ videoUUID }) => {
340 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
341
342 this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED)
343 })
344
345 muxingSession.on('quota-exceeded', ({ videoUUID }) => {
346 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
347
348 this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED)
349 })
350
351 muxingSession.on('transcoding-error', ({ videoUUID }) => {
352 this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR)
353 })
354
355 muxingSession.on('transcoding-end', ({ videoUUID }) => {
356 this.onMuxingFFmpegEnd(videoUUID, sessionId)
357 })
358
359 muxingSession.on('after-cleanup', ({ videoUUID }) => {
360 this.muxingSessions.delete(sessionId)
361
362 LiveQuotaStore.Instance.removeLive(user.id, sessionId)
363
364 muxingSession.destroy()
365
366 return this.onAfterMuxingCleanup({ videoUUID, liveSession })
367 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
368 })
369
370 this.muxingSessions.set(sessionId, muxingSession)
371
372 muxingSession.runMuxing()
373 .catch(err => {
374 logger.error('Cannot run muxing.', { err, ...localLTags })
375 this.abortSession(sessionId)
376 })
377 }
378
379 private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) {
380 const videoId = live.videoId
381
382 try {
383 const video = await VideoModel.loadFull(videoId)
384
385 logger.info('Will publish and federate live %s.', video.url, localLTags)
386
387 video.state = VideoState.PUBLISHED
388 video.publishedAt = new Date()
389 await video.save()
390
391 live.Video = video
392
393 await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
394
395 try {
396 await federateVideoIfNeeded(video, false)
397 } catch (err) {
398 logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
399 }
400
401 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
402
403 Hooks.runAction('action:live.video.state.updated', { video })
404 } catch (err) {
405 logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
406 }
407 }
408
409 private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
410 // Session already cleaned up
411 if (!this.videoSessions.has(videoUUID)) return
412
413 this.videoSessions.delete(videoUUID)
414
415 this.saveEndingSession(videoUUID, null)
416 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
417 }
418
419 private async onAfterMuxingCleanup (options: {
420 videoUUID: string
421 liveSession?: MVideoLiveSession
422 cleanupNow?: boolean // Default false
423 }) {
424 const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options
425
426 logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))
427
428 try {
429 const fullVideo = await VideoModel.loadFull(videoUUID)
430 if (!fullVideo) return
431
432 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
433
434 const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id)
435
436 // On server restart during a live
437 if (!liveSession.endDate) {
438 liveSession.endDate = new Date()
439 await liveSession.save()
440 }
441
442 JobQueue.Instance.createJobAsync({
443 type: 'video-live-ending',
444 payload: {
445 videoId: fullVideo.id,
446
447 replayDirectory: live.saveReplay
448 ? await this.findReplayDirectory(fullVideo)
449 : undefined,
450
451 liveSessionId: liveSession.id,
452 streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,
453
454 publishedAt: fullVideo.publishedAt.toISOString()
455 },
456
457 delay: cleanupNow
458 ? 0
459 : VIDEO_LIVE.CLEANUP_DELAY
460 })
461
462 fullVideo.state = live.permanentLive
463 ? VideoState.WAITING_FOR_LIVE
464 : VideoState.LIVE_ENDED
465
466 await fullVideo.save()
467
468 PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
469
470 await federateVideoIfNeeded(fullVideo, false)
471
472 Hooks.runAction('action:live.video.state.updated', { video: fullVideo })
473 } catch (err) {
474 logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
475 }
476 }
477
478 private async handleBrokenLives () {
479 await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' })
480
481 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
482
483 for (const uuid of videoUUIDs) {
484 await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true })
485 }
486 }
487
488 private async findReplayDirectory (video: MVideo) {
489 const directory = getLiveReplayBaseDirectory(video)
490 const files = await readdir(directory)
491
492 if (files.length === 0) return undefined
493
494 return join(directory, files.sort().reverse()[0])
495 }
496
497 private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) {
498 const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION
499
500 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
501 ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio })
502 : []
503
504 if (resolutionsEnabled.length === 0) {
505 return [ originResolution ]
506 }
507
508 return resolutionsEnabled
509 }
510
511 private async saveStartingSession (videoLive: MVideoLiveVideoWithSetting) {
512 const replaySettings = videoLive.saveReplay
513 ? new VideoLiveReplaySettingModel({
514 privacy: videoLive.ReplaySetting.privacy
515 })
516 : null
517
518 return sequelizeTypescript.transaction(async t => {
519 if (videoLive.saveReplay) {
520 await replaySettings.save({ transaction: t })
521 }
522
523 return VideoLiveSessionModel.create({
524 startDate: new Date(),
525 liveVideoId: videoLive.videoId,
526 saveReplay: videoLive.saveReplay,
527 replaySettingId: videoLive.saveReplay ? replaySettings.id : null,
528 endingProcessed: false
529 }, { transaction: t })
530 })
531 }
532
533 private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) {
534 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID)
535 if (!liveSession) return
536
537 liveSession.endDate = new Date()
538 liveSession.error = error
539
540 return liveSession.save()
541 }
542
543 static get Instance () {
544 return this.instance || (this.instance = new this())
545 }
546}
547
548// ---------------------------------------------------------------------------
549
550export {
551 LiveManager
552}
diff --git a/server/lib/live/live-quota-store.ts b/server/lib/live/live-quota-store.ts
deleted file mode 100644
index 44539faaa..000000000
--- a/server/lib/live/live-quota-store.ts
+++ /dev/null
@@ -1,48 +0,0 @@
1class LiveQuotaStore {
2
3 private static instance: LiveQuotaStore
4
5 private readonly livesPerUser = new Map<number, { sessionId: string, size: number }[]>()
6
7 private constructor () {
8 }
9
10 addNewLive (userId: number, sessionId: string) {
11 if (!this.livesPerUser.has(userId)) {
12 this.livesPerUser.set(userId, [])
13 }
14
15 const currentUserLive = { sessionId, size: 0 }
16 const livesOfUser = this.livesPerUser.get(userId)
17 livesOfUser.push(currentUserLive)
18 }
19
20 removeLive (userId: number, sessionId: string) {
21 const newLivesPerUser = this.livesPerUser.get(userId)
22 .filter(o => o.sessionId !== sessionId)
23
24 this.livesPerUser.set(userId, newLivesPerUser)
25 }
26
27 addQuotaTo (userId: number, sessionId: string, size: number) {
28 const lives = this.livesPerUser.get(userId)
29 const live = lives.find(l => l.sessionId === sessionId)
30
31 live.size += size
32 }
33
34 getLiveQuotaOf (userId: number) {
35 const currentLives = this.livesPerUser.get(userId)
36 if (!currentLives) return 0
37
38 return currentLives.reduce((sum, obj) => sum + obj.size, 0)
39 }
40
41 static get Instance () {
42 return this.instance || (this.instance = new this())
43 }
44}
45
46export {
47 LiveQuotaStore
48}
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts
deleted file mode 100644
index 8253c0274..000000000
--- a/server/lib/live/live-segment-sha-store.ts
+++ /dev/null
@@ -1,95 +0,0 @@
1import { rename, writeJson } from 'fs-extra'
2import PQueue from 'p-queue'
3import { basename } from 'path'
4import { mapToJSON } from '@server/helpers/core-utils'
5import { logger, loggerTagsFactory } from '@server/helpers/logger'
6import { MStreamingPlaylistVideo } from '@server/types/models'
7import { buildSha256Segment } from '../hls'
8import { storeHLSFileFromPath } from '../object-storage'
9
10const lTags = loggerTagsFactory('live')
11
12class LiveSegmentShaStore {
13
14 private readonly segmentsSha256 = new Map<string, string>()
15
16 private readonly videoUUID: string
17
18 private readonly sha256Path: string
19 private readonly sha256PathTMP: string
20
21 private readonly streamingPlaylist: MStreamingPlaylistVideo
22 private readonly sendToObjectStorage: boolean
23 private readonly writeQueue = new PQueue({ concurrency: 1 })
24
25 constructor (options: {
26 videoUUID: string
27 sha256Path: string
28 streamingPlaylist: MStreamingPlaylistVideo
29 sendToObjectStorage: boolean
30 }) {
31 this.videoUUID = options.videoUUID
32
33 this.sha256Path = options.sha256Path
34 this.sha256PathTMP = options.sha256Path + '.tmp'
35
36 this.streamingPlaylist = options.streamingPlaylist
37 this.sendToObjectStorage = options.sendToObjectStorage
38 }
39
40 async addSegmentSha (segmentPath: string) {
41 logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
42
43 const shaResult = await buildSha256Segment(segmentPath)
44
45 const segmentName = basename(segmentPath)
46 this.segmentsSha256.set(segmentName, shaResult)
47
48 try {
49 await this.writeToDisk()
50 } catch (err) {
51 logger.error('Cannot write sha segments to disk.', { err })
52 }
53 }
54
55 async removeSegmentSha (segmentPath: string) {
56 const segmentName = basename(segmentPath)
57
58 logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
59
60 if (!this.segmentsSha256.has(segmentName)) {
61 logger.warn(
62 'Unknown segment in live segment hash store for video %s and segment %s.',
63 this.videoUUID, segmentPath, lTags(this.videoUUID)
64 )
65 return
66 }
67
68 this.segmentsSha256.delete(segmentName)
69
70 await this.writeToDisk()
71 }
72
73 private writeToDisk () {
74 return this.writeQueue.add(async () => {
75 logger.debug(`Writing segment sha JSON ${this.sha256Path} of ${this.videoUUID} on disk.`, lTags(this.videoUUID))
76
77 // Atomic write: use rename instead of move that is not atomic
78 await writeJson(this.sha256PathTMP, mapToJSON(this.segmentsSha256))
79 await rename(this.sha256PathTMP, this.sha256Path)
80
81 if (this.sendToObjectStorage) {
82 const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
83
84 if (this.streamingPlaylist.segmentsSha256Url !== url) {
85 this.streamingPlaylist.segmentsSha256Url = url
86 await this.streamingPlaylist.save()
87 }
88 }
89 })
90 }
91}
92
93export {
94 LiveSegmentShaStore
95}
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
deleted file mode 100644
index 3fb3ce1ce..000000000
--- a/server/lib/live/live-utils.ts
+++ /dev/null
@@ -1,99 +0,0 @@
1import { pathExists, readdir, remove } from 'fs-extra'
2import { basename, join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { VIDEO_LIVE } from '@server/initializers/constants'
5import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
6import { LiveVideoLatencyMode, VideoStorage } from '@shared/models'
7import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage'
8import { getLiveDirectory } from '../paths'
9
10function buildConcatenatedName (segmentOrPlaylistPath: string) {
11 const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
12
13 return 'concat-' + num[1] + '.ts'
14}
15
16async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
17 await cleanupTMPLiveFiles(video, streamingPlaylist)
18
19 await streamingPlaylist.destroy()
20}
21
22async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
23 const hlsDirectory = getLiveDirectory(video)
24
25 // We uploaded files to object storage too, remove them
26 if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
27 await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
28 }
29
30 await remove(hlsDirectory)
31
32 await streamingPlaylist.destroy()
33}
34
35async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
36 await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
37
38 await cleanupTMPLiveFilesFromFilesystem(video)
39}
40
41function getLiveSegmentTime (latencyMode: LiveVideoLatencyMode) {
42 if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) {
43 return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY
44 }
45
46 return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY
47}
48
49export {
50 cleanupAndDestroyPermanentLive,
51 cleanupUnsavedNormalLive,
52 cleanupTMPLiveFiles,
53 getLiveSegmentTime,
54 buildConcatenatedName
55}
56
57// ---------------------------------------------------------------------------
58
59function isTMPLiveFile (name: string) {
60 return name.endsWith('.ts') ||
61 name.endsWith('.m3u8') ||
62 name.endsWith('.json') ||
63 name.endsWith('.mpd') ||
64 name.endsWith('.m4s') ||
65 name.endsWith('.tmp')
66}
67
68async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
69 const hlsDirectory = getLiveDirectory(video)
70
71 if (!await pathExists(hlsDirectory)) return
72
73 logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
74
75 const files = await readdir(hlsDirectory)
76
77 for (const filename of files) {
78 if (isTMPLiveFile(filename)) {
79 const p = join(hlsDirectory, filename)
80
81 remove(p)
82 .catch(err => logger.error('Cannot remove %s.', p, { err }))
83 }
84 }
85}
86
87async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
88 if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
89
90 logger.info('Cleanup TMP live files from object storage for %s.', streamingPlaylist.Video.uuid)
91
92 const keys = await listHLSFileKeysOf(streamingPlaylist)
93
94 for (const key of keys) {
95 if (isTMPLiveFile(key)) {
96 await removeHLSFileObjectStorageByFullKey(key)
97 }
98 }
99}
diff --git a/server/lib/live/shared/index.ts b/server/lib/live/shared/index.ts
deleted file mode 100644
index c4d1b59ec..000000000
--- a/server/lib/live/shared/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './muxing-session'
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
deleted file mode 100644
index 02691b651..000000000
--- a/server/lib/live/shared/muxing-session.ts
+++ /dev/null
@@ -1,518 +0,0 @@
1import { mapSeries } from 'bluebird'
2import { FSWatcher, watch } from 'chokidar'
3import { EventEmitter } from 'events'
4import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
5import PQueue from 'p-queue'
6import { basename, join } from 'path'
7import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
9import { CONFIG } from '@server/initializers/config'
10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
11import { removeHLSFileObjectStorageByPath, storeHLSFileFromContent, storeHLSFileFromPath } from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
14import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
15import { VideoStorage, VideoStreamingPlaylistType } from '@shared/models'
16import {
17 generateHLSMasterPlaylistFilename,
18 generateHlsSha256SegmentsFilename,
19 getLiveDirectory,
20 getLiveReplayBaseDirectory
21} from '../../paths'
22import { isAbleToUploadVideo } from '../../user'
23import { LiveQuotaStore } from '../live-quota-store'
24import { LiveSegmentShaStore } from '../live-segment-sha-store'
25import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils'
26import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper'
27
28import memoizee = require('memoizee')
29interface MuxingSessionEvents {
30 'live-ready': (options: { videoUUID: string }) => void
31
32 'bad-socket-health': (options: { videoUUID: string }) => void
33 'duration-exceeded': (options: { videoUUID: string }) => void
34 'quota-exceeded': (options: { videoUUID: string }) => void
35
36 'transcoding-end': (options: { videoUUID: string }) => void
37 'transcoding-error': (options: { videoUUID: string }) => void
38
39 'after-cleanup': (options: { videoUUID: string }) => void
40}
41
42declare interface MuxingSession {
43 on<U extends keyof MuxingSessionEvents>(
44 event: U, listener: MuxingSessionEvents[U]
45 ): this
46
47 emit<U extends keyof MuxingSessionEvents>(
48 event: U, ...args: Parameters<MuxingSessionEvents[U]>
49 ): boolean
50}
51
52class MuxingSession extends EventEmitter {
53
54 private transcodingWrapper: AbstractTranscodingWrapper
55
56 private readonly context: any
57 private readonly user: MUserId
58 private readonly sessionId: string
59 private readonly videoLive: MVideoLiveVideo
60
61 private readonly inputLocalUrl: string
62 private readonly inputPublicUrl: string
63
64 private readonly fps: number
65 private readonly allResolutions: number[]
66
67 private readonly bitrate: number
68 private readonly ratio: number
69
70 private readonly hasAudio: boolean
71
72 private readonly videoUUID: string
73 private readonly saveReplay: boolean
74
75 private readonly outDirectory: string
76 private readonly replayDirectory: string
77
78 private readonly lTags: LoggerTagsFn
79
80 // Path -> Queue
81 private readonly objectStorageSendQueues = new Map<string, PQueue>()
82
83 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
84
85 private streamingPlaylist: MStreamingPlaylistVideo
86 private liveSegmentShaStore: LiveSegmentShaStore
87
88 private filesWatcher: FSWatcher
89
90 private masterPlaylistCreated = false
91 private liveReady = false
92
93 private aborted = false
94
95 private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
96 return isAbleToUploadVideo(userId, 1000)
97 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
98
99 private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => {
100 return this.hasClientSocketInBadHealth(sessionId)
101 }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
102
103 constructor (options: {
104 context: any
105 user: MUserId
106 sessionId: string
107 videoLive: MVideoLiveVideo
108
109 inputLocalUrl: string
110 inputPublicUrl: string
111
112 fps: number
113 bitrate: number
114 ratio: number
115 allResolutions: number[]
116 hasAudio: boolean
117 }) {
118 super()
119
120 this.context = options.context
121 this.user = options.user
122 this.sessionId = options.sessionId
123 this.videoLive = options.videoLive
124
125 this.inputLocalUrl = options.inputLocalUrl
126 this.inputPublicUrl = options.inputPublicUrl
127
128 this.fps = options.fps
129
130 this.bitrate = options.bitrate
131 this.ratio = options.ratio
132
133 this.hasAudio = options.hasAudio
134
135 this.allResolutions = options.allResolutions
136
137 this.videoUUID = this.videoLive.Video.uuid
138
139 this.saveReplay = this.videoLive.saveReplay
140
141 this.outDirectory = getLiveDirectory(this.videoLive.Video)
142 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
143
144 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
145 }
146
147 async runMuxing () {
148 this.streamingPlaylist = await this.createLivePlaylist()
149
150 this.createLiveShaStore()
151 this.createFiles()
152
153 await this.prepareDirectories()
154
155 this.transcodingWrapper = this.buildTranscodingWrapper()
156
157 this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
158 this.transcodingWrapper.on('error', () => this.onTranscodingError())
159
160 await this.transcodingWrapper.run()
161
162 this.filesWatcher = watch(this.outDirectory, { depth: 0 })
163
164 this.watchMasterFile()
165 this.watchTSFiles()
166 }
167
168 abort () {
169 if (!this.transcodingWrapper) return
170
171 this.aborted = true
172 this.transcodingWrapper.abort()
173 }
174
175 destroy () {
176 this.removeAllListeners()
177 this.isAbleToUploadVideoWithCache.clear()
178 this.hasClientSocketInBadHealthWithCache.clear()
179 }
180
181 private watchMasterFile () {
182 this.filesWatcher.on('add', async path => {
183 if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return
184 if (this.masterPlaylistCreated === true) return
185
186 try {
187 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
188 const masterContent = await readFile(path, 'utf-8')
189 logger.debug('Uploading live master playlist on object storage for %s', this.videoUUID, { masterContent, ...this.lTags() })
190
191 const url = await storeHLSFileFromContent(this.streamingPlaylist, this.streamingPlaylist.playlistFilename, masterContent)
192
193 this.streamingPlaylist.playlistUrl = url
194 }
195
196 this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions)
197
198 await this.streamingPlaylist.save()
199 } catch (err) {
200 logger.error('Cannot update streaming playlist.', { err, ...this.lTags() })
201 }
202
203 this.masterPlaylistCreated = true
204
205 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
206 })
207 }
208
209 private watchTSFiles () {
210 const startStreamDateTime = new Date().getTime()
211
212 const addHandler = async (segmentPath: string) => {
213 if (segmentPath.endsWith('.ts') !== true) return
214
215 logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
216
217 const playlistId = this.getPlaylistIdFromTS(segmentPath)
218
219 const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
220 this.processSegments(segmentsToProcess)
221
222 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
223
224 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
225 this.emit('bad-socket-health', { videoUUID: this.videoUUID })
226 return
227 }
228
229 // Duration constraint check
230 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
231 this.emit('duration-exceeded', { videoUUID: this.videoUUID })
232 return
233 }
234
235 // Check user quota if the user enabled replay saving
236 if (await this.isQuotaExceeded(segmentPath) === true) {
237 this.emit('quota-exceeded', { videoUUID: this.videoUUID })
238 }
239 }
240
241 const deleteHandler = async (segmentPath: string) => {
242 if (segmentPath.endsWith('.ts') !== true) return
243
244 logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags())
245
246 try {
247 await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
248 } catch (err) {
249 logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
250 }
251
252 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
253 try {
254 await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
255 } catch (err) {
256 logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
257 }
258 }
259 }
260
261 this.filesWatcher.on('add', p => addHandler(p))
262 this.filesWatcher.on('unlink', p => deleteHandler(p))
263 }
264
265 private async isQuotaExceeded (segmentPath: string) {
266 if (this.saveReplay !== true) return false
267 if (this.aborted) return false
268
269 try {
270 const segmentStat = await stat(segmentPath)
271
272 LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.sessionId, segmentStat.size)
273
274 const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id)
275
276 return canUpload !== true
277 } catch (err) {
278 logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags() })
279 }
280 }
281
282 private createFiles () {
283 for (let i = 0; i < this.allResolutions.length; i++) {
284 const resolution = this.allResolutions[i]
285
286 const file = new VideoFileModel({
287 resolution,
288 size: -1,
289 extname: '.ts',
290 infoHash: null,
291 fps: this.fps,
292 storage: this.streamingPlaylist.storage,
293 videoStreamingPlaylistId: this.streamingPlaylist.id
294 })
295
296 VideoFileModel.customUpsert(file, 'streaming-playlist', null)
297 .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags() }))
298 }
299 }
300
301 private async prepareDirectories () {
302 await ensureDir(this.outDirectory)
303
304 if (this.videoLive.saveReplay === true) {
305 await ensureDir(this.replayDirectory)
306 }
307 }
308
309 private isDurationConstraintValid (streamingStartTime: number) {
310 const maxDuration = CONFIG.LIVE.MAX_DURATION
311 // No limit
312 if (maxDuration < 0) return true
313
314 const now = new Date().getTime()
315 const max = streamingStartTime + maxDuration
316
317 return now <= max
318 }
319
320 private processSegments (segmentPaths: string[]) {
321 mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
322 .catch(err => {
323 if (this.aborted) return
324
325 logger.error('Cannot process segments', { err, ...this.lTags() })
326 })
327 }
328
329 private async processSegment (segmentPath: string) {
330 // Add sha hash of previous segments, because ffmpeg should have finished generating them
331 await this.liveSegmentShaStore.addSegmentSha(segmentPath)
332
333 if (this.saveReplay) {
334 await this.addSegmentToReplay(segmentPath)
335 }
336
337 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
338 try {
339 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
340
341 await this.processM3U8ToObjectStorage(segmentPath)
342 } catch (err) {
343 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
344 }
345 }
346
347 // Master playlist and segment JSON file are created, live is ready
348 if (this.masterPlaylistCreated && !this.liveReady) {
349 this.liveReady = true
350
351 this.emit('live-ready', { videoUUID: this.videoUUID })
352 }
353 }
354
355 private async processM3U8ToObjectStorage (segmentPath: string) {
356 const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
357
358 logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
359
360 const segmentName = basename(segmentPath)
361
362 const playlistContent = await readFile(m3u8Path, 'utf-8')
363 // Remove new chunk references, that will be processed later
364 const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
365
366 try {
367 if (!this.objectStorageSendQueues.has(m3u8Path)) {
368 this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
369 }
370
371 const queue = this.objectStorageSendQueues.get(m3u8Path)
372 await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
373 } catch (err) {
374 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
375 }
376 }
377
378 private onTranscodingError () {
379 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
380 }
381
382 private onTranscodedEnded () {
383 this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
384
385 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputLocalUrl, this.lTags())
386
387 setTimeout(() => {
388 // Wait latest segments generation, and close watchers
389
390 const promise = this.filesWatcher?.close() || Promise.resolve()
391 promise
392 .then(() => {
393 // Process remaining segments hash
394 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
395 this.processSegments(this.segmentsToProcessPerPlaylist[key])
396 }
397 })
398 .catch(err => {
399 logger.error(
400 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
401 { err, ...this.lTags() }
402 )
403 })
404
405 this.emit('after-cleanup', { videoUUID: this.videoUUID })
406 }, 1000)
407 }
408
409 private hasClientSocketInBadHealth (sessionId: string) {
410 const rtmpSession = this.context.sessions.get(sessionId)
411
412 if (!rtmpSession) {
413 logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags())
414 return
415 }
416
417 for (const playerSessionId of rtmpSession.players) {
418 const playerSession = this.context.sessions.get(playerSessionId)
419
420 if (!playerSession) {
421 logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags())
422 continue
423 }
424
425 if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
426 return true
427 }
428 }
429
430 return false
431 }
432
433 private async addSegmentToReplay (segmentPath: string) {
434 const segmentName = basename(segmentPath)
435 const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
436
437 try {
438 const data = await readFile(segmentPath)
439
440 await appendFile(dest, data)
441 } catch (err) {
442 logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() })
443 }
444 }
445
446 private async createLivePlaylist (): Promise<MStreamingPlaylistVideo> {
447 const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(this.videoLive.Video)
448
449 playlist.playlistFilename = generateHLSMasterPlaylistFilename(true)
450 playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true)
451
452 playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
453 playlist.type = VideoStreamingPlaylistType.HLS
454
455 playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
456 ? VideoStorage.OBJECT_STORAGE
457 : VideoStorage.FILE_SYSTEM
458
459 return playlist.save()
460 }
461
462 private createLiveShaStore () {
463 this.liveSegmentShaStore = new LiveSegmentShaStore({
464 videoUUID: this.videoLive.Video.uuid,
465 sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
466 streamingPlaylist: this.streamingPlaylist,
467 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
468 })
469 }
470
471 private buildTranscodingWrapper () {
472 const options = {
473 streamingPlaylist: this.streamingPlaylist,
474 videoLive: this.videoLive,
475
476 lTags: this.lTags,
477
478 sessionId: this.sessionId,
479 inputLocalUrl: this.inputLocalUrl,
480 inputPublicUrl: this.inputPublicUrl,
481
482 toTranscode: this.allResolutions.map(resolution => ({
483 resolution,
484 fps: computeOutputFPS({ inputFPS: this.fps, resolution })
485 })),
486
487 fps: this.fps,
488 bitrate: this.bitrate,
489 ratio: this.ratio,
490 hasAudio: this.hasAudio,
491
492 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
493 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
494
495 outDirectory: this.outDirectory
496 }
497
498 return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
499 ? new RemoteTranscodingWrapper(options)
500 : new FFmpegTranscodingWrapper(options)
501 }
502
503 private getPlaylistIdFromTS (segmentPath: string) {
504 const playlistIdMatcher = /^([\d+])-/
505
506 return basename(segmentPath).match(playlistIdMatcher)[1]
507 }
508
509 private getPlaylistNameFromTS (segmentPath: string) {
510 return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
511 }
512}
513
514// ---------------------------------------------------------------------------
515
516export {
517 MuxingSession
518}
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
deleted file mode 100644
index 95168745d..000000000
--- a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
+++ /dev/null
@@ -1,110 +0,0 @@
1import EventEmitter from 'events'
2import { LoggerTagsFn } from '@server/helpers/logger'
3import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models'
4import { LiveVideoError } from '@shared/models'
5
6interface TranscodingWrapperEvents {
7 'end': () => void
8
9 'error': (options: { err: Error }) => void
10}
11
12declare interface AbstractTranscodingWrapper {
13 on<U extends keyof TranscodingWrapperEvents>(
14 event: U, listener: TranscodingWrapperEvents[U]
15 ): this
16
17 emit<U extends keyof TranscodingWrapperEvents>(
18 event: U, ...args: Parameters<TranscodingWrapperEvents[U]>
19 ): boolean
20}
21
22interface AbstractTranscodingWrapperOptions {
23 streamingPlaylist: MStreamingPlaylistVideo
24 videoLive: MVideoLiveVideo
25
26 lTags: LoggerTagsFn
27
28 sessionId: string
29 inputLocalUrl: string
30 inputPublicUrl: string
31
32 fps: number
33 toTranscode: {
34 resolution: number
35 fps: number
36 }[]
37
38 bitrate: number
39 ratio: number
40 hasAudio: boolean
41
42 segmentListSize: number
43 segmentDuration: number
44
45 outDirectory: string
46}
47
48abstract class AbstractTranscodingWrapper extends EventEmitter {
49 protected readonly videoLive: MVideoLiveVideo
50
51 protected readonly toTranscode: {
52 resolution: number
53 fps: number
54 }[]
55
56 protected readonly sessionId: string
57 protected readonly inputLocalUrl: string
58 protected readonly inputPublicUrl: string
59
60 protected readonly fps: number
61 protected readonly bitrate: number
62 protected readonly ratio: number
63 protected readonly hasAudio: boolean
64
65 protected readonly segmentListSize: number
66 protected readonly segmentDuration: number
67
68 protected readonly videoUUID: string
69
70 protected readonly outDirectory: string
71
72 protected readonly lTags: LoggerTagsFn
73
74 protected readonly streamingPlaylist: MStreamingPlaylistVideo
75
76 constructor (options: AbstractTranscodingWrapperOptions) {
77 super()
78
79 this.lTags = options.lTags
80
81 this.videoLive = options.videoLive
82 this.videoUUID = options.videoLive.Video.uuid
83 this.streamingPlaylist = options.streamingPlaylist
84
85 this.sessionId = options.sessionId
86 this.inputLocalUrl = options.inputLocalUrl
87 this.inputPublicUrl = options.inputPublicUrl
88
89 this.fps = options.fps
90 this.toTranscode = options.toTranscode
91
92 this.bitrate = options.bitrate
93 this.ratio = options.ratio
94 this.hasAudio = options.hasAudio
95
96 this.segmentListSize = options.segmentListSize
97 this.segmentDuration = options.segmentDuration
98
99 this.outDirectory = options.outDirectory
100 }
101
102 abstract run (): Promise<void>
103
104 abstract abort (error?: LiveVideoError): void
105}
106
107export {
108 AbstractTranscodingWrapper,
109 AbstractTranscodingWrapperOptions
110}
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
deleted file mode 100644
index c6ee8ebf1..000000000
--- a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
+++ /dev/null
@@ -1,107 +0,0 @@
1import { FfmpegCommand } from 'fluent-ffmpeg'
2import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
3import { logger } from '@server/helpers/logger'
4import { CONFIG } from '@server/initializers/config'
5import { VIDEO_LIVE } from '@server/initializers/constants'
6import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
7import { FFmpegLive } from '@shared/ffmpeg'
8import { getLiveSegmentTime } from '../../live-utils'
9import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
10
11export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
12 private ffmpegCommand: FfmpegCommand
13
14 private aborted = false
15 private errored = false
16 private ended = false
17
18 async run () {
19 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
20 ? await this.buildFFmpegLive().getLiveTranscodingCommand({
21 inputUrl: this.inputLocalUrl,
22
23 outPath: this.outDirectory,
24 masterPlaylistName: this.streamingPlaylist.playlistFilename,
25
26 segmentListSize: this.segmentListSize,
27 segmentDuration: this.segmentDuration,
28
29 toTranscode: this.toTranscode,
30
31 bitrate: this.bitrate,
32 ratio: this.ratio,
33
34 hasAudio: this.hasAudio
35 })
36 : this.buildFFmpegLive().getLiveMuxingCommand({
37 inputUrl: this.inputLocalUrl,
38 outPath: this.outDirectory,
39
40 masterPlaylistName: this.streamingPlaylist.playlistFilename,
41
42 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
43 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode)
44 })
45
46 logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags())
47
48 let ffmpegShellCommand: string
49 this.ffmpegCommand.on('start', cmdline => {
50 ffmpegShellCommand = cmdline
51
52 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
53 })
54
55 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
56 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
57 })
58
59 this.ffmpegCommand.on('end', () => {
60 this.onFFmpegEnded()
61 })
62
63 this.ffmpegCommand.run()
64 }
65
66 abort () {
67 if (this.ended || this.errored || this.aborted) return
68
69 logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags())
70
71 this.ffmpegCommand.kill('SIGINT')
72
73 this.aborted = true
74 this.emit('end')
75 }
76
77 private onFFmpegError (options: {
78 err: any
79 stdout: string
80 stderr: string
81 ffmpegShellCommand: string
82 }) {
83 const { err, stdout, stderr, ffmpegShellCommand } = options
84
85 // Don't care that we killed the ffmpeg process
86 if (err?.message?.includes('Exiting normally')) return
87 if (this.ended || this.errored || this.aborted) return
88
89 logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
90
91 this.errored = true
92 this.emit('error', { err })
93 }
94
95 private onFFmpegEnded () {
96 if (this.ended || this.errored || this.aborted) return
97
98 logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags())
99
100 this.ended = true
101 this.emit('end')
102 }
103
104 private buildFFmpegLive () {
105 return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
106 }
107}
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts
deleted file mode 100644
index ae28fa1ca..000000000
--- a/server/lib/live/shared/transcoding-wrapper/index.ts
+++ /dev/null
@@ -1,3 +0,0 @@
1export * from './abstract-transcoding-wrapper'
2export * from './ffmpeg-transcoding-wrapper'
3export * from './remote-transcoding-wrapper'
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
deleted file mode 100644
index 2aeeb31fb..000000000
--- a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
+++ /dev/null
@@ -1,21 +0,0 @@
1import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners'
2import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
3
4export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
5 async run () {
6 await new LiveRTMPHLSTranscodingJobHandler().create({
7 rtmpUrl: this.inputPublicUrl,
8 sessionId: this.sessionId,
9 toTranscode: this.toTranscode,
10 video: this.videoLive.Video,
11 outputDirectory: this.outDirectory,
12 playlist: this.streamingPlaylist,
13 segmentListSize: this.segmentListSize,
14 segmentDuration: this.segmentDuration
15 })
16 }
17
18 abort () {
19 this.emit('end')
20 }
21}