aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/models/request.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-01-12 15:20:03 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-01-12 15:20:03 +0100
commit99fe265a5fc077cb66c322e7f3d191ff7110aea0 (patch)
treec9e04ccfcc5496d2300d7c26db5833e494b4cdad /server/models/request.js
parentfcc5f77b95d330bfcb439c172b7fcc58f3162e4d (diff)
parent91cc839af88730ba55f84997c56b85ea100070a7 (diff)
downloadPeerTube-99fe265a5fc077cb66c322e7f3d191ff7110aea0.tar.gz
PeerTube-99fe265a5fc077cb66c322e7f3d191ff7110aea0.tar.zst
PeerTube-99fe265a5fc077cb66c322e7f3d191ff7110aea0.zip
Merge branch 'postgresql'
Diffstat (limited to 'server/models/request.js')
-rw-r--r--server/models/request.js275
1 files changed, 164 insertions, 111 deletions
diff --git a/server/models/request.js b/server/models/request.js
index c2cfe83ce..cd52ea767 100644
--- a/server/models/request.js
+++ b/server/models/request.js
@@ -2,66 +2,60 @@
2 2
3const each = require('async/each') 3const each = require('async/each')
4const eachLimit = require('async/eachLimit') 4const eachLimit = require('async/eachLimit')
5const values = require('lodash/values')
6const mongoose = require('mongoose')
7const waterfall = require('async/waterfall') 5const waterfall = require('async/waterfall')
6const values = require('lodash/values')
8 7
9const constants = require('../initializers/constants') 8const constants = require('../initializers/constants')
10const logger = require('../helpers/logger') 9const logger = require('../helpers/logger')
11const requests = require('../helpers/requests') 10const requests = require('../helpers/requests')
12 11
13const Pod = mongoose.model('Pod')
14
15let timer = null 12let timer = null
16let lastRequestTimestamp = 0 13let lastRequestTimestamp = 0
17 14
18// --------------------------------------------------------------------------- 15// ---------------------------------------------------------------------------
19 16
20const RequestSchema = mongoose.Schema({ 17module.exports = function (sequelize, DataTypes) {
21 request: mongoose.Schema.Types.Mixed, 18 const Request = sequelize.define('Request',
22 endpoint: { 19 {
23 type: String, 20 request: {
24 enum: [ values(constants.REQUEST_ENDPOINTS) ] 21 type: DataTypes.JSON,
25 }, 22 allowNull: false
26 to: [ 23 },
24 endpoint: {
25 type: DataTypes.ENUM(values(constants.REQUEST_ENDPOINTS)),
26 allowNull: false
27 }
28 },
27 { 29 {
28 type: mongoose.Schema.Types.ObjectId, 30 classMethods: {
29 ref: 'Pod' 31 associate,
32
33 activate,
34 countTotalRequests,
35 deactivate,
36 flush,
37 forceSend,
38 remainingMilliSeconds
39 }
30 } 40 }
31 ] 41 )
32})
33
34RequestSchema.statics = {
35 activate,
36 deactivate,
37 flush,
38 forceSend,
39 list,
40 remainingMilliSeconds
41}
42
43RequestSchema.pre('save', function (next) {
44 const self = this
45
46 if (self.to.length === 0) {
47 Pod.listAllIds(function (err, podIds) {
48 if (err) return next(err)
49 42
50 // No friends 43 return Request
51 if (podIds.length === 0) return 44}
52
53 self.to = podIds
54 return next()
55 })
56 } else {
57 return next()
58 }
59})
60
61mongoose.model('Request', RequestSchema)
62 45
63// ------------------------------ STATICS ------------------------------ 46// ------------------------------ STATICS ------------------------------
64 47
48function associate (models) {
49 this.belongsToMany(models.Pod, {
50 foreignKey: {
51 name: 'requestId',
52 allowNull: false
53 },
54 through: models.RequestToPod,
55 onDelete: 'CASCADE'
56 })
57}
58
65function activate () { 59function activate () {
66 logger.info('Requests scheduler activated.') 60 logger.info('Requests scheduler activated.')
67 lastRequestTimestamp = Date.now() 61 lastRequestTimestamp = Date.now()
@@ -73,15 +67,25 @@ function activate () {
73 }, constants.REQUESTS_INTERVAL) 67 }, constants.REQUESTS_INTERVAL)
74} 68}
75 69
70function countTotalRequests (callback) {
71 const query = {
72 include: [ this.sequelize.models.Pod ]
73 }
74
75 return this.count(query).asCallback(callback)
76}
77
76function deactivate () { 78function deactivate () {
77 logger.info('Requests scheduler deactivated.') 79 logger.info('Requests scheduler deactivated.')
78 clearInterval(timer) 80 clearInterval(timer)
79 timer = null 81 timer = null
80} 82}
81 83
82function flush () { 84function flush (callback) {
83 removeAll.call(this, function (err) { 85 removeAll.call(this, function (err) {
84 if (err) logger.error('Cannot flush the requests.', { error: err }) 86 if (err) logger.error('Cannot flush the requests.', { error: err })
87
88 return callback(err)
85 }) 89 })
86} 90}
87 91
@@ -90,10 +94,6 @@ function forceSend () {
90 makeRequests.call(this) 94 makeRequests.call(this)
91} 95}
92 96
93function list (callback) {
94 this.find({ }, callback)
95}
96
97function remainingMilliSeconds () { 97function remainingMilliSeconds () {
98 if (timer === null) return -1 98 if (timer === null) return -1
99 99
@@ -122,7 +122,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
122 'Error sending secure request to %s pod.', 122 'Error sending secure request to %s pod.',
123 toPod.host, 123 toPod.host,
124 { 124 {
125 error: err || new Error('Status code not 20x : ' + res.statusCode) 125 error: err ? err.message : 'Status code not 20x : ' + res.statusCode
126 } 126 }
127 ) 127 )
128 128
@@ -136,10 +136,11 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
136// Make all the requests of the scheduler 136// Make all the requests of the scheduler
137function makeRequests () { 137function makeRequests () {
138 const self = this 138 const self = this
139 const RequestToPod = this.sequelize.models.RequestToPod
139 140
140 // We limit the size of the requests (REQUESTS_LIMIT) 141 // We limit the size of the requests
141 // We don't want to stuck with the same failing requests so we get a random list 142 // We don't want to stuck with the same failing requests so we get a random list
142 listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { 143 listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) {
143 if (err) { 144 if (err) {
144 logger.error('Cannot get the list of requests.', { err: err }) 145 logger.error('Cannot get the list of requests.', { err: err })
145 return // Abort 146 return // Abort
@@ -151,78 +152,77 @@ function makeRequests () {
151 return 152 return
152 } 153 }
153 154
154 logger.info('Making requests to friends.')
155
156 // We want to group requests by destinations pod and endpoint 155 // We want to group requests by destinations pod and endpoint
157 const requestsToMakeGrouped = {} 156 const requestsToMakeGrouped = {}
157 Object.keys(requests).forEach(function (toPodId) {
158 requests[toPodId].forEach(function (data) {
159 const request = data.request
160 const pod = data.pod
161 const hashKey = toPodId + request.endpoint
158 162
159 requests.forEach(function (poolRequest) {
160 poolRequest.to.forEach(function (toPodId) {
161 const hashKey = toPodId + poolRequest.endpoint
162 if (!requestsToMakeGrouped[hashKey]) { 163 if (!requestsToMakeGrouped[hashKey]) {
163 requestsToMakeGrouped[hashKey] = { 164 requestsToMakeGrouped[hashKey] = {
164 toPodId, 165 toPod: pod,
165 endpoint: poolRequest.endpoint, 166 endpoint: request.endpoint,
166 ids: [], // pool request ids, to delete them from the DB in the future 167 ids: [], // request ids, to delete them from the DB in the future
167 datas: [] // requests data, 168 datas: [] // requests data,
168 } 169 }
169 } 170 }
170 171
171 requestsToMakeGrouped[hashKey].ids.push(poolRequest._id) 172 requestsToMakeGrouped[hashKey].ids.push(request.id)
172 requestsToMakeGrouped[hashKey].datas.push(poolRequest.request) 173 requestsToMakeGrouped[hashKey].datas.push(request.request)
173 }) 174 })
174 }) 175 })
175 176
177 logger.info('Making requests to friends.')
178
176 const goodPods = [] 179 const goodPods = []
177 const badPods = [] 180 const badPods = []
178 181
179 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { 182 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
180 const requestToMake = requestsToMakeGrouped[hashKey] 183 const requestToMake = requestsToMakeGrouped[hashKey]
184 const toPod = requestToMake.toPod
181 185
182 // FIXME: mongodb request inside a loop :/ 186 // Maybe the pod is not our friend anymore so simply remove it
183 Pod.load(requestToMake.toPodId, function (err, toPod) { 187 if (!toPod) {
184 if (err) { 188 const requestIdsToDelete = requestToMake.ids
185 logger.error('Error finding pod by id.', { err: err })
186 return callbackEach()
187 }
188
189 // Maybe the pod is not our friend anymore so simply remove it
190 if (!toPod) {
191 const requestIdsToDelete = requestToMake.ids
192 189
193 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) 190 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
194 removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) 191 RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id)
195 return callbackEach() 192 return callbackEach()
196 } 193 }
197 194
198 makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { 195 makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
199 if (success === true) { 196 if (success === true) {
200 logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) 197 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
201 198
202 goodPods.push(requestToMake.toPodId) 199 goodPods.push(requestToMake.toPod.id)
203 200
204 // Remove the pod id of these request ids 201 // Remove the pod id of these request ids
205 removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach) 202 RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
206 } else { 203 } else {
207 badPods.push(requestToMake.toPodId) 204 badPods.push(requestToMake.toPod.id)
208 callbackEach() 205 callbackEach()
209 } 206 }
210 })
211 }) 207 })
212 }, function () { 208 }, function () {
213 // All the requests were made, we update the pods score 209 // All the requests were made, we update the pods score
214 updatePodsScore(goodPods, badPods) 210 updatePodsScore.call(self, goodPods, badPods)
215 // Flush requests with no pod 211 // Flush requests with no pod
216 removeWithEmptyTo.call(self) 212 removeWithEmptyTo.call(self, function (err) {
213 if (err) logger.error('Error when removing requests with no pods.', { error: err })
214 })
217 }) 215 })
218 }) 216 })
219} 217}
220 218
221// Remove pods with a score of 0 (too many requests where they were unreachable) 219// Remove pods with a score of 0 (too many requests where they were unreachable)
222function removeBadPods () { 220function removeBadPods () {
221 const self = this
222
223 waterfall([ 223 waterfall([
224 function findBadPods (callback) { 224 function findBadPods (callback) {
225 Pod.listBadPods(function (err, pods) { 225 self.sequelize.models.Pod.listBadPods(function (err, pods) {
226 if (err) { 226 if (err) {
227 logger.error('Cannot find bad pods.', { error: err }) 227 logger.error('Cannot find bad pods.', { error: err })
228 return callback(err) 228 return callback(err)
@@ -233,10 +233,8 @@ function removeBadPods () {
233 }, 233 },
234 234
235 function removeTheseBadPods (pods, callback) { 235 function removeTheseBadPods (pods, callback) {
236 if (pods.length === 0) return callback(null, 0)
237
238 each(pods, function (pod, callbackEach) { 236 each(pods, function (pod, callbackEach) {
239 pod.remove(callbackEach) 237 pod.destroy().asCallback(callbackEach)
240 }, function (err) { 238 }, function (err) {
241 return callback(err, pods.length) 239 return callback(err, pods.length)
242 }) 240 })
@@ -253,43 +251,98 @@ function removeBadPods () {
253} 251}
254 252
255function updatePodsScore (goodPods, badPods) { 253function updatePodsScore (goodPods, badPods) {
254 const self = this
255 const Pod = this.sequelize.models.Pod
256
256 logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) 257 logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
257 258
258 Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { 259 if (goodPods.length !== 0) {
259 if (err) logger.error('Cannot increment scores of good pods.') 260 Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
260 }) 261 if (err) logger.error('Cannot increment scores of good pods.', { error: err })
262 })
263 }
261 264
262 Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { 265 if (badPods.length !== 0) {
263 if (err) logger.error('Cannot decrement scores of bad pods.') 266 Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
264 removeBadPods() 267 if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
265 }) 268 removeBadPods.call(self)
269 })
270 }
266} 271}
267 272
268function listWithLimitAndRandom (limit, callback) { 273function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
269 const self = this 274 const self = this
275 const Pod = this.sequelize.models.Pod
270 276
271 self.count(function (err, count) { 277 Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) {
272 if (err) return callback(err) 278 if (err) return callback(err)
273 279
274 let start = Math.floor(Math.random() * count) - limit 280 // We don't have friends that have requests
275 if (start < 0) start = 0 281 if (podIds.length === 0) return callback(null, [])
282
283 // The the first x requests of these pods
284 // It is very important to sort by id ASC to keep the requests order!
285 const query = {
286 order: [
287 [ 'id', 'ASC' ]
288 ],
289 include: [
290 {
291 model: self.sequelize.models.Pod,
292 where: {
293 id: {
294 $in: podIds
295 }
296 }
297 }
298 ]
299 }
300
301 self.findAll(query).asCallback(function (err, requests) {
302 if (err) return callback(err)
276 303
277 self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback) 304 const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod)
305 return callback(err, requestsGrouped)
306 })
278 }) 307 })
279} 308}
280 309
281function removeAll (callback) { 310function groupAndTruncateRequests (requests, limitRequestsPerPod) {
282 this.remove({ }, callback) 311 const requestsGrouped = {}
283}
284 312
285function removePodOf (requestsIds, podId, callback) { 313 requests.forEach(function (request) {
286 if (!callback) callback = function () {} 314 request.Pods.forEach(function (pod) {
315 if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
287 316
288 this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) 317 if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
318 requestsGrouped[pod.id].push({
319 request,
320 pod
321 })
322 }
323 })
324 })
325
326 return requestsGrouped
327}
328
329function removeAll (callback) {
330 // Delete all requests
331 this.truncate({ cascade: true }).asCallback(callback)
289} 332}
290 333
291function removeWithEmptyTo (callback) { 334function removeWithEmptyTo (callback) {
292 if (!callback) callback = function () {} 335 if (!callback) callback = function () {}
293 336
294 this.remove({ to: { $size: 0 } }, callback) 337 const query = {
338 where: {
339 id: {
340 $notIn: [
341 this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
342 ]
343 }
344 }
345 }
346
347 this.destroy(query).asCallback(callback)
295} 348}