diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-02-18 10:29:36 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-02-18 10:29:36 +0100 |
commit | c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de (patch) | |
tree | 5d721432dcc25f43ebd72b12f675d4e208faf616 /server/models | |
parent | 361b7df2a257d1c44ec1d79128a9065b563090d8 (diff) | |
download | PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.gz PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.zst PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.zip |
Server: use a request scheduler object instance for friends
communication
Diffstat (limited to 'server/models')
-rw-r--r-- | server/models/request.js | 208 |
1 files changed, 27 insertions, 181 deletions
diff --git a/server/models/request.js b/server/models/request.js index baa26fc1b..ca616d130 100644 --- a/server/models/request.js +++ b/server/models/request.js | |||
@@ -1,16 +1,11 @@ | |||
1 | 'use strict' | 1 | 'use strict' |
2 | 2 | ||
3 | const each = require('async/each') | 3 | const each = require('async/each') |
4 | const eachLimit = require('async/eachLimit') | ||
5 | const waterfall = require('async/waterfall') | 4 | const waterfall = require('async/waterfall') |
6 | const values = require('lodash/values') | 5 | const values = require('lodash/values') |
7 | 6 | ||
8 | const constants = require('../initializers/constants') | 7 | const constants = require('../initializers/constants') |
9 | const logger = require('../helpers/logger') | 8 | const logger = require('../helpers/logger') |
10 | const requests = require('../helpers/requests') | ||
11 | |||
12 | let timer = null | ||
13 | let lastRequestTimestamp = 0 | ||
14 | 9 | ||
15 | // --------------------------------------------------------------------------- | 10 | // --------------------------------------------------------------------------- |
16 | 11 | ||
@@ -30,12 +25,13 @@ module.exports = function (sequelize, DataTypes) { | |||
30 | classMethods: { | 25 | classMethods: { |
31 | associate, | 26 | associate, |
32 | 27 | ||
33 | activate, | 28 | listWithLimitAndRandom, |
29 | |||
34 | countTotalRequests, | 30 | countTotalRequests, |
35 | deactivate, | 31 | removeBadPods, |
36 | flush, | 32 | updatePodsScore, |
37 | forceSend, | 33 | removeAll, |
38 | remainingMilliSeconds | 34 | removeWithEmptyTo |
39 | } | 35 | } |
40 | } | 36 | } |
41 | ) | 37 | ) |
@@ -56,17 +52,6 @@ function associate (models) { | |||
56 | }) | 52 | }) |
57 | } | 53 | } |
58 | 54 | ||
59 | function activate () { | ||
60 | logger.info('Requests scheduler activated.') | ||
61 | lastRequestTimestamp = Date.now() | ||
62 | |||
63 | const self = this | ||
64 | timer = setInterval(function () { | ||
65 | lastRequestTimestamp = Date.now() | ||
66 | makeRequests.call(self) | ||
67 | }, constants.REQUESTS_INTERVAL) | ||
68 | } | ||
69 | |||
70 | function countTotalRequests (callback) { | 55 | function countTotalRequests (callback) { |
71 | const query = { | 56 | const query = { |
72 | include: [ this.sequelize.models.Pod ] | 57 | include: [ this.sequelize.models.Pod ] |
@@ -75,147 +60,6 @@ function countTotalRequests (callback) { | |||
75 | return this.count(query).asCallback(callback) | 60 | return this.count(query).asCallback(callback) |
76 | } | 61 | } |
77 | 62 | ||
78 | function deactivate () { | ||
79 | logger.info('Requests scheduler deactivated.') | ||
80 | clearInterval(timer) | ||
81 | timer = null | ||
82 | } | ||
83 | |||
84 | function flush (callback) { | ||
85 | removeAll.call(this, function (err) { | ||
86 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
87 | |||
88 | return callback(err) | ||
89 | }) | ||
90 | } | ||
91 | |||
92 | function forceSend () { | ||
93 | logger.info('Force requests scheduler sending.') | ||
94 | makeRequests.call(this) | ||
95 | } | ||
96 | |||
97 | function remainingMilliSeconds () { | ||
98 | if (timer === null) return -1 | ||
99 | |||
100 | return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp) | ||
101 | } | ||
102 | |||
103 | // --------------------------------------------------------------------------- | ||
104 | |||
105 | // Make a requests to friends of a certain type | ||
106 | function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
107 | if (!callback) callback = function () {} | ||
108 | |||
109 | const params = { | ||
110 | toPod: toPod, | ||
111 | sign: true, // Prove our identity | ||
112 | method: 'POST', | ||
113 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
114 | data: requestsToMake // Requests we need to make | ||
115 | } | ||
116 | |||
117 | // Make multiple retry requests to all of pods | ||
118 | // The function fire some useful callbacks | ||
119 | requests.makeSecureRequest(params, function (err, res) { | ||
120 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
121 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
122 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
123 | |||
124 | return callback(false) | ||
125 | } | ||
126 | |||
127 | return callback(true) | ||
128 | }) | ||
129 | } | ||
130 | |||
131 | // Make all the requests of the scheduler | ||
132 | function makeRequests () { | ||
133 | const self = this | ||
134 | const RequestToPod = this.sequelize.models.RequestToPod | ||
135 | |||
136 | // We limit the size of the requests | ||
137 | // We don't want to stuck with the same failing requests so we get a random list | ||
138 | listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { | ||
139 | if (err) { | ||
140 | logger.error('Cannot get the list of requests.', { err: err }) | ||
141 | return // Abort | ||
142 | } | ||
143 | |||
144 | // If there are no requests, abort | ||
145 | if (requests.length === 0) { | ||
146 | logger.info('No requests to make.') | ||
147 | return | ||
148 | } | ||
149 | |||
150 | // We want to group requests by destinations pod and endpoint | ||
151 | const requestsToMakeGrouped = buildRequestObjects(requests) | ||
152 | |||
153 | logger.info('Making requests to friends.') | ||
154 | |||
155 | const goodPods = [] | ||
156 | const badPods = [] | ||
157 | |||
158 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { | ||
159 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
160 | const toPod = requestToMake.toPod | ||
161 | |||
162 | // Maybe the pod is not our friend anymore so simply remove it | ||
163 | if (!toPod) { | ||
164 | const requestIdsToDelete = requestToMake.ids | ||
165 | |||
166 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
167 | return RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
168 | } | ||
169 | |||
170 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { | ||
171 | if (success === false) { | ||
172 | badPods.push(requestToMake.toPod.id) | ||
173 | return callbackEach() | ||
174 | } | ||
175 | |||
176 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
177 | goodPods.push(requestToMake.toPod.id) | ||
178 | |||
179 | // Remove the pod id of these request ids | ||
180 | RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
181 | }) | ||
182 | }, function () { | ||
183 | // All the requests were made, we update the pods score | ||
184 | updatePodsScore.call(self, goodPods, badPods) | ||
185 | // Flush requests with no pod | ||
186 | removeWithEmptyTo.call(self, function (err) { | ||
187 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
188 | }) | ||
189 | }) | ||
190 | }) | ||
191 | } | ||
192 | |||
193 | function buildRequestObjects (requests) { | ||
194 | const requestsToMakeGrouped = {} | ||
195 | |||
196 | Object.keys(requests).forEach(function (toPodId) { | ||
197 | requests[toPodId].forEach(function (data) { | ||
198 | const request = data.request | ||
199 | const pod = data.pod | ||
200 | const hashKey = toPodId + request.endpoint | ||
201 | |||
202 | if (!requestsToMakeGrouped[hashKey]) { | ||
203 | requestsToMakeGrouped[hashKey] = { | ||
204 | toPod: pod, | ||
205 | endpoint: request.endpoint, | ||
206 | ids: [], // request ids, to delete them from the DB in the future | ||
207 | datas: [] // requests data, | ||
208 | } | ||
209 | } | ||
210 | |||
211 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
212 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
213 | }) | ||
214 | }) | ||
215 | |||
216 | return requestsToMakeGrouped | ||
217 | } | ||
218 | |||
219 | // Remove pods with a score of 0 (too many requests where they were unreachable) | 63 | // Remove pods with a score of 0 (too many requests where they were unreachable) |
220 | function removeBadPods () { | 64 | function removeBadPods () { |
221 | const self = this | 65 | const self = this |
@@ -307,25 +151,6 @@ function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { | |||
307 | }) | 151 | }) |
308 | } | 152 | } |
309 | 153 | ||
310 | function groupAndTruncateRequests (requests, limitRequestsPerPod) { | ||
311 | const requestsGrouped = {} | ||
312 | |||
313 | requests.forEach(function (request) { | ||
314 | request.Pods.forEach(function (pod) { | ||
315 | if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] | ||
316 | |||
317 | if (requestsGrouped[pod.id].length < limitRequestsPerPod) { | ||
318 | requestsGrouped[pod.id].push({ | ||
319 | request, | ||
320 | pod | ||
321 | }) | ||
322 | } | ||
323 | }) | ||
324 | }) | ||
325 | |||
326 | return requestsGrouped | ||
327 | } | ||
328 | |||
329 | function removeAll (callback) { | 154 | function removeAll (callback) { |
330 | // Delete all requests | 155 | // Delete all requests |
331 | this.truncate({ cascade: true }).asCallback(callback) | 156 | this.truncate({ cascade: true }).asCallback(callback) |
@@ -346,3 +171,24 @@ function removeWithEmptyTo (callback) { | |||
346 | 171 | ||
347 | this.destroy(query).asCallback(callback) | 172 | this.destroy(query).asCallback(callback) |
348 | } | 173 | } |
174 | |||
175 | // --------------------------------------------------------------------------- | ||
176 | |||
177 | function groupAndTruncateRequests (requests, limitRequestsPerPod) { | ||
178 | const requestsGrouped = {} | ||
179 | |||
180 | requests.forEach(function (request) { | ||
181 | request.Pods.forEach(function (pod) { | ||
182 | if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] | ||
183 | |||
184 | if (requestsGrouped[pod.id].length < limitRequestsPerPod) { | ||
185 | requestsGrouped[pod.id].push({ | ||
186 | request, | ||
187 | pod | ||
188 | }) | ||
189 | } | ||
190 | }) | ||
191 | }) | ||
192 | |||
193 | return requestsGrouped | ||
194 | } | ||