X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2FrequestsScheduler.js;h=ac75e5b93646983ba2e74c7d4ea54aa47445fa15;hb=528a9efa8272532bbd0dafc35c3e05e57c50f61e;hp=3d04b8cc839b847414dce3c2f9a9cd8c484a267a;hpb=e856e334a14ac8449b31db24bd5c6fb18f9a23e7;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 3d04b8cc8..ac75e5b93 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js @@ -16,7 +16,9 @@ let timer = null const requestsScheduler = { activate: activate, addRequest: addRequest, + addRequestTo: addRequestTo, deactivate: deactivate, + flush: flush, forceSend: forceSend } @@ -25,34 +27,38 @@ function activate () { timer = setInterval(makeRequests, constants.INTERVAL) } -function addRequest (id, type, request) { - logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) +// Add request to the scheduler +function addRequest (type, data) { + logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) - Requests.findById(id, function (err, entity) { + const request = { + type: type, + data: data + } + + Pods.listAllIds(function (err, podIds) { if (err) { - logger.error('Cannot find one request.', { error: err }) - return // Abort + logger.debug('Cannot list pod ids.') + return } - if (entity) { - if (entity.type === type) { - logger.error('Cannot insert two same requests.') - return // Abort - } + // No friends + if (!podIds) return - // Remove the request of the other type - Requests.removeRequestById(id, function (err) { - if (err) { - logger.error('Cannot remove a request.', { error: err }) - return // Abort - } - }) - } else { - Requests.create(id, type, request, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - return // Abort - }) - } + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) + }) + }) +} + +function addRequestTo (podIds, type, data) { + const request = { + type: type, + data: data + } + + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) }) } @@ -61,6 +67,14 @@ function deactivate () { clearInterval(timer) } +function flush () { + Requests.removeAll(function (err) { + if (err) { + logger.error('Cannot flush the requests.', { error: err }) + } + }) +} + function forceSend () { logger.info('Force requests scheduler sending.') makeRequests() @@ -72,110 +86,111 @@ module.exports = requestsScheduler // --------------------------------------------------------------------------- -function makeRequest (type, requestsToMake, callback) { +// Make a requests to friends of a certain type +function makeRequest (toPod, requestsToMake, callback) { if (!callback) callback = function () {} - Pods.list(function (err, pods) { - if (err) return callback(err) - - const params = { - encrypt: true, - sign: true, - method: 'POST', - path: null, - data: requestsToMake - } - - if (type === 'add') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' - } else if (type === 'remove') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' - } else { - return callback(new Error('Unkown pool request type.')) - } - - const badPods = [] - const goodPods = [] - - requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { - if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { - badPods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - goodPods.push(pod._id) - } - - return callbackEachPodFinished() + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) } - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - updatePodsScore(goodPods, badPods) - callback(null) - } + return callback(true) }) } +// Make all the requests of the scheduler function makeRequests () { - logger.info('Making requests to friends.') - Requests.list(function (err, requests) { if (err) { logger.error('Cannot get the list of requests.', { err: err }) return // Abort } - if (requests.length === 0) return - - const requestsToMake = { - add: { - ids: [], - requests: [] - }, - remove: { - ids: [], - requests: [] - } + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No requests to make.') + return } - async.each(requests, function (poolRequest, callbackEach) { - if (poolRequest.type === 'add') { - requestsToMake.add.requests.push(poolRequest.request) - requestsToMake.add.ids.push(poolRequest._id) - } else if (poolRequest.type === 'remove') { - requestsToMake.remove.requests.push(poolRequest.request) - requestsToMake.remove.ids.push(poolRequest._id) - } else { - logger.error('Unkown request type.', { request_type: poolRequest.type }) - return // abort - } - - callbackEach() - }, function () { - // Send the add requests - if (requestsToMake.add.requests.length !== 0) { - makeRequest('add', requestsToMake.add.requests, function (err) { - if (err) logger.error('Errors when sent add requests.', { error: err }) + logger.info('Making requests to friends.') - Requests.removeRequests(requestsToMake.add.ids) - }) - } + // Requests by pods id + const requestsToMake = {} + + requests.forEach(function (poolRequest) { + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) + }) + + const goodPods = [] + const badPods = [] - // Send the remove requests - if (requestsToMake.remove.requests.length !== 0) { - makeRequest('remove', requestsToMake.remove.requests, function (err) { - if (err) logger.error('Errors when sent remove pool requests.', { error: err }) + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) return logger.error('Error finding pod by id.', { err: err }) + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + Requests.removePodOf(requestToMake.ids, toPodId) + return callbackEach() + } - Requests.removeRequests(requestsToMake.remove.ids) + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + Requests.removePodOf(requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } + + callbackEach() }) - } + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + Requests.removeWithEmptyTo() }) }) } +// Remove pods with a score of 0 (too many requests where they were unreachable) function removeBadPods () { async.waterfall([ function findBadPods (callback) { @@ -243,7 +258,7 @@ function updatePodsScore (goodPods, badPods) { }) Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { - if (err) logger.error('Cannot increment scores of bad pods.') + if (err) logger.error('Cannot decrement scores of bad pods.') removeBadPods() }) }