aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2015-12-04 16:13:32 +0100
committerChocobozzz <florian.bigard@gmail.com>2015-12-04 16:13:32 +0100
commit0b69752270f1ceea06a29872b3db23660a55d6d3 (patch)
tree42da726633f3e48f4fe592cfd2c1ca14346a159b /src
parentaf82cae07dc568e3cb10acd70113df56eb8b15a9 (diff)
downloadPeerTube-0b69752270f1ceea06a29872b3db23660a55d6d3.tar.gz
PeerTube-0b69752270f1ceea06a29872b3db23660a55d6d3.tar.zst
PeerTube-0b69752270f1ceea06a29872b3db23660a55d6d3.zip
Add a pool of requests instead of making a request at each action (add
video/remove video) for performance in big networks
Diffstat (limited to 'src')
-rw-r--r--src/customValidators.js29
-rw-r--r--src/database.js12
-rw-r--r--src/pods.js89
-rw-r--r--src/poolRequests.js158
-rw-r--r--src/utils.js6
-rw-r--r--src/videos.js100
6 files changed, 264 insertions, 130 deletions
diff --git a/src/customValidators.js b/src/customValidators.js
new file mode 100644
index 000000000..73c2f8461
--- /dev/null
+++ b/src/customValidators.js
@@ -0,0 +1,29 @@
1;(function () {
2 'use strict'
3
4 var validator = require('validator')
5
6 var customValidators = {}
7
8 customValidators.eachIsRemoteVideosAddValid = function (values) {
9 return values.every(function (val) {
10 return validator.isLength(val.name, 1, 50) &&
11 validator.isLength(val.description, 1, 50) &&
12 validator.isLength(val.magnetUri, 10) &&
13 validator.isURL(val.podUrl)
14 })
15 }
16
17 customValidators.eachIsRemoteVideosRemoveValid = function (values) {
18 return values.every(function (val) {
19 return validator.isLength(val.magnetUri, 10)
20 })
21 }
22
23 customValidators.isArray = function (value) {
24 return Array.isArray(value)
25 }
26
27 // ----------- Export -----------
28 module.exports = customValidators
29})()
diff --git a/src/database.js b/src/database.js
index 740e89fa4..514a622dc 100644
--- a/src/database.js
+++ b/src/database.js
@@ -30,6 +30,15 @@
30 30
31 var PodsDB = mongoose.model('pods', podsSchema) 31 var PodsDB = mongoose.model('pods', podsSchema)
32 32
33 // ----------- PoolRequests -----------
34 var poolRequestsSchema = mongoose.Schema({
35 type: String,
36 id: String, // Special id to find duplicates (video created we want to remove...)
37 request: mongoose.Schema.Types.Mixed
38 })
39
40 var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema)
41
33 // ----------- Connection ----------- 42 // ----------- Connection -----------
34 43
35 mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname) 44 mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname)
@@ -45,6 +54,7 @@
45 // ----------- Export ----------- 54 // ----------- Export -----------
46 module.exports = { 55 module.exports = {
47 VideosDB: VideosDB, 56 VideosDB: VideosDB,
48 PodsDB: PodsDB 57 PodsDB: PodsDB,
58 PoolRequestsDB: PoolRequestsDB
49 } 59 }
50})() 60})()
diff --git a/src/pods.js b/src/pods.js
index e26b3f0ae..9afc6cc96 100644
--- a/src/pods.js
+++ b/src/pods.js
@@ -8,6 +8,7 @@
8 8
9 var logger = require('./logger') 9 var logger = require('./logger')
10 var PodsDB = require('./database').PodsDB 10 var PodsDB = require('./database').PodsDB
11 var poolRequests = require('./poolRequests')
11 var utils = require('./utils') 12 var utils = require('./utils')
12 13
13 var pods = {} 14 var pods = {}
@@ -16,13 +17,6 @@
16 var host = config.get('webserver.host') 17 var host = config.get('webserver.host')
17 var port = config.get('webserver.port') 18 var port = config.get('webserver.port')
18 19
19 // ----------- Constants -----------
20
21 var PODS_SCORE = {
22 MALUS: -10,
23 BONUS: 10
24 }
25
26 // ----------- Private functions ----------- 20 // ----------- Private functions -----------
27 21
28 function getForeignPodsList (url, callback) { 22 function getForeignPodsList (url, callback) {
@@ -34,25 +28,6 @@
34 }) 28 })
35 } 29 }
36 30
37 function updatePodsScore (good_pods, bad_pods) {
38 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
39
40 PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec()
41 PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) {
42 if (err) throw err
43 removeBadPods()
44 })
45 }
46
47 function removeBadPods () {
48 PodsDB.remove({ score: 0 }, function (err, result) {
49 if (err) throw err
50
51 var number_removed = result.result.n
52 if (number_removed !== 0) logger.info('Removed %d pod.', number_removed)
53 })
54 }
55
56 // ----------- Public functions ----------- 31 // ----------- Public functions -----------
57 32
58 pods.list = function (callback) { 33 pods.list = function (callback) {
@@ -93,58 +68,16 @@
93 }) 68 })
94 } 69 }
95 70
96 // { path, data } 71 pods.addVideoToFriends = function (video) {
97 pods.makeSecureRequest = function (data, callback) { 72 // To avoid duplicates
98 if (callback === undefined) callback = function () {} 73 var id = video.name + video.magnetUri
99 74 poolRequests.addToPoolRequests(id, 'add', video)
100 PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { 75 }
101 if (err) {
102 logger.error('Cannot get the list of the pods.', { error: err })
103 return callback(err)
104 }
105
106 logger.debug('Make multiple requests.')
107
108 var params = {
109 encrypt: true,
110 sign: true,
111 method: data.method,
112 path: data.path,
113 data: data.data
114 }
115
116 var bad_pods = []
117 var good_pods = []
118
119 utils.makeMultipleRetryRequest(
120 params,
121
122 pods,
123
124 function callbackEachPodFinished (err, response, body, pod, callback_each_pod_finished) {
125 if (err || response.statusCode !== 200) {
126 bad_pods.push(pod._id)
127 logger.error('Error sending secure request to %s/%s pod.', pod.url, data.path, { error: err })
128 } else {
129 good_pods.push(pod._id)
130 }
131
132 return callback_each_pod_finished()
133 },
134
135 function callbackAllPodsFinished (err) {
136 if (err) {
137 logger.error('There was some errors when sending the video meta data.', { error: err })
138 return callback(err)
139 }
140
141 logger.debug('Finished')
142 76
143 updatePodsScore(good_pods, bad_pods) 77 pods.removeVideoToFriends = function (video) {
144 callback(null) 78 // To avoid duplicates
145 } 79 var id = video.name + video.magnetUri
146 ) 80 poolRequests.addToPoolRequests(id, 'remove', video)
147 })
148 } 81 }
149 82
150 pods.makeFriends = function (callback) { 83 pods.makeFriends = function (callback) {
@@ -214,7 +147,7 @@
214 147
215 pods_list, 148 pods_list,
216 149
217 function eachRequest (err, response, body, pod, callback_each_request) { 150 function eachRequest (err, response, body, url, pod, callback_each_request) {
218 // We add the pod if it responded correctly with its public certificate 151 // We add the pod if it responded correctly with its public certificate
219 if (!err && response.statusCode === 200) { 152 if (!err && response.statusCode === 200) {
220 pods.add({ url: pod.url, publicKey: body.cert, score: global.FRIEND_BASE_SCORE }, function (err) { 153 pods.add({ url: pod.url, publicKey: body.cert, score: global.FRIEND_BASE_SCORE }, function (err) {
diff --git a/src/poolRequests.js b/src/poolRequests.js
new file mode 100644
index 000000000..b117c9923
--- /dev/null
+++ b/src/poolRequests.js
@@ -0,0 +1,158 @@
1;(function () {
2 'use strict'
3
4 var async = require('async')
5
6 var logger = require('./logger')
7 var database = require('./database')
8 var PoolRequestsDB = database.PoolRequestsDB
9 var PodsDB = database.PodsDB
10 var utils = require('./utils')
11
12 var poolRequests = {}
13
14 // ----------- Constants -----------
15
16 // Time to wait between requests to the friends
17 var INTERVAL = utils.isTestInstance() ? 10000 : 60000
18 var PODS_SCORE = {
19 MALUS: -10,
20 BONUS: 10
21 }
22
23 // ----------- Private -----------
24 var timer = null
25
26 function makePoolRequests () {
27 logger.info('Making pool requests to friends.')
28
29 PoolRequestsDB.find({}, { type: 1, request: 1 }, function (err, pool_requests) {
30 if (err) throw err
31
32 var requests = {
33 add: [],
34 remove: []
35 }
36
37 async.each(pool_requests, function (pool_request, callback_each) {
38 if (pool_request.type === 'add') {
39 requests.add.push(pool_request.request)
40 } else if (pool_request.type === 'remove') {
41 requests.remove.push(pool_request.request)
42 } else {
43 throw new Error('Unkown pool request type.')
44 }
45
46 callback_each()
47 }, function () {
48 makePoolRequest('add', requests.add)
49 makePoolRequest('remove', requests.remove)
50 logger.info('Pool requests to friends sent.')
51 })
52 })
53 }
54
55 function updatePodsScore (good_pods, bad_pods) {
56 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
57
58 PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec()
59 PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) {
60 if (err) throw err
61 removeBadPods()
62 })
63 }
64
65 function removeBadPods () {
66 PodsDB.remove({ score: 0 }, function (err, result) {
67 if (err) throw err
68
69 var number_removed = result.result.n
70 if (number_removed !== 0) logger.info('Removed %d pod.', number_removed)
71 })
72 }
73
74 function makePoolRequest (type, requests) {
75 logger.debug('Make pool requests scheduled.')
76 PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
77 if (err) throw err
78
79 var params = {
80 encrypt: true,
81 sign: true,
82 method: 'POST',
83 path: null,
84 data: requests
85 }
86
87 if (type === 'add') {
88 params.path = '/api/' + global.API_VERSION + '/remotevideos/add'
89 } else if (type === 'remove') {
90 params.path = '/api/' + global.API_VERSION + '/remotevideos/remove'
91 } else {
92 throw new Error('Unkown pool request type.')
93 }
94
95 var bad_pods = []
96 var good_pods = []
97
98 utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
99
100 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
101 if (err || response.statusCode !== 200) {
102 bad_pods.push(pod._id)
103 logger.error('Error sending secure request to %s pod.', url, { error: err })
104 } else {
105 good_pods.push(pod._id)
106 }
107
108 return callback_each_pod_finished()
109 }
110
111 function callbackAllPodsFinished (err) {
112 if (err) {
113 logger.error('There was some errors when sending the video meta data.', { error: err })
114 }
115
116 updatePodsScore(good_pods, bad_pods)
117 PoolRequestsDB.remove().exec()
118 }
119 })
120 }
121
122 // ----------- Public -----------
123 poolRequests.activate = function () {
124 logger.info('Pool requests activated.')
125 timer = setInterval(makePoolRequests, INTERVAL)
126 }
127
128 poolRequests.addToPoolRequests = function (id, type, request) {
129 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
130
131 PoolRequestsDB.findOne({ id: id }, function (err, entity) {
132 if (err) logger.error(err)
133
134 if (entity) {
135 if (entity.type === type) {
136 logger.error(new Error('Cannot insert two same requests.'))
137 return
138 }
139
140 // Remove the request of the other type
141 PoolRequestsDB.remove({ id: id }, function (err) {
142 if (err) logger.error(err)
143 })
144 } else {
145 PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
146 if (err) logger.error(err)
147 })
148 }
149 })
150 }
151
152 poolRequests.deactivate = function () {
153 logger.info('Pool requests deactivated.')
154 clearInterval(timer)
155 }
156
157 module.exports = poolRequests
158})()
diff --git a/src/utils.js b/src/utils.js
index dda6c7a0a..4aa1fc55e 100644
--- a/src/utils.js
+++ b/src/utils.js
@@ -36,7 +36,7 @@
36 36
37 replay( 37 replay(
38 request.post(params, function (err, response, body) { 38 request.post(params, function (err, response, body) {
39 callbackEach(err, response, body, to_pod) 39 callbackEach(err, response, body, params.url, to_pod)
40 }), 40 }),
41 { 41 {
42 retries: retries, 42 retries: retries,
@@ -71,8 +71,8 @@
71 71
72 // Make a request for each pod 72 // Make a request for each pod
73 async.each(pods, function (pod, callback_each_async) { 73 async.each(pods, function (pod, callback_each_async) {
74 function callbackEachRetryRequest (err, response, body, pod) { 74 function callbackEachRetryRequest (err, response, body, url, pod) {
75 callbackEach(err, response, body, pod, function () { 75 callbackEach(err, response, body, url, pod, function () {
76 callback_each_async() 76 callback_each_async()
77 }) 77 })
78 } 78 }
diff --git a/src/videos.js b/src/videos.js
index 8c44cad95..e3a5b49f1 100644
--- a/src/videos.js
+++ b/src/videos.js
@@ -3,6 +3,7 @@
3 3
4 var async = require('async') 4 var async = require('async')
5 var config = require('config') 5 var config = require('config')
6 var dz = require('dezalgo')
6 var fs = require('fs') 7 var fs = require('fs')
7 var webtorrent = require('./webTorrentNode') 8 var webtorrent = require('./webTorrentNode')
8 9
@@ -67,19 +68,10 @@
67 return callback(err) 68 return callback(err)
68 } 69 }
69 70
70 // Now we'll send the video's meta data 71 // Now we'll add the video's meta data to our friends
71 params.namePath = null 72 params.namePath = null
72 73
73 logger.info('Sending %s video to friends.', video_file.path) 74 pods.addVideoToFriends(params)
74
75 var data = {
76 path: '/api/' + global.API_VERSION + '/remotevideos/add',
77 method: 'POST',
78 data: params
79 }
80
81 // Do not wait the secure requests
82 pods.makeSecureRequest(data)
83 callback(null) 75 callback(null)
84 }) 76 })
85 }) 77 })
@@ -124,16 +116,12 @@
124 return callback(err) 116 return callback(err)
125 } 117 }
126 118
127 var data = { 119 var params = {
128 path: '/api/' + global.API_VERSION + '/remotevideos/remove', 120 name: video.name,
129 method: 'POST', 121 magnetUri: video.magnetUri
130 data: {
131 magnetUri: video.magnetUri
132 }
133 } 122 }
134 123
135 // Yes this is a POST request because we add some informations in the body (signature, encrypt etc) 124 pods.removeVideoToFriends(params)
136 pods.makeSecureRequest(data)
137 callback(null) 125 callback(null)
138 }) 126 })
139 }) 127 })
@@ -142,49 +130,65 @@
142 } 130 }
143 131
144 // Use the magnet Uri because the _id field is not the same on different servers 132 // Use the magnet Uri because the _id field is not the same on different servers
145 videos.removeRemote = function (fromUrl, magnetUri, callback) { 133 videos.removeRemotes = function (fromUrl, magnetUris, callback) {
146 VideosDB.findOne({ magnetUri: magnetUri }, function (err, video) { 134 VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) {
147 if (err || !video) { 135 if (err || !videos) {
148 logger.error('Cannot find the torrent URI of this remote video.') 136 logger.error('Cannot find the torrent URI of these remote videos.')
149 return callback(err) 137 return callback(err)
150 } 138 }
151 139
152 // TODO: move to reqValidators middleware ? 140 var to_remove = []
153 if (video.podUrl !== fromUrl) { 141 async.each(videos, function (video, callback_async) {
154 logger.error('The pod has not the rights on this video.') 142 callback_async = dz(callback_async)
155 return callback(err)
156 }
157 143
158 VideosDB.findByIdAndRemove(video._id, function (err) { 144 if (video.podUrl !== fromUrl) {
159 if (err) { 145 logger.error('The pod %s has not the rights on the video of %s.', fromUrl, video.podUrl)
160 logger.error('Cannot remove the remote video.') 146 } else {
161 return callback(err) 147 to_remove.push(video._id)
162 } 148 }
163 149
164 callback(null) 150 callback_async()
151 }, function () {
152 VideosDB.remove({ _id: { $in: to_remove } }, function (err) {
153 if (err) {
154 logger.error('Cannot remove the remote videos.')
155 return callback(err)
156 }
157
158 callback(null)
159 })
165 }) 160 })
166 }) 161 })
167 } 162 }
168 163
169 // { name, magnetUri, podUrl } 164 // { name, magnetUri, podUrl }
170 videos.addRemote = function (data, callback) { 165 videos.addRemotes = function (videos, callback) {
171 logger.debug('Add remote video from pod: %s', data.podUrl) 166 var to_add = []
172
173 var params = {
174 name: data.name,
175 namePath: null,
176 description: data.description,
177 magnetUri: data.magnetUri,
178 podUrl: data.podUrl
179 }
180 167
181 VideosDB.create(params, function (err, video) { 168 async.each(videos, function (video, callback_each) {
182 if (err) { 169 callback_each = dz(callback_each)
183 logger.error('Cannot insert this remote video.', { error: err }) 170 logger.debug('Add remote video from pod: %s', video.podUrl)
184 return callback(err) 171
172 var params = {
173 name: video.name,
174 namePath: null,
175 description: video.description,
176 magnetUri: video.magnetUri,
177 podUrl: video.podUrl
185 } 178 }
186 179
187 return callback(null, video) 180 to_add.push(params)
181
182 callback_each()
183 }, function () {
184 VideosDB.create(to_add, function (err, videos) {
185 if (err) {
186 logger.error('Cannot insert this remote video.', { error: err })
187 return callback(err)
188 }
189
190 return callback(null, videos)
191 })
188 }) 192 })
189 } 193 }
190 194