]>
Commit | Line | Data |
---|---|---|
1 | import { isEmpty } from 'lodash' | |
2 | import * as Promise from 'bluebird' | |
3 | ||
4 | import { database as db } from '../../initializers/database' | |
5 | import { logger, makeSecureRequest } from '../../helpers' | |
6 | import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' | |
7 | import { | |
8 | API_VERSION, | |
9 | REQUESTS_IN_PARALLEL, | |
10 | REQUESTS_INTERVAL | |
11 | } from '../../initializers' | |
12 | ||
13 | abstract class AbstractRequestScheduler <T> { | |
14 | requestInterval: number | |
15 | limitPods: number | |
16 | limitPerPod: number | |
17 | ||
18 | protected lastRequestTimestamp: number | |
19 | protected timer: NodeJS.Timer | |
20 | protected description: string | |
21 | ||
22 | constructor () { | |
23 | this.lastRequestTimestamp = 0 | |
24 | this.timer = null | |
25 | this.requestInterval = REQUESTS_INTERVAL | |
26 | } | |
27 | ||
28 | abstract getRequestModel (): AbstractRequestClass<T> | |
29 | abstract getRequestToPodModel (): AbstractRequestToPodClass | |
30 | abstract buildRequestObjects (requestsGrouped: T): {} | |
31 | ||
32 | activate () { | |
33 | logger.info('Requests scheduler activated.') | |
34 | this.lastRequestTimestamp = Date.now() | |
35 | ||
36 | this.timer = setInterval(() => { | |
37 | this.lastRequestTimestamp = Date.now() | |
38 | this.makeRequests() | |
39 | }, this.requestInterval) | |
40 | } | |
41 | ||
42 | deactivate () { | |
43 | logger.info('Requests scheduler deactivated.') | |
44 | clearInterval(this.timer) | |
45 | this.timer = null | |
46 | } | |
47 | ||
48 | forceSend () { | |
49 | logger.info('Force requests scheduler sending.') | |
50 | this.makeRequests() | |
51 | } | |
52 | ||
53 | remainingMilliSeconds () { | |
54 | if (this.timer === null) return -1 | |
55 | ||
56 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | |
57 | } | |
58 | ||
59 | remainingRequestsCount () { | |
60 | return this.getRequestModel().countTotalRequests() | |
61 | } | |
62 | ||
63 | flush () { | |
64 | return this.getRequestModel().removeAll() | |
65 | } | |
66 | ||
67 | // --------------------------------------------------------------------------- | |
68 | ||
69 | // Make a requests to friends of a certain type | |
70 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) { | |
71 | const params = { | |
72 | toPod: toPod, | |
73 | method: 'POST' as 'POST', | |
74 | path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, | |
75 | data: requestsToMake // Requests we need to make | |
76 | } | |
77 | ||
78 | // Make multiple retry requests to all of pods | |
79 | // The function fire some useful callbacks | |
80 | return makeSecureRequest(params) | |
81 | .then(({ response, body }) => { | |
82 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { | |
83 | throw new Error('Status code not 20x : ' + response.statusCode) | |
84 | } | |
85 | }) | |
86 | .catch(err => { | |
87 | logger.error('Error sending secure request to %s pod.', toPod.host, err) | |
88 | ||
89 | throw err | |
90 | }) | |
91 | } | |
92 | ||
93 | // Make all the requests of the scheduler | |
94 | protected makeRequests () { | |
95 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) | |
96 | .then((requestsGrouped: T) => { | |
97 | // We want to group requests by destinations pod and endpoint | |
98 | const requestsToMake = this.buildRequestObjects(requestsGrouped) | |
99 | ||
100 | // If there are no requests, abort | |
101 | if (isEmpty(requestsToMake) === true) { | |
102 | logger.info('No "%s" to make.', this.description) | |
103 | return { goodPods: [], badPods: [] } | |
104 | } | |
105 | ||
106 | logger.info('Making "%s" to friends.', this.description) | |
107 | ||
108 | const goodPods = [] | |
109 | const badPods = [] | |
110 | ||
111 | return Promise.map(Object.keys(requestsToMake), hashKey => { | |
112 | const requestToMake = requestsToMake[hashKey] | |
113 | const toPod: PodInstance = requestToMake.toPod | |
114 | ||
115 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) | |
116 | .then(() => { | |
117 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | |
118 | goodPods.push(requestToMake.toPod.id) | |
119 | ||
120 | this.afterRequestHook() | |
121 | ||
122 | // Remove the pod id of these request ids | |
123 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) | |
124 | }) | |
125 | .catch(err => { | |
126 | badPods.push(requestToMake.toPod.id) | |
127 | logger.info('Cannot make request to %s.', toPod.host, err) | |
128 | }) | |
129 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) | |
130 | }) | |
131 | .then(({ goodPods, badPods }) => { | |
132 | this.afterRequestsHook() | |
133 | ||
134 | // All the requests were made, we update the pods score | |
135 | return db.Pod.updatePodsScore(goodPods, badPods) | |
136 | }) | |
137 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) | |
138 | } | |
139 | ||
140 | protected afterRequestHook () { | |
141 | // Nothing to do, let children reimplement it | |
142 | } | |
143 | ||
144 | protected afterRequestsHook () { | |
145 | // Nothing to do, let children reimplement it | |
146 | } | |
147 | } | |
148 | ||
149 | // --------------------------------------------------------------------------- | |
150 | ||
151 | export { | |
152 | AbstractRequestScheduler | |
153 | } |