aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-01-23 18:31:58 +0100
committerChocobozzz <florian.bigard@gmail.com>2016-01-23 18:31:58 +0100
commit45239549bf2659998dcf9196d86974b0b625912e (patch)
tree823d324db097400a7b5ae59a03deff54c5fd86ef /src
parent2cc8ebf134b66047cd639ee7324e1ecfd5c5fd18 (diff)
downloadPeerTube-45239549bf2659998dcf9196d86974b0b625912e.tar.gz
PeerTube-45239549bf2659998dcf9196d86974b0b625912e.tar.zst
PeerTube-45239549bf2659998dcf9196d86974b0b625912e.zip
Finalise the join in a network and add the ability to quit it
Diffstat (limited to 'src')
-rw-r--r--src/pods.js152
-rw-r--r--src/poolRequests.js32
-rw-r--r--src/utils.js10
-rw-r--r--src/videos.js34
4 files changed, 191 insertions, 37 deletions
diff --git a/src/pods.js b/src/pods.js
index 8da216a55..defa9b1c1 100644
--- a/src/pods.js
+++ b/src/pods.js
@@ -43,7 +43,9 @@
43 } 43 }
44 44
45 // { url } 45 // { url }
46 // TODO: check if the pod is not already a friend
46 pods.add = function (data, callback) { 47 pods.add = function (data, callback) {
48 var videos = require('./videos')
47 logger.info('Adding pod: %s', data.url) 49 logger.info('Adding pod: %s', data.url)
48 50
49 var params = { 51 var params = {
@@ -58,13 +60,38 @@
58 return callback(err) 60 return callback(err)
59 } 61 }
60 62
63 videos.addRemotes(data.videos)
64
61 fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { 65 fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) {
62 if (err) { 66 if (err) {
63 logger.error('Cannot read cert file.', { error: err }) 67 logger.error('Cannot read cert file.', { error: err })
64 return callback(err) 68 return callback(err)
65 } 69 }
66 70
67 return callback(null, { cert: cert }) 71 videos.listOwned(function (err, videos_list) {
72 if (err) {
73 logger.error('Cannot get the list of owned videos.', { error: err })
74 return callback(err)
75 }
76
77 return callback(null, { cert: cert, videos: videos_list })
78 })
79 })
80 })
81 }
82
83 pods.remove = function (url, callback) {
84 var videos = require('./videos')
85 logger.info('Removing %s pod.', url)
86
87 videos.removeAllRemotesOf(url, function (err) {
88 if (err) logger.error('Cannot remove all remote videos of %s.', url)
89
90 PodsDB.remove({ url: url }, function (err) {
91 if (err) return callback(err)
92
93 logger.info('%s pod removed.', url)
94 callback(null)
68 }) 95 })
69 }) 96 })
70 } 97 }
@@ -82,6 +109,7 @@
82 } 109 }
83 110
84 pods.makeFriends = function (callback) { 111 pods.makeFriends = function (callback) {
112 var videos = require('./videos')
85 var pods_score = {} 113 var pods_score = {}
86 114
87 logger.info('Make friends!') 115 logger.info('Make friends!')
@@ -137,43 +165,109 @@
137 } 165 }
138 166
139 function makeRequestsToWinningPods (cert, pods_list) { 167 function makeRequestsToWinningPods (cert, pods_list) {
140 var data = { 168 // Stop pool requests
141 url: http + '://' + host + ':' + port, 169 poolRequests.deactivate()
142 publicKey: cert 170 // Flush pool requests
143 } 171 poolRequests.forceSend()
172
173 // Get the list of our videos to send to our new friends
174 videos.listOwned(function (err, videos_list) {
175 if (err) throw err
176
177 var data = {
178 url: http + '://' + host + ':' + port,
179 publicKey: cert,
180 videos: videos_list
181 }
144 182
145 utils.makeMultipleRetryRequest( 183 utils.makeMultipleRetryRequest(
146 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, 184 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
147 185
148 pods_list, 186 pods_list,
149 187
150 function eachRequest (err, response, body, url, pod, callback_each_request) { 188 function eachRequest (err, response, body, url, pod, callback_each_request) {
151 // We add the pod if it responded correctly with its public certificate 189 // We add the pod if it responded correctly with its public certificate
152 if (!err && response.statusCode === 200) { 190 if (!err && response.statusCode === 200) {
153 pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { 191 pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
154 if (err) { 192 if (err) logger.error('Error with adding %s pod.', pod.url, { error: err })
155 logger.error('Error with adding %s pod.', pod.url, { error: err })
156 }
157 193
194 videos.addRemotes(body.videos, function (err) {
195 if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err })
196
197 logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
198 return callback_each_request()
199 })
200 })
201 } else {
202 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
158 return callback_each_request() 203 return callback_each_request()
159 }) 204 }
160 } else { 205 },
161 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
162 return callback_each_request()
163 }
164 },
165 206
166 function endRequests (err) { 207 function endRequests (err) {
167 if (err) { 208 // Now we made new friends, we can re activate the pool of requests
168 logger.error('There was some errors when we wanted to make friends.', { error: err }) 209 poolRequests.activate()
169 return callback(err) 210
211 if (err) {
212 logger.error('There was some errors when we wanted to make friends.', { error: err })
213 return callback(err)
214 }
215
216 logger.debug('makeRequestsToWinningPods finished.')
217 return callback(null)
170 } 218 }
219 )
220 })
221 }
222 }
171 223
172 logger.debug('makeRequestsToWinningPods finished.') 224 pods.quitFriends = function (callback) {
173 return callback(null) 225 // Stop pool requests
226 poolRequests.deactivate()
227 // Flush pool requests
228 poolRequests.forceSend()
229
230 PodsDB.find(function (err, pods) {
231 if (err) return callback(err)
232
233 var request = {
234 method: 'POST',
235 path: '/api/' + constants.API_VERSION + '/pods/remove',
236 sign: true,
237 encrypt: true,
238 data: {
239 url: 'me' // Fake data
174 } 240 }
175 ) 241 }
176 } 242
243 // Announce we quit them
244 utils.makeMultipleRetryRequest(request, pods, function () {
245 PodsDB.remove(function (err) {
246 poolRequests.activate()
247
248 if (err) return callback(err)
249
250 logger.info('Broke friends, so sad :(')
251
252 var videos = require('./videos')
253 videos.removeAllRemotes(function (err) {
254 if (err) return callback(err)
255
256 logger.info('Removed all remote videos.')
257 callback(null)
258 })
259 })
260 })
261 })
262 }
263
264 pods.hasFriends = function (callback) {
265 PodsDB.count(function (err, count) {
266 if (err) return callback(err)
267
268 var has_friends = (count !== 0)
269 callback(null, has_friends)
270 })
177 } 271 }
178 272
179 module.exports = pods 273 module.exports = pods
diff --git a/src/poolRequests.js b/src/poolRequests.js
index edb12b1e8..7f422f372 100644
--- a/src/poolRequests.js
+++ b/src/poolRequests.js
@@ -6,9 +6,11 @@
6 var constants = require('./constants') 6 var constants = require('./constants')
7 var logger = require('./logger') 7 var logger = require('./logger')
8 var database = require('./database') 8 var database = require('./database')
9 var pluck = require('lodash-node/compat/collection/pluck')
9 var PoolRequestsDB = database.PoolRequestsDB 10 var PoolRequestsDB = database.PoolRequestsDB
10 var PodsDB = database.PodsDB 11 var PodsDB = database.PodsDB
11 var utils = require('./utils') 12 var utils = require('./utils')
13 var VideosDB = database.VideosDB
12 14
13 var poolRequests = {} 15 var poolRequests = {}
14 16
@@ -90,11 +92,26 @@
90 } 92 }
91 93
92 function removeBadPods () { 94 function removeBadPods () {
93 PodsDB.remove({ score: 0 }, function (err, result) { 95 PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) {
94 if (err) throw err 96 if (err) throw err
95 97
96 var number_removed = result.result.n 98 if (pods.length === 0) return
97 if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) 99
100 var urls = pluck(pods, 'url')
101 var ids = pluck(pods, '_id')
102
103 VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) {
104 if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err })
105 var videos_removed = r.result.n
106 logger.info('Removed %d videos.', videos_removed)
107
108 PodsDB.remove({ _id: { $in: ids } }, function (err, r) {
109 if (err) logger.error('Cannot remove bad pods.', { error: err })
110
111 var pods_removed = r.result.n
112 logger.info('Removed %d pods.', pods_removed)
113 })
114 })
98 }) 115 })
99 } 116 }
100 117
@@ -126,9 +143,9 @@
126 utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) 143 utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
127 144
128 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { 145 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
129 if (err || response.statusCode !== 200) { 146 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
130 bad_pods.push(pod._id) 147 bad_pods.push(pod._id)
131 logger.error('Error sending secure request to %s pod.', url, { error: err }) 148 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
132 } else { 149 } else {
133 good_pods.push(pod._id) 150 good_pods.push(pod._id)
134 } 151 }
@@ -180,5 +197,10 @@
180 clearInterval(timer) 197 clearInterval(timer)
181 } 198 }
182 199
200 poolRequests.forceSend = function () {
201 logger.info('Force pool requests sending.')
202 makePoolRequests()
203 }
204
183 module.exports = poolRequests 205 module.exports = poolRequests
184})() 206})()
diff --git a/src/utils.js b/src/utils.js
index 5880c6c90..176648a31 100644
--- a/src/utils.js
+++ b/src/utils.js
@@ -56,7 +56,7 @@
56 utils.makeMultipleRetryRequest = function (all_data, pods, callbackEach, callback) { 56 utils.makeMultipleRetryRequest = function (all_data, pods, callbackEach, callback) {
57 if (!callback) { 57 if (!callback) {
58 callback = callbackEach 58 callback = callbackEach
59 callbackEach = function () {} 59 callbackEach = null
60 } 60 }
61 61
62 var url = http + '://' + host + ':' + port 62 var url = http + '://' + host + ':' + port
@@ -71,9 +71,13 @@
71 // Make a request for each pod 71 // Make a request for each pod
72 async.each(pods, function (pod, callback_each_async) { 72 async.each(pods, function (pod, callback_each_async) {
73 function callbackEachRetryRequest (err, response, body, url, pod) { 73 function callbackEachRetryRequest (err, response, body, url, pod) {
74 callbackEach(err, response, body, url, pod, function () { 74 if (callbackEach !== null) {
75 callbackEach(err, response, body, url, pod, function () {
76 callback_each_async()
77 })
78 } else {
75 callback_each_async() 79 callback_each_async()
76 }) 80 }
77 } 81 }
78 82
79 var params = { 83 var params = {
diff --git a/src/videos.js b/src/videos.js
index 32f26abe7..90821fdf6 100644
--- a/src/videos.js
+++ b/src/videos.js
@@ -43,6 +43,18 @@
43 }) 43 })
44 } 44 }
45 45
46 videos.listOwned = function (callback) {
47 // If namePath is not null this is *our* video
48 VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) {
49 if (err) {
50 logger.error('Cannot get list of the videos.', { error: err })
51 return callback(err)
52 }
53
54 return callback(null, videos_list)
55 })
56 }
57
46 videos.add = function (data, callback) { 58 videos.add = function (data, callback) {
47 var video_file = data.video 59 var video_file = data.video
48 var video_data = data.data 60 var video_data = data.data
@@ -131,6 +143,8 @@
131 143
132 // Use the magnet Uri because the _id field is not the same on different servers 144 // Use the magnet Uri because the _id field is not the same on different servers
133 videos.removeRemotes = function (fromUrl, magnetUris, callback) { 145 videos.removeRemotes = function (fromUrl, magnetUris, callback) {
146 if (callback === undefined) callback = function () {}
147
134 VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { 148 VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) {
135 if (err || !videos) { 149 if (err || !videos) {
136 logger.error('Cannot find the torrent URI of these remote videos.') 150 logger.error('Cannot find the torrent URI of these remote videos.')
@@ -155,14 +169,34 @@
155 return callback(err) 169 return callback(err)
156 } 170 }
157 171
172 logger.info('Removed remote videos from %s.', fromUrl)
158 callback(null) 173 callback(null)
159 }) 174 })
160 }) 175 })
161 }) 176 })
162 } 177 }
163 178
179 videos.removeAllRemotes = function (callback) {
180 VideosDB.remove({ namePath: null }, function (err) {
181 if (err) return callback(err)
182
183 callback(null)
184 })
185 }
186
187 videos.removeAllRemotesOf = function (fromUrl, callback) {
188 VideosDB.remove({ podUrl: fromUrl }, function (err) {
189 if (err) return callback(err)
190
191 callback(null)
192 })
193 }
194
164 // { name, magnetUri, podUrl } 195 // { name, magnetUri, podUrl }
196 // TODO: avoid doublons
165 videos.addRemotes = function (videos, callback) { 197 videos.addRemotes = function (videos, callback) {
198 if (callback === undefined) callback = function () {}
199
166 var to_add = [] 200 var to_add = []
167 201
168 async.each(videos, function (video, callback_each) { 202 async.each(videos, function (video, callback_each) {