diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2015-12-04 16:13:32 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2015-12-04 16:13:32 +0100 |
commit | 0b69752270f1ceea06a29872b3db23660a55d6d3 (patch) | |
tree | 42da726633f3e48f4fe592cfd2c1ca14346a159b /src | |
parent | af82cae07dc568e3cb10acd70113df56eb8b15a9 (diff) | |
download | PeerTube-0b69752270f1ceea06a29872b3db23660a55d6d3.tar.gz PeerTube-0b69752270f1ceea06a29872b3db23660a55d6d3.tar.zst PeerTube-0b69752270f1ceea06a29872b3db23660a55d6d3.zip |
Add a pool of requests instead of making a request at each action (add
video/remove video) for performance in big networks
Diffstat (limited to 'src')
-rw-r--r-- | src/customValidators.js | 29 | ||||
-rw-r--r-- | src/database.js | 12 | ||||
-rw-r--r-- | src/pods.js | 89 | ||||
-rw-r--r-- | src/poolRequests.js | 158 | ||||
-rw-r--r-- | src/utils.js | 6 | ||||
-rw-r--r-- | src/videos.js | 100 |
6 files changed, 264 insertions, 130 deletions
diff --git a/src/customValidators.js b/src/customValidators.js new file mode 100644 index 000000000..73c2f8461 --- /dev/null +++ b/src/customValidators.js | |||
@@ -0,0 +1,29 @@ | |||
1 | ;(function () { | ||
2 | 'use strict' | ||
3 | |||
4 | var validator = require('validator') | ||
5 | |||
6 | var customValidators = {} | ||
7 | |||
8 | customValidators.eachIsRemoteVideosAddValid = function (values) { | ||
9 | return values.every(function (val) { | ||
10 | return validator.isLength(val.name, 1, 50) && | ||
11 | validator.isLength(val.description, 1, 50) && | ||
12 | validator.isLength(val.magnetUri, 10) && | ||
13 | validator.isURL(val.podUrl) | ||
14 | }) | ||
15 | } | ||
16 | |||
17 | customValidators.eachIsRemoteVideosRemoveValid = function (values) { | ||
18 | return values.every(function (val) { | ||
19 | return validator.isLength(val.magnetUri, 10) | ||
20 | }) | ||
21 | } | ||
22 | |||
23 | customValidators.isArray = function (value) { | ||
24 | return Array.isArray(value) | ||
25 | } | ||
26 | |||
27 | // ----------- Export ----------- | ||
28 | module.exports = customValidators | ||
29 | })() | ||
diff --git a/src/database.js b/src/database.js index 740e89fa4..514a622dc 100644 --- a/src/database.js +++ b/src/database.js | |||
@@ -30,6 +30,15 @@ | |||
30 | 30 | ||
31 | var PodsDB = mongoose.model('pods', podsSchema) | 31 | var PodsDB = mongoose.model('pods', podsSchema) |
32 | 32 | ||
33 | // ----------- PoolRequests ----------- | ||
34 | var poolRequestsSchema = mongoose.Schema({ | ||
35 | type: String, | ||
36 | id: String, // Special id to find duplicates (video created we want to remove...) | ||
37 | request: mongoose.Schema.Types.Mixed | ||
38 | }) | ||
39 | |||
40 | var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema) | ||
41 | |||
33 | // ----------- Connection ----------- | 42 | // ----------- Connection ----------- |
34 | 43 | ||
35 | mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname) | 44 | mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname) |
@@ -45,6 +54,7 @@ | |||
45 | // ----------- Export ----------- | 54 | // ----------- Export ----------- |
46 | module.exports = { | 55 | module.exports = { |
47 | VideosDB: VideosDB, | 56 | VideosDB: VideosDB, |
48 | PodsDB: PodsDB | 57 | PodsDB: PodsDB, |
58 | PoolRequestsDB: PoolRequestsDB | ||
49 | } | 59 | } |
50 | })() | 60 | })() |
diff --git a/src/pods.js b/src/pods.js index e26b3f0ae..9afc6cc96 100644 --- a/src/pods.js +++ b/src/pods.js | |||
@@ -8,6 +8,7 @@ | |||
8 | 8 | ||
9 | var logger = require('./logger') | 9 | var logger = require('./logger') |
10 | var PodsDB = require('./database').PodsDB | 10 | var PodsDB = require('./database').PodsDB |
11 | var poolRequests = require('./poolRequests') | ||
11 | var utils = require('./utils') | 12 | var utils = require('./utils') |
12 | 13 | ||
13 | var pods = {} | 14 | var pods = {} |
@@ -16,13 +17,6 @@ | |||
16 | var host = config.get('webserver.host') | 17 | var host = config.get('webserver.host') |
17 | var port = config.get('webserver.port') | 18 | var port = config.get('webserver.port') |
18 | 19 | ||
19 | // ----------- Constants ----------- | ||
20 | |||
21 | var PODS_SCORE = { | ||
22 | MALUS: -10, | ||
23 | BONUS: 10 | ||
24 | } | ||
25 | |||
26 | // ----------- Private functions ----------- | 20 | // ----------- Private functions ----------- |
27 | 21 | ||
28 | function getForeignPodsList (url, callback) { | 22 | function getForeignPodsList (url, callback) { |
@@ -34,25 +28,6 @@ | |||
34 | }) | 28 | }) |
35 | } | 29 | } |
36 | 30 | ||
37 | function updatePodsScore (good_pods, bad_pods) { | ||
38 | logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) | ||
39 | |||
40 | PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec() | ||
41 | PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) { | ||
42 | if (err) throw err | ||
43 | removeBadPods() | ||
44 | }) | ||
45 | } | ||
46 | |||
47 | function removeBadPods () { | ||
48 | PodsDB.remove({ score: 0 }, function (err, result) { | ||
49 | if (err) throw err | ||
50 | |||
51 | var number_removed = result.result.n | ||
52 | if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) | ||
53 | }) | ||
54 | } | ||
55 | |||
56 | // ----------- Public functions ----------- | 31 | // ----------- Public functions ----------- |
57 | 32 | ||
58 | pods.list = function (callback) { | 33 | pods.list = function (callback) { |
@@ -93,58 +68,16 @@ | |||
93 | }) | 68 | }) |
94 | } | 69 | } |
95 | 70 | ||
96 | // { path, data } | 71 | pods.addVideoToFriends = function (video) { |
97 | pods.makeSecureRequest = function (data, callback) { | 72 | // To avoid duplicates |
98 | if (callback === undefined) callback = function () {} | 73 | var id = video.name + video.magnetUri |
99 | 74 | poolRequests.addToPoolRequests(id, 'add', video) | |
100 | PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { | 75 | } |
101 | if (err) { | ||
102 | logger.error('Cannot get the list of the pods.', { error: err }) | ||
103 | return callback(err) | ||
104 | } | ||
105 | |||
106 | logger.debug('Make multiple requests.') | ||
107 | |||
108 | var params = { | ||
109 | encrypt: true, | ||
110 | sign: true, | ||
111 | method: data.method, | ||
112 | path: data.path, | ||
113 | data: data.data | ||
114 | } | ||
115 | |||
116 | var bad_pods = [] | ||
117 | var good_pods = [] | ||
118 | |||
119 | utils.makeMultipleRetryRequest( | ||
120 | params, | ||
121 | |||
122 | pods, | ||
123 | |||
124 | function callbackEachPodFinished (err, response, body, pod, callback_each_pod_finished) { | ||
125 | if (err || response.statusCode !== 200) { | ||
126 | bad_pods.push(pod._id) | ||
127 | logger.error('Error sending secure request to %s/%s pod.', pod.url, data.path, { error: err }) | ||
128 | } else { | ||
129 | good_pods.push(pod._id) | ||
130 | } | ||
131 | |||
132 | return callback_each_pod_finished() | ||
133 | }, | ||
134 | |||
135 | function callbackAllPodsFinished (err) { | ||
136 | if (err) { | ||
137 | logger.error('There was some errors when sending the video meta data.', { error: err }) | ||
138 | return callback(err) | ||
139 | } | ||
140 | |||
141 | logger.debug('Finished') | ||
142 | 76 | ||
143 | updatePodsScore(good_pods, bad_pods) | 77 | pods.removeVideoToFriends = function (video) { |
144 | callback(null) | 78 | // To avoid duplicates |
145 | } | 79 | var id = video.name + video.magnetUri |
146 | ) | 80 | poolRequests.addToPoolRequests(id, 'remove', video) |
147 | }) | ||
148 | } | 81 | } |
149 | 82 | ||
150 | pods.makeFriends = function (callback) { | 83 | pods.makeFriends = function (callback) { |
@@ -214,7 +147,7 @@ | |||
214 | 147 | ||
215 | pods_list, | 148 | pods_list, |
216 | 149 | ||
217 | function eachRequest (err, response, body, pod, callback_each_request) { | 150 | function eachRequest (err, response, body, url, pod, callback_each_request) { |
218 | // We add the pod if it responded correctly with its public certificate | 151 | // We add the pod if it responded correctly with its public certificate |
219 | if (!err && response.statusCode === 200) { | 152 | if (!err && response.statusCode === 200) { |
220 | pods.add({ url: pod.url, publicKey: body.cert, score: global.FRIEND_BASE_SCORE }, function (err) { | 153 | pods.add({ url: pod.url, publicKey: body.cert, score: global.FRIEND_BASE_SCORE }, function (err) { |
diff --git a/src/poolRequests.js b/src/poolRequests.js new file mode 100644 index 000000000..b117c9923 --- /dev/null +++ b/src/poolRequests.js | |||
@@ -0,0 +1,158 @@ | |||
1 | ;(function () { | ||
2 | 'use strict' | ||
3 | |||
4 | var async = require('async') | ||
5 | |||
6 | var logger = require('./logger') | ||
7 | var database = require('./database') | ||
8 | var PoolRequestsDB = database.PoolRequestsDB | ||
9 | var PodsDB = database.PodsDB | ||
10 | var utils = require('./utils') | ||
11 | |||
12 | var poolRequests = {} | ||
13 | |||
14 | // ----------- Constants ----------- | ||
15 | |||
16 | // Time to wait between requests to the friends | ||
17 | var INTERVAL = utils.isTestInstance() ? 10000 : 60000 | ||
18 | var PODS_SCORE = { | ||
19 | MALUS: -10, | ||
20 | BONUS: 10 | ||
21 | } | ||
22 | |||
23 | // ----------- Private ----------- | ||
24 | var timer = null | ||
25 | |||
26 | function makePoolRequests () { | ||
27 | logger.info('Making pool requests to friends.') | ||
28 | |||
29 | PoolRequestsDB.find({}, { type: 1, request: 1 }, function (err, pool_requests) { | ||
30 | if (err) throw err | ||
31 | |||
32 | var requests = { | ||
33 | add: [], | ||
34 | remove: [] | ||
35 | } | ||
36 | |||
37 | async.each(pool_requests, function (pool_request, callback_each) { | ||
38 | if (pool_request.type === 'add') { | ||
39 | requests.add.push(pool_request.request) | ||
40 | } else if (pool_request.type === 'remove') { | ||
41 | requests.remove.push(pool_request.request) | ||
42 | } else { | ||
43 | throw new Error('Unkown pool request type.') | ||
44 | } | ||
45 | |||
46 | callback_each() | ||
47 | }, function () { | ||
48 | makePoolRequest('add', requests.add) | ||
49 | makePoolRequest('remove', requests.remove) | ||
50 | logger.info('Pool requests to friends sent.') | ||
51 | }) | ||
52 | }) | ||
53 | } | ||
54 | |||
55 | function updatePodsScore (good_pods, bad_pods) { | ||
56 | logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) | ||
57 | |||
58 | PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec() | ||
59 | PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) { | ||
60 | if (err) throw err | ||
61 | removeBadPods() | ||
62 | }) | ||
63 | } | ||
64 | |||
65 | function removeBadPods () { | ||
66 | PodsDB.remove({ score: 0 }, function (err, result) { | ||
67 | if (err) throw err | ||
68 | |||
69 | var number_removed = result.result.n | ||
70 | if (number_removed !== 0) logger.info('Removed %d pod.', number_removed) | ||
71 | }) | ||
72 | } | ||
73 | |||
74 | function makePoolRequest (type, requests) { | ||
75 | logger.debug('Make pool requests scheduled.') | ||
76 | PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { | ||
77 | if (err) throw err | ||
78 | |||
79 | var params = { | ||
80 | encrypt: true, | ||
81 | sign: true, | ||
82 | method: 'POST', | ||
83 | path: null, | ||
84 | data: requests | ||
85 | } | ||
86 | |||
87 | if (type === 'add') { | ||
88 | params.path = '/api/' + global.API_VERSION + '/remotevideos/add' | ||
89 | } else if (type === 'remove') { | ||
90 | params.path = '/api/' + global.API_VERSION + '/remotevideos/remove' | ||
91 | } else { | ||
92 | throw new Error('Unkown pool request type.') | ||
93 | } | ||
94 | |||
95 | var bad_pods = [] | ||
96 | var good_pods = [] | ||
97 | |||
98 | utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) | ||
99 | |||
100 | function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { | ||
101 | if (err || response.statusCode !== 200) { | ||
102 | bad_pods.push(pod._id) | ||
103 | logger.error('Error sending secure request to %s pod.', url, { error: err }) | ||
104 | } else { | ||
105 | good_pods.push(pod._id) | ||
106 | } | ||
107 | |||
108 | return callback_each_pod_finished() | ||
109 | } | ||
110 | |||
111 | function callbackAllPodsFinished (err) { | ||
112 | if (err) { | ||
113 | logger.error('There was some errors when sending the video meta data.', { error: err }) | ||
114 | } | ||
115 | |||
116 | updatePodsScore(good_pods, bad_pods) | ||
117 | PoolRequestsDB.remove().exec() | ||
118 | } | ||
119 | }) | ||
120 | } | ||
121 | |||
122 | // ----------- Public ----------- | ||
123 | poolRequests.activate = function () { | ||
124 | logger.info('Pool requests activated.') | ||
125 | timer = setInterval(makePoolRequests, INTERVAL) | ||
126 | } | ||
127 | |||
128 | poolRequests.addToPoolRequests = function (id, type, request) { | ||
129 | logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) | ||
130 | |||
131 | PoolRequestsDB.findOne({ id: id }, function (err, entity) { | ||
132 | if (err) logger.error(err) | ||
133 | |||
134 | if (entity) { | ||
135 | if (entity.type === type) { | ||
136 | logger.error(new Error('Cannot insert two same requests.')) | ||
137 | return | ||
138 | } | ||
139 | |||
140 | // Remove the request of the other type | ||
141 | PoolRequestsDB.remove({ id: id }, function (err) { | ||
142 | if (err) logger.error(err) | ||
143 | }) | ||
144 | } else { | ||
145 | PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { | ||
146 | if (err) logger.error(err) | ||
147 | }) | ||
148 | } | ||
149 | }) | ||
150 | } | ||
151 | |||
152 | poolRequests.deactivate = function () { | ||
153 | logger.info('Pool requests deactivated.') | ||
154 | clearInterval(timer) | ||
155 | } | ||
156 | |||
157 | module.exports = poolRequests | ||
158 | })() | ||
diff --git a/src/utils.js b/src/utils.js index dda6c7a0a..4aa1fc55e 100644 --- a/src/utils.js +++ b/src/utils.js | |||
@@ -36,7 +36,7 @@ | |||
36 | 36 | ||
37 | replay( | 37 | replay( |
38 | request.post(params, function (err, response, body) { | 38 | request.post(params, function (err, response, body) { |
39 | callbackEach(err, response, body, to_pod) | 39 | callbackEach(err, response, body, params.url, to_pod) |
40 | }), | 40 | }), |
41 | { | 41 | { |
42 | retries: retries, | 42 | retries: retries, |
@@ -71,8 +71,8 @@ | |||
71 | 71 | ||
72 | // Make a request for each pod | 72 | // Make a request for each pod |
73 | async.each(pods, function (pod, callback_each_async) { | 73 | async.each(pods, function (pod, callback_each_async) { |
74 | function callbackEachRetryRequest (err, response, body, pod) { | 74 | function callbackEachRetryRequest (err, response, body, url, pod) { |
75 | callbackEach(err, response, body, pod, function () { | 75 | callbackEach(err, response, body, url, pod, function () { |
76 | callback_each_async() | 76 | callback_each_async() |
77 | }) | 77 | }) |
78 | } | 78 | } |
diff --git a/src/videos.js b/src/videos.js index 8c44cad95..e3a5b49f1 100644 --- a/src/videos.js +++ b/src/videos.js | |||
@@ -3,6 +3,7 @@ | |||
3 | 3 | ||
4 | var async = require('async') | 4 | var async = require('async') |
5 | var config = require('config') | 5 | var config = require('config') |
6 | var dz = require('dezalgo') | ||
6 | var fs = require('fs') | 7 | var fs = require('fs') |
7 | var webtorrent = require('./webTorrentNode') | 8 | var webtorrent = require('./webTorrentNode') |
8 | 9 | ||
@@ -67,19 +68,10 @@ | |||
67 | return callback(err) | 68 | return callback(err) |
68 | } | 69 | } |
69 | 70 | ||
70 | // Now we'll send the video's meta data | 71 | // Now we'll add the video's meta data to our friends |
71 | params.namePath = null | 72 | params.namePath = null |
72 | 73 | ||
73 | logger.info('Sending %s video to friends.', video_file.path) | 74 | pods.addVideoToFriends(params) |
74 | |||
75 | var data = { | ||
76 | path: '/api/' + global.API_VERSION + '/remotevideos/add', | ||
77 | method: 'POST', | ||
78 | data: params | ||
79 | } | ||
80 | |||
81 | // Do not wait the secure requests | ||
82 | pods.makeSecureRequest(data) | ||
83 | callback(null) | 75 | callback(null) |
84 | }) | 76 | }) |
85 | }) | 77 | }) |
@@ -124,16 +116,12 @@ | |||
124 | return callback(err) | 116 | return callback(err) |
125 | } | 117 | } |
126 | 118 | ||
127 | var data = { | 119 | var params = { |
128 | path: '/api/' + global.API_VERSION + '/remotevideos/remove', | 120 | name: video.name, |
129 | method: 'POST', | 121 | magnetUri: video.magnetUri |
130 | data: { | ||
131 | magnetUri: video.magnetUri | ||
132 | } | ||
133 | } | 122 | } |
134 | 123 | ||
135 | // Yes this is a POST request because we add some informations in the body (signature, encrypt etc) | 124 | pods.removeVideoToFriends(params) |
136 | pods.makeSecureRequest(data) | ||
137 | callback(null) | 125 | callback(null) |
138 | }) | 126 | }) |
139 | }) | 127 | }) |
@@ -142,49 +130,65 @@ | |||
142 | } | 130 | } |
143 | 131 | ||
144 | // Use the magnet Uri because the _id field is not the same on different servers | 132 | // Use the magnet Uri because the _id field is not the same on different servers |
145 | videos.removeRemote = function (fromUrl, magnetUri, callback) { | 133 | videos.removeRemotes = function (fromUrl, magnetUris, callback) { |
146 | VideosDB.findOne({ magnetUri: magnetUri }, function (err, video) { | 134 | VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { |
147 | if (err || !video) { | 135 | if (err || !videos) { |
148 | logger.error('Cannot find the torrent URI of this remote video.') | 136 | logger.error('Cannot find the torrent URI of these remote videos.') |
149 | return callback(err) | 137 | return callback(err) |
150 | } | 138 | } |
151 | 139 | ||
152 | // TODO: move to reqValidators middleware ? | 140 | var to_remove = [] |
153 | if (video.podUrl !== fromUrl) { | 141 | async.each(videos, function (video, callback_async) { |
154 | logger.error('The pod has not the rights on this video.') | 142 | callback_async = dz(callback_async) |
155 | return callback(err) | ||
156 | } | ||
157 | 143 | ||
158 | VideosDB.findByIdAndRemove(video._id, function (err) { | 144 | if (video.podUrl !== fromUrl) { |
159 | if (err) { | 145 | logger.error('The pod %s has not the rights on the video of %s.', fromUrl, video.podUrl) |
160 | logger.error('Cannot remove the remote video.') | 146 | } else { |
161 | return callback(err) | 147 | to_remove.push(video._id) |
162 | } | 148 | } |
163 | 149 | ||
164 | callback(null) | 150 | callback_async() |
151 | }, function () { | ||
152 | VideosDB.remove({ _id: { $in: to_remove } }, function (err) { | ||
153 | if (err) { | ||
154 | logger.error('Cannot remove the remote videos.') | ||
155 | return callback(err) | ||
156 | } | ||
157 | |||
158 | callback(null) | ||
159 | }) | ||
165 | }) | 160 | }) |
166 | }) | 161 | }) |
167 | } | 162 | } |
168 | 163 | ||
169 | // { name, magnetUri, podUrl } | 164 | // { name, magnetUri, podUrl } |
170 | videos.addRemote = function (data, callback) { | 165 | videos.addRemotes = function (videos, callback) { |
171 | logger.debug('Add remote video from pod: %s', data.podUrl) | 166 | var to_add = [] |
172 | |||
173 | var params = { | ||
174 | name: data.name, | ||
175 | namePath: null, | ||
176 | description: data.description, | ||
177 | magnetUri: data.magnetUri, | ||
178 | podUrl: data.podUrl | ||
179 | } | ||
180 | 167 | ||
181 | VideosDB.create(params, function (err, video) { | 168 | async.each(videos, function (video, callback_each) { |
182 | if (err) { | 169 | callback_each = dz(callback_each) |
183 | logger.error('Cannot insert this remote video.', { error: err }) | 170 | logger.debug('Add remote video from pod: %s', video.podUrl) |
184 | return callback(err) | 171 | |
172 | var params = { | ||
173 | name: video.name, | ||
174 | namePath: null, | ||
175 | description: video.description, | ||
176 | magnetUri: video.magnetUri, | ||
177 | podUrl: video.podUrl | ||
185 | } | 178 | } |
186 | 179 | ||
187 | return callback(null, video) | 180 | to_add.push(params) |
181 | |||
182 | callback_each() | ||
183 | }, function () { | ||
184 | VideosDB.create(to_add, function (err, videos) { | ||
185 | if (err) { | ||
186 | logger.error('Cannot insert this remote video.', { error: err }) | ||
187 | return callback(err) | ||
188 | } | ||
189 | |||
190 | return callback(null, videos) | ||
191 | }) | ||
188 | }) | 192 | }) |
189 | } | 193 | } |
190 | 194 | ||