diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/friends.js | 42 | ||||
-rw-r--r-- | server/lib/requestsScheduler.js | 270 |
2 files changed, 28 insertions, 284 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js index 91cd69f86..617cc1ab4 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -10,12 +10,12 @@ const constants = require('../initializers/constants') | |||
10 | const logger = require('../helpers/logger') | 10 | const logger = require('../helpers/logger') |
11 | const peertubeCrypto = require('../helpers/peertubeCrypto') | 11 | const peertubeCrypto = require('../helpers/peertubeCrypto') |
12 | const Pods = require('../models/pods') | 12 | const Pods = require('../models/pods') |
13 | const requestsScheduler = require('../lib/requestsScheduler') | ||
14 | const requests = require('../helpers/requests') | 13 | const requests = require('../helpers/requests') |
15 | 14 | ||
16 | const http = config.get('webserver.https') ? 'https' : 'http' | 15 | const http = config.get('webserver.https') ? 'https' : 'http' |
17 | const host = config.get('webserver.host') | 16 | const host = config.get('webserver.host') |
18 | const port = config.get('webserver.port') | 17 | const port = config.get('webserver.port') |
18 | const Request = mongoose.model('Request') | ||
19 | const Video = mongoose.model('Video') | 19 | const Video = mongoose.model('Video') |
20 | 20 | ||
21 | const pods = { | 21 | const pods = { |
@@ -29,10 +29,7 @@ const pods = { | |||
29 | } | 29 | } |
30 | 30 | ||
31 | function addVideoToFriends (video) { | 31 | function addVideoToFriends (video) { |
32 | // ensure namePath is null | 32 | createRequest('add', video) |
33 | video.namePath = null | ||
34 | |||
35 | requestsScheduler.addRequest('add', video) | ||
36 | } | 33 | } |
37 | 34 | ||
38 | function hasFriends (callback) { | 35 | function hasFriends (callback) { |
@@ -76,9 +73,9 @@ function makeFriends (callback) { | |||
76 | 73 | ||
77 | function quitFriends (callback) { | 74 | function quitFriends (callback) { |
78 | // Stop pool requests | 75 | // Stop pool requests |
79 | requestsScheduler.deactivate() | 76 | Request.deactivate() |
80 | // Flush pool requests | 77 | // Flush pool requests |
81 | requestsScheduler.flush() | 78 | Request.flush() |
82 | 79 | ||
83 | async.waterfall([ | 80 | async.waterfall([ |
84 | function getPodsList (callbackAsync) { | 81 | function getPodsList (callbackAsync) { |
@@ -127,7 +124,7 @@ function quitFriends (callback) { | |||
127 | } | 124 | } |
128 | ], function (err) { | 125 | ], function (err) { |
129 | // Don't forget to re activate the scheduler, even if there was an error | 126 | // Don't forget to re activate the scheduler, even if there was an error |
130 | requestsScheduler.activate() | 127 | Request.activate() |
131 | 128 | ||
132 | if (err) return callback(err) | 129 | if (err) return callback(err) |
133 | 130 | ||
@@ -136,8 +133,8 @@ function quitFriends (callback) { | |||
136 | }) | 133 | }) |
137 | } | 134 | } |
138 | 135 | ||
139 | function removeVideoToFriends (video) { | 136 | function removeVideoToFriends (videoParams) { |
140 | requestsScheduler.addRequest('remove', video) | 137 | createRequest('remove', videoParams) |
141 | } | 138 | } |
142 | 139 | ||
143 | function sendOwnedVideosToPod (podId) { | 140 | function sendOwnedVideosToPod (podId) { |
@@ -155,7 +152,7 @@ function sendOwnedVideosToPod (podId) { | |||
155 | return | 152 | return |
156 | } | 153 | } |
157 | 154 | ||
158 | requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) | 155 | createRequest('add', remoteVideo, [ podId ]) |
159 | }) | 156 | }) |
160 | }) | 157 | }) |
161 | }) | 158 | }) |
@@ -211,9 +208,9 @@ function getForeignPodsList (url, callback) { | |||
211 | 208 | ||
212 | function makeRequestsToWinningPods (cert, podsList, callback) { | 209 | function makeRequestsToWinningPods (cert, podsList, callback) { |
213 | // Stop pool requests | 210 | // Stop pool requests |
214 | requestsScheduler.deactivate() | 211 | Request.deactivate() |
215 | // Flush pool requests | 212 | // Flush pool requests |
216 | requestsScheduler.forceSend() | 213 | Request.forceSend() |
217 | 214 | ||
218 | async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { | 215 | async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { |
219 | const params = { | 216 | const params = { |
@@ -249,9 +246,26 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
249 | }, function endRequests () { | 246 | }, function endRequests () { |
250 | // Final callback, we've ended all the requests | 247 | // Final callback, we've ended all the requests |
251 | // Now we made new friends, we can re activate the pool of requests | 248 | // Now we made new friends, we can re activate the pool of requests |
252 | requestsScheduler.activate() | 249 | Request.activate() |
253 | 250 | ||
254 | logger.debug('makeRequestsToWinningPods finished.') | 251 | logger.debug('makeRequestsToWinningPods finished.') |
255 | return callback() | 252 | return callback() |
256 | }) | 253 | }) |
257 | } | 254 | } |
255 | |||
256 | function createRequest (type, data, to) { | ||
257 | const req = new Request({ | ||
258 | request: { | ||
259 | type: type, | ||
260 | data: data | ||
261 | } | ||
262 | }) | ||
263 | |||
264 | if (to) { | ||
265 | req.to = to | ||
266 | } | ||
267 | |||
268 | req.save(function (err) { | ||
269 | if (err) logger.error('Cannot save the request.', { error: err }) | ||
270 | }) | ||
271 | } | ||
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js deleted file mode 100644 index b192d8299..000000000 --- a/server/lib/requestsScheduler.js +++ /dev/null | |||
@@ -1,270 +0,0 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const async = require('async') | ||
4 | const map = require('lodash/map') | ||
5 | const mongoose = require('mongoose') | ||
6 | |||
7 | const constants = require('../initializers/constants') | ||
8 | const logger = require('../helpers/logger') | ||
9 | const Pods = require('../models/pods') | ||
10 | const Requests = require('../models/requests') | ||
11 | const requests = require('../helpers/requests') | ||
12 | |||
13 | const Video = mongoose.model('Video') | ||
14 | |||
15 | let timer = null | ||
16 | |||
17 | const requestsScheduler = { | ||
18 | activate: activate, | ||
19 | addRequest: addRequest, | ||
20 | addRequestTo: addRequestTo, | ||
21 | deactivate: deactivate, | ||
22 | flush: flush, | ||
23 | forceSend: forceSend | ||
24 | } | ||
25 | |||
26 | function activate () { | ||
27 | logger.info('Requests scheduler activated.') | ||
28 | timer = setInterval(makeRequests, constants.INTERVAL) | ||
29 | } | ||
30 | |||
31 | // Add request to the scheduler | ||
32 | function addRequest (type, data) { | ||
33 | logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) | ||
34 | |||
35 | const request = { | ||
36 | type: type, | ||
37 | data: data | ||
38 | } | ||
39 | |||
40 | Pods.listAllIds(function (err, podIds) { | ||
41 | if (err) { | ||
42 | logger.debug('Cannot list pod ids.') | ||
43 | return | ||
44 | } | ||
45 | |||
46 | // No friends | ||
47 | if (!podIds) return | ||
48 | |||
49 | Requests.create(request, podIds, function (err) { | ||
50 | if (err) logger.error('Cannot create a request.', { error: err }) | ||
51 | }) | ||
52 | }) | ||
53 | } | ||
54 | |||
55 | function addRequestTo (podIds, type, data) { | ||
56 | const request = { | ||
57 | type: type, | ||
58 | data: data | ||
59 | } | ||
60 | |||
61 | Requests.create(request, podIds, function (err) { | ||
62 | if (err) logger.error('Cannot create a request.', { error: err }) | ||
63 | }) | ||
64 | } | ||
65 | |||
66 | function deactivate () { | ||
67 | logger.info('Requests scheduler deactivated.') | ||
68 | clearInterval(timer) | ||
69 | } | ||
70 | |||
71 | function flush () { | ||
72 | Requests.removeAll(function (err) { | ||
73 | if (err) { | ||
74 | logger.error('Cannot flush the requests.', { error: err }) | ||
75 | } | ||
76 | }) | ||
77 | } | ||
78 | |||
79 | function forceSend () { | ||
80 | logger.info('Force requests scheduler sending.') | ||
81 | makeRequests() | ||
82 | } | ||
83 | |||
84 | // --------------------------------------------------------------------------- | ||
85 | |||
86 | module.exports = requestsScheduler | ||
87 | |||
88 | // --------------------------------------------------------------------------- | ||
89 | |||
90 | // Make a requests to friends of a certain type | ||
91 | function makeRequest (toPod, requestsToMake, callback) { | ||
92 | if (!callback) callback = function () {} | ||
93 | |||
94 | const params = { | ||
95 | toPod: toPod, | ||
96 | encrypt: true, // Security | ||
97 | sign: true, // To prove our identity | ||
98 | method: 'POST', | ||
99 | path: '/api/' + constants.API_VERSION + '/remote/videos', | ||
100 | data: requestsToMake // Requests we need to make | ||
101 | } | ||
102 | |||
103 | // Make multiple retry requests to all of pods | ||
104 | // The function fire some useful callbacks | ||
105 | requests.makeSecureRequest(params, function (err, res) { | ||
106 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
107 | logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) | ||
108 | |||
109 | return callback(false) | ||
110 | } | ||
111 | |||
112 | return callback(true) | ||
113 | }) | ||
114 | } | ||
115 | |||
116 | // Make all the requests of the scheduler | ||
117 | function makeRequests () { | ||
118 | Requests.list(function (err, requests) { | ||
119 | if (err) { | ||
120 | logger.error('Cannot get the list of requests.', { err: err }) | ||
121 | return // Abort | ||
122 | } | ||
123 | |||
124 | // If there are no requests, abort | ||
125 | if (requests.length === 0) { | ||
126 | logger.info('No requests to make.') | ||
127 | return | ||
128 | } | ||
129 | |||
130 | logger.info('Making requests to friends.') | ||
131 | |||
132 | // Requests by pods id | ||
133 | const requestsToMake = {} | ||
134 | |||
135 | requests.forEach(function (poolRequest) { | ||
136 | poolRequest.to.forEach(function (toPodId) { | ||
137 | if (!requestsToMake[toPodId]) { | ||
138 | requestsToMake[toPodId] = { | ||
139 | ids: [], | ||
140 | datas: [] | ||
141 | } | ||
142 | } | ||
143 | |||
144 | requestsToMake[toPodId].ids.push(poolRequest._id) | ||
145 | requestsToMake[toPodId].datas.push(poolRequest.request) | ||
146 | }) | ||
147 | }) | ||
148 | |||
149 | const goodPods = [] | ||
150 | const badPods = [] | ||
151 | |||
152 | async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { | ||
153 | const requestToMake = requestsToMake[toPodId] | ||
154 | |||
155 | // FIXME: mongodb request inside a loop :/ | ||
156 | Pods.findById(toPodId, function (err, toPod) { | ||
157 | if (err) return logger.error('Error finding pod by id.', { err: err }) | ||
158 | |||
159 | // Maybe the pod is not our friend anymore so simply remove them | ||
160 | if (!toPod) { | ||
161 | Requests.removePodOf(requestToMake.ids, toPodId) | ||
162 | return callbackEach() | ||
163 | } | ||
164 | |||
165 | makeRequest(toPod, requestToMake.datas, function (success) { | ||
166 | if (err) { | ||
167 | logger.error('Errors when sent request to %s.', toPod.url, { error: err }) | ||
168 | // Do not stop the process just for one error | ||
169 | return callbackEach() | ||
170 | } | ||
171 | |||
172 | if (success === true) { | ||
173 | logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) | ||
174 | |||
175 | // Remove the pod id of these request ids | ||
176 | Requests.removePodOf(requestToMake.ids, toPodId) | ||
177 | goodPods.push(toPodId) | ||
178 | } else { | ||
179 | badPods.push(toPodId) | ||
180 | } | ||
181 | |||
182 | callbackEach() | ||
183 | }) | ||
184 | }) | ||
185 | }, function () { | ||
186 | // All the requests were made, we update the pods score | ||
187 | updatePodsScore(goodPods, badPods) | ||
188 | // Flush requests with no pod | ||
189 | Requests.removeWithEmptyTo() | ||
190 | }) | ||
191 | }) | ||
192 | } | ||
193 | |||
194 | // Remove pods with a score of 0 (too many requests where they were unreachable) | ||
195 | function removeBadPods () { | ||
196 | async.waterfall([ | ||
197 | function findBadPods (callback) { | ||
198 | Pods.findBadPods(function (err, pods) { | ||
199 | if (err) { | ||
200 | logger.error('Cannot find bad pods.', { error: err }) | ||
201 | return callback(err) | ||
202 | } | ||
203 | |||
204 | return callback(null, pods) | ||
205 | }) | ||
206 | }, | ||
207 | |||
208 | function listVideosOfTheseBadPods (pods, callback) { | ||
209 | if (pods.length === 0) return callback(null) | ||
210 | |||
211 | const urls = map(pods, 'url') | ||
212 | const ids = map(pods, '_id') | ||
213 | |||
214 | Video.listByUrls(urls, function (err, videosList) { | ||
215 | if (err) { | ||
216 | logger.error('Cannot list videos urls.', { error: err, urls: urls }) | ||
217 | return callback(null, ids, []) | ||
218 | } | ||
219 | |||
220 | return callback(null, ids, videosList) | ||
221 | }) | ||
222 | }, | ||
223 | |||
224 | function removeVideosOfTheseBadPods (podIds, videosList, callback) { | ||
225 | // We don't have to remove pods, skip | ||
226 | if (typeof podIds === 'function') return podIds(null) | ||
227 | |||
228 | async.each(videosList, function (video, callbackEach) { | ||
229 | video.remove(callbackEach) | ||
230 | }, function (err) { | ||
231 | if (err) { | ||
232 | // Don't stop the process | ||
233 | logger.error('Error while removing videos of bad pods.', { error: err }) | ||
234 | return | ||
235 | } | ||
236 | |||
237 | return callback(null, podIds) | ||
238 | }) | ||
239 | }, | ||
240 | |||
241 | function removeBadPodsFromDB (podIds, callback) { | ||
242 | // We don't have to remove pods, skip | ||
243 | if (typeof podIds === 'function') return podIds(null) | ||
244 | |||
245 | Pods.removeAllByIds(podIds, callback) | ||
246 | } | ||
247 | ], function (err, removeResult) { | ||
248 | if (err) { | ||
249 | logger.error('Cannot remove bad pods.', { error: err }) | ||
250 | } else if (removeResult) { | ||
251 | const podsRemoved = removeResult.result.n | ||
252 | logger.info('Removed %d pods.', podsRemoved) | ||
253 | } else { | ||
254 | logger.info('No need to remove bad pods.') | ||
255 | } | ||
256 | }) | ||
257 | } | ||
258 | |||
259 | function updatePodsScore (goodPods, badPods) { | ||
260 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) | ||
261 | |||
262 | Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { | ||
263 | if (err) logger.error('Cannot increment scores of good pods.') | ||
264 | }) | ||
265 | |||
266 | Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | ||
267 | if (err) logger.error('Cannot decrement scores of bad pods.') | ||
268 | removeBadPods() | ||
269 | }) | ||
270 | } | ||