aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/requestsScheduler.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-06-18 16:13:54 +0200
committerChocobozzz <florian.bigard@gmail.com>2016-06-18 16:13:54 +0200
commit528a9efa8272532bbd0dafc35c3e05e57c50f61e (patch)
tree62d4417df4ab9b2e53c44dc7271be81b88e4e0e5 /server/lib/requestsScheduler.js
parentb2e4c0ba1a33b8a50491a1f8d111468a7da5640f (diff)
downloadPeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.gz
PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.tar.zst
PeerTube-528a9efa8272532bbd0dafc35c3e05e57c50f61e.zip
Try to make a better communication (between pods) module
Diffstat (limited to 'server/lib/requestsScheduler.js')
-rw-r--r--server/lib/requestsScheduler.js200
1 files changed, 106 insertions, 94 deletions
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js
index 78570209d..ac75e5b93 100644
--- a/server/lib/requestsScheduler.js
+++ b/server/lib/requestsScheduler.js
@@ -11,13 +11,14 @@ const requests = require('../helpers/requests')
11const videos = require('../lib/videos') 11const videos = require('../lib/videos')
12const Videos = require('../models/videos') 12const Videos = require('../models/videos')
13 13
14const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
15let timer = null 14let timer = null
16 15
17const requestsScheduler = { 16const requestsScheduler = {
18 activate: activate, 17 activate: activate,
19 addRequest: addRequest, 18 addRequest: addRequest,
19 addRequestTo: addRequestTo,
20 deactivate: deactivate, 20 deactivate: deactivate,
21 flush: flush,
21 forceSend: forceSend 22 forceSend: forceSend
22} 23}
23 24
@@ -27,35 +28,37 @@ function activate () {
27} 28}
28 29
29// Add request to the scheduler 30// Add request to the scheduler
30function addRequest (id, type, request) { 31function addRequest (type, data) {
31 logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) 32 logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
32 33
33 Requests.findById(id, function (err, entity) { 34 const request = {
35 type: type,
36 data: data
37 }
38
39 Pods.listAllIds(function (err, podIds) {
34 if (err) { 40 if (err) {
35 logger.error('Error when trying to find a request.', { error: err }) 41 logger.debug('Cannot list pod ids.')
36 return // Abort 42 return
37 } 43 }
38 44
39 // If there were already a request with this id in the scheduler... 45 // No friends
40 if (entity) { 46 if (!podIds) return
41 if (entity.type === type) {
42 logger.error('Cannot insert two same requests.')
43 return // Abort
44 }
45 47
46 // Remove the request of the other type 48 Requests.create(request, podIds, function (err) {
47 Requests.removeRequestById(id, function (err) { 49 if (err) logger.error('Cannot create a request.', { error: err })
48 if (err) { 50 })
49 logger.error('Cannot remove a request.', { error: err }) 51 })
50 return // Abort 52}
51 } 53
52 }) 54function addRequestTo (podIds, type, data) {
53 } else { 55 const request = {
54 Requests.create(id, type, request, function (err) { 56 type: type,
55 if (err) logger.error('Cannot create a request.', { error: err }) 57 data: data
56 return // Abort 58 }
57 }) 59
58 } 60 Requests.create(request, podIds, function (err) {
61 if (err) logger.error('Cannot create a request.', { error: err })
59 }) 62 })
60} 63}
61 64
@@ -64,6 +67,14 @@ function deactivate () {
64 clearInterval(timer) 67 clearInterval(timer)
65} 68}
66 69
70function flush () {
71 Requests.removeAll(function (err) {
72 if (err) {
73 logger.error('Cannot flush the requests.', { error: err })
74 }
75 })
76}
77
67function forceSend () { 78function forceSend () {
68 logger.info('Force requests scheduler sending.') 79 logger.info('Force requests scheduler sending.')
69 makeRequests() 80 makeRequests()
@@ -76,54 +87,28 @@ module.exports = requestsScheduler
76// --------------------------------------------------------------------------- 87// ---------------------------------------------------------------------------
77 88
78// Make a requests to friends of a certain type 89// Make a requests to friends of a certain type
79function makeRequest (type, requestsToMake, callback) { 90function makeRequest (toPod, requestsToMake, callback) {
80 if (!callback) callback = function () {} 91 if (!callback) callback = function () {}
81 92
82 Pods.list(function (err, pods) { 93 const params = {
83 if (err) return callback(err) 94 toPod: toPod,
84 95 encrypt: true, // Security
85 const params = { 96 sign: true, // To prove our identity
86 encrypt: true, // Security 97 method: 'POST',
87 sign: true, // To prove our identity 98 path: '/api/' + constants.API_VERSION + '/remote/videos',
88 method: 'POST', 99 data: requestsToMake // Requests we need to make
89 path: null, // We build the path later 100 }
90 data: requestsToMake // Requests we need to make 101
91 } 102 // Make multiple retry requests to all of pods
92 103 // The function fire some useful callbacks
93 // If this is a valid type, we build the path 104 requests.makeSecureRequest(params, function (err, res) {
94 if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { 105 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
95 params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type 106 logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
96 } else { 107
97 return callback(new Error('Unkown pool request type.')) 108 return callback(false)
98 }
99
100 const badPods = []
101 const goodPods = []
102
103 // Make multiple retry requests to all of pods
104 // The function fire some useful callbacks
105 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
106
107 function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
108 // We failed the request, add the pod unreachable to the bad pods list
109 if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
110 badPods.push(pod._id)
111 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
112 } else {
113 // Request success
114 goodPods.push(pod._id)
115 }
116
117 return callbackEachPodFinished()
118 } 109 }
119 110
120 function callbackAllPodsFinished (err) { 111 return callback(true)
121 if (err) return callback(err)
122
123 // All the requests were made, we update the pods score
124 updatePodsScore(goodPods, badPods)
125 callback(null)
126 }
127 }) 112 })
128} 113}
129 114
@@ -143,38 +128,65 @@ function makeRequests () {
143 128
144 logger.info('Making requests to friends.') 129 logger.info('Making requests to friends.')
145 130
131 // Requests by pods id
146 const requestsToMake = {} 132 const requestsToMake = {}
147 for (const type of REQUEST_SCHEDULER_TYPE) {
148 requestsToMake[type] = {
149 ids: [],
150 requests: []
151 }
152 }
153 133
154 // For each requests to make, we add it to the correct request type
155 requests.forEach(function (poolRequest) { 134 requests.forEach(function (poolRequest) {
156 if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { 135 poolRequest.to.forEach(function (toPodId) {
157 const requestTypeToMake = requestsToMake[poolRequest.type] 136 if (!requestsToMake[toPodId]) {
158 requestTypeToMake.requests.push(poolRequest.request) 137 requestsToMake[toPodId] = {
159 requestTypeToMake.ids.push(poolRequest._id) 138 ids: [],
160 } else { 139 datas: []
161 logger.error('Unkown request type.', { request_type: poolRequest.type }) 140 }
162 return // abort 141 }
163 } 142
143 requestsToMake[toPodId].ids.push(poolRequest._id)
144 requestsToMake[toPodId].datas.push(poolRequest.request)
145 })
164 }) 146 })
165 147
166 for (let type of Object.keys(requestsToMake)) { 148 const goodPods = []
167 const requestTypeToMake = requestsToMake[type] 149 const badPods = []
168 // If there are requests for this type
169 if (requestTypeToMake.requests.length !== 0) {
170 makeRequest(type, requestTypeToMake.requests, function (err) {
171 if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })
172 150
173 // We made the requests, so we can remove them from the scheduler 151 async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
174 Requests.removeRequests(requestTypeToMake.ids) 152 const requestToMake = requestsToMake[toPodId]
153
154 // FIXME: mongodb request inside a loop :/
155 Pods.findById(toPodId, function (err, toPod) {
156 if (err) return logger.error('Error finding pod by id.', { err: err })
157
158 // Maybe the pod is not our friend anymore so simply remove them
159 if (!toPod) {
160 Requests.removePodOf(requestToMake.ids, toPodId)
161 return callbackEach()
162 }
163
164 makeRequest(toPod, requestToMake.datas, function (success) {
165 if (err) {
166 logger.error('Errors when sent request to %s.', toPod.url, { error: err })
167 // Do not stop the process just for one error
168 return callbackEach()
169 }
170
171 if (success === true) {
172 logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
173
174 // Remove the pod id of these request ids
175 Requests.removePodOf(requestToMake.ids, toPodId)
176 goodPods.push(toPodId)
177 } else {
178 badPods.push(toPodId)
179 }
180
181 callbackEach()
175 }) 182 })
176 } 183 })
177 } 184 }, function () {
185 // All the requests were made, we update the pods score
186 updatePodsScore(goodPods, badPods)
187 // Flush requests with no pod
188 Requests.removeWithEmptyTo()
189 })
178 }) 190 })
179} 191}
180 192