aboutsummaryrefslogblamecommitdiffhomepage
path: root/server/lib/request-scheduler.js
blob: c8bc4af288e7de14d482f2329a45326d8b034926 (plain) (tree)









































































































































































































                                                                                                                           
'use strict'

const eachLimit = require('async/eachLimit')

const constants = require('../initializers/constants')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
const requests = require('../helpers/requests')

module.exports = class RequestScheduler {

  constructor (name) {
    this.name = name

    this.lastRequestTimestamp = 0
    this.timer = null
  }

  activate () {
    logger.info('Requests scheduler activated.')
    this.lastRequestTimestamp = Date.now()

    this.timer = setInterval(() => {
      this.lastRequestTimestamp = Date.now()
      this.makeRequests()
    }, constants.REQUESTS_INTERVAL)
  }

  deactivate () {
    logger.info('Requests scheduler deactivated.')
    clearInterval(this.timer)
    this.timer = null
  }

  forceSend () {
    logger.info('Force requests scheduler sending.')
    this.makeRequests()
  }

  remainingMilliSeconds () {
    if (this.timer === null) return -1

    return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
  }

  // { type, endpoint, data, toIds, transaction }
  createRequest (options, callback) {
    const type = options.type
    const endpoint = options.endpoint
    const data = options.data
    const toIds = options.toIds
    const transaction = options.transaction

    const pods = []

    // If there are no destination pods abort
    if (toIds.length === 0) return callback(null)

    toIds.forEach(toPod => {
      pods.push(db.Pod.build({ id: toPod }))
    })

    const createQuery = {
      endpoint,
      request: {
        type: type,
        data: data
      }
    }

    const dbRequestOptions = {
      transaction
    }

    return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => {
      if (err) return callback(err)

      return request.setPods(pods, dbRequestOptions).asCallback(callback)
    })
  }

  // ---------------------------------------------------------------------------

  // Make all the requests of the scheduler
  makeRequests () {
    // We limit the size of the requests
    // We don't want to stuck with the same failing requests so we get a random list
    db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
      if (err) {
        logger.error('Cannot get the list of requests.', { err: err })
        return // Abort
      }

      // If there are no requests, abort
      if (requests.length === 0) {
        logger.info('No requests to make.')
        return
      }

      // We want to group requests by destinations pod and endpoint
      const requestsToMakeGrouped = this.buildRequestObjects(requests)

      logger.info('Making requests to friends.')

      const goodPods = []
      const badPods = []

      eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
        const requestToMake = requestsToMakeGrouped[hashKey]
        const toPod = requestToMake.toPod

        // Maybe the pod is not our friend anymore so simply remove it
        if (!toPod) {
          const requestIdsToDelete = requestToMake.ids

          logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
          return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
        }

        this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
          if (success === false) {
            badPods.push(requestToMake.toPod.id)
            return callbackEach()
          }

          logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
          goodPods.push(requestToMake.toPod.id)

          // Remove the pod id of these request ids
          db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
        })
      }, () => {
        // All the requests were made, we update the pods score
        db.Request.updatePodsScore(goodPods, badPods)
        // Flush requests with no pod
        db.Request.removeWithEmptyTo(err => {
          if (err) logger.error('Error when removing requests with no pods.', { error: err })
        })
      })
    })
  }

  // Make a requests to friends of a certain type
  makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
    if (!callback) callback = function () {}

    const params = {
      toPod: toPod,
      sign: true, // Prove our identity
      method: 'POST',
      path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
      data: requestsToMake // Requests we need to make
    }

    // Make multiple retry requests to all of pods
    // The function fire some useful callbacks
    requests.makeSecureRequest(params, (err, res) => {
      if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
        err = err ? err.message : 'Status code not 20x : ' + res.statusCode
        logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })

        return callback(false)
      }

      return callback(true)
    })
  }

  buildRequestObjects (requests) {
    const requestsToMakeGrouped = {}

    Object.keys(requests).forEach(toPodId => {
      requests[toPodId].forEach(data => {
        const request = data.request
        const pod = data.pod
        const hashKey = toPodId + request.endpoint

        if (!requestsToMakeGrouped[hashKey]) {
          requestsToMakeGrouped[hashKey] = {
            toPod: pod,
            endpoint: request.endpoint,
            ids: [], // request ids, to delete them from the DB in the future
            datas: [] // requests data,
          }
        }

        requestsToMakeGrouped[hashKey].ids.push(request.id)
        requestsToMakeGrouped[hashKey].datas.push(request.request)
      })
    })

    return requestsToMakeGrouped
  }

  flush (callback) {
    db.Request.removeAll(err => {
      if (err) logger.error('Cannot flush the requests.', { error: err })

      return callback(err)
    })
  }
}