From 5a976a8c351d2a9e23ceee1e193fca2893b7f12d Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 5 May 2017 17:35:58 +0200 Subject: Server: move requests lib in their own directory --- server/lib/request/base-request-scheduler.js | 136 +++++++++++++++++++++ server/lib/request/request-scheduler.js | 97 +++++++++++++++ .../lib/request/request-video-event-scheduler.js | 108 ++++++++++++++++ server/lib/request/request-video-qadu-scheduler.js | 117 ++++++++++++++++++ 4 files changed, 458 insertions(+) create mode 100644 server/lib/request/base-request-scheduler.js create mode 100644 server/lib/request/request-scheduler.js create mode 100644 server/lib/request/request-video-event-scheduler.js create mode 100644 server/lib/request/request-video-qadu-scheduler.js (limited to 'server/lib/request') diff --git a/server/lib/request/base-request-scheduler.js b/server/lib/request/base-request-scheduler.js new file mode 100644 index 000000000..782448340 --- /dev/null +++ b/server/lib/request/base-request-scheduler.js @@ -0,0 +1,136 @@ +'use strict' + +const eachLimit = require('async/eachLimit') + +const constants = require('../../initializers/constants') +const db = require('../../initializers/database') +const logger = require('../../helpers/logger') +const requests = require('../../helpers/requests') + +module.exports = class BaseRequestScheduler { + constructor (options) { + this.lastRequestTimestamp = 0 + this.timer = null + this.requestInterval = constants.REQUESTS_INTERVAL + } + + activate () { + logger.info('Requests scheduler activated.') + this.lastRequestTimestamp = Date.now() + + this.timer = setInterval(() => { + this.lastRequestTimestamp = Date.now() + this.makeRequests() + }, this.requestInterval) + } + + deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(this.timer) + this.timer = null + } + + forceSend () { + logger.info('Force requests scheduler sending.') + this.makeRequests() + } + + remainingMilliSeconds () { + if (this.timer === null) return -1 + + return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) + } + + remainingRequestsCount (callback) { + return this.getRequestModel().countTotalRequests(callback) + } + + // --------------------------------------------------------------------------- + + // Make a requests to friends of a certain type + makeRequest (toPod, requestEndpoint, requestsToMake, callback) { + if (!callback) callback = function () {} + + const params = { + toPod: toPod, + sign: true, // Prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, (err, res) => { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + err = err ? err.message : 'Status code not 20x : ' + res.statusCode + logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) + + return callback(err) + } + + return callback(null) + }) + } + + // Make all the requests of the scheduler + makeRequests () { + this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { + if (err) { + logger.error('Cannot get the list of "%s".', this.description, { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No "%s" to make.', this.description) + return + } + + // We want to group requests by destinations pod and endpoint + const requestsToMakeGrouped = this.buildRequestObjects(requests) + + logger.info('Making "%s" to friends.', this.description) + + const goodPods = [] + const badPods = [] + + eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { + const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod + + this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { + if (err) { + badPods.push(requestToMake.toPod.id) + return callbackEach() + } + + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + // Remove the pod id of these request ids + this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) + + this.afterRequestHook() + }) + }, () => { + // All the requests were made, we update the pods score + db.Pod.updatePodsScore(goodPods, badPods) + + this.afterRequestsHook() + }) + }) + } + + flush (callback) { + this.getRequestModel().removeAll(callback) + } + + afterRequestHook () { + // Nothing to do, let children reimplement it + } + + afterRequestsHook () { + // Nothing to do, let children reimplement it + } +} diff --git a/server/lib/request/request-scheduler.js b/server/lib/request/request-scheduler.js new file mode 100644 index 000000000..555ec3e54 --- /dev/null +++ b/server/lib/request/request-scheduler.js @@ -0,0 +1,97 @@ +'use strict' + +const constants = require('../../initializers/constants') +const BaseRequestScheduler = require('./base-request-scheduler') +const db = require('../../initializers/database') +const logger = require('../../helpers/logger') + +module.exports = class RequestScheduler extends BaseRequestScheduler { + constructor () { + super() + + // We limit the size of the requests + this.limitPods = constants.REQUESTS_LIMIT_PODS + this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD + + this.description = 'requests' + } + + getRequestModel () { + return db.Request + } + + getRequestToPodModel () { + return db.RequestToPod + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: request.endpoint, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data, + } + } + + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].datas.push(request.request) + }) + }) + + return requestsToMakeGrouped + } + + // { type, endpoint, data, toIds, transaction } + createRequest (options, callback) { + const type = options.type + const endpoint = options.endpoint + const data = options.data + const toIds = options.toIds + const transaction = options.transaction + + const pods = [] + + // If there are no destination pods abort + if (toIds.length === 0) return callback(null) + + toIds.forEach(toPod => { + pods.push(db.Pod.build({ id: toPod })) + }) + + const createQuery = { + endpoint, + request: { + type: type, + data: data + } + } + + const dbRequestOptions = { + transaction + } + + return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { + if (err) return callback(err) + + return request.setPods(pods, dbRequestOptions).asCallback(callback) + }) + } + + // --------------------------------------------------------------------------- + + afterRequestsHook () { + // Flush requests with no pod + this.getRequestModel().removeWithEmptyTo(err => { + if (err) logger.error('Error when removing requests with no pods.', { error: err }) + }) + } +} diff --git a/server/lib/request/request-video-event-scheduler.js b/server/lib/request/request-video-event-scheduler.js new file mode 100644 index 000000000..e54d34f4a --- /dev/null +++ b/server/lib/request/request-video-event-scheduler.js @@ -0,0 +1,108 @@ +'use strict' + +const BaseRequestScheduler = require('./base-request-scheduler') +const constants = require('../../initializers/constants') +const db = require('../../initializers/database') + +module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler { + constructor () { + super() + + // We limit the size of the requests + this.limitPods = constants.REQUESTS_VIDEO_EVENT_LIMIT_PODS + this.limitPerPod = constants.REQUESTS_VIDEO_EVENT_LIMIT_PER_POD + + this.description = 'video event requests' + } + + getRequestModel () { + return db.RequestVideoEvent + } + + getRequestToPodModel () { + return db.RequestVideoEvent + } + + buildRequestObjects (eventsToProcess) { + const requestsToMakeGrouped = {} + + /* Example: + { + pod1: { + video1: { views: 4, likes: 5 }, + video2: { likes: 5 } + } + } + */ + const eventsPerVideoPerPod = {} + + // We group video events per video and per pod + // We add the counts of the same event types + Object.keys(eventsToProcess).forEach(toPodId => { + eventsToProcess[toPodId].forEach(eventToProcess => { + if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} + + if (!requestsToMakeGrouped[toPodId]) { + requestsToMakeGrouped[toPodId] = { + toPod: eventToProcess.pod, + endpoint: constants.REQUEST_VIDEO_EVENT_ENDPOINT, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data + } + } + requestsToMakeGrouped[toPodId].ids.push(eventToProcess.id) + + const eventsPerVideo = eventsPerVideoPerPod[toPodId] + const remoteId = eventToProcess.video.remoteId + if (!eventsPerVideo[remoteId]) eventsPerVideo[remoteId] = {} + + const events = eventsPerVideo[remoteId] + if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 + + events[eventToProcess.type] += eventToProcess.count + }) + }) + + // Now we build our requests array per pod + Object.keys(eventsPerVideoPerPod).forEach(toPodId => { + const eventsForPod = eventsPerVideoPerPod[toPodId] + + Object.keys(eventsForPod).forEach(remoteId => { + const eventsForVideo = eventsForPod[remoteId] + + Object.keys(eventsForVideo).forEach(eventType => { + requestsToMakeGrouped[toPodId].datas.push({ + data: { + remoteId, + eventType, + count: eventsForVideo[eventType] + } + }) + }) + }) + }) + + return requestsToMakeGrouped + } + + // { type, videoId, count?, transaction? } + createRequest (options, callback) { + const type = options.type + const videoId = options.videoId + const transaction = options.transaction + let count = options.count + + if (count === undefined) count = 1 + + const dbRequestOptions = {} + if (transaction) dbRequestOptions.transaction = transaction + + const createQuery = { + type, + count, + videoId + } + + return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) + } +} diff --git a/server/lib/request/request-video-qadu-scheduler.js b/server/lib/request/request-video-qadu-scheduler.js new file mode 100644 index 000000000..17402b556 --- /dev/null +++ b/server/lib/request/request-video-qadu-scheduler.js @@ -0,0 +1,117 @@ +'use strict' + +const BaseRequestScheduler = require('./base-request-scheduler') +const constants = require('../../initializers/constants') +const db = require('../../initializers/database') +const logger = require('../../helpers/logger') + +module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler { + constructor () { + super() + + // We limit the size of the requests + this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS + this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PER_POD + + this.description = 'video QADU requests' + } + + getRequestModel () { + return db.RequestVideoQadu + } + + getRequestToPodModel () { + return db.RequestVideoQadu + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const video = data.video + const pod = data.pod + const hashKey = toPodId + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: constants.REQUEST_VIDEO_QADU_ENDPOINT, + ids: [], // request ids, to delete them from the DB in the future + datas: [], // requests data + videos: {} + } + } + + // Maybe another attribute was filled for this video + let videoData = requestsToMakeGrouped[hashKey].videos[video.id] + if (!videoData) videoData = {} + + switch (request.type) { + case constants.REQUEST_VIDEO_QADU_TYPES.LIKES: + videoData.likes = video.likes + break + + case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES: + videoData.dislikes = video.dislikes + break + + case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS: + videoData.views = video.views + break + + default: + logger.error('Unknown request video QADU type %s.', request.type) + return + } + + // Do not forget the remoteId so the remote pod can identify the video + videoData.remoteId = video.id + requestsToMakeGrouped[hashKey].ids.push(request.id) + + // Maybe there are multiple quick and dirty update for the same video + // We use this hashmap to dedupe them + requestsToMakeGrouped[hashKey].videos[video.id] = videoData + }) + }) + + // Now we deduped similar quick and dirty updates, we can build our requests datas + Object.keys(requestsToMakeGrouped).forEach(hashKey => { + Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => { + const videoData = requestsToMakeGrouped[hashKey].videos[videoId] + + requestsToMakeGrouped[hashKey].datas.push({ + data: videoData + }) + }) + + // We don't need it anymore, it was just to build our datas array + delete requestsToMakeGrouped[hashKey].videos + }) + + return requestsToMakeGrouped + } + + // { type, videoId, transaction? } + createRequest (options, callback) { + const type = options.type + const videoId = options.videoId + const transaction = options.transaction + + const dbRequestOptions = {} + if (transaction) dbRequestOptions.transaction = transaction + + // Send the update to all our friends + db.Pod.listAllIds(options.transaction, function (err, podIds) { + if (err) return callback(err) + + const queries = [] + podIds.forEach(podId => { + queries.push({ type, videoId, podId }) + }) + + return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) + }) + } +} -- cgit v1.2.3