]>
Commit | Line | Data |
---|---|---|
1 | import { isEmpty } from 'lodash' | |
2 | import * as Promise from 'bluebird' | |
3 | ||
4 | import { database as db } from '../../initializers/database' | |
5 | import { logger, makeSecureRequest } from '../../helpers' | |
6 | import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' | |
7 | import { | |
8 | API_VERSION, | |
9 | REQUESTS_IN_PARALLEL, | |
10 | REQUESTS_INTERVAL | |
11 | } from '../../initializers' | |
12 | ||
13 | interface RequestsObjects<U> { | |
14 | [ id: string ]: { | |
15 | toPod: PodInstance | |
16 | endpoint: string | |
17 | ids: number[] // ids | |
18 | datas: U[] | |
19 | } | |
20 | } | |
21 | ||
22 | abstract class AbstractRequestScheduler <T> { | |
23 | requestInterval: number | |
24 | limitPods: number | |
25 | limitPerPod: number | |
26 | ||
27 | protected lastRequestTimestamp: number | |
28 | protected timer: NodeJS.Timer | |
29 | protected description: string | |
30 | ||
31 | constructor () { | |
32 | this.lastRequestTimestamp = 0 | |
33 | this.timer = null | |
34 | this.requestInterval = REQUESTS_INTERVAL | |
35 | } | |
36 | ||
37 | abstract getRequestModel (): AbstractRequestClass<T> | |
38 | abstract getRequestToPodModel (): AbstractRequestToPodClass | |
39 | abstract buildRequestsObjects (requestsGrouped: T): RequestsObjects<any> | |
40 | ||
41 | activate () { | |
42 | logger.info('Requests scheduler activated.') | |
43 | this.lastRequestTimestamp = Date.now() | |
44 | ||
45 | this.timer = setInterval(() => { | |
46 | this.lastRequestTimestamp = Date.now() | |
47 | this.makeRequests() | |
48 | }, this.requestInterval) | |
49 | } | |
50 | ||
51 | deactivate () { | |
52 | logger.info('Requests scheduler deactivated.') | |
53 | clearInterval(this.timer) | |
54 | this.timer = null | |
55 | } | |
56 | ||
57 | forceSend () { | |
58 | logger.info('Force requests scheduler sending.') | |
59 | this.makeRequests() | |
60 | } | |
61 | ||
62 | remainingMilliSeconds () { | |
63 | if (this.timer === null) return -1 | |
64 | ||
65 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | |
66 | } | |
67 | ||
68 | remainingRequestsCount () { | |
69 | return this.getRequestModel().countTotalRequests() | |
70 | } | |
71 | ||
72 | flush () { | |
73 | return this.getRequestModel().removeAll() | |
74 | } | |
75 | ||
76 | // --------------------------------------------------------------------------- | |
77 | ||
78 | // Make a requests to friends of a certain type | |
79 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { | |
80 | const params = { | |
81 | toPod: toPod, | |
82 | method: 'POST' as 'POST', | |
83 | path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, | |
84 | data: requestsToMake // Requests we need to make | |
85 | } | |
86 | ||
87 | // Make multiple retry requests to all of pods | |
88 | // The function fire some useful callbacks | |
89 | return makeSecureRequest(params) | |
90 | .then(({ response, body }) => { | |
91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { | |
92 | throw new Error('Status code not 20x : ' + response.statusCode) | |
93 | } | |
94 | }) | |
95 | .catch(err => { | |
96 | logger.error('Error sending secure request to %s pod.', toPod.host, err) | |
97 | ||
98 | throw err | |
99 | }) | |
100 | } | |
101 | ||
102 | // Make all the requests of the scheduler | |
103 | protected makeRequests () { | |
104 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) | |
105 | .then((requestsGrouped: T) => { | |
106 | // We want to group requests by destinations pod and endpoint | |
107 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | |
108 | ||
109 | // If there are no requests, abort | |
110 | if (isEmpty(requestsToMake) === true) { | |
111 | logger.info('No "%s" to make.', this.description) | |
112 | return { goodPods: [], badPods: [] } | |
113 | } | |
114 | ||
115 | logger.info('Making "%s" to friends.', this.description) | |
116 | ||
117 | const goodPods: number[] = [] | |
118 | const badPods: number[] = [] | |
119 | ||
120 | return Promise.map(Object.keys(requestsToMake), hashKey => { | |
121 | const requestToMake = requestsToMake[hashKey] | |
122 | const toPod: PodInstance = requestToMake.toPod | |
123 | ||
124 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) | |
125 | .then(() => { | |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | |
127 | goodPods.push(requestToMake.toPod.id) | |
128 | ||
129 | this.afterRequestHook() | |
130 | ||
131 | // Remove the pod id of these request ids | |
132 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) | |
133 | }) | |
134 | .catch(err => { | |
135 | badPods.push(requestToMake.toPod.id) | |
136 | logger.info('Cannot make request to %s.', toPod.host, err) | |
137 | }) | |
138 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) | |
139 | }) | |
140 | .then(({ goodPods, badPods }) => { | |
141 | this.afterRequestsHook() | |
142 | ||
143 | // All the requests were made, we update the pods score | |
144 | return db.Pod.updatePodsScore(goodPods, badPods) | |
145 | }) | |
146 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) | |
147 | } | |
148 | ||
149 | protected afterRequestHook () { | |
150 | // Nothing to do, let children reimplement it | |
151 | } | |
152 | ||
153 | protected afterRequestsHook () { | |
154 | // Nothing to do, let children reimplement it | |
155 | } | |
156 | } | |
157 | ||
158 | // --------------------------------------------------------------------------- | |
159 | ||
160 | export { | |
161 | AbstractRequestScheduler, | |
162 | RequestsObjects | |
163 | } |