diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2016-06-18 16:13:54 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2016-06-18 16:13:54 +0200 |
commit | 528a9efa8272532bbd0dafc35c3e05e57c50f61e (patch) | |
tree | 62d4417df4ab9b2e53c44dc7271be81b88e4e0e5 /server/lib/requestsScheduler.js | |
parent | b2e4c0ba1a33b8a50491a1f8d111468a7da5640f (diff) | |
download | PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.gz PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.zst PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.zip |
Try to make a better communication (between pods) module
Diffstat (limited to 'server/lib/requestsScheduler.js')
-rw-r--r-- | server/lib/requestsScheduler.js | 200 |
1 files changed, 106 insertions, 94 deletions
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 78570209d..ac75e5b93 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js | |||
@@ -11,13 +11,14 @@ const requests = require('../helpers/requests') | |||
11 | const videos = require('../lib/videos') | 11 | const videos = require('../lib/videos') |
12 | const Videos = require('../models/videos') | 12 | const Videos = require('../models/videos') |
13 | 13 | ||
14 | const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE | ||
15 | let timer = null | 14 | let timer = null |
16 | 15 | ||
17 | const requestsScheduler = { | 16 | const requestsScheduler = { |
18 | activate: activate, | 17 | activate: activate, |
19 | addRequest: addRequest, | 18 | addRequest: addRequest, |
19 | addRequestTo: addRequestTo, | ||
20 | deactivate: deactivate, | 20 | deactivate: deactivate, |
21 | flush: flush, | ||
21 | forceSend: forceSend | 22 | forceSend: forceSend |
22 | } | 23 | } |
23 | 24 | ||
@@ -27,35 +28,37 @@ function activate () { | |||
27 | } | 28 | } |
28 | 29 | ||
29 | // Add request to the scheduler | 30 | // Add request to the scheduler |
30 | function addRequest (id, type, request) { | 31 | function addRequest (type, data) { |
31 | logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) | 32 | logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) |
32 | 33 | ||
33 | Requests.findById(id, function (err, entity) { | 34 | const request = { |
35 | type: type, | ||
36 | data: data | ||
37 | } | ||
38 | |||
39 | Pods.listAllIds(function (err, podIds) { | ||
34 | if (err) { | 40 | if (err) { |
35 | logger.error('Error when trying to find a request.', { error: err }) | 41 | logger.debug('Cannot list pod ids.') |
36 | return // Abort | 42 | return |
37 | } | 43 | } |
38 | 44 | ||
39 | // If there were already a request with this id in the scheduler... | 45 | // No friends |
40 | if (entity) { | 46 | if (!podIds) return |
41 | if (entity.type === type) { | ||
42 | logger.error('Cannot insert two same requests.') | ||
43 | return // Abort | ||
44 | } | ||
45 | 47 | ||
46 | // Remove the request of the other type | 48 | Requests.create(request, podIds, function (err) { |
47 | Requests.removeRequestById(id, function (err) { | 49 | if (err) logger.error('Cannot create a request.', { error: err }) |
48 | if (err) { | 50 | }) |
49 | logger.error('Cannot remove a request.', { error: err }) | 51 | }) |
50 | return // Abort | 52 | } |
51 | } | 53 | |
52 | }) | 54 | function addRequestTo (podIds, type, data) { |
53 | } else { | 55 | const request = { |
54 | Requests.create(id, type, request, function (err) { | 56 | type: type, |
55 | if (err) logger.error('Cannot create a request.', { error: err }) | 57 | data: data |
56 | return // Abort | 58 | } |
57 | }) | 59 | |
58 | } | 60 | Requests.create(request, podIds, function (err) { |
61 | if (err) logger.error('Cannot create a request.', { error: err }) | ||
59 | }) | 62 | }) |
60 | } | 63 | } |
61 | 64 | ||
@@ -64,6 +67,14 @@ function deactivate () { | |||
64 | clearInterval(timer) | 67 | clearInterval(timer) |
65 | } | 68 | } |
66 | 69 | ||
70 | function flush () { | ||
71 | Requests.removeAll(function (err) { | ||
72 | if (err) { | ||
73 | logger.error('Cannot flush the requests.', { error: err }) | ||
74 | } | ||
75 | }) | ||
76 | } | ||
77 | |||
67 | function forceSend () { | 78 | function forceSend () { |
68 | logger.info('Force requests scheduler sending.') | 79 | logger.info('Force requests scheduler sending.') |
69 | makeRequests() | 80 | makeRequests() |
@@ -76,54 +87,28 @@ module.exports = requestsScheduler | |||
76 | // --------------------------------------------------------------------------- | 87 | // --------------------------------------------------------------------------- |
77 | 88 | ||
78 | // Make a requests to friends of a certain type | 89 | // Make a requests to friends of a certain type |
79 | function makeRequest (type, requestsToMake, callback) { | 90 | function makeRequest (toPod, requestsToMake, callback) { |
80 | if (!callback) callback = function () {} | 91 | if (!callback) callback = function () {} |
81 | 92 | ||
82 | Pods.list(function (err, pods) { | 93 | const params = { |
83 | if (err) return callback(err) | 94 | toPod: toPod, |
84 | 95 | encrypt: true, // Security | |
85 | const params = { | 96 | sign: true, // To prove our identity |
86 | encrypt: true, // Security | 97 | method: 'POST', |
87 | sign: true, // To prove our identity | 98 | path: '/api/' + constants.API_VERSION + '/remote/videos', |
88 | method: 'POST', | 99 | data: requestsToMake // Requests we need to make |
89 | path: null, // We build the path later | 100 | } |
90 | data: requestsToMake // Requests we need to make | 101 | |
91 | } | 102 | // Make multiple retry requests to all of pods |
92 | 103 | // The function fire some useful callbacks | |
93 | // If this is a valid type, we build the path | 104 | requests.makeSecureRequest(params, function (err, res) { |
94 | if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { | 105 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { |
95 | params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type | 106 | logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) |
96 | } else { | 107 | |
97 | return callback(new Error('Unkown pool request type.')) | 108 | return callback(false) |
98 | } | ||
99 | |||
100 | const badPods = [] | ||
101 | const goodPods = [] | ||
102 | |||
103 | // Make multiple retry requests to all of pods | ||
104 | // The function fire some useful callbacks | ||
105 | requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) | ||
106 | |||
107 | function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { | ||
108 | // We failed the request, add the pod unreachable to the bad pods list | ||
109 | if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { | ||
110 | badPods.push(pod._id) | ||
111 | logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) | ||
112 | } else { | ||
113 | // Request success | ||
114 | goodPods.push(pod._id) | ||
115 | } | ||
116 | |||
117 | return callbackEachPodFinished() | ||
118 | } | 109 | } |
119 | 110 | ||
120 | function callbackAllPodsFinished (err) { | 111 | return callback(true) |
121 | if (err) return callback(err) | ||
122 | |||
123 | // All the requests were made, we update the pods score | ||
124 | updatePodsScore(goodPods, badPods) | ||
125 | callback(null) | ||
126 | } | ||
127 | }) | 112 | }) |
128 | } | 113 | } |
129 | 114 | ||
@@ -143,38 +128,65 @@ function makeRequests () { | |||
143 | 128 | ||
144 | logger.info('Making requests to friends.') | 129 | logger.info('Making requests to friends.') |
145 | 130 | ||
131 | // Requests by pods id | ||
146 | const requestsToMake = {} | 132 | const requestsToMake = {} |
147 | for (const type of REQUEST_SCHEDULER_TYPE) { | ||
148 | requestsToMake[type] = { | ||
149 | ids: [], | ||
150 | requests: [] | ||
151 | } | ||
152 | } | ||
153 | 133 | ||
154 | // For each requests to make, we add it to the correct request type | ||
155 | requests.forEach(function (poolRequest) { | 134 | requests.forEach(function (poolRequest) { |
156 | if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { | 135 | poolRequest.to.forEach(function (toPodId) { |
157 | const requestTypeToMake = requestsToMake[poolRequest.type] | 136 | if (!requestsToMake[toPodId]) { |
158 | requestTypeToMake.requests.push(poolRequest.request) | 137 | requestsToMake[toPodId] = { |
159 | requestTypeToMake.ids.push(poolRequest._id) | 138 | ids: [], |
160 | } else { | 139 | datas: [] |
161 | logger.error('Unkown request type.', { request_type: poolRequest.type }) | 140 | } |
162 | return // abort | 141 | } |
163 | } | 142 | |
143 | requestsToMake[toPodId].ids.push(poolRequest._id) | ||
144 | requestsToMake[toPodId].datas.push(poolRequest.request) | ||
145 | }) | ||
164 | }) | 146 | }) |
165 | 147 | ||
166 | for (let type of Object.keys(requestsToMake)) { | 148 | const goodPods = [] |
167 | const requestTypeToMake = requestsToMake[type] | 149 | const badPods = [] |
168 | // If there are requests for this type | ||
169 | if (requestTypeToMake.requests.length !== 0) { | ||
170 | makeRequest(type, requestTypeToMake.requests, function (err) { | ||
171 | if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err }) | ||
172 | 150 | ||
173 | // We made the requests, so we can remove them from the scheduler | 151 | async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { |
174 | Requests.removeRequests(requestTypeToMake.ids) | 152 | const requestToMake = requestsToMake[toPodId] |
153 | |||
154 | // FIXME: mongodb request inside a loop :/ | ||
155 | Pods.findById(toPodId, function (err, toPod) { | ||
156 | if (err) return logger.error('Error finding pod by id.', { err: err }) | ||
157 | |||
158 | // Maybe the pod is not our friend anymore so simply remove them | ||
159 | if (!toPod) { | ||
160 | Requests.removePodOf(requestToMake.ids, toPodId) | ||
161 | return callbackEach() | ||
162 | } | ||
163 | |||
164 | makeRequest(toPod, requestToMake.datas, function (success) { | ||
165 | if (err) { | ||
166 | logger.error('Errors when sent request to %s.', toPod.url, { error: err }) | ||
167 | // Do not stop the process just for one error | ||
168 | return callbackEach() | ||
169 | } | ||
170 | |||
171 | if (success === true) { | ||
172 | logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) | ||
173 | |||
174 | // Remove the pod id of these request ids | ||
175 | Requests.removePodOf(requestToMake.ids, toPodId) | ||
176 | goodPods.push(toPodId) | ||
177 | } else { | ||
178 | badPods.push(toPodId) | ||
179 | } | ||
180 | |||
181 | callbackEach() | ||
175 | }) | 182 | }) |
176 | } | 183 | }) |
177 | } | 184 | }, function () { |
185 | // All the requests were made, we update the pods score | ||
186 | updatePodsScore(goodPods, badPods) | ||
187 | // Flush requests with no pod | ||
188 | Requests.removeWithEmptyTo() | ||
189 | }) | ||
178 | }) | 190 | }) |
179 | } | 191 | } |
180 | 192 | ||