const videos = require('../lib/videos')
const Videos = require('../models/videos')
+const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
let timer = null
const requestsScheduler = {
timer = setInterval(makeRequests, constants.INTERVAL)
}
+// Add request to the scheduler
function addRequest (id, type, request) {
logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
Requests.findById(id, function (err, entity) {
if (err) {
- logger.error('Cannot find one request.', { error: err })
+ logger.error('Error when trying to find a request.', { error: err })
return // Abort
}
+ // If there were already a request with this id in the scheduler...
if (entity) {
if (entity.type === type) {
logger.error('Cannot insert two same requests.')
// ---------------------------------------------------------------------------
+// Make a requests to friends of a certain type
function makeRequest (type, requestsToMake, callback) {
if (!callback) callback = function () {}
if (err) return callback(err)
const params = {
- encrypt: true,
- sign: true,
+ encrypt: true, // Security
+ sign: true, // To prove our identity
method: 'POST',
- path: null,
- data: requestsToMake
+ path: null, // We build the path later
+ data: requestsToMake // Requests we need to make
}
- if (type === 'add') {
- params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
- } else if (type === 'remove') {
- params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
+ // If this is a valid type, we build the path
+ if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) {
+ params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type
} else {
return callback(new Error('Unkown pool request type.'))
}
const badPods = []
const goodPods = []
+ // Make multiple retry requests to all of pods
+ // The function fire some useful callbacks
requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
+ // We failed the request, add the pod unreachable to the bad pods list
if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
badPods.push(pod._id)
logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
} else {
+ // Request success
goodPods.push(pod._id)
}
function callbackAllPodsFinished (err) {
if (err) return callback(err)
+ // All the requests were made, we update the pods score
updatePodsScore(goodPods, badPods)
callback(null)
}
})
}
+// Make all the requests of the scheduler
function makeRequests () {
- logger.info('Making requests to friends.')
-
Requests.list(function (err, requests) {
if (err) {
logger.error('Cannot get the list of requests.', { err: err })
return // Abort
}
- if (requests.length === 0) return
+ // If there are no requests, abort
+ if (requests.length === 0) {
+ logger.info('No requests to make.')
+ return
+ }
- const requestsToMake = {
- add: {
- ids: [],
- requests: []
- },
- remove: {
+ logger.info('Making requests to friends.')
+
+ const requestsToMake = {}
+ for (const type of REQUEST_SCHEDULER_TYPE) {
+ requestsToMake[type] = {
ids: [],
requests: []
}
}
+ // For each requests to make, we add it to the correct request type
async.each(requests, function (poolRequest, callbackEach) {
- if (poolRequest.type === 'add') {
- requestsToMake.add.requests.push(poolRequest.request)
- requestsToMake.add.ids.push(poolRequest._id)
- } else if (poolRequest.type === 'remove') {
- requestsToMake.remove.requests.push(poolRequest.request)
- requestsToMake.remove.ids.push(poolRequest._id)
+ if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) {
+ const requestTypeToMake = requestsToMake[poolRequest.type]
+ requestTypeToMake.requests.push(poolRequest.request)
+ requestTypeToMake.ids.push(poolRequest._id)
} else {
logger.error('Unkown request type.', { request_type: poolRequest.type })
return // abort
callbackEach()
}, function () {
- // Send the add requests
- if (requestsToMake.add.requests.length !== 0) {
- makeRequest('add', requestsToMake.add.requests, function (err) {
- if (err) logger.error('Errors when sent add requests.', { error: err })
-
- Requests.removeRequests(requestsToMake.add.ids)
- })
- }
-
- // Send the remove requests
- if (requestsToMake.remove.requests.length !== 0) {
- makeRequest('remove', requestsToMake.remove.requests, function (err) {
- if (err) logger.error('Errors when sent remove pool requests.', { error: err })
-
- Requests.removeRequests(requestsToMake.remove.ids)
- })
+ for (let type of Object.keys(requestsToMake)) {
+ const requestTypeToMake = requestsToMake[type]
+ // If there are requests for this type
+ if (requestTypeToMake.requests.length !== 0) {
+ makeRequest(type, requestTypeToMake.requests, function (err) {
+ if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })
+
+ // We made the requests, so we can remove them from the scheduler
+ Requests.removeRequests(requestTypeToMake.ids)
+ })
+ }
}
})
})
}
+// Remove pods with a score of 0 (too many requests where they were unreachable)
function removeBadPods () {
async.waterfall([
function findBadPods (callback) {
})
Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
- if (err) logger.error('Cannot increment scores of bad pods.')
+ if (err) logger.error('Cannot decrement scores of bad pods.')
removeBadPods()
})
}