aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/requestsScheduler.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-06-06 15:28:33 +0200
committerChocobozzz <florian.bigard@gmail.com>2016-06-06 15:28:33 +0200
commit8c255eb53c8f47bd64778d6fbcb93b248ee14163 (patch)
tree88950578fb8fed21433752996f76b598b068483c /server/lib/requestsScheduler.js
parent8d199cb823e1e47de87fdf421964e260b0ba6eb7 (diff)
downloadPeerTube-8c255eb53c8f47bd64778d6fbcb93b248ee14163.tar.gz
PeerTube-8c255eb53c8f47bd64778d6fbcb93b248ee14163.tar.zst
PeerTube-8c255eb53c8f47bd64778d6fbcb93b248ee14163.zip
Little refractoring of requests scheduler module
Diffstat (limited to 'server/lib/requestsScheduler.js')
-rw-r--r--server/lib/requestsScheduler.js87
1 files changed, 46 insertions, 41 deletions
diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js
index 3d04b8cc8..3c1df5d5c 100644
--- a/server/lib/requestsScheduler.js
+++ b/server/lib/requestsScheduler.js
@@ -11,6 +11,7 @@ const requests = require('../helpers/requests')
11const videos = require('../lib/videos') 11const videos = require('../lib/videos')
12const Videos = require('../models/videos') 12const Videos = require('../models/videos')
13 13
14const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
14let timer = null 15let timer = null
15 16
16const requestsScheduler = { 17const requestsScheduler = {
@@ -25,15 +26,17 @@ function activate () {
25 timer = setInterval(makeRequests, constants.INTERVAL) 26 timer = setInterval(makeRequests, constants.INTERVAL)
26} 27}
27 28
29// Add request to the scheduler
28function addRequest (id, type, request) { 30function addRequest (id, type, request) {
29 logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) 31 logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
30 32
31 Requests.findById(id, function (err, entity) { 33 Requests.findById(id, function (err, entity) {
32 if (err) { 34 if (err) {
33 logger.error('Cannot find one request.', { error: err }) 35 logger.error('Error when trying to find a request.', { error: err })
34 return // Abort 36 return // Abort
35 } 37 }
36 38
39 // If there were already a request with this id in the scheduler...
37 if (entity) { 40 if (entity) {
38 if (entity.type === type) { 41 if (entity.type === type) {
39 logger.error('Cannot insert two same requests.') 42 logger.error('Cannot insert two same requests.')
@@ -72,6 +75,7 @@ module.exports = requestsScheduler
72 75
73// --------------------------------------------------------------------------- 76// ---------------------------------------------------------------------------
74 77
78// Make a requests to friends of a certain type
75function makeRequest (type, requestsToMake, callback) { 79function makeRequest (type, requestsToMake, callback) {
76 if (!callback) callback = function () {} 80 if (!callback) callback = function () {}
77 81
@@ -79,17 +83,16 @@ function makeRequest (type, requestsToMake, callback) {
79 if (err) return callback(err) 83 if (err) return callback(err)
80 84
81 const params = { 85 const params = {
82 encrypt: true, 86 encrypt: true, // Security
83 sign: true, 87 sign: true, // To prove our identity
84 method: 'POST', 88 method: 'POST',
85 path: null, 89 path: null, // We build the path later
86 data: requestsToMake 90 data: requestsToMake // Requests we need to make
87 } 91 }
88 92
89 if (type === 'add') { 93 // If this is a valid type, we build the path
90 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' 94 if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) {
91 } else if (type === 'remove') { 95 params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type
92 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
93 } else { 96 } else {
94 return callback(new Error('Unkown pool request type.')) 97 return callback(new Error('Unkown pool request type.'))
95 } 98 }
@@ -97,13 +100,17 @@ function makeRequest (type, requestsToMake, callback) {
97 const badPods = [] 100 const badPods = []
98 const goodPods = [] 101 const goodPods = []
99 102
103 // Make multiple retry requests to all of pods
104 // The function fire some useful callbacks
100 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) 105 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
101 106
102 function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { 107 function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
108 // We failed the request, add the pod unreachable to the bad pods list
103 if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { 109 if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
104 badPods.push(pod._id) 110 badPods.push(pod._id)
105 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) 111 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
106 } else { 112 } else {
113 // Request success
107 goodPods.push(pod._id) 114 goodPods.push(pod._id)
108 } 115 }
109 116
@@ -113,41 +120,43 @@ function makeRequest (type, requestsToMake, callback) {
113 function callbackAllPodsFinished (err) { 120 function callbackAllPodsFinished (err) {
114 if (err) return callback(err) 121 if (err) return callback(err)
115 122
123 // All the requests were made, we update the pods score
116 updatePodsScore(goodPods, badPods) 124 updatePodsScore(goodPods, badPods)
117 callback(null) 125 callback(null)
118 } 126 }
119 }) 127 })
120} 128}
121 129
130// Make all the requests of the scheduler
122function makeRequests () { 131function makeRequests () {
123 logger.info('Making requests to friends.')
124
125 Requests.list(function (err, requests) { 132 Requests.list(function (err, requests) {
126 if (err) { 133 if (err) {
127 logger.error('Cannot get the list of requests.', { err: err }) 134 logger.error('Cannot get the list of requests.', { err: err })
128 return // Abort 135 return // Abort
129 } 136 }
130 137
131 if (requests.length === 0) return 138 // If there are no requests, abort
139 if (requests.length === 0) {
140 logger.info('No requests to make.')
141 return
142 }
132 143
133 const requestsToMake = { 144 logger.info('Making requests to friends.')
134 add: { 145
135 ids: [], 146 const requestsToMake = {}
136 requests: [] 147 for (const type of REQUEST_SCHEDULER_TYPE) {
137 }, 148 requestsToMake[type] = {
138 remove: {
139 ids: [], 149 ids: [],
140 requests: [] 150 requests: []
141 } 151 }
142 } 152 }
143 153
154 // For each requests to make, we add it to the correct request type
144 async.each(requests, function (poolRequest, callbackEach) { 155 async.each(requests, function (poolRequest, callbackEach) {
145 if (poolRequest.type === 'add') { 156 if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) {
146 requestsToMake.add.requests.push(poolRequest.request) 157 const requestTypeToMake = requestsToMake[poolRequest.type]
147 requestsToMake.add.ids.push(poolRequest._id) 158 requestTypeToMake.requests.push(poolRequest.request)
148 } else if (poolRequest.type === 'remove') { 159 requestTypeToMake.ids.push(poolRequest._id)
149 requestsToMake.remove.requests.push(poolRequest.request)
150 requestsToMake.remove.ids.push(poolRequest._id)
151 } else { 160 } else {
152 logger.error('Unkown request type.', { request_type: poolRequest.type }) 161 logger.error('Unkown request type.', { request_type: poolRequest.type })
153 return // abort 162 return // abort
@@ -155,27 +164,23 @@ function makeRequests () {
155 164
156 callbackEach() 165 callbackEach()
157 }, function () { 166 }, function () {
158 // Send the add requests 167 for (let type of Object.keys(requestsToMake)) {
159 if (requestsToMake.add.requests.length !== 0) { 168 const requestTypeToMake = requestsToMake[type]
160 makeRequest('add', requestsToMake.add.requests, function (err) { 169 // If there are requests for this type
161 if (err) logger.error('Errors when sent add requests.', { error: err }) 170 if (requestTypeToMake.requests.length !== 0) {
162 171 makeRequest(type, requestTypeToMake.requests, function (err) {
163 Requests.removeRequests(requestsToMake.add.ids) 172 if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })
164 }) 173
165 } 174 // We made the requests, so we can remove them from the scheduler
166 175 Requests.removeRequests(requestTypeToMake.ids)
167 // Send the remove requests 176 })
168 if (requestsToMake.remove.requests.length !== 0) { 177 }
169 makeRequest('remove', requestsToMake.remove.requests, function (err) {
170 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
171
172 Requests.removeRequests(requestsToMake.remove.ids)
173 })
174 } 178 }
175 }) 179 })
176 }) 180 })
177} 181}
178 182
183// Remove pods with a score of 0 (too many requests where they were unreachable)
179function removeBadPods () { 184function removeBadPods () {
180 async.waterfall([ 185 async.waterfall([
181 function findBadPods (callback) { 186 function findBadPods (callback) {
@@ -243,7 +248,7 @@ function updatePodsScore (goodPods, badPods) {
243 }) 248 })
244 249
245 Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { 250 Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
246 if (err) logger.error('Cannot increment scores of bad pods.') 251 if (err) logger.error('Cannot decrement scores of bad pods.')
247 removeBadPods() 252 removeBadPods()
248 }) 253 })
249} 254}