aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/friends.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-06-18 16:13:54 +0200
committerChocobozzz <florian.bigard@gmail.com>2016-06-18 16:13:54 +0200
commit528a9efa8272532bbd0dafc35c3e05e57c50f61e (patch)
tree62d4417df4ab9b2e53c44dc7271be81b88e4e0e5 /server/lib/friends.js
parentb2e4c0ba1a33b8a50491a1f8d111468a7da5640f (diff)
downloadPeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.gz
PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.zst
PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.zip
Try to make a better communication (between pods) module
Diffstat (limited to 'server/lib/friends.js')
-rw-r--r--server/lib/friends.js158
1 files changed, 82 insertions, 76 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js
index e986fa006..d81a603ad 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -24,15 +24,15 @@ const pods = {
24 getMyCertificate: getMyCertificate, 24 getMyCertificate: getMyCertificate,
25 makeFriends: makeFriends, 25 makeFriends: makeFriends,
26 quitFriends: quitFriends, 26 quitFriends: quitFriends,
27 removeVideoToFriends: removeVideoToFriends 27 removeVideoToFriends: removeVideoToFriends,
28 sendOwnedVideosToPod: sendOwnedVideosToPod
28} 29}
29 30
30function addVideoToFriends (video) { 31function addVideoToFriends (video) {
31 // To avoid duplicates
32 const id = video.name + video.magnetUri
33 // ensure namePath is null 32 // ensure namePath is null
34 video.namePath = null 33 video.namePath = null
35 requestsScheduler.addRequest(id, 'add', video) 34
35 requestsScheduler.addRequest('add', video)
36} 36}
37 37
38function hasFriends (callback) { 38function hasFriends (callback) {
@@ -60,7 +60,7 @@ function makeFriends (callback) {
60 60
61 const urls = config.get('network.friends') 61 const urls = config.get('network.friends')
62 62
63 async.each(urls, function (url, callbackEach) { 63 async.eachSeries(urls, function (url, callbackEach) {
64 computeForeignPodsList(url, podsScore, callbackEach) 64 computeForeignPodsList(url, podsScore, callbackEach)
65 }, function (err) { 65 }, function (err) {
66 if (err) return callback(err) 66 if (err) return callback(err)
@@ -78,7 +78,7 @@ function quitFriends (callback) {
78 // Stop pool requests 78 // Stop pool requests
79 requestsScheduler.deactivate() 79 requestsScheduler.deactivate()
80 // Flush pool requests 80 // Flush pool requests
81 requestsScheduler.forceSend() 81 requestsScheduler.flush()
82 82
83 async.waterfall([ 83 async.waterfall([
84 function getPodsList (callbackAsync) { 84 function getPodsList (callbackAsync) {
@@ -86,19 +86,25 @@ function quitFriends (callback) {
86 }, 86 },
87 87
88 function announceIQuitMyFriends (pods, callbackAsync) { 88 function announceIQuitMyFriends (pods, callbackAsync) {
89 const request = { 89 const requestParams = {
90 method: 'POST', 90 method: 'POST',
91 path: '/api/' + constants.API_VERSION + '/pods/remove', 91 path: '/api/' + constants.API_VERSION + '/pods/remove',
92 sign: true, 92 sign: true
93 encrypt: true,
94 data: {
95 url: 'me' // Fake data
96 }
97 } 93 }
98 94
99 // Announce we quit them 95 // Announce we quit them
100 requests.makeMultipleRetryRequest(request, pods, function (err) { 96 // We don't care if the request fails
101 return callbackAsync(err) 97 // The other pod will exclude us automatically after a while
98 async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
99 requestParams.toPod = pod
100 requests.makeSecureRequest(requestParams, callbackEach)
101 }, function (err) {
102 if (err) {
103 logger.error('Some errors while quitting friends.', { err: err })
104 // Don't stop the process
105 }
106
107 return callbackAsync()
102 }) 108 })
103 }, 109 },
104 110
@@ -136,9 +142,28 @@ function quitFriends (callback) {
136} 142}
137 143
138function removeVideoToFriends (video) { 144function removeVideoToFriends (video) {
139 // To avoid duplicates 145 requestsScheduler.addRequest('remove', video)
140 const id = video.name + video.magnetUri 146}
141 requestsScheduler.addRequest(id, 'remove', video) 147
148function sendOwnedVideosToPod (podId) {
149 Videos.listOwned(function (err, videosList) {
150 if (err) {
151 logger.error('Cannot get the list of videos we own.')
152 return
153 }
154
155 videosList.forEach(function (video) {
156 videos.convertVideoToRemote(video, function (err, remoteVideo) {
157 if (err) {
158 logger.error('Cannot convert video to remote.', { error: err })
159 // Don't break the process
160 return
161 }
162
163 requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo)
164 })
165 })
166 })
142} 167}
143 168
144// --------------------------------------------------------------------------- 169// ---------------------------------------------------------------------------
@@ -148,18 +173,19 @@ module.exports = pods
148// --------------------------------------------------------------------------- 173// ---------------------------------------------------------------------------
149 174
150function computeForeignPodsList (url, podsScore, callback) { 175function computeForeignPodsList (url, podsScore, callback) {
151 // Let's give 1 point to the pod we ask the friends list
152 podsScore[url] = 1
153
154 getForeignPodsList(url, function (err, foreignPodsList) { 176 getForeignPodsList(url, function (err, foreignPodsList) {
155 if (err) return callback(err) 177 if (err) return callback(err)
156 if (foreignPodsList.length === 0) return callback() 178
179 if (!foreignPodsList) foreignPodsList = []
180
181 // Let's give 1 point to the pod we ask the friends list
182 foreignPodsList.push({ url: url })
157 183
158 foreignPodsList.forEach(function (foreignPod) { 184 foreignPodsList.forEach(function (foreignPod) {
159 const foreignUrl = foreignPod.url 185 const foreignPodUrl = foreignPod.url
160 186
161 if (podsScore[foreignUrl]) podsScore[foreignUrl]++ 187 if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++
162 else podsScore[foreignUrl] = 1 188 else podsScore[foreignPodUrl] = 1
163 }) 189 })
164 190
165 callback() 191 callback()
@@ -194,63 +220,43 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
194 // Flush pool requests 220 // Flush pool requests
195 requestsScheduler.forceSend() 221 requestsScheduler.forceSend()
196 222
197 // Get the list of our videos to send to our new friends 223 async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
198 Videos.listOwned(function (err, videosList) { 224 const params = {
199 if (err) { 225 url: pod.url + '/api/' + constants.API_VERSION + '/pods/',
200 logger.error('Cannot get the list of videos we own.') 226 method: 'POST',
201 return callback(err) 227 json: {
202 } 228 url: http + '://' + host + ':' + port,
203 229 publicKey: cert
204 const data = { 230 }
205 url: http + '://' + host + ':' + port,
206 publicKey: cert,
207 videos: videosList
208 } 231 }
209 232
210 requests.makeMultipleRetryRequest( 233 requests.makeRetryRequest(params, function (err, res, body) {
211 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, 234 if (err) {
212 235 logger.error('Error with adding %s pod.', pod.url, { error: err })
213 podsList, 236 // Don't break the process
214 237 return callbackEach()
215 // Callback called after each request 238 }
216 function eachRequest (err, response, body, url, pod, callbackEachRequest) {
217 // We add the pod if it responded correctly with its public certificate
218 if (!err && response.statusCode === 200) {
219 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
220 if (err) {
221 logger.error('Error with adding %s pod.', pod.url, { error: err })
222 return callbackEachRequest()
223 }
224
225 videos.createRemoteVideos(body.videos, function (err) {
226 if (err) {
227 logger.error('Error with adding videos of pod.', pod.url, { error: err })
228 return callbackEachRequest()
229 }
230
231 logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
232 return callbackEachRequest()
233 })
234 })
235 } else {
236 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
237 return callbackEachRequest()
238 }
239 },
240 239
241 // Final callback, we've ended all the requests 240 if (res.statusCode === 200) {
242 function endRequests (err) { 241 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) {
243 // Now we made new friends, we can re activate the pool of requests 242 if (err) logger.error('Cannot add friend %s pod.', pod.url)
244 requestsScheduler.activate()
245 243
246 if (err) { 244 // Add our videos to the request scheduler
247 logger.error('There was some errors when we wanted to make friends.') 245 sendOwnedVideosToPod(podCreated._id)
248 return callback(err)
249 }
250 246
251 logger.debug('makeRequestsToWinningPods finished.') 247 return callbackEach()
252 return callback(null) 248 })
249 } else {
250 logger.error('Status not 200 for %s pod.', pod.url)
251 return callbackEach()
253 } 252 }
254 ) 253 })
254 }, function endRequests () {
255 // Final callback, we've ended all the requests
256 // Now we made new friends, we can re activate the pool of requests
257 requestsScheduler.activate()
258
259 logger.debug('makeRequestsToWinningPods finished.')
260 return callback()
255 }) 261 })
256} 262}