diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2016-01-23 18:31:58 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2016-01-23 18:31:58 +0100 |
commit | 45239549bf2659998dcf9196d86974b0b625912e (patch) | |
tree | 823d324db097400a7b5ae59a03deff54c5fd86ef /src | |
parent | 2cc8ebf134b66047cd639ee7324e1ecfd5c5fd18 (diff) | |
download | PeerTube-45239549bf2659998dcf9196d86974b0b625912e.tar.gz PeerTube-45239549bf2659998dcf9196d86974b0b625912e.tar.zst PeerTube-45239549bf2659998dcf9196d86974b0b625912e.zip |
Finalise the join in a network and add the ability to quit it
Diffstat (limited to 'src')
-rw-r--r-- | src/pods.js | 152 | ||||
-rw-r--r-- | src/poolRequests.js | 32 | ||||
-rw-r--r-- | src/utils.js | 10 | ||||
-rw-r--r-- | src/videos.js | 34 |
4 files changed, 191 insertions, 37 deletions
diff --git a/src/pods.js b/src/pods.js index 8da216a55..defa9b1c1 100644 --- a/src/pods.js +++ b/src/pods.js | |||
@@ -43,7 +43,9 @@ | |||
43 | } | 43 | } |
44 | 44 | ||
45 | // { url } | 45 | // { url } |
46 | // TODO: check if the pod is not already a friend | ||
46 | pods.add = function (data, callback) { | 47 | pods.add = function (data, callback) { |
48 | var videos = require('./videos') | ||
47 | logger.info('Adding pod: %s', data.url) | 49 | logger.info('Adding pod: %s', data.url) |
48 | 50 | ||
49 | var params = { | 51 | var params = { |
@@ -58,13 +60,38 @@ | |||
58 | return callback(err) | 60 | return callback(err) |
59 | } | 61 | } |
60 | 62 | ||
63 | videos.addRemotes(data.videos) | ||
64 | |||
61 | fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { | 65 | fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { |
62 | if (err) { | 66 | if (err) { |
63 | logger.error('Cannot read cert file.', { error: err }) | 67 | logger.error('Cannot read cert file.', { error: err }) |
64 | return callback(err) | 68 | return callback(err) |
65 | } | 69 | } |
66 | 70 | ||
67 | return callback(null, { cert: cert }) | 71 | videos.listOwned(function (err, videos_list) { |
72 | if (err) { | ||
73 | logger.error('Cannot get the list of owned videos.', { error: err }) | ||
74 | return callback(err) | ||
75 | } | ||
76 | |||
77 | return callback(null, { cert: cert, videos: videos_list }) | ||
78 | }) | ||
79 | }) | ||
80 | }) | ||
81 | } | ||
82 | |||
83 | pods.remove = function (url, callback) { | ||
84 | var videos = require('./videos') | ||
85 | logger.info('Removing %s pod.', url) | ||
86 | |||
87 | videos.removeAllRemotesOf(url, function (err) { | ||
88 | if (err) logger.error('Cannot remove all remote videos of %s.', url) | ||
89 | |||
90 | PodsDB.remove({ url: url }, function (err) { | ||
91 | if (err) return callback(err) | ||
92 | |||
93 | logger.info('%s pod removed.', url) | ||
94 | callback(null) | ||
68 | }) | 95 | }) |
69 | }) | 96 | }) |
70 | } | 97 | } |
@@ -82,6 +109,7 @@ | |||
82 | } | 109 | } |
83 | 110 | ||
84 | pods.makeFriends = function (callback) { | 111 | pods.makeFriends = function (callback) { |
112 | var videos = require('./videos') | ||
85 | var pods_score = {} | 113 | var pods_score = {} |
86 | 114 | ||
87 | logger.info('Make friends!') | 115 | logger.info('Make friends!') |
@@ -137,43 +165,109 @@ | |||
137 | } | 165 | } |
138 | 166 | ||
139 | function makeRequestsToWinningPods (cert, pods_list) { | 167 | function makeRequestsToWinningPods (cert, pods_list) { |
140 | var data = { | 168 | // Stop pool requests |
141 | url: http + '://' + host + ':' + port, | 169 | poolRequests.deactivate() |
142 | publicKey: cert | 170 | // Flush pool requests |
143 | } | 171 | poolRequests.forceSend() |
172 | |||
173 | // Get the list of our videos to send to our new friends | ||
174 | videos.listOwned(function (err, videos_list) { | ||
175 | if (err) throw err | ||
176 | |||
177 | var data = { | ||
178 | url: http + '://' + host + ':' + port, | ||
179 | publicKey: cert, | ||
180 | videos: videos_list | ||
181 | } | ||
144 | 182 | ||
145 | utils.makeMultipleRetryRequest( | 183 | utils.makeMultipleRetryRequest( |
146 | { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, | 184 | { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, |
147 | 185 | ||
148 | pods_list, | 186 | pods_list, |
149 | 187 | ||
150 | function eachRequest (err, response, body, url, pod, callback_each_request) { | 188 | function eachRequest (err, response, body, url, pod, callback_each_request) { |
151 | // We add the pod if it responded correctly with its public certificate | 189 | // We add the pod if it responded correctly with its public certificate |
152 | if (!err && response.statusCode === 200) { | 190 | if (!err && response.statusCode === 200) { |
153 | pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { | 191 | pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { |
154 | if (err) { | 192 | if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) |
155 | logger.error('Error with adding %s pod.', pod.url, { error: err }) | ||
156 | } | ||
157 | 193 | ||
194 | videos.addRemotes(body.videos, function (err) { | ||
195 | if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) | ||
196 | |||
197 | logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) | ||
198 | return callback_each_request() | ||
199 | }) | ||
200 | }) | ||
201 | } else { | ||
202 | logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) | ||
158 | return callback_each_request() | 203 | return callback_each_request() |
159 | }) | 204 | } |
160 | } else { | 205 | }, |
161 | logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) | ||
162 | return callback_each_request() | ||
163 | } | ||
164 | }, | ||
165 | 206 | ||
166 | function endRequests (err) { | 207 | function endRequests (err) { |
167 | if (err) { | 208 | // Now we made new friends, we can re activate the pool of requests |
168 | logger.error('There was some errors when we wanted to make friends.', { error: err }) | 209 | poolRequests.activate() |
169 | return callback(err) | 210 | |
211 | if (err) { | ||
212 | logger.error('There was some errors when we wanted to make friends.', { error: err }) | ||
213 | return callback(err) | ||
214 | } | ||
215 | |||
216 | logger.debug('makeRequestsToWinningPods finished.') | ||
217 | return callback(null) | ||
170 | } | 218 | } |
219 | ) | ||
220 | }) | ||
221 | } | ||
222 | } | ||
171 | 223 | ||
172 | logger.debug('makeRequestsToWinningPods finished.') | 224 | pods.quitFriends = function (callback) { |
173 | return callback(null) | 225 | // Stop pool requests |
226 | poolRequests.deactivate() | ||
227 | // Flush pool requests | ||
228 | poolRequests.forceSend() | ||
229 | |||
230 | PodsDB.find(function (err, pods) { | ||
231 | if (err) return callback(err) | ||
232 | |||
233 | var request = { | ||
234 | method: 'POST', | ||
235 | path: '/api/' + constants.API_VERSION + '/pods/remove', | ||
236 | sign: true, | ||
237 | encrypt: true, | ||
238 | data: { | ||
239 | url: 'me' // Fake data | ||
174 | } | 240 | } |
175 | ) | 241 | } |
176 | } | 242 | |
243 | // Announce we quit them | ||
244 | utils.makeMultipleRetryRequest(request, pods, function () { | ||
245 | PodsDB.remove(function (err) { | ||
246 | poolRequests.activate() | ||
247 | |||
248 | if (err) return callback(err) | ||
249 | |||
250 | logger.info('Broke friends, so sad :(') | ||
251 | |||
252 | var videos = require('./videos') | ||
253 | videos.removeAllRemotes(function (err) { | ||
254 | if (err) return callback(err) | ||
255 | |||
256 | logger.info('Removed all remote videos.') | ||
257 | callback(null) | ||
258 | }) | ||
259 | }) | ||
260 | }) | ||
261 | }) | ||
262 | } | ||
263 | |||
264 | pods.hasFriends = function (callback) { | ||
265 | PodsDB.count(function (err, count) { | ||
266 | if (err) return callback(err) | ||
267 | |||
268 | var has_friends = (count !== 0) | ||
269 | callback(null, has_friends) | ||
270 | }) | ||
177 | } | 271 | } |
178 | 272 | ||
179 | module.exports = pods | 273 | module.exports = pods |
diff --git a/src/poolRequests.js b/src/poolRequests.js index edb12b1e8..7f422f372 100644 --- a/src/poolRequests.js +++ b/src/poolRequests.js | |||
@@ -6,9 +6,11 @@ | |||
6 | var constants = require('./constants') | 6 | var constants = require('./constants') |
7 | var logger = require('./logger') | 7 | var logger = require('./logger') |
8 | var database = require('./database') | 8 | var database = require('./database') |
9 | var pluck = require('lodash-node/compat/collection/pluck') | ||
9 | var PoolRequestsDB = database.PoolRequestsDB | 10 | var PoolRequestsDB = database.PoolRequestsDB |
10 | var PodsDB = database.PodsDB | 11 | var PodsDB = database.PodsDB |
11 | var utils = require('./utils') | 12 | var utils = require('./utils') |
13 | var VideosDB = database.VideosDB | ||
12 | 14 | ||
13 | var poolRequests = {} | 15 | var poolRequests = {} |
14 | 16 | ||
@@ -90,11 +92,26 @@ | |||
90 | } | 92 | } |
91 | 93 | ||
92 | function removeBadPods () { | 94 | function removeBadPods () { |
93 | PodsDB.remove({ score: 0 }, function (err, result) { | 95 | PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) { |
94 | if (err) throw err | 96 | if (err) throw err |
95 | 97 | ||
96 | var number_removed = result.result.n | 98 | if (pods.length === 0) return |
97 | if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) | 99 | |
100 | var urls = pluck(pods, 'url') | ||
101 | var ids = pluck(pods, '_id') | ||
102 | |||
103 | VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) { | ||
104 | if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err }) | ||
105 | var videos_removed = r.result.n | ||
106 | logger.info('Removed %d videos.', videos_removed) | ||
107 | |||
108 | PodsDB.remove({ _id: { $in: ids } }, function (err, r) { | ||
109 | if (err) logger.error('Cannot remove bad pods.', { error: err }) | ||
110 | |||
111 | var pods_removed = r.result.n | ||
112 | logger.info('Removed %d pods.', pods_removed) | ||
113 | }) | ||
114 | }) | ||
98 | }) | 115 | }) |
99 | } | 116 | } |
100 | 117 | ||
@@ -126,9 +143,9 @@ | |||
126 | utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) | 143 | utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) |
127 | 144 | ||
128 | function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { | 145 | function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { |
129 | if (err || response.statusCode !== 200) { | 146 | if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { |
130 | bad_pods.push(pod._id) | 147 | bad_pods.push(pod._id) |
131 | logger.error('Error sending secure request to %s pod.', url, { error: err }) | 148 | logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) |
132 | } else { | 149 | } else { |
133 | good_pods.push(pod._id) | 150 | good_pods.push(pod._id) |
134 | } | 151 | } |
@@ -180,5 +197,10 @@ | |||
180 | clearInterval(timer) | 197 | clearInterval(timer) |
181 | } | 198 | } |
182 | 199 | ||
200 | poolRequests.forceSend = function () { | ||
201 | logger.info('Force pool requests sending.') | ||
202 | makePoolRequests() | ||
203 | } | ||
204 | |||
183 | module.exports = poolRequests | 205 | module.exports = poolRequests |
184 | })() | 206 | })() |
diff --git a/src/utils.js b/src/utils.js index 5880c6c90..176648a31 100644 --- a/src/utils.js +++ b/src/utils.js | |||
@@ -56,7 +56,7 @@ | |||
56 | utils.makeMultipleRetryRequest = function (all_data, pods, callbackEach, callback) { | 56 | utils.makeMultipleRetryRequest = function (all_data, pods, callbackEach, callback) { |
57 | if (!callback) { | 57 | if (!callback) { |
58 | callback = callbackEach | 58 | callback = callbackEach |
59 | callbackEach = function () {} | 59 | callbackEach = null |
60 | } | 60 | } |
61 | 61 | ||
62 | var url = http + '://' + host + ':' + port | 62 | var url = http + '://' + host + ':' + port |
@@ -71,9 +71,13 @@ | |||
71 | // Make a request for each pod | 71 | // Make a request for each pod |
72 | async.each(pods, function (pod, callback_each_async) { | 72 | async.each(pods, function (pod, callback_each_async) { |
73 | function callbackEachRetryRequest (err, response, body, url, pod) { | 73 | function callbackEachRetryRequest (err, response, body, url, pod) { |
74 | callbackEach(err, response, body, url, pod, function () { | 74 | if (callbackEach !== null) { |
75 | callbackEach(err, response, body, url, pod, function () { | ||
76 | callback_each_async() | ||
77 | }) | ||
78 | } else { | ||
75 | callback_each_async() | 79 | callback_each_async() |
76 | }) | 80 | } |
77 | } | 81 | } |
78 | 82 | ||
79 | var params = { | 83 | var params = { |
diff --git a/src/videos.js b/src/videos.js index 32f26abe7..90821fdf6 100644 --- a/src/videos.js +++ b/src/videos.js | |||
@@ -43,6 +43,18 @@ | |||
43 | }) | 43 | }) |
44 | } | 44 | } |
45 | 45 | ||
46 | videos.listOwned = function (callback) { | ||
47 | // If namePath is not null this is *our* video | ||
48 | VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) { | ||
49 | if (err) { | ||
50 | logger.error('Cannot get list of the videos.', { error: err }) | ||
51 | return callback(err) | ||
52 | } | ||
53 | |||
54 | return callback(null, videos_list) | ||
55 | }) | ||
56 | } | ||
57 | |||
46 | videos.add = function (data, callback) { | 58 | videos.add = function (data, callback) { |
47 | var video_file = data.video | 59 | var video_file = data.video |
48 | var video_data = data.data | 60 | var video_data = data.data |
@@ -131,6 +143,8 @@ | |||
131 | 143 | ||
132 | // Use the magnet Uri because the _id field is not the same on different servers | 144 | // Use the magnet Uri because the _id field is not the same on different servers |
133 | videos.removeRemotes = function (fromUrl, magnetUris, callback) { | 145 | videos.removeRemotes = function (fromUrl, magnetUris, callback) { |
146 | if (callback === undefined) callback = function () {} | ||
147 | |||
134 | VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { | 148 | VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { |
135 | if (err || !videos) { | 149 | if (err || !videos) { |
136 | logger.error('Cannot find the torrent URI of these remote videos.') | 150 | logger.error('Cannot find the torrent URI of these remote videos.') |
@@ -155,14 +169,34 @@ | |||
155 | return callback(err) | 169 | return callback(err) |
156 | } | 170 | } |
157 | 171 | ||
172 | logger.info('Removed remote videos from %s.', fromUrl) | ||
158 | callback(null) | 173 | callback(null) |
159 | }) | 174 | }) |
160 | }) | 175 | }) |
161 | }) | 176 | }) |
162 | } | 177 | } |
163 | 178 | ||
179 | videos.removeAllRemotes = function (callback) { | ||
180 | VideosDB.remove({ namePath: null }, function (err) { | ||
181 | if (err) return callback(err) | ||
182 | |||
183 | callback(null) | ||
184 | }) | ||
185 | } | ||
186 | |||
187 | videos.removeAllRemotesOf = function (fromUrl, callback) { | ||
188 | VideosDB.remove({ podUrl: fromUrl }, function (err) { | ||
189 | if (err) return callback(err) | ||
190 | |||
191 | callback(null) | ||
192 | }) | ||
193 | } | ||
194 | |||
164 | // { name, magnetUri, podUrl } | 195 | // { name, magnetUri, podUrl } |
196 | // TODO: avoid doublons | ||
165 | videos.addRemotes = function (videos, callback) { | 197 | videos.addRemotes = function (videos, callback) { |
198 | if (callback === undefined) callback = function () {} | ||
199 | |||
166 | var to_add = [] | 200 | var to_add = [] |
167 | 201 | ||
168 | async.each(videos, function (video, callback_each) { | 202 | async.each(videos, function (video, callback_each) { |