From 00057e85a703713a8f0d96e01c49978be0987eb2 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 28 Jun 2016 20:10:32 +0200 Subject: Request model refractoring -> use mongoose api --- server/models/pods.js | 7 +- server/models/request.js | 280 ++++++++++++++++++++++++++++++++++++++++++++++ server/models/requests.js | 73 ------------ 3 files changed, 286 insertions(+), 74 deletions(-) create mode 100644 server/models/request.js delete mode 100644 server/models/requests.js (limited to 'server/models') diff --git a/server/models/pods.js b/server/models/pods.js index daeadeb07..9502d92e4 100644 --- a/server/models/pods.js +++ b/server/models/pods.js @@ -1,6 +1,7 @@ 'use strict' const mongoose = require('mongoose') +const map = require('lodash/map') const constants = require('../initializers/constants') const logger = require('../helpers/logger') @@ -76,7 +77,11 @@ function list (callback) { } function listAllIds (callback) { - return PodsDB.find({}, { _id: 1 }, callback) + return PodsDB.find({}, { _id: 1 }, function (err, pods) { + if (err) return callback(err) + + return callback(null, map(pods, '_id')) + }) } function listAllUrls (callback) { diff --git a/server/models/request.js b/server/models/request.js new file mode 100644 index 000000000..2a407388a --- /dev/null +++ b/server/models/request.js @@ -0,0 +1,280 @@ +'use strict' + +const async = require('async') +const map = require('lodash/map') +const mongoose = require('mongoose') + +const constants = require('../initializers/constants') +const logger = require('../helpers/logger') +const Pods = require('../models/pods') +const requests = require('../helpers/requests') + +const Video = mongoose.model('Video') + +let timer = null + +// --------------------------------------------------------------------------- + +const RequestSchema = mongoose.Schema({ + request: mongoose.Schema.Types.Mixed, + to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ] +}) + +RequestSchema.statics = { + activate, + deactivate, + flush, + forceSend +} + +RequestSchema.pre('save', function (next) { + const self = this + + if (self.to.length === 0) { + Pods.listAllIds(function (err, podIds) { + if (err) return next(err) + + // No friends + if (podIds.length === 0) return + + self.to = podIds + return next() + }) + } else { + return next() + } +}) + +mongoose.model('Request', RequestSchema) + +// ------------------------------ STATICS ------------------------------ + +function activate () { + logger.info('Requests scheduler activated.') + timer = setInterval(makeRequests.bind(this), constants.INTERVAL) +} + +function deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(timer) +} + +function flush () { + removeAll.call(this, function (err) { + if (err) logger.error('Cannot flush the requests.', { error: err }) + }) +} + +function forceSend () { + logger.info('Force requests scheduler sending.') + makeRequests.call(this) +} + +// --------------------------------------------------------------------------- + +// Make a requests to friends of a certain type +function makeRequest (toPod, requestsToMake, callback) { + if (!callback) callback = function () {} + + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) + } + + return callback(true) + }) +} + +// Make all the requests of the scheduler +function makeRequests () { + const self = this + + list.call(self, function (err, requests) { + if (err) { + logger.error('Cannot get the list of requests.', { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No requests to make.') + return + } + + logger.info('Making requests to friends.') + + // Requests by pods id + const requestsToMake = {} + + requests.forEach(function (poolRequest) { + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) + }) + + const goodPods = [] + const badPods = [] + + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) { + logger.error('Error finding pod by id.', { err: err }) + return callbackEach() + } + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + removePodOf.call(self, requestToMake.ids, toPodId) + return callbackEach() + } + + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + removePodOf.call(self, requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } + + callbackEach() + }) + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + removeWithEmptyTo.call(self) + }) + }) +} + +// Remove pods with a score of 0 (too many requests where they were unreachable) +function removeBadPods () { + async.waterfall([ + function findBadPods (callback) { + Pods.findBadPods(function (err, pods) { + if (err) { + logger.error('Cannot find bad pods.', { error: err }) + return callback(err) + } + + return callback(null, pods) + }) + }, + + function listVideosOfTheseBadPods (pods, callback) { + if (pods.length === 0) return callback(null) + + const urls = map(pods, 'url') + const ids = map(pods, '_id') + + Video.listByUrls(urls, function (err, videosList) { + if (err) { + logger.error('Cannot list videos urls.', { error: err, urls: urls }) + return callback(null, ids, []) + } + + return callback(null, ids, videosList) + }) + }, + + function removeVideosOfTheseBadPods (podIds, videosList, callback) { + // We don't have to remove pods, skip + if (typeof podIds === 'function') return podIds(null) + + async.each(videosList, function (video, callbackEach) { + video.remove(callbackEach) + }, function (err) { + if (err) { + // Don't stop the process + logger.error('Error while removing videos of bad pods.', { error: err }) + return + } + + return callback(null, podIds) + }) + }, + + function removeBadPodsFromDB (podIds, callback) { + // We don't have to remove pods, skip + if (typeof podIds === 'function') return podIds(null) + + Pods.removeAllByIds(podIds, callback) + } + ], function (err, removeResult) { + if (err) { + logger.error('Cannot remove bad pods.', { error: err }) + } else if (removeResult) { + const podsRemoved = removeResult.result.n + logger.info('Removed %d pods.', podsRemoved) + } else { + logger.info('No need to remove bad pods.') + } + }) +} + +function updatePodsScore (goodPods, badPods) { + logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) + + Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { + if (err) logger.error('Cannot increment scores of good pods.') + }) + + Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { + if (err) logger.error('Cannot decrement scores of bad pods.') + removeBadPods() + }) +} + +function list (callback) { + this.find({ }, { _id: 1, request: 1, to: 1 }, callback) +} + +function removeAll (callback) { + this.remove({ }, callback) +} + +function removePodOf (requestsIds, podId, callback) { + if (!callback) callback = function () {} + + this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) +} + +function removeWithEmptyTo (callback) { + if (!callback) callback = function () {} + + this.remove({ to: { $size: 0 } }, callback) +} diff --git a/server/models/requests.js b/server/models/requests.js deleted file mode 100644 index e67ccad56..000000000 --- a/server/models/requests.js +++ /dev/null @@ -1,73 +0,0 @@ -'use strict' - -const mongoose = require('mongoose') - -const logger = require('../helpers/logger') - -// --------------------------------------------------------------------------- - -const requestsSchema = mongoose.Schema({ - request: mongoose.Schema.Types.Mixed, - to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ] -}) -const RequestsDB = mongoose.model('requests', requestsSchema) - -// --------------------------------------------------------------------------- - -const Requests = { - create: create, - findById: findById, - list: list, - removeAll: removeAll, - removePodOf: removePodOf, - removeRequestById: removeRequestById, - removeRequests: removeRequests, - removeWithEmptyTo: removeWithEmptyTo -} - -function create (request, to, callback) { - RequestsDB.create({ request: request, to: to }, callback) -} - -function findById (id, callback) { - RequestsDB.findOne({ id: id }, callback) -} - -function list (callback) { - RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback) -} - -function removeAll (callback) { - RequestsDB.remove({ }, callback) -} - -function removePodOf (requestsIds, podId, callback) { - if (!callback) callback = function () {} - - RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) -} - -function removeRequestById (id, callback) { - RequestsDB.remove({ id: id }, callback) -} - -function removeRequests (ids) { - RequestsDB.remove({ _id: { $in: ids } }, function (err) { - if (err) { - logger.error('Cannot remove requests from the requests database.', { error: err }) - return // Abort - } - - logger.info('Pool requests flushed.') - }) -} - -function removeWithEmptyTo (callback) { - if (!callback) callback = function () {} - - RequestsDB.remove({ to: { $size: 0 } }, callback) -} - -// --------------------------------------------------------------------------- - -module.exports = Requests -- cgit v1.2.3