diff options
Diffstat (limited to 'server/lib/request-scheduler.js')
-rw-r--r-- | server/lib/request-scheduler.js | 178 |
1 files changed, 38 insertions, 140 deletions
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js index 28dabe339..6b6535519 100644 --- a/server/lib/request-scheduler.js +++ b/server/lib/request-scheduler.js | |||
@@ -1,44 +1,54 @@ | |||
1 | 'use strict' | 1 | 'use strict' |
2 | 2 | ||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../initializers/constants') | 3 | const constants = require('../initializers/constants') |
4 | const BaseRequestScheduler = require('./base-request-scheduler') | ||
6 | const db = require('../initializers/database') | 5 | const db = require('../initializers/database') |
7 | const logger = require('../helpers/logger') | 6 | const logger = require('../helpers/logger') |
8 | const requests = require('../helpers/requests') | ||
9 | 7 | ||
10 | module.exports = class RequestScheduler { | 8 | module.exports = class RequestScheduler extends BaseRequestScheduler { |
11 | 9 | ||
12 | constructor () { | 10 | constructor () { |
13 | this.lastRequestTimestamp = 0 | 11 | super() |
14 | this.timer = null | ||
15 | } | ||
16 | 12 | ||
17 | activate () { | 13 | // We limit the size of the requests |
18 | logger.info('Requests scheduler activated.') | 14 | this.limitPods = constants.REQUESTS_LIMIT_PODS |
19 | this.lastRequestTimestamp = Date.now() | 15 | this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD |
20 | 16 | ||
21 | this.timer = setInterval(() => { | 17 | this.description = 'requests' |
22 | this.lastRequestTimestamp = Date.now() | ||
23 | this.makeRequests() | ||
24 | }, constants.REQUESTS_INTERVAL) | ||
25 | } | 18 | } |
26 | 19 | ||
27 | deactivate () { | 20 | getRequestModel () { |
28 | logger.info('Requests scheduler deactivated.') | 21 | return db.Request |
29 | clearInterval(this.timer) | ||
30 | this.timer = null | ||
31 | } | 22 | } |
32 | 23 | ||
33 | forceSend () { | 24 | getRequestToPodModel () { |
34 | logger.info('Force requests scheduler sending.') | 25 | return db.RequestToPod |
35 | this.makeRequests() | ||
36 | } | 26 | } |
37 | 27 | ||
38 | remainingMilliSeconds () { | 28 | buildRequestObjects (requests) { |
39 | if (this.timer === null) return -1 | 29 | const requestsToMakeGrouped = {} |
30 | |||
31 | Object.keys(requests).forEach(toPodId => { | ||
32 | requests[toPodId].forEach(data => { | ||
33 | const request = data.request | ||
34 | const pod = data.pod | ||
35 | const hashKey = toPodId + request.endpoint | ||
36 | |||
37 | if (!requestsToMakeGrouped[hashKey]) { | ||
38 | requestsToMakeGrouped[hashKey] = { | ||
39 | toPod: pod, | ||
40 | endpoint: request.endpoint, | ||
41 | ids: [], // request ids, to delete them from the DB in the future | ||
42 | datas: [] // requests data, | ||
43 | } | ||
44 | } | ||
45 | |||
46 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
47 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
48 | }) | ||
49 | }) | ||
40 | 50 | ||
41 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | 51 | return requestsToMakeGrouped |
42 | } | 52 | } |
43 | 53 | ||
44 | // { type, endpoint, data, toIds, transaction } | 54 | // { type, endpoint, data, toIds, transaction } |
@@ -79,122 +89,10 @@ module.exports = class RequestScheduler { | |||
79 | 89 | ||
80 | // --------------------------------------------------------------------------- | 90 | // --------------------------------------------------------------------------- |
81 | 91 | ||
82 | // Make all the requests of the scheduler | 92 | afterRequestsHook () { |
83 | makeRequests () { | 93 | // Flush requests with no pod |
84 | // We limit the size of the requests | 94 | this.getRequestModel().removeWithEmptyTo(err => { |
85 | // We don't want to stuck with the same failing requests so we get a random list | 95 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) |
86 | db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { | ||
87 | if (err) { | ||
88 | logger.error('Cannot get the list of requests.', { err: err }) | ||
89 | return // Abort | ||
90 | } | ||
91 | |||
92 | // If there are no requests, abort | ||
93 | if (requests.length === 0) { | ||
94 | logger.info('No requests to make.') | ||
95 | return | ||
96 | } | ||
97 | |||
98 | // We want to group requests by destinations pod and endpoint | ||
99 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
100 | |||
101 | logger.info('Making requests to friends.') | ||
102 | |||
103 | const goodPods = [] | ||
104 | const badPods = [] | ||
105 | |||
106 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
107 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
108 | const toPod = requestToMake.toPod | ||
109 | |||
110 | // Maybe the pod is not our friend anymore so simply remove it | ||
111 | if (!toPod) { | ||
112 | const requestIdsToDelete = requestToMake.ids | ||
113 | |||
114 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
115 | return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
116 | } | ||
117 | |||
118 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { | ||
119 | if (success === false) { | ||
120 | badPods.push(requestToMake.toPod.id) | ||
121 | return callbackEach() | ||
122 | } | ||
123 | |||
124 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
125 | goodPods.push(requestToMake.toPod.id) | ||
126 | |||
127 | // Remove the pod id of these request ids | ||
128 | db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
129 | }) | ||
130 | }, () => { | ||
131 | // All the requests were made, we update the pods score | ||
132 | db.Request.updatePodsScore(goodPods, badPods) | ||
133 | // Flush requests with no pod | ||
134 | db.Request.removeWithEmptyTo(err => { | ||
135 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
136 | }) | ||
137 | }) | ||
138 | }) | ||
139 | } | ||
140 | |||
141 | // Make a requests to friends of a certain type | ||
142 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
143 | if (!callback) callback = function () {} | ||
144 | |||
145 | const params = { | ||
146 | toPod: toPod, | ||
147 | sign: true, // Prove our identity | ||
148 | method: 'POST', | ||
149 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
150 | data: requestsToMake // Requests we need to make | ||
151 | } | ||
152 | |||
153 | // Make multiple retry requests to all of pods | ||
154 | // The function fire some useful callbacks | ||
155 | requests.makeSecureRequest(params, (err, res) => { | ||
156 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
157 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
158 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
159 | |||
160 | return callback(false) | ||
161 | } | ||
162 | |||
163 | return callback(true) | ||
164 | }) | ||
165 | } | ||
166 | |||
167 | buildRequestObjects (requests) { | ||
168 | const requestsToMakeGrouped = {} | ||
169 | |||
170 | Object.keys(requests).forEach(toPodId => { | ||
171 | requests[toPodId].forEach(data => { | ||
172 | const request = data.request | ||
173 | const pod = data.pod | ||
174 | const hashKey = toPodId + request.endpoint | ||
175 | |||
176 | if (!requestsToMakeGrouped[hashKey]) { | ||
177 | requestsToMakeGrouped[hashKey] = { | ||
178 | toPod: pod, | ||
179 | endpoint: request.endpoint, | ||
180 | ids: [], // request ids, to delete them from the DB in the future | ||
181 | datas: [] // requests data, | ||
182 | } | ||
183 | } | ||
184 | |||
185 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
186 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
187 | }) | ||
188 | }) | ||
189 | |||
190 | return requestsToMakeGrouped | ||
191 | } | ||
192 | |||
193 | flush (callback) { | ||
194 | db.Request.removeAll(err => { | ||
195 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
196 | |||
197 | return callback(err) | ||
198 | }) | 96 | }) |
199 | } | 97 | } |
200 | } | 98 | } |