diff options
author | Chocobozzz <me@florianbigard.com> | 2022-10-04 10:03:17 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-10-04 10:03:17 +0200 |
commit | cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1 (patch) | |
tree | dc899a1504ecac588e5580553e02571e0f5d7e4b /server/lib/live | |
parent | 9c0cdc5047918b959ebd5e075ddad81eb7fb93f0 (diff) | |
download | PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.gz PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.tar.zst PeerTube-cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1.zip |
Live supports object storage
* Sync live files (segments, master playlist, resolution playlist,
segment sha file) into object storage
* Automatically delete them when the live ends
* Segment sha file is now a file on disk, and not stored in memory
anymore
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/live-manager.ts | 12 | ||||
-rw-r--r-- | server/lib/live/live-segment-sha-store.ts | 75 | ||||
-rw-r--r-- | server/lib/live/live-utils.ts | 67 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 106 |
4 files changed, 188 insertions, 72 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 16715862b..9470b530b 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | |||
21 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 21 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
22 | import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' | 22 | import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' |
23 | import { pick, wait } from '@shared/core-utils' | 23 | import { pick, wait } from '@shared/core-utils' |
24 | import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' | 24 | import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models' |
25 | import { federateVideoIfNeeded } from '../activitypub/videos' | 25 | import { federateVideoIfNeeded } from '../activitypub/videos' |
26 | import { JobQueue } from '../job-queue' | 26 | import { JobQueue } from '../job-queue' |
27 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' | 27 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' |
28 | import { PeerTubeSocket } from '../peertube-socket' | 28 | import { PeerTubeSocket } from '../peertube-socket' |
29 | import { Hooks } from '../plugins/hooks' | 29 | import { Hooks } from '../plugins/hooks' |
30 | import { LiveQuotaStore } from './live-quota-store' | 30 | import { LiveQuotaStore } from './live-quota-store' |
31 | import { cleanupPermanentLive } from './live-utils' | 31 | import { cleanupAndDestroyPermanentLive } from './live-utils' |
32 | import { MuxingSession } from './shared' | 32 | import { MuxingSession } from './shared' |
33 | 33 | ||
34 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') | 34 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') |
@@ -224,7 +224,7 @@ class LiveManager { | |||
224 | if (oldStreamingPlaylist) { | 224 | if (oldStreamingPlaylist) { |
225 | if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) | 225 | if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) |
226 | 226 | ||
227 | await cleanupPermanentLive(video, oldStreamingPlaylist) | 227 | await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) |
228 | } | 228 | } |
229 | 229 | ||
230 | this.videoSessions.set(video.id, sessionId) | 230 | this.videoSessions.set(video.id, sessionId) |
@@ -301,7 +301,7 @@ class LiveManager { | |||
301 | ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) | 301 | ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) |
302 | }) | 302 | }) |
303 | 303 | ||
304 | muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) | 304 | muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) |
305 | 305 | ||
306 | muxingSession.on('bad-socket-health', ({ videoId }) => { | 306 | muxingSession.on('bad-socket-health', ({ videoId }) => { |
307 | logger.error( | 307 | logger.error( |
@@ -485,6 +485,10 @@ class LiveManager { | |||
485 | 485 | ||
486 | playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) | 486 | playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) |
487 | 487 | ||
488 | playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED | ||
489 | ? VideoStorage.OBJECT_STORAGE | ||
490 | : VideoStorage.FILE_SYSTEM | ||
491 | |||
488 | return playlist.save() | 492 | return playlist.save() |
489 | } | 493 | } |
490 | 494 | ||
diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts index 4af6f3ebf..faf03dccf 100644 --- a/server/lib/live/live-segment-sha-store.ts +++ b/server/lib/live/live-segment-sha-store.ts | |||
@@ -1,62 +1,73 @@ | |||
1 | import { writeJson } from 'fs-extra' | ||
1 | import { basename } from 'path' | 2 | import { basename } from 'path' |
3 | import { mapToJSON } from '@server/helpers/core-utils' | ||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
5 | import { MStreamingPlaylistVideo } from '@server/types/models' | ||
3 | import { buildSha256Segment } from '../hls' | 6 | import { buildSha256Segment } from '../hls' |
7 | import { storeHLSFileFromPath } from '../object-storage' | ||
4 | 8 | ||
5 | const lTags = loggerTagsFactory('live') | 9 | const lTags = loggerTagsFactory('live') |
6 | 10 | ||
7 | class LiveSegmentShaStore { | 11 | class LiveSegmentShaStore { |
8 | 12 | ||
9 | private static instance: LiveSegmentShaStore | 13 | private readonly segmentsSha256 = new Map<string, string>() |
10 | 14 | ||
11 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() | 15 | private readonly videoUUID: string |
12 | 16 | private readonly sha256Path: string | |
13 | private constructor () { | 17 | private readonly streamingPlaylist: MStreamingPlaylistVideo |
18 | private readonly sendToObjectStorage: boolean | ||
19 | |||
20 | constructor (options: { | ||
21 | videoUUID: string | ||
22 | sha256Path: string | ||
23 | streamingPlaylist: MStreamingPlaylistVideo | ||
24 | sendToObjectStorage: boolean | ||
25 | }) { | ||
26 | this.videoUUID = options.videoUUID | ||
27 | this.sha256Path = options.sha256Path | ||
28 | this.streamingPlaylist = options.streamingPlaylist | ||
29 | this.sendToObjectStorage = options.sendToObjectStorage | ||
14 | } | 30 | } |
15 | 31 | ||
16 | getSegmentsSha256 (videoUUID: string) { | 32 | async addSegmentSha (segmentPath: string) { |
17 | return this.segmentsSha256.get(videoUUID) | 33 | logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID)) |
18 | } | ||
19 | |||
20 | async addSegmentSha (videoUUID: string, segmentPath: string) { | ||
21 | const segmentName = basename(segmentPath) | ||
22 | logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID)) | ||
23 | 34 | ||
24 | const shaResult = await buildSha256Segment(segmentPath) | 35 | const shaResult = await buildSha256Segment(segmentPath) |
25 | 36 | ||
26 | if (!this.segmentsSha256.has(videoUUID)) { | 37 | const segmentName = basename(segmentPath) |
27 | this.segmentsSha256.set(videoUUID, new Map()) | 38 | this.segmentsSha256.set(segmentName, shaResult) |
28 | } | ||
29 | 39 | ||
30 | const filesMap = this.segmentsSha256.get(videoUUID) | 40 | await this.writeToDisk() |
31 | filesMap.set(segmentName, shaResult) | ||
32 | } | 41 | } |
33 | 42 | ||
34 | removeSegmentSha (videoUUID: string, segmentPath: string) { | 43 | async removeSegmentSha (segmentPath: string) { |
35 | const segmentName = basename(segmentPath) | 44 | const segmentName = basename(segmentPath) |
36 | 45 | ||
37 | logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) | 46 | logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) |
38 | 47 | ||
39 | const filesMap = this.segmentsSha256.get(videoUUID) | 48 | if (!this.segmentsSha256.has(segmentName)) { |
40 | if (!filesMap) { | 49 | logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID)) |
41 | logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID)) | ||
42 | return | 50 | return |
43 | } | 51 | } |
44 | 52 | ||
45 | if (!filesMap.has(segmentName)) { | 53 | this.segmentsSha256.delete(segmentName) |
46 | logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID)) | ||
47 | return | ||
48 | } | ||
49 | 54 | ||
50 | filesMap.delete(segmentName) | 55 | await this.writeToDisk() |
51 | } | 56 | } |
52 | 57 | ||
53 | cleanupShaSegments (videoUUID: string) { | 58 | private async writeToDisk () { |
54 | this.segmentsSha256.delete(videoUUID) | 59 | await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256)) |
55 | } | ||
56 | 60 | ||
57 | static get Instance () { | 61 | if (this.sendToObjectStorage) { |
58 | return this.instance || (this.instance = new this()) | 62 | const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path) |
63 | |||
64 | if (this.streamingPlaylist.segmentsSha256Url !== url) { | ||
65 | this.streamingPlaylist.segmentsSha256Url = url | ||
66 | await this.streamingPlaylist.save() | ||
67 | } | ||
68 | } | ||
59 | } | 69 | } |
70 | |||
60 | } | 71 | } |
61 | 72 | ||
62 | export { | 73 | export { |
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index bba876642..d2b8e3a55 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts | |||
@@ -1,9 +1,10 @@ | |||
1 | import { pathExists, readdir, remove } from 'fs-extra' | 1 | import { pathExists, readdir, remove } from 'fs-extra' |
2 | import { basename, join } from 'path' | 2 | import { basename, join } from 'path' |
3 | import { logger } from '@server/helpers/logger' | 3 | import { logger } from '@server/helpers/logger' |
4 | import { MStreamingPlaylist, MVideo } from '@server/types/models' | 4 | import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' |
5 | import { VideoStorage } from '@shared/models' | ||
6 | import { listHLSFileKeysOf, removeHLSFileObjectStorage, removeHLSObjectStorage } from '../object-storage' | ||
5 | import { getLiveDirectory } from '../paths' | 7 | import { getLiveDirectory } from '../paths' |
6 | import { LiveSegmentShaStore } from './live-segment-sha-store' | ||
7 | 8 | ||
8 | function buildConcatenatedName (segmentOrPlaylistPath: string) { | 9 | function buildConcatenatedName (segmentOrPlaylistPath: string) { |
9 | const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) | 10 | const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) |
@@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { | |||
11 | return 'concat-' + num[1] + '.ts' | 12 | return 'concat-' + num[1] + '.ts' |
12 | } | 13 | } |
13 | 14 | ||
14 | async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | 15 | async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { |
15 | await cleanupTMPLiveFiles(video) | 16 | await cleanupTMPLiveFiles(video, streamingPlaylist) |
16 | 17 | ||
17 | await streamingPlaylist.destroy() | 18 | await streamingPlaylist.destroy() |
18 | } | 19 | } |
@@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin | |||
20 | async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | 21 | async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { |
21 | const hlsDirectory = getLiveDirectory(video) | 22 | const hlsDirectory = getLiveDirectory(video) |
22 | 23 | ||
24 | // We uploaded files to object storage too, remove them | ||
25 | if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
26 | await removeHLSObjectStorage(streamingPlaylist.withVideo(video)) | ||
27 | } | ||
28 | |||
23 | await remove(hlsDirectory) | 29 | await remove(hlsDirectory) |
24 | 30 | ||
25 | await streamingPlaylist.destroy() | 31 | await streamingPlaylist.destroy() |
32 | } | ||
26 | 33 | ||
27 | LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) | 34 | async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) { |
35 | await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video)) | ||
36 | |||
37 | await cleanupTMPLiveFilesFromFilesystem(video) | ||
28 | } | 38 | } |
29 | 39 | ||
30 | async function cleanupTMPLiveFiles (video: MVideo) { | 40 | export { |
31 | const hlsDirectory = getLiveDirectory(video) | 41 | cleanupAndDestroyPermanentLive, |
42 | cleanupUnsavedNormalLive, | ||
43 | cleanupTMPLiveFiles, | ||
44 | buildConcatenatedName | ||
45 | } | ||
46 | |||
47 | // --------------------------------------------------------------------------- | ||
32 | 48 | ||
33 | LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) | 49 | function isTMPLiveFile (name: string) { |
50 | return name.endsWith('.ts') || | ||
51 | name.endsWith('.m3u8') || | ||
52 | name.endsWith('.json') || | ||
53 | name.endsWith('.mpd') || | ||
54 | name.endsWith('.m4s') || | ||
55 | name.endsWith('.tmp') | ||
56 | } | ||
57 | |||
58 | async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) { | ||
59 | const hlsDirectory = getLiveDirectory(video) | ||
34 | 60 | ||
35 | if (!await pathExists(hlsDirectory)) return | 61 | if (!await pathExists(hlsDirectory)) return |
36 | 62 | ||
37 | logger.info('Cleanup TMP live files of %s.', hlsDirectory) | 63 | logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory) |
38 | 64 | ||
39 | const files = await readdir(hlsDirectory) | 65 | const files = await readdir(hlsDirectory) |
40 | 66 | ||
41 | for (const filename of files) { | 67 | for (const filename of files) { |
42 | if ( | 68 | if (isTMPLiveFile(filename)) { |
43 | filename.endsWith('.ts') || | ||
44 | filename.endsWith('.m3u8') || | ||
45 | filename.endsWith('.mpd') || | ||
46 | filename.endsWith('.m4s') || | ||
47 | filename.endsWith('.tmp') | ||
48 | ) { | ||
49 | const p = join(hlsDirectory, filename) | 69 | const p = join(hlsDirectory, filename) |
50 | 70 | ||
51 | remove(p) | 71 | remove(p) |
@@ -54,9 +74,14 @@ async function cleanupTMPLiveFiles (video: MVideo) { | |||
54 | } | 74 | } |
55 | } | 75 | } |
56 | 76 | ||
57 | export { | 77 | async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) { |
58 | cleanupPermanentLive, | 78 | if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return |
59 | cleanupUnsavedNormalLive, | 79 | |
60 | cleanupTMPLiveFiles, | 80 | const keys = await listHLSFileKeysOf(streamingPlaylist) |
61 | buildConcatenatedName | 81 | |
82 | for (const key of keys) { | ||
83 | if (isTMPLiveFile(key)) { | ||
84 | await removeHLSFileObjectStorage(streamingPlaylist, key) | ||
85 | } | ||
86 | } | ||
62 | } | 87 | } |
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 505717dce..4c27d5dd8 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers | |||
9 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' | 9 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' |
10 | import { CONFIG } from '@server/initializers/config' | 10 | import { CONFIG } from '@server/initializers/config' |
11 | import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' | 11 | import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' |
12 | import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' | ||
12 | import { VideoFileModel } from '@server/models/video/video-file' | 13 | import { VideoFileModel } from '@server/models/video/video-file' |
13 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | 14 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' |
15 | import { VideoStorage } from '@shared/models' | ||
14 | import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' | 16 | import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' |
15 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' | 17 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' |
16 | import { isAbleToUploadVideo } from '../../user' | 18 | import { isAbleToUploadVideo } from '../../user' |
@@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils' | |||
21 | import memoizee = require('memoizee') | 23 | import memoizee = require('memoizee') |
22 | 24 | ||
23 | interface MuxingSessionEvents { | 25 | interface MuxingSessionEvents { |
24 | 'master-playlist-created': (options: { videoId: number }) => void | 26 | 'live-ready': (options: { videoId: number }) => void |
25 | 27 | ||
26 | 'bad-socket-health': (options: { videoId: number }) => void | 28 | 'bad-socket-health': (options: { videoId: number }) => void |
27 | 'duration-exceeded': (options: { videoId: number }) => void | 29 | 'duration-exceeded': (options: { videoId: number }) => void |
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter { | |||
68 | private readonly outDirectory: string | 70 | private readonly outDirectory: string |
69 | private readonly replayDirectory: string | 71 | private readonly replayDirectory: string |
70 | 72 | ||
73 | private readonly liveSegmentShaStore: LiveSegmentShaStore | ||
74 | |||
71 | private readonly lTags: LoggerTagsFn | 75 | private readonly lTags: LoggerTagsFn |
72 | 76 | ||
73 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} | 77 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} |
74 | 78 | ||
75 | private tsWatcher: FSWatcher | 79 | private tsWatcher: FSWatcher |
76 | private masterWatcher: FSWatcher | 80 | private masterWatcher: FSWatcher |
81 | private m3u8Watcher: FSWatcher | ||
82 | |||
83 | private masterPlaylistCreated = false | ||
84 | private liveReady = false | ||
77 | 85 | ||
78 | private aborted = false | 86 | private aborted = false |
79 | 87 | ||
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter { | |||
123 | this.outDirectory = getLiveDirectory(this.videoLive.Video) | 131 | this.outDirectory = getLiveDirectory(this.videoLive.Video) |
124 | this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) | 132 | this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) |
125 | 133 | ||
134 | this.liveSegmentShaStore = new LiveSegmentShaStore({ | ||
135 | videoUUID: this.videoLive.Video.uuid, | ||
136 | sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), | ||
137 | streamingPlaylist: this.streamingPlaylist, | ||
138 | sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED | ||
139 | }) | ||
140 | |||
126 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) | 141 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) |
127 | } | 142 | } |
128 | 143 | ||
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter { | |||
159 | 174 | ||
160 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | 175 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) |
161 | 176 | ||
162 | this.watchTSFiles() | ||
163 | this.watchMasterFile() | 177 | this.watchMasterFile() |
178 | this.watchTSFiles() | ||
179 | this.watchM3U8File() | ||
164 | 180 | ||
165 | let ffmpegShellCommand: string | 181 | let ffmpegShellCommand: string |
166 | this.ffmpegCommand.on('start', cmdline => { | 182 | this.ffmpegCommand.on('start', cmdline => { |
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter { | |||
219 | setTimeout(() => { | 235 | setTimeout(() => { |
220 | // Wait latest segments generation, and close watchers | 236 | // Wait latest segments generation, and close watchers |
221 | 237 | ||
222 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) | 238 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) |
223 | .then(() => { | 239 | .then(() => { |
224 | // Process remaining segments hash | 240 | // Process remaining segments hash |
225 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 241 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |
@@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter { | |||
240 | private watchMasterFile () { | 256 | private watchMasterFile () { |
241 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) | 257 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) |
242 | 258 | ||
243 | this.masterWatcher.on('add', () => { | 259 | this.masterWatcher.on('add', async () => { |
244 | this.emit('master-playlist-created', { videoId: this.videoId }) | 260 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
261 | try { | ||
262 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) | ||
263 | |||
264 | this.streamingPlaylist.playlistUrl = url | ||
265 | await this.streamingPlaylist.save() | ||
266 | } catch (err) { | ||
267 | logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() }) | ||
268 | } | ||
269 | } | ||
270 | |||
271 | this.masterPlaylistCreated = true | ||
245 | 272 | ||
246 | this.masterWatcher.close() | 273 | this.masterWatcher.close() |
247 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) | 274 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) |
248 | }) | 275 | }) |
249 | } | 276 | } |
250 | 277 | ||
278 | private watchM3U8File () { | ||
279 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') | ||
280 | |||
281 | const onChangeOrAdd = async (m3u8Path: string) => { | ||
282 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | ||
283 | |||
284 | try { | ||
285 | await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) | ||
286 | } catch (err) { | ||
287 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
288 | } | ||
289 | } | ||
290 | |||
291 | this.m3u8Watcher.on('change', onChangeOrAdd) | ||
292 | } | ||
293 | |||
251 | private watchTSFiles () { | 294 | private watchTSFiles () { |
252 | const startStreamDateTime = new Date().getTime() | 295 | const startStreamDateTime = new Date().getTime() |
253 | 296 | ||
@@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter { | |||
282 | } | 325 | } |
283 | } | 326 | } |
284 | 327 | ||
285 | const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) | 328 | const deleteHandler = async (segmentPath: string) => { |
329 | try { | ||
330 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) | ||
331 | } catch (err) { | ||
332 | logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() }) | ||
333 | } | ||
334 | |||
335 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
336 | try { | ||
337 | await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath) | ||
338 | } catch (err) { | ||
339 | logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() }) | ||
340 | } | ||
341 | } | ||
342 | } | ||
286 | 343 | ||
287 | this.tsWatcher.on('add', p => addHandler(p)) | 344 | this.tsWatcher.on('add', p => addHandler(p)) |
288 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 345 | this.tsWatcher.on('unlink', p => deleteHandler(p)) |
@@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter { | |||
315 | extname: '.ts', | 372 | extname: '.ts', |
316 | infoHash: null, | 373 | infoHash: null, |
317 | fps: this.fps, | 374 | fps: this.fps, |
375 | storage: this.streamingPlaylist.storage, | ||
318 | videoStreamingPlaylistId: this.streamingPlaylist.id | 376 | videoStreamingPlaylistId: this.streamingPlaylist.id |
319 | }) | 377 | }) |
320 | 378 | ||
@@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter { | |||
343 | } | 401 | } |
344 | 402 | ||
345 | private processSegments (segmentPaths: string[]) { | 403 | private processSegments (segmentPaths: string[]) { |
346 | mapSeries(segmentPaths, async previousSegment => { | 404 | mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment)) |
347 | // Add sha hash of previous segments, because ffmpeg should have finished generating them | 405 | .catch(err => { |
348 | await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) | 406 | if (this.aborted) return |
407 | |||
408 | logger.error('Cannot process segments', { err, ...this.lTags() }) | ||
409 | }) | ||
410 | } | ||
349 | 411 | ||
350 | if (this.saveReplay) { | 412 | private async processSegment (segmentPath: string) { |
351 | await this.addSegmentToReplay(previousSegment) | 413 | // Add sha hash of previous segments, because ffmpeg should have finished generating them |
414 | await this.liveSegmentShaStore.addSegmentSha(segmentPath) | ||
415 | |||
416 | if (this.saveReplay) { | ||
417 | await this.addSegmentToReplay(segmentPath) | ||
418 | } | ||
419 | |||
420 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | ||
421 | try { | ||
422 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) | ||
423 | } catch (err) { | ||
424 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) | ||
352 | } | 425 | } |
353 | }).catch(err => { | 426 | } |
354 | if (this.aborted) return | ||
355 | 427 | ||
356 | logger.error('Cannot process segments', { err, ...this.lTags() }) | 428 | // Master playlist and segment JSON file are created, live is ready |
357 | }) | 429 | if (this.masterPlaylistCreated && !this.liveReady) { |
430 | this.liveReady = true | ||
431 | |||
432 | this.emit('live-ready', { videoId: this.videoId }) | ||
433 | } | ||
358 | } | 434 | } |
359 | 435 | ||
360 | private hasClientSocketInBadHealth (sessionId: string) { | 436 | private hasClientSocketInBadHealth (sessionId: string) { |