From 9ab330b90decf4edf152ff8e1d2948c065766b2c Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 19 Oct 2022 10:43:53 +0200 Subject: Use private ACL for private videos in s3 --- server/lib/live/live-segment-sha-store.ts | 27 +-- .../shared/object-storage-helpers.ts | 192 +++++++++++++++------ server/lib/object-storage/urls.ts | 29 +++- server/lib/object-storage/videos.ts | 80 ++++++++- server/lib/video-privacy.ts | 89 ++++++---- 5 files changed, 321 insertions(+), 96 deletions(-) (limited to 'server/lib') diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts index faf03dccf..4d03754a9 100644 --- a/server/lib/live/live-segment-sha-store.ts +++ b/server/lib/live/live-segment-sha-store.ts @@ -5,6 +5,7 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger' import { MStreamingPlaylistVideo } from '@server/types/models' import { buildSha256Segment } from '../hls' import { storeHLSFileFromPath } from '../object-storage' +import PQueue from 'p-queue' const lTags = loggerTagsFactory('live') @@ -16,6 +17,7 @@ class LiveSegmentShaStore { private readonly sha256Path: string private readonly streamingPlaylist: MStreamingPlaylistVideo private readonly sendToObjectStorage: boolean + private readonly writeQueue = new PQueue({ concurrency: 1 }) constructor (options: { videoUUID: string @@ -37,7 +39,11 @@ class LiveSegmentShaStore { const segmentName = basename(segmentPath) this.segmentsSha256.set(segmentName, shaResult) - await this.writeToDisk() + try { + await this.writeToDisk() + } catch (err) { + logger.error('Cannot write sha segments to disk.', { err }) + } } async removeSegmentSha (segmentPath: string) { @@ -55,19 +61,20 @@ class LiveSegmentShaStore { await this.writeToDisk() } - private async writeToDisk () { - await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256)) + private writeToDisk () { + return this.writeQueue.add(async () => { + await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256)) - if (this.sendToObjectStorage) { - const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path) + if (this.sendToObjectStorage) { + const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path) - if (this.streamingPlaylist.segmentsSha256Url !== url) { - this.streamingPlaylist.segmentsSha256Url = url - await this.streamingPlaylist.save() + if (this.streamingPlaylist.segmentsSha256Url !== url) { + this.streamingPlaylist.segmentsSha256Url = url + await this.streamingPlaylist.save() + } } - } + }) } - } export { diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index c131977e8..05b52f412 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts @@ -2,18 +2,21 @@ import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-e import { dirname } from 'path' import { Readable } from 'stream' import { + _Object, CompleteMultipartUploadCommandOutput, DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, - PutObjectCommandInput + PutObjectAclCommand, + PutObjectCommandInput, + S3Client } from '@aws-sdk/client-s3' import { Upload } from '@aws-sdk/lib-storage' import { pipelinePromise } from '@server/helpers/core-utils' import { isArray } from '@server/helpers/custom-validators/misc' import { logger } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' -import { getPrivateUrl } from '../urls' +import { getInternalUrl } from '../urls' import { getClient } from './client' import { lTags } from './logger' @@ -44,69 +47,91 @@ async function storeObject (options: { inputPath: string objectStorageKey: string bucketInfo: BucketInfo + isPrivate: boolean }): Promise { - const { inputPath, objectStorageKey, bucketInfo } = options + const { inputPath, objectStorageKey, bucketInfo, isPrivate } = options logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) const fileStream = createReadStream(inputPath) - return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo }) + return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) } // --------------------------------------------------------------------------- -async function removeObject (filename: string, bucketInfo: BucketInfo) { - const command = new DeleteObjectCommand({ +function updateObjectACL (options: { + objectStorageKey: string + bucketInfo: BucketInfo + isPrivate: boolean +}) { + const { objectStorageKey, bucketInfo, isPrivate } = options + + const key = buildKey(objectStorageKey, bucketInfo) + + logger.debug('Updating ACL file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags()) + + const command = new PutObjectAclCommand({ Bucket: bucketInfo.BUCKET_NAME, - Key: buildKey(filename, bucketInfo) + Key: key, + ACL: getACL(isPrivate) }) return getClient().send(command) } -async function removePrefix (prefix: string, bucketInfo: BucketInfo) { - const s3Client = getClient() - - const commandPrefix = bucketInfo.PREFIX + prefix - const listCommand = new ListObjectsV2Command({ - Bucket: bucketInfo.BUCKET_NAME, - Prefix: commandPrefix +function updatePrefixACL (options: { + prefix: string + bucketInfo: BucketInfo + isPrivate: boolean +}) { + const { prefix, bucketInfo, isPrivate } = options + + logger.debug('Updating ACL of files in prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags()) + + return applyOnPrefix({ + prefix, + bucketInfo, + commandBuilder: obj => { + return new PutObjectAclCommand({ + Bucket: bucketInfo.BUCKET_NAME, + Key: obj.Key, + ACL: getACL(isPrivate) + }) + } }) +} - const listedObjects = await s3Client.send(listCommand) +// --------------------------------------------------------------------------- - // FIXME: use bulk delete when s3ninja will support this operation - // const deleteParams = { - // Bucket: bucketInfo.BUCKET_NAME, - // Delete: { Objects: [] } - // } +function removeObject (objectStorageKey: string, bucketInfo: BucketInfo) { + const key = buildKey(objectStorageKey, bucketInfo) - if (isArray(listedObjects.Contents) !== true) { - const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` + logger.debug('Removing file %s in bucket %s', key, bucketInfo.BUCKET_NAME, lTags()) - logger.error(message, { response: listedObjects, ...lTags() }) - throw new Error(message) - } - - for (const object of listedObjects.Contents) { - const command = new DeleteObjectCommand({ - Bucket: bucketInfo.BUCKET_NAME, - Key: object.Key - }) - - await s3Client.send(command) + const command = new DeleteObjectCommand({ + Bucket: bucketInfo.BUCKET_NAME, + Key: key + }) - // FIXME: use bulk delete when s3ninja will support this operation - // deleteParams.Delete.Objects.push({ Key: object.Key }) - } + return getClient().send(command) +} +function removePrefix (prefix: string, bucketInfo: BucketInfo) { // FIXME: use bulk delete when s3ninja will support this operation - // const deleteCommand = new DeleteObjectsCommand(deleteParams) - // await s3Client.send(deleteCommand) - // Repeat if not all objects could be listed at once (limit of 1000?) - if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) + logger.debug('Removing prefix %s in bucket %s', prefix, bucketInfo.BUCKET_NAME, lTags()) + + return applyOnPrefix({ + prefix, + bucketInfo, + commandBuilder: obj => { + return new DeleteObjectCommand({ + Bucket: bucketInfo.BUCKET_NAME, + Key: obj.Key + }) + } + }) } // --------------------------------------------------------------------------- @@ -138,14 +163,42 @@ function buildKey (key: string, bucketInfo: BucketInfo) { // --------------------------------------------------------------------------- +async function createObjectReadStream (options: { + key: string + bucketInfo: BucketInfo + rangeHeader: string +}) { + const { key, bucketInfo, rangeHeader } = options + + const command = new GetObjectCommand({ + Bucket: bucketInfo.BUCKET_NAME, + Key: buildKey(key, bucketInfo), + Range: rangeHeader + }) + + const response = await getClient().send(command) + + return response.Body as Readable +} + +// --------------------------------------------------------------------------- + export { BucketInfo, buildKey, + storeObject, + removeObject, removePrefix, + makeAvailable, - listKeysOfPrefix + + updateObjectACL, + updatePrefixACL, + + listKeysOfPrefix, + createObjectReadStream } // --------------------------------------------------------------------------- @@ -154,17 +207,15 @@ async function uploadToStorage (options: { content: ReadStream objectStorageKey: string bucketInfo: BucketInfo + isPrivate: boolean }) { - const { content, objectStorageKey, bucketInfo } = options + const { content, objectStorageKey, bucketInfo, isPrivate } = options const input: PutObjectCommandInput = { Body: content, Bucket: bucketInfo.BUCKET_NAME, - Key: buildKey(objectStorageKey, bucketInfo) - } - - if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) { - input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL + Key: buildKey(objectStorageKey, bucketInfo), + ACL: getACL(isPrivate) } const parallelUploads3 = new Upload({ @@ -194,5 +245,50 @@ async function uploadToStorage (options: { bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() ) - return getPrivateUrl(bucketInfo, objectStorageKey) + return getInternalUrl(bucketInfo, objectStorageKey) +} + +async function applyOnPrefix (options: { + prefix: string + bucketInfo: BucketInfo + commandBuilder: (obj: _Object) => Parameters[0] + + continuationToken?: string +}) { + const { prefix, bucketInfo, commandBuilder, continuationToken } = options + + const s3Client = getClient() + + const commandPrefix = bucketInfo.PREFIX + prefix + const listCommand = new ListObjectsV2Command({ + Bucket: bucketInfo.BUCKET_NAME, + Prefix: commandPrefix, + ContinuationToken: continuationToken + }) + + const listedObjects = await s3Client.send(listCommand) + + if (isArray(listedObjects.Contents) !== true) { + const message = `Cannot apply function on ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` + + logger.error(message, { response: listedObjects, ...lTags() }) + throw new Error(message) + } + + for (const object of listedObjects.Contents) { + const command = commandBuilder(object) + + await s3Client.send(command) + } + + // Repeat if not all objects could be listed at once (limit of 1000?) + if (listedObjects.IsTruncated) { + await applyOnPrefix({ ...options, continuationToken: listedObjects.ContinuationToken }) + } +} + +function getACL (isPrivate: boolean) { + return isPrivate + ? CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PRIVATE + : CONFIG.OBJECT_STORAGE.UPLOAD_ACL.PUBLIC } diff --git a/server/lib/object-storage/urls.ts b/server/lib/object-storage/urls.ts index 2a889190b..a47a98b98 100644 --- a/server/lib/object-storage/urls.ts +++ b/server/lib/object-storage/urls.ts @@ -1,10 +1,14 @@ import { CONFIG } from '@server/initializers/config' +import { OBJECT_STORAGE_PROXY_PATHS, WEBSERVER } from '@server/initializers/constants' +import { MVideoUUID } from '@server/types/models' import { BucketInfo, buildKey, getEndpointParsed } from './shared' -function getPrivateUrl (config: BucketInfo, keyWithoutPrefix: string) { +function getInternalUrl (config: BucketInfo, keyWithoutPrefix: string) { return getBaseUrl(config) + buildKey(keyWithoutPrefix, config) } +// --------------------------------------------------------------------------- + function getWebTorrentPublicFileUrl (fileUrl: string) { const baseUrl = CONFIG.OBJECT_STORAGE.VIDEOS.BASE_URL if (!baseUrl) return fileUrl @@ -19,11 +23,28 @@ function getHLSPublicFileUrl (fileUrl: string) { return replaceByBaseUrl(fileUrl, baseUrl) } +// --------------------------------------------------------------------------- + +function getHLSPrivateFileUrl (video: MVideoUUID, filename: string) { + return WEBSERVER.URL + OBJECT_STORAGE_PROXY_PATHS.STREAMING_PLAYLISTS.PRIVATE_HLS + video.uuid + `/${filename}` +} + +function getWebTorrentPrivateFileUrl (filename: string) { + return WEBSERVER.URL + OBJECT_STORAGE_PROXY_PATHS.PRIVATE_WEBSEED + filename +} + +// --------------------------------------------------------------------------- + export { - getPrivateUrl, + getInternalUrl, + getWebTorrentPublicFileUrl, - replaceByBaseUrl, - getHLSPublicFileUrl + getHLSPublicFileUrl, + + getHLSPrivateFileUrl, + getWebTorrentPrivateFileUrl, + + replaceByBaseUrl } // --------------------------------------------------------------------------- diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts index e323baaa2..003807826 100644 --- a/server/lib/object-storage/videos.ts +++ b/server/lib/object-storage/videos.ts @@ -5,7 +5,17 @@ import { MStreamingPlaylistVideo, MVideo, MVideoFile } from '@server/types/model import { getHLSDirectory } from '../paths' import { VideoPathManager } from '../video-path-manager' import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys' -import { listKeysOfPrefix, lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared' +import { + createObjectReadStream, + listKeysOfPrefix, + lTags, + makeAvailable, + removeObject, + removePrefix, + storeObject, + updateObjectACL, + updatePrefixACL +} from './shared' function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) { return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) @@ -17,7 +27,8 @@ function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename: return storeObject({ inputPath: join(getHLSDirectory(playlist.Video), filename), objectStorageKey: generateHLSObjectStorageKey(playlist, filename), - bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS + bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS, + isPrivate: playlist.Video.hasPrivateStaticPath() }) } @@ -25,7 +36,8 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) return storeObject({ inputPath: path, objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)), - bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS + bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS, + isPrivate: playlist.Video.hasPrivateStaticPath() }) } @@ -35,7 +47,26 @@ function storeWebTorrentFile (video: MVideo, file: MVideoFile) { return storeObject({ inputPath: VideoPathManager.Instance.getFSVideoFileOutputPath(video, file), objectStorageKey: generateWebTorrentObjectStorageKey(file.filename), - bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS + bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS, + isPrivate: video.hasPrivateStaticPath() + }) +} + +// --------------------------------------------------------------------------- + +function updateWebTorrentFileACL (video: MVideo, file: MVideoFile) { + return updateObjectACL({ + objectStorageKey: generateWebTorrentObjectStorageKey(file.filename), + bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS, + isPrivate: video.hasPrivateStaticPath() + }) +} + +function updateHLSFilesACL (playlist: MStreamingPlaylistVideo) { + return updatePrefixACL({ + prefix: generateHLSObjectBaseStorageKey(playlist), + bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS, + isPrivate: playlist.Video.hasPrivateStaticPath() }) } @@ -87,6 +118,39 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin // --------------------------------------------------------------------------- +function getWebTorrentFileReadStream (options: { + filename: string + rangeHeader: string +}) { + const { filename, rangeHeader } = options + + const key = generateWebTorrentObjectStorageKey(filename) + + return createObjectReadStream({ + key, + bucketInfo: CONFIG.OBJECT_STORAGE.VIDEOS, + rangeHeader + }) +} + +function getHLSFileReadStream (options: { + playlist: MStreamingPlaylistVideo + filename: string + rangeHeader: string +}) { + const { playlist, filename, rangeHeader } = options + + const key = generateHLSObjectStorageKey(playlist, filename) + + return createObjectReadStream({ + key, + bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS, + rangeHeader + }) +} + +// --------------------------------------------------------------------------- + export { listHLSFileKeysOf, @@ -94,10 +158,16 @@ export { storeHLSFileFromFilename, storeHLSFileFromPath, + updateWebTorrentFileACL, + updateHLSFilesACL, + removeHLSObjectStorage, removeHLSFileObjectStorage, removeWebTorrentObjectStorage, makeWebTorrentFileAvailable, - makeHLSFileAvailable + makeHLSFileAvailable, + + getWebTorrentFileReadStream, + getHLSFileReadStream } diff --git a/server/lib/video-privacy.ts b/server/lib/video-privacy.ts index 1a4a5a22d..41f9d62b3 100644 --- a/server/lib/video-privacy.ts +++ b/server/lib/video-privacy.ts @@ -2,8 +2,9 @@ import { move } from 'fs-extra' import { join } from 'path' import { logger } from '@server/helpers/logger' import { DIRECTORIES } from '@server/initializers/constants' -import { MVideo, MVideoFullLight } from '@server/types/models' -import { VideoPrivacy } from '@shared/models' +import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' +import { VideoPrivacy, VideoStorage } from '@shared/models' +import { updateHLSFilesACL, updateWebTorrentFileACL } from './object-storage' function setVideoPrivacy (video: MVideo, newPrivacy: VideoPrivacy) { if (video.privacy === VideoPrivacy.PRIVATE && newPrivacy !== VideoPrivacy.PRIVATE) { @@ -50,47 +51,77 @@ export { // --------------------------------------------------------------------------- +type MoveType = 'private-to-public' | 'public-to-private' + async function moveFiles (options: { - type: 'private-to-public' | 'public-to-private' + type: MoveType video: MVideoFullLight }) { const { type, video } = options - const directories = type === 'private-to-public' - ? { - webtorrent: { old: DIRECTORIES.VIDEOS.PRIVATE, new: DIRECTORIES.VIDEOS.PUBLIC }, - hls: { old: DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE, new: DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC } + for (const file of video.VideoFiles) { + if (file.storage === VideoStorage.FILE_SYSTEM) { + await moveWebTorrentFileOnFS(type, video, file) + } else { + await updateWebTorrentFileACL(video, file) } - : { - webtorrent: { old: DIRECTORIES.VIDEOS.PUBLIC, new: DIRECTORIES.VIDEOS.PRIVATE }, - hls: { old: DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC, new: DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE } + } + + const hls = video.getHLSPlaylist() + + if (hls) { + if (hls.storage === VideoStorage.FILE_SYSTEM) { + await moveHLSFilesOnFS(type, video) + } else { + await updateHLSFilesACL(hls) } + } +} - for (const file of video.VideoFiles) { - const source = join(directories.webtorrent.old, file.filename) - const destination = join(directories.webtorrent.new, file.filename) +async function moveWebTorrentFileOnFS (type: MoveType, video: MVideo, file: MVideoFile) { + const directories = getWebTorrentDirectories(type) - try { - logger.info('Moving WebTorrent files of %s after privacy change (%s -> %s).', video.uuid, source, destination) + const source = join(directories.old, file.filename) + const destination = join(directories.new, file.filename) - await move(source, destination) - } catch (err) { - logger.error('Cannot move webtorrent file %s to %s after privacy change', source, destination, { err }) - } + try { + logger.info('Moving WebTorrent files of %s after privacy change (%s -> %s).', video.uuid, source, destination) + + await move(source, destination) + } catch (err) { + logger.error('Cannot move webtorrent file %s to %s after privacy change', source, destination, { err }) + } +} + +function getWebTorrentDirectories (moveType: MoveType) { + if (moveType === 'private-to-public') { + return { old: DIRECTORIES.VIDEOS.PRIVATE, new: DIRECTORIES.VIDEOS.PUBLIC } } - const hls = video.getHLSPlaylist() + return { old: DIRECTORIES.VIDEOS.PUBLIC, new: DIRECTORIES.VIDEOS.PRIVATE } +} - if (hls) { - const source = join(directories.hls.old, video.uuid) - const destination = join(directories.hls.new, video.uuid) +// --------------------------------------------------------------------------- - try { - logger.info('Moving HLS files of %s after privacy change (%s -> %s).', video.uuid, source, destination) +async function moveHLSFilesOnFS (type: MoveType, video: MVideo) { + const directories = getHLSDirectories(type) - await move(source, destination) - } catch (err) { - logger.error('Cannot move HLS file %s to %s after privacy change', source, destination, { err }) - } + const source = join(directories.old, video.uuid) + const destination = join(directories.new, video.uuid) + + try { + logger.info('Moving HLS files of %s after privacy change (%s -> %s).', video.uuid, source, destination) + + await move(source, destination) + } catch (err) { + logger.error('Cannot move HLS file %s to %s after privacy change', source, destination, { err }) + } +} + +function getHLSDirectories (moveType: MoveType) { + if (moveType === 'private-to-public') { + return { old: DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE, new: DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC } } + + return { old: DIRECTORIES.HLS_STREAMING_PLAYLIST.PUBLIC, new: DIRECTORIES.HLS_STREAMING_PLAYLIST.PRIVATE } } -- cgit v1.2.3