From b9a3e09ad5a7673f64556d1dba122ed4c4fac980 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 7 Mar 2016 11:33:59 +0100 Subject: Prepare folders structure for angular app --- server/lib/friends.js | 228 ++++++++++++++++++++++++++++++++++++++++ server/lib/poolRequests.js | 221 ++++++++++++++++++++++++++++++++++++++ server/lib/videos.js | 50 +++++++++ server/lib/webtorrent.js | 157 +++++++++++++++++++++++++++ server/lib/webtorrentProcess.js | 92 ++++++++++++++++ 5 files changed, 748 insertions(+) create mode 100644 server/lib/friends.js create mode 100644 server/lib/poolRequests.js create mode 100644 server/lib/videos.js create mode 100644 server/lib/webtorrent.js create mode 100644 server/lib/webtorrentProcess.js (limited to 'server/lib') diff --git a/server/lib/friends.js b/server/lib/friends.js new file mode 100644 index 000000000..006a64404 --- /dev/null +++ b/server/lib/friends.js @@ -0,0 +1,228 @@ +'use strict' + +var async = require('async') +var config = require('config') +var fs = require('fs') +var request = require('request') + +var constants = require('../initializers/constants') +var logger = require('../helpers/logger') +var peertubeCrypto = require('../helpers/peertubeCrypto') +var Pods = require('../models/pods') +var poolRequests = require('../lib/poolRequests') +var requests = require('../helpers/requests') +var Videos = require('../models/videos') + +var http = config.get('webserver.https') ? 'https' : 'http' +var host = config.get('webserver.host') +var port = config.get('webserver.port') + +var pods = { + addVideoToFriends: addVideoToFriends, + hasFriends: hasFriends, + makeFriends: makeFriends, + quitFriends: quitFriends, + removeVideoToFriends: removeVideoToFriends +} + +function addVideoToFriends (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + // ensure namePath is null + video.namePath = null + poolRequests.addRequest(id, 'add', video) +} + +function hasFriends (callback) { + Pods.count(function (err, count) { + if (err) return callback(err) + + var has_friends = (count !== 0) + callback(null, has_friends) + }) +} + +function makeFriends (callback) { + var pods_score = {} + + logger.info('Make friends!') + fs.readFile(peertubeCrypto.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { + if (err) { + logger.error('Cannot read public cert.') + return callback(err) + } + + var urls = config.get('network.friends') + + async.each(urls, function (url, callback) { + computeForeignPodsList(url, pods_score, callback) + }, function (err) { + if (err) return callback(err) + + logger.debug('Pods scores computed.', { pods_score: pods_score }) + var pods_list = computeWinningPods(urls, pods_score) + logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list }) + + makeRequestsToWinningPods(cert, pods_list, callback) + }) + }) +} + +function quitFriends (callback) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + Pods.list(function (err, pods) { + if (err) return callback(err) + + var request = { + method: 'POST', + path: '/api/' + constants.API_VERSION + '/pods/remove', + sign: true, + encrypt: true, + data: { + url: 'me' // Fake data + } + } + + // Announce we quit them + requests.makeMultipleRetryRequest(request, pods, function () { + Pods.removeAll(function (err) { + poolRequests.activate() + + if (err) return callback(err) + + logger.info('Broke friends, so sad :(') + + Videos.removeAllRemotes(function (err) { + if (err) return callback(err) + + logger.info('Removed all remote videos.') + callback(null) + }) + }) + }) + }) +} + +function removeVideoToFriends (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + poolRequests.addRequest(id, 'remove', video) +} + +// --------------------------------------------------------------------------- + +module.exports = pods + +// --------------------------------------------------------------------------- + +function computeForeignPodsList (url, pods_score, callback) { + // Let's give 1 point to the pod we ask the friends list + pods_score[url] = 1 + + getForeignPodsList(url, function (err, foreign_pods_list) { + if (err) return callback(err) + if (foreign_pods_list.length === 0) return callback() + + async.each(foreign_pods_list, function (foreign_pod, callback_each) { + var foreign_url = foreign_pod.url + + if (pods_score[foreign_url]) pods_score[foreign_url]++ + else pods_score[foreign_url] = 1 + + callback_each() + }, function () { + callback() + }) + }) +} + +function computeWinningPods (urls, pods_score) { + // Build the list of pods to add + // Only add a pod if it exists in more than a half base pods + var pods_list = [] + var base_score = urls.length / 2 + Object.keys(pods_score).forEach(function (pod) { + if (pods_score[pod] > base_score) pods_list.push({ url: pod }) + }) + + return pods_list +} + +function getForeignPodsList (url, callback) { + var path = '/api/' + constants.API_VERSION + '/pods' + + request.get(url + path, function (err, response, body) { + if (err) return callback(err) + + callback(null, JSON.parse(body)) + }) +} + +function makeRequestsToWinningPods (cert, pods_list, callback) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + // Get the list of our videos to send to our new friends + Videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get the list of videos we own.') + return callback(err) + } + + var data = { + url: http + '://' + host + ':' + port, + publicKey: cert, + videos: videos_list + } + + requests.makeMultipleRetryRequest( + { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, + + pods_list, + + function eachRequest (err, response, body, url, pod, callback_each_request) { + // We add the pod if it responded correctly with its public certificate + if (!err && response.statusCode === 200) { + Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { + if (err) { + logger.error('Error with adding %s pod.', pod.url, { error: err }) + return callback_each_request() + } + + Videos.addRemotes(body.videos, function (err) { + if (err) { + logger.error('Error with adding videos of pod.', pod.url, { error: err }) + return callback_each_request() + } + + logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) + return callback_each_request() + }) + }) + } else { + logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) + return callback_each_request() + } + }, + + function endRequests (err) { + // Now we made new friends, we can re activate the pool of requests + poolRequests.activate() + + if (err) { + logger.error('There was some errors when we wanted to make friends.') + return callback(err) + } + + logger.debug('makeRequestsToWinningPods finished.') + return callback(null) + } + ) + }) +} diff --git a/server/lib/poolRequests.js b/server/lib/poolRequests.js new file mode 100644 index 000000000..f786c3c7a --- /dev/null +++ b/server/lib/poolRequests.js @@ -0,0 +1,221 @@ +'use strict' + +var async = require('async') +var pluck = require('lodash-node/compat/collection/pluck') + +var constants = require('../initializers/constants') +var logger = require('../helpers/logger') +var Pods = require('../models/pods') +var PoolRequests = require('../models/poolRequests') +var requests = require('../helpers/requests') +var Videos = require('../models/videos') + +var timer = null + +var poolRequests = { + activate: activate, + addRequest: addRequest, + deactivate: deactivate, + forceSend: forceSend +} + +function activate () { + logger.info('Pool requests activated.') + timer = setInterval(makePoolRequests, constants.INTERVAL) +} + +function addRequest (id, type, request) { + logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) + + PoolRequests.findById(id, function (err, entity) { + if (err) { + logger.error('Cannot find one pool request.', { error: err }) + return // Abort + } + + if (entity) { + if (entity.type === type) { + logger.error('Cannot insert two same requests.') + return // Abort + } + + // Remove the request of the other type + PoolRequests.removeRequestById(id, function (err) { + if (err) { + logger.error('Cannot remove a pool request.', { error: err }) + return // Abort + } + }) + } else { + PoolRequests.create(id, type, request, function (err) { + if (err) logger.error('Cannot create a pool request.', { error: err }) + return // Abort + }) + } + }) +} + +function deactivate () { + logger.info('Pool requests deactivated.') + clearInterval(timer) +} + +function forceSend () { + logger.info('Force pool requests sending.') + makePoolRequests() +} + +// --------------------------------------------------------------------------- + +module.exports = poolRequests + +// --------------------------------------------------------------------------- + +function makePoolRequest (type, requests_to_make, callback) { + if (!callback) callback = function () {} + + Pods.list(function (err, pods) { + if (err) return callback(err) + + var params = { + encrypt: true, + sign: true, + method: 'POST', + path: null, + data: requests_to_make + } + + if (type === 'add') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' + } else if (type === 'remove') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' + } else { + return callback(new Error('Unkown pool request type.')) + } + + var bad_pods = [] + var good_pods = [] + + requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) + + function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { + if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { + bad_pods.push(pod._id) + logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) + } else { + good_pods.push(pod._id) + } + + return callback_each_pod_finished() + } + + function callbackAllPodsFinished (err) { + if (err) return callback(err) + + updatePodsScore(good_pods, bad_pods) + callback(null) + } + }) +} + +function makePoolRequests () { + logger.info('Making pool requests to friends.') + + PoolRequests.list(function (err, pool_requests) { + if (err) { + logger.error('Cannot get the list of pool requests.', { err: err }) + return // Abort + } + + if (pool_requests.length === 0) return + + var requests_to_make = { + add: { + ids: [], + requests: [] + }, + remove: { + ids: [], + requests: [] + } + } + + async.each(pool_requests, function (pool_request, callback_each) { + if (pool_request.type === 'add') { + requests_to_make.add.requests.push(pool_request.request) + requests_to_make.add.ids.push(pool_request._id) + } else if (pool_request.type === 'remove') { + requests_to_make.remove.requests.push(pool_request.request) + requests_to_make.remove.ids.push(pool_request._id) + } else { + logger.error('Unkown pool request type.', { request_type: pool_request.type }) + return // abort + } + + callback_each() + }, function () { + // Send the add requests + if (requests_to_make.add.requests.length !== 0) { + makePoolRequest('add', requests_to_make.add.requests, function (err) { + if (err) logger.error('Errors when sent add pool requests.', { error: err }) + + PoolRequests.removeRequests(requests_to_make.add.ids) + }) + } + + // Send the remove requests + if (requests_to_make.remove.requests.length !== 0) { + makePoolRequest('remove', requests_to_make.remove.requests, function (err) { + if (err) logger.error('Errors when sent remove pool requests.', { error: err }) + + PoolRequests.removeRequests(requests_to_make.remove.ids) + }) + } + }) + }) +} + +function removeBadPods () { + Pods.findBadPods(function (err, pods) { + if (err) { + logger.error('Cannot find bad pods.', { error: err }) + return // abort + } + + if (pods.length === 0) return + + var urls = pluck(pods, 'url') + var ids = pluck(pods, '_id') + + Videos.removeAllRemotesOf(urls, function (err, r) { + if (err) { + logger.error('Cannot remove videos from a pod that we removing.', { error: err }) + } else { + var videos_removed = r.result.n + logger.info('Removed %d videos.', videos_removed) + } + + Pods.removeAllByIds(ids, function (err, r) { + if (err) { + logger.error('Cannot remove bad pods.', { error: err }) + } else { + var pods_removed = r.result.n + logger.info('Removed %d pods.', pods_removed) + } + }) + }) + }) +} + +function updatePodsScore (good_pods, bad_pods) { + logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) + + Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) { + if (err) logger.error('Cannot increment scores of good pods.') + }) + + Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { + if (err) logger.error('Cannot increment scores of bad pods.') + removeBadPods() + }) +} diff --git a/server/lib/videos.js b/server/lib/videos.js new file mode 100644 index 000000000..2d7d9500d --- /dev/null +++ b/server/lib/videos.js @@ -0,0 +1,50 @@ +'use strict' + +var async = require('async') +var config = require('config') +var path = require('path') +var webtorrent = require('../lib/webtorrent') + +var logger = require('../helpers/logger') +var Videos = require('../models/videos') + +var uploadDir = path.join(__dirname, '..', config.get('storage.uploads')) + +var videos = { + seed: seed, + seedAllExisting: seedAllExisting +} + +function seed (path, callback) { + logger.info('Seeding %s...', path) + + webtorrent.seed(path, function (torrent) { + logger.info('%s seeded (%s).', path, torrent.magnetURI) + + return callback(null, torrent) + }) +} + +function seedAllExisting (callback) { + Videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos to seed.') + return callback(err) + } + + async.each(videos_list, function (video, each_callback) { + seed(uploadDir + video.namePath, function (err) { + if (err) { + logger.error('Cannot seed this video.') + return callback(err) + } + + each_callback(null) + }) + }, callback) + }) +} + +// --------------------------------------------------------------------------- + +module.exports = videos diff --git a/server/lib/webtorrent.js b/server/lib/webtorrent.js new file mode 100644 index 000000000..cb641fead --- /dev/null +++ b/server/lib/webtorrent.js @@ -0,0 +1,157 @@ +'use strict' + +var config = require('config') +var ipc = require('node-ipc') +var pathUtils = require('path') +var spawn = require('electron-spawn') + +var logger = require('../helpers/logger') + +var host = config.get('webserver.host') +var port = config.get('webserver.port') +var nodeKey = 'webtorrentnode' + port +var processKey = 'webtorrentprocess' + port +ipc.config.silent = true +ipc.config.id = nodeKey + +var webtorrent = { + add: add, + app: null, // Pid of the app + create: create, + remove: remove, + seed: seed, + silent: false // Useful for beautiful tests +} + +function create (options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + // Override options + if (options.host) host = options.host + if (options.port) { + port = options.port + nodeKey = 'webtorrentnode' + port + processKey = 'webtorrentprocess' + port + ipc.config.id = nodeKey + } + + ipc.serve(function () { + if (!webtorrent.silent) logger.info('IPC server ready.') + + // Run a timeout of 30s after which we exit the process + var timeout_webtorrent_process = setTimeout(function () { + throw new Error('Timeout : cannot run the webtorrent process. Please ensure you have electron-prebuilt npm package installed with xvfb-run.') + }, 30000) + + ipc.server.on(processKey + '.ready', function () { + if (!webtorrent.silent) logger.info('Webtorrent process ready.') + clearTimeout(timeout_webtorrent_process) + callback() + }) + + ipc.server.on(processKey + '.exception', function (data) { + throw new Error('Received exception error from webtorrent process.' + data.exception) + }) + + var webtorrent_process = spawn(pathUtils.join(__dirname, 'webtorrentProcess.js'), host, port, { detached: true }) + webtorrent_process.stderr.on('data', function (data) { + // logger.debug('Webtorrent process stderr: ', data.toString()) + }) + + webtorrent_process.stdout.on('data', function (data) { + // logger.debug('Webtorrent process:', data.toString()) + }) + + webtorrent.app = webtorrent_process + }) + + ipc.server.start() +} + +function seed (path, callback) { + var extension = pathUtils.extname(path) + var basename = pathUtils.basename(path, extension) + var data = { + _id: basename, + args: { + path: path + } + } + + if (!webtorrent.silent) logger.debug('Node wants to seed %s.', data._id) + + // Finish signal + var event_key = nodeKey + '.seedDone.' + data._id + ipc.server.on(event_key, function listener (received) { + if (!webtorrent.silent) logger.debug('Process seeded torrent %s.', received.magnetUri) + + // This is a fake object, we just use the magnetUri in this project + var torrent = { + magnetURI: received.magnetUri + } + + ipc.server.off(event_key) + callback(torrent) + }) + + ipc.server.broadcast(processKey + '.seed', data) +} + +function add (magnetUri, callback) { + var data = { + _id: magnetUri, + args: { + magnetUri: magnetUri + } + } + + if (!webtorrent.silent) logger.debug('Node wants to add ' + data._id) + + // Finish signal + var event_key = nodeKey + '.addDone.' + data._id + ipc.server.on(event_key, function (received) { + if (!webtorrent.silent) logger.debug('Process added torrent.') + + // This is a fake object, we just use the magnetUri in this project + var torrent = { + files: received.files + } + + ipc.server.off(event_key) + callback(torrent) + }) + + ipc.server.broadcast(processKey + '.add', data) +} + +function remove (magnetUri, callback) { + var data = { + _id: magnetUri, + args: { + magnetUri: magnetUri + } + } + + if (!webtorrent.silent) logger.debug('Node wants to stop seeding %s.', data._id) + + // Finish signal + var event_key = nodeKey + '.removeDone.' + data._id + ipc.server.on(event_key, function (received) { + if (!webtorrent.silent) logger.debug('Process removed torrent %s.', data._id) + + var err = null + if (received.err) err = received.err + + ipc.server.off(event_key) + callback(err) + }) + + ipc.server.broadcast(processKey + '.remove', data) +} + +// --------------------------------------------------------------------------- + +module.exports = webtorrent diff --git a/server/lib/webtorrentProcess.js b/server/lib/webtorrentProcess.js new file mode 100644 index 000000000..7da52523a --- /dev/null +++ b/server/lib/webtorrentProcess.js @@ -0,0 +1,92 @@ +'use strict' + +var WebTorrent = require('webtorrent') +var ipc = require('node-ipc') + +function webtorrent (args) { + if (args.length !== 3) { + throw new Error('Wrong arguments number: ' + args.length + '/3') + } + + var host = args[1] + var port = args[2] + var nodeKey = 'webtorrentnode' + port + var processKey = 'webtorrentprocess' + port + + ipc.config.silent = true + ipc.config.id = processKey + + if (host === 'client' && port === '1') global.WEBTORRENT_ANNOUNCE = [] + else global.WEBTORRENT_ANNOUNCE = 'ws://' + host + ':' + port + '/tracker/socket' + var wt = new WebTorrent({ dht: false }) + + function seed (data) { + var args = data.args + var path = args.path + var _id = data._id + + wt.seed(path, { announceList: '' }, function (torrent) { + var to_send = { + magnetUri: torrent.magnetURI + } + + ipc.of[nodeKey].emit(nodeKey + '.seedDone.' + _id, to_send) + }) + } + + function add (data) { + var args = data.args + var magnetUri = args.magnetUri + var _id = data._id + + wt.add(magnetUri, function (torrent) { + var to_send = { + files: [] + } + + torrent.files.forEach(function (file) { + to_send.files.push({ path: file.path }) + }) + + ipc.of[nodeKey].emit(nodeKey + '.addDone.' + _id, to_send) + }) + } + + function remove (data) { + var args = data.args + var magnetUri = args.magnetUri + var _id = data._id + + try { + wt.remove(magnetUri, callback) + } catch (err) { + console.log('Cannot remove the torrent from WebTorrent.') + return callback(null) + } + + function callback () { + var to_send = {} + ipc.of[nodeKey].emit(nodeKey + '.removeDone.' + _id, to_send) + } + } + + console.log('Configuration: ' + host + ':' + port) + console.log('Connecting to IPC...') + + ipc.connectTo(nodeKey, function () { + ipc.of[nodeKey].on(processKey + '.seed', seed) + ipc.of[nodeKey].on(processKey + '.add', add) + ipc.of[nodeKey].on(processKey + '.remove', remove) + + ipc.of[nodeKey].emit(processKey + '.ready') + console.log('Ready.') + }) + + process.on('uncaughtException', function (e) { + ipc.of[nodeKey].emit(processKey + '.exception', { exception: e }) + }) +} + +// --------------------------------------------------------------------------- + +module.exports = webtorrent -- cgit v1.2.3