diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-06-16 09:14:41 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-06-16 09:14:41 +0200 |
commit | 15a302943d84bc0978b84fe33110c4daa451d311 (patch) | |
tree | f4c4f626d94fced1c398d7fa9644c6d8fb9d5871 /server/lib/request/abstract-request-scheduler.ts | |
parent | ed5f91633d96436408c763bb96d2cc463cb7dc00 (diff) | |
download | PeerTube-15a302943d84bc0978b84fe33110c4daa451d311.tar.gz PeerTube-15a302943d84bc0978b84fe33110c4daa451d311.tar.zst PeerTube-15a302943d84bc0978b84fe33110c4daa451d311.zip |
BaseRequestScheduler -> AbstractRequestScheduler
Diffstat (limited to 'server/lib/request/abstract-request-scheduler.ts')
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts new file mode 100644 index 000000000..e81ab9c36 --- /dev/null +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -0,0 +1,156 @@ | |||
1 | import * as eachLimit from 'async/eachLimit' | ||
2 | |||
3 | import { database as db } from '../../initializers/database' | ||
4 | import { logger, makeSecureRequest } from '../../helpers' | ||
5 | import { PodInstance } from '../../models' | ||
6 | import { | ||
7 | API_VERSION, | ||
8 | REQUESTS_IN_PARALLEL, | ||
9 | REQUESTS_INTERVAL | ||
10 | } from '../../initializers' | ||
11 | |||
12 | abstract class AbstractRequestScheduler { | ||
13 | requestInterval: number | ||
14 | limitPods: number | ||
15 | limitPerPod: number | ||
16 | |||
17 | protected lastRequestTimestamp: number | ||
18 | protected timer: NodeJS.Timer | ||
19 | protected description: string | ||
20 | |||
21 | constructor () { | ||
22 | this.lastRequestTimestamp = 0 | ||
23 | this.timer = null | ||
24 | this.requestInterval = REQUESTS_INTERVAL | ||
25 | } | ||
26 | |||
27 | abstract getRequestModel () | ||
28 | abstract getRequestToPodModel () | ||
29 | abstract buildRequestObjects (requests: any) | ||
30 | |||
31 | activate () { | ||
32 | logger.info('Requests scheduler activated.') | ||
33 | this.lastRequestTimestamp = Date.now() | ||
34 | |||
35 | this.timer = setInterval(() => { | ||
36 | this.lastRequestTimestamp = Date.now() | ||
37 | this.makeRequests() | ||
38 | }, this.requestInterval) | ||
39 | } | ||
40 | |||
41 | deactivate () { | ||
42 | logger.info('Requests scheduler deactivated.') | ||
43 | clearInterval(this.timer) | ||
44 | this.timer = null | ||
45 | } | ||
46 | |||
47 | forceSend () { | ||
48 | logger.info('Force requests scheduler sending.') | ||
49 | this.makeRequests() | ||
50 | } | ||
51 | |||
52 | remainingMilliSeconds () { | ||
53 | if (this.timer === null) return -1 | ||
54 | |||
55 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
56 | } | ||
57 | |||
58 | remainingRequestsCount (callback: (err: Error, total: number) => void) { | ||
59 | return this.getRequestModel().countTotalRequests(callback) | ||
60 | } | ||
61 | |||
62 | flush (callback: (err: Error) => void) { | ||
63 | this.getRequestModel().removeAll(callback) | ||
64 | } | ||
65 | |||
66 | // --------------------------------------------------------------------------- | ||
67 | |||
68 | // Make a requests to friends of a certain type | ||
69 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { | ||
70 | if (!callback) callback = function () { /* empty */ } | ||
71 | |||
72 | const params = { | ||
73 | toPod: toPod, | ||
74 | sign: true, // Prove our identity | ||
75 | method: 'POST' as 'POST', | ||
76 | path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, | ||
77 | data: requestsToMake // Requests we need to make | ||
78 | } | ||
79 | |||
80 | // Make multiple retry requests to all of pods | ||
81 | // The function fire some useful callbacks | ||
82 | makeSecureRequest(params, (err, res) => { | ||
83 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
84 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
85 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
86 | |||
87 | return callback(err) | ||
88 | } | ||
89 | |||
90 | return callback(null) | ||
91 | }) | ||
92 | } | ||
93 | |||
94 | // Make all the requests of the scheduler | ||
95 | protected makeRequests () { | ||
96 | this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { | ||
97 | if (err) { | ||
98 | logger.error('Cannot get the list of "%s".', this.description, { err: err }) | ||
99 | return // Abort | ||
100 | } | ||
101 | |||
102 | // If there are no requests, abort | ||
103 | if (requests.length === 0) { | ||
104 | logger.info('No "%s" to make.', this.description) | ||
105 | return | ||
106 | } | ||
107 | |||
108 | // We want to group requests by destinations pod and endpoint | ||
109 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
110 | |||
111 | logger.info('Making "%s" to friends.', this.description) | ||
112 | |||
113 | const goodPods = [] | ||
114 | const badPods = [] | ||
115 | |||
116 | eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
117 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
118 | const toPod = requestToMake.toPod | ||
119 | |||
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { | ||
121 | if (err) { | ||
122 | badPods.push(requestToMake.toPod.id) | ||
123 | return callbackEach() | ||
124 | } | ||
125 | |||
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
127 | goodPods.push(requestToMake.toPod.id) | ||
128 | |||
129 | // Remove the pod id of these request ids | ||
130 | this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
131 | |||
132 | this.afterRequestHook() | ||
133 | }) | ||
134 | }, () => { | ||
135 | // All the requests were made, we update the pods score | ||
136 | db.Pod.updatePodsScore(goodPods, badPods) | ||
137 | |||
138 | this.afterRequestsHook() | ||
139 | }) | ||
140 | }) | ||
141 | } | ||
142 | |||
143 | protected afterRequestHook () { | ||
144 | // Nothing to do, let children reimplement it | ||
145 | } | ||
146 | |||
147 | protected afterRequestsHook () { | ||
148 | // Nothing to do, let children reimplement it | ||
149 | } | ||
150 | } | ||
151 | |||
152 | // --------------------------------------------------------------------------- | ||
153 | |||
154 | export { | ||
155 | AbstractRequestScheduler | ||
156 | } | ||