diff options
Diffstat (limited to 'server/lib/live/shared')
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 106 |
1 files changed, 91 insertions, 15 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 505717dce..4c27d5dd8 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers | |||
9 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' | 9 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' |
10 | import { CONFIG } from '@server/initializers/config' | 10 | import { CONFIG } from '@server/initializers/config' |
11 | import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' | 11 | import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' |
12 | import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' | ||
12 | import { VideoFileModel } from '@server/models/video/video-file' | 13 | import { VideoFileModel } from '@server/models/video/video-file' |
13 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | 14 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' |
15 | import { VideoStorage } from '@shared/models' | ||
14 | import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' | 16 | import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' |
15 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' | 17 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' |
16 | import { isAbleToUploadVideo } from '../../user' | 18 | import { isAbleToUploadVideo } from '../../user' |
@@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils' | |||
21 | import memoizee = require('memoizee') | 23 | import memoizee = require('memoizee') |
22 | 24 | ||
23 | interface MuxingSessionEvents { | 25 | interface MuxingSessionEvents { |
24 | 'master-playlist-created': (options: { videoId: number }) => void | 26 | 'live-ready': (options: { videoId: number }) => void |
25 | 27 | ||
26 | 'bad-socket-health': (options: { videoId: number }) => void | 28 | 'bad-socket-health': (options: { videoId: number }) => void |
27 | 'duration-exceeded': (options: { videoId: number }) => void | 29 | 'duration-exceeded': (options: { videoId: number }) => void |
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter { | |||
68 | private readonly outDirectory: string | 70 | private readonly outDirectory: string |
69 | private readonly replayDirectory: string | 71 | private readonly replayDirectory: string |
70 | 72 | ||
73 | private readonly liveSegmentShaStore: LiveSegmentShaStore | ||
74 | |||
71 | private readonly lTags: LoggerTagsFn | 75 | private readonly lTags: LoggerTagsFn |
72 | 76 | ||
73 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} | 77 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} |
74 | 78 | ||
75 | private tsWatcher: FSWatcher | 79 | private tsWatcher: FSWatcher |
76 | private masterWatcher: FSWatcher | 80 | private masterWatcher: FSWatcher |
81 | private m3u8Watcher: FSWatcher | ||
82 | |||
83 | private masterPlaylistCreated = false | ||
84 | private liveReady = false | ||
77 | 85 | ||
78 | private aborted = false | 86 | private aborted = false |
79 | 87 | ||
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter { | |||
123 | this.outDirectory = getLiveDirectory(this.videoLive.Video) | 131 | this.outDirectory = getLiveDirectory(this.videoLive.Video) |
124 | this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) | 132 | this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) |
125 | 133 | ||
134 | this.liveSegmentShaStore = new LiveSegmentShaStore({ | ||
135 | videoUUID: this.videoLive.Video.uuid, | ||
136 | sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), | ||
137 | streamingPlaylist: this.streamingPlaylist, | ||
138 | sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED | ||
139 | }) | ||
140 | |||
126 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) | 141 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) |
127 | } | 142 | } |
128 | 143 | ||
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter { | |||
159 | 174 | ||
160 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | 175 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) |
161 | 176 | ||
162 | this.watchTSFiles() | ||
163 | this.watchMasterFile() | 177 | this.watchMasterFile() |
178 | this.watchTSFiles() | ||
179 | this.watchM3U8File() | ||
164 | 180 | ||
165 | let ffmpegShellCommand: string | 181 | let ffmpegShellCommand: string |
166 | this.ffmpegCommand.on('start', cmdline => { | 182 | this.ffmpegCommand.on('start', cmdline => { |
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter { | |||
219 | setTimeout(() => { | 235 | setTimeout(() => { |
220 | // Wait latest segments generation, and close watchers | 236 | // Wait latest segments generation, and close watchers |
221 | 237 | ||
222 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) | 238 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) |
223 | .then(() => { | 239 | .then(() => { |
224 | // Process remaining segments hash | 240 | // Process remaining segments hash |
225 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 241 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |
@@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter { | |||
240 | private watchMasterFile () { | 256 | private watchMasterFile () { |
241 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) | 257 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) |
242 | 258 | ||
243 | this.masterWatcher.on('add', () => { | 259 | this.masterWatcher.on('add', async () => { |
244 | this.emit('master-playlist-created', { videoId: this.videoId }) | 260 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
261 | try { | ||
262 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) | ||
263 | |||
264 | this.streamingPlaylist.playlistUrl = url | ||
265 | await this.streamingPlaylist.save() | ||
266 | } catch (err) { | ||
267 | logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() }) | ||
268 | } | ||
269 | } | ||
270 | |||
271 | this.masterPlaylistCreated = true | ||
245 | 272 | ||
246 | this.masterWatcher.close() | 273 | this.masterWatcher.close() |
247 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) | 274 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) |
248 | }) | 275 | }) |
249 | } | 276 | } |
250 | 277 | ||
278 | private watchM3U8File () { | ||
279 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') | ||
280 | |||
281 | const onChangeOrAdd = async (m3u8Path: string) => { | ||
282 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | ||
283 | |||
284 | try { | ||
285 | await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) | ||
286 | } catch (err) { | ||
287 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
288 | } | ||
289 | } | ||
290 | |||
291 | this.m3u8Watcher.on('change', onChangeOrAdd) | ||
292 | } | ||
293 | |||
251 | private watchTSFiles () { | 294 | private watchTSFiles () { |
252 | const startStreamDateTime = new Date().getTime() | 295 | const startStreamDateTime = new Date().getTime() |
253 | 296 | ||
@@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter { | |||
282 | } | 325 | } |
283 | } | 326 | } |
284 | 327 | ||
285 | const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) | 328 | const deleteHandler = async (segmentPath: string) => { |
329 | try { | ||
330 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) | ||
331 | } catch (err) { | ||
332 | logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() }) | ||
333 | } | ||
334 | |||
335 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
336 | try { | ||
337 | await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath) | ||
338 | } catch (err) { | ||
339 | logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() }) | ||
340 | } | ||
341 | } | ||
342 | } | ||
286 | 343 | ||
287 | this.tsWatcher.on('add', p => addHandler(p)) | 344 | this.tsWatcher.on('add', p => addHandler(p)) |
288 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 345 | this.tsWatcher.on('unlink', p => deleteHandler(p)) |
@@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter { | |||
315 | extname: '.ts', | 372 | extname: '.ts', |
316 | infoHash: null, | 373 | infoHash: null, |
317 | fps: this.fps, | 374 | fps: this.fps, |
375 | storage: this.streamingPlaylist.storage, | ||
318 | videoStreamingPlaylistId: this.streamingPlaylist.id | 376 | videoStreamingPlaylistId: this.streamingPlaylist.id |
319 | }) | 377 | }) |
320 | 378 | ||
@@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter { | |||
343 | } | 401 | } |
344 | 402 | ||
345 | private processSegments (segmentPaths: string[]) { | 403 | private processSegments (segmentPaths: string[]) { |
346 | mapSeries(segmentPaths, async previousSegment => { | 404 | mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment)) |
347 | // Add sha hash of previous segments, because ffmpeg should have finished generating them | 405 | .catch(err => { |
348 | await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) | 406 | if (this.aborted) return |
407 | |||
408 | logger.error('Cannot process segments', { err, ...this.lTags() }) | ||
409 | }) | ||
410 | } | ||
349 | 411 | ||
350 | if (this.saveReplay) { | 412 | private async processSegment (segmentPath: string) { |
351 | await this.addSegmentToReplay(previousSegment) | 413 | // Add sha hash of previous segments, because ffmpeg should have finished generating them |
414 | await this.liveSegmentShaStore.addSegmentSha(segmentPath) | ||
415 | |||
416 | if (this.saveReplay) { | ||
417 | await this.addSegmentToReplay(segmentPath) | ||
418 | } | ||
419 | |||
420 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
421 | try { | ||
422 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) | ||
423 | } catch (err) { | ||
424 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) | ||
352 | } | 425 | } |
353 | }).catch(err => { | 426 | } |
354 | if (this.aborted) return | ||
355 | 427 | ||
356 | logger.error('Cannot process segments', { err, ...this.lTags() }) | 428 | // Master playlist and segment JSON file are created, live is ready |
357 | }) | 429 | if (this.masterPlaylistCreated && !this.liveReady) { |
430 | this.liveReady = true | ||
431 | |||
432 | this.emit('live-ready', { videoId: this.videoId }) | ||
433 | } | ||
358 | } | 434 | } |
359 | 435 | ||
360 | private hasClientSocketInBadHealth (sessionId: string) { | 436 | private hasClientSocketInBadHealth (sessionId: string) { |