diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-02-18 10:29:36 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-02-18 10:29:36 +0100 |
commit | c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de (patch) | |
tree | 5d721432dcc25f43ebd72b12f675d4e208faf616 /server/lib/request-scheduler.js | |
parent | 361b7df2a257d1c44ec1d79128a9065b563090d8 (diff) | |
download | PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.gz PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.zst PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.zip |
Server: use a request scheduler object instance for friends
communication
Diffstat (limited to 'server/lib/request-scheduler.js')
-rw-r--r-- | server/lib/request-scheduler.js | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js new file mode 100644 index 000000000..c8bc4af28 --- /dev/null +++ b/server/lib/request-scheduler.js | |||
@@ -0,0 +1,202 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../initializers/constants') | ||
6 | const db = require('../initializers/database') | ||
7 | const logger = require('../helpers/logger') | ||
8 | const requests = require('../helpers/requests') | ||
9 | |||
10 | module.exports = class RequestScheduler { | ||
11 | |||
12 | constructor (name) { | ||
13 | this.name = name | ||
14 | |||
15 | this.lastRequestTimestamp = 0 | ||
16 | this.timer = null | ||
17 | } | ||
18 | |||
19 | activate () { | ||
20 | logger.info('Requests scheduler activated.') | ||
21 | this.lastRequestTimestamp = Date.now() | ||
22 | |||
23 | this.timer = setInterval(() => { | ||
24 | this.lastRequestTimestamp = Date.now() | ||
25 | this.makeRequests() | ||
26 | }, constants.REQUESTS_INTERVAL) | ||
27 | } | ||
28 | |||
29 | deactivate () { | ||
30 | logger.info('Requests scheduler deactivated.') | ||
31 | clearInterval(this.timer) | ||
32 | this.timer = null | ||
33 | } | ||
34 | |||
35 | forceSend () { | ||
36 | logger.info('Force requests scheduler sending.') | ||
37 | this.makeRequests() | ||
38 | } | ||
39 | |||
40 | remainingMilliSeconds () { | ||
41 | if (this.timer === null) return -1 | ||
42 | |||
43 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
44 | } | ||
45 | |||
46 | // { type, endpoint, data, toIds, transaction } | ||
47 | createRequest (options, callback) { | ||
48 | const type = options.type | ||
49 | const endpoint = options.endpoint | ||
50 | const data = options.data | ||
51 | const toIds = options.toIds | ||
52 | const transaction = options.transaction | ||
53 | |||
54 | const pods = [] | ||
55 | |||
56 | // If there are no destination pods abort | ||
57 | if (toIds.length === 0) return callback(null) | ||
58 | |||
59 | toIds.forEach(toPod => { | ||
60 | pods.push(db.Pod.build({ id: toPod })) | ||
61 | }) | ||
62 | |||
63 | const createQuery = { | ||
64 | endpoint, | ||
65 | request: { | ||
66 | type: type, | ||
67 | data: data | ||
68 | } | ||
69 | } | ||
70 | |||
71 | const dbRequestOptions = { | ||
72 | transaction | ||
73 | } | ||
74 | |||
75 | return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { | ||
76 | if (err) return callback(err) | ||
77 | |||
78 | return request.setPods(pods, dbRequestOptions).asCallback(callback) | ||
79 | }) | ||
80 | } | ||
81 | |||
82 | // --------------------------------------------------------------------------- | ||
83 | |||
84 | // Make all the requests of the scheduler | ||
85 | makeRequests () { | ||
86 | // We limit the size of the requests | ||
87 | // We don't want to stuck with the same failing requests so we get a random list | ||
88 | db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { | ||
89 | if (err) { | ||
90 | logger.error('Cannot get the list of requests.', { err: err }) | ||
91 | return // Abort | ||
92 | } | ||
93 | |||
94 | // If there are no requests, abort | ||
95 | if (requests.length === 0) { | ||
96 | logger.info('No requests to make.') | ||
97 | return | ||
98 | } | ||
99 | |||
100 | // We want to group requests by destinations pod and endpoint | ||
101 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
102 | |||
103 | logger.info('Making requests to friends.') | ||
104 | |||
105 | const goodPods = [] | ||
106 | const badPods = [] | ||
107 | |||
108 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
109 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
110 | const toPod = requestToMake.toPod | ||
111 | |||
112 | // Maybe the pod is not our friend anymore so simply remove it | ||
113 | if (!toPod) { | ||
114 | const requestIdsToDelete = requestToMake.ids | ||
115 | |||
116 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
117 | return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
118 | } | ||
119 | |||
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { | ||
121 | if (success === false) { | ||
122 | badPods.push(requestToMake.toPod.id) | ||
123 | return callbackEach() | ||
124 | } | ||
125 | |||
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
127 | goodPods.push(requestToMake.toPod.id) | ||
128 | |||
129 | // Remove the pod id of these request ids | ||
130 | db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
131 | }) | ||
132 | }, () => { | ||
133 | // All the requests were made, we update the pods score | ||
134 | db.Request.updatePodsScore(goodPods, badPods) | ||
135 | // Flush requests with no pod | ||
136 | db.Request.removeWithEmptyTo(err => { | ||
137 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
138 | }) | ||
139 | }) | ||
140 | }) | ||
141 | } | ||
142 | |||
143 | // Make a requests to friends of a certain type | ||
144 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
145 | if (!callback) callback = function () {} | ||
146 | |||
147 | const params = { | ||
148 | toPod: toPod, | ||
149 | sign: true, // Prove our identity | ||
150 | method: 'POST', | ||
151 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
152 | data: requestsToMake // Requests we need to make | ||
153 | } | ||
154 | |||
155 | // Make multiple retry requests to all of pods | ||
156 | // The function fire some useful callbacks | ||
157 | requests.makeSecureRequest(params, (err, res) => { | ||
158 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
159 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
160 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
161 | |||
162 | return callback(false) | ||
163 | } | ||
164 | |||
165 | return callback(true) | ||
166 | }) | ||
167 | } | ||
168 | |||
169 | buildRequestObjects (requests) { | ||
170 | const requestsToMakeGrouped = {} | ||
171 | |||
172 | Object.keys(requests).forEach(toPodId => { | ||
173 | requests[toPodId].forEach(data => { | ||
174 | const request = data.request | ||
175 | const pod = data.pod | ||
176 | const hashKey = toPodId + request.endpoint | ||
177 | |||
178 | if (!requestsToMakeGrouped[hashKey]) { | ||
179 | requestsToMakeGrouped[hashKey] = { | ||
180 | toPod: pod, | ||
181 | endpoint: request.endpoint, | ||
182 | ids: [], // request ids, to delete them from the DB in the future | ||
183 | datas: [] // requests data, | ||
184 | } | ||
185 | } | ||
186 | |||
187 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
188 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
189 | }) | ||
190 | }) | ||
191 | |||
192 | return requestsToMakeGrouped | ||
193 | } | ||
194 | |||
195 | flush (callback) { | ||
196 | db.Request.removeAll(err => { | ||
197 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
198 | |||
199 | return callback(err) | ||
200 | }) | ||
201 | } | ||
202 | } | ||