From 0b69752270f1ceea06a29872b3db23660a55d6d3 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 4 Dec 2015 16:13:32 +0100 Subject: Add a pool of requests instead of making a request at each action (add video/remove video) for performance in big networks --- src/customValidators.js | 29 +++++++++ src/database.js | 12 +++- src/pods.js | 89 ++++----------------------- src/poolRequests.js | 158 ++++++++++++++++++++++++++++++++++++++++++++++++ src/utils.js | 6 +- src/videos.js | 100 +++++++++++++++--------------- 6 files changed, 264 insertions(+), 130 deletions(-) create mode 100644 src/customValidators.js create mode 100644 src/poolRequests.js (limited to 'src') diff --git a/src/customValidators.js b/src/customValidators.js new file mode 100644 index 000000000..73c2f8461 --- /dev/null +++ b/src/customValidators.js @@ -0,0 +1,29 @@ +;(function () { + 'use strict' + + var validator = require('validator') + + var customValidators = {} + + customValidators.eachIsRemoteVideosAddValid = function (values) { + return values.every(function (val) { + return validator.isLength(val.name, 1, 50) && + validator.isLength(val.description, 1, 50) && + validator.isLength(val.magnetUri, 10) && + validator.isURL(val.podUrl) + }) + } + + customValidators.eachIsRemoteVideosRemoveValid = function (values) { + return values.every(function (val) { + return validator.isLength(val.magnetUri, 10) + }) + } + + customValidators.isArray = function (value) { + return Array.isArray(value) + } + + // ----------- Export ----------- + module.exports = customValidators +})() diff --git a/src/database.js b/src/database.js index 740e89fa4..514a622dc 100644 --- a/src/database.js +++ b/src/database.js @@ -30,6 +30,15 @@ var PodsDB = mongoose.model('pods', podsSchema) + // ----------- PoolRequests ----------- + var poolRequestsSchema = mongoose.Schema({ + type: String, + id: String, // Special id to find duplicates (video created we want to remove...) + request: mongoose.Schema.Types.Mixed + }) + + var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema) + // ----------- Connection ----------- mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname) @@ -45,6 +54,7 @@ // ----------- Export ----------- module.exports = { VideosDB: VideosDB, - PodsDB: PodsDB + PodsDB: PodsDB, + PoolRequestsDB: PoolRequestsDB } })() diff --git a/src/pods.js b/src/pods.js index e26b3f0ae..9afc6cc96 100644 --- a/src/pods.js +++ b/src/pods.js @@ -8,6 +8,7 @@ var logger = require('./logger') var PodsDB = require('./database').PodsDB + var poolRequests = require('./poolRequests') var utils = require('./utils') var pods = {} @@ -16,13 +17,6 @@ var host = config.get('webserver.host') var port = config.get('webserver.port') - // ----------- Constants ----------- - - var PODS_SCORE = { - MALUS: -10, - BONUS: 10 - } - // ----------- Private functions ----------- function getForeignPodsList (url, callback) { @@ -34,25 +28,6 @@ }) } - function updatePodsScore (good_pods, bad_pods) { - logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) - - PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec() - PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) { - if (err) throw err - removeBadPods() - }) - } - - function removeBadPods () { - PodsDB.remove({ score: 0 }, function (err, result) { - if (err) throw err - - var number_removed = result.result.n - if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) - }) - } - // ----------- Public functions ----------- pods.list = function (callback) { @@ -93,58 +68,16 @@ }) } - // { path, data } - pods.makeSecureRequest = function (data, callback) { - if (callback === undefined) callback = function () {} - - PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { - if (err) { - logger.error('Cannot get the list of the pods.', { error: err }) - return callback(err) - } - - logger.debug('Make multiple requests.') - - var params = { - encrypt: true, - sign: true, - method: data.method, - path: data.path, - data: data.data - } - - var bad_pods = [] - var good_pods = [] - - utils.makeMultipleRetryRequest( - params, - - pods, - - function callbackEachPodFinished (err, response, body, pod, callback_each_pod_finished) { - if (err || response.statusCode !== 200) { - bad_pods.push(pod._id) - logger.error('Error sending secure request to %s/%s pod.', pod.url, data.path, { error: err }) - } else { - good_pods.push(pod._id) - } - - return callback_each_pod_finished() - }, - - function callbackAllPodsFinished (err) { - if (err) { - logger.error('There was some errors when sending the video meta data.', { error: err }) - return callback(err) - } - - logger.debug('Finished') + pods.addVideoToFriends = function (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + poolRequests.addToPoolRequests(id, 'add', video) + } - updatePodsScore(good_pods, bad_pods) - callback(null) - } - ) - }) + pods.removeVideoToFriends = function (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + poolRequests.addToPoolRequests(id, 'remove', video) } pods.makeFriends = function (callback) { @@ -214,7 +147,7 @@ pods_list, - function eachRequest (err, response, body, pod, callback_each_request) { + function eachRequest (err, response, body, url, pod, callback_each_request) { // We add the pod if it responded correctly with its public certificate if (!err && response.statusCode === 200) { pods.add({ url: pod.url, publicKey: body.cert, score: global.FRIEND_BASE_SCORE }, function (err) { diff --git a/src/poolRequests.js b/src/poolRequests.js new file mode 100644 index 000000000..b117c9923 --- /dev/null +++ b/src/poolRequests.js @@ -0,0 +1,158 @@ +;(function () { + 'use strict' + + var async = require('async') + + var logger = require('./logger') + var database = require('./database') + var PoolRequestsDB = database.PoolRequestsDB + var PodsDB = database.PodsDB + var utils = require('./utils') + + var poolRequests = {} + + // ----------- Constants ----------- + + // Time to wait between requests to the friends + var INTERVAL = utils.isTestInstance() ? 10000 : 60000 + var PODS_SCORE = { + MALUS: -10, + BONUS: 10 + } + + // ----------- Private ----------- + var timer = null + + function makePoolRequests () { + logger.info('Making pool requests to friends.') + + PoolRequestsDB.find({}, { type: 1, request: 1 }, function (err, pool_requests) { + if (err) throw err + + var requests = { + add: [], + remove: [] + } + + async.each(pool_requests, function (pool_request, callback_each) { + if (pool_request.type === 'add') { + requests.add.push(pool_request.request) + } else if (pool_request.type === 'remove') { + requests.remove.push(pool_request.request) + } else { + throw new Error('Unkown pool request type.') + } + + callback_each() + }, function () { + makePoolRequest('add', requests.add) + makePoolRequest('remove', requests.remove) + logger.info('Pool requests to friends sent.') + }) + }) + } + + function updatePodsScore (good_pods, bad_pods) { + logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) + + PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec() + PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) { + if (err) throw err + removeBadPods() + }) + } + + function removeBadPods () { + PodsDB.remove({ score: 0 }, function (err, result) { + if (err) throw err + + var number_removed = result.result.n + if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) + }) + } + + function makePoolRequest (type, requests) { + logger.debug('Make pool requests scheduled.') + PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { + if (err) throw err + + var params = { + encrypt: true, + sign: true, + method: 'POST', + path: null, + data: requests + } + + if (type === 'add') { + params.path = '/api/' + global.API_VERSION + '/remotevideos/add' + } else if (type === 'remove') { + params.path = '/api/' + global.API_VERSION + '/remotevideos/remove' + } else { + throw new Error('Unkown pool request type.') + } + + var bad_pods = [] + var good_pods = [] + + utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) + + function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { + if (err || response.statusCode !== 200) { + bad_pods.push(pod._id) + logger.error('Error sending secure request to %s pod.', url, { error: err }) + } else { + good_pods.push(pod._id) + } + + return callback_each_pod_finished() + } + + function callbackAllPodsFinished (err) { + if (err) { + logger.error('There was some errors when sending the video meta data.', { error: err }) + } + + updatePodsScore(good_pods, bad_pods) + PoolRequestsDB.remove().exec() + } + }) + } + + // ----------- Public ----------- + poolRequests.activate = function () { + logger.info('Pool requests activated.') + timer = setInterval(makePoolRequests, INTERVAL) + } + + poolRequests.addToPoolRequests = function (id, type, request) { + logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) + + PoolRequestsDB.findOne({ id: id }, function (err, entity) { + if (err) logger.error(err) + + if (entity) { + if (entity.type === type) { + logger.error(new Error('Cannot insert two same requests.')) + return + } + + // Remove the request of the other type + PoolRequestsDB.remove({ id: id }, function (err) { + if (err) logger.error(err) + }) + } else { + PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { + if (err) logger.error(err) + }) + } + }) + } + + poolRequests.deactivate = function () { + logger.info('Pool requests deactivated.') + clearInterval(timer) + } + + module.exports = poolRequests +})() diff --git a/src/utils.js b/src/utils.js index dda6c7a0a..4aa1fc55e 100644 --- a/src/utils.js +++ b/src/utils.js @@ -36,7 +36,7 @@ replay( request.post(params, function (err, response, body) { - callbackEach(err, response, body, to_pod) + callbackEach(err, response, body, params.url, to_pod) }), { retries: retries, @@ -71,8 +71,8 @@ // Make a request for each pod async.each(pods, function (pod, callback_each_async) { - function callbackEachRetryRequest (err, response, body, pod) { - callbackEach(err, response, body, pod, function () { + function callbackEachRetryRequest (err, response, body, url, pod) { + callbackEach(err, response, body, url, pod, function () { callback_each_async() }) } diff --git a/src/videos.js b/src/videos.js index 8c44cad95..e3a5b49f1 100644 --- a/src/videos.js +++ b/src/videos.js @@ -3,6 +3,7 @@ var async = require('async') var config = require('config') + var dz = require('dezalgo') var fs = require('fs') var webtorrent = require('./webTorrentNode') @@ -67,19 +68,10 @@ return callback(err) } - // Now we'll send the video's meta data + // Now we'll add the video's meta data to our friends params.namePath = null - logger.info('Sending %s video to friends.', video_file.path) - - var data = { - path: '/api/' + global.API_VERSION + '/remotevideos/add', - method: 'POST', - data: params - } - - // Do not wait the secure requests - pods.makeSecureRequest(data) + pods.addVideoToFriends(params) callback(null) }) }) @@ -124,16 +116,12 @@ return callback(err) } - var data = { - path: '/api/' + global.API_VERSION + '/remotevideos/remove', - method: 'POST', - data: { - magnetUri: video.magnetUri - } + var params = { + name: video.name, + magnetUri: video.magnetUri } - // Yes this is a POST request because we add some informations in the body (signature, encrypt etc) - pods.makeSecureRequest(data) + pods.removeVideoToFriends(params) callback(null) }) }) @@ -142,49 +130,65 @@ } // Use the magnet Uri because the _id field is not the same on different servers - videos.removeRemote = function (fromUrl, magnetUri, callback) { - VideosDB.findOne({ magnetUri: magnetUri }, function (err, video) { - if (err || !video) { - logger.error('Cannot find the torrent URI of this remote video.') + videos.removeRemotes = function (fromUrl, magnetUris, callback) { + VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { + if (err || !videos) { + logger.error('Cannot find the torrent URI of these remote videos.') return callback(err) } - // TODO: move to reqValidators middleware ? - if (video.podUrl !== fromUrl) { - logger.error('The pod has not the rights on this video.') - return callback(err) - } + var to_remove = [] + async.each(videos, function (video, callback_async) { + callback_async = dz(callback_async) - VideosDB.findByIdAndRemove(video._id, function (err) { - if (err) { - logger.error('Cannot remove the remote video.') - return callback(err) + if (video.podUrl !== fromUrl) { + logger.error('The pod %s has not the rights on the video of %s.', fromUrl, video.podUrl) + } else { + to_remove.push(video._id) } - callback(null) + callback_async() + }, function () { + VideosDB.remove({ _id: { $in: to_remove } }, function (err) { + if (err) { + logger.error('Cannot remove the remote videos.') + return callback(err) + } + + callback(null) + }) }) }) } // { name, magnetUri, podUrl } - videos.addRemote = function (data, callback) { - logger.debug('Add remote video from pod: %s', data.podUrl) - - var params = { - name: data.name, - namePath: null, - description: data.description, - magnetUri: data.magnetUri, - podUrl: data.podUrl - } + videos.addRemotes = function (videos, callback) { + var to_add = [] - VideosDB.create(params, function (err, video) { - if (err) { - logger.error('Cannot insert this remote video.', { error: err }) - return callback(err) + async.each(videos, function (video, callback_each) { + callback_each = dz(callback_each) + logger.debug('Add remote video from pod: %s', video.podUrl) + + var params = { + name: video.name, + namePath: null, + description: video.description, + magnetUri: video.magnetUri, + podUrl: video.podUrl } - return callback(null, video) + to_add.push(params) + + callback_each() + }, function () { + VideosDB.create(to_add, function (err, videos) { + if (err) { + logger.error('Cannot insert this remote video.', { error: err }) + return callback(err) + } + + return callback(null, videos) + }) }) } -- cgit v1.2.3