aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-01-10 22:24:42 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-01-10 22:24:42 +0100
commitbd14d16a29e2f90805d04b48378188517741a071 (patch)
tree226dc461367418321d4a77bb5004d1491681f7cf
parented04d94f6d7132055f97a2f757b85c03c5f2a0b6 (diff)
downloadPeerTube-bd14d16a29e2f90805d04b48378188517741a071.tar.gz
PeerTube-bd14d16a29e2f90805d04b48378188517741a071.tar.zst
PeerTube-bd14d16a29e2f90805d04b48378188517741a071.zip
Server: improve requests scheduler
-rw-r--r--server/initializers/constants.js9
-rw-r--r--server/lib/friends.js16
-rw-r--r--server/models/pod.js37
-rw-r--r--server/models/request.js110
4 files changed, 125 insertions, 47 deletions
diff --git a/server/initializers/constants.js b/server/initializers/constants.js
index a6adb75bf..97e3c5296 100644
--- a/server/initializers/constants.js
+++ b/server/initializers/constants.js
@@ -108,8 +108,10 @@ let REQUESTS_INTERVAL = 600000
108// Number of requests in parallel we can make 108// Number of requests in parallel we can make
109const REQUESTS_IN_PARALLEL = 10 109const REQUESTS_IN_PARALLEL = 10
110 110
111// How many requests we put in request 111// To how many pods we send requests
112const REQUESTS_LIMIT = 10 112const REQUESTS_LIMIT_PODS = 10
113// How many requests we send to a pod per interval
114const REQUESTS_LIMIT_PER_POD = 5
113 115
114// Number of requests to retry for replay requests module 116// Number of requests to retry for replay requests module
115const RETRY_REQUESTS = 5 117const RETRY_REQUESTS = 5
@@ -184,7 +186,8 @@ module.exports = {
184 REQUEST_ENDPOINTS, 186 REQUEST_ENDPOINTS,
185 REQUESTS_IN_PARALLEL, 187 REQUESTS_IN_PARALLEL,
186 REQUESTS_INTERVAL, 188 REQUESTS_INTERVAL,
187 REQUESTS_LIMIT, 189 REQUESTS_LIMIT_PODS,
190 REQUESTS_LIMIT_PER_POD,
188 RETRY_REQUESTS, 191 RETRY_REQUESTS,
189 SEARCHABLE_COLUMNS, 192 SEARCHABLE_COLUMNS,
190 SIGNATURE_ALGORITHM, 193 SIGNATURE_ALGORITHM,
diff --git a/server/lib/friends.js b/server/lib/friends.js
index 3d3d0fdee..f0575ff2f 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -54,7 +54,13 @@ function removeVideoToFriends (videoParams) {
54} 54}
55 55
56function reportAbuseVideoToFriend (reportData, video) { 56function reportAbuseVideoToFriend (reportData, video) {
57 createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, [ video.Author.podId ]) 57 const options = {
58 type: 'report-abuse',
59 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
60 data: reportData,
61 toIds: [ video.Author.podId ]
62 }
63 createRequest(options)
58} 64}
59 65
60function hasFriends (callback) { 66function hasFriends (callback) {
@@ -161,7 +167,13 @@ function sendOwnedVideosToPod (podId) {
161 return 167 return
162 } 168 }
163 169
164 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ]) 170 const options = {
171 type: 'add',
172 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
173 data: remoteVideo,
174 toIds: [ podId ]
175 }
176 createRequest(options)
165 }) 177 })
166 }) 178 })
167 }) 179 })
diff --git a/server/models/pod.js b/server/models/pod.js
index 8e7dd1fd8..b3c6db8e8 100644
--- a/server/models/pod.js
+++ b/server/models/pod.js
@@ -50,6 +50,7 @@ module.exports = function (sequelize, DataTypes) {
50 incrementScores, 50 incrementScores,
51 list, 51 list,
52 listAllIds, 52 listAllIds,
53 listRandomPodIdsWithRequest,
53 listBadPods, 54 listBadPods,
54 load, 55 load,
55 loadByHost, 56 loadByHost,
@@ -134,6 +135,42 @@ function listAllIds (transaction, callback) {
134 }) 135 })
135} 136}
136 137
138function listRandomPodIdsWithRequest (limit, callback) {
139 const self = this
140
141 self.count().asCallback(function (err, count) {
142 if (err) return callback(err)
143
144 // Optimization...
145 if (count === 0) return callback(null, [])
146
147 let start = Math.floor(Math.random() * count) - limit
148 if (start < 0) start = 0
149
150 const query = {
151 attributes: [ 'id' ],
152 order: [
153 [ 'id', 'ASC' ]
154 ],
155 offset: start,
156 limit: limit,
157 where: {
158 id: {
159 $in: [
160 this.sequelize.literal('SELECT "podId" FROM "RequestToPods"')
161 ]
162 }
163 }
164 }
165
166 return this.findAll(query).asCallback(function (err, pods) {
167 if (err) return callback(err)
168
169 return callback(null, map(pods, 'id'))
170 })
171 })
172}
173
137function listBadPods (callback) { 174function listBadPods (callback) {
138 const query = { 175 const query = {
139 where: { 176 where: {
diff --git a/server/models/request.js b/server/models/request.js
index 1d6038044..26953e5f5 100644
--- a/server/models/request.js
+++ b/server/models/request.js
@@ -138,9 +138,9 @@ function makeRequests () {
138 const self = this 138 const self = this
139 const RequestToPod = this.sequelize.models.RequestToPod 139 const RequestToPod = this.sequelize.models.RequestToPod
140 140
141 // We limit the size of the requests (REQUESTS_LIMIT) 141 // We limit the size of the requests
142 // We don't want to stuck with the same failing requests so we get a random list 142 // We don't want to stuck with the same failing requests so we get a random list
143 listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) { 143 listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) {
144 if (err) { 144 if (err) {
145 logger.error('Cannot get the list of requests.', { err: err }) 145 logger.error('Cannot get the list of requests.', { err: err })
146 return // Abort 146 return // Abort
@@ -156,13 +156,15 @@ function makeRequests () {
156 156
157 // We want to group requests by destinations pod and endpoint 157 // We want to group requests by destinations pod and endpoint
158 const requestsToMakeGrouped = {} 158 const requestsToMakeGrouped = {}
159 Object.keys(requests).forEach(function (toPodId) {
160 requests[toPodId].forEach(function (data) {
161 const request = data.request
162 const pod = data.pod
163 const hashKey = toPodId + request.endpoint
159 164
160 requests.forEach(function (request) {
161 request.Pods.forEach(function (toPod) {
162 const hashKey = toPod.id + request.endpoint
163 if (!requestsToMakeGrouped[hashKey]) { 165 if (!requestsToMakeGrouped[hashKey]) {
164 requestsToMakeGrouped[hashKey] = { 166 requestsToMakeGrouped[hashKey] = {
165 toPodId: toPod.id, 167 toPod: pod,
166 endpoint: request.endpoint, 168 endpoint: request.endpoint,
167 ids: [], // request ids, to delete them from the DB in the future 169 ids: [], // request ids, to delete them from the DB in the future
168 datas: [] // requests data, 170 datas: [] // requests data,
@@ -179,36 +181,29 @@ function makeRequests () {
179 181
180 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { 182 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
181 const requestToMake = requestsToMakeGrouped[hashKey] 183 const requestToMake = requestsToMakeGrouped[hashKey]
184 const toPod = requestToMake.toPod
182 185
183 // FIXME: SQL request inside a loop :/ 186 // Maybe the pod is not our friend anymore so simply remove it
184 self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) { 187 if (!toPod) {
185 if (err) { 188 const requestIdsToDelete = requestToMake.ids
186 logger.error('Error finding pod by id.', { err: err })
187 return callbackEach()
188 }
189
190 // Maybe the pod is not our friend anymore so simply remove it
191 if (!toPod) {
192 const requestIdsToDelete = requestToMake.ids
193 189
194 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) 190 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
195 RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) 191 RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id)
196 return callbackEach() 192 return callbackEach()
197 } 193 }
198 194
199 makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { 195 makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
200 if (success === true) { 196 if (success === true) {
201 logger.debug('Removing requests for pod %s.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) 197 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
202 198
203 goodPods.push(requestToMake.toPodId) 199 goodPods.push(requestToMake.toPod.id)
204 200
205 // Remove the pod id of these request ids 201 // Remove the pod id of these request ids
206 RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach) 202 RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
207 } else { 203 } else {
208 badPods.push(requestToMake.toPodId) 204 badPods.push(requestToMake.toPod.id)
209 callbackEach() 205 callbackEach()
210 } 206 }
211 })
212 }) 207 })
213 }, function () { 208 }, function () {
214 // All the requests were made, we update the pods score 209 // All the requests were made, we update the pods score
@@ -275,29 +270,60 @@ function updatePodsScore (goodPods, badPods) {
275 } 270 }
276} 271}
277 272
278function listWithLimitAndRandom (limit, callback) { 273function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
279 const self = this 274 const self = this
275 const Pod = this.sequelize.models.Pod
280 276
281 self.count().asCallback(function (err, count) { 277 Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) {
282 if (err) return callback(err) 278 if (err) return callback(err)
283 279
284 // Optimization... 280 // We don't have friends that have requests
285 if (count === 0) return callback(null, []) 281 if (podIds.length === 0) return callback(null, [])
286
287 let start = Math.floor(Math.random() * count) - limit
288 if (start < 0) start = 0
289 282
283 // The the first x requests of these pods
284 // It is very important to sort by id ASC to keep the requests order!
290 const query = { 285 const query = {
291 order: [ 286 order: [
292 [ 'id', 'ASC' ] 287 [ 'id', 'ASC' ]
293 ], 288 ],
294 // offset: start, 289 include: [
295 // limit: limit, 290 {
296 include: [ this.sequelize.models.Pod ] 291 model: self.sequelize.models.Pod,
292 where: {
293 id: {
294 $in: podIds
295 }
296 }
297 }
298 ]
297 } 299 }
298 300
299 self.findAll(query).asCallback(callback) 301 self.findAll(query).asCallback(function (err, requests) {
302 if (err) return callback(err)
303
304 const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod)
305 return callback(err, requestsGrouped)
306 })
307 })
308}
309
310function groupAndTruncateRequests (requests, limitRequestsPerPod) {
311 const requestsGrouped = {}
312
313 requests.forEach(function (request) {
314 request.Pods.forEach(function (pod) {
315 if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
316
317 if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
318 requestsGrouped[pod.id].push({
319 request,
320 pod
321 })
322 }
323 })
300 }) 324 })
325
326 return requestsGrouped
301} 327}
302 328
303function removeAll (callback) { 329function removeAll (callback) {