diff options
-rw-r--r-- | server/initializers/constants.js | 12 | ||||
-rw-r--r-- | server/initializers/migrations/0020-requests-endpoint.js | 15 | ||||
-rw-r--r-- | server/lib/friends.js | 9 | ||||
-rw-r--r-- | server/models/request.js | 59 |
4 files changed, 68 insertions, 27 deletions
diff --git a/server/initializers/constants.js b/server/initializers/constants.js index 5ccd42773..0efbbb916 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js | |||
@@ -1,6 +1,7 @@ | |||
1 | 'use strict' | 1 | 'use strict' |
2 | 2 | ||
3 | const config = require('config') | 3 | const config = require('config') |
4 | const maxBy = require('lodash/maxBy') | ||
4 | const path = require('path') | 5 | const path = require('path') |
5 | 6 | ||
6 | // --------------------------------------------------------------------------- | 7 | // --------------------------------------------------------------------------- |
@@ -92,9 +93,13 @@ const MONGO_MIGRATION_SCRIPTS = [ | |||
92 | { | 93 | { |
93 | script: '0015-admin-role', | 94 | script: '0015-admin-role', |
94 | version: 15 | 95 | version: 15 |
96 | }, | ||
97 | { | ||
98 | script: '0020-requests-endpoint', | ||
99 | version: 20 | ||
95 | } | 100 | } |
96 | ] | 101 | ] |
97 | const LAST_MONGO_SCHEMA_VERSION = 15 | 102 | const LAST_MONGO_SCHEMA_VERSION = (maxBy(MONGO_MIGRATION_SCRIPTS, 'version'))['version'] |
98 | 103 | ||
99 | // --------------------------------------------------------------------------- | 104 | // --------------------------------------------------------------------------- |
100 | 105 | ||
@@ -116,6 +121,10 @@ const REQUESTS_LIMIT = 10 | |||
116 | // Number of requests to retry for replay requests module | 121 | // Number of requests to retry for replay requests module |
117 | const RETRY_REQUESTS = 5 | 122 | const RETRY_REQUESTS = 5 |
118 | 123 | ||
124 | const REQUEST_ENDPOINTS = { | ||
125 | VIDEOS: 'videos' | ||
126 | } | ||
127 | |||
119 | // --------------------------------------------------------------------------- | 128 | // --------------------------------------------------------------------------- |
120 | 129 | ||
121 | // Password encryption | 130 | // Password encryption |
@@ -162,6 +171,7 @@ module.exports = { | |||
162 | OAUTH_LIFETIME, | 171 | OAUTH_LIFETIME, |
163 | PAGINATION_COUNT_DEFAULT, | 172 | PAGINATION_COUNT_DEFAULT, |
164 | PODS_SCORE, | 173 | PODS_SCORE, |
174 | REQUEST_ENDPOINTS, | ||
165 | REQUESTS_IN_PARALLEL, | 175 | REQUESTS_IN_PARALLEL, |
166 | REQUESTS_INTERVAL, | 176 | REQUESTS_INTERVAL, |
167 | REQUESTS_LIMIT, | 177 | REQUESTS_LIMIT, |
diff --git a/server/initializers/migrations/0020-requests-endpoint.js b/server/initializers/migrations/0020-requests-endpoint.js new file mode 100644 index 000000000..55feec571 --- /dev/null +++ b/server/initializers/migrations/0020-requests-endpoint.js | |||
@@ -0,0 +1,15 @@ | |||
1 | /* | ||
2 | Set the endpoint videos for requests. | ||
3 | */ | ||
4 | |||
5 | const mongoose = require('mongoose') | ||
6 | |||
7 | const Request = mongoose.model('Request') | ||
8 | |||
9 | exports.up = function (callback) { | ||
10 | Request.update({ }, { endpoint: 'videos' }, callback) | ||
11 | } | ||
12 | |||
13 | exports.down = function (callback) { | ||
14 | throw new Error('Not implemented.') | ||
15 | } | ||
diff --git a/server/lib/friends.js b/server/lib/friends.js index 3f100545c..eafffaab0 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -28,7 +28,7 @@ const friends = { | |||
28 | } | 28 | } |
29 | 29 | ||
30 | function addVideoToFriends (video) { | 30 | function addVideoToFriends (video) { |
31 | createRequest('add', video) | 31 | createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video) |
32 | } | 32 | } |
33 | 33 | ||
34 | function hasFriends (callback) { | 34 | function hasFriends (callback) { |
@@ -119,7 +119,7 @@ function quitFriends (callback) { | |||
119 | } | 119 | } |
120 | 120 | ||
121 | function removeVideoToFriends (videoParams) { | 121 | function removeVideoToFriends (videoParams) { |
122 | createRequest('remove', videoParams) | 122 | createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams) |
123 | } | 123 | } |
124 | 124 | ||
125 | function sendOwnedVideosToPod (podId) { | 125 | function sendOwnedVideosToPod (podId) { |
@@ -137,7 +137,7 @@ function sendOwnedVideosToPod (podId) { | |||
137 | return | 137 | return |
138 | } | 138 | } |
139 | 139 | ||
140 | createRequest('add', remoteVideo, [ podId ]) | 140 | createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ]) |
141 | }) | 141 | }) |
142 | }) | 142 | }) |
143 | }) | 143 | }) |
@@ -250,8 +250,9 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
250 | }) | 250 | }) |
251 | } | 251 | } |
252 | 252 | ||
253 | function createRequest (type, data, to) { | 253 | function createRequest (type, endpoint, data, to) { |
254 | const req = new Request({ | 254 | const req = new Request({ |
255 | endpoint, | ||
255 | request: { | 256 | request: { |
256 | type: type, | 257 | type: type, |
257 | data: data | 258 | data: data |
diff --git a/server/models/request.js b/server/models/request.js index 34a4287ea..f5eec2134 100644 --- a/server/models/request.js +++ b/server/models/request.js | |||
@@ -2,6 +2,7 @@ | |||
2 | 2 | ||
3 | const each = require('async/each') | 3 | const each = require('async/each') |
4 | const eachLimit = require('async/eachLimit') | 4 | const eachLimit = require('async/eachLimit') |
5 | const values = require('lodash/values') | ||
5 | const mongoose = require('mongoose') | 6 | const mongoose = require('mongoose') |
6 | const waterfall = require('async/waterfall') | 7 | const waterfall = require('async/waterfall') |
7 | 8 | ||
@@ -18,7 +19,16 @@ let lastRequestTimestamp = 0 | |||
18 | 19 | ||
19 | const RequestSchema = mongoose.Schema({ | 20 | const RequestSchema = mongoose.Schema({ |
20 | request: mongoose.Schema.Types.Mixed, | 21 | request: mongoose.Schema.Types.Mixed, |
21 | to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'Pod' } ] | 22 | endpoint: { |
23 | type: String, | ||
24 | enum: [ values(constants.REQUEST_ENDPOINTS) ] | ||
25 | }, | ||
26 | to: [ | ||
27 | { | ||
28 | type: mongoose.Schema.Types.ObjectId, | ||
29 | ref: 'Pod' | ||
30 | } | ||
31 | ] | ||
22 | }) | 32 | }) |
23 | 33 | ||
24 | RequestSchema.statics = { | 34 | RequestSchema.statics = { |
@@ -93,7 +103,7 @@ function remainingMilliSeconds () { | |||
93 | // --------------------------------------------------------------------------- | 103 | // --------------------------------------------------------------------------- |
94 | 104 | ||
95 | // Make a requests to friends of a certain type | 105 | // Make a requests to friends of a certain type |
96 | function makeRequest (toPod, requestsToMake, callback) { | 106 | function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { |
97 | if (!callback) callback = function () {} | 107 | if (!callback) callback = function () {} |
98 | 108 | ||
99 | const params = { | 109 | const params = { |
@@ -101,7 +111,7 @@ function makeRequest (toPod, requestsToMake, callback) { | |||
101 | encrypt: true, // Security | 111 | encrypt: true, // Security |
102 | sign: true, // To prove our identity | 112 | sign: true, // To prove our identity |
103 | method: 'POST', | 113 | method: 'POST', |
104 | path: '/api/' + constants.API_VERSION + '/remote/videos', | 114 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, |
105 | data: requestsToMake // Requests we need to make | 115 | data: requestsToMake // Requests we need to make |
106 | } | 116 | } |
107 | 117 | ||
@@ -144,31 +154,34 @@ function makeRequests () { | |||
144 | 154 | ||
145 | logger.info('Making requests to friends.') | 155 | logger.info('Making requests to friends.') |
146 | 156 | ||
147 | // Requests by pods id | 157 | // We want to group requests by destinations pod and endpoint |
148 | const requestsToMake = {} | 158 | const requestsToMakeGrouped = {} |
149 | 159 | ||
150 | requests.forEach(function (poolRequest) { | 160 | requests.forEach(function (poolRequest) { |
151 | poolRequest.to.forEach(function (toPodId) { | 161 | poolRequest.to.forEach(function (toPodId) { |
152 | if (!requestsToMake[toPodId]) { | 162 | const hashKey = toPodId + poolRequest.endpoint |
153 | requestsToMake[toPodId] = { | 163 | if (!requestsToMakeGrouped[hashKey]) { |
154 | ids: [], | 164 | requestsToMakeGrouped[hashKey] = { |
155 | datas: [] | 165 | toPodId, |
166 | endpoint: poolRequest.endpoint, | ||
167 | ids: [], // pool request ids, to delete them from the DB in the future | ||
168 | datas: [] // requests data, | ||
156 | } | 169 | } |
157 | } | 170 | } |
158 | 171 | ||
159 | requestsToMake[toPodId].ids.push(poolRequest._id) | 172 | requestsToMakeGrouped[hashKey].ids.push(poolRequest._id) |
160 | requestsToMake[toPodId].datas.push(poolRequest.request) | 173 | requestsToMakeGrouped[hashKey].datas.push(poolRequest.request) |
161 | }) | 174 | }) |
162 | }) | 175 | }) |
163 | 176 | ||
164 | const goodPods = [] | 177 | const goodPods = [] |
165 | const badPods = [] | 178 | const badPods = [] |
166 | 179 | ||
167 | eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { | 180 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { |
168 | const requestToMake = requestsToMake[toPodId] | 181 | const requestToMake = requestsToMakeGrouped[hashKey] |
169 | 182 | ||
170 | // FIXME: mongodb request inside a loop :/ | 183 | // FIXME: mongodb request inside a loop :/ |
171 | Pod.load(toPodId, function (err, toPod) { | 184 | Pod.load(requestToMake.toPodId, function (err, toPod) { |
172 | if (err) { | 185 | if (err) { |
173 | logger.error('Error finding pod by id.', { err: err }) | 186 | logger.error('Error finding pod by id.', { err: err }) |
174 | return callbackEach() | 187 | return callbackEach() |
@@ -176,21 +189,23 @@ function makeRequests () { | |||
176 | 189 | ||
177 | // Maybe the pod is not our friend anymore so simply remove it | 190 | // Maybe the pod is not our friend anymore so simply remove it |
178 | if (!toPod) { | 191 | if (!toPod) { |
179 | logger.info('Removing %d requests of unexisting pod %s.', requestToMake.ids.length, toPodId) | 192 | const requestIdsToDelete = requestToMake.ids |
180 | removePodOf.call(self, requestToMake.ids, toPodId) | 193 | |
194 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) | ||
195 | removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) | ||
181 | return callbackEach() | 196 | return callbackEach() |
182 | } | 197 | } |
183 | 198 | ||
184 | makeRequest(toPod, requestToMake.datas, function (success) { | 199 | makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { |
185 | if (success === true) { | 200 | if (success === true) { |
186 | logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) | 201 | logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) |
187 | 202 | ||
188 | goodPods.push(toPodId) | 203 | goodPods.push(requestToMake.toPodId) |
189 | 204 | ||
190 | // Remove the pod id of these request ids | 205 | // Remove the pod id of these request ids |
191 | removePodOf.call(self, requestToMake.ids, toPodId, callbackEach) | 206 | removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach) |
192 | } else { | 207 | } else { |
193 | badPods.push(toPodId) | 208 | badPods.push(requestToMake.toPodId) |
194 | callbackEach() | 209 | callbackEach() |
195 | } | 210 | } |
196 | }) | 211 | }) |
@@ -260,7 +275,7 @@ function listWithLimitAndRandom (limit, callback) { | |||
260 | let start = Math.floor(Math.random() * count) - limit | 275 | let start = Math.floor(Math.random() * count) - limit |
261 | if (start < 0) start = 0 | 276 | if (start < 0) start = 0 |
262 | 277 | ||
263 | self.find({ }, { _id: 1, request: 1, to: 1 }).sort({ _id: 1 }).skip(start).limit(limit).exec(callback) | 278 | self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback) |
264 | }) | 279 | }) |
265 | } | 280 | } |
266 | 281 | ||