From efda99c30f2c04702bf57cc150cdfdd0acccc178 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 14 May 2018 10:57:07 +0200 Subject: Store webtorrent chunks in indexdb --- client/src/assets/player/peertube-chunk-store.ts | 217 +++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 client/src/assets/player/peertube-chunk-store.ts (limited to 'client/src/assets/player/peertube-chunk-store.ts') diff --git a/client/src/assets/player/peertube-chunk-store.ts b/client/src/assets/player/peertube-chunk-store.ts new file mode 100644 index 000000000..005e98a81 --- /dev/null +++ b/client/src/assets/player/peertube-chunk-store.ts @@ -0,0 +1,217 @@ +// From https://github.com/MinEduTDF/idb-chunk-store +// We use temporary IndexDB (all data are removed on destroy) to avoid RAM issues +// Thanks @santiagogil and @Feross + +import { EventEmitter } from 'events' +import Dexie from 'dexie' + +class ChunkDatabase extends Dexie { + chunks: Dexie.Table<{ id: number, buf: Buffer }, number> + + constructor (dbname: string) { + super(dbname) + + this.version(1).stores({ + chunks: 'id' + }) + } +} + +class ExpirationDatabase extends Dexie { + databases: Dexie.Table<{ name: string, expiration: number }, number> + + constructor () { + super('webtorrent-expiration') + + this.version(1).stores({ + databases: 'name,expiration' + }) + } +} + +export class PeertubeChunkStore extends EventEmitter { + private static readonly BUFFERING_PUT_MS = 1000 + private static readonly CLEANER_INTERVAL_MS = 1000 * 60 // 1 minute + private static readonly CLEANER_EXPIRATION_MS = 1000 * 60 * 5 // 5 minutes + + chunkLength: number + + private pendingPut: { id: number, buf: Buffer, cb: Function }[] = [] + // If the store is full + private memoryChunks: { [ id: number ]: Buffer | true } = {} + private databaseName: string + private putBulkTimeout + private cleanerInterval + private db: ChunkDatabase + private expirationDB: ExpirationDatabase + private readonly length: number + private readonly lastChunkLength: number + private readonly lastChunkIndex: number + + constructor (chunkLength: number, opts) { + super() + + this.databaseName = 'webtorrent-chunks-' + + if (!opts) opts = {} + if (opts.torrent && opts.torrent.infoHash) this.databaseName += opts.torrent.infoHash + else this.databaseName += '-default' + + this.setMaxListeners(100) + + this.chunkLength = Number(chunkLength) + if (!this.chunkLength) throw new Error('First argument must be a chunk length') + + this.length = Number(opts.length) || Infinity + + if (this.length !== Infinity) { + this.lastChunkLength = (this.length % this.chunkLength) || this.chunkLength + this.lastChunkIndex = Math.ceil(this.length / this.chunkLength) - 1 + } + + this.db = new ChunkDatabase(this.databaseName) + // Track databases that expired + this.expirationDB = new ExpirationDatabase() + + this.runCleaner() + } + + put (index: number, buf: Buffer, cb: Function) { + const isLastChunk = (index === this.lastChunkIndex) + if (isLastChunk && buf.length !== this.lastChunkLength) { + return this.nextTick(cb, new Error('Last chunk length must be ' + this.lastChunkLength)) + } + if (!isLastChunk && buf.length !== this.chunkLength) { + return this.nextTick(cb, new Error('Chunk length must be ' + this.chunkLength)) + } + + // Specify we have this chunk + this.memoryChunks[index] = true + + // Add it to the pending put + this.pendingPut.push({ id: index, buf, cb }) + // If it's already planned, return + if (this.putBulkTimeout) return + + // Plan a future bulk insert + this.putBulkTimeout = setTimeout(async () => { + const processing = this.pendingPut + this.pendingPut = [] + this.putBulkTimeout = undefined + + try { + await this.db.transaction('rw', this.db.chunks, () => { + return this.db.chunks.bulkPut(processing.map(p => ({ id: p.id, buf: p.buf }))) + }) + } catch (err) { + console.log('Cannot bulk insert chunks. Store them in memory.', { err }) + + processing.forEach(p => this.memoryChunks[ p.id ] = p.buf) + } finally { + processing.forEach(p => p.cb()) + } + }, PeertubeChunkStore.BUFFERING_PUT_MS) + } + + get (index: number, opts, cb) { + if (typeof opts === 'function') return this.get(index, null, opts) + + // IndexDB could be slow, use our memory index first + const memoryChunk = this.memoryChunks[index] + if (memoryChunk === undefined) return cb(null, new Buffer(0)) + // Chunk in memory + if (memoryChunk !== true) return cb(null, memoryChunk) + + // Chunk in store + this.db.transaction('r', this.db.chunks, async () => { + const result = await this.db.chunks.get({ id: index }) + if (result === undefined) return cb(null, new Buffer(0)) + + const buf = result.buf + if (!opts) return this.nextTick(cb, null, buf) + + const offset = opts.offset || 0 + const len = opts.length || (buf.length - offset) + return cb(null, buf.slice(offset, len + offset)) + }) + .catch(err => { + console.error(err) + return cb(err) + }) + } + + close (db) { + return this.destroy(db) + } + + async destroy (cb) { + try { + if (this.pendingPut) { + clearTimeout(this.putBulkTimeout) + this.pendingPut = null + } + if (this.cleanerInterval) { + clearInterval(this.cleanerInterval) + this.cleanerInterval = null + } + + if (this.expirationDB) { + await this.expirationDB.close() + this.expirationDB = null + } + + if (this.db) { + console.log('Destroying IndexDB database %s.', this.databaseName) + await this.db.close() + await this.db.delete() + } + + return cb() + } catch (err) { + console.error('Cannot destroy peertube chunk store.', err) + return cb(err) + } + } + + private runCleaner () { + this.checkExpiration() + + this.cleanerInterval = setInterval(async () => { + this.checkExpiration() + }, PeertubeChunkStore.CLEANER_INTERVAL_MS) + } + + private checkExpiration () { + this.expirationDB.transaction('rw', this.expirationDB.databases, async () => { + try { + // Update our database expiration since we are alive + await this.expirationDB.databases.put({ + name: this.databaseName, + expiration: new Date().getTime() + PeertubeChunkStore.CLEANER_EXPIRATION_MS + }) + + const now = new Date().getTime() + const databasesToDeleteInfo = await this.expirationDB.databases.where('expiration').below(now).toArray() + + for (const databaseToDeleteInfo of databasesToDeleteInfo) { + await this.dropDatabase(databaseToDeleteInfo.name) + + await this.expirationDB.databases.where({ name: databaseToDeleteInfo.name }).delete() + } + } catch (err) { + console.error('Cannot check expiration.', err) + } + }) + } + + private dropDatabase (databaseName: string) { + const dbToDelete = new ChunkDatabase(databaseName) + + console.log('Deleting %s.', databaseName) + return dbToDelete.delete() + } + + private nextTick (cb, err, val?) { + process.nextTick(() => cb(err, val), undefined) + } +} -- cgit v1.2.3