From c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Sat, 18 Feb 2017 10:29:36 +0100 Subject: [PATCH] Server: use a request scheduler object instance for friends communication --- server.js | 5 +- server/lib/friends.js | 59 +++------ server/lib/request-scheduler.js | 202 +++++++++++++++++++++++++++++++ server/models/request.js | 208 +++++--------------------------- 4 files changed, 247 insertions(+), 227 deletions(-) create mode 100644 server/lib/request-scheduler.js diff --git a/server.js b/server.js index 7503072af..4a0de72bb 100644 --- a/server.js +++ b/server.js @@ -37,6 +37,7 @@ if (errorMessage !== null) { // ----------- PeerTube modules ----------- const customValidators = require('./server/helpers/custom-validators') +const friends = require('./server/lib/friends') const installer = require('./server/initializers/installer') const migrator = require('./server/initializers/migrator') const routes = require('./server/controllers') @@ -128,8 +129,8 @@ installer.installApplication(function (err) { // ----------- Make the server listening ----------- server.listen(port, function () { - // Activate the pool requests - db.Request.activate() + // Activate the communication with friends + friends.activate() logger.info('Server listening on port %d', port) logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL) diff --git a/server/lib/friends.js b/server/lib/friends.js index 9b38693c7..7dfa62a2a 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -11,10 +11,13 @@ const db = require('../initializers/database') const logger = require('../helpers/logger') const peertubeCrypto = require('../helpers/peertube-crypto') const requests = require('../helpers/requests') +const RequestScheduler = require('./request-scheduler') const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] +const requestScheduler = new RequestScheduler('') const friends = { + activate, addVideoToFriends, updateVideoToFriends, reportAbuseVideoToFriend, @@ -25,6 +28,10 @@ const friends = { sendOwnedVideosToPod } +function activate () { + requestScheduler.activate() +} + function addVideoToFriends (videoData, transaction, callback) { const options = { type: ENDPOINT_ACTIONS.ADD, @@ -99,11 +106,11 @@ function makeFriends (hosts, callback) { function quitFriends (callback) { // Stop pool requests - db.Request.deactivate() + requestScheduler.deactivate() waterfall([ function flushRequests (callbackAsync) { - db.Request.flush(callbackAsync) + requestScheduler.flush(callbackAsync) }, function getPodsList (callbackAsync) { @@ -140,7 +147,7 @@ function quitFriends (callback) { } ], function (err) { // Don't forget to re activate the scheduler, even if there was an error - db.Request.activate() + requestScheduler.activate() if (err) return callback(err) @@ -235,9 +242,9 @@ function getForeignPodsList (host, callback) { function makeRequestsToWinningPods (cert, podsList, callback) { // Stop pool requests - db.Request.deactivate() + requestScheduler.deactivate() // Flush pool requests - db.Request.forceSend() + requestScheduler.forceSend() eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { const params = { @@ -278,7 +285,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) { }, function endRequests () { // Final callback, we've ended all the requests // Now we made new friends, we can re activate the pool of requests - db.Request.activate() + requestScheduler.activate() logger.debug('makeRequestsToWinningPods finished.') return callback() @@ -289,7 +296,7 @@ function makeRequestsToWinningPods (cert, podsList, callback) { // { type, endpoint, data, toIds, transaction } function createRequest (options, callback) { if (!callback) callback = function () {} - if (options.toIds) return _createRequest(options, callback) + if (options.toIds) return requestScheduler.createRequest(options, callback) // If the "toIds" pods is not specified, we send the request to all our friends db.Pod.listAllIds(options.transaction, function (err, podIds) { @@ -299,43 +306,7 @@ function createRequest (options, callback) { } const newOptions = Object.assign(options, { toIds: podIds }) - return _createRequest(newOptions, callback) - }) -} - -// { type, endpoint, data, toIds, transaction } -function _createRequest (options, callback) { - const type = options.type - const endpoint = options.endpoint - const data = options.data - const toIds = options.toIds - const transaction = options.transaction - - const pods = [] - - // If there are no destination pods abort - if (toIds.length === 0) return callback(null) - - toIds.forEach(function (toPod) { - pods.push(db.Pod.build({ id: toPod })) - }) - - const createQuery = { - endpoint, - request: { - type: type, - data: data - } - } - - const dbRequestOptions = { - transaction - } - - return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) { - if (err) return callback(err) - - return request.setPods(pods, dbRequestOptions).asCallback(callback) + return requestScheduler.createRequest(newOptions, callback) }) } diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js new file mode 100644 index 000000000..c8bc4af28 --- /dev/null +++ b/server/lib/request-scheduler.js @@ -0,0 +1,202 @@ +'use strict' + +const eachLimit = require('async/eachLimit') + +const constants = require('../initializers/constants') +const db = require('../initializers/database') +const logger = require('../helpers/logger') +const requests = require('../helpers/requests') + +module.exports = class RequestScheduler { + + constructor (name) { + this.name = name + + this.lastRequestTimestamp = 0 + this.timer = null + } + + activate () { + logger.info('Requests scheduler activated.') + this.lastRequestTimestamp = Date.now() + + this.timer = setInterval(() => { + this.lastRequestTimestamp = Date.now() + this.makeRequests() + }, constants.REQUESTS_INTERVAL) + } + + deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(this.timer) + this.timer = null + } + + forceSend () { + logger.info('Force requests scheduler sending.') + this.makeRequests() + } + + remainingMilliSeconds () { + if (this.timer === null) return -1 + + return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) + } + + // { type, endpoint, data, toIds, transaction } + createRequest (options, callback) { + const type = options.type + const endpoint = options.endpoint + const data = options.data + const toIds = options.toIds + const transaction = options.transaction + + const pods = [] + + // If there are no destination pods abort + if (toIds.length === 0) return callback(null) + + toIds.forEach(toPod => { + pods.push(db.Pod.build({ id: toPod })) + }) + + const createQuery = { + endpoint, + request: { + type: type, + data: data + } + } + + const dbRequestOptions = { + transaction + } + + return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { + if (err) return callback(err) + + return request.setPods(pods, dbRequestOptions).asCallback(callback) + }) + } + + // --------------------------------------------------------------------------- + + // Make all the requests of the scheduler + makeRequests () { + // We limit the size of the requests + // We don't want to stuck with the same failing requests so we get a random list + db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { + if (err) { + logger.error('Cannot get the list of requests.', { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No requests to make.') + return + } + + // We want to group requests by destinations pod and endpoint + const requestsToMakeGrouped = this.buildRequestObjects(requests) + + logger.info('Making requests to friends.') + + const goodPods = [] + const badPods = [] + + eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { + const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod + + // Maybe the pod is not our friend anymore so simply remove it + if (!toPod) { + const requestIdsToDelete = requestToMake.ids + + logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) + return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) + } + + this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { + if (success === false) { + badPods.push(requestToMake.toPod.id) + return callbackEach() + } + + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + // Remove the pod id of these request ids + db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) + }) + }, () => { + // All the requests were made, we update the pods score + db.Request.updatePodsScore(goodPods, badPods) + // Flush requests with no pod + db.Request.removeWithEmptyTo(err => { + if (err) logger.error('Error when removing requests with no pods.', { error: err }) + }) + }) + }) + } + + // Make a requests to friends of a certain type + makeRequest (toPod, requestEndpoint, requestsToMake, callback) { + if (!callback) callback = function () {} + + const params = { + toPod: toPod, + sign: true, // Prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, (err, res) => { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + err = err ? err.message : 'Status code not 20x : ' + res.statusCode + logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) + + return callback(false) + } + + return callback(true) + }) + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: request.endpoint, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data, + } + } + + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].datas.push(request.request) + }) + }) + + return requestsToMakeGrouped + } + + flush (callback) { + db.Request.removeAll(err => { + if (err) logger.error('Cannot flush the requests.', { error: err }) + + return callback(err) + }) + } +} diff --git a/server/models/request.js b/server/models/request.js index baa26fc1b..ca616d130 100644 --- a/server/models/request.js +++ b/server/models/request.js @@ -1,16 +1,11 @@ 'use strict' const each = require('async/each') -const eachLimit = require('async/eachLimit') const waterfall = require('async/waterfall') const values = require('lodash/values') const constants = require('../initializers/constants') const logger = require('../helpers/logger') -const requests = require('../helpers/requests') - -let timer = null -let lastRequestTimestamp = 0 // --------------------------------------------------------------------------- @@ -30,12 +25,13 @@ module.exports = function (sequelize, DataTypes) { classMethods: { associate, - activate, + listWithLimitAndRandom, + countTotalRequests, - deactivate, - flush, - forceSend, - remainingMilliSeconds + removeBadPods, + updatePodsScore, + removeAll, + removeWithEmptyTo } } ) @@ -56,17 +52,6 @@ function associate (models) { }) } -function activate () { - logger.info('Requests scheduler activated.') - lastRequestTimestamp = Date.now() - - const self = this - timer = setInterval(function () { - lastRequestTimestamp = Date.now() - makeRequests.call(self) - }, constants.REQUESTS_INTERVAL) -} - function countTotalRequests (callback) { const query = { include: [ this.sequelize.models.Pod ] @@ -75,147 +60,6 @@ function countTotalRequests (callback) { return this.count(query).asCallback(callback) } -function deactivate () { - logger.info('Requests scheduler deactivated.') - clearInterval(timer) - timer = null -} - -function flush (callback) { - removeAll.call(this, function (err) { - if (err) logger.error('Cannot flush the requests.', { error: err }) - - return callback(err) - }) -} - -function forceSend () { - logger.info('Force requests scheduler sending.') - makeRequests.call(this) -} - -function remainingMilliSeconds () { - if (timer === null) return -1 - - return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp) -} - -// --------------------------------------------------------------------------- - -// Make a requests to friends of a certain type -function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { - if (!callback) callback = function () {} - - const params = { - toPod: toPod, - sign: true, // Prove our identity - method: 'POST', - path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, - data: requestsToMake // Requests we need to make - } - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeSecureRequest(params, function (err, res) { - if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { - err = err ? err.message : 'Status code not 20x : ' + res.statusCode - logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) - - return callback(false) - } - - return callback(true) - }) -} - -// Make all the requests of the scheduler -function makeRequests () { - const self = this - const RequestToPod = this.sequelize.models.RequestToPod - - // We limit the size of the requests - // We don't want to stuck with the same failing requests so we get a random list - listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) { - if (err) { - logger.error('Cannot get the list of requests.', { err: err }) - return // Abort - } - - // If there are no requests, abort - if (requests.length === 0) { - logger.info('No requests to make.') - return - } - - // We want to group requests by destinations pod and endpoint - const requestsToMakeGrouped = buildRequestObjects(requests) - - logger.info('Making requests to friends.') - - const goodPods = [] - const badPods = [] - - eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { - const requestToMake = requestsToMakeGrouped[hashKey] - const toPod = requestToMake.toPod - - // Maybe the pod is not our friend anymore so simply remove it - if (!toPod) { - const requestIdsToDelete = requestToMake.ids - - logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) - return RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) - } - - makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { - if (success === false) { - badPods.push(requestToMake.toPod.id) - return callbackEach() - } - - logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) - goodPods.push(requestToMake.toPod.id) - - // Remove the pod id of these request ids - RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) - }) - }, function () { - // All the requests were made, we update the pods score - updatePodsScore.call(self, goodPods, badPods) - // Flush requests with no pod - removeWithEmptyTo.call(self, function (err) { - if (err) logger.error('Error when removing requests with no pods.', { error: err }) - }) - }) - }) -} - -function buildRequestObjects (requests) { - const requestsToMakeGrouped = {} - - Object.keys(requests).forEach(function (toPodId) { - requests[toPodId].forEach(function (data) { - const request = data.request - const pod = data.pod - const hashKey = toPodId + request.endpoint - - if (!requestsToMakeGrouped[hashKey]) { - requestsToMakeGrouped[hashKey] = { - toPod: pod, - endpoint: request.endpoint, - ids: [], // request ids, to delete them from the DB in the future - datas: [] // requests data, - } - } - - requestsToMakeGrouped[hashKey].ids.push(request.id) - requestsToMakeGrouped[hashKey].datas.push(request.request) - }) - }) - - return requestsToMakeGrouped -} - // Remove pods with a score of 0 (too many requests where they were unreachable) function removeBadPods () { const self = this @@ -307,25 +151,6 @@ function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) { }) } -function groupAndTruncateRequests (requests, limitRequestsPerPod) { - const requestsGrouped = {} - - requests.forEach(function (request) { - request.Pods.forEach(function (pod) { - if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] - - if (requestsGrouped[pod.id].length < limitRequestsPerPod) { - requestsGrouped[pod.id].push({ - request, - pod - }) - } - }) - }) - - return requestsGrouped -} - function removeAll (callback) { // Delete all requests this.truncate({ cascade: true }).asCallback(callback) @@ -346,3 +171,24 @@ function removeWithEmptyTo (callback) { this.destroy(query).asCallback(callback) } + +// --------------------------------------------------------------------------- + +function groupAndTruncateRequests (requests, limitRequestsPerPod) { + const requestsGrouped = {} + + requests.forEach(function (request) { + request.Pods.forEach(function (pod) { + if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = [] + + if (requestsGrouped[pod.id].length < limitRequestsPerPod) { + requestsGrouped[pod.id].push({ + request, + pod + }) + } + }) + }) + + return requestsGrouped +} -- 2.41.0