aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/shared
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/live/shared')
-rw-r--r--server/lib/live/shared/muxing-session.ts191
-rw-r--r--server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts101
-rw-r--r--server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts95
-rw-r--r--server/lib/live/shared/transcoding-wrapper/index.ts3
-rw-r--r--server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts20
5 files changed, 303 insertions, 107 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 2727fc4a7..f3f8fc886 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -1,11 +1,10 @@
1import { mapSeries } from 'bluebird' 1import { mapSeries } from 'bluebird'
2import { FSWatcher, watch } from 'chokidar' 2import { FSWatcher, watch } from 'chokidar'
3import { FfmpegCommand } from 'fluent-ffmpeg' 3import { EventEmitter } from 'events'
4import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 4import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
5import PQueue from 'p-queue' 5import PQueue from 'p-queue'
6import { basename, join } from 'path' 6import { basename, join } from 'path'
7import { EventEmitter } from 'stream' 7import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
9import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
10import { CONFIG } from '@server/initializers/config' 9import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
@@ -20,24 +19,24 @@ import {
20 getLiveDirectory, 19 getLiveDirectory,
21 getLiveReplayBaseDirectory 20 getLiveReplayBaseDirectory
22} from '../../paths' 21} from '../../paths'
23import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
24import { isAbleToUploadVideo } from '../../user' 22import { isAbleToUploadVideo } from '../../user'
25import { LiveQuotaStore } from '../live-quota-store' 23import { LiveQuotaStore } from '../live-quota-store'
26import { LiveSegmentShaStore } from '../live-segment-sha-store' 24import { LiveSegmentShaStore } from '../live-segment-sha-store'
27import { buildConcatenatedName } from '../live-utils' 25import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils'
26import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper'
28 27
29import memoizee = require('memoizee') 28import memoizee = require('memoizee')
30interface MuxingSessionEvents { 29interface MuxingSessionEvents {
31 'live-ready': (options: { videoId: number }) => void 30 'live-ready': (options: { videoUUID: string }) => void
32 31
33 'bad-socket-health': (options: { videoId: number }) => void 32 'bad-socket-health': (options: { videoUUID: string }) => void
34 'duration-exceeded': (options: { videoId: number }) => void 33 'duration-exceeded': (options: { videoUUID: string }) => void
35 'quota-exceeded': (options: { videoId: number }) => void 34 'quota-exceeded': (options: { videoUUID: string }) => void
36 35
37 'ffmpeg-end': (options: { videoId: number }) => void 36 'transcoding-end': (options: { videoUUID: string }) => void
38 'ffmpeg-error': (options: { videoId: number }) => void 37 'transcoding-error': (options: { videoUUID: string }) => void
39 38
40 'after-cleanup': (options: { videoId: number }) => void 39 'after-cleanup': (options: { videoUUID: string }) => void
41} 40}
42 41
43declare interface MuxingSession { 42declare interface MuxingSession {
@@ -52,7 +51,7 @@ declare interface MuxingSession {
52 51
53class MuxingSession extends EventEmitter { 52class MuxingSession extends EventEmitter {
54 53
55 private ffmpegCommand: FfmpegCommand 54 private transcodingWrapper: AbstractTranscodingWrapper
56 55
57 private readonly context: any 56 private readonly context: any
58 private readonly user: MUserId 57 private readonly user: MUserId
@@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter {
67 66
68 private readonly hasAudio: boolean 67 private readonly hasAudio: boolean
69 68
70 private readonly videoId: number
71 private readonly videoUUID: string 69 private readonly videoUUID: string
72 private readonly saveReplay: boolean 70 private readonly saveReplay: boolean
73 71
@@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter {
126 124
127 this.allResolutions = options.allResolutions 125 this.allResolutions = options.allResolutions
128 126
129 this.videoId = this.videoLive.Video.id
130 this.videoUUID = this.videoLive.Video.uuid 127 this.videoUUID = this.videoLive.Video.uuid
131 128
132 this.saveReplay = this.videoLive.saveReplay 129 this.saveReplay = this.videoLive.saveReplay
@@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter {
145 142
146 await this.prepareDirectories() 143 await this.prepareDirectories()
147 144
148 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED 145 this.transcodingWrapper = this.buildTranscodingWrapper()
149 ? await getLiveTranscodingCommand({
150 inputUrl: this.inputUrl,
151 146
152 outPath: this.outDirectory, 147 this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
153 masterPlaylistName: this.streamingPlaylist.playlistFilename, 148 this.transcodingWrapper.on('error', () => this.onTranscodingError())
154 149
155 latencyMode: this.videoLive.latencyMode, 150 await this.transcodingWrapper.run()
156
157 resolutions: this.allResolutions,
158 fps: this.fps,
159 bitrate: this.bitrate,
160 ratio: this.ratio,
161
162 hasAudio: this.hasAudio,
163
164 availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
165 profile: CONFIG.LIVE.TRANSCODING.PROFILE
166 })
167 : getLiveMuxingCommand({
168 inputUrl: this.inputUrl,
169 outPath: this.outDirectory,
170 masterPlaylistName: this.streamingPlaylist.playlistFilename,
171 latencyMode: this.videoLive.latencyMode
172 })
173
174 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
175 151
176 this.watchMasterFile() 152 this.watchMasterFile()
177 this.watchTSFiles() 153 this.watchTSFiles()
178 this.watchM3U8File() 154 this.watchM3U8File()
179
180 let ffmpegShellCommand: string
181 this.ffmpegCommand.on('start', cmdline => {
182 ffmpegShellCommand = cmdline
183
184 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
185 })
186
187 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
188 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
189 })
190
191 this.ffmpegCommand.on('end', () => {
192 this.emit('ffmpeg-end', ({ videoId: this.videoId }))
193
194 this.onFFmpegEnded()
195 })
196
197 this.ffmpegCommand.run()
198 } 155 }
199 156
200 abort () { 157 abort () {
201 if (!this.ffmpegCommand) return 158 if (!this.transcodingWrapper) return
202 159
203 this.aborted = true 160 this.aborted = true
204 this.ffmpegCommand.kill('SIGINT') 161 this.transcodingWrapper.abort()
205 } 162 }
206 163
207 destroy () { 164 destroy () {
@@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter {
210 this.hasClientSocketInBadHealthWithCache.clear() 167 this.hasClientSocketInBadHealthWithCache.clear()
211 } 168 }
212 169
213 private onFFmpegError (options: {
214 err: any
215 stdout: string
216 stderr: string
217 ffmpegShellCommand: string
218 }) {
219 const { err, stdout, stderr, ffmpegShellCommand } = options
220
221 this.onFFmpegEnded()
222
223 // Don't care that we killed the ffmpeg process
224 if (err?.message?.includes('Exiting normally')) return
225
226 logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
227
228 this.emit('ffmpeg-error', ({ videoId: this.videoId }))
229 }
230
231 private onFFmpegEnded () {
232 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
233
234 setTimeout(() => {
235 // Wait latest segments generation, and close watchers
236
237 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
238 .then(() => {
239 // Process remaining segments hash
240 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
241 this.processSegments(this.segmentsToProcessPerPlaylist[key])
242 }
243 })
244 .catch(err => {
245 logger.error(
246 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
247 { err, ...this.lTags() }
248 )
249 })
250
251 this.emit('after-cleanup', { videoId: this.videoId })
252 }, 1000)
253 }
254
255 private watchMasterFile () { 170 private watchMasterFile () {
256 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) 171 this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
257 172
@@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter {
272 187
273 this.masterPlaylistCreated = true 188 this.masterPlaylistCreated = true
274 189
190 logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
191
275 this.masterWatcher.close() 192 this.masterWatcher.close()
276 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) 193 .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
277 }) 194 })
@@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter {
318 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] 235 this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
319 236
320 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { 237 if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
321 this.emit('bad-socket-health', { videoId: this.videoId }) 238 this.emit('bad-socket-health', { videoUUID: this.videoUUID })
322 return 239 return
323 } 240 }
324 241
325 // Duration constraint check 242 // Duration constraint check
326 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 243 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
327 this.emit('duration-exceeded', { videoId: this.videoId }) 244 this.emit('duration-exceeded', { videoUUID: this.videoUUID })
328 return 245 return
329 } 246 }
330 247
331 // Check user quota if the user enabled replay saving 248 // Check user quota if the user enabled replay saving
332 if (await this.isQuotaExceeded(segmentPath) === true) { 249 if (await this.isQuotaExceeded(segmentPath) === true) {
333 this.emit('quota-exceeded', { videoId: this.videoId }) 250 this.emit('quota-exceeded', { videoUUID: this.videoUUID })
334 } 251 }
335 } 252 }
336 253
@@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter {
438 if (this.masterPlaylistCreated && !this.liveReady) { 355 if (this.masterPlaylistCreated && !this.liveReady) {
439 this.liveReady = true 356 this.liveReady = true
440 357
441 this.emit('live-ready', { videoId: this.videoId }) 358 this.emit('live-ready', { videoUUID: this.videoUUID })
442 } 359 }
443 } 360 }
444 361
362 private onTranscodingError () {
363 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
364 }
365
366 private onTranscodedEnded () {
367 this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
368
369 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
370
371 setTimeout(() => {
372 // Wait latest segments generation, and close watchers
373
374 Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
375 .then(() => {
376 // Process remaining segments hash
377 for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
378 this.processSegments(this.segmentsToProcessPerPlaylist[key])
379 }
380 })
381 .catch(err => {
382 logger.error(
383 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
384 { err, ...this.lTags() }
385 )
386 })
387
388 this.emit('after-cleanup', { videoUUID: this.videoUUID })
389 }, 1000)
390 }
391
445 private hasClientSocketInBadHealth (sessionId: string) { 392 private hasClientSocketInBadHealth (sessionId: string) {
446 const rtmpSession = this.context.sessions.get(sessionId) 393 const rtmpSession = this.context.sessions.get(sessionId)
447 394
@@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter {
503 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED 450 sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
504 }) 451 })
505 } 452 }
453
454 private buildTranscodingWrapper () {
455 const options = {
456 streamingPlaylist: this.streamingPlaylist,
457 videoLive: this.videoLive,
458
459 lTags: this.lTags,
460
461 inputUrl: this.inputUrl,
462
463 toTranscode: this.allResolutions.map(resolution => ({
464 resolution,
465 fps: computeOutputFPS({ inputFPS: this.fps, resolution })
466 })),
467
468 fps: this.fps,
469 bitrate: this.bitrate,
470 ratio: this.ratio,
471 hasAudio: this.hasAudio,
472
473 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
474 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
475
476 outDirectory: this.outDirectory
477 }
478
479 return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
480 ? new RemoteTranscodingWrapper(options)
481 : new FFmpegTranscodingWrapper(options)
482 }
506} 483}
507 484
508// --------------------------------------------------------------------------- 485// ---------------------------------------------------------------------------
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
new file mode 100644
index 000000000..226ba4573
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
@@ -0,0 +1,101 @@
1import EventEmitter from 'events'
2import { LoggerTagsFn } from '@server/helpers/logger'
3import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models'
4import { LiveVideoError } from '@shared/models'
5
6interface TranscodingWrapperEvents {
7 'end': () => void
8
9 'error': (options: { err: Error }) => void
10}
11
12declare interface AbstractTranscodingWrapper {
13 on<U extends keyof TranscodingWrapperEvents>(
14 event: U, listener: TranscodingWrapperEvents[U]
15 ): this
16
17 emit<U extends keyof TranscodingWrapperEvents>(
18 event: U, ...args: Parameters<TranscodingWrapperEvents[U]>
19 ): boolean
20}
21
22interface AbstractTranscodingWrapperOptions {
23 streamingPlaylist: MStreamingPlaylistVideo
24 videoLive: MVideoLiveVideo
25
26 lTags: LoggerTagsFn
27
28 inputUrl: string
29 fps: number
30 toTranscode: {
31 resolution: number
32 fps: number
33 }[]
34
35 bitrate: number
36 ratio: number
37 hasAudio: boolean
38
39 segmentListSize: number
40 segmentDuration: number
41
42 outDirectory: string
43}
44
45abstract class AbstractTranscodingWrapper extends EventEmitter {
46 protected readonly videoLive: MVideoLiveVideo
47
48 protected readonly toTranscode: {
49 resolution: number
50 fps: number
51 }[]
52
53 protected readonly inputUrl: string
54 protected readonly fps: number
55 protected readonly bitrate: number
56 protected readonly ratio: number
57 protected readonly hasAudio: boolean
58
59 protected readonly segmentListSize: number
60 protected readonly segmentDuration: number
61
62 protected readonly videoUUID: string
63
64 protected readonly outDirectory: string
65
66 protected readonly lTags: LoggerTagsFn
67
68 protected readonly streamingPlaylist: MStreamingPlaylistVideo
69
70 constructor (options: AbstractTranscodingWrapperOptions) {
71 super()
72
73 this.lTags = options.lTags
74
75 this.videoLive = options.videoLive
76 this.videoUUID = options.videoLive.Video.uuid
77 this.streamingPlaylist = options.streamingPlaylist
78
79 this.inputUrl = options.inputUrl
80 this.fps = options.fps
81 this.toTranscode = options.toTranscode
82
83 this.bitrate = options.bitrate
84 this.ratio = options.ratio
85 this.hasAudio = options.hasAudio
86
87 this.segmentListSize = options.segmentListSize
88 this.segmentDuration = options.segmentDuration
89
90 this.outDirectory = options.outDirectory
91 }
92
93 abstract run (): Promise<void>
94
95 abstract abort (error?: LiveVideoError): void
96}
97
98export {
99 AbstractTranscodingWrapper,
100 AbstractTranscodingWrapperOptions
101}
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
new file mode 100644
index 000000000..1f4c12bd4
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
@@ -0,0 +1,95 @@
1import { FfmpegCommand } from 'fluent-ffmpeg'
2import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
3import { logger } from '@server/helpers/logger'
4import { CONFIG } from '@server/initializers/config'
5import { VIDEO_LIVE } from '@server/initializers/constants'
6import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
7import { FFmpegLive } from '@shared/ffmpeg'
8import { getLiveSegmentTime } from '../../live-utils'
9import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
10
11export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
12 private ffmpegCommand: FfmpegCommand
13 private ended = false
14
15 async run () {
16 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
17 ? await this.buildFFmpegLive().getLiveTranscodingCommand({
18 inputUrl: this.inputUrl,
19
20 outPath: this.outDirectory,
21 masterPlaylistName: this.streamingPlaylist.playlistFilename,
22
23 segmentListSize: this.segmentListSize,
24 segmentDuration: this.segmentDuration,
25
26 toTranscode: this.toTranscode,
27
28 bitrate: this.bitrate,
29 ratio: this.ratio,
30
31 hasAudio: this.hasAudio
32 })
33 : this.buildFFmpegLive().getLiveMuxingCommand({
34 inputUrl: this.inputUrl,
35 outPath: this.outDirectory,
36
37 masterPlaylistName: this.streamingPlaylist.playlistFilename,
38
39 segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
40 segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode)
41 })
42
43 logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags())
44
45 this.ffmpegCommand.run()
46
47 let ffmpegShellCommand: string
48 this.ffmpegCommand.on('start', cmdline => {
49 ffmpegShellCommand = cmdline
50
51 logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() })
52 })
53
54 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
55 this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
56 })
57
58 this.ffmpegCommand.on('end', () => {
59 this.onFFmpegEnded()
60 })
61
62 this.ffmpegCommand.run()
63 }
64
65 abort () {
66 // Nothing to do, ffmpeg will automatically exit
67 }
68
69 private onFFmpegError (options: {
70 err: any
71 stdout: string
72 stderr: string
73 ffmpegShellCommand: string
74 }) {
75 const { err, stdout, stderr, ffmpegShellCommand } = options
76
77 // Don't care that we killed the ffmpeg process
78 if (err?.message?.includes('Exiting normally')) return
79
80 logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
81
82 this.emit('error', { err })
83 }
84
85 private onFFmpegEnded () {
86 if (this.ended) return
87
88 this.ended = true
89 this.emit('end')
90 }
91
92 private buildFFmpegLive () {
93 return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
94 }
95}
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts
new file mode 100644
index 000000000..ae28fa1ca
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/index.ts
@@ -0,0 +1,3 @@
1export * from './abstract-transcoding-wrapper'
2export * from './ffmpeg-transcoding-wrapper'
3export * from './remote-transcoding-wrapper'
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
new file mode 100644
index 000000000..345eaf442
--- /dev/null
+++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
@@ -0,0 +1,20 @@
1import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners'
2import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper'
3
4export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
5 async run () {
6 await new LiveRTMPHLSTranscodingJobHandler().create({
7 rtmpUrl: this.inputUrl,
8 toTranscode: this.toTranscode,
9 video: this.videoLive.Video,
10 outputDirectory: this.outDirectory,
11 playlist: this.streamingPlaylist,
12 segmentListSize: this.segmentListSize,
13 segmentDuration: this.segmentDuration
14 })
15 }
16
17 abort () {
18 this.emit('end')
19 }
20}