X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=lib%2FpoolRequests.js;h=796f061491d746d49cb73511095aab749358ac42;hb=c173e56520b0fe4206b9ea8049b6add40bfeabcd;hp=9c7f3238bb03af059615433d2110dc3fae1d6814;hpb=cda021079ff455cc0fd0eb95a5395fa808ab63d1;p=github%2FChocobozzz%2FPeerTube.git diff --git a/lib/poolRequests.js b/lib/poolRequests.js index 9c7f3238b..796f06149 100644 --- a/lib/poolRequests.js +++ b/lib/poolRequests.js @@ -2,36 +2,95 @@ 'use strict' var async = require('async') + var pluck = require('lodash-node/compat/collection/pluck') var constants = require('../initializers/constants') var logger = require('../helpers/logger') - var database = require('../initializers/database') - var pluck = require('lodash-node/compat/collection/pluck') - var PoolRequestsDB = database.PoolRequestsDB - var PodsDB = database.PodsDB + var Pods = require('../models/pods') + var PoolRequests = require('../models/poolRequests') var utils = require('../helpers/utils') - var VideosDB = database.VideosDB - - var poolRequests = {} + var Videos = require('../models/videos') - // ----------- Private ----------- var timer = null - function removePoolRequestsFromDB (ids) { - PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) { - if (err) { - logger.error('Cannot remove requests from the pool requests database.', { error: err }) - return + var poolRequests = { + activate: activate, + deactivate: deactivate, + forceSend: forceSend + } + + function deactivate () { + logger.info('Pool requests deactivated.') + clearInterval(timer) + } + + function forceSend () { + logger.info('Force pool requests sending.') + makePoolRequests() + } + + function activate () { + logger.info('Pool requests activated.') + timer = setInterval(makePoolRequests, constants.INTERVAL) + } + + // --------------------------------------------------------------------------- + + module.exports = poolRequests + + // --------------------------------------------------------------------------- + + function makePoolRequest (type, requests, callback) { + if (!callback) callback = function () {} + + Pods.list(function (err, pods) { + if (err) throw err + + var params = { + encrypt: true, + sign: true, + method: 'POST', + path: null, + data: requests + } + + if (type === 'add') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' + } else if (type === 'remove') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' + } else { + throw new Error('Unkown pool request type.') + } + + var bad_pods = [] + var good_pods = [] + + utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) + + function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { + if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { + bad_pods.push(pod._id) + logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) + } else { + good_pods.push(pod._id) + } + + return callback_each_pod_finished() } - logger.info('Pool requests flushed.') + function callbackAllPodsFinished (err) { + if (err) return callback(err) + + updatePodsScore(good_pods, bad_pods) + callback(null) + } }) } function makePoolRequests () { logger.info('Making pool requests to friends.') - PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) { + PoolRequests.list(function (err, pool_requests) { if (err) throw err if (pool_requests.length === 0) return @@ -65,7 +124,7 @@ makePoolRequest('add', requests.add.requests, function (err) { if (err) logger.error('Errors when sent add pool requests.', { error: err }) - removePoolRequestsFromDB(requests.add.ids) + PoolRequests.removeRequests(requests.add.ids) }) } @@ -74,25 +133,15 @@ makePoolRequest('remove', requests.remove.requests, function (err) { if (err) logger.error('Errors when sent remove pool requests.', { error: err }) - removePoolRequestsFromDB(requests.remove.ids) + PoolRequests.removeRequests(requests.remove.ids) }) } }) }) } - function updatePodsScore (good_pods, bad_pods) { - logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) - - PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec() - PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) { - if (err) throw err - removeBadPods() - }) - } - function removeBadPods () { - PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) { + Pods.findBadPods(function (err, pods) { if (err) throw err if (pods.length === 0) return @@ -100,12 +149,12 @@ var urls = pluck(pods, 'url') var ids = pluck(pods, '_id') - VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) { + Videos.removeAllRemotesOf(urls, function (err, r) { if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err }) var videos_removed = r.result.n logger.info('Removed %d videos.', videos_removed) - PodsDB.remove({ _id: { $in: ids } }, function (err, r) { + Pods.removeAllByIds(ids, function (err, r) { if (err) logger.error('Cannot remove bad pods.', { error: err }) var pods_removed = r.result.n @@ -115,92 +164,13 @@ }) } - function makePoolRequest (type, requests, callback) { - if (!callback) callback = function () {} + function updatePodsScore (good_pods, bad_pods) { + logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) - PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { + Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS) + Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { if (err) throw err - - var params = { - encrypt: true, - sign: true, - method: 'POST', - path: null, - data: requests - } - - if (type === 'add') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' - } else if (type === 'remove') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' - } else { - throw new Error('Unkown pool request type.') - } - - var bad_pods = [] - var good_pods = [] - - utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { - if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { - bad_pods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - good_pods.push(pod._id) - } - - return callback_each_pod_finished() - } - - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - updatePodsScore(good_pods, bad_pods) - callback(null) - } - }) - } - - // ----------- Public ----------- - poolRequests.activate = function () { - logger.info('Pool requests activated.') - timer = setInterval(makePoolRequests, constants.INTERVAL) - } - - poolRequests.addToPoolRequests = function (id, type, request) { - logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) - - PoolRequestsDB.findOne({ id: id }, function (err, entity) { - if (err) logger.error(err) - - if (entity) { - if (entity.type === type) { - logger.error(new Error('Cannot insert two same requests.')) - return - } - - // Remove the request of the other type - PoolRequestsDB.remove({ id: id }, function (err) { - if (err) logger.error(err) - }) - } else { - PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { - if (err) logger.error(err) - }) - } + removeBadPods() }) } - - poolRequests.deactivate = function () { - logger.info('Pool requests deactivated.') - clearInterval(timer) - } - - poolRequests.forceSend = function () { - logger.info('Force pool requests sending.') - makePoolRequests() - } - - module.exports = poolRequests })()