-import * as eachLimit from 'async/eachLimit'
+import { isEmpty } from 'lodash'
+import * as Promise from 'bluebird'
import { database as db } from '../../initializers/database'
import { logger, makeSecureRequest } from '../../helpers'
-import { PodInstance } from '../../models'
+import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models'
import {
API_VERSION,
REQUESTS_IN_PARALLEL,
REQUESTS_INTERVAL
} from '../../initializers'
-abstract class AbstractRequestScheduler {
+interface RequestsObjects<U> {
+ [ id: string ]: {
+ toPod: PodInstance
+ endpoint: string
+ ids: number[] // ids
+ datas: U[]
+ }
+}
+
+abstract class AbstractRequestScheduler <T> {
requestInterval: number
limitPods: number
limitPerPod: number
this.requestInterval = REQUESTS_INTERVAL
}
- abstract getRequestModel ()
- abstract getRequestToPodModel ()
- abstract buildRequestObjects (requests: any)
+ abstract getRequestModel (): AbstractRequestClass<T>
+ abstract getRequestToPodModel (): AbstractRequestToPodClass
+ abstract buildRequestsObjects (requestsGrouped: T): RequestsObjects<any>
activate () {
logger.info('Requests scheduler activated.')
return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
}
- remainingRequestsCount (callback: (err: Error, total: number) => void) {
- return this.getRequestModel().countTotalRequests(callback)
+ remainingRequestsCount () {
+ return this.getRequestModel().countTotalRequests()
}
- flush (callback: (err: Error) => void) {
- this.getRequestModel().removeAll(callback)
+ flush () {
+ return this.getRequestModel().removeAll()
}
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
- protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) {
- if (!callback) callback = function () { /* empty */ }
-
+ protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
const params = {
toPod: toPod,
- sign: true, // Prove our identity
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/' + requestEndpoint,
data: requestsToMake // Requests we need to make
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
- makeSecureRequest(params, (err, res) => {
- if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
- err = err ? err.message : 'Status code not 20x : ' + res.statusCode
- logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
-
- return callback(err)
- }
+ return makeSecureRequest(params)
+ .then(({ response, body }) => {
+ if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
+ throw new Error('Status code not 20x : ' + response.statusCode)
+ }
+ })
+ .catch(err => {
+ logger.error('Error sending secure request to %s pod.', toPod.host, err)
- return callback(null)
- })
+ throw err
+ })
}
// Make all the requests of the scheduler
protected makeRequests () {
- this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
- if (err) {
- logger.error('Cannot get the list of "%s".', this.description, { err: err })
- return // Abort
- }
-
- // If there are no requests, abort
- if (requests.length === 0) {
- logger.info('No "%s" to make.', this.description)
- return
- }
-
- // We want to group requests by destinations pod and endpoint
- const requestsToMakeGrouped = this.buildRequestObjects(requests)
-
- logger.info('Making "%s" to friends.', this.description)
-
- const goodPods = []
- const badPods = []
-
- eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
- const requestToMake = requestsToMakeGrouped[hashKey]
- const toPod = requestToMake.toPod
-
- this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => {
- if (err) {
- badPods.push(requestToMake.toPod.id)
- return callbackEach()
- }
-
- logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
- goodPods.push(requestToMake.toPod.id)
-
- // Remove the pod id of these request ids
- this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
+ return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
+ .then((requestsGrouped: T) => {
+ // We want to group requests by destinations pod and endpoint
+ const requestsToMake = this.buildRequestsObjects(requestsGrouped)
+
+ // If there are no requests, abort
+ if (isEmpty(requestsToMake) === true) {
+ logger.info('No "%s" to make.', this.description)
+ return { goodPods: [], badPods: [] }
+ }
+
+ logger.info('Making "%s" to friends.', this.description)
+
+ const goodPods: number[] = []
+ const badPods: number[] = []
+
+ return Promise.map(Object.keys(requestsToMake), hashKey => {
+ const requestToMake = requestsToMake[hashKey]
+ const toPod: PodInstance = requestToMake.toPod
+
+ return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
+ .then(() => {
+ logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
+ goodPods.push(requestToMake.toPod.id)
+
+ this.afterRequestHook()
+
+ // Remove the pod id of these request ids
+ return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
+ })
+ .catch(err => {
+ badPods.push(requestToMake.toPod.id)
+ logger.info('Cannot make request to %s.', toPod.host, err)
+ })
+ }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
+ })
+ .then(({ goodPods, badPods }) => {
+ this.afterRequestsHook()
- this.afterRequestHook()
- })
- }, () => {
// All the requests were made, we update the pods score
- db.Pod.updatePodsScore(goodPods, badPods)
-
- this.afterRequestsHook()
+ return db.Pod.updatePodsScore(goodPods, badPods)
})
- })
+ .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
}
protected afterRequestHook () {
// ---------------------------------------------------------------------------
export {
- AbstractRequestScheduler
+ AbstractRequestScheduler,
+ RequestsObjects
}