X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Flive-segment-sha-store.ts;h=1a0a93985598721cb4fd5ad17bfc8ed7a1a21c13;hb=3733175b6b53d8f9cdc3e1752110a030508907af;hp=4af6f3ebfdfaba48cc530e91ba007852690b762f;hpb=cf21b2cbef61929177b9c09b5e017c3b7eb8535d;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts index 4af6f3ebf..1a0a93985 100644 --- a/server/lib/live/live-segment-sha-store.ts +++ b/server/lib/live/live-segment-sha-store.ts @@ -1,61 +1,90 @@ +import { rename, writeJson } from 'fs-extra' +import PQueue from 'p-queue' import { basename } from 'path' +import { mapToJSON } from '@server/helpers/core-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MStreamingPlaylistVideo } from '@server/types/models' import { buildSha256Segment } from '../hls' +import { storeHLSFileFromPath } from '../object-storage' const lTags = loggerTagsFactory('live') class LiveSegmentShaStore { - private static instance: LiveSegmentShaStore + private readonly segmentsSha256 = new Map() - private readonly segmentsSha256 = new Map>() + private readonly videoUUID: string - private constructor () { - } + private readonly sha256Path: string + private readonly sha256PathTMP: string + + private readonly streamingPlaylist: MStreamingPlaylistVideo + private readonly sendToObjectStorage: boolean + private readonly writeQueue = new PQueue({ concurrency: 1 }) + + constructor (options: { + videoUUID: string + sha256Path: string + streamingPlaylist: MStreamingPlaylistVideo + sendToObjectStorage: boolean + }) { + this.videoUUID = options.videoUUID - getSegmentsSha256 (videoUUID: string) { - return this.segmentsSha256.get(videoUUID) + this.sha256Path = options.sha256Path + this.sha256PathTMP = options.sha256Path + '.tmp' + + this.streamingPlaylist = options.streamingPlaylist + this.sendToObjectStorage = options.sendToObjectStorage } - async addSegmentSha (videoUUID: string, segmentPath: string) { - const segmentName = basename(segmentPath) - logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID)) + async addSegmentSha (segmentPath: string) { + logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID)) const shaResult = await buildSha256Segment(segmentPath) - if (!this.segmentsSha256.has(videoUUID)) { - this.segmentsSha256.set(videoUUID, new Map()) - } + const segmentName = basename(segmentPath) + this.segmentsSha256.set(segmentName, shaResult) - const filesMap = this.segmentsSha256.get(videoUUID) - filesMap.set(segmentName, shaResult) + try { + await this.writeToDisk() + } catch (err) { + logger.error('Cannot write sha segments to disk.', { err }) + } } - removeSegmentSha (videoUUID: string, segmentPath: string) { + async removeSegmentSha (segmentPath: string) { const segmentName = basename(segmentPath) - logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) - - const filesMap = this.segmentsSha256.get(videoUUID) - if (!filesMap) { - logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID)) - return - } + logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) - if (!filesMap.has(segmentName)) { - logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID)) + if (!this.segmentsSha256.has(segmentName)) { + logger.warn( + 'Unknown segment in live segment hash store for video %s and segment %s.', + this.videoUUID, segmentPath, lTags(this.videoUUID) + ) return } - filesMap.delete(segmentName) - } + this.segmentsSha256.delete(segmentName) - cleanupShaSegments (videoUUID: string) { - this.segmentsSha256.delete(videoUUID) + await this.writeToDisk() } - static get Instance () { - return this.instance || (this.instance = new this()) + private writeToDisk () { + return this.writeQueue.add(async () => { + // Atomic write: use rename instead of move that is not atomic + await writeJson(this.sha256PathTMP, mapToJSON(this.segmentsSha256)) + await rename(this.sha256PathTMP, this.sha256Path) + + if (this.sendToObjectStorage) { + const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path) + + if (this.streamingPlaylist.segmentsSha256Url !== url) { + this.streamingPlaylist.segmentsSha256Url = url + await this.streamingPlaylist.save() + } + } + }) } }