1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
import * as eachLimit from 'async/eachLimit'
import { database as db } from '../../initializers/database'
import { logger, makeSecureRequest } from '../../helpers'
import { PodInstance } from '../../models'
import {
API_VERSION,
REQUESTS_IN_PARALLEL,
REQUESTS_INTERVAL
} from '../../initializers'
abstract class BaseRequestScheduler {
requestInterval: number
limitPods: number
limitPerPod: number
protected lastRequestTimestamp: number
protected timer: NodeJS.Timer
protected description: string
constructor () {
this.lastRequestTimestamp = 0
this.timer = null
this.requestInterval = REQUESTS_INTERVAL
}
abstract getRequestModel ()
abstract getRequestToPodModel ()
abstract buildRequestObjects (requests: any)
activate () {
logger.info('Requests scheduler activated.')
this.lastRequestTimestamp = Date.now()
this.timer = setInterval(() => {
this.lastRequestTimestamp = Date.now()
this.makeRequests()
}, this.requestInterval)
}
deactivate () {
logger.info('Requests scheduler deactivated.')
clearInterval(this.timer)
this.timer = null
}
forceSend () {
logger.info('Force requests scheduler sending.')
this.makeRequests()
}
remainingMilliSeconds () {
if (this.timer === null) return -1
return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
}
remainingRequestsCount (callback: (err: Error, total: number) => void) {
return this.getRequestModel().countTotalRequests(callback)
}
flush (callback: (err: Error) => void) {
this.getRequestModel().removeAll(callback)
}
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) {
if (!callback) callback = function () { /* empty */ }
const params = {
toPod: toPod,
sign: true, // Prove our identity
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/' + requestEndpoint,
data: requestsToMake // Requests we need to make
}
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
makeSecureRequest(params, (err, res) => {
if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
err = err ? err.message : 'Status code not 20x : ' + res.statusCode
logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
return callback(err)
}
return callback(null)
})
}
// Make all the requests of the scheduler
protected makeRequests () {
this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
if (err) {
logger.error('Cannot get the list of "%s".', this.description, { err: err })
return // Abort
}
// If there are no requests, abort
if (requests.length === 0) {
logger.info('No "%s" to make.', this.description)
return
}
// We want to group requests by destinations pod and endpoint
const requestsToMakeGrouped = this.buildRequestObjects(requests)
logger.info('Making "%s" to friends.', this.description)
const goodPods = []
const badPods = []
eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
const requestToMake = requestsToMakeGrouped[hashKey]
const toPod = requestToMake.toPod
this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => {
if (err) {
badPods.push(requestToMake.toPod.id)
return callbackEach()
}
logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
goodPods.push(requestToMake.toPod.id)
// Remove the pod id of these request ids
this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
this.afterRequestHook()
})
}, () => {
// All the requests were made, we update the pods score
db.Pod.updatePodsScore(goodPods, badPods)
this.afterRequestsHook()
})
})
}
protected afterRequestHook () {
// Nothing to do, let children reimplement it
}
protected afterRequestsHook () {
// Nothing to do, let children reimplement it
}
}
// ---------------------------------------------------------------------------
export {
BaseRequestScheduler
}
|