]>
Commit | Line | Data |
---|---|---|
1 | 'use strict' | |
2 | ||
3 | const each = require('async/each') | |
4 | const eachLimit = require('async/eachLimit') | |
5 | const waterfall = require('async/waterfall') | |
6 | ||
7 | const constants = require('../initializers/constants') | |
8 | const logger = require('../helpers/logger') | |
9 | const requests = require('../helpers/requests') | |
10 | ||
11 | let timer = null | |
12 | let lastRequestTimestamp = 0 | |
13 | ||
14 | // --------------------------------------------------------------------------- | |
15 | ||
16 | module.exports = function (sequelize, DataTypes) { | |
17 | const Request = sequelize.define('Request', | |
18 | { | |
19 | request: { | |
20 | type: DataTypes.JSON | |
21 | }, | |
22 | endpoint: { | |
23 | // TODO: enum? | |
24 | type: DataTypes.STRING | |
25 | } | |
26 | }, | |
27 | { | |
28 | classMethods: { | |
29 | associate, | |
30 | ||
31 | activate, | |
32 | countTotalRequests, | |
33 | deactivate, | |
34 | flush, | |
35 | forceSend, | |
36 | remainingMilliSeconds | |
37 | } | |
38 | } | |
39 | ) | |
40 | ||
41 | return Request | |
42 | } | |
43 | ||
44 | // ------------------------------ STATICS ------------------------------ | |
45 | ||
46 | function associate (models) { | |
47 | this.belongsToMany(models.Pod, { | |
48 | foreignKey: { | |
49 | name: 'requestId', | |
50 | allowNull: false | |
51 | }, | |
52 | through: models.RequestToPod, | |
53 | onDelete: 'CASCADE' | |
54 | }) | |
55 | } | |
56 | ||
57 | function activate () { | |
58 | logger.info('Requests scheduler activated.') | |
59 | lastRequestTimestamp = Date.now() | |
60 | ||
61 | const self = this | |
62 | timer = setInterval(function () { | |
63 | lastRequestTimestamp = Date.now() | |
64 | makeRequests.call(self) | |
65 | }, constants.REQUESTS_INTERVAL) | |
66 | } | |
67 | ||
68 | function countTotalRequests (callback) { | |
69 | const query = { | |
70 | include: [ this.sequelize.models.Pod ] | |
71 | } | |
72 | ||
73 | return this.count(query).asCallback(callback) | |
74 | } | |
75 | ||
76 | function deactivate () { | |
77 | logger.info('Requests scheduler deactivated.') | |
78 | clearInterval(timer) | |
79 | timer = null | |
80 | } | |
81 | ||
82 | function flush (callback) { | |
83 | removeAll.call(this, function (err) { | |
84 | if (err) logger.error('Cannot flush the requests.', { error: err }) | |
85 | ||
86 | return callback(err) | |
87 | }) | |
88 | } | |
89 | ||
90 | function forceSend () { | |
91 | logger.info('Force requests scheduler sending.') | |
92 | makeRequests.call(this) | |
93 | } | |
94 | ||
95 | function remainingMilliSeconds () { | |
96 | if (timer === null) return -1 | |
97 | ||
98 | return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp) | |
99 | } | |
100 | ||
101 | // --------------------------------------------------------------------------- | |
102 | ||
103 | // Make a requests to friends of a certain type | |
104 | function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | |
105 | if (!callback) callback = function () {} | |
106 | ||
107 | const params = { | |
108 | toPod: toPod, | |
109 | sign: true, // Prove our identity | |
110 | method: 'POST', | |
111 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | |
112 | data: requestsToMake // Requests we need to make | |
113 | } | |
114 | ||
115 | // Make multiple retry requests to all of pods | |
116 | // The function fire some useful callbacks | |
117 | requests.makeSecureRequest(params, function (err, res) { | |
118 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | |
119 | logger.error( | |
120 | 'Error sending secure request to %s pod.', | |
121 | toPod.host, | |
122 | { | |
123 | error: err || new Error('Status code not 20x : ' + res.statusCode) | |
124 | } | |
125 | ) | |
126 | ||
127 | return callback(false) | |
128 | } | |
129 | ||
130 | return callback(true) | |
131 | }) | |
132 | } | |
133 | ||
134 | // Make all the requests of the scheduler | |
135 | function makeRequests () { | |
136 | const self = this | |
137 | const RequestToPod = this.sequelize.models.RequestToPod | |
138 | ||
139 | // We limit the size of the requests (REQUESTS_LIMIT) | |
140 | // We don't want to stuck with the same failing requests so we get a random list | |
141 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { | |
142 | if (err) { | |
143 | logger.error('Cannot get the list of requests.', { err: err }) | |
144 | return // Abort | |
145 | } | |
146 | ||
147 | // If there are no requests, abort | |
148 | if (requests.length === 0) { | |
149 | logger.info('No requests to make.') | |
150 | return | |
151 | } | |
152 | ||
153 | logger.info('Making requests to friends.') | |
154 | ||
155 | // We want to group requests by destinations pod and endpoint | |
156 | const requestsToMakeGrouped = {} | |
157 | ||
158 | requests.forEach(function (request) { | |
159 | request.Pods.forEach(function (toPod) { | |
160 | const hashKey = toPod.id + request.endpoint | |
161 | if (!requestsToMakeGrouped[hashKey]) { | |
162 | requestsToMakeGrouped[hashKey] = { | |
163 | toPodId: toPod.id, | |
164 | endpoint: request.endpoint, | |
165 | ids: [], // request ids, to delete them from the DB in the future | |
166 | datas: [] // requests data, | |
167 | } | |
168 | } | |
169 | ||
170 | requestsToMakeGrouped[hashKey].ids.push(request.id) | |
171 | requestsToMakeGrouped[hashKey].datas.push(request.request) | |
172 | }) | |
173 | }) | |
174 | ||
175 | const goodPods = [] | |
176 | const badPods = [] | |
177 | ||
178 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { | |
179 | const requestToMake = requestsToMakeGrouped[hashKey] | |
180 | ||
181 | // FIXME: SQL request inside a loop :/ | |
182 | self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) { | |
183 | if (err) { | |
184 | logger.error('Error finding pod by id.', { err: err }) | |
185 | return callbackEach() | |
186 | } | |
187 | ||
188 | // Maybe the pod is not our friend anymore so simply remove it | |
189 | if (!toPod) { | |
190 | const requestIdsToDelete = requestToMake.ids | |
191 | ||
192 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) | |
193 | RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) | |
194 | return callbackEach() | |
195 | } | |
196 | ||
197 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { | |
198 | if (success === true) { | |
199 | logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) | |
200 | ||
201 | goodPods.push(requestToMake.toPodId) | |
202 | ||
203 | // Remove the pod id of these request ids | |
204 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach) | |
205 | } else { | |
206 | badPods.push(requestToMake.toPodId) | |
207 | callbackEach() | |
208 | } | |
209 | }) | |
210 | }) | |
211 | }, function () { | |
212 | // All the requests were made, we update the pods score | |
213 | updatePodsScore.call(self, goodPods, badPods) | |
214 | // Flush requests with no pod | |
215 | removeWithEmptyTo.call(self, function (err) { | |
216 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | |
217 | }) | |
218 | }) | |
219 | }) | |
220 | } | |
221 | ||
222 | // Remove pods with a score of 0 (too many requests where they were unreachable) | |
223 | function removeBadPods () { | |
224 | const self = this | |
225 | ||
226 | waterfall([ | |
227 | function findBadPods (callback) { | |
228 | self.sequelize.models.Pod.listBadPods(function (err, pods) { | |
229 | if (err) { | |
230 | logger.error('Cannot find bad pods.', { error: err }) | |
231 | return callback(err) | |
232 | } | |
233 | ||
234 | return callback(null, pods) | |
235 | }) | |
236 | }, | |
237 | ||
238 | function removeTheseBadPods (pods, callback) { | |
239 | each(pods, function (pod, callbackEach) { | |
240 | pod.destroy().asCallback(callbackEach) | |
241 | }, function (err) { | |
242 | return callback(err, pods.length) | |
243 | }) | |
244 | } | |
245 | ], function (err, numberOfPodsRemoved) { | |
246 | if (err) { | |
247 | logger.error('Cannot remove bad pods.', { error: err }) | |
248 | } else if (numberOfPodsRemoved) { | |
249 | logger.info('Removed %d pods.', numberOfPodsRemoved) | |
250 | } else { | |
251 | logger.info('No need to remove bad pods.') | |
252 | } | |
253 | }) | |
254 | } | |
255 | ||
256 | function updatePodsScore (goodPods, badPods) { | |
257 | const self = this | |
258 | const Pod = this.sequelize.models.Pod | |
259 | ||
260 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) | |
261 | ||
262 | if (goodPods.length !== 0) { | |
263 | Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { | |
264 | if (err) logger.error('Cannot increment scores of good pods.') | |
265 | }) | |
266 | } | |
267 | ||
268 | if (badPods.length !== 0) { | |
269 | Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | |
270 | if (err) logger.error('Cannot decrement scores of bad pods.') | |
271 | removeBadPods.call(self) | |
272 | }) | |
273 | } | |
274 | } | |
275 | ||
276 | function listWithLimitAndRandom (limit, callback) { | |
277 | const self = this | |
278 | ||
279 | self.count().asCallback(function (err, count) { | |
280 | if (err) return callback(err) | |
281 | ||
282 | // Optimization... | |
283 | if (count === 0) return callback(null, []) | |
284 | ||
285 | let start = Math.floor(Math.random() * count) - limit | |
286 | if (start < 0) start = 0 | |
287 | ||
288 | const query = { | |
289 | order: [ | |
290 | [ 'id', 'ASC' ] | |
291 | ], | |
292 | offset: start, | |
293 | limit: limit, | |
294 | include: [ this.sequelize.models.Pod ] | |
295 | } | |
296 | ||
297 | self.findAll(query).asCallback(callback) | |
298 | }) | |
299 | } | |
300 | ||
301 | function removeAll (callback) { | |
302 | // Delete all requests | |
303 | this.truncate({ cascade: true }).asCallback(callback) | |
304 | } | |
305 | ||
306 | function removeWithEmptyTo (callback) { | |
307 | if (!callback) callback = function () {} | |
308 | ||
309 | const query = { | |
310 | where: { | |
311 | id: { | |
312 | $notIn: [ | |
313 | this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"') | |
314 | ] | |
315 | } | |
316 | } | |
317 | } | |
318 | ||
319 | this.destroy(query).asCallback(callback) | |
320 | } |