aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server.js5
-rw-r--r--server/lib/friends.js59
-rw-r--r--server/lib/request-scheduler.js202
-rw-r--r--server/models/request.js208
4 files changed, 247 insertions, 227 deletions
diff --git a/server.js b/server.js
index 7503072af..4a0de72bb 100644
--- a/server.js
+++ b/server.js
@@ -37,6 +37,7 @@ if (errorMessage !== null) {
37 37
38// ----------- PeerTube modules ----------- 38// ----------- PeerTube modules -----------
39const customValidators = require('./server/helpers/custom-validators') 39const customValidators = require('./server/helpers/custom-validators')
40const friends = require('./server/lib/friends')
40const installer = require('./server/initializers/installer') 41const installer = require('./server/initializers/installer')
41const migrator = require('./server/initializers/migrator') 42const migrator = require('./server/initializers/migrator')
42const routes = require('./server/controllers') 43const routes = require('./server/controllers')
@@ -128,8 +129,8 @@ installer.installApplication(function (err) {
128 129
129 // ----------- Make the server listening ----------- 130 // ----------- Make the server listening -----------
130 server.listen(port, function () { 131 server.listen(port, function () {
131 // Activate the pool requests 132 // Activate the communication with friends
132 db.Request.activate() 133 friends.activate()
133 134
134 logger.info('Server listening on port %d', port) 135 logger.info('Server listening on port %d', port)
135 logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL) 136 logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL)
diff --git a/server/lib/friends.js b/server/lib/friends.js
index 9b38693c7..7dfa62a2a 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -11,10 +11,13 @@ const db = require('../initializers/database')
11const logger = require('../helpers/logger') 11const logger = require('../helpers/logger')
12const peertubeCrypto = require('../helpers/peertube-crypto') 12const peertubeCrypto = require('../helpers/peertube-crypto')
13const requests = require('../helpers/requests') 13const requests = require('../helpers/requests')
14const RequestScheduler = require('./request-scheduler')
14 15
15const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] 16const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
17const requestScheduler = new RequestScheduler('')
16 18
17const friends = { 19const friends = {
20 activate,
18 addVideoToFriends, 21 addVideoToFriends,
19 updateVideoToFriends, 22 updateVideoToFriends,
20 reportAbuseVideoToFriend, 23 reportAbuseVideoToFriend,
@@ -25,6 +28,10 @@ const friends = {
25 sendOwnedVideosToPod 28 sendOwnedVideosToPod
26} 29}
27 30
31function activate () {
32 requestScheduler.activate()
33}
34
28function addVideoToFriends (videoData, transaction, callback) { 35function addVideoToFriends (videoData, transaction, callback) {
29 const options = { 36 const options = {
30 type: ENDPOINT_ACTIONS.ADD, 37 type: ENDPOINT_ACTIONS.ADD,
@@ -99,11 +106,11 @@ function makeFriends (hosts, callback) {
99 106
100function quitFriends (callback) { 107function quitFriends (callback) {
101 // Stop pool requests 108 // Stop pool requests
102 db.Request.deactivate() 109 requestScheduler.deactivate()
103 110
104 waterfall([ 111 waterfall([
105 function flushRequests (callbackAsync) { 112 function flushRequests (callbackAsync) {
106 db.Request.flush(callbackAsync) 113 requestScheduler.flush(callbackAsync)
107 }, 114 },
108 115
109 function getPodsList (callbackAsync) { 116 function getPodsList (callbackAsync) {
@@ -140,7 +147,7 @@ function quitFriends (callback) {
140 } 147 }
141 ], function (err) { 148 ], function (err) {
142 // Don't forget to re activate the scheduler, even if there was an error 149 // Don't forget to re activate the scheduler, even if there was an error
143 db.Request.activate() 150 requestScheduler.activate()
144 151
145 if (err) return callback(err) 152 if (err) return callback(err)
146 153
@@ -235,9 +242,9 @@ function getForeignPodsList (host, callback) {
235 242
236function makeRequestsToWinningPods (cert, podsList, callback) { 243function makeRequestsToWinningPods (cert, podsList, callback) {
237 // Stop pool requests 244 // Stop pool requests
238 db.Request.deactivate() 245 requestScheduler.deactivate()
239 // Flush pool requests 246 // Flush pool requests
240 db.Request.forceSend() 247 requestScheduler.forceSend()
241 248
242 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 249 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
243 const params = { 250 const params = {
@@ -278,7 +285,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
278 }, function endRequests () { 285 }, function endRequests () {
279 // Final callback, we've ended all the requests 286 // Final callback, we've ended all the requests
280 // Now we made new friends, we can re activate the pool of requests 287 // Now we made new friends, we can re activate the pool of requests
281 db.Request.activate() 288 requestScheduler.activate()
282 289
283 logger.debug('makeRequestsToWinningPods finished.') 290 logger.debug('makeRequestsToWinningPods finished.')
284 return callback() 291 return callback()
@@ -289,7 +296,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
289// { type, endpoint, data, toIds, transaction } 296// { type, endpoint, data, toIds, transaction }
290function createRequest (options, callback) { 297function createRequest (options, callback) {
291 if (!callback) callback = function () {} 298 if (!callback) callback = function () {}
292 if (options.toIds) return _createRequest(options, callback) 299 if (options.toIds) return requestScheduler.createRequest(options, callback)
293 300
294 // If the "toIds" pods is not specified, we send the request to all our friends 301 // If the "toIds" pods is not specified, we send the request to all our friends
295 db.Pod.listAllIds(options.transaction, function (err, podIds) { 302 db.Pod.listAllIds(options.transaction, function (err, podIds) {
@@ -299,43 +306,7 @@ function createRequest (options, callback) {
299 } 306 }
300 307
301 const newOptions = Object.assign(options, { toIds: podIds }) 308 const newOptions = Object.assign(options, { toIds: podIds })
302 return _createRequest(newOptions, callback) 309 return requestScheduler.createRequest(newOptions, callback)
303 })
304}
305
306// { type, endpoint, data, toIds, transaction }
307function _createRequest (options, callback) {
308 const type = options.type
309 const endpoint = options.endpoint
310 const data = options.data
311 const toIds = options.toIds
312 const transaction = options.transaction
313
314 const pods = []
315
316 // If there are no destination pods abort
317 if (toIds.length === 0) return callback(null)
318
319 toIds.forEach(function (toPod) {
320 pods.push(db.Pod.build({ id: toPod }))
321 })
322
323 const createQuery = {
324 endpoint,
325 request: {
326 type: type,
327 data: data
328 }
329 }
330
331 const dbRequestOptions = {
332 transaction
333 }
334
335 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
336 if (err) return callback(err)
337
338 return request.setPods(pods, dbRequestOptions).asCallback(callback)
339 }) 310 })
340} 311}
341 312
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js
new file mode 100644
index 000000000..c8bc4af28
--- /dev/null
+++ b/server/lib/request-scheduler.js
@@ -0,0 +1,202 @@
1'use strict'
2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../initializers/constants')
6const db = require('../initializers/database')
7const logger = require('../helpers/logger')
8const requests = require('../helpers/requests')
9
10module.exports = class RequestScheduler {
11
12 constructor (name) {
13 this.name = name
14
15 this.lastRequestTimestamp = 0
16 this.timer = null
17 }
18
19 activate () {
20 logger.info('Requests scheduler activated.')
21 this.lastRequestTimestamp = Date.now()
22
23 this.timer = setInterval(() => {
24 this.lastRequestTimestamp = Date.now()
25 this.makeRequests()
26 }, constants.REQUESTS_INTERVAL)
27 }
28
29 deactivate () {
30 logger.info('Requests scheduler deactivated.')
31 clearInterval(this.timer)
32 this.timer = null
33 }
34
35 forceSend () {
36 logger.info('Force requests scheduler sending.')
37 this.makeRequests()
38 }
39
40 remainingMilliSeconds () {
41 if (this.timer === null) return -1
42
43 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
44 }
45
46 // { type, endpoint, data, toIds, transaction }
47 createRequest (options, callback) {
48 const type = options.type
49 const endpoint = options.endpoint
50 const data = options.data
51 const toIds = options.toIds
52 const transaction = options.transaction
53
54 const pods = []
55
56 // If there are no destination pods abort
57 if (toIds.length === 0) return callback(null)
58
59 toIds.forEach(toPod => {
60 pods.push(db.Pod.build({ id: toPod }))
61 })
62
63 const createQuery = {
64 endpoint,
65 request: {
66 type: type,
67 data: data
68 }
69 }
70
71 const dbRequestOptions = {
72 transaction
73 }
74
75 return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => {
76 if (err) return callback(err)
77
78 return request.setPods(pods, dbRequestOptions).asCallback(callback)
79 })
80 }
81
82 // ---------------------------------------------------------------------------
83
84 // Make all the requests of the scheduler
85 makeRequests () {
86 // We limit the size of the requests
87 // We don't want to stuck with the same failing requests so we get a random list
88 db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
89 if (err) {
90 logger.error('Cannot get the list of requests.', { err: err })
91 return // Abort
92 }
93
94 // If there are no requests, abort
95 if (requests.length === 0) {
96 logger.info('No requests to make.')
97 return
98 }
99
100 // We want to group requests by destinations pod and endpoint
101 const requestsToMakeGrouped = this.buildRequestObjects(requests)
102
103 logger.info('Making requests to friends.')
104
105 const goodPods = []
106 const badPods = []
107
108 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
109 const requestToMake = requestsToMakeGrouped[hashKey]
110 const toPod = requestToMake.toPod
111
112 // Maybe the pod is not our friend anymore so simply remove it
113 if (!toPod) {
114 const requestIdsToDelete = requestToMake.ids
115
116 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
117 return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
118 }
119
120 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
121 if (success === false) {
122 badPods.push(requestToMake.toPod.id)
123 return callbackEach()
124 }
125
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
127 goodPods.push(requestToMake.toPod.id)
128
129 // Remove the pod id of these request ids
130 db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
131 })
132 }, () => {
133 // All the requests were made, we update the pods score
134 db.Request.updatePodsScore(goodPods, badPods)
135 // Flush requests with no pod
136 db.Request.removeWithEmptyTo(err => {
137 if (err) logger.error('Error when removing requests with no pods.', { error: err })
138 })
139 })
140 })
141 }
142
143 // Make a requests to friends of a certain type
144 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
145 if (!callback) callback = function () {}
146
147 const params = {
148 toPod: toPod,
149 sign: true, // Prove our identity
150 method: 'POST',
151 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
152 data: requestsToMake // Requests we need to make
153 }
154
155 // Make multiple retry requests to all of pods
156 // The function fire some useful callbacks
157 requests.makeSecureRequest(params, (err, res) => {
158 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
159 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
160 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
161
162 return callback(false)
163 }
164
165 return callback(true)
166 })
167 }
168
169 buildRequestObjects (requests) {
170 const requestsToMakeGrouped = {}
171
172 Object.keys(requests).forEach(toPodId => {
173 requests[toPodId].forEach(data => {
174 const request = data.request
175 const pod = data.pod
176 const hashKey = toPodId + request.endpoint
177
178 if (!requestsToMakeGrouped[hashKey]) {
179 requestsToMakeGrouped[hashKey] = {
180 toPod: pod,
181 endpoint: request.endpoint,
182 ids: [], // request ids, to delete them from the DB in the future
183 datas: [] // requests data,
184 }
185 }
186
187 requestsToMakeGrouped[hashKey].ids.push(request.id)
188 requestsToMakeGrouped[hashKey].datas.push(request.request)
189 })
190 })
191
192 return requestsToMakeGrouped
193 }
194
195 flush (callback) {
196 db.Request.removeAll(err => {
197 if (err) logger.error('Cannot flush the requests.', { error: err })
198
199 return callback(err)
200 })
201 }
202}
diff --git a/server/models/request.js b/server/models/request.js
index baa26fc1b..ca616d130 100644
--- a/server/models/request.js
+++ b/server/models/request.js
@@ -1,16 +1,11 @@
1'use strict' 1'use strict'
2 2
3const each = require('async/each') 3const each = require('async/each')
4const eachLimit = require('async/eachLimit')
5const waterfall = require('async/waterfall') 4const waterfall = require('async/waterfall')
6const values = require('lodash/values') 5const values = require('lodash/values')
7 6
8const constants = require('../initializers/constants') 7const constants = require('../initializers/constants')
9const logger = require('../helpers/logger') 8const logger = require('../helpers/logger')
10const requests = require('../helpers/requests')
11
12let timer = null
13let lastRequestTimestamp = 0
14 9
15// --------------------------------------------------------------------------- 10// ---------------------------------------------------------------------------
16 11
@@ -30,12 +25,13 @@ module.exports = function (sequelize, DataTypes) {
30 classMethods: { 25 classMethods: {
31 associate, 26 associate,
32 27
33 activate, 28 listWithLimitAndRandom,
29
34 countTotalRequests, 30 countTotalRequests,
35 deactivate, 31 removeBadPods,
36 flush, 32 updatePodsScore,
37 forceSend, 33 removeAll,
38 remainingMilliSeconds 34 removeWithEmptyTo
39 } 35 }
40 } 36 }
41 ) 37 )
@@ -56,17 +52,6 @@ function associate (models) {
56 }) 52 })
57} 53}
58 54
59function activate () {
60 logger.info('Requests scheduler activated.')
61 lastRequestTimestamp = Date.now()
62
63 const self = this
64 timer = setInterval(function () {
65 lastRequestTimestamp = Date.now()
66 makeRequests.call(self)
67 }, constants.REQUESTS_INTERVAL)
68}
69
70function countTotalRequests (callback) { 55function countTotalRequests (callback) {
71 const query = { 56 const query = {
72 include: [ this.sequelize.models.Pod ] 57 include: [ this.sequelize.models.Pod ]
@@ -75,147 +60,6 @@ function countTotalRequests (callback) {
75 return this.count(query).asCallback(callback) 60 return this.count(query).asCallback(callback)
76} 61}
77 62
78function deactivate () {
79 logger.info('Requests scheduler deactivated.')
80 clearInterval(timer)
81 timer = null
82}
83
84function flush (callback) {
85 removeAll.call(this, function (err) {
86 if (err) logger.error('Cannot flush the requests.', { error: err })
87
88 return callback(err)
89 })
90}
91
92function forceSend () {
93 logger.info('Force requests scheduler sending.')
94 makeRequests.call(this)
95}
96
97function remainingMilliSeconds () {
98 if (timer === null) return -1
99
100 return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp)
101}
102
103// ---------------------------------------------------------------------------
104
105// Make a requests to friends of a certain type
106function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
107 if (!callback) callback = function () {}
108
109 const params = {
110 toPod: toPod,
111 sign: true, // Prove our identity
112 method: 'POST',
113 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
114 data: requestsToMake // Requests we need to make
115 }
116
117 // Make multiple retry requests to all of pods
118 // The function fire some useful callbacks
119 requests.makeSecureRequest(params, function (err, res) {
120 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
121 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
122 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
123
124 return callback(false)
125 }
126
127 return callback(true)
128 })
129}
130
131// Make all the requests of the scheduler
132function makeRequests () {
133 const self = this
134 const RequestToPod = this.sequelize.models.RequestToPod
135
136 // We limit the size of the requests
137 // We don't want to stuck with the same failing requests so we get a random list
138 listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) {
139 if (err) {
140 logger.error('Cannot get the list of requests.', { err: err })
141 return // Abort
142 }
143
144 // If there are no requests, abort
145 if (requests.length === 0) {
146 logger.info('No requests to make.')
147 return
148 }
149
150 // We want to group requests by destinations pod and endpoint
151 const requestsToMakeGrouped = buildRequestObjects(requests)
152
153 logger.info('Making requests to friends.')
154
155 const goodPods = []
156 const badPods = []
157
158 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
159 const requestToMake = requestsToMakeGrouped[hashKey]
160 const toPod = requestToMake.toPod
161
162 // Maybe the pod is not our friend anymore so simply remove it
163 if (!toPod) {
164 const requestIdsToDelete = requestToMake.ids
165
166 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
167 return RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
168 }
169
170 makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
171 if (success === false) {
172 badPods.push(requestToMake.toPod.id)
173 return callbackEach()
174 }
175
176 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
177 goodPods.push(requestToMake.toPod.id)
178
179 // Remove the pod id of these request ids
180 RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
181 })
182 }, function () {
183 // All the requests were made, we update the pods score
184 updatePodsScore.call(self, goodPods, badPods)
185 // Flush requests with no pod
186 removeWithEmptyTo.call(self, function (err) {
187 if (err) logger.error('Error when removing requests with no pods.', { error: err })
188 })
189 })
190 })
191}
192
193function buildRequestObjects (requests) {
194 const requestsToMakeGrouped = {}
195
196 Object.keys(requests).forEach(function (toPodId) {
197 requests[toPodId].forEach(function (data) {
198 const request = data.request
199 const pod = data.pod
200 const hashKey = toPodId + request.endpoint
201
202 if (!requestsToMakeGrouped[hashKey]) {
203 requestsToMakeGrouped[hashKey] = {
204 toPod: pod,
205 endpoint: request.endpoint,
206 ids: [], // request ids, to delete them from the DB in the future
207 datas: [] // requests data,
208 }
209 }
210
211 requestsToMakeGrouped[hashKey].ids.push(request.id)
212 requestsToMakeGrouped[hashKey].datas.push(request.request)
213 })
214 })
215
216 return requestsToMakeGrouped
217}
218
219// Remove pods with a score of 0 (too many requests where they were unreachable) 63// Remove pods with a score of 0 (too many requests where they were unreachable)
220function removeBadPods () { 64function removeBadPods () {
221 const self = this 65 const self = this
@@ -307,25 +151,6 @@ function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
307 }) 151 })
308} 152}
309 153
310function groupAndTruncateRequests (requests, limitRequestsPerPod) {
311 const requestsGrouped = {}
312
313 requests.forEach(function (request) {
314 request.Pods.forEach(function (pod) {
315 if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
316
317 if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
318 requestsGrouped[pod.id].push({
319 request,
320 pod
321 })
322 }
323 })
324 })
325
326 return requestsGrouped
327}
328
329function removeAll (callback) { 154function removeAll (callback) {
330 // Delete all requests 155 // Delete all requests
331 this.truncate({ cascade: true }).asCallback(callback) 156 this.truncate({ cascade: true }).asCallback(callback)
@@ -346,3 +171,24 @@ function removeWithEmptyTo (callback) {
346 171
347 this.destroy(query).asCallback(callback) 172 this.destroy(query).asCallback(callback)
348} 173}
174
175// ---------------------------------------------------------------------------
176
177function groupAndTruncateRequests (requests, limitRequestsPerPod) {
178 const requestsGrouped = {}
179
180 requests.forEach(function (request) {
181 request.Pods.forEach(function (pod) {
182 if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
183
184 if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
185 requestsGrouped[pod.id].push({
186 request,
187 pod
188 })
189 }
190 })
191 })
192
193 return requestsGrouped
194}