diff options
Diffstat (limited to 'server/lib/requestsScheduler.js')
-rw-r--r-- | server/lib/requestsScheduler.js | 270 |
1 files changed, 0 insertions, 270 deletions
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js deleted file mode 100644 index b192d8299..000000000 --- a/server/lib/requestsScheduler.js +++ /dev/null | |||
@@ -1,270 +0,0 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const async = require('async') | ||
4 | const map = require('lodash/map') | ||
5 | const mongoose = require('mongoose') | ||
6 | |||
7 | const constants = require('../initializers/constants') | ||
8 | const logger = require('../helpers/logger') | ||
9 | const Pods = require('../models/pods') | ||
10 | const Requests = require('../models/requests') | ||
11 | const requests = require('../helpers/requests') | ||
12 | |||
13 | const Video = mongoose.model('Video') | ||
14 | |||
15 | let timer = null | ||
16 | |||
17 | const requestsScheduler = { | ||
18 | activate: activate, | ||
19 | addRequest: addRequest, | ||
20 | addRequestTo: addRequestTo, | ||
21 | deactivate: deactivate, | ||
22 | flush: flush, | ||
23 | forceSend: forceSend | ||
24 | } | ||
25 | |||
26 | function activate () { | ||
27 | logger.info('Requests scheduler activated.') | ||
28 | timer = setInterval(makeRequests, constants.INTERVAL) | ||
29 | } | ||
30 | |||
31 | // Add request to the scheduler | ||
32 | function addRequest (type, data) { | ||
33 | logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) | ||
34 | |||
35 | const request = { | ||
36 | type: type, | ||
37 | data: data | ||
38 | } | ||
39 | |||
40 | Pods.listAllIds(function (err, podIds) { | ||
41 | if (err) { | ||
42 | logger.debug('Cannot list pod ids.') | ||
43 | return | ||
44 | } | ||
45 | |||
46 | // No friends | ||
47 | if (!podIds) return | ||
48 | |||
49 | Requests.create(request, podIds, function (err) { | ||
50 | if (err) logger.error('Cannot create a request.', { error: err }) | ||
51 | }) | ||
52 | }) | ||
53 | } | ||
54 | |||
55 | function addRequestTo (podIds, type, data) { | ||
56 | const request = { | ||
57 | type: type, | ||
58 | data: data | ||
59 | } | ||
60 | |||
61 | Requests.create(request, podIds, function (err) { | ||
62 | if (err) logger.error('Cannot create a request.', { error: err }) | ||
63 | }) | ||
64 | } | ||
65 | |||
66 | function deactivate () { | ||
67 | logger.info('Requests scheduler deactivated.') | ||
68 | clearInterval(timer) | ||
69 | } | ||
70 | |||
71 | function flush () { | ||
72 | Requests.removeAll(function (err) { | ||
73 | if (err) { | ||
74 | logger.error('Cannot flush the requests.', { error: err }) | ||
75 | } | ||
76 | }) | ||
77 | } | ||
78 | |||
79 | function forceSend () { | ||
80 | logger.info('Force requests scheduler sending.') | ||
81 | makeRequests() | ||
82 | } | ||
83 | |||
84 | // --------------------------------------------------------------------------- | ||
85 | |||
86 | module.exports = requestsScheduler | ||
87 | |||
88 | // --------------------------------------------------------------------------- | ||
89 | |||
90 | // Make a requests to friends of a certain type | ||
91 | function makeRequest (toPod, requestsToMake, callback) { | ||
92 | if (!callback) callback = function () {} | ||
93 | |||
94 | const params = { | ||
95 | toPod: toPod, | ||
96 | encrypt: true, // Security | ||
97 | sign: true, // To prove our identity | ||
98 | method: 'POST', | ||
99 | path: '/api/' + constants.API_VERSION + '/remote/videos', | ||
100 | data: requestsToMake // Requests we need to make | ||
101 | } | ||
102 | |||
103 | // Make multiple retry requests to all of pods | ||
104 | // The function fire some useful callbacks | ||
105 | requests.makeSecureRequest(params, function (err, res) { | ||
106 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
107 | logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) | ||
108 | |||
109 | return callback(false) | ||
110 | } | ||
111 | |||
112 | return callback(true) | ||
113 | }) | ||
114 | } | ||
115 | |||
116 | // Make all the requests of the scheduler | ||
117 | function makeRequests () { | ||
118 | Requests.list(function (err, requests) { | ||
119 | if (err) { | ||
120 | logger.error('Cannot get the list of requests.', { err: err }) | ||
121 | return // Abort | ||
122 | } | ||
123 | |||
124 | // If there are no requests, abort | ||
125 | if (requests.length === 0) { | ||
126 | logger.info('No requests to make.') | ||
127 | return | ||
128 | } | ||
129 | |||
130 | logger.info('Making requests to friends.') | ||
131 | |||
132 | // Requests by pods id | ||
133 | const requestsToMake = {} | ||
134 | |||
135 | requests.forEach(function (poolRequest) { | ||
136 | poolRequest.to.forEach(function (toPodId) { | ||
137 | if (!requestsToMake[toPodId]) { | ||
138 | requestsToMake[toPodId] = { | ||
139 | ids: [], | ||
140 | datas: [] | ||
141 | } | ||
142 | } | ||
143 | |||
144 | requestsToMake[toPodId].ids.push(poolRequest._id) | ||
145 | requestsToMake[toPodId].datas.push(poolRequest.request) | ||
146 | }) | ||
147 | }) | ||
148 | |||
149 | const goodPods = [] | ||
150 | const badPods = [] | ||
151 | |||
152 | async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { | ||
153 | const requestToMake = requestsToMake[toPodId] | ||
154 | |||
155 | // FIXME: mongodb request inside a loop :/ | ||
156 | Pods.findById(toPodId, function (err, toPod) { | ||
157 | if (err) return logger.error('Error finding pod by id.', { err: err }) | ||
158 | |||
159 | // Maybe the pod is not our friend anymore so simply remove them | ||
160 | if (!toPod) { | ||
161 | Requests.removePodOf(requestToMake.ids, toPodId) | ||
162 | return callbackEach() | ||
163 | } | ||
164 | |||
165 | makeRequest(toPod, requestToMake.datas, function (success) { | ||
166 | if (err) { | ||
167 | logger.error('Errors when sent request to %s.', toPod.url, { error: err }) | ||
168 | // Do not stop the process just for one error | ||
169 | return callbackEach() | ||
170 | } | ||
171 | |||
172 | if (success === true) { | ||
173 | logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) | ||
174 | |||
175 | // Remove the pod id of these request ids | ||
176 | Requests.removePodOf(requestToMake.ids, toPodId) | ||
177 | goodPods.push(toPodId) | ||
178 | } else { | ||
179 | badPods.push(toPodId) | ||
180 | } | ||
181 | |||
182 | callbackEach() | ||
183 | }) | ||
184 | }) | ||
185 | }, function () { | ||
186 | // All the requests were made, we update the pods score | ||
187 | updatePodsScore(goodPods, badPods) | ||
188 | // Flush requests with no pod | ||
189 | Requests.removeWithEmptyTo() | ||
190 | }) | ||
191 | }) | ||
192 | } | ||
193 | |||
194 | // Remove pods with a score of 0 (too many requests where they were unreachable) | ||
195 | function removeBadPods () { | ||
196 | async.waterfall([ | ||
197 | function findBadPods (callback) { | ||
198 | Pods.findBadPods(function (err, pods) { | ||
199 | if (err) { | ||
200 | logger.error('Cannot find bad pods.', { error: err }) | ||
201 | return callback(err) | ||
202 | } | ||
203 | |||
204 | return callback(null, pods) | ||
205 | }) | ||
206 | }, | ||
207 | |||
208 | function listVideosOfTheseBadPods (pods, callback) { | ||
209 | if (pods.length === 0) return callback(null) | ||
210 | |||
211 | const urls = map(pods, 'url') | ||
212 | const ids = map(pods, '_id') | ||
213 | |||
214 | Video.listByUrls(urls, function (err, videosList) { | ||
215 | if (err) { | ||
216 | logger.error('Cannot list videos urls.', { error: err, urls: urls }) | ||
217 | return callback(null, ids, []) | ||
218 | } | ||
219 | |||
220 | return callback(null, ids, videosList) | ||
221 | }) | ||
222 | }, | ||
223 | |||
224 | function removeVideosOfTheseBadPods (podIds, videosList, callback) { | ||
225 | // We don't have to remove pods, skip | ||
226 | if (typeof podIds === 'function') return podIds(null) | ||
227 | |||
228 | async.each(videosList, function (video, callbackEach) { | ||
229 | video.remove(callbackEach) | ||
230 | }, function (err) { | ||
231 | if (err) { | ||
232 | // Don't stop the process | ||
233 | logger.error('Error while removing videos of bad pods.', { error: err }) | ||
234 | return | ||
235 | } | ||
236 | |||
237 | return callback(null, podIds) | ||
238 | }) | ||
239 | }, | ||
240 | |||
241 | function removeBadPodsFromDB (podIds, callback) { | ||
242 | // We don't have to remove pods, skip | ||
243 | if (typeof podIds === 'function') return podIds(null) | ||
244 | |||
245 | Pods.removeAllByIds(podIds, callback) | ||
246 | } | ||
247 | ], function (err, removeResult) { | ||
248 | if (err) { | ||
249 | logger.error('Cannot remove bad pods.', { error: err }) | ||
250 | } else if (removeResult) { | ||
251 | const podsRemoved = removeResult.result.n | ||
252 | logger.info('Removed %d pods.', podsRemoved) | ||
253 | } else { | ||
254 | logger.info('No need to remove bad pods.') | ||
255 | } | ||
256 | }) | ||
257 | } | ||
258 | |||
259 | function updatePodsScore (goodPods, badPods) { | ||
260 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) | ||
261 | |||
262 | Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { | ||
263 | if (err) logger.error('Cannot increment scores of good pods.') | ||
264 | }) | ||
265 | |||
266 | Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | ||
267 | if (err) logger.error('Cannot decrement scores of bad pods.') | ||
268 | removeBadPods() | ||
269 | }) | ||
270 | } | ||