From c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Sat, 18 Feb 2017 10:29:36 +0100 Subject: Server: use a request scheduler object instance for friends communication --- server/lib/request-scheduler.js | 202 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 server/lib/request-scheduler.js (limited to 'server/lib/request-scheduler.js') diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js new file mode 100644 index 000000000..c8bc4af28 --- /dev/null +++ b/server/lib/request-scheduler.js @@ -0,0 +1,202 @@ +'use strict' + +const eachLimit = require('async/eachLimit') + +const constants = require('../initializers/constants') +const db = require('../initializers/database') +const logger = require('../helpers/logger') +const requests = require('../helpers/requests') + +module.exports = class RequestScheduler { + + constructor (name) { + this.name = name + + this.lastRequestTimestamp = 0 + this.timer = null + } + + activate () { + logger.info('Requests scheduler activated.') + this.lastRequestTimestamp = Date.now() + + this.timer = setInterval(() => { + this.lastRequestTimestamp = Date.now() + this.makeRequests() + }, constants.REQUESTS_INTERVAL) + } + + deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(this.timer) + this.timer = null + } + + forceSend () { + logger.info('Force requests scheduler sending.') + this.makeRequests() + } + + remainingMilliSeconds () { + if (this.timer === null) return -1 + + return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) + } + + // { type, endpoint, data, toIds, transaction } + createRequest (options, callback) { + const type = options.type + const endpoint = options.endpoint + const data = options.data + const toIds = options.toIds + const transaction = options.transaction + + const pods = [] + + // If there are no destination pods abort + if (toIds.length === 0) return callback(null) + + toIds.forEach(toPod => { + pods.push(db.Pod.build({ id: toPod })) + }) + + const createQuery = { + endpoint, + request: { + type: type, + data: data + } + } + + const dbRequestOptions = { + transaction + } + + return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { + if (err) return callback(err) + + return request.setPods(pods, dbRequestOptions).asCallback(callback) + }) + } + + // --------------------------------------------------------------------------- + + // Make all the requests of the scheduler + makeRequests () { + // We limit the size of the requests + // We don't want to stuck with the same failing requests so we get a random list + db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { + if (err) { + logger.error('Cannot get the list of requests.', { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No requests to make.') + return + } + + // We want to group requests by destinations pod and endpoint + const requestsToMakeGrouped = this.buildRequestObjects(requests) + + logger.info('Making requests to friends.') + + const goodPods = [] + const badPods = [] + + eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { + const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod + + // Maybe the pod is not our friend anymore so simply remove it + if (!toPod) { + const requestIdsToDelete = requestToMake.ids + + logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) + return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) + } + + this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { + if (success === false) { + badPods.push(requestToMake.toPod.id) + return callbackEach() + } + + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + // Remove the pod id of these request ids + db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) + }) + }, () => { + // All the requests were made, we update the pods score + db.Request.updatePodsScore(goodPods, badPods) + // Flush requests with no pod + db.Request.removeWithEmptyTo(err => { + if (err) logger.error('Error when removing requests with no pods.', { error: err }) + }) + }) + }) + } + + // Make a requests to friends of a certain type + makeRequest (toPod, requestEndpoint, requestsToMake, callback) { + if (!callback) callback = function () {} + + const params = { + toPod: toPod, + sign: true, // Prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, (err, res) => { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + err = err ? err.message : 'Status code not 20x : ' + res.statusCode + logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) + + return callback(false) + } + + return callback(true) + }) + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: request.endpoint, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data, + } + } + + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].datas.push(request.request) + }) + }) + + return requestsToMakeGrouped + } + + flush (callback) { + db.Request.removeAll(err => { + if (err) logger.error('Cannot flush the requests.', { error: err }) + + return callback(err) + }) + } +} -- cgit v1.2.3