diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-01-12 15:20:03 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-01-12 15:20:03 +0100 |
commit | 99fe265a5fc077cb66c322e7f3d191ff7110aea0 (patch) | |
tree | c9e04ccfcc5496d2300d7c26db5833e494b4cdad /server/models/request.js | |
parent | fcc5f77b95d330bfcb439c172b7fcc58f3162e4d (diff) | |
parent | 91cc839af88730ba55f84997c56b85ea100070a7 (diff) | |
download | PeerTube-99fe265a5fc077cb66c322e7f3d191ff7110aea0.tar.gz PeerTube-99fe265a5fc077cb66c322e7f3d191ff7110aea0.tar.zst PeerTube-99fe265a5fc077cb66c322e7f3d191ff7110aea0.zip |
Merge branch 'postgresql'
Diffstat (limited to 'server/models/request.js')
-rw-r--r-- | server/models/request.js | 275 |
1 files changed, 164 insertions, 111 deletions
diff --git a/server/models/request.js b/server/models/request.js index c2cfe83ce..cd52ea767 100644 --- a/server/models/request.js +++ b/server/models/request.js | |||
@@ -2,66 +2,60 @@ | |||
2 | 2 | ||
3 | const each = require('async/each') | 3 | const each = require('async/each') |
4 | const eachLimit = require('async/eachLimit') | 4 | const eachLimit = require('async/eachLimit') |
5 | const values = require('lodash/values') | ||
6 | const mongoose = require('mongoose') | ||
7 | const waterfall = require('async/waterfall') | 5 | const waterfall = require('async/waterfall') |
6 | const values = require('lodash/values') | ||
8 | 7 | ||
9 | const constants = require('../initializers/constants') | 8 | const constants = require('../initializers/constants') |
10 | const logger = require('../helpers/logger') | 9 | const logger = require('../helpers/logger') |
11 | const requests = require('../helpers/requests') | 10 | const requests = require('../helpers/requests') |
12 | 11 | ||
13 | const Pod = mongoose.model('Pod') | ||
14 | |||
15 | let timer = null | 12 | let timer = null |
16 | let lastRequestTimestamp = 0 | 13 | let lastRequestTimestamp = 0 |
17 | 14 | ||
18 | // --------------------------------------------------------------------------- | 15 | // --------------------------------------------------------------------------- |
19 | 16 | ||
20 | const RequestSchema = mongoose.Schema({ | 17 | module.exports = function (sequelize, DataTypes) { |
21 | request: mongoose.Schema.Types.Mixed, | 18 | const Request = sequelize.define('Request', |
22 | endpoint: { | 19 | { |
23 | type: String, | 20 | request: { |
24 | enum: [ values(constants.REQUEST_ENDPOINTS) ] | 21 | type: DataTypes.JSON, |
25 | }, | 22 | allowNull: false |
26 | to: [ | 23 | }, |
24 | endpoint: { | ||
25 | type: DataTypes.ENUM(values(constants.REQUEST_ENDPOINTS)), | ||
26 | allowNull: false | ||
27 | } | ||
28 | }, | ||
27 | { | 29 | { |
28 | type: mongoose.Schema.Types.ObjectId, | 30 | classMethods: { |
29 | ref: 'Pod' | 31 | associate, |
32 | |||
33 | activate, | ||
34 | countTotalRequests, | ||
35 | deactivate, | ||
36 | flush, | ||
37 | forceSend, | ||
38 | remainingMilliSeconds | ||
39 | } | ||
30 | } | 40 | } |
31 | ] | 41 | ) |
32 | }) | ||
33 | |||
34 | RequestSchema.statics = { | ||
35 | activate, | ||
36 | deactivate, | ||
37 | flush, | ||
38 | forceSend, | ||
39 | list, | ||
40 | remainingMilliSeconds | ||
41 | } | ||
42 | |||
43 | RequestSchema.pre('save', function (next) { | ||
44 | const self = this | ||
45 | |||
46 | if (self.to.length === 0) { | ||
47 | Pod.listAllIds(function (err, podIds) { | ||
48 | if (err) return next(err) | ||
49 | 42 | ||
50 | // No friends | 43 | return Request |
51 | if (podIds.length === 0) return | 44 | } |
52 | |||
53 | self.to = podIds | ||
54 | return next() | ||
55 | }) | ||
56 | } else { | ||
57 | return next() | ||
58 | } | ||
59 | }) | ||
60 | |||
61 | mongoose.model('Request', RequestSchema) | ||
62 | 45 | ||
63 | // ------------------------------ STATICS ------------------------------ | 46 | // ------------------------------ STATICS ------------------------------ |
64 | 47 | ||
48 | function associate (models) { | ||
49 | this.belongsToMany(models.Pod, { | ||
50 | foreignKey: { | ||
51 | name: 'requestId', | ||
52 | allowNull: false | ||
53 | }, | ||
54 | through: models.RequestToPod, | ||
55 | onDelete: 'CASCADE' | ||
56 | }) | ||
57 | } | ||
58 | |||
65 | function activate () { | 59 | function activate () { |
66 | logger.info('Requests scheduler activated.') | 60 | logger.info('Requests scheduler activated.') |
67 | lastRequestTimestamp = Date.now() | 61 | lastRequestTimestamp = Date.now() |
@@ -73,15 +67,25 @@ function activate () { | |||
73 | }, constants.REQUESTS_INTERVAL) | 67 | }, constants.REQUESTS_INTERVAL) |
74 | } | 68 | } |
75 | 69 | ||
70 | function countTotalRequests (callback) { | ||
71 | const query = { | ||
72 | include: [ this.sequelize.models.Pod ] | ||
73 | } | ||
74 | |||
75 | return this.count(query).asCallback(callback) | ||
76 | } | ||
77 | |||
76 | function deactivate () { | 78 | function deactivate () { |
77 | logger.info('Requests scheduler deactivated.') | 79 | logger.info('Requests scheduler deactivated.') |
78 | clearInterval(timer) | 80 | clearInterval(timer) |
79 | timer = null | 81 | timer = null |
80 | } | 82 | } |
81 | 83 | ||
82 | function flush () { | 84 | function flush (callback) { |
83 | removeAll.call(this, function (err) { | 85 | removeAll.call(this, function (err) { |
84 | if (err) logger.error('Cannot flush the requests.', { error: err }) | 86 | if (err) logger.error('Cannot flush the requests.', { error: err }) |
87 | |||
88 | return callback(err) | ||
85 | }) | 89 | }) |
86 | } | 90 | } |
87 | 91 | ||
@@ -90,10 +94,6 @@ function forceSend () { | |||
90 | makeRequests.call(this) | 94 | makeRequests.call(this) |
91 | } | 95 | } |
92 | 96 | ||
93 | function list (callback) { | ||
94 | this.find({ }, callback) | ||
95 | } | ||
96 | |||
97 | function remainingMilliSeconds () { | 97 | function remainingMilliSeconds () { |
98 | if (timer === null) return -1 | 98 | if (timer === null) return -1 |
99 | 99 | ||
@@ -122,7 +122,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | |||
122 | 'Error sending secure request to %s pod.', | 122 | 'Error sending secure request to %s pod.', |
123 | toPod.host, | 123 | toPod.host, |
124 | { | 124 | { |
125 | error: err || new Error('Status code not 20x : ' + res.statusCode) | 125 | error: err ? err.message : 'Status code not 20x : ' + res.statusCode |
126 | } | 126 | } |
127 | ) | 127 | ) |
128 | 128 | ||
@@ -136,10 +136,11 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | |||
136 | // Make all the requests of the scheduler | 136 | // Make all the requests of the scheduler |
137 | function makeRequests () { | 137 | function makeRequests () { |
138 | const self = this | 138 | const self = this |
139 | const RequestToPod = this.sequelize.models.RequestToPod | ||
139 | 140 | ||
140 | // We limit the size of the requests (REQUESTS_LIMIT) | 141 | // We limit the size of the requests |
141 | // We don't want to stuck with the same failing requests so we get a random list | 142 | // We don't want to stuck with the same failing requests so we get a random list |
142 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { | 143 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { |
143 | if (err) { | 144 | if (err) { |
144 | logger.error('Cannot get the list of requests.', { err: err }) | 145 | logger.error('Cannot get the list of requests.', { err: err }) |
145 | return // Abort | 146 | return // Abort |
@@ -151,78 +152,77 @@ function makeRequests () { | |||
151 | return | 152 | return |
152 | } | 153 | } |
153 | 154 | ||
154 | logger.info('Making requests to friends.') | ||
155 | |||
156 | // We want to group requests by destinations pod and endpoint | 155 | // We want to group requests by destinations pod and endpoint |
157 | const requestsToMakeGrouped = {} | 156 | const requestsToMakeGrouped = {} |
157 | Object.keys(requests).forEach(function (toPodId) { | ||
158 | requests[toPodId].forEach(function (data) { | ||
159 | const request = data.request | ||
160 | const pod = data.pod | ||
161 | const hashKey = toPodId + request.endpoint | ||
158 | 162 | ||
159 | requests.forEach(function (poolRequest) { | ||
160 | poolRequest.to.forEach(function (toPodId) { | ||
161 | const hashKey = toPodId + poolRequest.endpoint | ||
162 | if (!requestsToMakeGrouped[hashKey]) { | 163 | if (!requestsToMakeGrouped[hashKey]) { |
163 | requestsToMakeGrouped[hashKey] = { | 164 | requestsToMakeGrouped[hashKey] = { |
164 | toPodId, | 165 | toPod: pod, |
165 | endpoint: poolRequest.endpoint, | 166 | endpoint: request.endpoint, |
166 | ids: [], // pool request ids, to delete them from the DB in the future | 167 | ids: [], // request ids, to delete them from the DB in the future |
167 | datas: [] // requests data, | 168 | datas: [] // requests data, |
168 | } | 169 | } |
169 | } | 170 | } |
170 | 171 | ||
171 | requestsToMakeGrouped[hashKey].ids.push(poolRequest._id) | 172 | requestsToMakeGrouped[hashKey].ids.push(request.id) |
172 | requestsToMakeGrouped[hashKey].datas.push(poolRequest.request) | 173 | requestsToMakeGrouped[hashKey].datas.push(request.request) |
173 | }) | 174 | }) |
174 | }) | 175 | }) |
175 | 176 | ||
177 | logger.info('Making requests to friends.') | ||
178 | |||
176 | const goodPods = [] | 179 | const goodPods = [] |
177 | const badPods = [] | 180 | const badPods = [] |
178 | 181 | ||
179 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { | 182 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { |
180 | const requestToMake = requestsToMakeGrouped[hashKey] | 183 | const requestToMake = requestsToMakeGrouped[hashKey] |
184 | const toPod = requestToMake.toPod | ||
181 | 185 | ||
182 | // FIXME: mongodb request inside a loop :/ | 186 | // Maybe the pod is not our friend anymore so simply remove it |
183 | Pod.load(requestToMake.toPodId, function (err, toPod) { | 187 | if (!toPod) { |
184 | if (err) { | 188 | const requestIdsToDelete = requestToMake.ids |
185 | logger.error('Error finding pod by id.', { err: err }) | ||
186 | return callbackEach() | ||
187 | } | ||
188 | |||
189 | // Maybe the pod is not our friend anymore so simply remove it | ||
190 | if (!toPod) { | ||
191 | const requestIdsToDelete = requestToMake.ids | ||
192 | 189 | ||
193 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) | 190 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) |
194 | removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) | 191 | RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id) |
195 | return callbackEach() | 192 | return callbackEach() |
196 | } | 193 | } |
197 | 194 | ||
198 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { | 195 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { |
199 | if (success === true) { | 196 | if (success === true) { |
200 | logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) | 197 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
201 | 198 | ||
202 | goodPods.push(requestToMake.toPodId) | 199 | goodPods.push(requestToMake.toPod.id) |
203 | 200 | ||
204 | // Remove the pod id of these request ids | 201 | // Remove the pod id of these request ids |
205 | removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach) | 202 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) |
206 | } else { | 203 | } else { |
207 | badPods.push(requestToMake.toPodId) | 204 | badPods.push(requestToMake.toPod.id) |
208 | callbackEach() | 205 | callbackEach() |
209 | } | 206 | } |
210 | }) | ||
211 | }) | 207 | }) |
212 | }, function () { | 208 | }, function () { |
213 | // All the requests were made, we update the pods score | 209 | // All the requests were made, we update the pods score |
214 | updatePodsScore(goodPods, badPods) | 210 | updatePodsScore.call(self, goodPods, badPods) |
215 | // Flush requests with no pod | 211 | // Flush requests with no pod |
216 | removeWithEmptyTo.call(self) | 212 | removeWithEmptyTo.call(self, function (err) { |
213 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
214 | }) | ||
217 | }) | 215 | }) |
218 | }) | 216 | }) |
219 | } | 217 | } |
220 | 218 | ||
221 | // Remove pods with a score of 0 (too many requests where they were unreachable) | 219 | // Remove pods with a score of 0 (too many requests where they were unreachable) |
222 | function removeBadPods () { | 220 | function removeBadPods () { |
221 | const self = this | ||
222 | |||
223 | waterfall([ | 223 | waterfall([ |
224 | function findBadPods (callback) { | 224 | function findBadPods (callback) { |
225 | Pod.listBadPods(function (err, pods) { | 225 | self.sequelize.models.Pod.listBadPods(function (err, pods) { |
226 | if (err) { | 226 | if (err) { |
227 | logger.error('Cannot find bad pods.', { error: err }) | 227 | logger.error('Cannot find bad pods.', { error: err }) |
228 | return callback(err) | 228 | return callback(err) |
@@ -233,10 +233,8 @@ function removeBadPods () { | |||
233 | }, | 233 | }, |
234 | 234 | ||
235 | function removeTheseBadPods (pods, callback) { | 235 | function removeTheseBadPods (pods, callback) { |
236 | if (pods.length === 0) return callback(null, 0) | ||
237 | |||
238 | each(pods, function (pod, callbackEach) { | 236 | each(pods, function (pod, callbackEach) { |
239 | pod.remove(callbackEach) | 237 | pod.destroy().asCallback(callbackEach) |
240 | }, function (err) { | 238 | }, function (err) { |
241 | return callback(err, pods.length) | 239 | return callback(err, pods.length) |
242 | }) | 240 | }) |
@@ -253,43 +251,98 @@ function removeBadPods () { | |||
253 | } | 251 | } |
254 | 252 | ||
255 | function updatePodsScore (goodPods, badPods) { | 253 | function updatePodsScore (goodPods, badPods) { |
254 | const self = this | ||
255 | const Pod = this.sequelize.models.Pod | ||
256 | |||
256 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) | 257 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) |
257 | 258 | ||
258 | Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { | 259 | if (goodPods.length !== 0) { |
259 | if (err) logger.error('Cannot increment scores of good pods.') | 260 | Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { |
260 | }) | 261 | if (err) logger.error('Cannot increment scores of good pods.', { error: err }) |
262 | }) | ||
263 | } | ||
261 | 264 | ||
262 | Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | 265 | if (badPods.length !== 0) { |
263 | if (err) logger.error('Cannot decrement scores of bad pods.') | 266 | Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { |
264 | removeBadPods() | 267 | if (err) logger.error('Cannot decrement scores of bad pods.', { error: err }) |
265 | }) | 268 | removeBadPods.call(self) |
269 | }) | ||
270 | } | ||
266 | } | 271 | } |
267 | 272 | ||
268 | function listWithLimitAndRandom (limit, callback) { | 273 | function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { |
269 | const self = this | 274 | const self = this |
275 | const Pod = this.sequelize.models.Pod | ||
270 | 276 | ||
271 | self.count(function (err, count) { | 277 | Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) { |
272 | if (err) return callback(err) | 278 | if (err) return callback(err) |
273 | 279 | ||
274 | let start = Math.floor(Math.random() * count) - limit | 280 | // We don't have friends that have requests |
275 | if (start < 0) start = 0 | 281 | if (podIds.length === 0) return callback(null, []) |
282 | |||
283 | // The the first x requests of these pods | ||
284 | // It is very important to sort by id ASC to keep the requests order! | ||
285 | const query = { | ||
286 | order: [ | ||
287 | [ 'id', 'ASC' ] | ||
288 | ], | ||
289 | include: [ | ||
290 | { | ||
291 | model: self.sequelize.models.Pod, | ||
292 | where: { | ||
293 | id: { | ||
294 | $in: podIds | ||
295 | } | ||
296 | } | ||
297 | } | ||
298 | ] | ||
299 | } | ||
300 | |||
301 | self.findAll(query).asCallback(function (err, requests) { | ||
302 | if (err) return callback(err) | ||
276 | 303 | ||
277 | self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback) | 304 | const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod) |
305 | return callback(err, requestsGrouped) | ||
306 | }) | ||
278 | }) | 307 | }) |
279 | } | 308 | } |
280 | 309 | ||
281 | function removeAll (callback) { | 310 | function groupAndTruncateRequests (requests, limitRequestsPerPod) { |
282 | this.remove({ }, callback) | 311 | const requestsGrouped = {} |
283 | } | ||
284 | 312 | ||
285 | function removePodOf (requestsIds, podId, callback) { | 313 | requests.forEach(function (request) { |
286 | if (!callback) callback = function () {} | 314 | request.Pods.forEach(function (pod) { |
315 | if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] | ||
287 | 316 | ||
288 | this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) | 317 | if (requestsGrouped[pod.id].length < limitRequestsPerPod) { |
318 | requestsGrouped[pod.id].push({ | ||
319 | request, | ||
320 | pod | ||
321 | }) | ||
322 | } | ||
323 | }) | ||
324 | }) | ||
325 | |||
326 | return requestsGrouped | ||
327 | } | ||
328 | |||
329 | function removeAll (callback) { | ||
330 | // Delete all requests | ||
331 | this.truncate({ cascade: true }).asCallback(callback) | ||
289 | } | 332 | } |
290 | 333 | ||
291 | function removeWithEmptyTo (callback) { | 334 | function removeWithEmptyTo (callback) { |
292 | if (!callback) callback = function () {} | 335 | if (!callback) callback = function () {} |
293 | 336 | ||
294 | this.remove({ to: { $size: 0 } }, callback) | 337 | const query = { |
338 | where: { | ||
339 | id: { | ||
340 | $notIn: [ | ||
341 | this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"') | ||
342 | ] | ||
343 | } | ||
344 | } | ||
345 | } | ||
346 | |||
347 | this.destroy(query).asCallback(callback) | ||
295 | } | 348 | } |