aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/shared/muxing-session.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/live/shared/muxing-session.ts')
-rw-r--r--server/lib/live/shared/muxing-session.ts191
1 files changed, 84 insertions, 107 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 2727fc4a7..f3f8fc886 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -1,11 +1,10 @@
1import { mapSeries } from 'bluebird' 1import { mapSeries } from 'bluebird'
2import { FSWatcher, watch } from 'chokidar' 2import { FSWatcher, watch } from 'chokidar'
3import { FfmpegCommand } from 'fluent-ffmpeg' 3import { EventEmitter } from 'events'
4import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 4import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
5import PQueue from 'p-queue' 5import PQueue from 'p-queue'
6import { basename, join } from 'path' 6import { basename, join } from 'path'
7import { EventEmitter } from 'stream' 7import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 9import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
@@ -20,24 +19,24 @@ import {
20 getLiveDirectory, 19 getLiveDirectory,
21 getLiveReplayBaseDirectory 20 getLiveReplayBaseDirectory
22} from '../../paths' 21} from '../../paths'
23import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
24import { isAbleToUploadVideo } from '../../user' 22import { isAbleToUploadVideo } from '../../user'
25import { LiveQuotaStore } from '../live-quota-store' 23import { LiveQuotaStore } from '../live-quota-store'
26import { LiveSegmentShaStore } from '../live-segment-sha-store' 24import { LiveSegmentShaStore } from '../live-segment-sha-store'
27import { buildConcatenatedName } from '../live-utils' 25import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils'
26import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper'
28 27
29import memoizee = require('memoizee') 28import memoizee = require('memoizee')
30interface MuxingSessionEvents { 29interface MuxingSessionEvents {
31 'live-ready': (options: { videoId: number }) => void 30 'live-ready': (options: { videoUUID: string }) => void
32 31
33 'bad-socket-health': (options: { videoId: number }) => void 32 'bad-socket-health': (options: { videoUUID: string }) => void
34 'duration-exceeded': (options: { videoId: number }) => void 33 'duration-exceeded': (options: { videoUUID: string }) => void
35 'quota-exceeded': (options: { videoId: number }) => void 34 'quota-exceeded': (options: { videoUUID: string }) => void
36 35
37 'ffmpeg-end': (options: { videoId: number }) => void 36 'transcoding-end': (options: { videoUUID: string }) => void
38 'ffmpeg-error': (options: { videoId: number }) => void 37 'transcoding-error': (options: { videoUUID: string }) => void
39 38
40 'after-cleanup': (options: { videoId: number }) => void 39 'after-cleanup': (options: { videoUUID: string }) => void
41} 40}
42 41
43declare interface MuxingSession { 42declare interface MuxingSession {
@@ -52,7 +51,7 @@ declare interface MuxingSession {
52 51
53class MuxingSession extends EventEmitter { 52class MuxingSession extends EventEmitter {
54 53
55 private ffmpegCommand: FfmpegCommand 54 private transcodingWrapper: AbstractTranscodingWrapper
56 55
57 private readonly context: any 56 private readonly context: any
58 private readonly user: MUserId 57 private readonly user: MUserId
@@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter {
67 66
68 private readonly hasAudio: boolean 67 private readonly hasAudio: boolean
69 68
70 private readonly videoId: number
71 private readonly videoUUID: string 69 private readonly videoUUID: string
72 private readonly saveReplay: boolean 70 private readonly saveReplay: boolean
73 71
@@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter {
126 124
127 this.allResolutions = options.allResolutions 125 this.allResolutions = options.allResolutions
128 126
129 this.videoId = this.videoLive.Video.id
130 this.videoUUID = this.videoLive.Video.uuid 127 this.videoUUID = this.videoLive.Video.uuid
131 128
132 this.saveReplay = this.videoLive.saveReplay 129 this.saveReplay = this.videoLive.saveReplay
@@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter {
145 142
146 await this.prepareDirectories() 143 await this.prepareDirectories()
147 144
148 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED 145 this.transcodingWrapper = this.buildTranscodingWrapper()
149 ? await getLiveTranscodingCommand({
150 inputUrl: this.inputUrl,
151 146
152 outPath: this.outDirectory, 147 this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
153 masterPlaylistName: this.streamingPlaylist.playlistFilename, 148 this.transcodingWrapper.on('error', () => this.onTranscodingError())
154 149
155 latencyMode: this.videoLive.latencyMode, 150 await this.transcodingWrapper.run()
156
157 resolutions: this.allResolutions,
158 fps: this.fps,
159 bitrate: this.bitrate,
160 ratio: this.ratio,
161
162 hasAudio: this.hasAudio,
163
164 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
165 profile: CONFIG.LIVE.TRANSCODING.PROFILE
166 })
167 : getLiveMuxingCommand({
168 inputUrl: this.inputUrl,
169 outPath: this.outDirectory,
170 masterPlaylistName: this.streamingPlaylist.playlistFilename,
171 latencyMode: this.videoLive.latencyMode
172 })
173
174 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
175 151
176 this.watchMasterFile() 152 this.watchMasterFile()
177 this.watchTSFiles() 153 this.watchTSFiles()
178 this.watchM3U8File() 154 this.watchM3U8File()
179
180 let ffmpegShellCommand: string
181 this.ffmpegCommand.on('start', cmdline => {
182 ffmpegShellCommand = cmdline
183
184 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
185 })
186
187 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
188 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
189 })
190
191 this.ffmpegCommand.on('end', () => {
192 this.emit('ffmpeg-end', ({ videoId: this.videoId }))
193
194 this.onFFmpegEnded()
195 })
196
197 this.ffmpegCommand.run()
198 } 155 }
199 156
200 abort () { 157 abort () {
201 if (!this.ffmpegCommand) return 158 if (!this.transcodingWrapper) return
202 159
203 this.aborted = true 160 this.aborted = true
204 this.ffmpegCommand.kill('SIGINT') 161 this.transcodingWrapper.abort()
205 } 162 }
206 163
207 destroy () { 164 destroy () {
@@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter {
210 this.hasClientSocketInBadHealthWithCache.clear() 167 this.hasClientSocketInBadHealthWithCache.clear()
211 } 168 }
212 169
213 private onFFmpegError (options: {
214 err: any
215 stdout: string
216 stderr: string
217 ffmpegShellCommand: string
218 }) {
219 const { err, stdout, stderr, ffmpegShellCommand } = options
220
221 this.onFFmpegEnded()
222
223 // Don't care that we killed the ffmpeg process
224 if (err?.message?.includes('Exiting normally')) return
225
226 logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
227
228 this.emit('ffmpeg-error', ({ videoId: this.videoId }))
229 }
230
231 private onFFmpegEnded () {
232 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
233
234 setTimeout(() => {
235 // Wait latest segments generation, and close watchers
236
237 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
238 .then(() => {
239 // Process remaining segments hash
240 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
241 this.processSegments(this.segmentsToProcessPerPlaylist[key])
242 }
243 })
244 .catch(err => {
245 logger.error(
246 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
247 { err, ...this.lTags() }
248 )
249 })
250
251 this.emit('after-cleanup', { videoId: this.videoId })
252 }, 1000)
253 }
254
255 private watchMasterFile () { 170 private watchMasterFile () {
256 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 171 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
257 172
@@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter {
272 187
273 this.masterPlaylistCreated = true 188 this.masterPlaylistCreated = true
274 189
190 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
191
275 this.masterWatcher.close() 192 this.masterWatcher.close()
276 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 193 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
277 }) 194 })
@@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter {
318 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] 235 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
319 236
320 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { 237 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
321 this.emit('bad-socket-health', { videoId: this.videoId }) 238 this.emit('bad-socket-health', { videoUUID: this.videoUUID })
322 return 239 return
323 } 240 }
324 241
325 // Duration constraint check 242 // Duration constraint check
326 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 243 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
327 this.emit('duration-exceeded', { videoId: this.videoId }) 244 this.emit('duration-exceeded', { videoUUID: this.videoUUID })
328 return 245 return
329 } 246 }
330 247
331 // Check user quota if the user enabled replay saving 248 // Check user quota if the user enabled replay saving
332 if (await this.isQuotaExceeded(segmentPath) === true) { 249 if (await this.isQuotaExceeded(segmentPath) === true) {
333 this.emit('quota-exceeded', { videoId: this.videoId }) 250 this.emit('quota-exceeded', { videoUUID: this.videoUUID })
334 } 251 }
335 } 252 }
336 253
@@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter {
438 if (this.masterPlaylistCreated && !this.liveReady) { 355 if (this.masterPlaylistCreated && !this.liveReady) {
439 this.liveReady = true 356 this.liveReady = true
440 357
441 this.emit('live-ready', { videoId: this.videoId }) 358 this.emit('live-ready', { videoUUID: this.videoUUID })
442 } 359 }
443 } 360 }
444 361
362 private onTranscodingError () {
363 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
364 }
365
366 private onTranscodedEnded () {
367 this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
368
369 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
370
371 setTimeout(() => {
372 // Wait latest segments generation, and close watchers
373
374 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
375 .then(() => {
376 // Process remaining segments hash
377 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
378 this.processSegments(this.segmentsToProcessPerPlaylist[key])
379 }
380 })
381 .catch(err => {
382 logger.error(
383 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
384 { err, ...this.lTags() }
385 )
386 })
387
388 this.emit('after-cleanup', { videoUUID: this.videoUUID })
389 }, 1000)
390 }
391
445 private hasClientSocketInBadHealth (sessionId: string) { 392 private hasClientSocketInBadHealth (sessionId: string) {
446 const rtmpSession = this.context.sessions.get(sessionId) 393 const rtmpSession = this.context.sessions.get(sessionId)
447 394
@@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter {
503 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED 450 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
504 }) 451 })
505 } 452 }
453
454 private buildTranscodingWrapper () {
455 const options = {
456 streamingPlaylist: this.streamingPlaylist,
457 videoLive: this.videoLive,
458
459 lTags: this.lTags,
460
461 inputUrl: this.inputUrl,
462
463 toTranscode: this.allResolutions.map(resolution => ({
464 resolution,
465 fps: computeOutputFPS({ inputFPS: this.fps, resolution })
466 })),
467
468 fps: this.fps,
469 bitrate: this.bitrate,
470 ratio: this.ratio,
471 hasAudio: this.hasAudio,
472
473 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
474 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
475
476 outDirectory: this.outDirectory
477 }
478
479 return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
480 ? new RemoteTranscodingWrapper(options)
481 : new FFmpegTranscodingWrapper(options)
482 }
506} 483}
507 484
508// --------------------------------------------------------------------------- 485// ---------------------------------------------------------------------------