From bd14d16a29e2f90805d04b48378188517741a071 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 10 Jan 2017 22:24:42 +0100 Subject: Server: improve requests scheduler --- server/initializers/constants.js | 9 ++-- server/lib/friends.js | 16 +++++- server/models/pod.js | 37 +++++++++++++ server/models/request.js | 110 ++++++++++++++++++++++++--------------- 4 files changed, 125 insertions(+), 47 deletions(-) diff --git a/server/initializers/constants.js b/server/initializers/constants.js index a6adb75bf..97e3c5296 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js @@ -108,8 +108,10 @@ let REQUESTS_INTERVAL = 600000 // Number of requests in parallel we can make const REQUESTS_IN_PARALLEL = 10 -// How many requests we put in request -const REQUESTS_LIMIT = 10 +// To how many pods we send requests +const REQUESTS_LIMIT_PODS = 10 +// How many requests we send to a pod per interval +const REQUESTS_LIMIT_PER_POD = 5 // Number of requests to retry for replay requests module const RETRY_REQUESTS = 5 @@ -184,7 +186,8 @@ module.exports = { REQUEST_ENDPOINTS, REQUESTS_IN_PARALLEL, REQUESTS_INTERVAL, - REQUESTS_LIMIT, + REQUESTS_LIMIT_PODS, + REQUESTS_LIMIT_PER_POD, RETRY_REQUESTS, SEARCHABLE_COLUMNS, SIGNATURE_ALGORITHM, diff --git a/server/lib/friends.js b/server/lib/friends.js index 3d3d0fdee..f0575ff2f 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -54,7 +54,13 @@ function removeVideoToFriends (videoParams) { } function reportAbuseVideoToFriend (reportData, video) { - createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, [ video.Author.podId ]) + const options = { + type: 'report-abuse', + endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, + data: reportData, + toIds: [ video.Author.podId ] + } + createRequest(options) } function hasFriends (callback) { @@ -161,7 +167,13 @@ function sendOwnedVideosToPod (podId) { return } - createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ]) + const options = { + type: 'add', + endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, + data: remoteVideo, + toIds: [ podId ] + } + createRequest(options) }) }) }) diff --git a/server/models/pod.js b/server/models/pod.js index 8e7dd1fd8..b3c6db8e8 100644 --- a/server/models/pod.js +++ b/server/models/pod.js @@ -50,6 +50,7 @@ module.exports = function (sequelize, DataTypes) { incrementScores, list, listAllIds, + listRandomPodIdsWithRequest, listBadPods, load, loadByHost, @@ -134,6 +135,42 @@ function listAllIds (transaction, callback) { }) } +function listRandomPodIdsWithRequest (limit, callback) { + const self = this + + self.count().asCallback(function (err, count) { + if (err) return callback(err) + + // Optimization... + if (count === 0) return callback(null, []) + + let start = Math.floor(Math.random() * count) - limit + if (start < 0) start = 0 + + const query = { + attributes: [ 'id' ], + order: [ + [ 'id', 'ASC' ] + ], + offset: start, + limit: limit, + where: { + id: { + $in: [ + this.sequelize.literal('SELECT "podId" FROM "RequestToPods"') + ] + } + } + } + + return this.findAll(query).asCallback(function (err, pods) { + if (err) return callback(err) + + return callback(null, map(pods, 'id')) + }) + }) +} + function listBadPods (callback) { const query = { where: { diff --git a/server/models/request.js b/server/models/request.js index 1d6038044..26953e5f5 100644 --- a/server/models/request.js +++ b/server/models/request.js @@ -138,9 +138,9 @@ function makeRequests () { const self = this const RequestToPod = this.sequelize.models.RequestToPod - // We limit the size of the requests (REQUESTS_LIMIT) + // We limit the size of the requests // We don't want to stuck with the same failing requests so we get a random list - listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { + listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { if (err) { logger.error('Cannot get the list of requests.', { err: err }) return // Abort @@ -156,13 +156,15 @@ function makeRequests () { // We want to group requests by destinations pod and endpoint const requestsToMakeGrouped = {} + Object.keys(requests).forEach(function (toPodId) { + requests[toPodId].forEach(function (data) { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint - requests.forEach(function (request) { - request.Pods.forEach(function (toPod) { - const hashKey = toPod.id + request.endpoint if (!requestsToMakeGrouped[hashKey]) { requestsToMakeGrouped[hashKey] = { - toPodId: toPod.id, + toPod: pod, endpoint: request.endpoint, ids: [], // request ids, to delete them from the DB in the future datas: [] // requests data, @@ -179,36 +181,29 @@ function makeRequests () { eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod - // FIXME: SQL request inside a loop :/ - self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) { - if (err) { - logger.error('Error finding pod by id.', { err: err }) - return callbackEach() - } - - // Maybe the pod is not our friend anymore so simply remove it - if (!toPod) { - const requestIdsToDelete = requestToMake.ids + // Maybe the pod is not our friend anymore so simply remove it + if (!toPod) { + const requestIdsToDelete = requestToMake.ids - logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) - RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) - return callbackEach() - } + logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) + RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id) + return callbackEach() + } - makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { - if (success === true) { - logger.debug('Removing requests for pod %s.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) + makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { + if (success === true) { + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) - goodPods.push(requestToMake.toPodId) + goodPods.push(requestToMake.toPod.id) - // Remove the pod id of these request ids - RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach) - } else { - badPods.push(requestToMake.toPodId) - callbackEach() - } - }) + // Remove the pod id of these request ids + RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) + } else { + badPods.push(requestToMake.toPod.id) + callbackEach() + } }) }, function () { // All the requests were made, we update the pods score @@ -275,29 +270,60 @@ function updatePodsScore (goodPods, badPods) { } } -function listWithLimitAndRandom (limit, callback) { +function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { const self = this + const Pod = this.sequelize.models.Pod - self.count().asCallback(function (err, count) { + Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) { if (err) return callback(err) - // Optimization... - if (count === 0) return callback(null, []) - - let start = Math.floor(Math.random() * count) - limit - if (start < 0) start = 0 + // We don't have friends that have requests + if (podIds.length === 0) return callback(null, []) + // The the first x requests of these pods + // It is very important to sort by id ASC to keep the requests order! const query = { order: [ [ 'id', 'ASC' ] ], - // offset: start, - // limit: limit, - include: [ this.sequelize.models.Pod ] + include: [ + { + model: self.sequelize.models.Pod, + where: { + id: { + $in: podIds + } + } + } + ] } - self.findAll(query).asCallback(callback) + self.findAll(query).asCallback(function (err, requests) { + if (err) return callback(err) + + const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod) + return callback(err, requestsGrouped) + }) + }) +} + +function groupAndTruncateRequests (requests, limitRequestsPerPod) { + const requestsGrouped = {} + + requests.forEach(function (request) { + request.Pods.forEach(function (pod) { + if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] + + if (requestsGrouped[pod.id].length < limitRequestsPerPod) { + requestsGrouped[pod.id].push({ + request, + pod + }) + } + }) }) + + return requestsGrouped } function removeAll (callback) { -- cgit v1.2.3