diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-01-10 22:24:42 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-01-10 22:24:42 +0100 |
commit | bd14d16a29e2f90805d04b48378188517741a071 (patch) | |
tree | 226dc461367418321d4a77bb5004d1491681f7cf | |
parent | ed04d94f6d7132055f97a2f757b85c03c5f2a0b6 (diff) | |
download | PeerTube-bd14d16a29e2f90805d04b48378188517741a071.tar.gz PeerTube-bd14d16a29e2f90805d04b48378188517741a071.tar.zst PeerTube-bd14d16a29e2f90805d04b48378188517741a071.zip |
Server: improve requests scheduler
-rw-r--r-- | server/initializers/constants.js | 9 | ||||
-rw-r--r-- | server/lib/friends.js | 16 | ||||
-rw-r--r-- | server/models/pod.js | 37 | ||||
-rw-r--r-- | server/models/request.js | 110 |
4 files changed, 125 insertions, 47 deletions
diff --git a/server/initializers/constants.js b/server/initializers/constants.js index a6adb75bf..97e3c5296 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js | |||
@@ -108,8 +108,10 @@ let REQUESTS_INTERVAL = 600000 | |||
108 | // Number of requests in parallel we can make | 108 | // Number of requests in parallel we can make |
109 | const REQUESTS_IN_PARALLEL = 10 | 109 | const REQUESTS_IN_PARALLEL = 10 |
110 | 110 | ||
111 | // How many requests we put in request | 111 | // To how many pods we send requests |
112 | const REQUESTS_LIMIT = 10 | 112 | const REQUESTS_LIMIT_PODS = 10 |
113 | // How many requests we send to a pod per interval | ||
114 | const REQUESTS_LIMIT_PER_POD = 5 | ||
113 | 115 | ||
114 | // Number of requests to retry for replay requests module | 116 | // Number of requests to retry for replay requests module |
115 | const RETRY_REQUESTS = 5 | 117 | const RETRY_REQUESTS = 5 |
@@ -184,7 +186,8 @@ module.exports = { | |||
184 | REQUEST_ENDPOINTS, | 186 | REQUEST_ENDPOINTS, |
185 | REQUESTS_IN_PARALLEL, | 187 | REQUESTS_IN_PARALLEL, |
186 | REQUESTS_INTERVAL, | 188 | REQUESTS_INTERVAL, |
187 | REQUESTS_LIMIT, | 189 | REQUESTS_LIMIT_PODS, |
190 | REQUESTS_LIMIT_PER_POD, | ||
188 | RETRY_REQUESTS, | 191 | RETRY_REQUESTS, |
189 | SEARCHABLE_COLUMNS, | 192 | SEARCHABLE_COLUMNS, |
190 | SIGNATURE_ALGORITHM, | 193 | SIGNATURE_ALGORITHM, |
diff --git a/server/lib/friends.js b/server/lib/friends.js index 3d3d0fdee..f0575ff2f 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -54,7 +54,13 @@ function removeVideoToFriends (videoParams) { | |||
54 | } | 54 | } |
55 | 55 | ||
56 | function reportAbuseVideoToFriend (reportData, video) { | 56 | function reportAbuseVideoToFriend (reportData, video) { |
57 | createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, [ video.Author.podId ]) | 57 | const options = { |
58 | type: 'report-abuse', | ||
59 | endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, | ||
60 | data: reportData, | ||
61 | toIds: [ video.Author.podId ] | ||
62 | } | ||
63 | createRequest(options) | ||
58 | } | 64 | } |
59 | 65 | ||
60 | function hasFriends (callback) { | 66 | function hasFriends (callback) { |
@@ -161,7 +167,13 @@ function sendOwnedVideosToPod (podId) { | |||
161 | return | 167 | return |
162 | } | 168 | } |
163 | 169 | ||
164 | createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ]) | 170 | const options = { |
171 | type: 'add', | ||
172 | endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, | ||
173 | data: remoteVideo, | ||
174 | toIds: [ podId ] | ||
175 | } | ||
176 | createRequest(options) | ||
165 | }) | 177 | }) |
166 | }) | 178 | }) |
167 | }) | 179 | }) |
diff --git a/server/models/pod.js b/server/models/pod.js index 8e7dd1fd8..b3c6db8e8 100644 --- a/server/models/pod.js +++ b/server/models/pod.js | |||
@@ -50,6 +50,7 @@ module.exports = function (sequelize, DataTypes) { | |||
50 | incrementScores, | 50 | incrementScores, |
51 | list, | 51 | list, |
52 | listAllIds, | 52 | listAllIds, |
53 | listRandomPodIdsWithRequest, | ||
53 | listBadPods, | 54 | listBadPods, |
54 | load, | 55 | load, |
55 | loadByHost, | 56 | loadByHost, |
@@ -134,6 +135,42 @@ function listAllIds (transaction, callback) { | |||
134 | }) | 135 | }) |
135 | } | 136 | } |
136 | 137 | ||
138 | function listRandomPodIdsWithRequest (limit, callback) { | ||
139 | const self = this | ||
140 | |||
141 | self.count().asCallback(function (err, count) { | ||
142 | if (err) return callback(err) | ||
143 | |||
144 | // Optimization... | ||
145 | if (count === 0) return callback(null, []) | ||
146 | |||
147 | let start = Math.floor(Math.random() * count) - limit | ||
148 | if (start < 0) start = 0 | ||
149 | |||
150 | const query = { | ||
151 | attributes: [ 'id' ], | ||
152 | order: [ | ||
153 | [ 'id', 'ASC' ] | ||
154 | ], | ||
155 | offset: start, | ||
156 | limit: limit, | ||
157 | where: { | ||
158 | id: { | ||
159 | $in: [ | ||
160 | this.sequelize.literal('SELECT "podId" FROM "RequestToPods"') | ||
161 | ] | ||
162 | } | ||
163 | } | ||
164 | } | ||
165 | |||
166 | return this.findAll(query).asCallback(function (err, pods) { | ||
167 | if (err) return callback(err) | ||
168 | |||
169 | return callback(null, map(pods, 'id')) | ||
170 | }) | ||
171 | }) | ||
172 | } | ||
173 | |||
137 | function listBadPods (callback) { | 174 | function listBadPods (callback) { |
138 | const query = { | 175 | const query = { |
139 | where: { | 176 | where: { |
diff --git a/server/models/request.js b/server/models/request.js index 1d6038044..26953e5f5 100644 --- a/server/models/request.js +++ b/server/models/request.js | |||
@@ -138,9 +138,9 @@ function makeRequests () { | |||
138 | const self = this | 138 | const self = this |
139 | const RequestToPod = this.sequelize.models.RequestToPod | 139 | const RequestToPod = this.sequelize.models.RequestToPod |
140 | 140 | ||
141 | // We limit the size of the requests (REQUESTS_LIMIT) | 141 | // We limit the size of the requests |
142 | // We don't want to stuck with the same failing requests so we get a random list | 142 | // We don't want to stuck with the same failing requests so we get a random list |
143 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { | 143 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { |
144 | if (err) { | 144 | if (err) { |
145 | logger.error('Cannot get the list of requests.', { err: err }) | 145 | logger.error('Cannot get the list of requests.', { err: err }) |
146 | return // Abort | 146 | return // Abort |
@@ -156,13 +156,15 @@ function makeRequests () { | |||
156 | 156 | ||
157 | // We want to group requests by destinations pod and endpoint | 157 | // We want to group requests by destinations pod and endpoint |
158 | const requestsToMakeGrouped = {} | 158 | const requestsToMakeGrouped = {} |
159 | Object.keys(requests).forEach(function (toPodId) { | ||
160 | requests[toPodId].forEach(function (data) { | ||
161 | const request = data.request | ||
162 | const pod = data.pod | ||
163 | const hashKey = toPodId + request.endpoint | ||
159 | 164 | ||
160 | requests.forEach(function (request) { | ||
161 | request.Pods.forEach(function (toPod) { | ||
162 | const hashKey = toPod.id + request.endpoint | ||
163 | if (!requestsToMakeGrouped[hashKey]) { | 165 | if (!requestsToMakeGrouped[hashKey]) { |
164 | requestsToMakeGrouped[hashKey] = { | 166 | requestsToMakeGrouped[hashKey] = { |
165 | toPodId: toPod.id, | 167 | toPod: pod, |
166 | endpoint: request.endpoint, | 168 | endpoint: request.endpoint, |
167 | ids: [], // request ids, to delete them from the DB in the future | 169 | ids: [], // request ids, to delete them from the DB in the future |
168 | datas: [] // requests data, | 170 | datas: [] // requests data, |
@@ -179,36 +181,29 @@ function makeRequests () { | |||
179 | 181 | ||
180 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { | 182 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { |
181 | const requestToMake = requestsToMakeGrouped[hashKey] | 183 | const requestToMake = requestsToMakeGrouped[hashKey] |
184 | const toPod = requestToMake.toPod | ||
182 | 185 | ||
183 | // FIXME: SQL request inside a loop :/ | 186 | // Maybe the pod is not our friend anymore so simply remove it |
184 | self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) { | 187 | if (!toPod) { |
185 | if (err) { | 188 | const requestIdsToDelete = requestToMake.ids |
186 | logger.error('Error finding pod by id.', { err: err }) | ||
187 | return callbackEach() | ||
188 | } | ||
189 | |||
190 | // Maybe the pod is not our friend anymore so simply remove it | ||
191 | if (!toPod) { | ||
192 | const requestIdsToDelete = requestToMake.ids | ||
193 | 189 | ||
194 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) | 190 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) |
195 | RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) | 191 | RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id) |
196 | return callbackEach() | 192 | return callbackEach() |
197 | } | 193 | } |
198 | 194 | ||
199 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { | 195 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { |
200 | if (success === true) { | 196 | if (success === true) { |
201 | logger.debug('Removing requests for pod %s.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) | 197 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
202 | 198 | ||
203 | goodPods.push(requestToMake.toPodId) | 199 | goodPods.push(requestToMake.toPod.id) |
204 | 200 | ||
205 | // Remove the pod id of these request ids | 201 | // Remove the pod id of these request ids |
206 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach) | 202 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) |
207 | } else { | 203 | } else { |
208 | badPods.push(requestToMake.toPodId) | 204 | badPods.push(requestToMake.toPod.id) |
209 | callbackEach() | 205 | callbackEach() |
210 | } | 206 | } |
211 | }) | ||
212 | }) | 207 | }) |
213 | }, function () { | 208 | }, function () { |
214 | // All the requests were made, we update the pods score | 209 | // All the requests were made, we update the pods score |
@@ -275,29 +270,60 @@ function updatePodsScore (goodPods, badPods) { | |||
275 | } | 270 | } |
276 | } | 271 | } |
277 | 272 | ||
278 | function listWithLimitAndRandom (limit, callback) { | 273 | function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { |
279 | const self = this | 274 | const self = this |
275 | const Pod = this.sequelize.models.Pod | ||
280 | 276 | ||
281 | self.count().asCallback(function (err, count) { | 277 | Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) { |
282 | if (err) return callback(err) | 278 | if (err) return callback(err) |
283 | 279 | ||
284 | // Optimization... | 280 | // We don't have friends that have requests |
285 | if (count === 0) return callback(null, []) | 281 | if (podIds.length === 0) return callback(null, []) |
286 | |||
287 | let start = Math.floor(Math.random() * count) - limit | ||
288 | if (start < 0) start = 0 | ||
289 | 282 | ||
283 | // The the first x requests of these pods | ||
284 | // It is very important to sort by id ASC to keep the requests order! | ||
290 | const query = { | 285 | const query = { |
291 | order: [ | 286 | order: [ |
292 | [ 'id', 'ASC' ] | 287 | [ 'id', 'ASC' ] |
293 | ], | 288 | ], |
294 | // offset: start, | 289 | include: [ |
295 | // limit: limit, | 290 | { |
296 | include: [ this.sequelize.models.Pod ] | 291 | model: self.sequelize.models.Pod, |
292 | where: { | ||
293 | id: { | ||
294 | $in: podIds | ||
295 | } | ||
296 | } | ||
297 | } | ||
298 | ] | ||
297 | } | 299 | } |
298 | 300 | ||
299 | self.findAll(query).asCallback(callback) | 301 | self.findAll(query).asCallback(function (err, requests) { |
302 | if (err) return callback(err) | ||
303 | |||
304 | const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod) | ||
305 | return callback(err, requestsGrouped) | ||
306 | }) | ||
307 | }) | ||
308 | } | ||
309 | |||
310 | function groupAndTruncateRequests (requests, limitRequestsPerPod) { | ||
311 | const requestsGrouped = {} | ||
312 | |||
313 | requests.forEach(function (request) { | ||
314 | request.Pods.forEach(function (pod) { | ||
315 | if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] | ||
316 | |||
317 | if (requestsGrouped[pod.id].length < limitRequestsPerPod) { | ||
318 | requestsGrouped[pod.id].push({ | ||
319 | request, | ||
320 | pod | ||
321 | }) | ||
322 | } | ||
323 | }) | ||
300 | }) | 324 | }) |
325 | |||
326 | return requestsGrouped | ||
301 | } | 327 | } |
302 | 328 | ||
303 | function removeAll (callback) { | 329 | function removeAll (callback) { |