diff options
-rw-r--r-- | server.js | 5 | ||||
-rw-r--r-- | server/lib/friends.js | 59 | ||||
-rw-r--r-- | server/lib/request-scheduler.js | 202 | ||||
-rw-r--r-- | server/models/request.js | 208 |
4 files changed, 247 insertions, 227 deletions
@@ -37,6 +37,7 @@ if (errorMessage !== null) { | |||
37 | 37 | ||
38 | // ----------- PeerTube modules ----------- | 38 | // ----------- PeerTube modules ----------- |
39 | const customValidators = require('./server/helpers/custom-validators') | 39 | const customValidators = require('./server/helpers/custom-validators') |
40 | const friends = require('./server/lib/friends') | ||
40 | const installer = require('./server/initializers/installer') | 41 | const installer = require('./server/initializers/installer') |
41 | const migrator = require('./server/initializers/migrator') | 42 | const migrator = require('./server/initializers/migrator') |
42 | const routes = require('./server/controllers') | 43 | const routes = require('./server/controllers') |
@@ -128,8 +129,8 @@ installer.installApplication(function (err) { | |||
128 | 129 | ||
129 | // ----------- Make the server listening ----------- | 130 | // ----------- Make the server listening ----------- |
130 | server.listen(port, function () { | 131 | server.listen(port, function () { |
131 | // Activate the pool requests | 132 | // Activate the communication with friends |
132 | db.Request.activate() | 133 | friends.activate() |
133 | 134 | ||
134 | logger.info('Server listening on port %d', port) | 135 | logger.info('Server listening on port %d', port) |
135 | logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL) | 136 | logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL) |
diff --git a/server/lib/friends.js b/server/lib/friends.js index 9b38693c7..7dfa62a2a 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -11,10 +11,13 @@ const db = require('../initializers/database') | |||
11 | const logger = require('../helpers/logger') | 11 | const logger = require('../helpers/logger') |
12 | const peertubeCrypto = require('../helpers/peertube-crypto') | 12 | const peertubeCrypto = require('../helpers/peertube-crypto') |
13 | const requests = require('../helpers/requests') | 13 | const requests = require('../helpers/requests') |
14 | const RequestScheduler = require('./request-scheduler') | ||
14 | 15 | ||
15 | const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] | 16 | const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] |
17 | const requestScheduler = new RequestScheduler('') | ||
16 | 18 | ||
17 | const friends = { | 19 | const friends = { |
20 | activate, | ||
18 | addVideoToFriends, | 21 | addVideoToFriends, |
19 | updateVideoToFriends, | 22 | updateVideoToFriends, |
20 | reportAbuseVideoToFriend, | 23 | reportAbuseVideoToFriend, |
@@ -25,6 +28,10 @@ const friends = { | |||
25 | sendOwnedVideosToPod | 28 | sendOwnedVideosToPod |
26 | } | 29 | } |
27 | 30 | ||
31 | function activate () { | ||
32 | requestScheduler.activate() | ||
33 | } | ||
34 | |||
28 | function addVideoToFriends (videoData, transaction, callback) { | 35 | function addVideoToFriends (videoData, transaction, callback) { |
29 | const options = { | 36 | const options = { |
30 | type: ENDPOINT_ACTIONS.ADD, | 37 | type: ENDPOINT_ACTIONS.ADD, |
@@ -99,11 +106,11 @@ function makeFriends (hosts, callback) { | |||
99 | 106 | ||
100 | function quitFriends (callback) { | 107 | function quitFriends (callback) { |
101 | // Stop pool requests | 108 | // Stop pool requests |
102 | db.Request.deactivate() | 109 | requestScheduler.deactivate() |
103 | 110 | ||
104 | waterfall([ | 111 | waterfall([ |
105 | function flushRequests (callbackAsync) { | 112 | function flushRequests (callbackAsync) { |
106 | db.Request.flush(callbackAsync) | 113 | requestScheduler.flush(callbackAsync) |
107 | }, | 114 | }, |
108 | 115 | ||
109 | function getPodsList (callbackAsync) { | 116 | function getPodsList (callbackAsync) { |
@@ -140,7 +147,7 @@ function quitFriends (callback) { | |||
140 | } | 147 | } |
141 | ], function (err) { | 148 | ], function (err) { |
142 | // Don't forget to re activate the scheduler, even if there was an error | 149 | // Don't forget to re activate the scheduler, even if there was an error |
143 | db.Request.activate() | 150 | requestScheduler.activate() |
144 | 151 | ||
145 | if (err) return callback(err) | 152 | if (err) return callback(err) |
146 | 153 | ||
@@ -235,9 +242,9 @@ function getForeignPodsList (host, callback) { | |||
235 | 242 | ||
236 | function makeRequestsToWinningPods (cert, podsList, callback) { | 243 | function makeRequestsToWinningPods (cert, podsList, callback) { |
237 | // Stop pool requests | 244 | // Stop pool requests |
238 | db.Request.deactivate() | 245 | requestScheduler.deactivate() |
239 | // Flush pool requests | 246 | // Flush pool requests |
240 | db.Request.forceSend() | 247 | requestScheduler.forceSend() |
241 | 248 | ||
242 | eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { | 249 | eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { |
243 | const params = { | 250 | const params = { |
@@ -278,7 +285,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
278 | }, function endRequests () { | 285 | }, function endRequests () { |
279 | // Final callback, we've ended all the requests | 286 | // Final callback, we've ended all the requests |
280 | // Now we made new friends, we can re activate the pool of requests | 287 | // Now we made new friends, we can re activate the pool of requests |
281 | db.Request.activate() | 288 | requestScheduler.activate() |
282 | 289 | ||
283 | logger.debug('makeRequestsToWinningPods finished.') | 290 | logger.debug('makeRequestsToWinningPods finished.') |
284 | return callback() | 291 | return callback() |
@@ -289,7 +296,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
289 | // { type, endpoint, data, toIds, transaction } | 296 | // { type, endpoint, data, toIds, transaction } |
290 | function createRequest (options, callback) { | 297 | function createRequest (options, callback) { |
291 | if (!callback) callback = function () {} | 298 | if (!callback) callback = function () {} |
292 | if (options.toIds) return _createRequest(options, callback) | 299 | if (options.toIds) return requestScheduler.createRequest(options, callback) |
293 | 300 | ||
294 | // If the "toIds" pods is not specified, we send the request to all our friends | 301 | // If the "toIds" pods is not specified, we send the request to all our friends |
295 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | 302 | db.Pod.listAllIds(options.transaction, function (err, podIds) { |
@@ -299,43 +306,7 @@ function createRequest (options, callback) { | |||
299 | } | 306 | } |
300 | 307 | ||
301 | const newOptions = Object.assign(options, { toIds: podIds }) | 308 | const newOptions = Object.assign(options, { toIds: podIds }) |
302 | return _createRequest(newOptions, callback) | 309 | return requestScheduler.createRequest(newOptions, callback) |
303 | }) | ||
304 | } | ||
305 | |||
306 | // { type, endpoint, data, toIds, transaction } | ||
307 | function _createRequest (options, callback) { | ||
308 | const type = options.type | ||
309 | const endpoint = options.endpoint | ||
310 | const data = options.data | ||
311 | const toIds = options.toIds | ||
312 | const transaction = options.transaction | ||
313 | |||
314 | const pods = [] | ||
315 | |||
316 | // If there are no destination pods abort | ||
317 | if (toIds.length === 0) return callback(null) | ||
318 | |||
319 | toIds.forEach(function (toPod) { | ||
320 | pods.push(db.Pod.build({ id: toPod })) | ||
321 | }) | ||
322 | |||
323 | const createQuery = { | ||
324 | endpoint, | ||
325 | request: { | ||
326 | type: type, | ||
327 | data: data | ||
328 | } | ||
329 | } | ||
330 | |||
331 | const dbRequestOptions = { | ||
332 | transaction | ||
333 | } | ||
334 | |||
335 | return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) { | ||
336 | if (err) return callback(err) | ||
337 | |||
338 | return request.setPods(pods, dbRequestOptions).asCallback(callback) | ||
339 | }) | 310 | }) |
340 | } | 311 | } |
341 | 312 | ||
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js new file mode 100644 index 000000000..c8bc4af28 --- /dev/null +++ b/server/lib/request-scheduler.js | |||
@@ -0,0 +1,202 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../initializers/constants') | ||
6 | const db = require('../initializers/database') | ||
7 | const logger = require('../helpers/logger') | ||
8 | const requests = require('../helpers/requests') | ||
9 | |||
10 | module.exports = class RequestScheduler { | ||
11 | |||
12 | constructor (name) { | ||
13 | this.name = name | ||
14 | |||
15 | this.lastRequestTimestamp = 0 | ||
16 | this.timer = null | ||
17 | } | ||
18 | |||
19 | activate () { | ||
20 | logger.info('Requests scheduler activated.') | ||
21 | this.lastRequestTimestamp = Date.now() | ||
22 | |||
23 | this.timer = setInterval(() => { | ||
24 | this.lastRequestTimestamp = Date.now() | ||
25 | this.makeRequests() | ||
26 | }, constants.REQUESTS_INTERVAL) | ||
27 | } | ||
28 | |||
29 | deactivate () { | ||
30 | logger.info('Requests scheduler deactivated.') | ||
31 | clearInterval(this.timer) | ||
32 | this.timer = null | ||
33 | } | ||
34 | |||
35 | forceSend () { | ||
36 | logger.info('Force requests scheduler sending.') | ||
37 | this.makeRequests() | ||
38 | } | ||
39 | |||
40 | remainingMilliSeconds () { | ||
41 | if (this.timer === null) return -1 | ||
42 | |||
43 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
44 | } | ||
45 | |||
46 | // { type, endpoint, data, toIds, transaction } | ||
47 | createRequest (options, callback) { | ||
48 | const type = options.type | ||
49 | const endpoint = options.endpoint | ||
50 | const data = options.data | ||
51 | const toIds = options.toIds | ||
52 | const transaction = options.transaction | ||
53 | |||
54 | const pods = [] | ||
55 | |||
56 | // If there are no destination pods abort | ||
57 | if (toIds.length === 0) return callback(null) | ||
58 | |||
59 | toIds.forEach(toPod => { | ||
60 | pods.push(db.Pod.build({ id: toPod })) | ||
61 | }) | ||
62 | |||
63 | const createQuery = { | ||
64 | endpoint, | ||
65 | request: { | ||
66 | type: type, | ||
67 | data: data | ||
68 | } | ||
69 | } | ||
70 | |||
71 | const dbRequestOptions = { | ||
72 | transaction | ||
73 | } | ||
74 | |||
75 | return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { | ||
76 | if (err) return callback(err) | ||
77 | |||
78 | return request.setPods(pods, dbRequestOptions).asCallback(callback) | ||
79 | }) | ||
80 | } | ||
81 | |||
82 | // --------------------------------------------------------------------------- | ||
83 | |||
84 | // Make all the requests of the scheduler | ||
85 | makeRequests () { | ||
86 | // We limit the size of the requests | ||
87 | // We don't want to stuck with the same failing requests so we get a random list | ||
88 | db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { | ||
89 | if (err) { | ||
90 | logger.error('Cannot get the list of requests.', { err: err }) | ||
91 | return // Abort | ||
92 | } | ||
93 | |||
94 | // If there are no requests, abort | ||
95 | if (requests.length === 0) { | ||
96 | logger.info('No requests to make.') | ||
97 | return | ||
98 | } | ||
99 | |||
100 | // We want to group requests by destinations pod and endpoint | ||
101 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
102 | |||
103 | logger.info('Making requests to friends.') | ||
104 | |||
105 | const goodPods = [] | ||
106 | const badPods = [] | ||
107 | |||
108 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
109 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
110 | const toPod = requestToMake.toPod | ||
111 | |||
112 | // Maybe the pod is not our friend anymore so simply remove it | ||
113 | if (!toPod) { | ||
114 | const requestIdsToDelete = requestToMake.ids | ||
115 | |||
116 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
117 | return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
118 | } | ||
119 | |||
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { | ||
121 | if (success === false) { | ||
122 | badPods.push(requestToMake.toPod.id) | ||
123 | return callbackEach() | ||
124 | } | ||
125 | |||
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
127 | goodPods.push(requestToMake.toPod.id) | ||
128 | |||
129 | // Remove the pod id of these request ids | ||
130 | db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
131 | }) | ||
132 | }, () => { | ||
133 | // All the requests were made, we update the pods score | ||
134 | db.Request.updatePodsScore(goodPods, badPods) | ||
135 | // Flush requests with no pod | ||
136 | db.Request.removeWithEmptyTo(err => { | ||
137 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
138 | }) | ||
139 | }) | ||
140 | }) | ||
141 | } | ||
142 | |||
143 | // Make a requests to friends of a certain type | ||
144 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
145 | if (!callback) callback = function () {} | ||
146 | |||
147 | const params = { | ||
148 | toPod: toPod, | ||
149 | sign: true, // Prove our identity | ||
150 | method: 'POST', | ||
151 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
152 | data: requestsToMake // Requests we need to make | ||
153 | } | ||
154 | |||
155 | // Make multiple retry requests to all of pods | ||
156 | // The function fire some useful callbacks | ||
157 | requests.makeSecureRequest(params, (err, res) => { | ||
158 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
159 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
160 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
161 | |||
162 | return callback(false) | ||
163 | } | ||
164 | |||
165 | return callback(true) | ||
166 | }) | ||
167 | } | ||
168 | |||
169 | buildRequestObjects (requests) { | ||
170 | const requestsToMakeGrouped = {} | ||
171 | |||
172 | Object.keys(requests).forEach(toPodId => { | ||
173 | requests[toPodId].forEach(data => { | ||
174 | const request = data.request | ||
175 | const pod = data.pod | ||
176 | const hashKey = toPodId + request.endpoint | ||
177 | |||
178 | if (!requestsToMakeGrouped[hashKey]) { | ||
179 | requestsToMakeGrouped[hashKey] = { | ||
180 | toPod: pod, | ||
181 | endpoint: request.endpoint, | ||
182 | ids: [], // request ids, to delete them from the DB in the future | ||
183 | datas: [] // requests data, | ||
184 | } | ||
185 | } | ||
186 | |||
187 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
188 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
189 | }) | ||
190 | }) | ||
191 | |||
192 | return requestsToMakeGrouped | ||
193 | } | ||
194 | |||
195 | flush (callback) { | ||
196 | db.Request.removeAll(err => { | ||
197 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
198 | |||
199 | return callback(err) | ||
200 | }) | ||
201 | } | ||
202 | } | ||
diff --git a/server/models/request.js b/server/models/request.js index baa26fc1b..ca616d130 100644 --- a/server/models/request.js +++ b/server/models/request.js | |||
@@ -1,16 +1,11 @@ | |||
1 | 'use strict' | 1 | 'use strict' |
2 | 2 | ||
3 | const each = require('async/each') | 3 | const each = require('async/each') |
4 | const eachLimit = require('async/eachLimit') | ||
5 | const waterfall = require('async/waterfall') | 4 | const waterfall = require('async/waterfall') |
6 | const values = require('lodash/values') | 5 | const values = require('lodash/values') |
7 | 6 | ||
8 | const constants = require('../initializers/constants') | 7 | const constants = require('../initializers/constants') |
9 | const logger = require('../helpers/logger') | 8 | const logger = require('../helpers/logger') |
10 | const requests = require('../helpers/requests') | ||
11 | |||
12 | let timer = null | ||
13 | let lastRequestTimestamp = 0 | ||
14 | 9 | ||
15 | // --------------------------------------------------------------------------- | 10 | // --------------------------------------------------------------------------- |
16 | 11 | ||
@@ -30,12 +25,13 @@ module.exports = function (sequelize, DataTypes) { | |||
30 | classMethods: { | 25 | classMethods: { |
31 | associate, | 26 | associate, |
32 | 27 | ||
33 | activate, | 28 | listWithLimitAndRandom, |
29 | |||
34 | countTotalRequests, | 30 | countTotalRequests, |
35 | deactivate, | 31 | removeBadPods, |
36 | flush, | 32 | updatePodsScore, |
37 | forceSend, | 33 | removeAll, |
38 | remainingMilliSeconds | 34 | removeWithEmptyTo |
39 | } | 35 | } |
40 | } | 36 | } |
41 | ) | 37 | ) |
@@ -56,17 +52,6 @@ function associate (models) { | |||
56 | }) | 52 | }) |
57 | } | 53 | } |
58 | 54 | ||
59 | function activate () { | ||
60 | logger.info('Requests scheduler activated.') | ||
61 | lastRequestTimestamp = Date.now() | ||
62 | |||
63 | const self = this | ||
64 | timer = setInterval(function () { | ||
65 | lastRequestTimestamp = Date.now() | ||
66 | makeRequests.call(self) | ||
67 | }, constants.REQUESTS_INTERVAL) | ||
68 | } | ||
69 | |||
70 | function countTotalRequests (callback) { | 55 | function countTotalRequests (callback) { |
71 | const query = { | 56 | const query = { |
72 | include: [ this.sequelize.models.Pod ] | 57 | include: [ this.sequelize.models.Pod ] |
@@ -75,147 +60,6 @@ function countTotalRequests (callback) { | |||
75 | return this.count(query).asCallback(callback) | 60 | return this.count(query).asCallback(callback) |
76 | } | 61 | } |
77 | 62 | ||
78 | function deactivate () { | ||
79 | logger.info('Requests scheduler deactivated.') | ||
80 | clearInterval(timer) | ||
81 | timer = null | ||
82 | } | ||
83 | |||
84 | function flush (callback) { | ||
85 | removeAll.call(this, function (err) { | ||
86 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
87 | |||
88 | return callback(err) | ||
89 | }) | ||
90 | } | ||
91 | |||
92 | function forceSend () { | ||
93 | logger.info('Force requests scheduler sending.') | ||
94 | makeRequests.call(this) | ||
95 | } | ||
96 | |||
97 | function remainingMilliSeconds () { | ||
98 | if (timer === null) return -1 | ||
99 | |||
100 | return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp) | ||
101 | } | ||
102 | |||
103 | // --------------------------------------------------------------------------- | ||
104 | |||
105 | // Make a requests to friends of a certain type | ||
106 | function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
107 | if (!callback) callback = function () {} | ||
108 | |||
109 | const params = { | ||
110 | toPod: toPod, | ||
111 | sign: true, // Prove our identity | ||
112 | method: 'POST', | ||
113 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
114 | data: requestsToMake // Requests we need to make | ||
115 | } | ||
116 | |||
117 | // Make multiple retry requests to all of pods | ||
118 | // The function fire some useful callbacks | ||
119 | requests.makeSecureRequest(params, function (err, res) { | ||
120 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
121 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
122 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
123 | |||
124 | return callback(false) | ||
125 | } | ||
126 | |||
127 | return callback(true) | ||
128 | }) | ||
129 | } | ||
130 | |||
131 | // Make all the requests of the scheduler | ||
132 | function makeRequests () { | ||
133 | const self = this | ||
134 | const RequestToPod = this.sequelize.models.RequestToPod | ||
135 | |||
136 | // We limit the size of the requests | ||
137 | // We don't want to stuck with the same failing requests so we get a random list | ||
138 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { | ||
139 | if (err) { | ||
140 | logger.error('Cannot get the list of requests.', { err: err }) | ||
141 | return // Abort | ||
142 | } | ||
143 | |||
144 | // If there are no requests, abort | ||
145 | if (requests.length === 0) { | ||
146 | logger.info('No requests to make.') | ||
147 | return | ||
148 | } | ||
149 | |||
150 | // We want to group requests by destinations pod and endpoint | ||
151 | const requestsToMakeGrouped = buildRequestObjects(requests) | ||
152 | |||
153 | logger.info('Making requests to friends.') | ||
154 | |||
155 | const goodPods = [] | ||
156 | const badPods = [] | ||
157 | |||
158 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { | ||
159 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
160 | const toPod = requestToMake.toPod | ||
161 | |||
162 | // Maybe the pod is not our friend anymore so simply remove it | ||
163 | if (!toPod) { | ||
164 | const requestIdsToDelete = requestToMake.ids | ||
165 | |||
166 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
167 | return RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
168 | } | ||
169 | |||
170 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { | ||
171 | if (success === false) { | ||
172 | badPods.push(requestToMake.toPod.id) | ||
173 | return callbackEach() | ||
174 | } | ||
175 | |||
176 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
177 | goodPods.push(requestToMake.toPod.id) | ||
178 | |||
179 | // Remove the pod id of these request ids | ||
180 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
181 | }) | ||
182 | }, function () { | ||
183 | // All the requests were made, we update the pods score | ||
184 | updatePodsScore.call(self, goodPods, badPods) | ||
185 | // Flush requests with no pod | ||
186 | removeWithEmptyTo.call(self, function (err) { | ||
187 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
188 | }) | ||
189 | }) | ||
190 | }) | ||
191 | } | ||
192 | |||
193 | function buildRequestObjects (requests) { | ||
194 | const requestsToMakeGrouped = {} | ||
195 | |||
196 | Object.keys(requests).forEach(function (toPodId) { | ||
197 | requests[toPodId].forEach(function (data) { | ||
198 | const request = data.request | ||
199 | const pod = data.pod | ||
200 | const hashKey = toPodId + request.endpoint | ||
201 | |||
202 | if (!requestsToMakeGrouped[hashKey]) { | ||
203 | requestsToMakeGrouped[hashKey] = { | ||
204 | toPod: pod, | ||
205 | endpoint: request.endpoint, | ||
206 | ids: [], // request ids, to delete them from the DB in the future | ||
207 | datas: [] // requests data, | ||
208 | } | ||
209 | } | ||
210 | |||
211 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
212 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
213 | }) | ||
214 | }) | ||
215 | |||
216 | return requestsToMakeGrouped | ||
217 | } | ||
218 | |||
219 | // Remove pods with a score of 0 (too many requests where they were unreachable) | 63 | // Remove pods with a score of 0 (too many requests where they were unreachable) |
220 | function removeBadPods () { | 64 | function removeBadPods () { |
221 | const self = this | 65 | const self = this |
@@ -307,25 +151,6 @@ function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { | |||
307 | }) | 151 | }) |
308 | } | 152 | } |
309 | 153 | ||
310 | function groupAndTruncateRequests (requests, limitRequestsPerPod) { | ||
311 | const requestsGrouped = {} | ||
312 | |||
313 | requests.forEach(function (request) { | ||
314 | request.Pods.forEach(function (pod) { | ||
315 | if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] | ||
316 | |||
317 | if (requestsGrouped[pod.id].length < limitRequestsPerPod) { | ||
318 | requestsGrouped[pod.id].push({ | ||
319 | request, | ||
320 | pod | ||
321 | }) | ||
322 | } | ||
323 | }) | ||
324 | }) | ||
325 | |||
326 | return requestsGrouped | ||
327 | } | ||
328 | |||
329 | function removeAll (callback) { | 154 | function removeAll (callback) { |
330 | // Delete all requests | 155 | // Delete all requests |
331 | this.truncate({ cascade: true }).asCallback(callback) | 156 | this.truncate({ cascade: true }).asCallback(callback) |
@@ -346,3 +171,24 @@ function removeWithEmptyTo (callback) { | |||
346 | 171 | ||
347 | this.destroy(query).asCallback(callback) | 172 | this.destroy(query).asCallback(callback) |
348 | } | 173 | } |
174 | |||
175 | // --------------------------------------------------------------------------- | ||
176 | |||
177 | function groupAndTruncateRequests (requests, limitRequestsPerPod) { | ||
178 | const requestsGrouped = {} | ||
179 | |||
180 | requests.forEach(function (request) { | ||
181 | request.Pods.forEach(function (pod) { | ||
182 | if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] | ||
183 | |||
184 | if (requestsGrouped[pod.id].length < limitRequestsPerPod) { | ||
185 | requestsGrouped[pod.id].push({ | ||
186 | request, | ||
187 | pod | ||
188 | }) | ||
189 | } | ||
190 | }) | ||
191 | }) | ||
192 | |||
193 | return requestsGrouped | ||
194 | } | ||