diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2016-06-06 15:28:33 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2016-06-06 15:28:33 +0200 |
commit | 8c255eb53c8f47bd64778d6fbcb93b248ee14163 (patch) | |
tree | 88950578fb8fed21433752996f76b598b068483c /server/lib/requestsScheduler.js | |
parent | 8d199cb823e1e47de87fdf421964e260b0ba6eb7 (diff) | |
download | PeerTube-8c255eb53c8f47bd64778d6fbcb93b248ee14163.tar.gz PeerTube-8c255eb53c8f47bd64778d6fbcb93b248ee14163.tar.zst PeerTube-8c255eb53c8f47bd64778d6fbcb93b248ee14163.zip |
Little refractoring of requests scheduler module
Diffstat (limited to 'server/lib/requestsScheduler.js')
-rw-r--r-- | server/lib/requestsScheduler.js | 87 |
1 files changed, 46 insertions, 41 deletions
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 3d04b8cc8..3c1df5d5c 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js | |||
@@ -11,6 +11,7 @@ const requests = require('../helpers/requests') | |||
11 | const videos = require('../lib/videos') | 11 | const videos = require('../lib/videos') |
12 | const Videos = require('../models/videos') | 12 | const Videos = require('../models/videos') |
13 | 13 | ||
14 | const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE | ||
14 | let timer = null | 15 | let timer = null |
15 | 16 | ||
16 | const requestsScheduler = { | 17 | const requestsScheduler = { |
@@ -25,15 +26,17 @@ function activate () { | |||
25 | timer = setInterval(makeRequests, constants.INTERVAL) | 26 | timer = setInterval(makeRequests, constants.INTERVAL) |
26 | } | 27 | } |
27 | 28 | ||
29 | // Add request to the scheduler | ||
28 | function addRequest (id, type, request) { | 30 | function addRequest (id, type, request) { |
29 | logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) | 31 | logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) |
30 | 32 | ||
31 | Requests.findById(id, function (err, entity) { | 33 | Requests.findById(id, function (err, entity) { |
32 | if (err) { | 34 | if (err) { |
33 | logger.error('Cannot find one request.', { error: err }) | 35 | logger.error('Error when trying to find a request.', { error: err }) |
34 | return // Abort | 36 | return // Abort |
35 | } | 37 | } |
36 | 38 | ||
39 | // If there were already a request with this id in the scheduler... | ||
37 | if (entity) { | 40 | if (entity) { |
38 | if (entity.type === type) { | 41 | if (entity.type === type) { |
39 | logger.error('Cannot insert two same requests.') | 42 | logger.error('Cannot insert two same requests.') |
@@ -72,6 +75,7 @@ module.exports = requestsScheduler | |||
72 | 75 | ||
73 | // --------------------------------------------------------------------------- | 76 | // --------------------------------------------------------------------------- |
74 | 77 | ||
78 | // Make a requests to friends of a certain type | ||
75 | function makeRequest (type, requestsToMake, callback) { | 79 | function makeRequest (type, requestsToMake, callback) { |
76 | if (!callback) callback = function () {} | 80 | if (!callback) callback = function () {} |
77 | 81 | ||
@@ -79,17 +83,16 @@ function makeRequest (type, requestsToMake, callback) { | |||
79 | if (err) return callback(err) | 83 | if (err) return callback(err) |
80 | 84 | ||
81 | const params = { | 85 | const params = { |
82 | encrypt: true, | 86 | encrypt: true, // Security |
83 | sign: true, | 87 | sign: true, // To prove our identity |
84 | method: 'POST', | 88 | method: 'POST', |
85 | path: null, | 89 | path: null, // We build the path later |
86 | data: requestsToMake | 90 | data: requestsToMake // Requests we need to make |
87 | } | 91 | } |
88 | 92 | ||
89 | if (type === 'add') { | 93 | // If this is a valid type, we build the path |
90 | params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' | 94 | if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { |
91 | } else if (type === 'remove') { | 95 | params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type |
92 | params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' | ||
93 | } else { | 96 | } else { |
94 | return callback(new Error('Unkown pool request type.')) | 97 | return callback(new Error('Unkown pool request type.')) |
95 | } | 98 | } |
@@ -97,13 +100,17 @@ function makeRequest (type, requestsToMake, callback) { | |||
97 | const badPods = [] | 100 | const badPods = [] |
98 | const goodPods = [] | 101 | const goodPods = [] |
99 | 102 | ||
103 | // Make multiple retry requests to all of pods | ||
104 | // The function fire some useful callbacks | ||
100 | requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) | 105 | requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) |
101 | 106 | ||
102 | function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { | 107 | function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { |
108 | // We failed the request, add the pod unreachable to the bad pods list | ||
103 | if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { | 109 | if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { |
104 | badPods.push(pod._id) | 110 | badPods.push(pod._id) |
105 | logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) | 111 | logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) |
106 | } else { | 112 | } else { |
113 | // Request success | ||
107 | goodPods.push(pod._id) | 114 | goodPods.push(pod._id) |
108 | } | 115 | } |
109 | 116 | ||
@@ -113,41 +120,43 @@ function makeRequest (type, requestsToMake, callback) { | |||
113 | function callbackAllPodsFinished (err) { | 120 | function callbackAllPodsFinished (err) { |
114 | if (err) return callback(err) | 121 | if (err) return callback(err) |
115 | 122 | ||
123 | // All the requests were made, we update the pods score | ||
116 | updatePodsScore(goodPods, badPods) | 124 | updatePodsScore(goodPods, badPods) |
117 | callback(null) | 125 | callback(null) |
118 | } | 126 | } |
119 | }) | 127 | }) |
120 | } | 128 | } |
121 | 129 | ||
130 | // Make all the requests of the scheduler | ||
122 | function makeRequests () { | 131 | function makeRequests () { |
123 | logger.info('Making requests to friends.') | ||
124 | |||
125 | Requests.list(function (err, requests) { | 132 | Requests.list(function (err, requests) { |
126 | if (err) { | 133 | if (err) { |
127 | logger.error('Cannot get the list of requests.', { err: err }) | 134 | logger.error('Cannot get the list of requests.', { err: err }) |
128 | return // Abort | 135 | return // Abort |
129 | } | 136 | } |
130 | 137 | ||
131 | if (requests.length === 0) return | 138 | // If there are no requests, abort |
139 | if (requests.length === 0) { | ||
140 | logger.info('No requests to make.') | ||
141 | return | ||
142 | } | ||
132 | 143 | ||
133 | const requestsToMake = { | 144 | logger.info('Making requests to friends.') |
134 | add: { | 145 | |
135 | ids: [], | 146 | const requestsToMake = {} |
136 | requests: [] | 147 | for (const type of REQUEST_SCHEDULER_TYPE) { |
137 | }, | 148 | requestsToMake[type] = { |
138 | remove: { | ||
139 | ids: [], | 149 | ids: [], |
140 | requests: [] | 150 | requests: [] |
141 | } | 151 | } |
142 | } | 152 | } |
143 | 153 | ||
154 | // For each requests to make, we add it to the correct request type | ||
144 | async.each(requests, function (poolRequest, callbackEach) { | 155 | async.each(requests, function (poolRequest, callbackEach) { |
145 | if (poolRequest.type === 'add') { | 156 | if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { |
146 | requestsToMake.add.requests.push(poolRequest.request) | 157 | const requestTypeToMake = requestsToMake[poolRequest.type] |
147 | requestsToMake.add.ids.push(poolRequest._id) | 158 | requestTypeToMake.requests.push(poolRequest.request) |
148 | } else if (poolRequest.type === 'remove') { | 159 | requestTypeToMake.ids.push(poolRequest._id) |
149 | requestsToMake.remove.requests.push(poolRequest.request) | ||
150 | requestsToMake.remove.ids.push(poolRequest._id) | ||
151 | } else { | 160 | } else { |
152 | logger.error('Unkown request type.', { request_type: poolRequest.type }) | 161 | logger.error('Unkown request type.', { request_type: poolRequest.type }) |
153 | return // abort | 162 | return // abort |
@@ -155,27 +164,23 @@ function makeRequests () { | |||
155 | 164 | ||
156 | callbackEach() | 165 | callbackEach() |
157 | }, function () { | 166 | }, function () { |
158 | // Send the add requests | 167 | for (let type of Object.keys(requestsToMake)) { |
159 | if (requestsToMake.add.requests.length !== 0) { | 168 | const requestTypeToMake = requestsToMake[type] |
160 | makeRequest('add', requestsToMake.add.requests, function (err) { | 169 | // If there are requests for this type |
161 | if (err) logger.error('Errors when sent add requests.', { error: err }) | 170 | if (requestTypeToMake.requests.length !== 0) { |
162 | 171 | makeRequest(type, requestTypeToMake.requests, function (err) { | |
163 | Requests.removeRequests(requestsToMake.add.ids) | 172 | if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err }) |
164 | }) | 173 | |
165 | } | 174 | // We made the requests, so we can remove them from the scheduler |
166 | 175 | Requests.removeRequests(requestTypeToMake.ids) | |
167 | // Send the remove requests | 176 | }) |
168 | if (requestsToMake.remove.requests.length !== 0) { | 177 | } |
169 | makeRequest('remove', requestsToMake.remove.requests, function (err) { | ||
170 | if (err) logger.error('Errors when sent remove pool requests.', { error: err }) | ||
171 | |||
172 | Requests.removeRequests(requestsToMake.remove.ids) | ||
173 | }) | ||
174 | } | 178 | } |
175 | }) | 179 | }) |
176 | }) | 180 | }) |
177 | } | 181 | } |
178 | 182 | ||
183 | // Remove pods with a score of 0 (too many requests where they were unreachable) | ||
179 | function removeBadPods () { | 184 | function removeBadPods () { |
180 | async.waterfall([ | 185 | async.waterfall([ |
181 | function findBadPods (callback) { | 186 | function findBadPods (callback) { |
@@ -243,7 +248,7 @@ function updatePodsScore (goodPods, badPods) { | |||
243 | }) | 248 | }) |
244 | 249 | ||
245 | Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | 250 | Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { |
246 | if (err) logger.error('Cannot increment scores of bad pods.') | 251 | if (err) logger.error('Cannot decrement scores of bad pods.') |
247 | removeBadPods() | 252 | removeBadPods() |
248 | }) | 253 | }) |
249 | } | 254 | } |