X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2FrequestsScheduler.js;h=ac75e5b93646983ba2e74c7d4ea54aa47445fa15;hb=528a9efa8272532bbd0dafc35c3e05e57c50f61e;hp=3c1df5d5c6b61b678cbfeb956c10ac3785352fa4;hpb=8c255eb53c8f47bd64778d6fbcb93b248ee14163;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 3c1df5d5c..ac75e5b93 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js @@ -11,13 +11,14 @@ const requests = require('../helpers/requests') const videos = require('../lib/videos') const Videos = require('../models/videos') -const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE let timer = null const requestsScheduler = { activate: activate, addRequest: addRequest, + addRequestTo: addRequestTo, deactivate: deactivate, + flush: flush, forceSend: forceSend } @@ -27,35 +28,37 @@ function activate () { } // Add request to the scheduler -function addRequest (id, type, request) { - logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) +function addRequest (type, data) { + logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) - Requests.findById(id, function (err, entity) { + const request = { + type: type, + data: data + } + + Pods.listAllIds(function (err, podIds) { if (err) { - logger.error('Error when trying to find a request.', { error: err }) - return // Abort + logger.debug('Cannot list pod ids.') + return } - // If there were already a request with this id in the scheduler... - if (entity) { - if (entity.type === type) { - logger.error('Cannot insert two same requests.') - return // Abort - } + // No friends + if (!podIds) return - // Remove the request of the other type - Requests.removeRequestById(id, function (err) { - if (err) { - logger.error('Cannot remove a request.', { error: err }) - return // Abort - } - }) - } else { - Requests.create(id, type, request, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - return // Abort - }) - } + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) + }) + }) +} + +function addRequestTo (podIds, type, data) { + const request = { + type: type, + data: data + } + + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) }) } @@ -64,6 +67,14 @@ function deactivate () { clearInterval(timer) } +function flush () { + Requests.removeAll(function (err) { + if (err) { + logger.error('Cannot flush the requests.', { error: err }) + } + }) +} + function forceSend () { logger.info('Force requests scheduler sending.') makeRequests() @@ -76,54 +87,28 @@ module.exports = requestsScheduler // --------------------------------------------------------------------------- // Make a requests to friends of a certain type -function makeRequest (type, requestsToMake, callback) { +function makeRequest (toPod, requestsToMake, callback) { if (!callback) callback = function () {} - Pods.list(function (err, pods) { - if (err) return callback(err) - - const params = { - encrypt: true, // Security - sign: true, // To prove our identity - method: 'POST', - path: null, // We build the path later - data: requestsToMake // Requests we need to make - } - - // If this is a valid type, we build the path - if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type - } else { - return callback(new Error('Unkown pool request type.')) + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) } - const badPods = [] - const goodPods = [] - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { - // We failed the request, add the pod unreachable to the bad pods list - if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { - badPods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - // Request success - goodPods.push(pod._id) - } - - return callbackEachPodFinished() - } - - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - // All the requests were made, we update the pods score - updatePodsScore(goodPods, badPods) - callback(null) - } + return callback(true) }) } @@ -143,39 +128,64 @@ function makeRequests () { logger.info('Making requests to friends.') + // Requests by pods id const requestsToMake = {} - for (const type of REQUEST_SCHEDULER_TYPE) { - requestsToMake[type] = { - ids: [], - requests: [] - } - } - // For each requests to make, we add it to the correct request type - async.each(requests, function (poolRequest, callbackEach) { - if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { - const requestTypeToMake = requestsToMake[poolRequest.type] - requestTypeToMake.requests.push(poolRequest.request) - requestTypeToMake.ids.push(poolRequest._id) - } else { - logger.error('Unkown request type.', { request_type: poolRequest.type }) - return // abort - } - - callbackEach() - }, function () { - for (let type of Object.keys(requestsToMake)) { - const requestTypeToMake = requestsToMake[type] - // If there are requests for this type - if (requestTypeToMake.requests.length !== 0) { - makeRequest(type, requestTypeToMake.requests, function (err) { - if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err }) - - // We made the requests, so we can remove them from the scheduler - Requests.removeRequests(requestTypeToMake.ids) - }) + requests.forEach(function (poolRequest) { + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) + }) + + const goodPods = [] + const badPods = [] + + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) return logger.error('Error finding pod by id.', { err: err }) + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + Requests.removePodOf(requestToMake.ids, toPodId) + return callbackEach() } - } + + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + Requests.removePodOf(requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } + + callbackEach() + }) + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + Requests.removeWithEmptyTo() }) }) }