X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Fmodels%2Frequest.js;h=baa26fc1b2576f7919df03291007cc7bd72a5a2d;hb=1e4b0080ff1b4e802f71ec1f4cbf8cbcc70cdcbd;hp=70aa32610b174d531198320957e4fb09e6396352;hpb=7920c273a204e2469416a30b752b12ccd3160102;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/models/request.js b/server/models/request.js index 70aa32610..baa26fc1b 100644 --- a/server/models/request.js +++ b/server/models/request.js @@ -3,6 +3,7 @@ const each = require('async/each') const eachLimit = require('async/eachLimit') const waterfall = require('async/waterfall') +const values = require('lodash/values') const constants = require('../initializers/constants') const logger = require('../helpers/logger') @@ -17,11 +18,12 @@ module.exports = function (sequelize, DataTypes) { const Request = sequelize.define('Request', { request: { - type: DataTypes.JSON + type: DataTypes.JSON, + allowNull: false }, endpoint: { - // TODO: enum? - type: DataTypes.STRING + type: DataTypes.ENUM(values(constants.REQUEST_ENDPOINTS)), + allowNull: false } }, { @@ -116,13 +118,8 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { // The function fire some useful callbacks requests.makeSecureRequest(params, function (err, res) { if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { - logger.error( - 'Error sending secure request to %s pod.', - toPod.host, - { - error: err || new Error('Status code not 20x : ' + res.statusCode) - } - ) + err = err ? err.message : 'Status code not 20x : ' + res.statusCode + logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) return callback(false) } @@ -136,9 +133,9 @@ function makeRequests () { const self = this const RequestToPod = this.sequelize.models.RequestToPod - // We limit the size of the requests (REQUESTS_LIMIT) + // We limit the size of the requests // We don't want to stuck with the same failing requests so we get a random list - listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { + listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { if (err) { logger.error('Cannot get the list of requests.', { err: err }) return // Abort @@ -150,63 +147,37 @@ function makeRequests () { return } - logger.info('Making requests to friends.') - // We want to group requests by destinations pod and endpoint - const requestsToMakeGrouped = {} - - requests.forEach(function (request) { - request.Pods.forEach(function (toPod) { - const hashKey = toPod.id + request.endpoint - if (!requestsToMakeGrouped[hashKey]) { - requestsToMakeGrouped[hashKey] = { - toPodId: toPod.id, - endpoint: request.endpoint, - ids: [], // request ids, to delete them from the DB in the future - datas: [] // requests data, - } - } + const requestsToMakeGrouped = buildRequestObjects(requests) - requestsToMakeGrouped[hashKey].ids.push(request.id) - requestsToMakeGrouped[hashKey].datas.push(request.request) - }) - }) + logger.info('Making requests to friends.') const goodPods = [] const badPods = [] eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod - // FIXME: SQL request inside a loop :/ - self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) { - if (err) { - logger.error('Error finding pod by id.', { err: err }) - return callbackEach() - } + // Maybe the pod is not our friend anymore so simply remove it + if (!toPod) { + const requestIdsToDelete = requestToMake.ids - // Maybe the pod is not our friend anymore so simply remove it - if (!toPod) { - const requestIdsToDelete = requestToMake.ids + logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) + return RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) + } - logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) - RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) + makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { + if (success === false) { + badPods.push(requestToMake.toPod.id) return callbackEach() } - makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { - if (success === true) { - logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) - goodPods.push(requestToMake.toPodId) - - // Remove the pod id of these request ids - RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach) - } else { - badPods.push(requestToMake.toPodId) - callbackEach() - } - }) + // Remove the pod id of these request ids + RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) }) }, function () { // All the requests were made, we update the pods score @@ -219,6 +190,32 @@ function makeRequests () { }) } +function buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(function (toPodId) { + requests[toPodId].forEach(function (data) { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: request.endpoint, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data, + } + } + + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].datas.push(request.request) + }) + }) + + return requestsToMakeGrouped +} + // Remove pods with a score of 0 (too many requests where they were unreachable) function removeBadPods () { const self = this @@ -261,41 +258,72 @@ function updatePodsScore (goodPods, badPods) { if (goodPods.length !== 0) { Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { - if (err) logger.error('Cannot increment scores of good pods.') + if (err) logger.error('Cannot increment scores of good pods.', { error: err }) }) } if (badPods.length !== 0) { Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { - if (err) logger.error('Cannot decrement scores of bad pods.') + if (err) logger.error('Cannot decrement scores of bad pods.', { error: err }) removeBadPods.call(self) }) } } -function listWithLimitAndRandom (limit, callback) { +function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { const self = this + const Pod = this.sequelize.models.Pod - self.count().asCallback(function (err, count) { + Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) { if (err) return callback(err) - // Optimization... - if (count === 0) return callback(null, []) - - let start = Math.floor(Math.random() * count) - limit - if (start < 0) start = 0 + // We don't have friends that have requests + if (podIds.length === 0) return callback(null, []) + // The the first x requests of these pods + // It is very important to sort by id ASC to keep the requests order! const query = { order: [ [ 'id', 'ASC' ] ], - offset: start, - limit: limit, - include: [ this.sequelize.models.Pod ] + include: [ + { + model: self.sequelize.models.Pod, + where: { + id: { + $in: podIds + } + } + } + ] } - self.findAll(query).asCallback(callback) + self.findAll(query).asCallback(function (err, requests) { + if (err) return callback(err) + + const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod) + return callback(err, requestsGrouped) + }) + }) +} + +function groupAndTruncateRequests (requests, limitRequestsPerPod) { + const requestsGrouped = {} + + requests.forEach(function (request) { + request.Pods.forEach(function (pod) { + if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] + + if (requestsGrouped[pod.id].length < limitRequestsPerPod) { + requestsGrouped[pod.id].push({ + request, + pod + }) + } + }) }) + + return requestsGrouped } function removeAll (callback) {