const requestsScheduler = {
activate: activate,
addRequest: addRequest,
+ addRequestTo: addRequestTo,
deactivate: deactivate,
+ flush: flush,
forceSend: forceSend
}
timer = setInterval(makeRequests, constants.INTERVAL)
}
-function addRequest (id, type, request) {
- logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
+// Add request to the scheduler
+function addRequest (type, data) {
+ logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
- Requests.findById(id, function (err, entity) {
+ const request = {
+ type: type,
+ data: data
+ }
+
+ Pods.listAllIds(function (err, podIds) {
if (err) {
- logger.error('Cannot find one request.', { error: err })
- return // Abort
+ logger.debug('Cannot list pod ids.')
+ return
}
- if (entity) {
- if (entity.type === type) {
- logger.error('Cannot insert two same requests.')
- return // Abort
- }
+ // No friends
+ if (!podIds) return
- // Remove the request of the other type
- Requests.removeRequestById(id, function (err) {
- if (err) {
- logger.error('Cannot remove a request.', { error: err })
- return // Abort
- }
- })
- } else {
- Requests.create(id, type, request, function (err) {
- if (err) logger.error('Cannot create a request.', { error: err })
- return // Abort
- })
- }
+ Requests.create(request, podIds, function (err) {
+ if (err) logger.error('Cannot create a request.', { error: err })
+ })
+ })
+}
+
+function addRequestTo (podIds, type, data) {
+ const request = {
+ type: type,
+ data: data
+ }
+
+ Requests.create(request, podIds, function (err) {
+ if (err) logger.error('Cannot create a request.', { error: err })
})
}
clearInterval(timer)
}
+function flush () {
+ Requests.removeAll(function (err) {
+ if (err) {
+ logger.error('Cannot flush the requests.', { error: err })
+ }
+ })
+}
+
function forceSend () {
logger.info('Force requests scheduler sending.')
makeRequests()
// ---------------------------------------------------------------------------
-function makeRequest (type, requestsToMake, callback) {
+// Make a requests to friends of a certain type
+function makeRequest (toPod, requestsToMake, callback) {
if (!callback) callback = function () {}
- Pods.list(function (err, pods) {
- if (err) return callback(err)
-
- const params = {
- encrypt: true,
- sign: true,
- method: 'POST',
- path: null,
- data: requestsToMake
- }
-
- if (type === 'add') {
- params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
- } else if (type === 'remove') {
- params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
- } else {
- return callback(new Error('Unkown pool request type.'))
- }
-
- const badPods = []
- const goodPods = []
-
- requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
-
- function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
- if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
- badPods.push(pod._id)
- logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
- } else {
- goodPods.push(pod._id)
- }
-
- return callbackEachPodFinished()
+ const params = {
+ toPod: toPod,
+ encrypt: true, // Security
+ sign: true, // To prove our identity
+ method: 'POST',
+ path: '/api/' + constants.API_VERSION + '/remote/videos',
+ data: requestsToMake // Requests we need to make
+ }
+
+ // Make multiple retry requests to all of pods
+ // The function fire some useful callbacks
+ requests.makeSecureRequest(params, function (err, res) {
+ if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
+ logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
+
+ return callback(false)
}
- function callbackAllPodsFinished (err) {
- if (err) return callback(err)
-
- updatePodsScore(goodPods, badPods)
- callback(null)
- }
+ return callback(true)
})
}
+// Make all the requests of the scheduler
function makeRequests () {
- logger.info('Making requests to friends.')
-
Requests.list(function (err, requests) {
if (err) {
logger.error('Cannot get the list of requests.', { err: err })
return // Abort
}
- if (requests.length === 0) return
-
- const requestsToMake = {
- add: {
- ids: [],
- requests: []
- },
- remove: {
- ids: [],
- requests: []
- }
+ // If there are no requests, abort
+ if (requests.length === 0) {
+ logger.info('No requests to make.')
+ return
}
- async.each(requests, function (poolRequest, callbackEach) {
- if (poolRequest.type === 'add') {
- requestsToMake.add.requests.push(poolRequest.request)
- requestsToMake.add.ids.push(poolRequest._id)
- } else if (poolRequest.type === 'remove') {
- requestsToMake.remove.requests.push(poolRequest.request)
- requestsToMake.remove.ids.push(poolRequest._id)
- } else {
- logger.error('Unkown request type.', { request_type: poolRequest.type })
- return // abort
- }
-
- callbackEach()
- }, function () {
- // Send the add requests
- if (requestsToMake.add.requests.length !== 0) {
- makeRequest('add', requestsToMake.add.requests, function (err) {
- if (err) logger.error('Errors when sent add requests.', { error: err })
+ logger.info('Making requests to friends.')
- Requests.removeRequests(requestsToMake.add.ids)
- })
- }
+ // Requests by pods id
+ const requestsToMake = {}
+
+ requests.forEach(function (poolRequest) {
+ poolRequest.to.forEach(function (toPodId) {
+ if (!requestsToMake[toPodId]) {
+ requestsToMake[toPodId] = {
+ ids: [],
+ datas: []
+ }
+ }
+
+ requestsToMake[toPodId].ids.push(poolRequest._id)
+ requestsToMake[toPodId].datas.push(poolRequest.request)
+ })
+ })
+
+ const goodPods = []
+ const badPods = []
- // Send the remove requests
- if (requestsToMake.remove.requests.length !== 0) {
- makeRequest('remove', requestsToMake.remove.requests, function (err) {
- if (err) logger.error('Errors when sent remove pool requests.', { error: err })
+ async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
+ const requestToMake = requestsToMake[toPodId]
+
+ // FIXME: mongodb request inside a loop :/
+ Pods.findById(toPodId, function (err, toPod) {
+ if (err) return logger.error('Error finding pod by id.', { err: err })
+
+ // Maybe the pod is not our friend anymore so simply remove them
+ if (!toPod) {
+ Requests.removePodOf(requestToMake.ids, toPodId)
+ return callbackEach()
+ }
- Requests.removeRequests(requestsToMake.remove.ids)
+ makeRequest(toPod, requestToMake.datas, function (success) {
+ if (err) {
+ logger.error('Errors when sent request to %s.', toPod.url, { error: err })
+ // Do not stop the process just for one error
+ return callbackEach()
+ }
+
+ if (success === true) {
+ logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
+
+ // Remove the pod id of these request ids
+ Requests.removePodOf(requestToMake.ids, toPodId)
+ goodPods.push(toPodId)
+ } else {
+ badPods.push(toPodId)
+ }
+
+ callbackEach()
})
- }
+ })
+ }, function () {
+ // All the requests were made, we update the pods score
+ updatePodsScore(goodPods, badPods)
+ // Flush requests with no pod
+ Requests.removeWithEmptyTo()
})
})
}
+// Remove pods with a score of 0 (too many requests where they were unreachable)
function removeBadPods () {
async.waterfall([
function findBadPods (callback) {
})
Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
- if (err) logger.error('Cannot increment scores of bad pods.')
+ if (err) logger.error('Cannot decrement scores of bad pods.')
removeBadPods()
})
}