From c173e56520b0fe4206b9ea8049b6add40bfeabcd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 4 Feb 2016 21:10:33 +0100 Subject: Split models --- lib/friends.js | 218 ++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/poolRequests.js | 61 +++------------ lib/videos.js | 51 ++++++++++++ lib/webtorrent.js | 2 +- 4 files changed, 282 insertions(+), 50 deletions(-) create mode 100644 lib/friends.js create mode 100644 lib/videos.js (limited to 'lib') diff --git a/lib/friends.js b/lib/friends.js new file mode 100644 index 000000000..e093c85c4 --- /dev/null +++ b/lib/friends.js @@ -0,0 +1,218 @@ +;(function () { + 'use strict' + + var async = require('async') + var config = require('config') + var fs = require('fs') + var request = require('request') + + var constants = require('../initializers/constants') + var logger = require('../helpers/logger') + var Pods = require('../models/pods') + var PoolRequests = require('../models/poolRequests') + var poolRequests = require('../lib/poolRequests') + var utils = require('../helpers/utils') + var Videos = require('../models/videos') + + var http = config.get('webserver.https') ? 'https' : 'http' + var host = config.get('webserver.host') + var port = config.get('webserver.port') + + var pods = { + addVideoToFriends: addVideoToFriends, + hasFriends: hasFriends, + makeFriends: makeFriends, + quitFriends: quitFriends, + removeVideoToFriends: removeVideoToFriends + } + + function addVideoToFriends (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + // namePath is null + // TODO + video.namePath = null + PoolRequests.addRequest(id, 'add', video) + } + + function hasFriends (callback) { + Pods.count(function (err, count) { + if (err) return callback(err) + + var has_friends = (count !== 0) + callback(null, has_friends) + }) + } + + function makeFriends (callback) { + var pods_score = {} + + logger.info('Make friends!') + fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { + if (err) { + logger.error('Cannot read public cert.', { error: err }) + return callback(err) + } + + var urls = config.get('network.friends') + + async.each(urls, computeForeignPodsList, function () { + logger.debug('Pods scores computed.', { pods_score: pods_score }) + var pods_list = computeWinningPods(urls, pods_score) + logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list }) + + makeRequestsToWinningPods(cert, pods_list) + }) + }) + + // ----------------------------------------------------------------------- + + function computeForeignPodsList (url, callback) { + // Let's give 1 point to the pod we ask the friends list + pods_score[url] = 1 + + getForeignPodsList(url, function (foreign_pods_list) { + if (foreign_pods_list.length === 0) return callback() + + async.each(foreign_pods_list, function (foreign_pod, callback_each) { + var foreign_url = foreign_pod.url + + if (pods_score[foreign_url]) pods_score[foreign_url]++ + else pods_score[foreign_url] = 1 + + callback_each() + }, function () { + callback() + }) + }) + } + + function computeWinningPods (urls, pods_score) { + // Build the list of pods to add + // Only add a pod if it exists in more than a half base pods + var pods_list = [] + var base_score = urls.length / 2 + Object.keys(pods_score).forEach(function (pod) { + if (pods_score[pod] > base_score) pods_list.push({ url: pod }) + }) + + return pods_list + } + + function makeRequestsToWinningPods (cert, pods_list) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + // Get the list of our videos to send to our new friends + Videos.listOwned(function (err, videos_list) { + if (err) throw err + + var data = { + url: http + '://' + host + ':' + port, + publicKey: cert, + videos: videos_list + } + + utils.makeMultipleRetryRequest( + { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, + + pods_list, + + function eachRequest (err, response, body, url, pod, callback_each_request) { + // We add the pod if it responded correctly with its public certificate + if (!err && response.statusCode === 200) { + Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { + if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) + + Videos.addRemotes(body.videos, function (err) { + if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) + + logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) + return callback_each_request() + }) + }) + } else { + logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) + return callback_each_request() + } + }, + + function endRequests (err) { + // Now we made new friends, we can re activate the pool of requests + poolRequests.activate() + + if (err) { + logger.error('There was some errors when we wanted to make friends.', { error: err }) + return callback(err) + } + + logger.debug('makeRequestsToWinningPods finished.') + return callback(null) + } + ) + }) + } + } + + function quitFriends (callback) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + Pods.list(function (err, pods) { + if (err) return callback(err) + + var request = { + method: 'POST', + path: '/api/' + constants.API_VERSION + '/pods/remove', + sign: true, + encrypt: true, + data: { + url: 'me' // Fake data + } + } + + // Announce we quit them + utils.makeMultipleRetryRequest(request, pods, function () { + Pods.removeAll(function (err) { + poolRequests.activate() + + if (err) return callback(err) + + logger.info('Broke friends, so sad :(') + + Videos.removeAllRemotes(function (err) { + if (err) return callback(err) + + logger.info('Removed all remote videos.') + callback(null) + }) + }) + }) + }) + } + + function removeVideoToFriends (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + PoolRequests.addRequest(id, 'remove', video) + } + + // --------------------------------------------------------------------------- + + module.exports = pods + + // --------------------------------------------------------------------------- + + function getForeignPodsList (url, callback) { + var path = '/api/' + constants.API_VERSION + '/pods' + + request.get(url + path, function (err, response, body) { + if (err) throw err + callback(JSON.parse(body)) + }) + } +})() diff --git a/lib/poolRequests.js b/lib/poolRequests.js index 53f47d629..796f06149 100644 --- a/lib/poolRequests.js +++ b/lib/poolRequests.js @@ -5,18 +5,16 @@ var pluck = require('lodash-node/compat/collection/pluck') var constants = require('../initializers/constants') - var database = require('../initializers/database') var logger = require('../helpers/logger') - var PodsDB = database.PodsDB - var PoolRequestsDB = database.PoolRequestsDB + var Pods = require('../models/pods') + var PoolRequests = require('../models/poolRequests') var utils = require('../helpers/utils') - var VideosDB = database.VideosDB + var Videos = require('../models/videos') var timer = null var poolRequests = { activate: activate, - addToPoolRequests: addToPoolRequests, deactivate: deactivate, forceSend: forceSend } @@ -36,30 +34,6 @@ timer = setInterval(makePoolRequests, constants.INTERVAL) } - function addToPoolRequests (id, type, request) { - logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) - - PoolRequestsDB.findOne({ id: id }, function (err, entity) { - if (err) logger.error(err) - - if (entity) { - if (entity.type === type) { - logger.error(new Error('Cannot insert two same requests.')) - return - } - - // Remove the request of the other type - PoolRequestsDB.remove({ id: id }, function (err) { - if (err) logger.error(err) - }) - } else { - PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { - if (err) logger.error(err) - }) - } - }) - } - // --------------------------------------------------------------------------- module.exports = poolRequests @@ -69,7 +43,7 @@ function makePoolRequest (type, requests, callback) { if (!callback) callback = function () {} - PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { + Pods.list(function (err, pods) { if (err) throw err var params = { @@ -116,7 +90,7 @@ function makePoolRequests () { logger.info('Making pool requests to friends.') - PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) { + PoolRequests.list(function (err, pool_requests) { if (err) throw err if (pool_requests.length === 0) return @@ -150,7 +124,7 @@ makePoolRequest('add', requests.add.requests, function (err) { if (err) logger.error('Errors when sent add pool requests.', { error: err }) - removePoolRequestsFromDB(requests.add.ids) + PoolRequests.removeRequests(requests.add.ids) }) } @@ -159,7 +133,7 @@ makePoolRequest('remove', requests.remove.requests, function (err) { if (err) logger.error('Errors when sent remove pool requests.', { error: err }) - removePoolRequestsFromDB(requests.remove.ids) + PoolRequests.removeRequests(requests.remove.ids) }) } }) @@ -167,7 +141,7 @@ } function removeBadPods () { - PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) { + Pods.findBadPods(function (err, pods) { if (err) throw err if (pods.length === 0) return @@ -175,12 +149,12 @@ var urls = pluck(pods, 'url') var ids = pluck(pods, '_id') - VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) { + Videos.removeAllRemotesOf(urls, function (err, r) { if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err }) var videos_removed = r.result.n logger.info('Removed %d videos.', videos_removed) - PodsDB.remove({ _id: { $in: ids } }, function (err, r) { + Pods.removeAllByIds(ids, function (err, r) { if (err) logger.error('Cannot remove bad pods.', { error: err }) var pods_removed = r.result.n @@ -190,22 +164,11 @@ }) } - function removePoolRequestsFromDB (ids) { - PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) { - if (err) { - logger.error('Cannot remove requests from the pool requests database.', { error: err }) - return - } - - logger.info('Pool requests flushed.') - }) - } - function updatePodsScore (good_pods, bad_pods) { logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) - PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec() - PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) { + Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS) + Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { if (err) throw err removeBadPods() }) diff --git a/lib/videos.js b/lib/videos.js new file mode 100644 index 000000000..5d23070a7 --- /dev/null +++ b/lib/videos.js @@ -0,0 +1,51 @@ +;(function () { + 'use strict' + + var async = require('async') + var config = require('config') + var webtorrent = require('../lib/webTorrentNode') + + var logger = require('../helpers/logger') + var Videos = require('../models/videos') + + var uploadDir = __dirname + '/../' + config.get('storage.uploads') + + var videos = { + seed: seed, + seedAllExisting: seedAllExisting + } + + function seed (path, callback) { + logger.info('Seeding %s...', path) + + webtorrent.seed(path, function (torrent) { + logger.info('%s seeded (%s).', path, torrent.magnetURI) + + return callback(null, torrent) + }) + } + + function seedAllExisting (callback) { + Videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos to seed.', { error: err }) + return callback(err) + } + + async.each(videos_list, function (video, each_callback) { + seed(uploadDir + video.namePath, function (err) { + if (err) { + logger.error('Cannot seed this video.', { error: err }) + return callback(err) + } + + each_callback(null) + }) + }, callback) + }) + } + + // --------------------------------------------------------------------------- + + module.exports = videos +})() diff --git a/lib/webtorrent.js b/lib/webtorrent.js index 41e60499f..d1ca3c9f2 100644 --- a/lib/webtorrent.js +++ b/lib/webtorrent.js @@ -62,7 +62,7 @@ try { wt.remove(magnetUri, callback) } catch (err) { - console.log('Cannot remove the torrent from WebTorrent', { err: err }) + console.log('Cannot remove the torrent from WebTorrent') return callback(null) } -- cgit v1.2.3