aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-06-28 20:10:32 +0200
committerChocobozzz <florian.bigard@gmail.com>2016-06-28 20:10:32 +0200
commit00057e85a703713a8f0d96e01c49978be0987eb2 (patch)
tree54735435062d2ededa72448de9fcaa677cf2c79c /server/lib
parentaaf61f3810e6d57c5130af959bd2860df32775e7 (diff)
downloadPeerTube-00057e85a703713a8f0d96e01c49978be0987eb2.tar.gz
PeerTube-00057e85a703713a8f0d96e01c49978be0987eb2.tar.zst
PeerTube-00057e85a703713a8f0d96e01c49978be0987eb2.zip
Request model refractoring -> use mongoose api
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/friends.js42
-rw-r--r--server/lib/requestsScheduler.js270
2 files changed, 28 insertions, 284 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js
index 91cd69f86..617cc1ab4 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -10,12 +10,12 @@ const constants = require('../initializers/constants')
10const logger = require('../helpers/logger') 10const logger = require('../helpers/logger')
11const peertubeCrypto = require('../helpers/peertubeCrypto') 11const peertubeCrypto = require('../helpers/peertubeCrypto')
12const Pods = require('../models/pods') 12const Pods = require('../models/pods')
13const requestsScheduler = require('../lib/requestsScheduler')
14const requests = require('../helpers/requests') 13const requests = require('../helpers/requests')
15 14
16const http = config.get('webserver.https') ? 'https' : 'http' 15const http = config.get('webserver.https') ? 'https' : 'http'
17const host = config.get('webserver.host') 16const host = config.get('webserver.host')
18const port = config.get('webserver.port') 17const port = config.get('webserver.port')
18const Request = mongoose.model('Request')
19const Video = mongoose.model('Video') 19const Video = mongoose.model('Video')
20 20
21const pods = { 21const pods = {
@@ -29,10 +29,7 @@ const pods = {
29} 29}
30 30
31function addVideoToFriends (video) { 31function addVideoToFriends (video) {
32 // ensure namePath is null 32 createRequest('add', video)
33 video.namePath = null
34
35 requestsScheduler.addRequest('add', video)
36} 33}
37 34
38function hasFriends (callback) { 35function hasFriends (callback) {
@@ -76,9 +73,9 @@ function makeFriends (callback) {
76 73
77function quitFriends (callback) { 74function quitFriends (callback) {
78 // Stop pool requests 75 // Stop pool requests
79 requestsScheduler.deactivate() 76 Request.deactivate()
80 // Flush pool requests 77 // Flush pool requests
81 requestsScheduler.flush() 78 Request.flush()
82 79
83 async.waterfall([ 80 async.waterfall([
84 function getPodsList (callbackAsync) { 81 function getPodsList (callbackAsync) {
@@ -127,7 +124,7 @@ function quitFriends (callback) {
127 } 124 }
128 ], function (err) { 125 ], function (err) {
129 // Don't forget to re activate the scheduler, even if there was an error 126 // Don't forget to re activate the scheduler, even if there was an error
130 requestsScheduler.activate() 127 Request.activate()
131 128
132 if (err) return callback(err) 129 if (err) return callback(err)
133 130
@@ -136,8 +133,8 @@ function quitFriends (callback) {
136 }) 133 })
137} 134}
138 135
139function removeVideoToFriends (video) { 136function removeVideoToFriends (videoParams) {
140 requestsScheduler.addRequest('remove', video) 137 createRequest('remove', videoParams)
141} 138}
142 139
143function sendOwnedVideosToPod (podId) { 140function sendOwnedVideosToPod (podId) {
@@ -155,7 +152,7 @@ function sendOwnedVideosToPod (podId) {
155 return 152 return
156 } 153 }
157 154
158 requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) 155 createRequest('add', remoteVideo, [ podId ])
159 }) 156 })
160 }) 157 })
161 }) 158 })
@@ -211,9 +208,9 @@ function getForeignPodsList (url, callback) {
211 208
212function makeRequestsToWinningPods (cert, podsList, callback) { 209function makeRequestsToWinningPods (cert, podsList, callback) {
213 // Stop pool requests 210 // Stop pool requests
214 requestsScheduler.deactivate() 211 Request.deactivate()
215 // Flush pool requests 212 // Flush pool requests
216 requestsScheduler.forceSend() 213 Request.forceSend()
217 214
218 async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 215 async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
219 const params = { 216 const params = {
@@ -249,9 +246,26 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
249 }, function endRequests () { 246 }, function endRequests () {
250 // Final callback, we've ended all the requests 247 // Final callback, we've ended all the requests
251 // Now we made new friends, we can re activate the pool of requests 248 // Now we made new friends, we can re activate the pool of requests
252 requestsScheduler.activate() 249 Request.activate()
253 250
254 logger.debug('makeRequestsToWinningPods finished.') 251 logger.debug('makeRequestsToWinningPods finished.')
255 return callback() 252 return callback()
256 }) 253 })
257} 254}
255
256function createRequest (type, data, to) {
257 const req = new Request({
258 request: {
259 type: type,
260 data: data
261 }
262 })
263
264 if (to) {
265 req.to = to
266 }
267
268 req.save(function (err) {
269 if (err) logger.error('Cannot save the request.', { error: err })
270 })
271}
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js
deleted file mode 100644
index b192d8299..000000000
--- a/server/lib/requestsScheduler.js
+++ /dev/null
@@ -1,270 +0,0 @@
1'use strict'
2
3const async = require('async')
4const map = require('lodash/map')
5const mongoose = require('mongoose')
6
7const constants = require('../initializers/constants')
8const logger = require('../helpers/logger')
9const Pods = require('../models/pods')
10const Requests = require('../models/requests')
11const requests = require('../helpers/requests')
12
13const Video = mongoose.model('Video')
14
15let timer = null
16
17const requestsScheduler = {
18 activate: activate,
19 addRequest: addRequest,
20 addRequestTo: addRequestTo,
21 deactivate: deactivate,
22 flush: flush,
23 forceSend: forceSend
24}
25
26function activate () {
27 logger.info('Requests scheduler activated.')
28 timer = setInterval(makeRequests, constants.INTERVAL)
29}
30
31// Add request to the scheduler
32function addRequest (type, data) {
33 logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
34
35 const request = {
36 type: type,
37 data: data
38 }
39
40 Pods.listAllIds(function (err, podIds) {
41 if (err) {
42 logger.debug('Cannot list pod ids.')
43 return
44 }
45
46 // No friends
47 if (!podIds) return
48
49 Requests.create(request, podIds, function (err) {
50 if (err) logger.error('Cannot create a request.', { error: err })
51 })
52 })
53}
54
55function addRequestTo (podIds, type, data) {
56 const request = {
57 type: type,
58 data: data
59 }
60
61 Requests.create(request, podIds, function (err) {
62 if (err) logger.error('Cannot create a request.', { error: err })
63 })
64}
65
66function deactivate () {
67 logger.info('Requests scheduler deactivated.')
68 clearInterval(timer)
69}
70
71function flush () {
72 Requests.removeAll(function (err) {
73 if (err) {
74 logger.error('Cannot flush the requests.', { error: err })
75 }
76 })
77}
78
79function forceSend () {
80 logger.info('Force requests scheduler sending.')
81 makeRequests()
82}
83
84// ---------------------------------------------------------------------------
85
86module.exports = requestsScheduler
87
88// ---------------------------------------------------------------------------
89
90// Make a requests to friends of a certain type
91function makeRequest (toPod, requestsToMake, callback) {
92 if (!callback) callback = function () {}
93
94 const params = {
95 toPod: toPod,
96 encrypt: true, // Security
97 sign: true, // To prove our identity
98 method: 'POST',
99 path: '/api/' + constants.API_VERSION + '/remote/videos',
100 data: requestsToMake // Requests we need to make
101 }
102
103 // Make multiple retry requests to all of pods
104 // The function fire some useful callbacks
105 requests.makeSecureRequest(params, function (err, res) {
106 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
107 logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
108
109 return callback(false)
110 }
111
112 return callback(true)
113 })
114}
115
116// Make all the requests of the scheduler
117function makeRequests () {
118 Requests.list(function (err, requests) {
119 if (err) {
120 logger.error('Cannot get the list of requests.', { err: err })
121 return // Abort
122 }
123
124 // If there are no requests, abort
125 if (requests.length === 0) {
126 logger.info('No requests to make.')
127 return
128 }
129
130 logger.info('Making requests to friends.')
131
132 // Requests by pods id
133 const requestsToMake = {}
134
135 requests.forEach(function (poolRequest) {
136 poolRequest.to.forEach(function (toPodId) {
137 if (!requestsToMake[toPodId]) {
138 requestsToMake[toPodId] = {
139 ids: [],
140 datas: []
141 }
142 }
143
144 requestsToMake[toPodId].ids.push(poolRequest._id)
145 requestsToMake[toPodId].datas.push(poolRequest.request)
146 })
147 })
148
149 const goodPods = []
150 const badPods = []
151
152 async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
153 const requestToMake = requestsToMake[toPodId]
154
155 // FIXME: mongodb request inside a loop :/
156 Pods.findById(toPodId, function (err, toPod) {
157 if (err) return logger.error('Error finding pod by id.', { err: err })
158
159 // Maybe the pod is not our friend anymore so simply remove them
160 if (!toPod) {
161 Requests.removePodOf(requestToMake.ids, toPodId)
162 return callbackEach()
163 }
164
165 makeRequest(toPod, requestToMake.datas, function (success) {
166 if (err) {
167 logger.error('Errors when sent request to %s.', toPod.url, { error: err })
168 // Do not stop the process just for one error
169 return callbackEach()
170 }
171
172 if (success === true) {
173 logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
174
175 // Remove the pod id of these request ids
176 Requests.removePodOf(requestToMake.ids, toPodId)
177 goodPods.push(toPodId)
178 } else {
179 badPods.push(toPodId)
180 }
181
182 callbackEach()
183 })
184 })
185 }, function () {
186 // All the requests were made, we update the pods score
187 updatePodsScore(goodPods, badPods)
188 // Flush requests with no pod
189 Requests.removeWithEmptyTo()
190 })
191 })
192}
193
194// Remove pods with a score of 0 (too many requests where they were unreachable)
195function removeBadPods () {
196 async.waterfall([
197 function findBadPods (callback) {
198 Pods.findBadPods(function (err, pods) {
199 if (err) {
200 logger.error('Cannot find bad pods.', { error: err })
201 return callback(err)
202 }
203
204 return callback(null, pods)
205 })
206 },
207
208 function listVideosOfTheseBadPods (pods, callback) {
209 if (pods.length === 0) return callback(null)
210
211 const urls = map(pods, 'url')
212 const ids = map(pods, '_id')
213
214 Video.listByUrls(urls, function (err, videosList) {
215 if (err) {
216 logger.error('Cannot list videos urls.', { error: err, urls: urls })
217 return callback(null, ids, [])
218 }
219
220 return callback(null, ids, videosList)
221 })
222 },
223
224 function removeVideosOfTheseBadPods (podIds, videosList, callback) {
225 // We don't have to remove pods, skip
226 if (typeof podIds === 'function') return podIds(null)
227
228 async.each(videosList, function (video, callbackEach) {
229 video.remove(callbackEach)
230 }, function (err) {
231 if (err) {
232 // Don't stop the process
233 logger.error('Error while removing videos of bad pods.', { error: err })
234 return
235 }
236
237 return callback(null, podIds)
238 })
239 },
240
241 function removeBadPodsFromDB (podIds, callback) {
242 // We don't have to remove pods, skip
243 if (typeof podIds === 'function') return podIds(null)
244
245 Pods.removeAllByIds(podIds, callback)
246 }
247 ], function (err, removeResult) {
248 if (err) {
249 logger.error('Cannot remove bad pods.', { error: err })
250 } else if (removeResult) {
251 const podsRemoved = removeResult.result.n
252 logger.info('Removed %d pods.', podsRemoved)
253 } else {
254 logger.info('No need to remove bad pods.')
255 }
256 })
257}
258
259function updatePodsScore (goodPods, badPods) {
260 logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
261
262 Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
263 if (err) logger.error('Cannot increment scores of good pods.')
264 })
265
266 Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
267 if (err) logger.error('Cannot decrement scores of bad pods.')
268 removeBadPods()
269 })
270}