X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2FrequestsScheduler.js;h=ac75e5b93646983ba2e74c7d4ea54aa47445fa15;hb=528a9efa8272532bbd0dafc35c3e05e57c50f61e;hp=4953f6a91388441832c1783428ec7f665abe207b;hpb=cbe2f7c34822b1bd3b1f8c691f79f0c29cf21f07;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 4953f6a91..ac75e5b93 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js @@ -16,7 +16,9 @@ let timer = null const requestsScheduler = { activate: activate, addRequest: addRequest, + addRequestTo: addRequestTo, deactivate: deactivate, + flush: flush, forceSend: forceSend } @@ -25,34 +27,38 @@ function activate () { timer = setInterval(makeRequests, constants.INTERVAL) } -function addRequest (id, type, request) { - logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) +// Add request to the scheduler +function addRequest (type, data) { + logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) - Requests.findById(id, function (err, entity) { + const request = { + type: type, + data: data + } + + Pods.listAllIds(function (err, podIds) { if (err) { - logger.error('Cannot find one request.', { error: err }) - return // Abort + logger.debug('Cannot list pod ids.') + return } - if (entity) { - if (entity.type === type) { - logger.error('Cannot insert two same requests.') - return // Abort - } + // No friends + if (!podIds) return - // Remove the request of the other type - Requests.removeRequestById(id, function (err) { - if (err) { - logger.error('Cannot remove a request.', { error: err }) - return // Abort - } - }) - } else { - Requests.create(id, type, request, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - return // Abort - }) - } + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) + }) + }) +} + +function addRequestTo (podIds, type, data) { + const request = { + type: type, + data: data + } + + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) }) } @@ -61,6 +67,14 @@ function deactivate () { clearInterval(timer) } +function flush () { + Requests.removeAll(function (err) { + if (err) { + logger.error('Cannot flush the requests.', { error: err }) + } + }) +} + function forceSend () { logger.info('Force requests scheduler sending.') makeRequests() @@ -72,152 +86,179 @@ module.exports = requestsScheduler // --------------------------------------------------------------------------- -function makeRequest (type, requests_to_make, callback) { +// Make a requests to friends of a certain type +function makeRequest (toPod, requestsToMake, callback) { if (!callback) callback = function () {} - Pods.list(function (err, pods) { - if (err) return callback(err) - - const params = { - encrypt: true, - sign: true, - method: 'POST', - path: null, - data: requests_to_make - } - - if (type === 'add') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' - } else if (type === 'remove') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' - } else { - return callback(new Error('Unkown pool request type.')) - } - - const bad_pods = [] - const good_pods = [] - - requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { - if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { - bad_pods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - good_pods.push(pod._id) - } - - return callback_each_pod_finished() + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) } - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - updatePodsScore(good_pods, bad_pods) - callback(null) - } + return callback(true) }) } +// Make all the requests of the scheduler function makeRequests () { - logger.info('Making requests to friends.') - Requests.list(function (err, requests) { if (err) { logger.error('Cannot get the list of requests.', { err: err }) return // Abort } - if (requests.length === 0) return - - const requests_to_make = { - add: { - ids: [], - requests: [] - }, - remove: { - ids: [], - requests: [] - } + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No requests to make.') + return } - async.each(requests, function (pool_request, callback_each) { - if (pool_request.type === 'add') { - requests_to_make.add.requests.push(pool_request.request) - requests_to_make.add.ids.push(pool_request._id) - } else if (pool_request.type === 'remove') { - requests_to_make.remove.requests.push(pool_request.request) - requests_to_make.remove.ids.push(pool_request._id) - } else { - logger.error('Unkown request type.', { request_type: pool_request.type }) - return // abort - } - - callback_each() - }, function () { - // Send the add requests - if (requests_to_make.add.requests.length !== 0) { - makeRequest('add', requests_to_make.add.requests, function (err) { - if (err) logger.error('Errors when sent add requests.', { error: err }) + logger.info('Making requests to friends.') - Requests.removeRequests(requests_to_make.add.ids) - }) - } + // Requests by pods id + const requestsToMake = {} - // Send the remove requests - if (requests_to_make.remove.requests.length !== 0) { - makeRequest('remove', requests_to_make.remove.requests, function (err) { - if (err) logger.error('Errors when sent remove pool requests.', { error: err }) + requests.forEach(function (poolRequest) { + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) + }) + + const goodPods = [] + const badPods = [] + + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) return logger.error('Error finding pod by id.', { err: err }) + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + Requests.removePodOf(requestToMake.ids, toPodId) + return callbackEach() + } + + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + Requests.removePodOf(requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } - Requests.removeRequests(requests_to_make.remove.ids) + callbackEach() }) - } + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + Requests.removeWithEmptyTo() }) }) } +// Remove pods with a score of 0 (too many requests where they were unreachable) function removeBadPods () { - Pods.findBadPods(function (err, pods) { - if (err) { - logger.error('Cannot find bad pods.', { error: err }) - return // abort - } + async.waterfall([ + function findBadPods (callback) { + Pods.findBadPods(function (err, pods) { + if (err) { + logger.error('Cannot find bad pods.', { error: err }) + return callback(err) + } - if (pods.length === 0) return + return callback(null, pods) + }) + }, - const urls = map(pods, 'url') - const ids = map(pods, '_id') + function listVideosOfTheseBadPods (pods, callback) { + if (pods.length === 0) return callback(null) - Videos.listFromUrls(urls, function (err, videos_list) { - if (err) { - logger.error('Cannot list videos urls.', { error: err, urls: urls }) - } else { - videos.removeRemoteVideos(videos_list, function (err) { - if (err) logger.error('Cannot remove remote videos.', { error: err }) - }) - } + const urls = map(pods, 'url') + const ids = map(pods, '_id') - Pods.removeAllByIds(ids, function (err, r) { + Videos.listFromUrls(urls, function (err, videosList) { if (err) { - logger.error('Cannot remove bad pods.', { error: err }) - } else { - const pods_removed = r.result.n - logger.info('Removed %d pods.', pods_removed) + logger.error('Cannot list videos urls.', { error: err, urls: urls }) + return callback(null, ids, []) } + + return callback(null, ids, videosList) }) - }) + }, + + function removeVideosOfTheseBadPods (podIds, videosList, callback) { + // We don't have to remove pods, skip + if (typeof podIds === 'function') return podIds(null) + + // Remove the remote videos + videos.removeRemoteVideos(videosList, function (err) { + if (err) logger.error('Cannot remove remote videos.', { error: err }) + + return callback(null, podIds) + }) + }, + + function removeBadPodsFromDB (podIds, callback) { + // We don't have to remove pods, skip + if (typeof podIds === 'function') return podIds(null) + + Pods.removeAllByIds(podIds, callback) + } + ], function (err, removeResult) { + if (err) { + logger.error('Cannot remove bad pods.', { error: err }) + } else if (removeResult) { + const podsRemoved = removeResult.result.n + logger.info('Removed %d pods.', podsRemoved) + } else { + logger.info('No need to remove bad pods.') + } }) } -function updatePodsScore (good_pods, bad_pods) { - logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) +function updatePodsScore (goodPods, badPods) { + logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) - Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) { + Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { if (err) logger.error('Cannot increment scores of good pods.') }) - Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { - if (err) logger.error('Cannot increment scores of bad pods.') + Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { + if (err) logger.error('Cannot decrement scores of bad pods.') removeBadPods() }) }