diff options
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/index.ts | 4 | ||||
-rw-r--r-- | server/lib/live/live-manager.ts | 552 | ||||
-rw-r--r-- | server/lib/live/live-quota-store.ts | 48 | ||||
-rw-r--r-- | server/lib/live/live-segment-sha-store.ts | 95 | ||||
-rw-r--r-- | server/lib/live/live-utils.ts | 99 | ||||
-rw-r--r-- | server/lib/live/shared/index.ts | 1 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 518 | ||||
-rw-r--r-- | server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts | 110 | ||||
-rw-r--r-- | server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts | 107 | ||||
-rw-r--r-- | server/lib/live/shared/transcoding-wrapper/index.ts | 3 | ||||
-rw-r--r-- | server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts | 21 |
11 files changed, 0 insertions, 1558 deletions
diff --git a/server/lib/live/index.ts b/server/lib/live/index.ts deleted file mode 100644 index 8b46800da..000000000 --- a/server/lib/live/index.ts +++ /dev/null | |||
@@ -1,4 +0,0 @@ | |||
1 | export * from './live-manager' | ||
2 | export * from './live-quota-store' | ||
3 | export * from './live-segment-sha-store' | ||
4 | export * from './live-utils' | ||
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts deleted file mode 100644 index acb7af274..000000000 --- a/server/lib/live/live-manager.ts +++ /dev/null | |||
@@ -1,552 +0,0 @@ | |||
1 | import { readdir, readFile } from 'fs-extra' | ||
2 | import { createServer, Server } from 'net' | ||
3 | import { join } from 'path' | ||
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | ||
5 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
6 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | ||
7 | import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' | ||
8 | import { sequelizeTypescript } from '@server/initializers/database' | ||
9 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
10 | import { UserModel } from '@server/models/user/user' | ||
11 | import { VideoModel } from '@server/models/video/video' | ||
12 | import { VideoLiveModel } from '@server/models/video/video-live' | ||
13 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
14 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | ||
15 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | ||
16 | import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' | ||
17 | import { pick, wait } from '@shared/core-utils' | ||
18 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg' | ||
19 | import { LiveVideoError, VideoState } from '@shared/models' | ||
20 | import { federateVideoIfNeeded } from '../activitypub/videos' | ||
21 | import { JobQueue } from '../job-queue' | ||
22 | import { getLiveReplayBaseDirectory } from '../paths' | ||
23 | import { PeerTubeSocket } from '../peertube-socket' | ||
24 | import { Hooks } from '../plugins/hooks' | ||
25 | import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions' | ||
26 | import { LiveQuotaStore } from './live-quota-store' | ||
27 | import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils' | ||
28 | import { MuxingSession } from './shared' | ||
29 | |||
30 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') | ||
31 | const context = require('node-media-server/src/node_core_ctx') | ||
32 | const nodeMediaServerLogger = require('node-media-server/src/node_core_logger') | ||
33 | |||
34 | // Disable node media server logs | ||
35 | nodeMediaServerLogger.setLogType(0) | ||
36 | |||
37 | const config = { | ||
38 | rtmp: { | ||
39 | port: CONFIG.LIVE.RTMP.PORT, | ||
40 | chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE, | ||
41 | gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, | ||
42 | ping: VIDEO_LIVE.RTMP.PING, | ||
43 | ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT | ||
44 | } | ||
45 | } | ||
46 | |||
47 | const lTags = loggerTagsFactory('live') | ||
48 | |||
49 | class LiveManager { | ||
50 | |||
51 | private static instance: LiveManager | ||
52 | |||
53 | private readonly muxingSessions = new Map<string, MuxingSession>() | ||
54 | private readonly videoSessions = new Map<string, string>() | ||
55 | |||
56 | private rtmpServer: Server | ||
57 | private rtmpsServer: ServerTLS | ||
58 | |||
59 | private running = false | ||
60 | |||
61 | private constructor () { | ||
62 | } | ||
63 | |||
64 | init () { | ||
65 | const events = this.getContext().nodeEvent | ||
66 | events.on('postPublish', (sessionId: string, streamPath: string) => { | ||
67 | logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) }) | ||
68 | |||
69 | const splittedPath = streamPath.split('/') | ||
70 | if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) { | ||
71 | logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) }) | ||
72 | return this.abortSession(sessionId) | ||
73 | } | ||
74 | |||
75 | const session = this.getContext().sessions.get(sessionId) | ||
76 | const inputLocalUrl = session.inputOriginLocalUrl + streamPath | ||
77 | const inputPublicUrl = session.inputOriginPublicUrl + streamPath | ||
78 | |||
79 | this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] }) | ||
80 | .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) | ||
81 | }) | ||
82 | |||
83 | events.on('donePublish', sessionId => { | ||
84 | logger.info('Live session ended.', { sessionId, ...lTags(sessionId) }) | ||
85 | |||
86 | // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU) | ||
87 | setTimeout(() => this.abortSession(sessionId), 2000) | ||
88 | }) | ||
89 | |||
90 | registerConfigChangedHandler(() => { | ||
91 | if (!this.running && CONFIG.LIVE.ENABLED === true) { | ||
92 | this.run().catch(err => logger.error('Cannot run live server.', { err })) | ||
93 | return | ||
94 | } | ||
95 | |||
96 | if (this.running && CONFIG.LIVE.ENABLED === false) { | ||
97 | this.stop() | ||
98 | } | ||
99 | }) | ||
100 | |||
101 | // Cleanup broken lives, that were terminated by a server restart for example | ||
102 | this.handleBrokenLives() | ||
103 | .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) | ||
104 | } | ||
105 | |||
106 | async run () { | ||
107 | this.running = true | ||
108 | |||
109 | if (CONFIG.LIVE.RTMP.ENABLED) { | ||
110 | logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags()) | ||
111 | |||
112 | this.rtmpServer = createServer(socket => { | ||
113 | const session = new NodeRtmpSession(config, socket) | ||
114 | |||
115 | session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT | ||
116 | session.inputOriginPublicUrl = WEBSERVER.RTMP_URL | ||
117 | session.run() | ||
118 | }) | ||
119 | |||
120 | this.rtmpServer.on('error', err => { | ||
121 | logger.error('Cannot run RTMP server.', { err, ...lTags() }) | ||
122 | }) | ||
123 | |||
124 | this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME) | ||
125 | } | ||
126 | |||
127 | if (CONFIG.LIVE.RTMPS.ENABLED) { | ||
128 | logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags()) | ||
129 | |||
130 | const [ key, cert ] = await Promise.all([ | ||
131 | readFile(CONFIG.LIVE.RTMPS.KEY_FILE), | ||
132 | readFile(CONFIG.LIVE.RTMPS.CERT_FILE) | ||
133 | ]) | ||
134 | const serverOptions = { key, cert } | ||
135 | |||
136 | this.rtmpsServer = createServerTLS(serverOptions, socket => { | ||
137 | const session = new NodeRtmpSession(config, socket) | ||
138 | |||
139 | session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT | ||
140 | session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL | ||
141 | session.run() | ||
142 | }) | ||
143 | |||
144 | this.rtmpsServer.on('error', err => { | ||
145 | logger.error('Cannot run RTMPS server.', { err, ...lTags() }) | ||
146 | }) | ||
147 | |||
148 | this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME) | ||
149 | } | ||
150 | } | ||
151 | |||
152 | stop () { | ||
153 | this.running = false | ||
154 | |||
155 | if (this.rtmpServer) { | ||
156 | logger.info('Stopping RTMP server.', lTags()) | ||
157 | |||
158 | this.rtmpServer.close() | ||
159 | this.rtmpServer = undefined | ||
160 | } | ||
161 | |||
162 | if (this.rtmpsServer) { | ||
163 | logger.info('Stopping RTMPS server.', lTags()) | ||
164 | |||
165 | this.rtmpsServer.close() | ||
166 | this.rtmpsServer = undefined | ||
167 | } | ||
168 | |||
169 | // Sessions is an object | ||
170 | this.getContext().sessions.forEach((session: any) => { | ||
171 | if (session instanceof NodeRtmpSession) { | ||
172 | session.stop() | ||
173 | } | ||
174 | }) | ||
175 | } | ||
176 | |||
177 | isRunning () { | ||
178 | return !!this.rtmpServer | ||
179 | } | ||
180 | |||
181 | hasSession (sessionId: string) { | ||
182 | return this.getContext().sessions.has(sessionId) | ||
183 | } | ||
184 | |||
185 | stopSessionOf (videoUUID: string, error: LiveVideoError | null) { | ||
186 | const sessionId = this.videoSessions.get(videoUUID) | ||
187 | if (!sessionId) { | ||
188 | logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID)) | ||
189 | return | ||
190 | } | ||
191 | |||
192 | logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) | ||
193 | |||
194 | this.saveEndingSession(videoUUID, error) | ||
195 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) })) | ||
196 | |||
197 | this.videoSessions.delete(videoUUID) | ||
198 | this.abortSession(sessionId) | ||
199 | } | ||
200 | |||
201 | private getContext () { | ||
202 | return context | ||
203 | } | ||
204 | |||
205 | private abortSession (sessionId: string) { | ||
206 | const session = this.getContext().sessions.get(sessionId) | ||
207 | if (session) { | ||
208 | session.stop() | ||
209 | this.getContext().sessions.delete(sessionId) | ||
210 | } | ||
211 | |||
212 | const muxingSession = this.muxingSessions.get(sessionId) | ||
213 | if (muxingSession) { | ||
214 | // Muxing session will fire and event so we correctly cleanup the session | ||
215 | muxingSession.abort() | ||
216 | |||
217 | this.muxingSessions.delete(sessionId) | ||
218 | } | ||
219 | } | ||
220 | |||
221 | private async handleSession (options: { | ||
222 | sessionId: string | ||
223 | inputLocalUrl: string | ||
224 | inputPublicUrl: string | ||
225 | streamKey: string | ||
226 | }) { | ||
227 | const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options | ||
228 | |||
229 | const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) | ||
230 | if (!videoLive) { | ||
231 | logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) | ||
232 | return this.abortSession(sessionId) | ||
233 | } | ||
234 | |||
235 | const video = videoLive.Video | ||
236 | if (video.isBlacklisted()) { | ||
237 | logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid)) | ||
238 | return this.abortSession(sessionId) | ||
239 | } | ||
240 | |||
241 | if (this.videoSessions.has(video.uuid)) { | ||
242 | logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid)) | ||
243 | return this.abortSession(sessionId) | ||
244 | } | ||
245 | |||
246 | // Cleanup old potential live (could happen with a permanent live) | ||
247 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | ||
248 | if (oldStreamingPlaylist) { | ||
249 | if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) | ||
250 | |||
251 | await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) | ||
252 | } | ||
253 | |||
254 | this.videoSessions.set(video.uuid, sessionId) | ||
255 | |||
256 | const now = Date.now() | ||
257 | const probe = await ffprobePromise(inputLocalUrl) | ||
258 | |||
259 | const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([ | ||
260 | getVideoStreamDimensionsInfo(inputLocalUrl, probe), | ||
261 | getVideoStreamFPS(inputLocalUrl, probe), | ||
262 | getVideoStreamBitrate(inputLocalUrl, probe), | ||
263 | hasAudioStream(inputLocalUrl, probe) | ||
264 | ]) | ||
265 | |||
266 | logger.info( | ||
267 | '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)', | ||
268 | inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) | ||
269 | ) | ||
270 | |||
271 | const allResolutions = await Hooks.wrapObject( | ||
272 | this.buildAllResolutionsToTranscode(resolution, hasAudio), | ||
273 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
274 | { video } | ||
275 | ) | ||
276 | |||
277 | logger.info( | ||
278 | 'Handling live video of original resolution %d.', resolution, | ||
279 | { allResolutions, ...lTags(sessionId, video.uuid) } | ||
280 | ) | ||
281 | |||
282 | return this.runMuxingSession({ | ||
283 | sessionId, | ||
284 | videoLive, | ||
285 | |||
286 | inputLocalUrl, | ||
287 | inputPublicUrl, | ||
288 | fps, | ||
289 | bitrate, | ||
290 | ratio, | ||
291 | allResolutions, | ||
292 | hasAudio | ||
293 | }) | ||
294 | } | ||
295 | |||
296 | private async runMuxingSession (options: { | ||
297 | sessionId: string | ||
298 | videoLive: MVideoLiveVideoWithSetting | ||
299 | |||
300 | inputLocalUrl: string | ||
301 | inputPublicUrl: string | ||
302 | |||
303 | fps: number | ||
304 | bitrate: number | ||
305 | ratio: number | ||
306 | allResolutions: number[] | ||
307 | hasAudio: boolean | ||
308 | }) { | ||
309 | const { sessionId, videoLive } = options | ||
310 | const videoUUID = videoLive.Video.uuid | ||
311 | const localLTags = lTags(sessionId, videoUUID) | ||
312 | |||
313 | const liveSession = await this.saveStartingSession(videoLive) | ||
314 | |||
315 | const user = await UserModel.loadByLiveId(videoLive.id) | ||
316 | LiveQuotaStore.Instance.addNewLive(user.id, sessionId) | ||
317 | |||
318 | const muxingSession = new MuxingSession({ | ||
319 | context: this.getContext(), | ||
320 | sessionId, | ||
321 | videoLive, | ||
322 | user, | ||
323 | |||
324 | ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) | ||
325 | }) | ||
326 | |||
327 | muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) | ||
328 | |||
329 | muxingSession.on('bad-socket-health', ({ videoUUID }) => { | ||
330 | logger.error( | ||
331 | 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + | ||
332 | ' Stopping session of video %s.', videoUUID, | ||
333 | localLTags | ||
334 | ) | ||
335 | |||
336 | this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH) | ||
337 | }) | ||
338 | |||
339 | muxingSession.on('duration-exceeded', ({ videoUUID }) => { | ||
340 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) | ||
341 | |||
342 | this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED) | ||
343 | }) | ||
344 | |||
345 | muxingSession.on('quota-exceeded', ({ videoUUID }) => { | ||
346 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) | ||
347 | |||
348 | this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED) | ||
349 | }) | ||
350 | |||
351 | muxingSession.on('transcoding-error', ({ videoUUID }) => { | ||
352 | this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR) | ||
353 | }) | ||
354 | |||
355 | muxingSession.on('transcoding-end', ({ videoUUID }) => { | ||
356 | this.onMuxingFFmpegEnd(videoUUID, sessionId) | ||
357 | }) | ||
358 | |||
359 | muxingSession.on('after-cleanup', ({ videoUUID }) => { | ||
360 | this.muxingSessions.delete(sessionId) | ||
361 | |||
362 | LiveQuotaStore.Instance.removeLive(user.id, sessionId) | ||
363 | |||
364 | muxingSession.destroy() | ||
365 | |||
366 | return this.onAfterMuxingCleanup({ videoUUID, liveSession }) | ||
367 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) | ||
368 | }) | ||
369 | |||
370 | this.muxingSessions.set(sessionId, muxingSession) | ||
371 | |||
372 | muxingSession.runMuxing() | ||
373 | .catch(err => { | ||
374 | logger.error('Cannot run muxing.', { err, ...localLTags }) | ||
375 | this.abortSession(sessionId) | ||
376 | }) | ||
377 | } | ||
378 | |||
379 | private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) { | ||
380 | const videoId = live.videoId | ||
381 | |||
382 | try { | ||
383 | const video = await VideoModel.loadFull(videoId) | ||
384 | |||
385 | logger.info('Will publish and federate live %s.', video.url, localLTags) | ||
386 | |||
387 | video.state = VideoState.PUBLISHED | ||
388 | video.publishedAt = new Date() | ||
389 | await video.save() | ||
390 | |||
391 | live.Video = video | ||
392 | |||
393 | await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) | ||
394 | |||
395 | try { | ||
396 | await federateVideoIfNeeded(video, false) | ||
397 | } catch (err) { | ||
398 | logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }) | ||
399 | } | ||
400 | |||
401 | PeerTubeSocket.Instance.sendVideoLiveNewState(video) | ||
402 | |||
403 | Hooks.runAction('action:live.video.state.updated', { video }) | ||
404 | } catch (err) { | ||
405 | logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) | ||
406 | } | ||
407 | } | ||
408 | |||
409 | private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) { | ||
410 | // Session already cleaned up | ||
411 | if (!this.videoSessions.has(videoUUID)) return | ||
412 | |||
413 | this.videoSessions.delete(videoUUID) | ||
414 | |||
415 | this.saveEndingSession(videoUUID, null) | ||
416 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | ||
417 | } | ||
418 | |||
419 | private async onAfterMuxingCleanup (options: { | ||
420 | videoUUID: string | ||
421 | liveSession?: MVideoLiveSession | ||
422 | cleanupNow?: boolean // Default false | ||
423 | }) { | ||
424 | const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options | ||
425 | |||
426 | logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID)) | ||
427 | |||
428 | try { | ||
429 | const fullVideo = await VideoModel.loadFull(videoUUID) | ||
430 | if (!fullVideo) return | ||
431 | |||
432 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) | ||
433 | |||
434 | const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id) | ||
435 | |||
436 | // On server restart during a live | ||
437 | if (!liveSession.endDate) { | ||
438 | liveSession.endDate = new Date() | ||
439 | await liveSession.save() | ||
440 | } | ||
441 | |||
442 | JobQueue.Instance.createJobAsync({ | ||
443 | type: 'video-live-ending', | ||
444 | payload: { | ||
445 | videoId: fullVideo.id, | ||
446 | |||
447 | replayDirectory: live.saveReplay | ||
448 | ? await this.findReplayDirectory(fullVideo) | ||
449 | : undefined, | ||
450 | |||
451 | liveSessionId: liveSession.id, | ||
452 | streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, | ||
453 | |||
454 | publishedAt: fullVideo.publishedAt.toISOString() | ||
455 | }, | ||
456 | |||
457 | delay: cleanupNow | ||
458 | ? 0 | ||
459 | : VIDEO_LIVE.CLEANUP_DELAY | ||
460 | }) | ||
461 | |||
462 | fullVideo.state = live.permanentLive | ||
463 | ? VideoState.WAITING_FOR_LIVE | ||
464 | : VideoState.LIVE_ENDED | ||
465 | |||
466 | await fullVideo.save() | ||
467 | |||
468 | PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) | ||
469 | |||
470 | await federateVideoIfNeeded(fullVideo, false) | ||
471 | |||
472 | Hooks.runAction('action:live.video.state.updated', { video: fullVideo }) | ||
473 | } catch (err) { | ||
474 | logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) }) | ||
475 | } | ||
476 | } | ||
477 | |||
478 | private async handleBrokenLives () { | ||
479 | await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' }) | ||
480 | |||
481 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | ||
482 | |||
483 | for (const uuid of videoUUIDs) { | ||
484 | await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true }) | ||
485 | } | ||
486 | } | ||
487 | |||
488 | private async findReplayDirectory (video: MVideo) { | ||
489 | const directory = getLiveReplayBaseDirectory(video) | ||
490 | const files = await readdir(directory) | ||
491 | |||
492 | if (files.length === 0) return undefined | ||
493 | |||
494 | return join(directory, files.sort().reverse()[0]) | ||
495 | } | ||
496 | |||
497 | private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) { | ||
498 | const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION | ||
499 | |||
500 | const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED | ||
501 | ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio }) | ||
502 | : [] | ||
503 | |||
504 | if (resolutionsEnabled.length === 0) { | ||
505 | return [ originResolution ] | ||
506 | } | ||
507 | |||
508 | return resolutionsEnabled | ||
509 | } | ||
510 | |||
511 | private async saveStartingSession (videoLive: MVideoLiveVideoWithSetting) { | ||
512 | const replaySettings = videoLive.saveReplay | ||
513 | ? new VideoLiveReplaySettingModel({ | ||
514 | privacy: videoLive.ReplaySetting.privacy | ||
515 | }) | ||
516 | : null | ||
517 | |||
518 | return sequelizeTypescript.transaction(async t => { | ||
519 | if (videoLive.saveReplay) { | ||
520 | await replaySettings.save({ transaction: t }) | ||
521 | } | ||
522 | |||
523 | return VideoLiveSessionModel.create({ | ||
524 | startDate: new Date(), | ||
525 | liveVideoId: videoLive.videoId, | ||
526 | saveReplay: videoLive.saveReplay, | ||
527 | replaySettingId: videoLive.saveReplay ? replaySettings.id : null, | ||
528 | endingProcessed: false | ||
529 | }, { transaction: t }) | ||
530 | }) | ||
531 | } | ||
532 | |||
533 | private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) { | ||
534 | const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID) | ||
535 | if (!liveSession) return | ||
536 | |||
537 | liveSession.endDate = new Date() | ||
538 | liveSession.error = error | ||
539 | |||
540 | return liveSession.save() | ||
541 | } | ||
542 | |||
543 | static get Instance () { | ||
544 | return this.instance || (this.instance = new this()) | ||
545 | } | ||
546 | } | ||
547 | |||
548 | // --------------------------------------------------------------------------- | ||
549 | |||
550 | export { | ||
551 | LiveManager | ||
552 | } | ||
diff --git a/server/lib/live/live-quota-store.ts b/server/lib/live/live-quota-store.ts deleted file mode 100644 index 44539faaa..000000000 --- a/server/lib/live/live-quota-store.ts +++ /dev/null | |||
@@ -1,48 +0,0 @@ | |||
1 | class LiveQuotaStore { | ||
2 | |||
3 | private static instance: LiveQuotaStore | ||
4 | |||
5 | private readonly livesPerUser = new Map<number, { sessionId: string, size: number }[]>() | ||
6 | |||
7 | private constructor () { | ||
8 | } | ||
9 | |||
10 | addNewLive (userId: number, sessionId: string) { | ||
11 | if (!this.livesPerUser.has(userId)) { | ||
12 | this.livesPerUser.set(userId, []) | ||
13 | } | ||
14 | |||
15 | const currentUserLive = { sessionId, size: 0 } | ||
16 | const livesOfUser = this.livesPerUser.get(userId) | ||
17 | livesOfUser.push(currentUserLive) | ||
18 | } | ||
19 | |||
20 | removeLive (userId: number, sessionId: string) { | ||
21 | const newLivesPerUser = this.livesPerUser.get(userId) | ||
22 | .filter(o => o.sessionId !== sessionId) | ||
23 | |||
24 | this.livesPerUser.set(userId, newLivesPerUser) | ||
25 | } | ||
26 | |||
27 | addQuotaTo (userId: number, sessionId: string, size: number) { | ||
28 | const lives = this.livesPerUser.get(userId) | ||
29 | const live = lives.find(l => l.sessionId === sessionId) | ||
30 | |||
31 | live.size += size | ||
32 | } | ||
33 | |||
34 | getLiveQuotaOf (userId: number) { | ||
35 | const currentLives = this.livesPerUser.get(userId) | ||
36 | if (!currentLives) return 0 | ||
37 | |||
38 | return currentLives.reduce((sum, obj) => sum + obj.size, 0) | ||
39 | } | ||
40 | |||
41 | static get Instance () { | ||
42 | return this.instance || (this.instance = new this()) | ||
43 | } | ||
44 | } | ||
45 | |||
46 | export { | ||
47 | LiveQuotaStore | ||
48 | } | ||
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts deleted file mode 100644 index 8253c0274..000000000 --- a/server/lib/live/live-segment-sha-store.ts +++ /dev/null | |||
@@ -1,95 +0,0 @@ | |||
1 | import { rename, writeJson } from 'fs-extra' | ||
2 | import PQueue from 'p-queue' | ||
3 | import { basename } from 'path' | ||
4 | import { mapToJSON } from '@server/helpers/core-utils' | ||
5 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
6 | import { MStreamingPlaylistVideo } from '@server/types/models' | ||
7 | import { buildSha256Segment } from '../hls' | ||
8 | import { storeHLSFileFromPath } from '../object-storage' | ||
9 | |||
10 | const lTags = loggerTagsFactory('live') | ||
11 | |||
12 | class LiveSegmentShaStore { | ||
13 | |||
14 | private readonly segmentsSha256 = new Map<string, string>() | ||
15 | |||
16 | private readonly videoUUID: string | ||
17 | |||
18 | private readonly sha256Path: string | ||
19 | private readonly sha256PathTMP: string | ||
20 | |||
21 | private readonly streamingPlaylist: MStreamingPlaylistVideo | ||
22 | private readonly sendToObjectStorage: boolean | ||
23 | private readonly writeQueue = new PQueue({ concurrency: 1 }) | ||
24 | |||
25 | constructor (options: { | ||
26 | videoUUID: string | ||
27 | sha256Path: string | ||
28 | streamingPlaylist: MStreamingPlaylistVideo | ||
29 | sendToObjectStorage: boolean | ||
30 | }) { | ||
31 | this.videoUUID = options.videoUUID | ||
32 | |||
33 | this.sha256Path = options.sha256Path | ||
34 | this.sha256PathTMP = options.sha256Path + '.tmp' | ||
35 | |||
36 | this.streamingPlaylist = options.streamingPlaylist | ||
37 | this.sendToObjectStorage = options.sendToObjectStorage | ||
38 | } | ||
39 | |||
40 | async addSegmentSha (segmentPath: string) { | ||
41 | logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID)) | ||
42 | |||
43 | const shaResult = await buildSha256Segment(segmentPath) | ||
44 | |||
45 | const segmentName = basename(segmentPath) | ||
46 | this.segmentsSha256.set(segmentName, shaResult) | ||
47 | |||
48 | try { | ||
49 | await this.writeToDisk() | ||
50 | } catch (err) { | ||
51 | logger.error('Cannot write sha segments to disk.', { err }) | ||
52 | } | ||
53 | } | ||
54 | |||
55 | async removeSegmentSha (segmentPath: string) { | ||
56 | const segmentName = basename(segmentPath) | ||
57 | |||
58 | logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) | ||
59 | |||
60 | if (!this.segmentsSha256.has(segmentName)) { | ||
61 | logger.warn( | ||
62 | 'Unknown segment in live segment hash store for video %s and segment %s.', | ||
63 | this.videoUUID, segmentPath, lTags(this.videoUUID) | ||
64 | ) | ||
65 | return | ||
66 | } | ||
67 | |||
68 | this.segmentsSha256.delete(segmentName) | ||
69 | |||
70 | await this.writeToDisk() | ||
71 | } | ||
72 | |||
73 | private writeToDisk () { | ||
74 | return this.writeQueue.add(async () => { | ||
75 | logger.debug(`Writing segment sha JSON ${this.sha256Path} of ${this.videoUUID} on disk.`, lTags(this.videoUUID)) | ||
76 | |||
77 | // Atomic write: use rename instead of move that is not atomic | ||
78 | await writeJson(this.sha256PathTMP, mapToJSON(this.segmentsSha256)) | ||
79 | await rename(this.sha256PathTMP, this.sha256Path) | ||
80 | |||
81 | if (this.sendToObjectStorage) { | ||
82 | const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path) | ||
83 | |||
84 | if (this.streamingPlaylist.segmentsSha256Url !== url) { | ||
85 | this.streamingPlaylist.segmentsSha256Url = url | ||
86 | await this.streamingPlaylist.save() | ||
87 | } | ||
88 | } | ||
89 | }) | ||
90 | } | ||
91 | } | ||
92 | |||
93 | export { | ||
94 | LiveSegmentShaStore | ||
95 | } | ||
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts deleted file mode 100644 index 3fb3ce1ce..000000000 --- a/server/lib/live/live-utils.ts +++ /dev/null | |||
@@ -1,99 +0,0 @@ | |||
1 | import { pathExists, readdir, remove } from 'fs-extra' | ||
2 | import { basename, join } from 'path' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { VIDEO_LIVE } from '@server/initializers/constants' | ||
5 | import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' | ||
6 | import { LiveVideoLatencyMode, VideoStorage } from '@shared/models' | ||
7 | import { listHLSFileKeysOf, removeHLSFileObjectStorageByFullKey, removeHLSObjectStorage } from '../object-storage' | ||
8 | import { getLiveDirectory } from '../paths' | ||
9 | |||
10 | function buildConcatenatedName (segmentOrPlaylistPath: string) { | ||
11 | const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) | ||
12 | |||
13 | return 'concat-' + num[1] + '.ts' | ||
14 | } | ||
15 | |||
16 | async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | ||
17 | await cleanupTMPLiveFiles(video, streamingPlaylist) | ||
18 | |||
19 | await streamingPlaylist.destroy() | ||
20 | } | ||
21 | |||
22 | async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | ||
23 | const hlsDirectory = getLiveDirectory(video) | ||
24 | |||
25 | // We uploaded files to object storage too, remove them | ||
26 | if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
27 | await removeHLSObjectStorage(streamingPlaylist.withVideo(video)) | ||
28 | } | ||
29 | |||
30 | await remove(hlsDirectory) | ||
31 | |||
32 | await streamingPlaylist.destroy() | ||
33 | } | ||
34 | |||
35 | async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | ||
36 | await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video)) | ||
37 | |||
38 | await cleanupTMPLiveFilesFromFilesystem(video) | ||
39 | } | ||
40 | |||
41 | function getLiveSegmentTime (latencyMode: LiveVideoLatencyMode) { | ||
42 | if (latencyMode === LiveVideoLatencyMode.SMALL_LATENCY) { | ||
43 | return VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY | ||
44 | } | ||
45 | |||
46 | return VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY | ||
47 | } | ||
48 | |||
49 | export { | ||
50 | cleanupAndDestroyPermanentLive, | ||
51 | cleanupUnsavedNormalLive, | ||
52 | cleanupTMPLiveFiles, | ||
53 | getLiveSegmentTime, | ||
54 | buildConcatenatedName | ||
55 | } | ||
56 | |||
57 | // --------------------------------------------------------------------------- | ||
58 | |||
59 | function isTMPLiveFile (name: string) { | ||
60 | return name.endsWith('.ts') || | ||
61 | name.endsWith('.m3u8') || | ||
62 | name.endsWith('.json') || | ||
63 | name.endsWith('.mpd') || | ||
64 | name.endsWith('.m4s') || | ||
65 | name.endsWith('.tmp') | ||
66 | } | ||
67 | |||
68 | async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) { | ||
69 | const hlsDirectory = getLiveDirectory(video) | ||
70 | |||
71 | if (!await pathExists(hlsDirectory)) return | ||
72 | |||
73 | logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory) | ||
74 | |||
75 | const files = await readdir(hlsDirectory) | ||
76 | |||
77 | for (const filename of files) { | ||
78 | if (isTMPLiveFile(filename)) { | ||
79 | const p = join(hlsDirectory, filename) | ||
80 | |||
81 | remove(p) | ||
82 | .catch(err => logger.error('Cannot remove %s.', p, { err })) | ||
83 | } | ||
84 | } | ||
85 | } | ||
86 | |||
87 | async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) { | ||
88 | if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | ||
89 | |||
90 | logger.info('Cleanup TMP live files from object storage for %s.', streamingPlaylist.Video.uuid) | ||
91 | |||
92 | const keys = await listHLSFileKeysOf(streamingPlaylist) | ||
93 | |||
94 | for (const key of keys) { | ||
95 | if (isTMPLiveFile(key)) { | ||
96 | await removeHLSFileObjectStorageByFullKey(key) | ||
97 | } | ||
98 | } | ||
99 | } | ||
diff --git a/server/lib/live/shared/index.ts b/server/lib/live/shared/index.ts deleted file mode 100644 index c4d1b59ec..000000000 --- a/server/lib/live/shared/index.ts +++ /dev/null | |||
@@ -1 +0,0 @@ | |||
1 | export * from './muxing-session' | ||
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts deleted file mode 100644 index 02691b651..000000000 --- a/server/lib/live/shared/muxing-session.ts +++ /dev/null | |||
@@ -1,518 +0,0 @@ | |||
1 | import { mapSeries } from 'bluebird' | ||
2 | import { FSWatcher, watch } from 'chokidar' | ||
3 | import { EventEmitter } from 'events' | ||
4 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' | ||
5 | import PQueue from 'p-queue' | ||
6 | import { basename, join } from 'path' | ||
7 | import { computeOutputFPS } from '@server/helpers/ffmpeg' | ||
8 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' | ||
9 | import { CONFIG } from '@server/initializers/config' | ||
10 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' | ||
11 | import { removeHLSFileObjectStorageByPath, storeHLSFileFromContent, storeHLSFileFromPath } from '@server/lib/object-storage' | ||
12 | import { VideoFileModel } from '@server/models/video/video-file' | ||
13 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | ||
14 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | ||
15 | import { VideoStorage, VideoStreamingPlaylistType } from '@shared/models' | ||
16 | import { | ||
17 | generateHLSMasterPlaylistFilename, | ||
18 | generateHlsSha256SegmentsFilename, | ||
19 | getLiveDirectory, | ||
20 | getLiveReplayBaseDirectory | ||
21 | } from '../../paths' | ||
22 | import { isAbleToUploadVideo } from '../../user' | ||
23 | import { LiveQuotaStore } from '../live-quota-store' | ||
24 | import { LiveSegmentShaStore } from '../live-segment-sha-store' | ||
25 | import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils' | ||
26 | import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper' | ||
27 | |||
28 | import memoizee = require('memoizee') | ||
29 | interface MuxingSessionEvents { | ||
30 | 'live-ready': (options: { videoUUID: string }) => void | ||
31 | |||
32 | 'bad-socket-health': (options: { videoUUID: string }) => void | ||
33 | 'duration-exceeded': (options: { videoUUID: string }) => void | ||
34 | 'quota-exceeded': (options: { videoUUID: string }) => void | ||
35 | |||
36 | 'transcoding-end': (options: { videoUUID: string }) => void | ||
37 | 'transcoding-error': (options: { videoUUID: string }) => void | ||
38 | |||
39 | 'after-cleanup': (options: { videoUUID: string }) => void | ||
40 | } | ||
41 | |||
42 | declare interface MuxingSession { | ||
43 | on<U extends keyof MuxingSessionEvents>( | ||
44 | event: U, listener: MuxingSessionEvents[U] | ||
45 | ): this | ||
46 | |||
47 | emit<U extends keyof MuxingSessionEvents>( | ||
48 | event: U, ...args: Parameters<MuxingSessionEvents[U]> | ||
49 | ): boolean | ||
50 | } | ||
51 | |||
52 | class MuxingSession extends EventEmitter { | ||
53 | |||
54 | private transcodingWrapper: AbstractTranscodingWrapper | ||
55 | |||
56 | private readonly context: any | ||
57 | private readonly user: MUserId | ||
58 | private readonly sessionId: string | ||
59 | private readonly videoLive: MVideoLiveVideo | ||
60 | |||
61 | private readonly inputLocalUrl: string | ||
62 | private readonly inputPublicUrl: string | ||
63 | |||
64 | private readonly fps: number | ||
65 | private readonly allResolutions: number[] | ||
66 | |||
67 | private readonly bitrate: number | ||
68 | private readonly ratio: number | ||
69 | |||
70 | private readonly hasAudio: boolean | ||
71 | |||
72 | private readonly videoUUID: string | ||
73 | private readonly saveReplay: boolean | ||
74 | |||
75 | private readonly outDirectory: string | ||
76 | private readonly replayDirectory: string | ||
77 | |||
78 | private readonly lTags: LoggerTagsFn | ||
79 | |||
80 | // Path -> Queue | ||
81 | private readonly objectStorageSendQueues = new Map<string, PQueue>() | ||
82 | |||
83 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} | ||
84 | |||
85 | private streamingPlaylist: MStreamingPlaylistVideo | ||
86 | private liveSegmentShaStore: LiveSegmentShaStore | ||
87 | |||
88 | private filesWatcher: FSWatcher | ||
89 | |||
90 | private masterPlaylistCreated = false | ||
91 | private liveReady = false | ||
92 | |||
93 | private aborted = false | ||
94 | |||
95 | private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => { | ||
96 | return isAbleToUploadVideo(userId, 1000) | ||
97 | }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) | ||
98 | |||
99 | private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => { | ||
100 | return this.hasClientSocketInBadHealth(sessionId) | ||
101 | }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH }) | ||
102 | |||
103 | constructor (options: { | ||
104 | context: any | ||
105 | user: MUserId | ||
106 | sessionId: string | ||
107 | videoLive: MVideoLiveVideo | ||
108 | |||
109 | inputLocalUrl: string | ||
110 | inputPublicUrl: string | ||
111 | |||
112 | fps: number | ||
113 | bitrate: number | ||
114 | ratio: number | ||
115 | allResolutions: number[] | ||
116 | hasAudio: boolean | ||
117 | }) { | ||
118 | super() | ||
119 | |||
120 | this.context = options.context | ||
121 | this.user = options.user | ||
122 | this.sessionId = options.sessionId | ||
123 | this.videoLive = options.videoLive | ||
124 | |||
125 | this.inputLocalUrl = options.inputLocalUrl | ||
126 | this.inputPublicUrl = options.inputPublicUrl | ||
127 | |||
128 | this.fps = options.fps | ||
129 | |||
130 | this.bitrate = options.bitrate | ||
131 | this.ratio = options.ratio | ||
132 | |||
133 | this.hasAudio = options.hasAudio | ||
134 | |||
135 | this.allResolutions = options.allResolutions | ||
136 | |||
137 | this.videoUUID = this.videoLive.Video.uuid | ||
138 | |||
139 | this.saveReplay = this.videoLive.saveReplay | ||
140 | |||
141 | this.outDirectory = getLiveDirectory(this.videoLive.Video) | ||
142 | this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) | ||
143 | |||
144 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) | ||
145 | } | ||
146 | |||
147 | async runMuxing () { | ||
148 | this.streamingPlaylist = await this.createLivePlaylist() | ||
149 | |||
150 | this.createLiveShaStore() | ||
151 | this.createFiles() | ||
152 | |||
153 | await this.prepareDirectories() | ||
154 | |||
155 | this.transcodingWrapper = this.buildTranscodingWrapper() | ||
156 | |||
157 | this.transcodingWrapper.on('end', () => this.onTranscodedEnded()) | ||
158 | this.transcodingWrapper.on('error', () => this.onTranscodingError()) | ||
159 | |||
160 | await this.transcodingWrapper.run() | ||
161 | |||
162 | this.filesWatcher = watch(this.outDirectory, { depth: 0 }) | ||
163 | |||
164 | this.watchMasterFile() | ||
165 | this.watchTSFiles() | ||
166 | } | ||
167 | |||
168 | abort () { | ||
169 | if (!this.transcodingWrapper) return | ||
170 | |||
171 | this.aborted = true | ||
172 | this.transcodingWrapper.abort() | ||
173 | } | ||
174 | |||
175 | destroy () { | ||
176 | this.removeAllListeners() | ||
177 | this.isAbleToUploadVideoWithCache.clear() | ||
178 | this.hasClientSocketInBadHealthWithCache.clear() | ||
179 | } | ||
180 | |||
181 | private watchMasterFile () { | ||
182 | this.filesWatcher.on('add', async path => { | ||
183 | if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return | ||
184 | if (this.masterPlaylistCreated === true) return | ||
185 | |||
186 | try { | ||
187 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
188 | const masterContent = await readFile(path, 'utf-8') | ||
189 | logger.debug('Uploading live master playlist on object storage for %s', this.videoUUID, { masterContent, ...this.lTags() }) | ||
190 | |||
191 | const url = await storeHLSFileFromContent(this.streamingPlaylist, this.streamingPlaylist.playlistFilename, masterContent) | ||
192 | |||
193 | this.streamingPlaylist.playlistUrl = url | ||
194 | } | ||
195 | |||
196 | this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions) | ||
197 | |||
198 | await this.streamingPlaylist.save() | ||
199 | } catch (err) { | ||
200 | logger.error('Cannot update streaming playlist.', { err, ...this.lTags() }) | ||
201 | } | ||
202 | |||
203 | this.masterPlaylistCreated = true | ||
204 | |||
205 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) | ||
206 | }) | ||
207 | } | ||
208 | |||
209 | private watchTSFiles () { | ||
210 | const startStreamDateTime = new Date().getTime() | ||
211 | |||
212 | const addHandler = async (segmentPath: string) => { | ||
213 | if (segmentPath.endsWith('.ts') !== true) return | ||
214 | |||
215 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) | ||
216 | |||
217 | const playlistId = this.getPlaylistIdFromTS(segmentPath) | ||
218 | |||
219 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] | ||
220 | this.processSegments(segmentsToProcess) | ||
221 | |||
222 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] | ||
223 | |||
224 | if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { | ||
225 | this.emit('bad-socket-health', { videoUUID: this.videoUUID }) | ||
226 | return | ||
227 | } | ||
228 | |||
229 | // Duration constraint check | ||
230 | if (this.isDurationConstraintValid(startStreamDateTime) !== true) { | ||
231 | this.emit('duration-exceeded', { videoUUID: this.videoUUID }) | ||
232 | return | ||
233 | } | ||
234 | |||
235 | // Check user quota if the user enabled replay saving | ||
236 | if (await this.isQuotaExceeded(segmentPath) === true) { | ||
237 | this.emit('quota-exceeded', { videoUUID: this.videoUUID }) | ||
238 | } | ||
239 | } | ||
240 | |||
241 | const deleteHandler = async (segmentPath: string) => { | ||
242 | if (segmentPath.endsWith('.ts') !== true) return | ||
243 | |||
244 | logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags()) | ||
245 | |||
246 | try { | ||
247 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) | ||
248 | } catch (err) { | ||
249 | logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() }) | ||
250 | } | ||
251 | |||
252 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
253 | try { | ||
254 | await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath) | ||
255 | } catch (err) { | ||
256 | logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() }) | ||
257 | } | ||
258 | } | ||
259 | } | ||
260 | |||
261 | this.filesWatcher.on('add', p => addHandler(p)) | ||
262 | this.filesWatcher.on('unlink', p => deleteHandler(p)) | ||
263 | } | ||
264 | |||
265 | private async isQuotaExceeded (segmentPath: string) { | ||
266 | if (this.saveReplay !== true) return false | ||
267 | if (this.aborted) return false | ||
268 | |||
269 | try { | ||
270 | const segmentStat = await stat(segmentPath) | ||
271 | |||
272 | LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.sessionId, segmentStat.size) | ||
273 | |||
274 | const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id) | ||
275 | |||
276 | return canUpload !== true | ||
277 | } catch (err) { | ||
278 | logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags() }) | ||
279 | } | ||
280 | } | ||
281 | |||
282 | private createFiles () { | ||
283 | for (let i = 0; i < this.allResolutions.length; i++) { | ||
284 | const resolution = this.allResolutions[i] | ||
285 | |||
286 | const file = new VideoFileModel({ | ||
287 | resolution, | ||
288 | size: -1, | ||
289 | extname: '.ts', | ||
290 | infoHash: null, | ||
291 | fps: this.fps, | ||
292 | storage: this.streamingPlaylist.storage, | ||
293 | videoStreamingPlaylistId: this.streamingPlaylist.id | ||
294 | }) | ||
295 | |||
296 | VideoFileModel.customUpsert(file, 'streaming-playlist', null) | ||
297 | .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags() })) | ||
298 | } | ||
299 | } | ||
300 | |||
301 | private async prepareDirectories () { | ||
302 | await ensureDir(this.outDirectory) | ||
303 | |||
304 | if (this.videoLive.saveReplay === true) { | ||
305 | await ensureDir(this.replayDirectory) | ||
306 | } | ||
307 | } | ||
308 | |||
309 | private isDurationConstraintValid (streamingStartTime: number) { | ||
310 | const maxDuration = CONFIG.LIVE.MAX_DURATION | ||
311 | // No limit | ||
312 | if (maxDuration < 0) return true | ||
313 | |||
314 | const now = new Date().getTime() | ||
315 | const max = streamingStartTime + maxDuration | ||
316 | |||
317 | return now <= max | ||
318 | } | ||
319 | |||
320 | private processSegments (segmentPaths: string[]) { | ||
321 | mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment)) | ||
322 | .catch(err => { | ||
323 | if (this.aborted) return | ||
324 | |||
325 | logger.error('Cannot process segments', { err, ...this.lTags() }) | ||
326 | }) | ||
327 | } | ||
328 | |||
329 | private async processSegment (segmentPath: string) { | ||
330 | // Add sha hash of previous segments, because ffmpeg should have finished generating them | ||
331 | await this.liveSegmentShaStore.addSegmentSha(segmentPath) | ||
332 | |||
333 | if (this.saveReplay) { | ||
334 | await this.addSegmentToReplay(segmentPath) | ||
335 | } | ||
336 | |||
337 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
338 | try { | ||
339 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) | ||
340 | |||
341 | await this.processM3U8ToObjectStorage(segmentPath) | ||
342 | } catch (err) { | ||
343 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) | ||
344 | } | ||
345 | } | ||
346 | |||
347 | // Master playlist and segment JSON file are created, live is ready | ||
348 | if (this.masterPlaylistCreated && !this.liveReady) { | ||
349 | this.liveReady = true | ||
350 | |||
351 | this.emit('live-ready', { videoUUID: this.videoUUID }) | ||
352 | } | ||
353 | } | ||
354 | |||
355 | private async processM3U8ToObjectStorage (segmentPath: string) { | ||
356 | const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath)) | ||
357 | |||
358 | logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags()) | ||
359 | |||
360 | const segmentName = basename(segmentPath) | ||
361 | |||
362 | const playlistContent = await readFile(m3u8Path, 'utf-8') | ||
363 | // Remove new chunk references, that will be processed later | ||
364 | const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n' | ||
365 | |||
366 | try { | ||
367 | if (!this.objectStorageSendQueues.has(m3u8Path)) { | ||
368 | this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | ||
369 | } | ||
370 | |||
371 | const queue = this.objectStorageSendQueues.get(m3u8Path) | ||
372 | await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent)) | ||
373 | } catch (err) { | ||
374 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
375 | } | ||
376 | } | ||
377 | |||
378 | private onTranscodingError () { | ||
379 | this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) | ||
380 | } | ||
381 | |||
382 | private onTranscodedEnded () { | ||
383 | this.emit('transcoding-end', ({ videoUUID: this.videoUUID })) | ||
384 | |||
385 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputLocalUrl, this.lTags()) | ||
386 | |||
387 | setTimeout(() => { | ||
388 | // Wait latest segments generation, and close watchers | ||
389 | |||
390 | const promise = this.filesWatcher?.close() || Promise.resolve() | ||
391 | promise | ||
392 | .then(() => { | ||
393 | // Process remaining segments hash | ||
394 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | ||
395 | this.processSegments(this.segmentsToProcessPerPlaylist[key]) | ||
396 | } | ||
397 | }) | ||
398 | .catch(err => { | ||
399 | logger.error( | ||
400 | 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, | ||
401 | { err, ...this.lTags() } | ||
402 | ) | ||
403 | }) | ||
404 | |||
405 | this.emit('after-cleanup', { videoUUID: this.videoUUID }) | ||
406 | }, 1000) | ||
407 | } | ||
408 | |||
409 | private hasClientSocketInBadHealth (sessionId: string) { | ||
410 | const rtmpSession = this.context.sessions.get(sessionId) | ||
411 | |||
412 | if (!rtmpSession) { | ||
413 | logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags()) | ||
414 | return | ||
415 | } | ||
416 | |||
417 | for (const playerSessionId of rtmpSession.players) { | ||
418 | const playerSession = this.context.sessions.get(playerSessionId) | ||
419 | |||
420 | if (!playerSession) { | ||
421 | logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags()) | ||
422 | continue | ||
423 | } | ||
424 | |||
425 | if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) { | ||
426 | return true | ||
427 | } | ||
428 | } | ||
429 | |||
430 | return false | ||
431 | } | ||
432 | |||
433 | private async addSegmentToReplay (segmentPath: string) { | ||
434 | const segmentName = basename(segmentPath) | ||
435 | const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) | ||
436 | |||
437 | try { | ||
438 | const data = await readFile(segmentPath) | ||
439 | |||
440 | await appendFile(dest, data) | ||
441 | } catch (err) { | ||
442 | logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() }) | ||
443 | } | ||
444 | } | ||
445 | |||
446 | private async createLivePlaylist (): Promise<MStreamingPlaylistVideo> { | ||
447 | const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(this.videoLive.Video) | ||
448 | |||
449 | playlist.playlistFilename = generateHLSMasterPlaylistFilename(true) | ||
450 | playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true) | ||
451 | |||
452 | playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION | ||
453 | playlist.type = VideoStreamingPlaylistType.HLS | ||
454 | |||
455 | playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED | ||
456 | ? VideoStorage.OBJECT_STORAGE | ||
457 | : VideoStorage.FILE_SYSTEM | ||
458 | |||
459 | return playlist.save() | ||
460 | } | ||
461 | |||
462 | private createLiveShaStore () { | ||
463 | this.liveSegmentShaStore = new LiveSegmentShaStore({ | ||
464 | videoUUID: this.videoLive.Video.uuid, | ||
465 | sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), | ||
466 | streamingPlaylist: this.streamingPlaylist, | ||
467 | sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED | ||
468 | }) | ||
469 | } | ||
470 | |||
471 | private buildTranscodingWrapper () { | ||
472 | const options = { | ||
473 | streamingPlaylist: this.streamingPlaylist, | ||
474 | videoLive: this.videoLive, | ||
475 | |||
476 | lTags: this.lTags, | ||
477 | |||
478 | sessionId: this.sessionId, | ||
479 | inputLocalUrl: this.inputLocalUrl, | ||
480 | inputPublicUrl: this.inputPublicUrl, | ||
481 | |||
482 | toTranscode: this.allResolutions.map(resolution => ({ | ||
483 | resolution, | ||
484 | fps: computeOutputFPS({ inputFPS: this.fps, resolution }) | ||
485 | })), | ||
486 | |||
487 | fps: this.fps, | ||
488 | bitrate: this.bitrate, | ||
489 | ratio: this.ratio, | ||
490 | hasAudio: this.hasAudio, | ||
491 | |||
492 | segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, | ||
493 | segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode), | ||
494 | |||
495 | outDirectory: this.outDirectory | ||
496 | } | ||
497 | |||
498 | return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED | ||
499 | ? new RemoteTranscodingWrapper(options) | ||
500 | : new FFmpegTranscodingWrapper(options) | ||
501 | } | ||
502 | |||
503 | private getPlaylistIdFromTS (segmentPath: string) { | ||
504 | const playlistIdMatcher = /^([\d+])-/ | ||
505 | |||
506 | return basename(segmentPath).match(playlistIdMatcher)[1] | ||
507 | } | ||
508 | |||
509 | private getPlaylistNameFromTS (segmentPath: string) { | ||
510 | return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8` | ||
511 | } | ||
512 | } | ||
513 | |||
514 | // --------------------------------------------------------------------------- | ||
515 | |||
516 | export { | ||
517 | MuxingSession | ||
518 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts deleted file mode 100644 index 95168745d..000000000 --- a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts +++ /dev/null | |||
@@ -1,110 +0,0 @@ | |||
1 | import EventEmitter from 'events' | ||
2 | import { LoggerTagsFn } from '@server/helpers/logger' | ||
3 | import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models' | ||
4 | import { LiveVideoError } from '@shared/models' | ||
5 | |||
6 | interface TranscodingWrapperEvents { | ||
7 | 'end': () => void | ||
8 | |||
9 | 'error': (options: { err: Error }) => void | ||
10 | } | ||
11 | |||
12 | declare interface AbstractTranscodingWrapper { | ||
13 | on<U extends keyof TranscodingWrapperEvents>( | ||
14 | event: U, listener: TranscodingWrapperEvents[U] | ||
15 | ): this | ||
16 | |||
17 | emit<U extends keyof TranscodingWrapperEvents>( | ||
18 | event: U, ...args: Parameters<TranscodingWrapperEvents[U]> | ||
19 | ): boolean | ||
20 | } | ||
21 | |||
22 | interface AbstractTranscodingWrapperOptions { | ||
23 | streamingPlaylist: MStreamingPlaylistVideo | ||
24 | videoLive: MVideoLiveVideo | ||
25 | |||
26 | lTags: LoggerTagsFn | ||
27 | |||
28 | sessionId: string | ||
29 | inputLocalUrl: string | ||
30 | inputPublicUrl: string | ||
31 | |||
32 | fps: number | ||
33 | toTranscode: { | ||
34 | resolution: number | ||
35 | fps: number | ||
36 | }[] | ||
37 | |||
38 | bitrate: number | ||
39 | ratio: number | ||
40 | hasAudio: boolean | ||
41 | |||
42 | segmentListSize: number | ||
43 | segmentDuration: number | ||
44 | |||
45 | outDirectory: string | ||
46 | } | ||
47 | |||
48 | abstract class AbstractTranscodingWrapper extends EventEmitter { | ||
49 | protected readonly videoLive: MVideoLiveVideo | ||
50 | |||
51 | protected readonly toTranscode: { | ||
52 | resolution: number | ||
53 | fps: number | ||
54 | }[] | ||
55 | |||
56 | protected readonly sessionId: string | ||
57 | protected readonly inputLocalUrl: string | ||
58 | protected readonly inputPublicUrl: string | ||
59 | |||
60 | protected readonly fps: number | ||
61 | protected readonly bitrate: number | ||
62 | protected readonly ratio: number | ||
63 | protected readonly hasAudio: boolean | ||
64 | |||
65 | protected readonly segmentListSize: number | ||
66 | protected readonly segmentDuration: number | ||
67 | |||
68 | protected readonly videoUUID: string | ||
69 | |||
70 | protected readonly outDirectory: string | ||
71 | |||
72 | protected readonly lTags: LoggerTagsFn | ||
73 | |||
74 | protected readonly streamingPlaylist: MStreamingPlaylistVideo | ||
75 | |||
76 | constructor (options: AbstractTranscodingWrapperOptions) { | ||
77 | super() | ||
78 | |||
79 | this.lTags = options.lTags | ||
80 | |||
81 | this.videoLive = options.videoLive | ||
82 | this.videoUUID = options.videoLive.Video.uuid | ||
83 | this.streamingPlaylist = options.streamingPlaylist | ||
84 | |||
85 | this.sessionId = options.sessionId | ||
86 | this.inputLocalUrl = options.inputLocalUrl | ||
87 | this.inputPublicUrl = options.inputPublicUrl | ||
88 | |||
89 | this.fps = options.fps | ||
90 | this.toTranscode = options.toTranscode | ||
91 | |||
92 | this.bitrate = options.bitrate | ||
93 | this.ratio = options.ratio | ||
94 | this.hasAudio = options.hasAudio | ||
95 | |||
96 | this.segmentListSize = options.segmentListSize | ||
97 | this.segmentDuration = options.segmentDuration | ||
98 | |||
99 | this.outDirectory = options.outDirectory | ||
100 | } | ||
101 | |||
102 | abstract run (): Promise<void> | ||
103 | |||
104 | abstract abort (error?: LiveVideoError): void | ||
105 | } | ||
106 | |||
107 | export { | ||
108 | AbstractTranscodingWrapper, | ||
109 | AbstractTranscodingWrapperOptions | ||
110 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts deleted file mode 100644 index c6ee8ebf1..000000000 --- a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts +++ /dev/null | |||
@@ -1,107 +0,0 @@ | |||
1 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
2 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { CONFIG } from '@server/initializers/config' | ||
5 | import { VIDEO_LIVE } from '@server/initializers/constants' | ||
6 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | ||
7 | import { FFmpegLive } from '@shared/ffmpeg' | ||
8 | import { getLiveSegmentTime } from '../../live-utils' | ||
9 | import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' | ||
10 | |||
11 | export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { | ||
12 | private ffmpegCommand: FfmpegCommand | ||
13 | |||
14 | private aborted = false | ||
15 | private errored = false | ||
16 | private ended = false | ||
17 | |||
18 | async run () { | ||
19 | this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED | ||
20 | ? await this.buildFFmpegLive().getLiveTranscodingCommand({ | ||
21 | inputUrl: this.inputLocalUrl, | ||
22 | |||
23 | outPath: this.outDirectory, | ||
24 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
25 | |||
26 | segmentListSize: this.segmentListSize, | ||
27 | segmentDuration: this.segmentDuration, | ||
28 | |||
29 | toTranscode: this.toTranscode, | ||
30 | |||
31 | bitrate: this.bitrate, | ||
32 | ratio: this.ratio, | ||
33 | |||
34 | hasAudio: this.hasAudio | ||
35 | }) | ||
36 | : this.buildFFmpegLive().getLiveMuxingCommand({ | ||
37 | inputUrl: this.inputLocalUrl, | ||
38 | outPath: this.outDirectory, | ||
39 | |||
40 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
41 | |||
42 | segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, | ||
43 | segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode) | ||
44 | }) | ||
45 | |||
46 | logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | ||
47 | |||
48 | let ffmpegShellCommand: string | ||
49 | this.ffmpegCommand.on('start', cmdline => { | ||
50 | ffmpegShellCommand = cmdline | ||
51 | |||
52 | logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) | ||
53 | }) | ||
54 | |||
55 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
56 | this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) | ||
57 | }) | ||
58 | |||
59 | this.ffmpegCommand.on('end', () => { | ||
60 | this.onFFmpegEnded() | ||
61 | }) | ||
62 | |||
63 | this.ffmpegCommand.run() | ||
64 | } | ||
65 | |||
66 | abort () { | ||
67 | if (this.ended || this.errored || this.aborted) return | ||
68 | |||
69 | logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags()) | ||
70 | |||
71 | this.ffmpegCommand.kill('SIGINT') | ||
72 | |||
73 | this.aborted = true | ||
74 | this.emit('end') | ||
75 | } | ||
76 | |||
77 | private onFFmpegError (options: { | ||
78 | err: any | ||
79 | stdout: string | ||
80 | stderr: string | ||
81 | ffmpegShellCommand: string | ||
82 | }) { | ||
83 | const { err, stdout, stderr, ffmpegShellCommand } = options | ||
84 | |||
85 | // Don't care that we killed the ffmpeg process | ||
86 | if (err?.message?.includes('Exiting normally')) return | ||
87 | if (this.ended || this.errored || this.aborted) return | ||
88 | |||
89 | logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) | ||
90 | |||
91 | this.errored = true | ||
92 | this.emit('error', { err }) | ||
93 | } | ||
94 | |||
95 | private onFFmpegEnded () { | ||
96 | if (this.ended || this.errored || this.aborted) return | ||
97 | |||
98 | logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags()) | ||
99 | |||
100 | this.ended = true | ||
101 | this.emit('end') | ||
102 | } | ||
103 | |||
104 | private buildFFmpegLive () { | ||
105 | return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) | ||
106 | } | ||
107 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts deleted file mode 100644 index ae28fa1ca..000000000 --- a/server/lib/live/shared/transcoding-wrapper/index.ts +++ /dev/null | |||
@@ -1,3 +0,0 @@ | |||
1 | export * from './abstract-transcoding-wrapper' | ||
2 | export * from './ffmpeg-transcoding-wrapper' | ||
3 | export * from './remote-transcoding-wrapper' | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts deleted file mode 100644 index 2aeeb31fb..000000000 --- a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts +++ /dev/null | |||
@@ -1,21 +0,0 @@ | |||
1 | import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners' | ||
2 | import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' | ||
3 | |||
4 | export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper { | ||
5 | async run () { | ||
6 | await new LiveRTMPHLSTranscodingJobHandler().create({ | ||
7 | rtmpUrl: this.inputPublicUrl, | ||
8 | sessionId: this.sessionId, | ||
9 | toTranscode: this.toTranscode, | ||
10 | video: this.videoLive.Video, | ||
11 | outputDirectory: this.outDirectory, | ||
12 | playlist: this.streamingPlaylist, | ||
13 | segmentListSize: this.segmentListSize, | ||
14 | segmentDuration: this.segmentDuration | ||
15 | }) | ||
16 | } | ||
17 | |||
18 | abort () { | ||
19 | this.emit('end') | ||
20 | } | ||
21 | } | ||