import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
-import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
+import {
+ removeHLSFileObjectStorageByPath,
+ storeHLSFileFromContent,
+ storeHLSFileFromFilename,
+ storeHLSFileFromPath
+} from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
private readonly user: MUserId
private readonly sessionId: string
private readonly videoLive: MVideoLiveVideo
- private readonly inputUrl: string
+
+ private readonly inputLocalUrl: string
+ private readonly inputPublicUrl: string
+
private readonly fps: number
private readonly allResolutions: number[]
private readonly lTags: LoggerTagsFn
+ // Path -> Queue
+ private readonly objectStorageSendQueues = new Map<string, PQueue>()
+
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
private streamingPlaylist: MStreamingPlaylistVideo
user: MUserId
sessionId: string
videoLive: MVideoLiveVideo
- inputUrl: string
+
+ inputLocalUrl: string
+ inputPublicUrl: string
+
fps: number
bitrate: number
ratio: number
this.user = options.user
this.sessionId = options.sessionId
this.videoLive = options.videoLive
- this.inputUrl = options.inputUrl
+
+ this.inputLocalUrl = options.inputLocalUrl
+ this.inputPublicUrl = options.inputPublicUrl
+
this.fps = options.fps
this.bitrate = options.bitrate
this.watchMasterFile()
this.watchTSFiles()
- this.watchM3U8File()
}
abort () {
})
}
- private watchM3U8File () {
- const sendQueues = new Map<string, PQueue>()
-
- const onChange = async (m3u8Path: string) => {
- if (m3u8Path.endsWith('.m3u8') !== true) return
- if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
-
- logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
-
- try {
- if (!sendQueues.has(m3u8Path)) {
- sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
- }
-
- const queue = sendQueues.get(m3u8Path)
- await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
- } catch (err) {
- logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
- }
- }
-
- this.filesWatcher.on('change', onChange)
- }
-
private watchTSFiles () {
const startStreamDateTime = new Date().getTime()
- const playlistIdMatcher = /^([\d+])-/
-
const addHandler = async (segmentPath: string) => {
if (segmentPath.endsWith('.ts') !== true) return
logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
- const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
+ const playlistId = this.getPlaylistIdFromTS(segmentPath)
const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
this.processSegments(segmentsToProcess)
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+
+ await this.processM3U8ToObjectStorage(segmentPath)
} catch (err) {
logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
}
}
}
+ private async processM3U8ToObjectStorage (segmentPath: string) {
+ const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
+
+ logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
+
+ const segmentName = basename(segmentPath)
+
+ const playlistContent = await readFile(m3u8Path, 'utf-8')
+ // Remove new chunk references, that will be processed later
+ const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
+
+ try {
+ if (!this.objectStorageSendQueues.has(m3u8Path)) {
+ this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+ }
+
+ const queue = this.objectStorageSendQueues.get(m3u8Path)
+ await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
+ } catch (err) {
+ logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+ }
+ }
+
private onTranscodingError () {
this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
}
private onTranscodedEnded () {
this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
- logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
+ logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputLocalUrl, this.lTags())
setTimeout(() => {
// Wait latest segments generation, and close watchers
lTags: this.lTags,
- inputUrl: this.inputUrl,
+ sessionId: this.sessionId,
+ inputLocalUrl: this.inputLocalUrl,
+ inputPublicUrl: this.inputPublicUrl,
toTranscode: this.allResolutions.map(resolution => ({
resolution,
? new RemoteTranscodingWrapper(options)
: new FFmpegTranscodingWrapper(options)
}
+
+ private getPlaylistIdFromTS (segmentPath: string) {
+ const playlistIdMatcher = /^([\d+])-/
+
+ return basename(segmentPath).match(playlistIdMatcher)[1]
+ }
+
+ private getPlaylistNameFromTS (segmentPath: string) {
+ return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
+ }
}
// ---------------------------------------------------------------------------