aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/models/request.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-12-11 21:50:51 +0100
committerChocobozzz <florian.bigard@gmail.com>2016-12-19 21:22:28 +0100
commitfeb4bdfd9b46e87aadfa7c0d5338cde887d1f58c (patch)
tree2abc9fbc9569760e218fd52835850b757344b420 /server/models/request.js
parent108626609eda75e4ecc0a83a650a4d53c46220e0 (diff)
downloadPeerTube-feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c.tar.gz
PeerTube-feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c.tar.zst
PeerTube-feb4bdfd9b46e87aadfa7c0d5338cde887d1f58c.zip
First version with PostgreSQL
Diffstat (limited to 'server/models/request.js')
-rw-r--r--server/models/request.js187
1 files changed, 105 insertions, 82 deletions
diff --git a/server/models/request.js b/server/models/request.js
index c2cfe83ce..882f747b7 100644
--- a/server/models/request.js
+++ b/server/models/request.js
@@ -2,66 +2,58 @@
2 2
3const each = require('async/each') 3const each = require('async/each')
4const eachLimit = require('async/eachLimit') 4const eachLimit = require('async/eachLimit')
5const values = require('lodash/values')
6const mongoose = require('mongoose')
7const waterfall = require('async/waterfall') 5const waterfall = require('async/waterfall')
8 6
9const constants = require('../initializers/constants') 7const constants = require('../initializers/constants')
10const logger = require('../helpers/logger') 8const logger = require('../helpers/logger')
11const requests = require('../helpers/requests') 9const requests = require('../helpers/requests')
12 10
13const Pod = mongoose.model('Pod')
14
15let timer = null 11let timer = null
16let lastRequestTimestamp = 0 12let lastRequestTimestamp = 0
17 13
18// --------------------------------------------------------------------------- 14// ---------------------------------------------------------------------------
19 15
20const RequestSchema = mongoose.Schema({ 16module.exports = function (sequelize, DataTypes) {
21 request: mongoose.Schema.Types.Mixed, 17 const Request = sequelize.define('Request',
22 endpoint: { 18 {
23 type: String, 19 request: {
24 enum: [ values(constants.REQUEST_ENDPOINTS) ] 20 type: DataTypes.JSON
25 }, 21 },
26 to: [ 22 endpoint: {
23 // TODO: enum?
24 type: DataTypes.STRING
25 }
26 },
27 { 27 {
28 type: mongoose.Schema.Types.ObjectId, 28 classMethods: {
29 ref: 'Pod' 29 associate,
30
31 activate,
32 countTotalRequests,
33 deactivate,
34 flush,
35 forceSend,
36 remainingMilliSeconds
37 }
30 } 38 }
31 ] 39 )
32})
33
34RequestSchema.statics = {
35 activate,
36 deactivate,
37 flush,
38 forceSend,
39 list,
40 remainingMilliSeconds
41}
42
43RequestSchema.pre('save', function (next) {
44 const self = this
45
46 if (self.to.length === 0) {
47 Pod.listAllIds(function (err, podIds) {
48 if (err) return next(err)
49
50 // No friends
51 if (podIds.length === 0) return
52
53 self.to = podIds
54 return next()
55 })
56 } else {
57 return next()
58 }
59})
60 40
61mongoose.model('Request', RequestSchema) 41 return Request
42}
62 43
63// ------------------------------ STATICS ------------------------------ 44// ------------------------------ STATICS ------------------------------
64 45
46function associate (models) {
47 this.belongsToMany(models.Pod, {
48 foreignKey: {
49 name: 'requestId',
50 allowNull: false
51 },
52 through: models.RequestToPod,
53 onDelete: 'CASCADE'
54 })
55}
56
65function activate () { 57function activate () {
66 logger.info('Requests scheduler activated.') 58 logger.info('Requests scheduler activated.')
67 lastRequestTimestamp = Date.now() 59 lastRequestTimestamp = Date.now()
@@ -73,6 +65,14 @@ function activate () {
73 }, constants.REQUESTS_INTERVAL) 65 }, constants.REQUESTS_INTERVAL)
74} 66}
75 67
68function countTotalRequests (callback) {
69 const query = {
70 include: [ this.sequelize.models.Pod ]
71 }
72
73 return this.count(query).asCallback(callback)
74}
75
76function deactivate () { 76function deactivate () {
77 logger.info('Requests scheduler deactivated.') 77 logger.info('Requests scheduler deactivated.')
78 clearInterval(timer) 78 clearInterval(timer)
@@ -90,10 +90,6 @@ function forceSend () {
90 makeRequests.call(this) 90 makeRequests.call(this)
91} 91}
92 92
93function list (callback) {
94 this.find({ }, callback)
95}
96
97function remainingMilliSeconds () { 93function remainingMilliSeconds () {
98 if (timer === null) return -1 94 if (timer === null) return -1
99 95
@@ -136,6 +132,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
136// Make all the requests of the scheduler 132// Make all the requests of the scheduler
137function makeRequests () { 133function makeRequests () {
138 const self = this 134 const self = this
135 const RequestToPod = this.sequelize.models.RequestToPod
139 136
140 // We limit the size of the requests (REQUESTS_LIMIT) 137 // We limit the size of the requests (REQUESTS_LIMIT)
141 // We don't want to stuck with the same failing requests so we get a random list 138 // We don't want to stuck with the same failing requests so we get a random list
@@ -156,20 +153,20 @@ function makeRequests () {
156 // We want to group requests by destinations pod and endpoint 153 // We want to group requests by destinations pod and endpoint
157 const requestsToMakeGrouped = {} 154 const requestsToMakeGrouped = {}
158 155
159 requests.forEach(function (poolRequest) { 156 requests.forEach(function (request) {
160 poolRequest.to.forEach(function (toPodId) { 157 request.Pods.forEach(function (toPod) {
161 const hashKey = toPodId + poolRequest.endpoint 158 const hashKey = toPod.id + request.endpoint
162 if (!requestsToMakeGrouped[hashKey]) { 159 if (!requestsToMakeGrouped[hashKey]) {
163 requestsToMakeGrouped[hashKey] = { 160 requestsToMakeGrouped[hashKey] = {
164 toPodId, 161 toPodId: toPod.id,
165 endpoint: poolRequest.endpoint, 162 endpoint: request.endpoint,
166 ids: [], // pool request ids, to delete them from the DB in the future 163 ids: [], // request ids, to delete them from the DB in the future
167 datas: [] // requests data, 164 datas: [] // requests data,
168 } 165 }
169 } 166 }
170 167
171 requestsToMakeGrouped[hashKey].ids.push(poolRequest._id) 168 requestsToMakeGrouped[hashKey].ids.push(request.id)
172 requestsToMakeGrouped[hashKey].datas.push(poolRequest.request) 169 requestsToMakeGrouped[hashKey].datas.push(request.request)
173 }) 170 })
174 }) 171 })
175 172
@@ -179,8 +176,8 @@ function makeRequests () {
179 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { 176 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
180 const requestToMake = requestsToMakeGrouped[hashKey] 177 const requestToMake = requestsToMakeGrouped[hashKey]
181 178
182 // FIXME: mongodb request inside a loop :/ 179 // FIXME: SQL request inside a loop :/
183 Pod.load(requestToMake.toPodId, function (err, toPod) { 180 self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
184 if (err) { 181 if (err) {
185 logger.error('Error finding pod by id.', { err: err }) 182 logger.error('Error finding pod by id.', { err: err })
186 return callbackEach() 183 return callbackEach()
@@ -191,7 +188,7 @@ function makeRequests () {
191 const requestIdsToDelete = requestToMake.ids 188 const requestIdsToDelete = requestToMake.ids
192 189
193 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) 190 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
194 removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) 191 RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
195 return callbackEach() 192 return callbackEach()
196 } 193 }
197 194
@@ -202,7 +199,7 @@ function makeRequests () {
202 goodPods.push(requestToMake.toPodId) 199 goodPods.push(requestToMake.toPodId)
203 200
204 // Remove the pod id of these request ids 201 // Remove the pod id of these request ids
205 removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach) 202 RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
206 } else { 203 } else {
207 badPods.push(requestToMake.toPodId) 204 badPods.push(requestToMake.toPodId)
208 callbackEach() 205 callbackEach()
@@ -211,18 +208,22 @@ function makeRequests () {
211 }) 208 })
212 }, function () { 209 }, function () {
213 // All the requests were made, we update the pods score 210 // All the requests were made, we update the pods score
214 updatePodsScore(goodPods, badPods) 211 updatePodsScore.call(self, goodPods, badPods)
215 // Flush requests with no pod 212 // Flush requests with no pod
216 removeWithEmptyTo.call(self) 213 removeWithEmptyTo.call(self, function (err) {
214 if (err) logger.error('Error when removing requests with no pods.', { error: err })
215 })
217 }) 216 })
218 }) 217 })
219} 218}
220 219
221// Remove pods with a score of 0 (too many requests where they were unreachable) 220// Remove pods with a score of 0 (too many requests where they were unreachable)
222function removeBadPods () { 221function removeBadPods () {
222 const self = this
223
223 waterfall([ 224 waterfall([
224 function findBadPods (callback) { 225 function findBadPods (callback) {
225 Pod.listBadPods(function (err, pods) { 226 self.sequelize.models.Pod.listBadPods(function (err, pods) {
226 if (err) { 227 if (err) {
227 logger.error('Cannot find bad pods.', { error: err }) 228 logger.error('Cannot find bad pods.', { error: err })
228 return callback(err) 229 return callback(err)
@@ -233,10 +234,8 @@ function removeBadPods () {
233 }, 234 },
234 235
235 function removeTheseBadPods (pods, callback) { 236 function removeTheseBadPods (pods, callback) {
236 if (pods.length === 0) return callback(null, 0)
237
238 each(pods, function (pod, callbackEach) { 237 each(pods, function (pod, callbackEach) {
239 pod.remove(callbackEach) 238 pod.destroy().asCallback(callbackEach)
240 }, function (err) { 239 }, function (err) {
241 return callback(err, pods.length) 240 return callback(err, pods.length)
242 }) 241 })
@@ -253,43 +252,67 @@ function removeBadPods () {
253} 252}
254 253
255function updatePodsScore (goodPods, badPods) { 254function updatePodsScore (goodPods, badPods) {
255 const self = this
256 const Pod = this.sequelize.models.Pod
257
256 logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) 258 logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
257 259
258 Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { 260 if (goodPods.length !== 0) {
259 if (err) logger.error('Cannot increment scores of good pods.') 261 Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
260 }) 262 if (err) logger.error('Cannot increment scores of good pods.')
263 })
264 }
261 265
262 Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { 266 if (badPods.length !== 0) {
263 if (err) logger.error('Cannot decrement scores of bad pods.') 267 Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
264 removeBadPods() 268 if (err) logger.error('Cannot decrement scores of bad pods.')
265 }) 269 removeBadPods.call(self)
270 })
271 }
266} 272}
267 273
268function listWithLimitAndRandom (limit, callback) { 274function listWithLimitAndRandom (limit, callback) {
269 const self = this 275 const self = this
270 276
271 self.count(function (err, count) { 277 self.count().asCallback(function (err, count) {
272 if (err) return callback(err) 278 if (err) return callback(err)
273 279
280 // Optimization...
281 if (count === 0) return callback(null, [])
282
274 let start = Math.floor(Math.random() * count) - limit 283 let start = Math.floor(Math.random() * count) - limit
275 if (start < 0) start = 0 284 if (start < 0) start = 0
276 285
277 self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback) 286 const query = {
287 order: [
288 [ 'id', 'ASC' ]
289 ],
290 offset: start,
291 limit: limit,
292 include: [ this.sequelize.models.Pod ]
293 }
294
295 self.findAll(query).asCallback(callback)
278 }) 296 })
279} 297}
280 298
281function removeAll (callback) { 299function removeAll (callback) {
282 this.remove({ }, callback) 300 // Delete all requests
283} 301 this.destroy({ truncate: true }).asCallback(callback)
284
285function removePodOf (requestsIds, podId, callback) {
286 if (!callback) callback = function () {}
287
288 this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
289} 302}
290 303
291function removeWithEmptyTo (callback) { 304function removeWithEmptyTo (callback) {
292 if (!callback) callback = function () {} 305 if (!callback) callback = function () {}
293 306
294 this.remove({ to: { $size: 0 } }, callback) 307 const query = {
308 where: {
309 id: {
310 $notIn: [
311 this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
312 ]
313 }
314 }
315 }
316
317 this.destroy(query).asCallback(callback)
295} 318}