aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-02-18 10:29:36 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-02-18 10:29:36 +0100
commitc1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de (patch)
tree5d721432dcc25f43ebd72b12f675d4e208faf616 /server/lib
parent361b7df2a257d1c44ec1d79128a9065b563090d8 (diff)
downloadPeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.gz
PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.zst
PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.zip
Server: use a request scheduler object instance for friends
communication
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/friends.js59
-rw-r--r--server/lib/request-scheduler.js202
2 files changed, 217 insertions, 44 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js
index 9b38693c7..7dfa62a2a 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -11,10 +11,13 @@ const db = require('../initializers/database')
11const logger = require('../helpers/logger') 11const logger = require('../helpers/logger')
12const peertubeCrypto = require('../helpers/peertube-crypto') 12const peertubeCrypto = require('../helpers/peertube-crypto')
13const requests = require('../helpers/requests') 13const requests = require('../helpers/requests')
14const RequestScheduler = require('./request-scheduler')
14 15
15const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] 16const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
17const requestScheduler = new RequestScheduler('')
16 18
17const friends = { 19const friends = {
20 activate,
18 addVideoToFriends, 21 addVideoToFriends,
19 updateVideoToFriends, 22 updateVideoToFriends,
20 reportAbuseVideoToFriend, 23 reportAbuseVideoToFriend,
@@ -25,6 +28,10 @@ const friends = {
25 sendOwnedVideosToPod 28 sendOwnedVideosToPod
26} 29}
27 30
31function activate () {
32 requestScheduler.activate()
33}
34
28function addVideoToFriends (videoData, transaction, callback) { 35function addVideoToFriends (videoData, transaction, callback) {
29 const options = { 36 const options = {
30 type: ENDPOINT_ACTIONS.ADD, 37 type: ENDPOINT_ACTIONS.ADD,
@@ -99,11 +106,11 @@ function makeFriends (hosts, callback) {
99 106
100function quitFriends (callback) { 107function quitFriends (callback) {
101 // Stop pool requests 108 // Stop pool requests
102 db.Request.deactivate() 109 requestScheduler.deactivate()
103 110
104 waterfall([ 111 waterfall([
105 function flushRequests (callbackAsync) { 112 function flushRequests (callbackAsync) {
106 db.Request.flush(callbackAsync) 113 requestScheduler.flush(callbackAsync)
107 }, 114 },
108 115
109 function getPodsList (callbackAsync) { 116 function getPodsList (callbackAsync) {
@@ -140,7 +147,7 @@ function quitFriends (callback) {
140 } 147 }
141 ], function (err) { 148 ], function (err) {
142 // Don't forget to re activate the scheduler, even if there was an error 149 // Don't forget to re activate the scheduler, even if there was an error
143 db.Request.activate() 150 requestScheduler.activate()
144 151
145 if (err) return callback(err) 152 if (err) return callback(err)
146 153
@@ -235,9 +242,9 @@ function getForeignPodsList (host, callback) {
235 242
236function makeRequestsToWinningPods (cert, podsList, callback) { 243function makeRequestsToWinningPods (cert, podsList, callback) {
237 // Stop pool requests 244 // Stop pool requests
238 db.Request.deactivate() 245 requestScheduler.deactivate()
239 // Flush pool requests 246 // Flush pool requests
240 db.Request.forceSend() 247 requestScheduler.forceSend()
241 248
242 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 249 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
243 const params = { 250 const params = {
@@ -278,7 +285,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
278 }, function endRequests () { 285 }, function endRequests () {
279 // Final callback, we've ended all the requests 286 // Final callback, we've ended all the requests
280 // Now we made new friends, we can re activate the pool of requests 287 // Now we made new friends, we can re activate the pool of requests
281 db.Request.activate() 288 requestScheduler.activate()
282 289
283 logger.debug('makeRequestsToWinningPods finished.') 290 logger.debug('makeRequestsToWinningPods finished.')
284 return callback() 291 return callback()
@@ -289,7 +296,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
289// { type, endpoint, data, toIds, transaction } 296// { type, endpoint, data, toIds, transaction }
290function createRequest (options, callback) { 297function createRequest (options, callback) {
291 if (!callback) callback = function () {} 298 if (!callback) callback = function () {}
292 if (options.toIds) return _createRequest(options, callback) 299 if (options.toIds) return requestScheduler.createRequest(options, callback)
293 300
294 // If the "toIds" pods is not specified, we send the request to all our friends 301 // If the "toIds" pods is not specified, we send the request to all our friends
295 db.Pod.listAllIds(options.transaction, function (err, podIds) { 302 db.Pod.listAllIds(options.transaction, function (err, podIds) {
@@ -299,43 +306,7 @@ function createRequest (options, callback) {
299 } 306 }
300 307
301 const newOptions = Object.assign(options, { toIds: podIds }) 308 const newOptions = Object.assign(options, { toIds: podIds })
302 return _createRequest(newOptions, callback) 309 return requestScheduler.createRequest(newOptions, callback)
303 })
304}
305
306// { type, endpoint, data, toIds, transaction }
307function _createRequest (options, callback) {
308 const type = options.type
309 const endpoint = options.endpoint
310 const data = options.data
311 const toIds = options.toIds
312 const transaction = options.transaction
313
314 const pods = []
315
316 // If there are no destination pods abort
317 if (toIds.length === 0) return callback(null)
318
319 toIds.forEach(function (toPod) {
320 pods.push(db.Pod.build({ id: toPod }))
321 })
322
323 const createQuery = {
324 endpoint,
325 request: {
326 type: type,
327 data: data
328 }
329 }
330
331 const dbRequestOptions = {
332 transaction
333 }
334
335 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
336 if (err) return callback(err)
337
338 return request.setPods(pods, dbRequestOptions).asCallback(callback)
339 }) 310 })
340} 311}
341 312
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js
new file mode 100644
index 000000000..c8bc4af28
--- /dev/null
+++ b/server/lib/request-scheduler.js
@@ -0,0 +1,202 @@
1'use strict'
2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../initializers/constants')
6const db = require('../initializers/database')
7const logger = require('../helpers/logger')
8const requests = require('../helpers/requests')
9
10module.exports = class RequestScheduler {
11
12 constructor (name) {
13 this.name = name
14
15 this.lastRequestTimestamp = 0
16 this.timer = null
17 }
18
19 activate () {
20 logger.info('Requests scheduler activated.')
21 this.lastRequestTimestamp = Date.now()
22
23 this.timer = setInterval(() => {
24 this.lastRequestTimestamp = Date.now()
25 this.makeRequests()
26 }, constants.REQUESTS_INTERVAL)
27 }
28
29 deactivate () {
30 logger.info('Requests scheduler deactivated.')
31 clearInterval(this.timer)
32 this.timer = null
33 }
34
35 forceSend () {
36 logger.info('Force requests scheduler sending.')
37 this.makeRequests()
38 }
39
40 remainingMilliSeconds () {
41 if (this.timer === null) return -1
42
43 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
44 }
45
46 // { type, endpoint, data, toIds, transaction }
47 createRequest (options, callback) {
48 const type = options.type
49 const endpoint = options.endpoint
50 const data = options.data
51 const toIds = options.toIds
52 const transaction = options.transaction
53
54 const pods = []
55
56 // If there are no destination pods abort
57 if (toIds.length === 0) return callback(null)
58
59 toIds.forEach(toPod => {
60 pods.push(db.Pod.build({ id: toPod }))
61 })
62
63 const createQuery = {
64 endpoint,
65 request: {
66 type: type,
67 data: data
68 }
69 }
70
71 const dbRequestOptions = {
72 transaction
73 }
74
75 return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => {
76 if (err) return callback(err)
77
78 return request.setPods(pods, dbRequestOptions).asCallback(callback)
79 })
80 }
81
82 // ---------------------------------------------------------------------------
83
84 // Make all the requests of the scheduler
85 makeRequests () {
86 // We limit the size of the requests
87 // We don't want to stuck with the same failing requests so we get a random list
88 db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
89 if (err) {
90 logger.error('Cannot get the list of requests.', { err: err })
91 return // Abort
92 }
93
94 // If there are no requests, abort
95 if (requests.length === 0) {
96 logger.info('No requests to make.')
97 return
98 }
99
100 // We want to group requests by destinations pod and endpoint
101 const requestsToMakeGrouped = this.buildRequestObjects(requests)
102
103 logger.info('Making requests to friends.')
104
105 const goodPods = []
106 const badPods = []
107
108 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
109 const requestToMake = requestsToMakeGrouped[hashKey]
110 const toPod = requestToMake.toPod
111
112 // Maybe the pod is not our friend anymore so simply remove it
113 if (!toPod) {
114 const requestIdsToDelete = requestToMake.ids
115
116 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
117 return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
118 }
119
120 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
121 if (success === false) {
122 badPods.push(requestToMake.toPod.id)
123 return callbackEach()
124 }
125
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
127 goodPods.push(requestToMake.toPod.id)
128
129 // Remove the pod id of these request ids
130 db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
131 })
132 }, () => {
133 // All the requests were made, we update the pods score
134 db.Request.updatePodsScore(goodPods, badPods)
135 // Flush requests with no pod
136 db.Request.removeWithEmptyTo(err => {
137 if (err) logger.error('Error when removing requests with no pods.', { error: err })
138 })
139 })
140 })
141 }
142
143 // Make a requests to friends of a certain type
144 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
145 if (!callback) callback = function () {}
146
147 const params = {
148 toPod: toPod,
149 sign: true, // Prove our identity
150 method: 'POST',
151 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
152 data: requestsToMake // Requests we need to make
153 }
154
155 // Make multiple retry requests to all of pods
156 // The function fire some useful callbacks
157 requests.makeSecureRequest(params, (err, res) => {
158 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
159 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
160 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
161
162 return callback(false)
163 }
164
165 return callback(true)
166 })
167 }
168
169 buildRequestObjects (requests) {
170 const requestsToMakeGrouped = {}
171
172 Object.keys(requests).forEach(toPodId => {
173 requests[toPodId].forEach(data => {
174 const request = data.request
175 const pod = data.pod
176 const hashKey = toPodId + request.endpoint
177
178 if (!requestsToMakeGrouped[hashKey]) {
179 requestsToMakeGrouped[hashKey] = {
180 toPod: pod,
181 endpoint: request.endpoint,
182 ids: [], // request ids, to delete them from the DB in the future
183 datas: [] // requests data,
184 }
185 }
186
187 requestsToMakeGrouped[hashKey].ids.push(request.id)
188 requestsToMakeGrouped[hashKey].datas.push(request.request)
189 })
190 })
191
192 return requestsToMakeGrouped
193 }
194
195 flush (callback) {
196 db.Request.removeAll(err => {
197 if (err) logger.error('Cannot flush the requests.', { error: err })
198
199 return callback(err)
200 })
201 }
202}