diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2016-12-11 21:50:51 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2016-12-19 21:22:28 +0100 |
commit | feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c (patch) | |
tree | 2abc9fbc9569760e218fd52835850b757344b420 /server/models/request.js | |
parent | 108626609eda75e4ecc0a83a650a4d53c46220e0 (diff) | |
download | PeerTube-feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c.tar.gz PeerTube-feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c.tar.zst PeerTube-feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c.zip |
First version with PostgreSQL
Diffstat (limited to 'server/models/request.js')
-rw-r--r-- | server/models/request.js | 187 |
1 files changed, 105 insertions, 82 deletions
diff --git a/server/models/request.js b/server/models/request.js index c2cfe83ce..882f747b7 100644 --- a/server/models/request.js +++ b/server/models/request.js | |||
@@ -2,66 +2,58 @@ | |||
2 | 2 | ||
3 | const each = require('async/each') | 3 | const each = require('async/each') |
4 | const eachLimit = require('async/eachLimit') | 4 | const eachLimit = require('async/eachLimit') |
5 | const values = require('lodash/values') | ||
6 | const mongoose = require('mongoose') | ||
7 | const waterfall = require('async/waterfall') | 5 | const waterfall = require('async/waterfall') |
8 | 6 | ||
9 | const constants = require('../initializers/constants') | 7 | const constants = require('../initializers/constants') |
10 | const logger = require('../helpers/logger') | 8 | const logger = require('../helpers/logger') |
11 | const requests = require('../helpers/requests') | 9 | const requests = require('../helpers/requests') |
12 | 10 | ||
13 | const Pod = mongoose.model('Pod') | ||
14 | |||
15 | let timer = null | 11 | let timer = null |
16 | let lastRequestTimestamp = 0 | 12 | let lastRequestTimestamp = 0 |
17 | 13 | ||
18 | // --------------------------------------------------------------------------- | 14 | // --------------------------------------------------------------------------- |
19 | 15 | ||
20 | const RequestSchema = mongoose.Schema({ | 16 | module.exports = function (sequelize, DataTypes) { |
21 | request: mongoose.Schema.Types.Mixed, | 17 | const Request = sequelize.define('Request', |
22 | endpoint: { | 18 | { |
23 | type: String, | 19 | request: { |
24 | enum: [ values(constants.REQUEST_ENDPOINTS) ] | 20 | type: DataTypes.JSON |
25 | }, | 21 | }, |
26 | to: [ | 22 | endpoint: { |
23 | // TODO: enum? | ||
24 | type: DataTypes.STRING | ||
25 | } | ||
26 | }, | ||
27 | { | 27 | { |
28 | type: mongoose.Schema.Types.ObjectId, | 28 | classMethods: { |
29 | ref: 'Pod' | 29 | associate, |
30 | |||
31 | activate, | ||
32 | countTotalRequests, | ||
33 | deactivate, | ||
34 | flush, | ||
35 | forceSend, | ||
36 | remainingMilliSeconds | ||
37 | } | ||
30 | } | 38 | } |
31 | ] | 39 | ) |
32 | }) | ||
33 | |||
34 | RequestSchema.statics = { | ||
35 | activate, | ||
36 | deactivate, | ||
37 | flush, | ||
38 | forceSend, | ||
39 | list, | ||
40 | remainingMilliSeconds | ||
41 | } | ||
42 | |||
43 | RequestSchema.pre('save', function (next) { | ||
44 | const self = this | ||
45 | |||
46 | if (self.to.length === 0) { | ||
47 | Pod.listAllIds(function (err, podIds) { | ||
48 | if (err) return next(err) | ||
49 | |||
50 | // No friends | ||
51 | if (podIds.length === 0) return | ||
52 | |||
53 | self.to = podIds | ||
54 | return next() | ||
55 | }) | ||
56 | } else { | ||
57 | return next() | ||
58 | } | ||
59 | }) | ||
60 | 40 | ||
61 | mongoose.model('Request', RequestSchema) | 41 | return Request |
42 | } | ||
62 | 43 | ||
63 | // ------------------------------ STATICS ------------------------------ | 44 | // ------------------------------ STATICS ------------------------------ |
64 | 45 | ||
46 | function associate (models) { | ||
47 | this.belongsToMany(models.Pod, { | ||
48 | foreignKey: { | ||
49 | name: 'requestId', | ||
50 | allowNull: false | ||
51 | }, | ||
52 | through: models.RequestToPod, | ||
53 | onDelete: 'CASCADE' | ||
54 | }) | ||
55 | } | ||
56 | |||
65 | function activate () { | 57 | function activate () { |
66 | logger.info('Requests scheduler activated.') | 58 | logger.info('Requests scheduler activated.') |
67 | lastRequestTimestamp = Date.now() | 59 | lastRequestTimestamp = Date.now() |
@@ -73,6 +65,14 @@ function activate () { | |||
73 | }, constants.REQUESTS_INTERVAL) | 65 | }, constants.REQUESTS_INTERVAL) |
74 | } | 66 | } |
75 | 67 | ||
68 | function countTotalRequests (callback) { | ||
69 | const query = { | ||
70 | include: [ this.sequelize.models.Pod ] | ||
71 | } | ||
72 | |||
73 | return this.count(query).asCallback(callback) | ||
74 | } | ||
75 | |||
76 | function deactivate () { | 76 | function deactivate () { |
77 | logger.info('Requests scheduler deactivated.') | 77 | logger.info('Requests scheduler deactivated.') |
78 | clearInterval(timer) | 78 | clearInterval(timer) |
@@ -90,10 +90,6 @@ function forceSend () { | |||
90 | makeRequests.call(this) | 90 | makeRequests.call(this) |
91 | } | 91 | } |
92 | 92 | ||
93 | function list (callback) { | ||
94 | this.find({ }, callback) | ||
95 | } | ||
96 | |||
97 | function remainingMilliSeconds () { | 93 | function remainingMilliSeconds () { |
98 | if (timer === null) return -1 | 94 | if (timer === null) return -1 |
99 | 95 | ||
@@ -136,6 +132,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | |||
136 | // Make all the requests of the scheduler | 132 | // Make all the requests of the scheduler |
137 | function makeRequests () { | 133 | function makeRequests () { |
138 | const self = this | 134 | const self = this |
135 | const RequestToPod = this.sequelize.models.RequestToPod | ||
139 | 136 | ||
140 | // We limit the size of the requests (REQUESTS_LIMIT) | 137 | // We limit the size of the requests (REQUESTS_LIMIT) |
141 | // We don't want to stuck with the same failing requests so we get a random list | 138 | // We don't want to stuck with the same failing requests so we get a random list |
@@ -156,20 +153,20 @@ function makeRequests () { | |||
156 | // We want to group requests by destinations pod and endpoint | 153 | // We want to group requests by destinations pod and endpoint |
157 | const requestsToMakeGrouped = {} | 154 | const requestsToMakeGrouped = {} |
158 | 155 | ||
159 | requests.forEach(function (poolRequest) { | 156 | requests.forEach(function (request) { |
160 | poolRequest.to.forEach(function (toPodId) { | 157 | request.Pods.forEach(function (toPod) { |
161 | const hashKey = toPodId + poolRequest.endpoint | 158 | const hashKey = toPod.id + request.endpoint |
162 | if (!requestsToMakeGrouped[hashKey]) { | 159 | if (!requestsToMakeGrouped[hashKey]) { |
163 | requestsToMakeGrouped[hashKey] = { | 160 | requestsToMakeGrouped[hashKey] = { |
164 | toPodId, | 161 | toPodId: toPod.id, |
165 | endpoint: poolRequest.endpoint, | 162 | endpoint: request.endpoint, |
166 | ids: [], // pool request ids, to delete them from the DB in the future | 163 | ids: [], // request ids, to delete them from the DB in the future |
167 | datas: [] // requests data, | 164 | datas: [] // requests data, |
168 | } | 165 | } |
169 | } | 166 | } |
170 | 167 | ||
171 | requestsToMakeGrouped[hashKey].ids.push(poolRequest._id) | 168 | requestsToMakeGrouped[hashKey].ids.push(request.id) |
172 | requestsToMakeGrouped[hashKey].datas.push(poolRequest.request) | 169 | requestsToMakeGrouped[hashKey].datas.push(request.request) |
173 | }) | 170 | }) |
174 | }) | 171 | }) |
175 | 172 | ||
@@ -179,8 +176,8 @@ function makeRequests () { | |||
179 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { | 176 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { |
180 | const requestToMake = requestsToMakeGrouped[hashKey] | 177 | const requestToMake = requestsToMakeGrouped[hashKey] |
181 | 178 | ||
182 | // FIXME: mongodb request inside a loop :/ | 179 | // FIXME: SQL request inside a loop :/ |
183 | Pod.load(requestToMake.toPodId, function (err, toPod) { | 180 | self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) { |
184 | if (err) { | 181 | if (err) { |
185 | logger.error('Error finding pod by id.', { err: err }) | 182 | logger.error('Error finding pod by id.', { err: err }) |
186 | return callbackEach() | 183 | return callbackEach() |
@@ -191,7 +188,7 @@ function makeRequests () { | |||
191 | const requestIdsToDelete = requestToMake.ids | 188 | const requestIdsToDelete = requestToMake.ids |
192 | 189 | ||
193 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) | 190 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) |
194 | removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) | 191 | RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) |
195 | return callbackEach() | 192 | return callbackEach() |
196 | } | 193 | } |
197 | 194 | ||
@@ -202,7 +199,7 @@ function makeRequests () { | |||
202 | goodPods.push(requestToMake.toPodId) | 199 | goodPods.push(requestToMake.toPodId) |
203 | 200 | ||
204 | // Remove the pod id of these request ids | 201 | // Remove the pod id of these request ids |
205 | removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach) | 202 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach) |
206 | } else { | 203 | } else { |
207 | badPods.push(requestToMake.toPodId) | 204 | badPods.push(requestToMake.toPodId) |
208 | callbackEach() | 205 | callbackEach() |
@@ -211,18 +208,22 @@ function makeRequests () { | |||
211 | }) | 208 | }) |
212 | }, function () { | 209 | }, function () { |
213 | // All the requests were made, we update the pods score | 210 | // All the requests were made, we update the pods score |
214 | updatePodsScore(goodPods, badPods) | 211 | updatePodsScore.call(self, goodPods, badPods) |
215 | // Flush requests with no pod | 212 | // Flush requests with no pod |
216 | removeWithEmptyTo.call(self) | 213 | removeWithEmptyTo.call(self, function (err) { |
214 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
215 | }) | ||
217 | }) | 216 | }) |
218 | }) | 217 | }) |
219 | } | 218 | } |
220 | 219 | ||
221 | // Remove pods with a score of 0 (too many requests where they were unreachable) | 220 | // Remove pods with a score of 0 (too many requests where they were unreachable) |
222 | function removeBadPods () { | 221 | function removeBadPods () { |
222 | const self = this | ||
223 | |||
223 | waterfall([ | 224 | waterfall([ |
224 | function findBadPods (callback) { | 225 | function findBadPods (callback) { |
225 | Pod.listBadPods(function (err, pods) { | 226 | self.sequelize.models.Pod.listBadPods(function (err, pods) { |
226 | if (err) { | 227 | if (err) { |
227 | logger.error('Cannot find bad pods.', { error: err }) | 228 | logger.error('Cannot find bad pods.', { error: err }) |
228 | return callback(err) | 229 | return callback(err) |
@@ -233,10 +234,8 @@ function removeBadPods () { | |||
233 | }, | 234 | }, |
234 | 235 | ||
235 | function removeTheseBadPods (pods, callback) { | 236 | function removeTheseBadPods (pods, callback) { |
236 | if (pods.length === 0) return callback(null, 0) | ||
237 | |||
238 | each(pods, function (pod, callbackEach) { | 237 | each(pods, function (pod, callbackEach) { |
239 | pod.remove(callbackEach) | 238 | pod.destroy().asCallback(callbackEach) |
240 | }, function (err) { | 239 | }, function (err) { |
241 | return callback(err, pods.length) | 240 | return callback(err, pods.length) |
242 | }) | 241 | }) |
@@ -253,43 +252,67 @@ function removeBadPods () { | |||
253 | } | 252 | } |
254 | 253 | ||
255 | function updatePodsScore (goodPods, badPods) { | 254 | function updatePodsScore (goodPods, badPods) { |
255 | const self = this | ||
256 | const Pod = this.sequelize.models.Pod | ||
257 | |||
256 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) | 258 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) |
257 | 259 | ||
258 | Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { | 260 | if (goodPods.length !== 0) { |
259 | if (err) logger.error('Cannot increment scores of good pods.') | 261 | Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { |
260 | }) | 262 | if (err) logger.error('Cannot increment scores of good pods.') |
263 | }) | ||
264 | } | ||
261 | 265 | ||
262 | Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | 266 | if (badPods.length !== 0) { |
263 | if (err) logger.error('Cannot decrement scores of bad pods.') | 267 | Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { |
264 | removeBadPods() | 268 | if (err) logger.error('Cannot decrement scores of bad pods.') |
265 | }) | 269 | removeBadPods.call(self) |
270 | }) | ||
271 | } | ||
266 | } | 272 | } |
267 | 273 | ||
268 | function listWithLimitAndRandom (limit, callback) { | 274 | function listWithLimitAndRandom (limit, callback) { |
269 | const self = this | 275 | const self = this |
270 | 276 | ||
271 | self.count(function (err, count) { | 277 | self.count().asCallback(function (err, count) { |
272 | if (err) return callback(err) | 278 | if (err) return callback(err) |
273 | 279 | ||
280 | // Optimization... | ||
281 | if (count === 0) return callback(null, []) | ||
282 | |||
274 | let start = Math.floor(Math.random() * count) - limit | 283 | let start = Math.floor(Math.random() * count) - limit |
275 | if (start < 0) start = 0 | 284 | if (start < 0) start = 0 |
276 | 285 | ||
277 | self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback) | 286 | const query = { |
287 | order: [ | ||
288 | [ 'id', 'ASC' ] | ||
289 | ], | ||
290 | offset: start, | ||
291 | limit: limit, | ||
292 | include: [ this.sequelize.models.Pod ] | ||
293 | } | ||
294 | |||
295 | self.findAll(query).asCallback(callback) | ||
278 | }) | 296 | }) |
279 | } | 297 | } |
280 | 298 | ||
281 | function removeAll (callback) { | 299 | function removeAll (callback) { |
282 | this.remove({ }, callback) | 300 | // Delete all requests |
283 | } | 301 | this.destroy({ truncate: true }).asCallback(callback) |
284 | |||
285 | function removePodOf (requestsIds, podId, callback) { | ||
286 | if (!callback) callback = function () {} | ||
287 | |||
288 | this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) | ||
289 | } | 302 | } |
290 | 303 | ||
291 | function removeWithEmptyTo (callback) { | 304 | function removeWithEmptyTo (callback) { |
292 | if (!callback) callback = function () {} | 305 | if (!callback) callback = function () {} |
293 | 306 | ||
294 | this.remove({ to: { $size: 0 } }, callback) | 307 | const query = { |
308 | where: { | ||
309 | id: { | ||
310 | $notIn: [ | ||
311 | this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"') | ||
312 | ] | ||
313 | } | ||
314 | } | ||
315 | } | ||
316 | |||
317 | this.destroy(query).asCallback(callback) | ||
295 | } | 318 | } |