aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server.js18
-rw-r--r--server/initializers/database.js2
-rw-r--r--server/lib/friends.js42
-rw-r--r--server/models/pods.js7
-rw-r--r--server/models/request.js (renamed from server/lib/requestsScheduler.js)116
-rw-r--r--server/models/requests.js73
-rw-r--r--server/tests/api/multiplePods.js2
7 files changed, 110 insertions, 150 deletions
diff --git a/server.js b/server.js
index 63aeb7145..33e34019d 100644
--- a/server.js
+++ b/server.js
@@ -21,24 +21,26 @@ if (miss.length !== 0) {
21 throw new Error('Miss some configurations keys : ' + miss) 21 throw new Error('Miss some configurations keys : ' + miss)
22} 22}
23 23
24// ----------- PeerTube modules ----------- 24// ----------- Database -----------
25const config = require('config') 25const config = require('config')
26const constants = require('./server/initializers/constants') 26const constants = require('./server/initializers/constants')
27const customValidators = require('./server/helpers/customValidators')
28const database = require('./server/initializers/database') 27const database = require('./server/initializers/database')
29const installer = require('./server/initializers/installer')
30const logger = require('./server/helpers/logger') 28const logger = require('./server/helpers/logger')
31const poolRequests = require('./server/lib/requestsScheduler') 29
30database.connect()
31
32// ----------- PeerTube modules -----------
33const customValidators = require('./server/helpers/customValidators')
34const installer = require('./server/initializers/installer')
35const mongoose = require('mongoose')
32const routes = require('./server/controllers') 36const routes = require('./server/controllers')
33const utils = require('./server/helpers/utils') 37const utils = require('./server/helpers/utils')
34const webtorrent = require('./server/lib/webtorrent') 38const webtorrent = require('./server/lib/webtorrent')
39const Request = mongoose.model('Request')
35 40
36// Get configurations 41// Get configurations
37const port = config.get('listen.port') 42const port = config.get('listen.port')
38 43
39// ----------- Database -----------
40database.connect()
41
42// ----------- Command line ----------- 44// ----------- Command line -----------
43 45
44// ----------- App ----------- 46// ----------- App -----------
@@ -135,7 +137,7 @@ installer.installApplication(function (err) {
135 // ----------- Make the server listening ----------- 137 // ----------- Make the server listening -----------
136 server.listen(port, function () { 138 server.listen(port, function () {
137 // Activate the pool requests 139 // Activate the pool requests
138 poolRequests.activate() 140 Request.activate()
139 141
140 // videos.seedAllExisting(function () { 142 // videos.seedAllExisting(function () {
141 logger.info('Seeded all the videos') 143 logger.info('Seeded all the videos')
diff --git a/server/initializers/database.js b/server/initializers/database.js
index 5932a978b..d0d969663 100644
--- a/server/initializers/database.js
+++ b/server/initializers/database.js
@@ -7,6 +7,8 @@ const logger = require('../helpers/logger')
7 7
8// Bootstrap models 8// Bootstrap models
9require('../models/video') 9require('../models/video')
10// Request model needs Video model
11require('../models/request')
10 12
11const dbname = 'peertube' + config.get('database.suffix') 13const dbname = 'peertube' + config.get('database.suffix')
12const host = config.get('database.host') 14const host = config.get('database.host')
diff --git a/server/lib/friends.js b/server/lib/friends.js
index 91cd69f86..617cc1ab4 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -10,12 +10,12 @@ const constants = require('../initializers/constants')
10const logger = require('../helpers/logger') 10const logger = require('../helpers/logger')
11const peertubeCrypto = require('../helpers/peertubeCrypto') 11const peertubeCrypto = require('../helpers/peertubeCrypto')
12const Pods = require('../models/pods') 12const Pods = require('../models/pods')
13const requestsScheduler = require('../lib/requestsScheduler')
14const requests = require('../helpers/requests') 13const requests = require('../helpers/requests')
15 14
16const http = config.get('webserver.https') ? 'https' : 'http' 15const http = config.get('webserver.https') ? 'https' : 'http'
17const host = config.get('webserver.host') 16const host = config.get('webserver.host')
18const port = config.get('webserver.port') 17const port = config.get('webserver.port')
18const Request = mongoose.model('Request')
19const Video = mongoose.model('Video') 19const Video = mongoose.model('Video')
20 20
21const pods = { 21const pods = {
@@ -29,10 +29,7 @@ const pods = {
29} 29}
30 30
31function addVideoToFriends (video) { 31function addVideoToFriends (video) {
32 // ensure namePath is null 32 createRequest('add', video)
33 video.namePath = null
34
35 requestsScheduler.addRequest('add', video)
36} 33}
37 34
38function hasFriends (callback) { 35function hasFriends (callback) {
@@ -76,9 +73,9 @@ function makeFriends (callback) {
76 73
77function quitFriends (callback) { 74function quitFriends (callback) {
78 // Stop pool requests 75 // Stop pool requests
79 requestsScheduler.deactivate() 76 Request.deactivate()
80 // Flush pool requests 77 // Flush pool requests
81 requestsScheduler.flush() 78 Request.flush()
82 79
83 async.waterfall([ 80 async.waterfall([
84 function getPodsList (callbackAsync) { 81 function getPodsList (callbackAsync) {
@@ -127,7 +124,7 @@ function quitFriends (callback) {
127 } 124 }
128 ], function (err) { 125 ], function (err) {
129 // Don't forget to re activate the scheduler, even if there was an error 126 // Don't forget to re activate the scheduler, even if there was an error
130 requestsScheduler.activate() 127 Request.activate()
131 128
132 if (err) return callback(err) 129 if (err) return callback(err)
133 130
@@ -136,8 +133,8 @@ function quitFriends (callback) {
136 }) 133 })
137} 134}
138 135
139function removeVideoToFriends (video) { 136function removeVideoToFriends (videoParams) {
140 requestsScheduler.addRequest('remove', video) 137 createRequest('remove', videoParams)
141} 138}
142 139
143function sendOwnedVideosToPod (podId) { 140function sendOwnedVideosToPod (podId) {
@@ -155,7 +152,7 @@ function sendOwnedVideosToPod (podId) {
155 return 152 return
156 } 153 }
157 154
158 requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) 155 createRequest('add', remoteVideo, [ podId ])
159 }) 156 })
160 }) 157 })
161 }) 158 })
@@ -211,9 +208,9 @@ function getForeignPodsList (url, callback) {
211 208
212function makeRequestsToWinningPods (cert, podsList, callback) { 209function makeRequestsToWinningPods (cert, podsList, callback) {
213 // Stop pool requests 210 // Stop pool requests
214 requestsScheduler.deactivate() 211 Request.deactivate()
215 // Flush pool requests 212 // Flush pool requests
216 requestsScheduler.forceSend() 213 Request.forceSend()
217 214
218 async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 215 async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
219 const params = { 216 const params = {
@@ -249,9 +246,26 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
249 }, function endRequests () { 246 }, function endRequests () {
250 // Final callback, we've ended all the requests 247 // Final callback, we've ended all the requests
251 // Now we made new friends, we can re activate the pool of requests 248 // Now we made new friends, we can re activate the pool of requests
252 requestsScheduler.activate() 249 Request.activate()
253 250
254 logger.debug('makeRequestsToWinningPods finished.') 251 logger.debug('makeRequestsToWinningPods finished.')
255 return callback() 252 return callback()
256 }) 253 })
257} 254}
255
256function createRequest (type, data, to) {
257 const req = new Request({
258 request: {
259 type: type,
260 data: data
261 }
262 })
263
264 if (to) {
265 req.to = to
266 }
267
268 req.save(function (err) {
269 if (err) logger.error('Cannot save the request.', { error: err })
270 })
271}
diff --git a/server/models/pods.js b/server/models/pods.js
index daeadeb07..9502d92e4 100644
--- a/server/models/pods.js
+++ b/server/models/pods.js
@@ -1,6 +1,7 @@
1'use strict' 1'use strict'
2 2
3const mongoose = require('mongoose') 3const mongoose = require('mongoose')
4const map = require('lodash/map')
4 5
5const constants = require('../initializers/constants') 6const constants = require('../initializers/constants')
6const logger = require('../helpers/logger') 7const logger = require('../helpers/logger')
@@ -76,7 +77,11 @@ function list (callback) {
76} 77}
77 78
78function listAllIds (callback) { 79function listAllIds (callback) {
79 return PodsDB.find({}, { _id: 1 }, callback) 80 return PodsDB.find({}, { _id: 1 }, function (err, pods) {
81 if (err) return callback(err)
82
83 return callback(null, map(pods, '_id'))
84 })
80} 85}
81 86
82function listAllUrls (callback) { 87function listAllUrls (callback) {
diff --git a/server/lib/requestsScheduler.js b/server/models/request.js
index b192d8299..2a407388a 100644
--- a/server/lib/requestsScheduler.js
+++ b/server/models/request.js
@@ -7,60 +7,51 @@ const mongoose = require('mongoose')
7const constants = require('../initializers/constants') 7const constants = require('../initializers/constants')
8const logger = require('../helpers/logger') 8const logger = require('../helpers/logger')
9const Pods = require('../models/pods') 9const Pods = require('../models/pods')
10const Requests = require('../models/requests')
11const requests = require('../helpers/requests') 10const requests = require('../helpers/requests')
12 11
13const Video = mongoose.model('Video') 12const Video = mongoose.model('Video')
14 13
15let timer = null 14let timer = null
16 15
17const requestsScheduler = { 16// ---------------------------------------------------------------------------
18 activate: activate,
19 addRequest: addRequest,
20 addRequestTo: addRequestTo,
21 deactivate: deactivate,
22 flush: flush,
23 forceSend: forceSend
24}
25 17
26function activate () { 18const RequestSchema = mongoose.Schema({
27 logger.info('Requests scheduler activated.') 19 request: mongoose.Schema.Types.Mixed,
28 timer = setInterval(makeRequests, constants.INTERVAL) 20 to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ]
29} 21})
30 22
31// Add request to the scheduler 23RequestSchema.statics = {
32function addRequest (type, data) { 24 activate,
33 logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) 25 deactivate,
26 flush,
27 forceSend
28}
34 29
35 const request = { 30RequestSchema.pre('save', function (next) {
36 type: type, 31 const self = this
37 data: data
38 }
39 32
40 Pods.listAllIds(function (err, podIds) { 33 if (self.to.length === 0) {
41 if (err) { 34 Pods.listAllIds(function (err, podIds) {
42 logger.debug('Cannot list pod ids.') 35 if (err) return next(err)
43 return
44 }
45 36
46 // No friends 37 // No friends
47 if (!podIds) return 38 if (podIds.length === 0) return
48 39
49 Requests.create(request, podIds, function (err) { 40 self.to = podIds
50 if (err) logger.error('Cannot create a request.', { error: err }) 41 return next()
51 }) 42 })
52 }) 43 } else {
53} 44 return next()
54
55function addRequestTo (podIds, type, data) {
56 const request = {
57 type: type,
58 data: data
59 } 45 }
46})
60 47
61 Requests.create(request, podIds, function (err) { 48mongoose.model('Request', RequestSchema)
62 if (err) logger.error('Cannot create a request.', { error: err }) 49
63 }) 50// ------------------------------ STATICS ------------------------------
51
52function activate () {
53 logger.info('Requests scheduler activated.')
54 timer = setInterval(makeRequests.bind(this), constants.INTERVAL)
64} 55}
65 56
66function deactivate () { 57function deactivate () {
@@ -69,24 +60,18 @@ function deactivate () {
69} 60}
70 61
71function flush () { 62function flush () {
72 Requests.removeAll(function (err) { 63 removeAll.call(this, function (err) {
73 if (err) { 64 if (err) logger.error('Cannot flush the requests.', { error: err })
74 logger.error('Cannot flush the requests.', { error: err })
75 }
76 }) 65 })
77} 66}
78 67
79function forceSend () { 68function forceSend () {
80 logger.info('Force requests scheduler sending.') 69 logger.info('Force requests scheduler sending.')
81 makeRequests() 70 makeRequests.call(this)
82} 71}
83 72
84// --------------------------------------------------------------------------- 73// ---------------------------------------------------------------------------
85 74
86module.exports = requestsScheduler
87
88// ---------------------------------------------------------------------------
89
90// Make a requests to friends of a certain type 75// Make a requests to friends of a certain type
91function makeRequest (toPod, requestsToMake, callback) { 76function makeRequest (toPod, requestsToMake, callback) {
92 if (!callback) callback = function () {} 77 if (!callback) callback = function () {}
@@ -115,7 +100,9 @@ function makeRequest (toPod, requestsToMake, callback) {
115 100
116// Make all the requests of the scheduler 101// Make all the requests of the scheduler
117function makeRequests () { 102function makeRequests () {
118 Requests.list(function (err, requests) { 103 const self = this
104
105 list.call(self, function (err, requests) {
119 if (err) { 106 if (err) {
120 logger.error('Cannot get the list of requests.', { err: err }) 107 logger.error('Cannot get the list of requests.', { err: err })
121 return // Abort 108 return // Abort
@@ -154,11 +141,14 @@ function makeRequests () {
154 141
155 // FIXME: mongodb request inside a loop :/ 142 // FIXME: mongodb request inside a loop :/
156 Pods.findById(toPodId, function (err, toPod) { 143 Pods.findById(toPodId, function (err, toPod) {
157 if (err) return logger.error('Error finding pod by id.', { err: err }) 144 if (err) {
145 logger.error('Error finding pod by id.', { err: err })
146 return callbackEach()
147 }
158 148
159 // Maybe the pod is not our friend anymore so simply remove them 149 // Maybe the pod is not our friend anymore so simply remove them
160 if (!toPod) { 150 if (!toPod) {
161 Requests.removePodOf(requestToMake.ids, toPodId) 151 removePodOf.call(self, requestToMake.ids, toPodId)
162 return callbackEach() 152 return callbackEach()
163 } 153 }
164 154
@@ -173,7 +163,7 @@ function makeRequests () {
173 logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) 163 logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
174 164
175 // Remove the pod id of these request ids 165 // Remove the pod id of these request ids
176 Requests.removePodOf(requestToMake.ids, toPodId) 166 removePodOf.call(self, requestToMake.ids, toPodId)
177 goodPods.push(toPodId) 167 goodPods.push(toPodId)
178 } else { 168 } else {
179 badPods.push(toPodId) 169 badPods.push(toPodId)
@@ -186,7 +176,7 @@ function makeRequests () {
186 // All the requests were made, we update the pods score 176 // All the requests were made, we update the pods score
187 updatePodsScore(goodPods, badPods) 177 updatePodsScore(goodPods, badPods)
188 // Flush requests with no pod 178 // Flush requests with no pod
189 Requests.removeWithEmptyTo() 179 removeWithEmptyTo.call(self)
190 }) 180 })
191 }) 181 })
192} 182}
@@ -268,3 +258,23 @@ function updatePodsScore (goodPods, badPods) {
268 removeBadPods() 258 removeBadPods()
269 }) 259 })
270} 260}
261
262function list (callback) {
263 this.find({ }, { _id: 1, request: 1, to: 1 }, callback)
264}
265
266function removeAll (callback) {
267 this.remove({ }, callback)
268}
269
270function removePodOf (requestsIds, podId, callback) {
271 if (!callback) callback = function () {}
272
273 this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
274}
275
276function removeWithEmptyTo (callback) {
277 if (!callback) callback = function () {}
278
279 this.remove({ to: { $size: 0 } }, callback)
280}
diff --git a/server/models/requests.js b/server/models/requests.js
deleted file mode 100644
index e67ccad56..000000000
--- a/server/models/requests.js
+++ /dev/null
@@ -1,73 +0,0 @@
1'use strict'
2
3const mongoose = require('mongoose')
4
5const logger = require('../helpers/logger')
6
7// ---------------------------------------------------------------------------
8
9const requestsSchema = mongoose.Schema({
10 request: mongoose.Schema.Types.Mixed,
11 to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ]
12})
13const RequestsDB = mongoose.model('requests', requestsSchema)
14
15// ---------------------------------------------------------------------------
16
17const Requests = {
18 create: create,
19 findById: findById,
20 list: list,
21 removeAll: removeAll,
22 removePodOf: removePodOf,
23 removeRequestById: removeRequestById,
24 removeRequests: removeRequests,
25 removeWithEmptyTo: removeWithEmptyTo
26}
27
28function create (request, to, callback) {
29 RequestsDB.create({ request: request, to: to }, callback)
30}
31
32function findById (id, callback) {
33 RequestsDB.findOne({ id: id }, callback)
34}
35
36function list (callback) {
37 RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback)
38}
39
40function removeAll (callback) {
41 RequestsDB.remove({ }, callback)
42}
43
44function removePodOf (requestsIds, podId, callback) {
45 if (!callback) callback = function () {}
46
47 RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
48}
49
50function removeRequestById (id, callback) {
51 RequestsDB.remove({ id: id }, callback)
52}
53
54function removeRequests (ids) {
55 RequestsDB.remove({ _id: { $in: ids } }, function (err) {
56 if (err) {
57 logger.error('Cannot remove requests from the requests database.', { error: err })
58 return // Abort
59 }
60
61 logger.info('Pool requests flushed.')
62 })
63}
64
65function removeWithEmptyTo (callback) {
66 if (!callback) callback = function () {}
67
68 RequestsDB.remove({ to: { $size: 0 } }, callback)
69}
70
71// ---------------------------------------------------------------------------
72
73module.exports = Requests
diff --git a/server/tests/api/multiplePods.js b/server/tests/api/multiplePods.js
index 52dfda137..2a1bc64e6 100644
--- a/server/tests/api/multiplePods.js
+++ b/server/tests/api/multiplePods.js
@@ -414,7 +414,7 @@ describe('Test multiple pods', function () {
414 414
415 // Keep the logs if the test failed 415 // Keep the logs if the test failed
416 if (this.ok) { 416 if (this.ok) {
417 // utils.flushTests(done) 417 utils.flushTests(done)
418 } else { 418 } else {
419 done() 419 done()
420 } 420 }