From efda99c30f2c04702bf57cc150cdfdd0acccc178 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 14 May 2018 10:57:07 +0200 Subject: Store webtorrent chunks in indexdb --- client/package.json | 2 + client/src/assets/player/peertube-chunk-store.ts | 217 +++++++++++++++++++++ .../src/assets/player/peertube-videojs-plugin.ts | 19 +- client/webpack/webpack.video-embed.js | 4 +- client/yarn.lock | 10 + 5 files changed, 241 insertions(+), 11 deletions(-) create mode 100644 client/src/assets/player/peertube-chunk-store.ts (limited to 'client') diff --git a/client/package.json b/client/package.json index e9f84b593..51d1269ab 100644 --- a/client/package.json +++ b/client/package.json @@ -53,9 +53,11 @@ "angular2-notifications": "^1.0.0", "awesome-typescript-loader": "5.0.0", "bootstrap-sass": "^3.3.7", + "cache-chunk-store": "^2.0.0", "codelyzer": "^4.0.2", "core-js": "^2.4.1", "css-loader": "^0.28.4", + "dexie": "^2.0.3", "extract-text-webpack-plugin": "4.0.0-beta.0", "file-loader": "^1.1.5", "html-webpack-plugin": "^3.2.0", diff --git a/client/src/assets/player/peertube-chunk-store.ts b/client/src/assets/player/peertube-chunk-store.ts new file mode 100644 index 000000000..005e98a81 --- /dev/null +++ b/client/src/assets/player/peertube-chunk-store.ts @@ -0,0 +1,217 @@ +// From https://github.com/MinEduTDF/idb-chunk-store +// We use temporary IndexDB (all data are removed on destroy) to avoid RAM issues +// Thanks @santiagogil and @Feross + +import { EventEmitter } from 'events' +import Dexie from 'dexie' + +class ChunkDatabase extends Dexie { + chunks: Dexie.Table<{ id: number, buf: Buffer }, number> + + constructor (dbname: string) { + super(dbname) + + this.version(1).stores({ + chunks: 'id' + }) + } +} + +class ExpirationDatabase extends Dexie { + databases: Dexie.Table<{ name: string, expiration: number }, number> + + constructor () { + super('webtorrent-expiration') + + this.version(1).stores({ + databases: 'name,expiration' + }) + } +} + +export class PeertubeChunkStore extends EventEmitter { + private static readonly BUFFERING_PUT_MS = 1000 + private static readonly CLEANER_INTERVAL_MS = 1000 * 60 // 1 minute + private static readonly CLEANER_EXPIRATION_MS = 1000 * 60 * 5 // 5 minutes + + chunkLength: number + + private pendingPut: { id: number, buf: Buffer, cb: Function }[] = [] + // If the store is full + private memoryChunks: { [ id: number ]: Buffer | true } = {} + private databaseName: string + private putBulkTimeout + private cleanerInterval + private db: ChunkDatabase + private expirationDB: ExpirationDatabase + private readonly length: number + private readonly lastChunkLength: number + private readonly lastChunkIndex: number + + constructor (chunkLength: number, opts) { + super() + + this.databaseName = 'webtorrent-chunks-' + + if (!opts) opts = {} + if (opts.torrent && opts.torrent.infoHash) this.databaseName += opts.torrent.infoHash + else this.databaseName += '-default' + + this.setMaxListeners(100) + + this.chunkLength = Number(chunkLength) + if (!this.chunkLength) throw new Error('First argument must be a chunk length') + + this.length = Number(opts.length) || Infinity + + if (this.length !== Infinity) { + this.lastChunkLength = (this.length % this.chunkLength) || this.chunkLength + this.lastChunkIndex = Math.ceil(this.length / this.chunkLength) - 1 + } + + this.db = new ChunkDatabase(this.databaseName) + // Track databases that expired + this.expirationDB = new ExpirationDatabase() + + this.runCleaner() + } + + put (index: number, buf: Buffer, cb: Function) { + const isLastChunk = (index === this.lastChunkIndex) + if (isLastChunk && buf.length !== this.lastChunkLength) { + return this.nextTick(cb, new Error('Last chunk length must be ' + this.lastChunkLength)) + } + if (!isLastChunk && buf.length !== this.chunkLength) { + return this.nextTick(cb, new Error('Chunk length must be ' + this.chunkLength)) + } + + // Specify we have this chunk + this.memoryChunks[index] = true + + // Add it to the pending put + this.pendingPut.push({ id: index, buf, cb }) + // If it's already planned, return + if (this.putBulkTimeout) return + + // Plan a future bulk insert + this.putBulkTimeout = setTimeout(async () => { + const processing = this.pendingPut + this.pendingPut = [] + this.putBulkTimeout = undefined + + try { + await this.db.transaction('rw', this.db.chunks, () => { + return this.db.chunks.bulkPut(processing.map(p => ({ id: p.id, buf: p.buf }))) + }) + } catch (err) { + console.log('Cannot bulk insert chunks. Store them in memory.', { err }) + + processing.forEach(p => this.memoryChunks[ p.id ] = p.buf) + } finally { + processing.forEach(p => p.cb()) + } + }, PeertubeChunkStore.BUFFERING_PUT_MS) + } + + get (index: number, opts, cb) { + if (typeof opts === 'function') return this.get(index, null, opts) + + // IndexDB could be slow, use our memory index first + const memoryChunk = this.memoryChunks[index] + if (memoryChunk === undefined) return cb(null, new Buffer(0)) + // Chunk in memory + if (memoryChunk !== true) return cb(null, memoryChunk) + + // Chunk in store + this.db.transaction('r', this.db.chunks, async () => { + const result = await this.db.chunks.get({ id: index }) + if (result === undefined) return cb(null, new Buffer(0)) + + const buf = result.buf + if (!opts) return this.nextTick(cb, null, buf) + + const offset = opts.offset || 0 + const len = opts.length || (buf.length - offset) + return cb(null, buf.slice(offset, len + offset)) + }) + .catch(err => { + console.error(err) + return cb(err) + }) + } + + close (db) { + return this.destroy(db) + } + + async destroy (cb) { + try { + if (this.pendingPut) { + clearTimeout(this.putBulkTimeout) + this.pendingPut = null + } + if (this.cleanerInterval) { + clearInterval(this.cleanerInterval) + this.cleanerInterval = null + } + + if (this.expirationDB) { + await this.expirationDB.close() + this.expirationDB = null + } + + if (this.db) { + console.log('Destroying IndexDB database %s.', this.databaseName) + await this.db.close() + await this.db.delete() + } + + return cb() + } catch (err) { + console.error('Cannot destroy peertube chunk store.', err) + return cb(err) + } + } + + private runCleaner () { + this.checkExpiration() + + this.cleanerInterval = setInterval(async () => { + this.checkExpiration() + }, PeertubeChunkStore.CLEANER_INTERVAL_MS) + } + + private checkExpiration () { + this.expirationDB.transaction('rw', this.expirationDB.databases, async () => { + try { + // Update our database expiration since we are alive + await this.expirationDB.databases.put({ + name: this.databaseName, + expiration: new Date().getTime() + PeertubeChunkStore.CLEANER_EXPIRATION_MS + }) + + const now = new Date().getTime() + const databasesToDeleteInfo = await this.expirationDB.databases.where('expiration').below(now).toArray() + + for (const databaseToDeleteInfo of databasesToDeleteInfo) { + await this.dropDatabase(databaseToDeleteInfo.name) + + await this.expirationDB.databases.where({ name: databaseToDeleteInfo.name }).delete() + } + } catch (err) { + console.error('Cannot check expiration.', err) + } + }) + } + + private dropDatabase (databaseName: string) { + const dbToDelete = new ChunkDatabase(databaseName) + + console.log('Deleting %s.', databaseName) + return dbToDelete.delete() + } + + private nextTick (cb, err, val?) { + process.nextTick(() => cb(err, val), undefined) + } +} diff --git a/client/src/assets/player/peertube-videojs-plugin.ts b/client/src/assets/player/peertube-videojs-plugin.ts index a4f99559c..ac04421a7 100644 --- a/client/src/assets/player/peertube-videojs-plugin.ts +++ b/client/src/assets/player/peertube-videojs-plugin.ts @@ -4,16 +4,11 @@ import { VideoFile } from '../../../../shared/models/videos/video.model' import { renderVideo } from './video-renderer' import './settings-menu-button' import { PeertubePluginOptions, VideoJSComponentInterface, videojsUntyped } from './peertube-videojs-typings' -import { - getAverageBandwidth, - getStoredMute, - getStoredVolume, - saveAverageBandwidth, - saveMuteInStore, - saveVolumeInStore -} from './utils' +import { getAverageBandwidth, getStoredMute, getStoredVolume, saveAverageBandwidth, saveMuteInStore, saveVolumeInStore } from './utils' import minBy from 'lodash-es/minBy' import maxBy from 'lodash-es/maxBy' +import * as CacheChunkStore from 'cache-chunk-store' +import { PeertubeChunkStore } from './peertube-chunk-store' const webtorrent = new WebTorrent({ tracker: { @@ -169,7 +164,13 @@ class PeerTubePlugin extends Plugin { console.log('Adding ' + magnetOrTorrentUrl + '.') const oldTorrent = this.torrent - this.torrent = webtorrent.add(magnetOrTorrentUrl, torrent => { + const options = { + store: (chunkLength, storeOpts) => new CacheChunkStore(new PeertubeChunkStore(chunkLength, storeOpts), { + max: 100 + }) + } + + this.torrent = webtorrent.add(magnetOrTorrentUrl, options, torrent => { console.log('Added ' + magnetOrTorrentUrl + '.') // Pause the old torrent diff --git a/client/webpack/webpack.video-embed.js b/client/webpack/webpack.video-embed.js index 63cda07bd..4a8879031 100644 --- a/client/webpack/webpack.video-embed.js +++ b/client/webpack/webpack.video-embed.js @@ -136,8 +136,8 @@ module.exports = function () { ], performance: { - maxEntrypointSize: 600000, // 600kB - maxAssetSize: 600000 + maxEntrypointSize: 700000, // 600kB + maxAssetSize: 700000 }, node: { diff --git a/client/yarn.lock b/client/yarn.lock index 917c7dbb2..3cbba6be3 100644 --- a/client/yarn.lock +++ b/client/yarn.lock @@ -1665,6 +1665,12 @@ cache-base@^1.0.1: union-value "^1.0.0" unset-value "^1.0.0" +cache-chunk-store@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/cache-chunk-store/-/cache-chunk-store-2.0.0.tgz#65dd1d0dc730c301479c9bb209747f5d2f8c6c9a" + dependencies: + lru "^3.1.0" + cache-loader@^1.2.0: version "1.2.2" resolved "https://registry.yarnpkg.com/cache-loader/-/cache-loader-1.2.2.tgz#6d5c38ded959a09cc5d58190ab5af6f73bd353f5" @@ -2593,6 +2599,10 @@ detect-node@^2.0.3: version "2.0.3" resolved "https://registry.yarnpkg.com/detect-node/-/detect-node-2.0.3.tgz#a2033c09cc8e158d37748fbde7507832bd6ce127" +dexie@^2.0.3: + version "2.0.3" + resolved "https://registry.yarnpkg.com/dexie/-/dexie-2.0.3.tgz#c32db3e8c7d43ac509379ad126f125c57a0e7c9c" + diff@^3.2.0, diff@^3.3.1, diff@^3.5.0: version "3.5.0" resolved "https://registry.yarnpkg.com/diff/-/diff-3.5.0.tgz#800c0dd1e0a8bfbc95835c202ad220fe317e5a12" -- cgit v1.2.3