From 00057e85a703713a8f0d96e01c49978be0987eb2 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 28 Jun 2016 20:10:32 +0200 Subject: Request model refractoring -> use mongoose api --- server.js | 18 +-- server/initializers/database.js | 2 + server/lib/friends.js | 42 ++++-- server/lib/requestsScheduler.js | 270 ------------------------------------- server/models/pods.js | 7 +- server/models/request.js | 280 +++++++++++++++++++++++++++++++++++++++ server/models/requests.js | 73 ---------- server/tests/api/multiplePods.js | 2 +- 8 files changed, 327 insertions(+), 367 deletions(-) delete mode 100644 server/lib/requestsScheduler.js create mode 100644 server/models/request.js delete mode 100644 server/models/requests.js diff --git a/server.js b/server.js index 63aeb7145..33e34019d 100644 --- a/server.js +++ b/server.js @@ -21,24 +21,26 @@ if (miss.length !== 0) { throw new Error('Miss some configurations keys : ' + miss) } -// ----------- PeerTube modules ----------- +// ----------- Database ----------- const config = require('config') const constants = require('./server/initializers/constants') -const customValidators = require('./server/helpers/customValidators') const database = require('./server/initializers/database') -const installer = require('./server/initializers/installer') const logger = require('./server/helpers/logger') -const poolRequests = require('./server/lib/requestsScheduler') + +database.connect() + +// ----------- PeerTube modules ----------- +const customValidators = require('./server/helpers/customValidators') +const installer = require('./server/initializers/installer') +const mongoose = require('mongoose') const routes = require('./server/controllers') const utils = require('./server/helpers/utils') const webtorrent = require('./server/lib/webtorrent') +const Request = mongoose.model('Request') // Get configurations const port = config.get('listen.port') -// ----------- Database ----------- -database.connect() - // ----------- Command line ----------- // ----------- App ----------- @@ -135,7 +137,7 @@ installer.installApplication(function (err) { // ----------- Make the server listening ----------- server.listen(port, function () { // Activate the pool requests - poolRequests.activate() + Request.activate() // videos.seedAllExisting(function () { logger.info('Seeded all the videos') diff --git a/server/initializers/database.js b/server/initializers/database.js index 5932a978b..d0d969663 100644 --- a/server/initializers/database.js +++ b/server/initializers/database.js @@ -7,6 +7,8 @@ const logger = require('../helpers/logger') // Bootstrap models require('../models/video') +// Request model needs Video model +require('../models/request') const dbname = 'peertube' + config.get('database.suffix') const host = config.get('database.host') diff --git a/server/lib/friends.js b/server/lib/friends.js index 91cd69f86..617cc1ab4 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -10,12 +10,12 @@ const constants = require('../initializers/constants') const logger = require('../helpers/logger') const peertubeCrypto = require('../helpers/peertubeCrypto') const Pods = require('../models/pods') -const requestsScheduler = require('../lib/requestsScheduler') const requests = require('../helpers/requests') const http = config.get('webserver.https') ? 'https' : 'http' const host = config.get('webserver.host') const port = config.get('webserver.port') +const Request = mongoose.model('Request') const Video = mongoose.model('Video') const pods = { @@ -29,10 +29,7 @@ const pods = { } function addVideoToFriends (video) { - // ensure namePath is null - video.namePath = null - - requestsScheduler.addRequest('add', video) + createRequest('add', video) } function hasFriends (callback) { @@ -76,9 +73,9 @@ function makeFriends (callback) { function quitFriends (callback) { // Stop pool requests - requestsScheduler.deactivate() + Request.deactivate() // Flush pool requests - requestsScheduler.flush() + Request.flush() async.waterfall([ function getPodsList (callbackAsync) { @@ -127,7 +124,7 @@ function quitFriends (callback) { } ], function (err) { // Don't forget to re activate the scheduler, even if there was an error - requestsScheduler.activate() + Request.activate() if (err) return callback(err) @@ -136,8 +133,8 @@ function quitFriends (callback) { }) } -function removeVideoToFriends (video) { - requestsScheduler.addRequest('remove', video) +function removeVideoToFriends (videoParams) { + createRequest('remove', videoParams) } function sendOwnedVideosToPod (podId) { @@ -155,7 +152,7 @@ function sendOwnedVideosToPod (podId) { return } - requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) + createRequest('add', remoteVideo, [ podId ]) }) }) }) @@ -211,9 +208,9 @@ function getForeignPodsList (url, callback) { function makeRequestsToWinningPods (cert, podsList, callback) { // Stop pool requests - requestsScheduler.deactivate() + Request.deactivate() // Flush pool requests - requestsScheduler.forceSend() + Request.forceSend() async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { const params = { @@ -249,9 +246,26 @@ function makeRequestsToWinningPods (cert, podsList, callback) { }, function endRequests () { // Final callback, we've ended all the requests // Now we made new friends, we can re activate the pool of requests - requestsScheduler.activate() + Request.activate() logger.debug('makeRequestsToWinningPods finished.') return callback() }) } + +function createRequest (type, data, to) { + const req = new Request({ + request: { + type: type, + data: data + } + }) + + if (to) { + req.to = to + } + + req.save(function (err) { + if (err) logger.error('Cannot save the request.', { error: err }) + }) +} diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js deleted file mode 100644 index b192d8299..000000000 --- a/server/lib/requestsScheduler.js +++ /dev/null @@ -1,270 +0,0 @@ -'use strict' - -const async = require('async') -const map = require('lodash/map') -const mongoose = require('mongoose') - -const constants = require('../initializers/constants') -const logger = require('../helpers/logger') -const Pods = require('../models/pods') -const Requests = require('../models/requests') -const requests = require('../helpers/requests') - -const Video = mongoose.model('Video') - -let timer = null - -const requestsScheduler = { - activate: activate, - addRequest: addRequest, - addRequestTo: addRequestTo, - deactivate: deactivate, - flush: flush, - forceSend: forceSend -} - -function activate () { - logger.info('Requests scheduler activated.') - timer = setInterval(makeRequests, constants.INTERVAL) -} - -// Add request to the scheduler -function addRequest (type, data) { - logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) - - const request = { - type: type, - data: data - } - - Pods.listAllIds(function (err, podIds) { - if (err) { - logger.debug('Cannot list pod ids.') - return - } - - // No friends - if (!podIds) return - - Requests.create(request, podIds, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - }) - }) -} - -function addRequestTo (podIds, type, data) { - const request = { - type: type, - data: data - } - - Requests.create(request, podIds, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - }) -} - -function deactivate () { - logger.info('Requests scheduler deactivated.') - clearInterval(timer) -} - -function flush () { - Requests.removeAll(function (err) { - if (err) { - logger.error('Cannot flush the requests.', { error: err }) - } - }) -} - -function forceSend () { - logger.info('Force requests scheduler sending.') - makeRequests() -} - -// --------------------------------------------------------------------------- - -module.exports = requestsScheduler - -// --------------------------------------------------------------------------- - -// Make a requests to friends of a certain type -function makeRequest (toPod, requestsToMake, callback) { - if (!callback) callback = function () {} - - const params = { - toPod: toPod, - encrypt: true, // Security - sign: true, // To prove our identity - method: 'POST', - path: '/api/' + constants.API_VERSION + '/remote/videos', - data: requestsToMake // Requests we need to make - } - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeSecureRequest(params, function (err, res) { - if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { - logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) - - return callback(false) - } - - return callback(true) - }) -} - -// Make all the requests of the scheduler -function makeRequests () { - Requests.list(function (err, requests) { - if (err) { - logger.error('Cannot get the list of requests.', { err: err }) - return // Abort - } - - // If there are no requests, abort - if (requests.length === 0) { - logger.info('No requests to make.') - return - } - - logger.info('Making requests to friends.') - - // Requests by pods id - const requestsToMake = {} - - requests.forEach(function (poolRequest) { - poolRequest.to.forEach(function (toPodId) { - if (!requestsToMake[toPodId]) { - requestsToMake[toPodId] = { - ids: [], - datas: [] - } - } - - requestsToMake[toPodId].ids.push(poolRequest._id) - requestsToMake[toPodId].datas.push(poolRequest.request) - }) - }) - - const goodPods = [] - const badPods = [] - - async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { - const requestToMake = requestsToMake[toPodId] - - // FIXME: mongodb request inside a loop :/ - Pods.findById(toPodId, function (err, toPod) { - if (err) return logger.error('Error finding pod by id.', { err: err }) - - // Maybe the pod is not our friend anymore so simply remove them - if (!toPod) { - Requests.removePodOf(requestToMake.ids, toPodId) - return callbackEach() - } - - makeRequest(toPod, requestToMake.datas, function (success) { - if (err) { - logger.error('Errors when sent request to %s.', toPod.url, { error: err }) - // Do not stop the process just for one error - return callbackEach() - } - - if (success === true) { - logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) - - // Remove the pod id of these request ids - Requests.removePodOf(requestToMake.ids, toPodId) - goodPods.push(toPodId) - } else { - badPods.push(toPodId) - } - - callbackEach() - }) - }) - }, function () { - // All the requests were made, we update the pods score - updatePodsScore(goodPods, badPods) - // Flush requests with no pod - Requests.removeWithEmptyTo() - }) - }) -} - -// Remove pods with a score of 0 (too many requests where they were unreachable) -function removeBadPods () { - async.waterfall([ - function findBadPods (callback) { - Pods.findBadPods(function (err, pods) { - if (err) { - logger.error('Cannot find bad pods.', { error: err }) - return callback(err) - } - - return callback(null, pods) - }) - }, - - function listVideosOfTheseBadPods (pods, callback) { - if (pods.length === 0) return callback(null) - - const urls = map(pods, 'url') - const ids = map(pods, '_id') - - Video.listByUrls(urls, function (err, videosList) { - if (err) { - logger.error('Cannot list videos urls.', { error: err, urls: urls }) - return callback(null, ids, []) - } - - return callback(null, ids, videosList) - }) - }, - - function removeVideosOfTheseBadPods (podIds, videosList, callback) { - // We don't have to remove pods, skip - if (typeof podIds === 'function') return podIds(null) - - async.each(videosList, function (video, callbackEach) { - video.remove(callbackEach) - }, function (err) { - if (err) { - // Don't stop the process - logger.error('Error while removing videos of bad pods.', { error: err }) - return - } - - return callback(null, podIds) - }) - }, - - function removeBadPodsFromDB (podIds, callback) { - // We don't have to remove pods, skip - if (typeof podIds === 'function') return podIds(null) - - Pods.removeAllByIds(podIds, callback) - } - ], function (err, removeResult) { - if (err) { - logger.error('Cannot remove bad pods.', { error: err }) - } else if (removeResult) { - const podsRemoved = removeResult.result.n - logger.info('Removed %d pods.', podsRemoved) - } else { - logger.info('No need to remove bad pods.') - } - }) -} - -function updatePodsScore (goodPods, badPods) { - logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) - - Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { - if (err) logger.error('Cannot increment scores of good pods.') - }) - - Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { - if (err) logger.error('Cannot decrement scores of bad pods.') - removeBadPods() - }) -} diff --git a/server/models/pods.js b/server/models/pods.js index daeadeb07..9502d92e4 100644 --- a/server/models/pods.js +++ b/server/models/pods.js @@ -1,6 +1,7 @@ 'use strict' const mongoose = require('mongoose') +const map = require('lodash/map') const constants = require('../initializers/constants') const logger = require('../helpers/logger') @@ -76,7 +77,11 @@ function list (callback) { } function listAllIds (callback) { - return PodsDB.find({}, { _id: 1 }, callback) + return PodsDB.find({}, { _id: 1 }, function (err, pods) { + if (err) return callback(err) + + return callback(null, map(pods, '_id')) + }) } function listAllUrls (callback) { diff --git a/server/models/request.js b/server/models/request.js new file mode 100644 index 000000000..2a407388a --- /dev/null +++ b/server/models/request.js @@ -0,0 +1,280 @@ +'use strict' + +const async = require('async') +const map = require('lodash/map') +const mongoose = require('mongoose') + +const constants = require('../initializers/constants') +const logger = require('../helpers/logger') +const Pods = require('../models/pods') +const requests = require('../helpers/requests') + +const Video = mongoose.model('Video') + +let timer = null + +// --------------------------------------------------------------------------- + +const RequestSchema = mongoose.Schema({ + request: mongoose.Schema.Types.Mixed, + to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ] +}) + +RequestSchema.statics = { + activate, + deactivate, + flush, + forceSend +} + +RequestSchema.pre('save', function (next) { + const self = this + + if (self.to.length === 0) { + Pods.listAllIds(function (err, podIds) { + if (err) return next(err) + + // No friends + if (podIds.length === 0) return + + self.to = podIds + return next() + }) + } else { + return next() + } +}) + +mongoose.model('Request', RequestSchema) + +// ------------------------------ STATICS ------------------------------ + +function activate () { + logger.info('Requests scheduler activated.') + timer = setInterval(makeRequests.bind(this), constants.INTERVAL) +} + +function deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(timer) +} + +function flush () { + removeAll.call(this, function (err) { + if (err) logger.error('Cannot flush the requests.', { error: err }) + }) +} + +function forceSend () { + logger.info('Force requests scheduler sending.') + makeRequests.call(this) +} + +// --------------------------------------------------------------------------- + +// Make a requests to friends of a certain type +function makeRequest (toPod, requestsToMake, callback) { + if (!callback) callback = function () {} + + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) + } + + return callback(true) + }) +} + +// Make all the requests of the scheduler +function makeRequests () { + const self = this + + list.call(self, function (err, requests) { + if (err) { + logger.error('Cannot get the list of requests.', { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No requests to make.') + return + } + + logger.info('Making requests to friends.') + + // Requests by pods id + const requestsToMake = {} + + requests.forEach(function (poolRequest) { + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) + }) + + const goodPods = [] + const badPods = [] + + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) { + logger.error('Error finding pod by id.', { err: err }) + return callbackEach() + } + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + removePodOf.call(self, requestToMake.ids, toPodId) + return callbackEach() + } + + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + removePodOf.call(self, requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } + + callbackEach() + }) + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + removeWithEmptyTo.call(self) + }) + }) +} + +// Remove pods with a score of 0 (too many requests where they were unreachable) +function removeBadPods () { + async.waterfall([ + function findBadPods (callback) { + Pods.findBadPods(function (err, pods) { + if (err) { + logger.error('Cannot find bad pods.', { error: err }) + return callback(err) + } + + return callback(null, pods) + }) + }, + + function listVideosOfTheseBadPods (pods, callback) { + if (pods.length === 0) return callback(null) + + const urls = map(pods, 'url') + const ids = map(pods, '_id') + + Video.listByUrls(urls, function (err, videosList) { + if (err) { + logger.error('Cannot list videos urls.', { error: err, urls: urls }) + return callback(null, ids, []) + } + + return callback(null, ids, videosList) + }) + }, + + function removeVideosOfTheseBadPods (podIds, videosList, callback) { + // We don't have to remove pods, skip + if (typeof podIds === 'function') return podIds(null) + + async.each(videosList, function (video, callbackEach) { + video.remove(callbackEach) + }, function (err) { + if (err) { + // Don't stop the process + logger.error('Error while removing videos of bad pods.', { error: err }) + return + } + + return callback(null, podIds) + }) + }, + + function removeBadPodsFromDB (podIds, callback) { + // We don't have to remove pods, skip + if (typeof podIds === 'function') return podIds(null) + + Pods.removeAllByIds(podIds, callback) + } + ], function (err, removeResult) { + if (err) { + logger.error('Cannot remove bad pods.', { error: err }) + } else if (removeResult) { + const podsRemoved = removeResult.result.n + logger.info('Removed %d pods.', podsRemoved) + } else { + logger.info('No need to remove bad pods.') + } + }) +} + +function updatePodsScore (goodPods, badPods) { + logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) + + Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { + if (err) logger.error('Cannot increment scores of good pods.') + }) + + Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { + if (err) logger.error('Cannot decrement scores of bad pods.') + removeBadPods() + }) +} + +function list (callback) { + this.find({ }, { _id: 1, request: 1, to: 1 }, callback) +} + +function removeAll (callback) { + this.remove({ }, callback) +} + +function removePodOf (requestsIds, podId, callback) { + if (!callback) callback = function () {} + + this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) +} + +function removeWithEmptyTo (callback) { + if (!callback) callback = function () {} + + this.remove({ to: { $size: 0 } }, callback) +} diff --git a/server/models/requests.js b/server/models/requests.js deleted file mode 100644 index e67ccad56..000000000 --- a/server/models/requests.js +++ /dev/null @@ -1,73 +0,0 @@ -'use strict' - -const mongoose = require('mongoose') - -const logger = require('../helpers/logger') - -// --------------------------------------------------------------------------- - -const requestsSchema = mongoose.Schema({ - request: mongoose.Schema.Types.Mixed, - to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ] -}) -const RequestsDB = mongoose.model('requests', requestsSchema) - -// --------------------------------------------------------------------------- - -const Requests = { - create: create, - findById: findById, - list: list, - removeAll: removeAll, - removePodOf: removePodOf, - removeRequestById: removeRequestById, - removeRequests: removeRequests, - removeWithEmptyTo: removeWithEmptyTo -} - -function create (request, to, callback) { - RequestsDB.create({ request: request, to: to }, callback) -} - -function findById (id, callback) { - RequestsDB.findOne({ id: id }, callback) -} - -function list (callback) { - RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback) -} - -function removeAll (callback) { - RequestsDB.remove({ }, callback) -} - -function removePodOf (requestsIds, podId, callback) { - if (!callback) callback = function () {} - - RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) -} - -function removeRequestById (id, callback) { - RequestsDB.remove({ id: id }, callback) -} - -function removeRequests (ids) { - RequestsDB.remove({ _id: { $in: ids } }, function (err) { - if (err) { - logger.error('Cannot remove requests from the requests database.', { error: err }) - return // Abort - } - - logger.info('Pool requests flushed.') - }) -} - -function removeWithEmptyTo (callback) { - if (!callback) callback = function () {} - - RequestsDB.remove({ to: { $size: 0 } }, callback) -} - -// --------------------------------------------------------------------------- - -module.exports = Requests diff --git a/server/tests/api/multiplePods.js b/server/tests/api/multiplePods.js index 52dfda137..2a1bc64e6 100644 --- a/server/tests/api/multiplePods.js +++ b/server/tests/api/multiplePods.js @@ -414,7 +414,7 @@ describe('Test multiple pods', function () { // Keep the logs if the test failed if (this.ok) { - // utils.flushTests(done) + utils.flushTests(done) } else { done() } -- cgit v1.2.3