aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/base-request-scheduler.js
blob: 309c1a26166c06ec22c4b4161fb45ca4f7e1eedf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
'use strict'

const eachLimit = require('async/eachLimit')

const constants = require('../initializers/constants')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
const requests = require('../helpers/requests')

module.exports = class BaseRequestScheduler {

  constructor (options) {
    this.lastRequestTimestamp = 0
    this.timer = null
  }

  activate () {
    logger.info('Requests scheduler activated.')
    this.lastRequestTimestamp = Date.now()

    this.timer = setInterval(() => {
      this.lastRequestTimestamp = Date.now()
      this.makeRequests()
    }, constants.REQUESTS_INTERVAL)
  }

  deactivate () {
    logger.info('Requests scheduler deactivated.')
    clearInterval(this.timer)
    this.timer = null
  }

  forceSend () {
    logger.info('Force requests scheduler sending.')
    this.makeRequests()
  }

  remainingMilliSeconds () {
    if (this.timer === null) return -1

    return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
  }

  // ---------------------------------------------------------------------------

  // Make a requests to friends of a certain type
  makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
    if (!callback) callback = function () {}

    const params = {
      toPod: toPod,
      sign: true, // Prove our identity
      method: 'POST',
      path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
      data: requestsToMake // Requests we need to make
    }

    // Make multiple retry requests to all of pods
    // The function fire some useful callbacks
    requests.makeSecureRequest(params, (err, res) => {
      if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
        err = err ? err.message : 'Status code not 20x : ' + res.statusCode
        logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })

        return callback(false)
      }

      return callback(true)
    })
  }

    // Make all the requests of the scheduler
  makeRequests () {
    this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
      if (err) {
        logger.error('Cannot get the list of "%s".', this.description, { err: err })
        return // Abort
      }

      // If there are no requests, abort
      if (requests.length === 0) {
        logger.info('No "%s" to make.', this.description)
        return
      }

      // We want to group requests by destinations pod and endpoint
      const requestsToMakeGrouped = this.buildRequestObjects(requests)

      logger.info('Making "%s" to friends.', this.description)

      const goodPods = []
      const badPods = []

      eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
        const requestToMake = requestsToMakeGrouped[hashKey]
        const toPod = requestToMake.toPod

        this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
          if (success === false) {
            badPods.push(requestToMake.toPod.id)
            return callbackEach()
          }

          logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
          goodPods.push(requestToMake.toPod.id)

          // Remove the pod id of these request ids
          this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)

          this.afterRequestHook()
        })
      }, () => {
        // All the requests were made, we update the pods score
        db.Pod.updatePodsScore(goodPods, badPods)

        this.afterRequestsHook()
      })
    })
  }

  flush (callback) {
    this.getRequestModel().removeAll(callback)
  }

  afterRequestHook () {
   // Nothing to do, let children reimplement it
  }

  afterRequestsHook () {
   // Nothing to do, let children reimplement it
  }
}