diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2016-06-18 16:13:54 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2016-06-18 16:13:54 +0200 |
commit | 528a9efa8272532bbd0dafc35c3e05e57c50f61e (patch) | |
tree | 62d4417df4ab9b2e53c44dc7271be81b88e4e0e5 /server/lib/friends.js | |
parent | b2e4c0ba1a33b8a50491a1f8d111468a7da5640f (diff) | |
download | PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.gz PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.zst PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.zip |
Try to make a better communication (between pods) module
Diffstat (limited to 'server/lib/friends.js')
-rw-r--r-- | server/lib/friends.js | 158 |
1 files changed, 82 insertions, 76 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js index e986fa006..d81a603ad 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -24,15 +24,15 @@ const pods = { | |||
24 | getMyCertificate: getMyCertificate, | 24 | getMyCertificate: getMyCertificate, |
25 | makeFriends: makeFriends, | 25 | makeFriends: makeFriends, |
26 | quitFriends: quitFriends, | 26 | quitFriends: quitFriends, |
27 | removeVideoToFriends: removeVideoToFriends | 27 | removeVideoToFriends: removeVideoToFriends, |
28 | sendOwnedVideosToPod: sendOwnedVideosToPod | ||
28 | } | 29 | } |
29 | 30 | ||
30 | function addVideoToFriends (video) { | 31 | function addVideoToFriends (video) { |
31 | // To avoid duplicates | ||
32 | const id = video.name + video.magnetUri | ||
33 | // ensure namePath is null | 32 | // ensure namePath is null |
34 | video.namePath = null | 33 | video.namePath = null |
35 | requestsScheduler.addRequest(id, 'add', video) | 34 | |
35 | requestsScheduler.addRequest('add', video) | ||
36 | } | 36 | } |
37 | 37 | ||
38 | function hasFriends (callback) { | 38 | function hasFriends (callback) { |
@@ -60,7 +60,7 @@ function makeFriends (callback) { | |||
60 | 60 | ||
61 | const urls = config.get('network.friends') | 61 | const urls = config.get('network.friends') |
62 | 62 | ||
63 | async.each(urls, function (url, callbackEach) { | 63 | async.eachSeries(urls, function (url, callbackEach) { |
64 | computeForeignPodsList(url, podsScore, callbackEach) | 64 | computeForeignPodsList(url, podsScore, callbackEach) |
65 | }, function (err) { | 65 | }, function (err) { |
66 | if (err) return callback(err) | 66 | if (err) return callback(err) |
@@ -78,7 +78,7 @@ function quitFriends (callback) { | |||
78 | // Stop pool requests | 78 | // Stop pool requests |
79 | requestsScheduler.deactivate() | 79 | requestsScheduler.deactivate() |
80 | // Flush pool requests | 80 | // Flush pool requests |
81 | requestsScheduler.forceSend() | 81 | requestsScheduler.flush() |
82 | 82 | ||
83 | async.waterfall([ | 83 | async.waterfall([ |
84 | function getPodsList (callbackAsync) { | 84 | function getPodsList (callbackAsync) { |
@@ -86,19 +86,25 @@ function quitFriends (callback) { | |||
86 | }, | 86 | }, |
87 | 87 | ||
88 | function announceIQuitMyFriends (pods, callbackAsync) { | 88 | function announceIQuitMyFriends (pods, callbackAsync) { |
89 | const request = { | 89 | const requestParams = { |
90 | method: 'POST', | 90 | method: 'POST', |
91 | path: '/api/' + constants.API_VERSION + '/pods/remove', | 91 | path: '/api/' + constants.API_VERSION + '/pods/remove', |
92 | sign: true, | 92 | sign: true |
93 | encrypt: true, | ||
94 | data: { | ||
95 | url: 'me' // Fake data | ||
96 | } | ||
97 | } | 93 | } |
98 | 94 | ||
99 | // Announce we quit them | 95 | // Announce we quit them |
100 | requests.makeMultipleRetryRequest(request, pods, function (err) { | 96 | // We don't care if the request fails |
101 | return callbackAsync(err) | 97 | // The other pod will exclude us automatically after a while |
98 | async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { | ||
99 | requestParams.toPod = pod | ||
100 | requests.makeSecureRequest(requestParams, callbackEach) | ||
101 | }, function (err) { | ||
102 | if (err) { | ||
103 | logger.error('Some errors while quitting friends.', { err: err }) | ||
104 | // Don't stop the process | ||
105 | } | ||
106 | |||
107 | return callbackAsync() | ||
102 | }) | 108 | }) |
103 | }, | 109 | }, |
104 | 110 | ||
@@ -136,9 +142,28 @@ function quitFriends (callback) { | |||
136 | } | 142 | } |
137 | 143 | ||
138 | function removeVideoToFriends (video) { | 144 | function removeVideoToFriends (video) { |
139 | // To avoid duplicates | 145 | requestsScheduler.addRequest('remove', video) |
140 | const id = video.name + video.magnetUri | 146 | } |
141 | requestsScheduler.addRequest(id, 'remove', video) | 147 | |
148 | function sendOwnedVideosToPod (podId) { | ||
149 | Videos.listOwned(function (err, videosList) { | ||
150 | if (err) { | ||
151 | logger.error('Cannot get the list of videos we own.') | ||
152 | return | ||
153 | } | ||
154 | |||
155 | videosList.forEach(function (video) { | ||
156 | videos.convertVideoToRemote(video, function (err, remoteVideo) { | ||
157 | if (err) { | ||
158 | logger.error('Cannot convert video to remote.', { error: err }) | ||
159 | // Don't break the process | ||
160 | return | ||
161 | } | ||
162 | |||
163 | requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) | ||
164 | }) | ||
165 | }) | ||
166 | }) | ||
142 | } | 167 | } |
143 | 168 | ||
144 | // --------------------------------------------------------------------------- | 169 | // --------------------------------------------------------------------------- |
@@ -148,18 +173,19 @@ module.exports = pods | |||
148 | // --------------------------------------------------------------------------- | 173 | // --------------------------------------------------------------------------- |
149 | 174 | ||
150 | function computeForeignPodsList (url, podsScore, callback) { | 175 | function computeForeignPodsList (url, podsScore, callback) { |
151 | // Let's give 1 point to the pod we ask the friends list | ||
152 | podsScore[url] = 1 | ||
153 | |||
154 | getForeignPodsList(url, function (err, foreignPodsList) { | 176 | getForeignPodsList(url, function (err, foreignPodsList) { |
155 | if (err) return callback(err) | 177 | if (err) return callback(err) |
156 | if (foreignPodsList.length === 0) return callback() | 178 | |
179 | if (!foreignPodsList) foreignPodsList = [] | ||
180 | |||
181 | // Let's give 1 point to the pod we ask the friends list | ||
182 | foreignPodsList.push({ url: url }) | ||
157 | 183 | ||
158 | foreignPodsList.forEach(function (foreignPod) { | 184 | foreignPodsList.forEach(function (foreignPod) { |
159 | const foreignUrl = foreignPod.url | 185 | const foreignPodUrl = foreignPod.url |
160 | 186 | ||
161 | if (podsScore[foreignUrl]) podsScore[foreignUrl]++ | 187 | if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++ |
162 | else podsScore[foreignUrl] = 1 | 188 | else podsScore[foreignPodUrl] = 1 |
163 | }) | 189 | }) |
164 | 190 | ||
165 | callback() | 191 | callback() |
@@ -194,63 +220,43 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
194 | // Flush pool requests | 220 | // Flush pool requests |
195 | requestsScheduler.forceSend() | 221 | requestsScheduler.forceSend() |
196 | 222 | ||
197 | // Get the list of our videos to send to our new friends | 223 | async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { |
198 | Videos.listOwned(function (err, videosList) { | 224 | const params = { |
199 | if (err) { | 225 | url: pod.url + '/api/' + constants.API_VERSION + '/pods/', |
200 | logger.error('Cannot get the list of videos we own.') | 226 | method: 'POST', |
201 | return callback(err) | 227 | json: { |
202 | } | 228 | url: http + '://' + host + ':' + port, |
203 | 229 | publicKey: cert | |
204 | const data = { | 230 | } |
205 | url: http + '://' + host + ':' + port, | ||
206 | publicKey: cert, | ||
207 | videos: videosList | ||
208 | } | 231 | } |
209 | 232 | ||
210 | requests.makeMultipleRetryRequest( | 233 | requests.makeRetryRequest(params, function (err, res, body) { |
211 | { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, | 234 | if (err) { |
212 | 235 | logger.error('Error with adding %s pod.', pod.url, { error: err }) | |
213 | podsList, | 236 | // Don't break the process |
214 | 237 | return callbackEach() | |
215 | // Callback called after each request | 238 | } |
216 | function eachRequest (err, response, body, url, pod, callbackEachRequest) { | ||
217 | // We add the pod if it responded correctly with its public certificate | ||
218 | if (!err && response.statusCode === 200) { | ||
219 | Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { | ||
220 | if (err) { | ||
221 | logger.error('Error with adding %s pod.', pod.url, { error: err }) | ||
222 | return callbackEachRequest() | ||
223 | } | ||
224 | |||
225 | videos.createRemoteVideos(body.videos, function (err) { | ||
226 | if (err) { | ||
227 | logger.error('Error with adding videos of pod.', pod.url, { error: err }) | ||
228 | return callbackEachRequest() | ||
229 | } | ||
230 | |||
231 | logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) | ||
232 | return callbackEachRequest() | ||
233 | }) | ||
234 | }) | ||
235 | } else { | ||
236 | logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) | ||
237 | return callbackEachRequest() | ||
238 | } | ||
239 | }, | ||
240 | 239 | ||
241 | // Final callback, we've ended all the requests | 240 | if (res.statusCode === 200) { |
242 | function endRequests (err) { | 241 | Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) { |
243 | // Now we made new friends, we can re activate the pool of requests | 242 | if (err) logger.error('Cannot add friend %s pod.', pod.url) |
244 | requestsScheduler.activate() | ||
245 | 243 | ||
246 | if (err) { | 244 | // Add our videos to the request scheduler |
247 | logger.error('There was some errors when we wanted to make friends.') | 245 | sendOwnedVideosToPod(podCreated._id) |
248 | return callback(err) | ||
249 | } | ||
250 | 246 | ||
251 | logger.debug('makeRequestsToWinningPods finished.') | 247 | return callbackEach() |
252 | return callback(null) | 248 | }) |
249 | } else { | ||
250 | logger.error('Status not 200 for %s pod.', pod.url) | ||
251 | return callbackEach() | ||
253 | } | 252 | } |
254 | ) | 253 | }) |
254 | }, function endRequests () { | ||
255 | // Final callback, we've ended all the requests | ||
256 | // Now we made new friends, we can re activate the pool of requests | ||
257 | requestsScheduler.activate() | ||
258 | |||
259 | logger.debug('makeRequestsToWinningPods finished.') | ||
260 | return callback() | ||
255 | }) | 261 | }) |
256 | } | 262 | } |