diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-02-18 10:29:36 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-02-18 10:29:36 +0100 |
commit | c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de (patch) | |
tree | 5d721432dcc25f43ebd72b12f675d4e208faf616 /server/lib | |
parent | 361b7df2a257d1c44ec1d79128a9065b563090d8 (diff) | |
download | PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.gz PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.zst PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.zip |
Server: use a request scheduler object instance for friends
communication
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/friends.js | 59 | ||||
-rw-r--r-- | server/lib/request-scheduler.js | 202 |
2 files changed, 217 insertions, 44 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js index 9b38693c7..7dfa62a2a 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -11,10 +11,13 @@ const db = require('../initializers/database') | |||
11 | const logger = require('../helpers/logger') | 11 | const logger = require('../helpers/logger') |
12 | const peertubeCrypto = require('../helpers/peertube-crypto') | 12 | const peertubeCrypto = require('../helpers/peertube-crypto') |
13 | const requests = require('../helpers/requests') | 13 | const requests = require('../helpers/requests') |
14 | const RequestScheduler = require('./request-scheduler') | ||
14 | 15 | ||
15 | const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] | 16 | const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] |
17 | const requestScheduler = new RequestScheduler('') | ||
16 | 18 | ||
17 | const friends = { | 19 | const friends = { |
20 | activate, | ||
18 | addVideoToFriends, | 21 | addVideoToFriends, |
19 | updateVideoToFriends, | 22 | updateVideoToFriends, |
20 | reportAbuseVideoToFriend, | 23 | reportAbuseVideoToFriend, |
@@ -25,6 +28,10 @@ const friends = { | |||
25 | sendOwnedVideosToPod | 28 | sendOwnedVideosToPod |
26 | } | 29 | } |
27 | 30 | ||
31 | function activate () { | ||
32 | requestScheduler.activate() | ||
33 | } | ||
34 | |||
28 | function addVideoToFriends (videoData, transaction, callback) { | 35 | function addVideoToFriends (videoData, transaction, callback) { |
29 | const options = { | 36 | const options = { |
30 | type: ENDPOINT_ACTIONS.ADD, | 37 | type: ENDPOINT_ACTIONS.ADD, |
@@ -99,11 +106,11 @@ function makeFriends (hosts, callback) { | |||
99 | 106 | ||
100 | function quitFriends (callback) { | 107 | function quitFriends (callback) { |
101 | // Stop pool requests | 108 | // Stop pool requests |
102 | db.Request.deactivate() | 109 | requestScheduler.deactivate() |
103 | 110 | ||
104 | waterfall([ | 111 | waterfall([ |
105 | function flushRequests (callbackAsync) { | 112 | function flushRequests (callbackAsync) { |
106 | db.Request.flush(callbackAsync) | 113 | requestScheduler.flush(callbackAsync) |
107 | }, | 114 | }, |
108 | 115 | ||
109 | function getPodsList (callbackAsync) { | 116 | function getPodsList (callbackAsync) { |
@@ -140,7 +147,7 @@ function quitFriends (callback) { | |||
140 | } | 147 | } |
141 | ], function (err) { | 148 | ], function (err) { |
142 | // Don't forget to re activate the scheduler, even if there was an error | 149 | // Don't forget to re activate the scheduler, even if there was an error |
143 | db.Request.activate() | 150 | requestScheduler.activate() |
144 | 151 | ||
145 | if (err) return callback(err) | 152 | if (err) return callback(err) |
146 | 153 | ||
@@ -235,9 +242,9 @@ function getForeignPodsList (host, callback) { | |||
235 | 242 | ||
236 | function makeRequestsToWinningPods (cert, podsList, callback) { | 243 | function makeRequestsToWinningPods (cert, podsList, callback) { |
237 | // Stop pool requests | 244 | // Stop pool requests |
238 | db.Request.deactivate() | 245 | requestScheduler.deactivate() |
239 | // Flush pool requests | 246 | // Flush pool requests |
240 | db.Request.forceSend() | 247 | requestScheduler.forceSend() |
241 | 248 | ||
242 | eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { | 249 | eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { |
243 | const params = { | 250 | const params = { |
@@ -278,7 +285,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
278 | }, function endRequests () { | 285 | }, function endRequests () { |
279 | // Final callback, we've ended all the requests | 286 | // Final callback, we've ended all the requests |
280 | // Now we made new friends, we can re activate the pool of requests | 287 | // Now we made new friends, we can re activate the pool of requests |
281 | db.Request.activate() | 288 | requestScheduler.activate() |
282 | 289 | ||
283 | logger.debug('makeRequestsToWinningPods finished.') | 290 | logger.debug('makeRequestsToWinningPods finished.') |
284 | return callback() | 291 | return callback() |
@@ -289,7 +296,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
289 | // { type, endpoint, data, toIds, transaction } | 296 | // { type, endpoint, data, toIds, transaction } |
290 | function createRequest (options, callback) { | 297 | function createRequest (options, callback) { |
291 | if (!callback) callback = function () {} | 298 | if (!callback) callback = function () {} |
292 | if (options.toIds) return _createRequest(options, callback) | 299 | if (options.toIds) return requestScheduler.createRequest(options, callback) |
293 | 300 | ||
294 | // If the "toIds" pods is not specified, we send the request to all our friends | 301 | // If the "toIds" pods is not specified, we send the request to all our friends |
295 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | 302 | db.Pod.listAllIds(options.transaction, function (err, podIds) { |
@@ -299,43 +306,7 @@ function createRequest (options, callback) { | |||
299 | } | 306 | } |
300 | 307 | ||
301 | const newOptions = Object.assign(options, { toIds: podIds }) | 308 | const newOptions = Object.assign(options, { toIds: podIds }) |
302 | return _createRequest(newOptions, callback) | 309 | return requestScheduler.createRequest(newOptions, callback) |
303 | }) | ||
304 | } | ||
305 | |||
306 | // { type, endpoint, data, toIds, transaction } | ||
307 | function _createRequest (options, callback) { | ||
308 | const type = options.type | ||
309 | const endpoint = options.endpoint | ||
310 | const data = options.data | ||
311 | const toIds = options.toIds | ||
312 | const transaction = options.transaction | ||
313 | |||
314 | const pods = [] | ||
315 | |||
316 | // If there are no destination pods abort | ||
317 | if (toIds.length === 0) return callback(null) | ||
318 | |||
319 | toIds.forEach(function (toPod) { | ||
320 | pods.push(db.Pod.build({ id: toPod })) | ||
321 | }) | ||
322 | |||
323 | const createQuery = { | ||
324 | endpoint, | ||
325 | request: { | ||
326 | type: type, | ||
327 | data: data | ||
328 | } | ||
329 | } | ||
330 | |||
331 | const dbRequestOptions = { | ||
332 | transaction | ||
333 | } | ||
334 | |||
335 | return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) { | ||
336 | if (err) return callback(err) | ||
337 | |||
338 | return request.setPods(pods, dbRequestOptions).asCallback(callback) | ||
339 | }) | 310 | }) |
340 | } | 311 | } |
341 | 312 | ||
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js new file mode 100644 index 000000000..c8bc4af28 --- /dev/null +++ b/server/lib/request-scheduler.js | |||
@@ -0,0 +1,202 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../initializers/constants') | ||
6 | const db = require('../initializers/database') | ||
7 | const logger = require('../helpers/logger') | ||
8 | const requests = require('../helpers/requests') | ||
9 | |||
10 | module.exports = class RequestScheduler { | ||
11 | |||
12 | constructor (name) { | ||
13 | this.name = name | ||
14 | |||
15 | this.lastRequestTimestamp = 0 | ||
16 | this.timer = null | ||
17 | } | ||
18 | |||
19 | activate () { | ||
20 | logger.info('Requests scheduler activated.') | ||
21 | this.lastRequestTimestamp = Date.now() | ||
22 | |||
23 | this.timer = setInterval(() => { | ||
24 | this.lastRequestTimestamp = Date.now() | ||
25 | this.makeRequests() | ||
26 | }, constants.REQUESTS_INTERVAL) | ||
27 | } | ||
28 | |||
29 | deactivate () { | ||
30 | logger.info('Requests scheduler deactivated.') | ||
31 | clearInterval(this.timer) | ||
32 | this.timer = null | ||
33 | } | ||
34 | |||
35 | forceSend () { | ||
36 | logger.info('Force requests scheduler sending.') | ||
37 | this.makeRequests() | ||
38 | } | ||
39 | |||
40 | remainingMilliSeconds () { | ||
41 | if (this.timer === null) return -1 | ||
42 | |||
43 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
44 | } | ||
45 | |||
46 | // { type, endpoint, data, toIds, transaction } | ||
47 | createRequest (options, callback) { | ||
48 | const type = options.type | ||
49 | const endpoint = options.endpoint | ||
50 | const data = options.data | ||
51 | const toIds = options.toIds | ||
52 | const transaction = options.transaction | ||
53 | |||
54 | const pods = [] | ||
55 | |||
56 | // If there are no destination pods abort | ||
57 | if (toIds.length === 0) return callback(null) | ||
58 | |||
59 | toIds.forEach(toPod => { | ||
60 | pods.push(db.Pod.build({ id: toPod })) | ||
61 | }) | ||
62 | |||
63 | const createQuery = { | ||
64 | endpoint, | ||
65 | request: { | ||
66 | type: type, | ||
67 | data: data | ||
68 | } | ||
69 | } | ||
70 | |||
71 | const dbRequestOptions = { | ||
72 | transaction | ||
73 | } | ||
74 | |||
75 | return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { | ||
76 | if (err) return callback(err) | ||
77 | |||
78 | return request.setPods(pods, dbRequestOptions).asCallback(callback) | ||
79 | }) | ||
80 | } | ||
81 | |||
82 | // --------------------------------------------------------------------------- | ||
83 | |||
84 | // Make all the requests of the scheduler | ||
85 | makeRequests () { | ||
86 | // We limit the size of the requests | ||
87 | // We don't want to stuck with the same failing requests so we get a random list | ||
88 | db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { | ||
89 | if (err) { | ||
90 | logger.error('Cannot get the list of requests.', { err: err }) | ||
91 | return // Abort | ||
92 | } | ||
93 | |||
94 | // If there are no requests, abort | ||
95 | if (requests.length === 0) { | ||
96 | logger.info('No requests to make.') | ||
97 | return | ||
98 | } | ||
99 | |||
100 | // We want to group requests by destinations pod and endpoint | ||
101 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
102 | |||
103 | logger.info('Making requests to friends.') | ||
104 | |||
105 | const goodPods = [] | ||
106 | const badPods = [] | ||
107 | |||
108 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
109 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
110 | const toPod = requestToMake.toPod | ||
111 | |||
112 | // Maybe the pod is not our friend anymore so simply remove it | ||
113 | if (!toPod) { | ||
114 | const requestIdsToDelete = requestToMake.ids | ||
115 | |||
116 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
117 | return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
118 | } | ||
119 | |||
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { | ||
121 | if (success === false) { | ||
122 | badPods.push(requestToMake.toPod.id) | ||
123 | return callbackEach() | ||
124 | } | ||
125 | |||
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
127 | goodPods.push(requestToMake.toPod.id) | ||
128 | |||
129 | // Remove the pod id of these request ids | ||
130 | db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
131 | }) | ||
132 | }, () => { | ||
133 | // All the requests were made, we update the pods score | ||
134 | db.Request.updatePodsScore(goodPods, badPods) | ||
135 | // Flush requests with no pod | ||
136 | db.Request.removeWithEmptyTo(err => { | ||
137 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
138 | }) | ||
139 | }) | ||
140 | }) | ||
141 | } | ||
142 | |||
143 | // Make a requests to friends of a certain type | ||
144 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
145 | if (!callback) callback = function () {} | ||
146 | |||
147 | const params = { | ||
148 | toPod: toPod, | ||
149 | sign: true, // Prove our identity | ||
150 | method: 'POST', | ||
151 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
152 | data: requestsToMake // Requests we need to make | ||
153 | } | ||
154 | |||
155 | // Make multiple retry requests to all of pods | ||
156 | // The function fire some useful callbacks | ||
157 | requests.makeSecureRequest(params, (err, res) => { | ||
158 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
159 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
160 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
161 | |||
162 | return callback(false) | ||
163 | } | ||
164 | |||
165 | return callback(true) | ||
166 | }) | ||
167 | } | ||
168 | |||
169 | buildRequestObjects (requests) { | ||
170 | const requestsToMakeGrouped = {} | ||
171 | |||
172 | Object.keys(requests).forEach(toPodId => { | ||
173 | requests[toPodId].forEach(data => { | ||
174 | const request = data.request | ||
175 | const pod = data.pod | ||
176 | const hashKey = toPodId + request.endpoint | ||
177 | |||
178 | if (!requestsToMakeGrouped[hashKey]) { | ||
179 | requestsToMakeGrouped[hashKey] = { | ||
180 | toPod: pod, | ||
181 | endpoint: request.endpoint, | ||
182 | ids: [], // request ids, to delete them from the DB in the future | ||
183 | datas: [] // requests data, | ||
184 | } | ||
185 | } | ||
186 | |||
187 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
188 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
189 | }) | ||
190 | }) | ||
191 | |||
192 | return requestsToMakeGrouped | ||
193 | } | ||
194 | |||
195 | flush (callback) { | ||
196 | db.Request.removeAll(err => { | ||
197 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
198 | |||
199 | return callback(err) | ||
200 | }) | ||
201 | } | ||
202 | } | ||