import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
-import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
+import {
+ removeHLSFileObjectStorageByPath,
+ storeHLSFileFromContent,
+ storeHLSFileFromFilename,
+ storeHLSFileFromPath
+} from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
private readonly lTags: LoggerTagsFn
+ // Path -> Queue
+ private readonly objectStorageSendQueues = new Map<string, PQueue>()
+
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
private streamingPlaylist: MStreamingPlaylistVideo
this.watchMasterFile()
this.watchTSFiles()
- this.watchM3U8File()
}
abort () {
})
}
- private watchM3U8File () {
- const sendQueues = new Map<string, PQueue>()
-
- const onChange = async (m3u8Path: string) => {
- if (m3u8Path.endsWith('.m3u8') !== true) return
- if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
-
- logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
-
- try {
- if (!sendQueues.has(m3u8Path)) {
- sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
- }
-
- const queue = sendQueues.get(m3u8Path)
- await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
- } catch (err) {
- logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
- }
- }
-
- this.filesWatcher.on('change', onChange)
- }
-
private watchTSFiles () {
const startStreamDateTime = new Date().getTime()
- const playlistIdMatcher = /^([\d+])-/
-
const addHandler = async (segmentPath: string) => {
if (segmentPath.endsWith('.ts') !== true) return
logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
- const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
+ const playlistId = this.getPlaylistIdFromTS(segmentPath)
const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
this.processSegments(segmentsToProcess)
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+
+ await this.processM3U8ToObjectStorage(segmentPath)
} catch (err) {
logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
}
}
}
+ private async processM3U8ToObjectStorage (segmentPath: string) {
+ const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
+
+ logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
+
+ const segmentName = basename(segmentPath)
+
+ const playlistContent = await readFile(m3u8Path, 'utf-8')
+ // Remove new chunk references, that will be processed later
+ const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
+
+ try {
+ if (!this.objectStorageSendQueues.has(m3u8Path)) {
+ this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+ }
+
+ const queue = this.objectStorageSendQueues.get(m3u8Path)
+ await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
+ } catch (err) {
+ logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+ }
+ }
+
private onTranscodingError () {
this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
}
? new RemoteTranscodingWrapper(options)
: new FFmpegTranscodingWrapper(options)
}
+
+ private getPlaylistIdFromTS (segmentPath: string) {
+ const playlistIdMatcher = /^([\d+])-/
+
+ return basename(segmentPath).match(playlistIdMatcher)[1]
+ }
+
+ private getPlaylistNameFromTS (segmentPath: string) {
+ return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
+ }
}
// ---------------------------------------------------------------------------
return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
}
+async function storeContent (options: {
+ content: string
+ inputPath: string
+ objectStorageKey: string
+ bucketInfo: BucketInfo
+ isPrivate: boolean
+}): Promise<string> {
+ const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+ logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+ return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
+}
+
// ---------------------------------------------------------------------------
async function updateObjectACL (options: {
buildKey,
storeObject,
+ storeContent,
removeObject,
removeObjectByFullKey,
// ---------------------------------------------------------------------------
async function uploadToStorage (options: {
- content: ReadStream
+ content: ReadStream | string
objectStorageKey: string
bucketInfo: BucketInfo
isPrivate: boolean