aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-06-18 16:13:54 +0200
committerChocobozzz <florian.bigard@gmail.com>2016-06-18 16:13:54 +0200
commit528a9efa8272532bbd0dafc35c3e05e57c50f61e (patch)
tree62d4417df4ab9b2e53c44dc7271be81b88e4e0e5 /server/lib
parentb2e4c0ba1a33b8a50491a1f8d111468a7da5640f (diff)
downloadPeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.gz
PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.zst
PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.zip
Try to make a better communication (between pods) module
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/friends.js158
-rw-r--r--server/lib/requestsScheduler.js200
-rw-r--r--server/lib/videos.js27
3 files changed, 214 insertions, 171 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js
index e986fa006..d81a603ad 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -24,15 +24,15 @@ const pods = {
24 getMyCertificate: getMyCertificate, 24 getMyCertificate: getMyCertificate,
25 makeFriends: makeFriends, 25 makeFriends: makeFriends,
26 quitFriends: quitFriends, 26 quitFriends: quitFriends,
27 removeVideoToFriends: removeVideoToFriends 27 removeVideoToFriends: removeVideoToFriends,
28 sendOwnedVideosToPod: sendOwnedVideosToPod
28} 29}
29 30
30function addVideoToFriends (video) { 31function addVideoToFriends (video) {
31 // To avoid duplicates
32 const id = video.name + video.magnetUri
33 // ensure namePath is null 32 // ensure namePath is null
34 video.namePath = null 33 video.namePath = null
35 requestsScheduler.addRequest(id, 'add', video) 34
35 requestsScheduler.addRequest('add', video)
36} 36}
37 37
38function hasFriends (callback) { 38function hasFriends (callback) {
@@ -60,7 +60,7 @@ function makeFriends (callback) {
60 60
61 const urls = config.get('network.friends') 61 const urls = config.get('network.friends')
62 62
63 async.each(urls, function (url, callbackEach) { 63 async.eachSeries(urls, function (url, callbackEach) {
64 computeForeignPodsList(url, podsScore, callbackEach) 64 computeForeignPodsList(url, podsScore, callbackEach)
65 }, function (err) { 65 }, function (err) {
66 if (err) return callback(err) 66 if (err) return callback(err)
@@ -78,7 +78,7 @@ function quitFriends (callback) {
78 // Stop pool requests 78 // Stop pool requests
79 requestsScheduler.deactivate() 79 requestsScheduler.deactivate()
80 // Flush pool requests 80 // Flush pool requests
81 requestsScheduler.forceSend() 81 requestsScheduler.flush()
82 82
83 async.waterfall([ 83 async.waterfall([
84 function getPodsList (callbackAsync) { 84 function getPodsList (callbackAsync) {
@@ -86,19 +86,25 @@ function quitFriends (callback) {
86 }, 86 },
87 87
88 function announceIQuitMyFriends (pods, callbackAsync) { 88 function announceIQuitMyFriends (pods, callbackAsync) {
89 const request = { 89 const requestParams = {
90 method: 'POST', 90 method: 'POST',
91 path: '/api/' + constants.API_VERSION + '/pods/remove', 91 path: '/api/' + constants.API_VERSION + '/pods/remove',
92 sign: true, 92 sign: true
93 encrypt: true,
94 data: {
95 url: 'me' // Fake data
96 }
97 } 93 }
98 94
99 // Announce we quit them 95 // Announce we quit them
100 requests.makeMultipleRetryRequest(request, pods, function (err) { 96 // We don't care if the request fails
101 return callbackAsync(err) 97 // The other pod will exclude us automatically after a while
98 async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
99 requestParams.toPod = pod
100 requests.makeSecureRequest(requestParams, callbackEach)
101 }, function (err) {
102 if (err) {
103 logger.error('Some errors while quitting friends.', { err: err })
104 // Don't stop the process
105 }
106
107 return callbackAsync()
102 }) 108 })
103 }, 109 },
104 110
@@ -136,9 +142,28 @@ function quitFriends (callback) {
136} 142}
137 143
138function removeVideoToFriends (video) { 144function removeVideoToFriends (video) {
139 // To avoid duplicates 145 requestsScheduler.addRequest('remove', video)
140 const id = video.name + video.magnetUri 146}
141 requestsScheduler.addRequest(id, 'remove', video) 147
148function sendOwnedVideosToPod (podId) {
149 Videos.listOwned(function (err, videosList) {
150 if (err) {
151 logger.error('Cannot get the list of videos we own.')
152 return
153 }
154
155 videosList.forEach(function (video) {
156 videos.convertVideoToRemote(video, function (err, remoteVideo) {
157 if (err) {
158 logger.error('Cannot convert video to remote.', { error: err })
159 // Don't break the process
160 return
161 }
162
163 requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo)
164 })
165 })
166 })
142} 167}
143 168
144// --------------------------------------------------------------------------- 169// ---------------------------------------------------------------------------
@@ -148,18 +173,19 @@ module.exports = pods
148// --------------------------------------------------------------------------- 173// ---------------------------------------------------------------------------
149 174
150function computeForeignPodsList (url, podsScore, callback) { 175function computeForeignPodsList (url, podsScore, callback) {
151 // Let's give 1 point to the pod we ask the friends list
152 podsScore[url] = 1
153
154 getForeignPodsList(url, function (err, foreignPodsList) { 176 getForeignPodsList(url, function (err, foreignPodsList) {
155 if (err) return callback(err) 177 if (err) return callback(err)
156 if (foreignPodsList.length === 0) return callback() 178
179 if (!foreignPodsList) foreignPodsList = []
180
181 // Let's give 1 point to the pod we ask the friends list
182 foreignPodsList.push({ url: url })
157 183
158 foreignPodsList.forEach(function (foreignPod) { 184 foreignPodsList.forEach(function (foreignPod) {
159 const foreignUrl = foreignPod.url 185 const foreignPodUrl = foreignPod.url
160 186
161 if (podsScore[foreignUrl]) podsScore[foreignUrl]++ 187 if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++
162 else podsScore[foreignUrl] = 1 188 else podsScore[foreignPodUrl] = 1
163 }) 189 })
164 190
165 callback() 191 callback()
@@ -194,63 +220,43 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
194 // Flush pool requests 220 // Flush pool requests
195 requestsScheduler.forceSend() 221 requestsScheduler.forceSend()
196 222
197 // Get the list of our videos to send to our new friends 223 async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
198 Videos.listOwned(function (err, videosList) { 224 const params = {
199 if (err) { 225 url: pod.url + '/api/' + constants.API_VERSION + '/pods/',
200 logger.error('Cannot get the list of videos we own.') 226 method: 'POST',
201 return callback(err) 227 json: {
202 } 228 url: http + '://' + host + ':' + port,
203 229 publicKey: cert
204 const data = { 230 }
205 url: http + '://' + host + ':' + port,
206 publicKey: cert,
207 videos: videosList
208 } 231 }
209 232
210 requests.makeMultipleRetryRequest( 233 requests.makeRetryRequest(params, function (err, res, body) {
211 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, 234 if (err) {
212 235 logger.error('Error with adding %s pod.', pod.url, { error: err })
213 podsList, 236 // Don't break the process
214 237 return callbackEach()
215 // Callback called after each request 238 }
216 function eachRequest (err, response, body, url, pod, callbackEachRequest) {
217 // We add the pod if it responded correctly with its public certificate
218 if (!err && response.statusCode === 200) {
219 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
220 if (err) {
221 logger.error('Error with adding %s pod.', pod.url, { error: err })
222 return callbackEachRequest()
223 }
224
225 videos.createRemoteVideos(body.videos, function (err) {
226 if (err) {
227 logger.error('Error with adding videos of pod.', pod.url, { error: err })
228 return callbackEachRequest()
229 }
230
231 logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
232 return callbackEachRequest()
233 })
234 })
235 } else {
236 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
237 return callbackEachRequest()
238 }
239 },
240 239
241 // Final callback, we've ended all the requests 240 if (res.statusCode === 200) {
242 function endRequests (err) { 241 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) {
243 // Now we made new friends, we can re activate the pool of requests 242 if (err) logger.error('Cannot add friend %s pod.', pod.url)
244 requestsScheduler.activate()
245 243
246 if (err) { 244 // Add our videos to the request scheduler
247 logger.error('There was some errors when we wanted to make friends.') 245 sendOwnedVideosToPod(podCreated._id)
248 return callback(err)
249 }
250 246
251 logger.debug('makeRequestsToWinningPods finished.') 247 return callbackEach()
252 return callback(null) 248 })
249 } else {
250 logger.error('Status not 200 for %s pod.', pod.url)
251 return callbackEach()
253 } 252 }
254 ) 253 })
254 }, function endRequests () {
255 // Final callback, we've ended all the requests
256 // Now we made new friends, we can re activate the pool of requests
257 requestsScheduler.activate()
258
259 logger.debug('makeRequestsToWinningPods finished.')
260 return callback()
255 }) 261 })
256} 262}
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js
index 78570209d..ac75e5b93 100644
--- a/server/lib/requestsScheduler.js
+++ b/server/lib/requestsScheduler.js
@@ -11,13 +11,14 @@ const requests = require('../helpers/requests')
11const videos = require('../lib/videos') 11const videos = require('../lib/videos')
12const Videos = require('../models/videos') 12const Videos = require('../models/videos')
13 13
14const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
15let timer = null 14let timer = null
16 15
17const requestsScheduler = { 16const requestsScheduler = {
18 activate: activate, 17 activate: activate,
19 addRequest: addRequest, 18 addRequest: addRequest,
19 addRequestTo: addRequestTo,
20 deactivate: deactivate, 20 deactivate: deactivate,
21 flush: flush,
21 forceSend: forceSend 22 forceSend: forceSend
22} 23}
23 24
@@ -27,35 +28,37 @@ function activate () {
27} 28}
28 29
29// Add request to the scheduler 30// Add request to the scheduler
30function addRequest (id, type, request) { 31function addRequest (type, data) {
31 logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) 32 logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
32 33
33 Requests.findById(id, function (err, entity) { 34 const request = {
35 type: type,
36 data: data
37 }
38
39 Pods.listAllIds(function (err, podIds) {
34 if (err) { 40 if (err) {
35 logger.error('Error when trying to find a request.', { error: err }) 41 logger.debug('Cannot list pod ids.')
36 return // Abort 42 return
37 } 43 }
38 44
39 // If there were already a request with this id in the scheduler... 45 // No friends
40 if (entity) { 46 if (!podIds) return
41 if (entity.type === type) {
42 logger.error('Cannot insert two same requests.')
43 return // Abort
44 }
45 47
46 // Remove the request of the other type 48 Requests.create(request, podIds, function (err) {
47 Requests.removeRequestById(id, function (err) { 49 if (err) logger.error('Cannot create a request.', { error: err })
48 if (err) { 50 })
49 logger.error('Cannot remove a request.', { error: err }) 51 })
50 return // Abort 52}
51 } 53
52 }) 54function addRequestTo (podIds, type, data) {
53 } else { 55 const request = {
54 Requests.create(id, type, request, function (err) { 56 type: type,
55 if (err) logger.error('Cannot create a request.', { error: err }) 57 data: data
56 return // Abort 58 }
57 }) 59
58 } 60 Requests.create(request, podIds, function (err) {
61 if (err) logger.error('Cannot create a request.', { error: err })
59 }) 62 })
60} 63}
61 64
@@ -64,6 +67,14 @@ function deactivate () {
64 clearInterval(timer) 67 clearInterval(timer)
65} 68}
66 69
70function flush () {
71 Requests.removeAll(function (err) {
72 if (err) {
73 logger.error('Cannot flush the requests.', { error: err })
74 }
75 })
76}
77
67function forceSend () { 78function forceSend () {
68 logger.info('Force requests scheduler sending.') 79 logger.info('Force requests scheduler sending.')
69 makeRequests() 80 makeRequests()
@@ -76,54 +87,28 @@ module.exports = requestsScheduler
76// --------------------------------------------------------------------------- 87// ---------------------------------------------------------------------------
77 88
78// Make a requests to friends of a certain type 89// Make a requests to friends of a certain type
79function makeRequest (type, requestsToMake, callback) { 90function makeRequest (toPod, requestsToMake, callback) {
80 if (!callback) callback = function () {} 91 if (!callback) callback = function () {}
81 92
82 Pods.list(function (err, pods) { 93 const params = {
83 if (err) return callback(err) 94 toPod: toPod,
84 95 encrypt: true, // Security
85 const params = { 96 sign: true, // To prove our identity
86 encrypt: true, // Security 97 method: 'POST',
87 sign: true, // To prove our identity 98 path: '/api/' + constants.API_VERSION + '/remote/videos',
88 method: 'POST', 99 data: requestsToMake // Requests we need to make
89 path: null, // We build the path later 100 }
90 data: requestsToMake // Requests we need to make 101
91 } 102 // Make multiple retry requests to all of pods
92 103 // The function fire some useful callbacks
93 // If this is a valid type, we build the path 104 requests.makeSecureRequest(params, function (err, res) {
94 if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { 105 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
95 params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type 106 logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
96 } else { 107
97 return callback(new Error('Unkown pool request type.')) 108 return callback(false)
98 }
99
100 const badPods = []
101 const goodPods = []
102
103 // Make multiple retry requests to all of pods
104 // The function fire some useful callbacks
105 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
106
107 function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
108 // We failed the request, add the pod unreachable to the bad pods list
109 if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
110 badPods.push(pod._id)
111 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
112 } else {
113 // Request success
114 goodPods.push(pod._id)
115 }
116
117 return callbackEachPodFinished()
118 } 109 }
119 110
120 function callbackAllPodsFinished (err) { 111 return callback(true)
121 if (err) return callback(err)
122
123 // All the requests were made, we update the pods score
124 updatePodsScore(goodPods, badPods)
125 callback(null)
126 }
127 }) 112 })
128} 113}
129 114
@@ -143,38 +128,65 @@ function makeRequests () {
143 128
144 logger.info('Making requests to friends.') 129 logger.info('Making requests to friends.')
145 130
131 // Requests by pods id
146 const requestsToMake = {} 132 const requestsToMake = {}
147 for (const type of REQUEST_SCHEDULER_TYPE) {
148 requestsToMake[type] = {
149 ids: [],
150 requests: []
151 }
152 }
153 133
154 // For each requests to make, we add it to the correct request type
155 requests.forEach(function (poolRequest) { 134 requests.forEach(function (poolRequest) {
156 if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { 135 poolRequest.to.forEach(function (toPodId) {
157 const requestTypeToMake = requestsToMake[poolRequest.type] 136 if (!requestsToMake[toPodId]) {
158 requestTypeToMake.requests.push(poolRequest.request) 137 requestsToMake[toPodId] = {
159 requestTypeToMake.ids.push(poolRequest._id) 138 ids: [],
160 } else { 139 datas: []
161 logger.error('Unkown request type.', { request_type: poolRequest.type }) 140 }
162 return // abort 141 }
163 } 142
143 requestsToMake[toPodId].ids.push(poolRequest._id)
144 requestsToMake[toPodId].datas.push(poolRequest.request)
145 })
164 }) 146 })
165 147
166 for (let type of Object.keys(requestsToMake)) { 148 const goodPods = []
167 const requestTypeToMake = requestsToMake[type] 149 const badPods = []
168 // If there are requests for this type
169 if (requestTypeToMake.requests.length !== 0) {
170 makeRequest(type, requestTypeToMake.requests, function (err) {
171 if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })
172 150
173 // We made the requests, so we can remove them from the scheduler 151 async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
174 Requests.removeRequests(requestTypeToMake.ids) 152 const requestToMake = requestsToMake[toPodId]
153
154 // FIXME: mongodb request inside a loop :/
155 Pods.findById(toPodId, function (err, toPod) {
156 if (err) return logger.error('Error finding pod by id.', { err: err })
157
158 // Maybe the pod is not our friend anymore so simply remove them
159 if (!toPod) {
160 Requests.removePodOf(requestToMake.ids, toPodId)
161 return callbackEach()
162 }
163
164 makeRequest(toPod, requestToMake.datas, function (success) {
165 if (err) {
166 logger.error('Errors when sent request to %s.', toPod.url, { error: err })
167 // Do not stop the process just for one error
168 return callbackEach()
169 }
170
171 if (success === true) {
172 logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
173
174 // Remove the pod id of these request ids
175 Requests.removePodOf(requestToMake.ids, toPodId)
176 goodPods.push(toPodId)
177 } else {
178 badPods.push(toPodId)
179 }
180
181 callbackEach()
175 }) 182 })
176 } 183 })
177 } 184 }, function () {
185 // All the requests were made, we update the pods score
186 updatePodsScore(goodPods, badPods)
187 // Flush requests with no pod
188 Requests.removeWithEmptyTo()
189 })
178 }) 190 })
179} 191}
180 192
diff --git a/server/lib/videos.js b/server/lib/videos.js
index e0db0e1d5..a74c77dc4 100644
--- a/server/lib/videos.js
+++ b/server/lib/videos.js
@@ -17,6 +17,7 @@ const uploadDir = pathUtils.join(__dirname, '..', '..', config.get('storage.uplo
17const thumbnailsDir = pathUtils.join(__dirname, '..', '..', config.get('storage.thumbnails')) 17const thumbnailsDir = pathUtils.join(__dirname, '..', '..', config.get('storage.thumbnails'))
18 18
19const videos = { 19const videos = {
20 convertVideoToRemote: convertVideoToRemote,
20 createRemoteVideos: createRemoteVideos, 21 createRemoteVideos: createRemoteVideos,
21 getVideoDuration: getVideoDuration, 22 getVideoDuration: getVideoDuration,
22 getVideoState: getVideoState, 23 getVideoState: getVideoState,
@@ -27,6 +28,29 @@ const videos = {
27 seedAllExisting: seedAllExisting 28 seedAllExisting: seedAllExisting
28} 29}
29 30
31function convertVideoToRemote (video, callback) {
32 fs.readFile(thumbnailsDir + video.thumbnail, function (err, thumbnailData) {
33 if (err) {
34 logger.error('Cannot read the thumbnail of the video')
35 return callback(err)
36 }
37
38 const remoteVideo = {
39 name: video.name,
40 description: video.description,
41 magnetUri: video.magnetUri,
42 author: video.author,
43 duration: video.duration,
44 thumbnailBase64: new Buffer(thumbnailData).toString('base64'),
45 tags: video.tags,
46 createdDate: video.createdDate,
47 podUrl: video.podUrl
48 }
49
50 return callback(null, remoteVideo)
51 })
52}
53
30function createRemoteVideos (videos, callback) { 54function createRemoteVideos (videos, callback) {
31 // Create the remote videos from the new pod 55 // Create the remote videos from the new pod
32 createRemoteVideoObjects(videos, function (err, remoteVideos) { 56 createRemoteVideoObjects(videos, function (err, remoteVideos) {
@@ -154,7 +178,8 @@ function createRemoteVideoObjects (videos, callback) {
154 podUrl: video.podUrl, 178 podUrl: video.podUrl,
155 duration: video.duration, 179 duration: video.duration,
156 thumbnail: thumbnailName, 180 thumbnail: thumbnailName,
157 tags: video.tags 181 tags: video.tags,
182 author: video.author
158 } 183 }
159 remoteVideos.push(params) 184 remoteVideos.push(params)
160 185