]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/models/request.js
882f747b76f76eaaa4e984f3f86a87c05f733c26
[github/Chocobozzz/PeerTube.git] / server / models / request.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const waterfall = require('async/waterfall')
6
7 const constants = require('../initializers/constants')
8 const logger = require('../helpers/logger')
9 const requests = require('../helpers/requests')
10
11 let timer = null
12 let lastRequestTimestamp = 0
13
14 // ---------------------------------------------------------------------------
15
16 module.exports = function (sequelize, DataTypes) {
17 const Request = sequelize.define('Request',
18 {
19 request: {
20 type: DataTypes.JSON
21 },
22 endpoint: {
23 // TODO: enum?
24 type: DataTypes.STRING
25 }
26 },
27 {
28 classMethods: {
29 associate,
30
31 activate,
32 countTotalRequests,
33 deactivate,
34 flush,
35 forceSend,
36 remainingMilliSeconds
37 }
38 }
39 )
40
41 return Request
42 }
43
44 // ------------------------------ STATICS ------------------------------
45
// Declare the many-to-many relation between requests and pods, joined
// through the RequestToPod model. Must be called with the Request model
// as `this`.
function associate (models) {
  const podModel = models.Pod

  const relationOptions = {
    foreignKey: {
      name: 'requestId',
      allowNull: false
    },
    through: models.RequestToPod,
    onDelete: 'CASCADE'
  }

  this.belongsToMany(podModel, relationOptions)
}
56
// Start the requests scheduler: every REQUESTS_INTERVAL milliseconds the
// pending requests are sent with the Request model as `this`.
// Calling it while the scheduler is already running restarts the interval
// instead of stacking a second one.
function activate () {
  logger.info('Requests scheduler activated.')

  // Without this, a second activation would overwrite `timer` and leave the
  // previous interval running with no handle left to clear it.
  if (timer !== null) clearInterval(timer)

  lastRequestTimestamp = Date.now()

  const self = this
  timer = setInterval(function () {
    // Remember when this tick happened so remainingMilliSeconds() stays accurate
    lastRequestTimestamp = Date.now()
    makeRequests.call(self)
  }, constants.REQUESTS_INTERVAL)
}
67
// Count the requests, with the Pod model included in the query.
// The result is delivered through the node-style `callback`; the value
// returned by `asCallback` is handed back to the caller as well.
function countTotalRequests (callback) {
  const Pod = this.sequelize.models.Pod
  const counting = this.count({ include: [ Pod ] })

  return counting.asCallback(callback)
}
75
// Stop the requests scheduler and mark it as inactive (timer reset to null,
// which is what remainingMilliSeconds() checks).
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  const runningTimer = timer
  timer = null
  clearInterval(runningTimer)
}
81
// Delete every stored request. Fire-and-forget: a failure is only logged.
function flush () {
  const onFlushed = function (err) {
    if (!err) return

    logger.error('Cannot flush the requests.', { error: err })
  }

  removeAll.call(this, onFlushed)
}
87
// Send the pending requests right now instead of waiting for the next
// scheduler tick. The scheduler timer itself is not touched.
function forceSend () {
  const Request = this

  logger.info('Force requests scheduler sending.')
  makeRequests.call(Request)
}
92
// Milliseconds left before the next scheduled sending,
// or -1 when the scheduler is not active.
function remainingMilliSeconds () {
  const schedulerIsActive = timer !== null
  if (!schedulerIsActive) return -1

  const elapsedSinceLastTick = Date.now() - lastRequestTimestamp
  return constants.REQUESTS_INTERVAL - elapsedSinceLastTick
}
98
99 // ---------------------------------------------------------------------------
100
// Send one signed POST to a pod on the remote endpoint `requestEndpoint`,
// carrying `requestsToMake` as data.
// `callback` is optional and is NOT node-style: it receives a single boolean,
// true when the pod answered 200/201/204, false on error or any other status.
function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
  const done = callback || function () {}

  const remotePath = '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint
  const params = {
    toPod: toPod,
    sign: true, // Prove our identity
    method: 'POST',
    path: remotePath,
    data: requestsToMake // Requests we need to make
  }

  requests.makeSecureRequest(params, function onResponse (err, res) {
    const acceptedStatusCodes = [ 200, 201, 204 ]

    if (!err && acceptedStatusCodes.indexOf(res.statusCode) !== -1) {
      return done(true)
    }

    const failure = err || new Error('Status code not 20x : ' + res.statusCode)
    logger.error('Error sending secure request to %s pod.', toPod.host, { error: failure })

    return done(false)
  })
}
131
// Make all the requests of the scheduler.
// Must be called with the Request model as `this`.
//
// Steps:
//  1. load a limited, randomly positioned batch of pending requests
//  2. group them by (destination pod, endpoint) so each pod gets one call per endpoint
//  3. send each group (REQUESTS_IN_PARALLEL at a time); on success the
//     request/pod links are removed, on failure the pod is recorded as bad
//  4. update the pods scores and delete requests that no longer target any pod
function makeRequests () {
  const self = this
  const RequestToPod = this.sequelize.models.RequestToPod

  // We limit the size of the requests (REQUESTS_LIMIT)
  // We don't want to stuck with the same failing requests so we get a random list
  listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (requests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // We want to group requests by destinations pod and endpoint
    const requestsToMakeGrouped = {}

    requests.forEach(function (request) {
      request.Pods.forEach(function (toPod) {
        // The separator keeps the pod id and the endpoint from running into
        // each other (e.g. pod 1 + "1x" versus pod 11 + "x")
        const hashKey = toPod.id + ':' + request.endpoint
        if (!requestsToMakeGrouped[hashKey]) {
          requestsToMakeGrouped[hashKey] = {
            toPodId: toPod.id,
            endpoint: request.endpoint,
            ids: [], // request ids, to delete them from the DB in the future
            datas: [] // requests data
          }
        }

        requestsToMakeGrouped[hashKey].ids.push(request.id)
        requestsToMakeGrouped[hashKey].datas.push(request.request)
      })
    })

    const goodPods = []
    const badPods = []

    eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
      const requestToMake = requestsToMakeGrouped[hashKey]

      // FIXME: SQL request inside a loop :/
      self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
        if (err) {
          // Deliberately not forwarded: one failing lookup must not stop the other groups
          logger.error('Error finding pod by id.', { err: err })
          return callbackEach()
        }

        // Maybe the pod is not our friend anymore so simply remove it
        if (!toPod) {
          const requestIdsToDelete = requestToMake.ids

          logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
          // Called as a method of RequestToPod (same as the success path below),
          // so the class method runs against its own model and not against Request
          RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPodId)
          return callbackEach()
        }

        makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
          if (success === true) {
            logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids })

            goodPods.push(requestToMake.toPodId)

            // Remove the pod id of these request ids
            RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
          } else {
            badPods.push(requestToMake.toPodId)
            callbackEach()
          }
        })
      })
    }, function () {
      // All the requests were made, we update the pods score
      updatePodsScore.call(self, goodPods, badPods)
      // Flush requests with no pod
      removeWithEmptyTo.call(self, function (err) {
        if (err) logger.error('Error when removing requests with no pods.', { error: err })
      })
    })
  })
}
219
// Destroy the pods returned by Pod.listBadPods (pods whose score dropped to 0
// after too many requests where they were unreachable) and log the outcome.
// Must be called with the Request model as `this`.
function removeBadPods () {
  const Request = this

  // Step 1: fetch the pods to remove
  function findBadPods (next) {
    Request.sequelize.models.Pod.listBadPods(function (err, badPods) {
      if (!err) return next(null, badPods)

      logger.error('Cannot find bad pods.', { error: err })
      return next(err)
    })
  }

  // Step 2: destroy each of them and forward how many were handled
  function removeTheseBadPods (badPods, next) {
    const destroyPod = function (pod, podDone) {
      pod.destroy().asCallback(podDone)
    }

    each(badPods, destroyPod, function (err) {
      return next(err, badPods.length)
    })
  }

  // Final step: report what happened
  function reportOutcome (err, numberOfPodsRemoved) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
      return
    }

    if (numberOfPodsRemoved) {
      logger.info('Removed %d pods.', numberOfPodsRemoved)
      return
    }

    logger.info('No need to remove bad pods.')
  }

  waterfall([ findBadPods, removeTheseBadPods ], reportOutcome)
}
253
// Apply the score bonus to the pods that answered and the malus to the pods
// that failed, then remove the pods that became bad.
// `goodPods` / `badPods` are arrays of pod ids. Must be called with the
// Request model as `this`. Failures are logged, never thrown.
function updatePodsScore (goodPods, badPods) {
  const self = this
  const Pod = this.sequelize.models.Pod

  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  if (goodPods.length !== 0) {
    Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
      // Keep the error object in the log, like the other error logs of this file
      if (err) logger.error('Cannot increment scores of good pods.', { error: err })
    })
  }

  if (badPods.length !== 0) {
    Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
      if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
      // Run the cleanup even when the update failed: pods that already
      // reached a bad score earlier still have to be removed
      removeBadPods.call(self)
    })
  }
}
273
// Load at most `limit` requests (with their pods), ordered by id, starting at
// a random offset so the scheduler does not get stuck on the same failing
// requests. Must be called with the Request model as `this`.
// `callback(err, requests)` is node-style; it receives [] when the table is empty.
function listWithLimitAndRandom (limit, callback) {
  const self = this

  self.count().asCallback(function (err, count) {
    if (err) return callback(err)

    // Optimization...
    if (count === 0) return callback(null, [])

    let start = Math.floor(Math.random() * count) - limit
    if (start < 0) start = 0

    const query = {
      order: [
        [ 'id', 'ASC' ]
      ],
      offset: start,
      limit: limit,
      // `self`, not `this`: inside this plain function callback `this` is not
      // guaranteed to be the model
      include: [ self.sequelize.models.Pod ]
    }

    self.findAll(query).asCallback(callback)
  })
}
298
// Delete all requests by calling destroy with the `truncate` option;
// the outcome is reported through the node-style `callback`.
function removeAll (callback) {
  const truncateOptions = { truncate: true }

  this.destroy(truncateOptions).asCallback(callback)
}
303
// Delete the requests whose id is not referenced by any row of
// "RequestToPods", i.e. requests that no longer target a pod.
// `callback` is optional (node-style).
function removeWithEmptyTo (callback) {
  const done = callback || function () {}

  // Raw sub-select listing the request ids that still have a destination pod
  const linkedRequestIds = this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')

  const query = {
    where: {
      id: { $notIn: [ linkedRequestIds ] }
    }
  }

  this.destroy(query).asCallback(done)
}