From cda021079ff455cc0fd0eb95a5395fa808ab63d1 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Sat, 30 Jan 2016 17:05:22 +0100 Subject: New directory organization --- lib/poolRequests.js | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/webTorrentNode.js | 160 +++++++++++++++++++++++++++++++++++++++ lib/webtorrent.js | 91 ++++++++++++++++++++++ 3 files changed, 457 insertions(+) create mode 100644 lib/poolRequests.js create mode 100644 lib/webTorrentNode.js create mode 100644 lib/webtorrent.js (limited to 'lib') diff --git a/lib/poolRequests.js b/lib/poolRequests.js new file mode 100644 index 000000000..9c7f3238b --- /dev/null +++ b/lib/poolRequests.js @@ -0,0 +1,206 @@ +;(function () { + 'use strict' + + var async = require('async') + + var constants = require('../initializers/constants') + var logger = require('../helpers/logger') + var database = require('../initializers/database') + var pluck = require('lodash-node/compat/collection/pluck') + var PoolRequestsDB = database.PoolRequestsDB + var PodsDB = database.PodsDB + var utils = require('../helpers/utils') + var VideosDB = database.VideosDB + + var poolRequests = {} + + // ----------- Private ----------- + var timer = null + + function removePoolRequestsFromDB (ids) { + PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) { + if (err) { + logger.error('Cannot remove requests from the pool requests database.', { error: err }) + return + } + + logger.info('Pool requests flushed.') + }) + } + + function makePoolRequests () { + logger.info('Making pool requests to friends.') + + PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) { + if (err) throw err + + if (pool_requests.length === 0) return + + var requests = { + add: { + ids: [], + requests: [] + }, + remove: { + ids: [], + requests: [] + } + } + + async.each(pool_requests, function (pool_request, callback_each) { + if (pool_request.type === 'add') { + requests.add.requests.push(pool_request.request) + requests.add.ids.push(pool_request._id) + } else if (pool_request.type === 'remove') { + requests.remove.requests.push(pool_request.request) + requests.remove.ids.push(pool_request._id) + } else { + throw new Error('Unkown pool request type.') + } + + callback_each() + }, function () { + // Send the add requests + if (requests.add.requests.length !== 0) { + makePoolRequest('add', requests.add.requests, function (err) { + if (err) logger.error('Errors when sent add pool requests.', { error: err }) + + removePoolRequestsFromDB(requests.add.ids) + }) + } + + // Send the remove requests + if (requests.remove.requests.length !== 0) { + makePoolRequest('remove', requests.remove.requests, function (err) { + if (err) logger.error('Errors when sent remove pool requests.', { error: err }) + + removePoolRequestsFromDB(requests.remove.ids) + }) + } + }) + }) + } + + function updatePodsScore (good_pods, bad_pods) { + logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) + + PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec() + PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) { + if (err) throw err + removeBadPods() + }) + } + + function removeBadPods () { + PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) { + if (err) throw err + + if (pods.length === 0) return + + var urls = pluck(pods, 'url') + var ids = pluck(pods, '_id') + + VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) { + if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err }) + var videos_removed = r.result.n + logger.info('Removed %d videos.', videos_removed) + + PodsDB.remove({ _id: { $in: ids } }, function (err, r) { + if (err) logger.error('Cannot remove bad pods.', { error: err }) + + var pods_removed = r.result.n + logger.info('Removed %d pods.', pods_removed) + }) + }) + }) + } + + function makePoolRequest (type, requests, callback) { + if (!callback) callback = function () {} + + PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { + if (err) throw err + + var params = { + encrypt: true, + sign: true, + method: 'POST', + path: null, + data: requests + } + + if (type === 'add') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' + } else if (type === 'remove') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' + } else { + throw new Error('Unkown pool request type.') + } + + var bad_pods = [] + var good_pods = [] + + utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) + + function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { + if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { + bad_pods.push(pod._id) + logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) + } else { + good_pods.push(pod._id) + } + + return callback_each_pod_finished() + } + + function callbackAllPodsFinished (err) { + if (err) return callback(err) + + updatePodsScore(good_pods, bad_pods) + callback(null) + } + }) + } + + // ----------- Public ----------- + poolRequests.activate = function () { + logger.info('Pool requests activated.') + timer = setInterval(makePoolRequests, constants.INTERVAL) + } + + poolRequests.addToPoolRequests = function (id, type, request) { + logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) + + PoolRequestsDB.findOne({ id: id }, function (err, entity) { + if (err) logger.error(err) + + if (entity) { + if (entity.type === type) { + logger.error(new Error('Cannot insert two same requests.')) + return + } + + // Remove the request of the other type + PoolRequestsDB.remove({ id: id }, function (err) { + if (err) logger.error(err) + }) + } else { + PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { + if (err) logger.error(err) + }) + } + }) + } + + poolRequests.deactivate = function () { + logger.info('Pool requests deactivated.') + clearInterval(timer) + } + + poolRequests.forceSend = function () { + logger.info('Force pool requests sending.') + makePoolRequests() + } + + module.exports = poolRequests +})() diff --git a/lib/webTorrentNode.js b/lib/webTorrentNode.js new file mode 100644 index 000000000..8827c68c5 --- /dev/null +++ b/lib/webTorrentNode.js @@ -0,0 +1,160 @@ +;(function () { + 'use strict' + + var config = require('config') + var ipc = require('node-ipc') + var pathUtils = require('path') + var spawn = require('electron-spawn') + + var logger = require('../helpers/logger') + + var host = config.get('webserver.host') + var port = config.get('webserver.port') + + var nodeKey = 'webtorrentnode' + port + var processKey = 'webtorrent' + port + + ipc.config.silent = true + ipc.config.id = nodeKey + + var webtorrentnode = {} + + // Useful for beautiful tests + webtorrentnode.silent = false + + // Useful to kill it + webtorrentnode.app = null + + webtorrentnode.create = function (options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + // Override options + if (options.host) host = options.host + if (options.port) { + port = options.port + nodeKey = 'webtorrentnode' + port + processKey = 'webtorrent' + port + ipc.config.id = nodeKey + } + + ipc.serve(function () { + if (!webtorrentnode.silent) logger.info('IPC server ready.') + + // Run a timeout of 30s after which we exit the process + var timeout_webtorrent_process = setTimeout(function () { + logger.error('Timeout : cannot run the webtorrent process. Please ensure you have electron-prebuilt npm package installed with xvfb-run.') + process.exit() + }, 30000) + + ipc.server.on(processKey + '.ready', function () { + if (!webtorrentnode.silent) logger.info('Webtorrent process ready.') + clearTimeout(timeout_webtorrent_process) + callback() + }) + + ipc.server.on(processKey + '.exception', function (data) { + logger.error('Received exception error from webtorrent process.', { exception: data.exception }) + process.exit() + }) + + var webtorrent_process = spawn(__dirname + '/webtorrent.js', host, port, { detached: true }) + webtorrent_process.stderr.on('data', function (data) { + // logger.debug('Webtorrent process stderr: ', data.toString()) + }) + + webtorrent_process.stdout.on('data', function (data) { + // logger.debug('Webtorrent process:', data.toString()) + }) + + webtorrentnode.app = webtorrent_process + }) + + ipc.server.start() + } + + webtorrentnode.seed = function (path, callback) { + var extension = pathUtils.extname(path) + var basename = pathUtils.basename(path, extension) + var data = { + _id: basename, + args: { + path: path + } + } + + if (!webtorrentnode.silent) logger.debug('Node wants to seed %s.', data._id) + + // Finish signal + var event_key = nodeKey + '.seedDone.' + data._id + ipc.server.on(event_key, function listener (received) { + if (!webtorrentnode.silent) logger.debug('Process seeded torrent %s.', received.magnetUri) + + // This is a fake object, we just use the magnetUri in this project + var torrent = { + magnetURI: received.magnetUri + } + + ipc.server.off(event_key) + callback(torrent) + }) + + ipc.server.broadcast(processKey + '.seed', data) + } + + webtorrentnode.add = function (magnetUri, callback) { + var data = { + _id: magnetUri, + args: { + magnetUri: magnetUri + } + } + + if (!webtorrentnode.silent) logger.debug('Node wants to add ' + data._id) + + // Finish signal + var event_key = nodeKey + '.addDone.' + data._id + ipc.server.on(event_key, function (received) { + if (!webtorrentnode.silent) logger.debug('Process added torrent.') + + // This is a fake object, we just use the magnetUri in this project + var torrent = { + files: received.files + } + + ipc.server.off(event_key) + callback(torrent) + }) + + ipc.server.broadcast(processKey + '.add', data) + } + + webtorrentnode.remove = function (magnetUri, callback) { + var data = { + _id: magnetUri, + args: { + magnetUri: magnetUri + } + } + + if (!webtorrentnode.silent) logger.debug('Node wants to stop seeding %s.', data._id) + + // Finish signal + var event_key = nodeKey + '.removeDone.' + data._id + ipc.server.on(event_key, function (received) { + if (!webtorrentnode.silent) logger.debug('Process removed torrent %s.', data._id) + + var err = null + if (received.err) err = received.err + + ipc.server.off(event_key) + callback(err) + }) + + ipc.server.broadcast(processKey + '.remove', data) + } + + module.exports = webtorrentnode +})() diff --git a/lib/webtorrent.js b/lib/webtorrent.js new file mode 100644 index 000000000..b72bc500d --- /dev/null +++ b/lib/webtorrent.js @@ -0,0 +1,91 @@ +;(function () { + 'use strict' + + module.exports = function (args) { + var WebTorrent = require('webtorrent') + var ipc = require('node-ipc') + + if (args.length !== 3) { + console.log('Wrong arguments number: ' + args.length + '/3') + process.exit(-1) + } + + var host = args[1] + var port = args[2] + var nodeKey = 'webtorrentnode' + port + var processKey = 'webtorrent' + port + + ipc.config.silent = true + ipc.config.id = processKey + + if (host === 'client' && port === '1') global.WEBTORRENT_ANNOUNCE = [] + else global.WEBTORRENT_ANNOUNCE = 'ws://' + host + ':' + port + '/tracker/socket' + var wt = new WebTorrent({ dht: false }) + + function seed (data) { + var args = data.args + var path = args.path + var _id = data._id + + wt.seed(path, { announceList: '' }, function (torrent) { + var to_send = { + magnetUri: torrent.magnetURI + } + + ipc.of[nodeKey].emit(nodeKey + '.seedDone.' + _id, to_send) + }) + } + + function add (data) { + var args = data.args + var magnetUri = args.magnetUri + var _id = data._id + + wt.add(magnetUri, function (torrent) { + var to_send = { + files: [] + } + + torrent.files.forEach(function (file) { + to_send.files.push({ path: file.path }) + }) + + ipc.of[nodeKey].emit(nodeKey + '.addDone.' + _id, to_send) + }) + } + + function remove (data) { + var args = data.args + var magnetUri = args.magnetUri + var _id = data._id + + try { + wt.remove(magnetUri, callback) + } catch (err) { + console.log('Cannot remove the torrent from WebTorrent', { err: err }) + return callback(null) + } + + function callback () { + var to_send = {} + ipc.of[nodeKey].emit(nodeKey + '.removeDone.' + _id, to_send) + } + } + + console.log('Configuration: ' + host + ':' + port) + console.log('Connecting to IPC...') + + ipc.connectTo(nodeKey, function () { + ipc.of[nodeKey].on(processKey + '.seed', seed) + ipc.of[nodeKey].on(processKey + '.add', add) + ipc.of[nodeKey].on(processKey + '.remove', remove) + + ipc.of[nodeKey].emit(processKey + '.ready') + console.log('Ready.') + }) + + process.on('uncaughtException', function (e) { + ipc.of[nodeKey].emit(processKey + '.exception', { exception: e }) + }) + } +})() -- cgit v1.2.3