From 45239549bf2659998dcf9196d86974b0b625912e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Sat, 23 Jan 2016 18:31:58 +0100 Subject: Finalise the join in a network and add the ability to quit it --- src/pods.js | 152 ++++++++++++++++++++++++++++++++++++++++++---------- src/poolRequests.js | 32 +++++++++-- src/utils.js | 10 ++-- src/videos.js | 34 ++++++++++++ 4 files changed, 191 insertions(+), 37 deletions(-) (limited to 'src') diff --git a/src/pods.js b/src/pods.js index 8da216a55..defa9b1c1 100644 --- a/src/pods.js +++ b/src/pods.js @@ -43,7 +43,9 @@ } // { url } + // TODO: check if the pod is not already a friend pods.add = function (data, callback) { + var videos = require('./videos') logger.info('Adding pod: %s', data.url) var params = { @@ -58,13 +60,38 @@ return callback(err) } + videos.addRemotes(data.videos) + fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { if (err) { logger.error('Cannot read cert file.', { error: err }) return callback(err) } - return callback(null, { cert: cert }) + videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get the list of owned videos.', { error: err }) + return callback(err) + } + + return callback(null, { cert: cert, videos: videos_list }) + }) + }) + }) + } + + pods.remove = function (url, callback) { + var videos = require('./videos') + logger.info('Removing %s pod.', url) + + videos.removeAllRemotesOf(url, function (err) { + if (err) logger.error('Cannot remove all remote videos of %s.', url) + + PodsDB.remove({ url: url }, function (err) { + if (err) return callback(err) + + logger.info('%s pod removed.', url) + callback(null) }) }) } @@ -82,6 +109,7 @@ } pods.makeFriends = function (callback) { + var videos = require('./videos') var pods_score = {} logger.info('Make friends!') @@ -137,43 +165,109 @@ } function makeRequestsToWinningPods (cert, pods_list) { - var data = { - url: http + '://' + host + ':' + port, - publicKey: cert - } + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + // Get the list of our videos to send to our new friends + videos.listOwned(function (err, videos_list) { + if (err) throw err + + var data = { + url: http + '://' + host + ':' + port, + publicKey: cert, + videos: videos_list + } - utils.makeMultipleRetryRequest( - { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, + utils.makeMultipleRetryRequest( + { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, - pods_list, + pods_list, - function eachRequest (err, response, body, url, pod, callback_each_request) { - // We add the pod if it responded correctly with its public certificate - if (!err && response.statusCode === 200) { - pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { - if (err) { - logger.error('Error with adding %s pod.', pod.url, { error: err }) - } + function eachRequest (err, response, body, url, pod, callback_each_request) { + // We add the pod if it responded correctly with its public certificate + if (!err && response.statusCode === 200) { + pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { + if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) + videos.addRemotes(body.videos, function (err) { + if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) + + logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) + return callback_each_request() + }) + }) + } else { + logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) return callback_each_request() - }) - } else { - logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) - return callback_each_request() - } - }, + } + }, - function endRequests (err) { - if (err) { - logger.error('There was some errors when we wanted to make friends.', { error: err }) - return callback(err) + function endRequests (err) { + // Now we made new friends, we can re activate the pool of requests + poolRequests.activate() + + if (err) { + logger.error('There was some errors when we wanted to make friends.', { error: err }) + return callback(err) + } + + logger.debug('makeRequestsToWinningPods finished.') + return callback(null) } + ) + }) + } + } - logger.debug('makeRequestsToWinningPods finished.') - return callback(null) + pods.quitFriends = function (callback) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + PodsDB.find(function (err, pods) { + if (err) return callback(err) + + var request = { + method: 'POST', + path: '/api/' + constants.API_VERSION + '/pods/remove', + sign: true, + encrypt: true, + data: { + url: 'me' // Fake data } - ) - } + } + + // Announce we quit them + utils.makeMultipleRetryRequest(request, pods, function () { + PodsDB.remove(function (err) { + poolRequests.activate() + + if (err) return callback(err) + + logger.info('Broke friends, so sad :(') + + var videos = require('./videos') + videos.removeAllRemotes(function (err) { + if (err) return callback(err) + + logger.info('Removed all remote videos.') + callback(null) + }) + }) + }) + }) + } + + pods.hasFriends = function (callback) { + PodsDB.count(function (err, count) { + if (err) return callback(err) + + var has_friends = (count !== 0) + callback(null, has_friends) + }) } module.exports = pods diff --git a/src/poolRequests.js b/src/poolRequests.js index edb12b1e8..7f422f372 100644 --- a/src/poolRequests.js +++ b/src/poolRequests.js @@ -6,9 +6,11 @@ var constants = require('./constants') var logger = require('./logger') var database = require('./database') + var pluck = require('lodash-node/compat/collection/pluck') var PoolRequestsDB = database.PoolRequestsDB var PodsDB = database.PodsDB var utils = require('./utils') + var VideosDB = database.VideosDB var poolRequests = {} @@ -90,11 +92,26 @@ } function removeBadPods () { - PodsDB.remove({ score: 0 }, function (err, result) { + PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) { if (err) throw err - var number_removed = result.result.n - if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) + if (pods.length === 0) return + + var urls = pluck(pods, 'url') + var ids = pluck(pods, '_id') + + VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) { + if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err }) + var videos_removed = r.result.n + logger.info('Removed %d videos.', videos_removed) + + PodsDB.remove({ _id: { $in: ids } }, function (err, r) { + if (err) logger.error('Cannot remove bad pods.', { error: err }) + + var pods_removed = r.result.n + logger.info('Removed %d pods.', pods_removed) + }) + }) }) } @@ -126,9 +143,9 @@ utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { - if (err || response.statusCode !== 200) { + if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { bad_pods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err }) + logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) } else { good_pods.push(pod._id) } @@ -180,5 +197,10 @@ clearInterval(timer) } + poolRequests.forceSend = function () { + logger.info('Force pool requests sending.') + makePoolRequests() + } + module.exports = poolRequests })() diff --git a/src/utils.js b/src/utils.js index 5880c6c90..176648a31 100644 --- a/src/utils.js +++ b/src/utils.js @@ -56,7 +56,7 @@ utils.makeMultipleRetryRequest = function (all_data, pods, callbackEach, callback) { if (!callback) { callback = callbackEach - callbackEach = function () {} + callbackEach = null } var url = http + '://' + host + ':' + port @@ -71,9 +71,13 @@ // Make a request for each pod async.each(pods, function (pod, callback_each_async) { function callbackEachRetryRequest (err, response, body, url, pod) { - callbackEach(err, response, body, url, pod, function () { + if (callbackEach !== null) { + callbackEach(err, response, body, url, pod, function () { + callback_each_async() + }) + } else { callback_each_async() - }) + } } var params = { diff --git a/src/videos.js b/src/videos.js index 32f26abe7..90821fdf6 100644 --- a/src/videos.js +++ b/src/videos.js @@ -43,6 +43,18 @@ }) } + videos.listOwned = function (callback) { + // If namePath is not null this is *our* video + VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos.', { error: err }) + return callback(err) + } + + return callback(null, videos_list) + }) + } + videos.add = function (data, callback) { var video_file = data.video var video_data = data.data @@ -131,6 +143,8 @@ // Use the magnet Uri because the _id field is not the same on different servers videos.removeRemotes = function (fromUrl, magnetUris, callback) { + if (callback === undefined) callback = function () {} + VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { if (err || !videos) { logger.error('Cannot find the torrent URI of these remote videos.') @@ -155,14 +169,34 @@ return callback(err) } + logger.info('Removed remote videos from %s.', fromUrl) callback(null) }) }) }) } + videos.removeAllRemotes = function (callback) { + VideosDB.remove({ namePath: null }, function (err) { + if (err) return callback(err) + + callback(null) + }) + } + + videos.removeAllRemotesOf = function (fromUrl, callback) { + VideosDB.remove({ podUrl: fromUrl }, function (err) { + if (err) return callback(err) + + callback(null) + }) + } + // { name, magnetUri, podUrl } + // TODO: avoid doublons videos.addRemotes = function (videos, callback) { + if (callback === undefined) callback = function () {} + var to_add = [] async.each(videos, function (video, callback_each) { -- cgit v1.2.3