]>
Commit | Line | Data |
---|---|---|
9f10b292 C |
1 | 'use strict' |
2 | ||
f0f5567b | 3 | const async = require('async') |
35f1c54e | 4 | const map = require('lodash/map') |
9f10b292 | 5 | |
f0f5567b C |
6 | const constants = require('../initializers/constants') |
7 | const logger = require('../helpers/logger') | |
8 | const Pods = require('../models/pods') | |
e3647ae2 | 9 | const Requests = require('../models/requests') |
f0f5567b | 10 | const requests = require('../helpers/requests') |
cbe2f7c3 | 11 | const videos = require('../lib/videos') |
f0f5567b | 12 | const Videos = require('../models/videos') |
9f10b292 | 13 | |
8c255eb5 | 14 | const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE |
f0f5567b | 15 | let timer = null |
9f10b292 | 16 | |
// Public API of the requests scheduler.
// The functions are declared further down in this file; function
// declarations are hoisted, so they can already be referenced here.
const requestsScheduler = {
  activate,
  addRequest,
  deactivate,
  forceSend
}
23 | ||
// Start sending the pending requests every constants.INTERVAL milliseconds.
// An interval left over from a previous activation is cleared first:
// otherwise calling activate() twice would overwrite the only reference to
// the first interval, which could then never be stopped.
function activate () {
  logger.info('Requests scheduler activated.')

  // No-op when timer is null or refers to an already cleared interval
  clearInterval(timer)
  timer = setInterval(makeRequests, constants.INTERVAL)
}
28 | ||
// Add a request to the scheduler.
//  - no request is stored under this id: the new request is created
//  - a request of the same type is stored under this id: refused (duplicate)
//  - a request of another type is stored under this id: the stored request
//    is removed and the new one is not created
// Errors are only logged, nothing is reported back to the caller.
function addRequest (id, type, request) {
  logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })

  Requests.findById(id, function (findErr, existing) {
    if (findErr) {
      logger.error('Error when trying to find a request.', { error: findErr })
      return
    }

    // Nothing in the scheduler for this id yet: store the request
    if (!existing) {
      Requests.create(id, type, request, function (createErr) {
        if (createErr) logger.error('Cannot create a request.', { error: createErr })
      })
      return
    }

    // Same id and same type: this is a duplicate
    if (existing.type === type) {
      logger.error('Cannot insert two same requests.')
      return
    }

    // Same id but another type: drop the stored request
    Requests.removeRequestById(id, function (removeErr) {
      if (removeErr) logger.error('Cannot remove a request.', { error: removeErr })
    })
  })
}
1fe5076f | 61 | |
// Stop the periodic sending of requests.
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  // Back to the initial state: do not keep a handle to a dead interval
  timer = null
}
1fe5076f | 66 | |
// Send the pending requests right now instead of waiting for the next
// tick of the scheduler interval.
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
c45f7f84 | 71 | |
9f10b292 | 72 | // --------------------------------------------------------------------------- |
c45f7f84 | 73 | |
e3647ae2 | 74 | module.exports = requestsScheduler |
c45f7f84 | 75 | |
9f10b292 | 76 | // --------------------------------------------------------------------------- |
c45f7f84 | 77 | |
// Send the requests of a given type to all the pods returned by Pods.list.
//  - type: one of REQUEST_SCHEDULER_TYPE, used to build the remote path
//  - requestsToMake: the payload sent as "data"
//  - callback (optional): called with an error if the pods cannot be listed,
//    if the type is unknown or if the multiple request helper fails;
//    called with null once the pods scores update has been triggered
function makeRequest (type, requestsToMake, callback) {
  if (!callback) callback = function () {}

  Pods.list(function (err, pods) {
    if (err) return callback(err)

    // We can only build a path for a known type
    if (REQUEST_SCHEDULER_TYPE.indexOf(type) === -1) {
      return callback(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true, // Security
      sign: true, // To prove our identity
      method: 'POST',
      path: '/api/' + constants.API_VERSION + '/remotevideos/' + type,
      data: requestsToMake // Requests we need to make
    }

    // Status codes we consider as a success
    const successCodes = [ 200, 201, 204 ]
    const reachedPodIds = []
    const failedPodIds = []

    // Make multiple retry requests to all of the pods
    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)

    // Called for each pod: sort it in the reached or in the failed list
    function onPodFinished (err, response, body, url, pod, done) {
      const failed = err || successCodes.indexOf(response.statusCode) === -1

      if (failed) {
        // The pod is unreachable or answered with an unexpected status code
        failedPodIds.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      } else {
        reachedPodIds.push(pod._id)
      }

      return done()
    }

    // Called once every pod was processed
    function onAllPodsFinished (err) {
      if (err) return callback(err)

      // All the requests were made, we update the pods score
      updatePodsScore(reachedPodIds, failedPodIds)
      callback(null)
    }
  })
}
129 | ||
// Make all the requests of the scheduler.
// The stored requests are grouped by type, then each non empty group is sent
// with makeRequest. Once a group has been sent (with or without error) its
// requests are removed from the scheduler.
// A stored request with an unknown type is logged and skipped: it must not
// prevent the requests of the other types from being sent.
function makeRequests () {
  // Named pendingRequests so it does not shadow the "requests" helper module
  Requests.list(function (err, pendingRequests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (pendingRequests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // One bucket per known type
    const requestsToMake = {}
    for (const type of REQUEST_SCHEDULER_TYPE) {
      requestsToMake[type] = {
        ids: [],
        requests: []
      }
    }

    // For each request to make, we add it to the bucket of its type
    async.each(pendingRequests, function (poolRequest, callbackEach) {
      if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) === -1) {
        logger.error('Unkown request type.', { request_type: poolRequest.type })
        // Skip this request but still notify async.each: without this call
        // the final callback below would never run and nothing would be sent
        return callbackEach()
      }

      const requestTypeToMake = requestsToMake[poolRequest.type]
      requestTypeToMake.requests.push(poolRequest.request)
      requestTypeToMake.ids.push(poolRequest._id)

      return callbackEach()
    }, function () {
      Object.keys(requestsToMake).forEach(function (type) {
        const requestTypeToMake = requestsToMake[type]

        // Nothing to send for this type
        if (requestTypeToMake.requests.length === 0) return

        makeRequest(type, requestTypeToMake.requests, function (err) {
          if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })

          // We made the requests, so we can remove them from the scheduler
          Requests.removeRequests(requestTypeToMake.ids)
        })
      })
    })
  })
}
0b697522 | 182 | |
// Remove pods with a score of 0 (too many requests where they were unreachable)
// Steps, run in sequence with async.waterfall:
//  1. find the bad pods
//  2. list the videos coming from their urls
//  3. remove these remote videos
//  4. remove the pods themselves
// The outcome is only logged.
function removeBadPods () {
  const steps = [
    findBadPods,
    listVideosOfTheseBadPods,
    removeVideosOfTheseBadPods,
    removeBadPodsFromDB
  ]

  async.waterfall(steps, logOutcome)

  function findBadPods (next) {
    Pods.findBadPods(function (err, badPods) {
      if (err) {
        logger.error('Cannot find bad pods.', { error: err })
        return next(err)
      }

      return next(null, badPods)
    })
  }

  function listVideosOfTheseBadPods (badPods, next) {
    // No bad pods: the next steps only receive their callback
    if (badPods.length === 0) return next(null)

    const podUrls = map(badPods, 'url')
    const podIds = map(badPods, '_id')

    Videos.listFromUrls(podUrls, function (err, videosList) {
      if (err) {
        // We still want to remove the pods, so go on without videos
        logger.error('Cannot list videos urls.', { error: err, urls: podUrls })
        return next(null, podIds, [])
      }

      return next(null, podIds, videosList)
    })
  }

  function removeVideosOfTheseBadPods (podIds, videosList, next) {
    // The previous step gave us no pod ids: the first argument is the
    // waterfall callback, so we just skip this step
    if (typeof podIds === 'function') return podIds(null)

    // Remove the remote videos
    videos.removeRemoteVideos(videosList, function (err) {
      if (err) logger.error('Cannot remove remote videos.', { error: err })

      return next(null, podIds)
    })
  }

  function removeBadPodsFromDB (podIds, next) {
    // Same as above: no pod ids, skip
    if (typeof podIds === 'function') return podIds(null)

    Pods.removeAllByIds(podIds, next)
  }

  function logOutcome (err, removeResult) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
      return
    }

    if (!removeResult) {
      logger.info('No need to remove bad pods.')
      return
    }

    const podsRemoved = removeResult.result.n
    logger.info('Removed %d pods.', podsRemoved)
  }
}
0b697522 | 242 | |
bc503c2a C |
// Update the score of the pods after a batch of requests.
//  - goodPods: ids of the pods that answered correctly, their score is
//    incremented by constants.PODS_SCORE.BONUS
//  - badPods: ids of the pods we could not reach, their score is
//    incremented by constants.PODS_SCORE.MALUS
// Once the bad pods update is done (even if it failed), removeBadPods is run.
// Errors are logged with the error object, like the other error logs of
// this file.
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })

    removeBadPods()
  })
}