]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/models/request.js
First version with PostgreSQL
[github/Chocobozzz/PeerTube.git] / server / models / request.js
CommitLineData
9f10b292
C
1'use strict'
2
1a42c9e2
C
3const each = require('async/each')
4const eachLimit = require('async/eachLimit')
1a42c9e2 5const waterfall = require('async/waterfall')
9f10b292 6
f0f5567b
C
7const constants = require('../initializers/constants')
8const logger = require('../helpers/logger')
f0f5567b 9const requests = require('../helpers/requests')
aaf61f38 10
f0f5567b 11let timer = null
5abeec31 12let lastRequestTimestamp = 0
9f10b292 13
00057e85 14// ---------------------------------------------------------------------------
9f10b292 15
feb4bdfd
C
16module.exports = function (sequelize, DataTypes) {
17 const Request = sequelize.define('Request',
18 {
19 request: {
20 type: DataTypes.JSON
21 },
22 endpoint: {
23 // TODO: enum?
24 type: DataTypes.STRING
25 }
26 },
4b08096b 27 {
feb4bdfd
C
28 classMethods: {
29 associate,
30
31 activate,
32 countTotalRequests,
33 deactivate,
34 flush,
35 forceSend,
36 remainingMilliSeconds
37 }
4b08096b 38 }
feb4bdfd 39 )
528a9efa 40
feb4bdfd
C
41 return Request
42}
00057e85
C
43
44// ------------------------------ STATICS ------------------------------
45
feb4bdfd
C
/**
 * Declare the many-to-many relation between this model and `models.Pod`,
 * going through `models.RequestToPod`.
 *
 * Must be called with the model as `this`.
 *
 * @param {object} models - Registry holding the `Pod` and `RequestToPod` models
 */
function associate (models) {
  const foreignKey = {
    name: 'requestId',
    allowNull: false
  }

  const relation = {
    foreignKey: foreignKey,
    through: models.RequestToPod,
    onDelete: 'CASCADE'
  }

  this.belongsToMany(models.Pod, relation)
}
56
00057e85
C
/**
 * Start the requests scheduler: every `constants.REQUESTS_INTERVAL`
 * milliseconds, record the tick time and send the pending requests.
 *
 * Must be called with the model as `this`.
 */
function activate () {
  logger.info('Requests scheduler activated.')

  // An interval left by a previous activation would otherwise keep running
  // forever: deactivate() only knows about the last timer that was stored.
  if (timer !== null) clearInterval(timer)

  lastRequestTimestamp = Date.now()

  const self = this
  timer = setInterval(function () {
    lastRequestTimestamp = Date.now()
    makeRequests.call(self)
  }, constants.REQUESTS_INTERVAL)
}
1fe5076f 67
feb4bdfd
C
/**
 * Count the requests, with the Pod model included in the query.
 *
 * Must be called with the model as `this`.
 *
 * @param {function} callback - Node-style callback handed to `asCallback`
 * @returns {*} Whatever `asCallback` returns
 */
function countTotalRequests (callback) {
  const Pod = this.sequelize.models.Pod
  const counting = this.count({ include: [ Pod ] })

  return counting.asCallback(callback)
}
75
/**
 * Stop the requests scheduler and forget its timer, so that
 * `remainingMilliSeconds` reports -1 afterwards.
 */
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  const runningTimer = timer
  timer = null
  clearInterval(runningTimer)
}
1fe5076f 81
/**
 * Delete every stored request. A failure is only logged; nothing is
 * reported back to the caller.
 *
 * Must be called with the model as `this`.
 */
function flush () {
  const onRemoved = function (err) {
    if (err) logger.error('Cannot flush the requests.', { error: err })
  }

  removeAll.call(this, onRemoved)
}
87
/**
 * Send the pending requests right now, without waiting for the next
 * scheduler tick.
 *
 * Must be called with the model as `this`.
 */
function forceSend () {
  const model = this

  logger.info('Force requests scheduler sending.')
  makeRequests.call(model)
}
c45f7f84 92
5abeec31
C
/**
 * Time left before the next scheduler tick.
 *
 * @returns {number} -1 when no timer is set, otherwise
 *   `constants.REQUESTS_INTERVAL` minus the time elapsed since the last
 *   recorded tick
 */
function remainingMilliSeconds () {
  if (timer === null) return -1

  const elapsed = Date.now() - lastRequestTimestamp
  return constants.REQUESTS_INTERVAL - elapsed
}
98
9f10b292 99// ---------------------------------------------------------------------------
c45f7f84 100
/**
 * Send a batch of requests to one pod, on one remote endpoint.
 *
 * @param {object} toPod - Destination pod (its `host` is used in the error log)
 * @param {string} requestEndpoint - Appended to `/api/<API_VERSION>/remote/`
 * @param {Array} requestsToMake - Payload sent as the request data
 * @param {function} [callback] - Called with `true` when the pod answered
 *   200, 201 or 204, and with `false` otherwise
 */
function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
  const done = callback || function () {}

  const params = {
    toPod,
    sign: true, // Prove our identity
    method: 'POST',
    path: `/api/${constants.API_VERSION}/remote/${requestEndpoint}`,
    data: requestsToMake // Requests we need to make
  }

  requests.makeSecureRequest(params, function (err, res) {
    const acceptedStatusCodes = [ 200, 201, 204 ]
    const succeeded = !err && acceptedStatusCodes.indexOf(res.statusCode) !== -1

    if (succeeded) return done(true)

    // Prefer the transport error; otherwise report the unexpected status code
    const failure = err || new Error('Status code not 20x : ' + res.statusCode)
    logger.error('Error sending secure request to %s pod.', toPod.host, { error: failure })

    return done(false)
  })
}
131
/**
 * Make all the requests of the scheduler.
 *
 * Loads a limited, randomly positioned list of pending requests, groups them
 * by destination pod and endpoint, sends each group, then updates the pods
 * scores and removes the requests that no longer target any pod.
 *
 * Must be called with the model as `this`. Failures are logged; nothing is
 * returned or reported to the caller.
 */
function makeRequests () {
  const self = this
  const RequestToPod = this.sequelize.models.RequestToPod

  // We limit the size of the requests (REQUESTS_LIMIT)
  // We don't want to get stuck with the same failing requests so we get a random list
  listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (requests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // We want to group requests by destination pod and endpoint
    const requestsToMakeGrouped = {}

    requests.forEach(function (request) {
      request.Pods.forEach(function (toPod) {
        const hashKey = toPod.id + request.endpoint
        if (!requestsToMakeGrouped[hashKey]) {
          requestsToMakeGrouped[hashKey] = {
            toPodId: toPod.id,
            endpoint: request.endpoint,
            ids: [], // request ids, to delete them from the DB in the future
            datas: [] // requests data
          }
        }

        requestsToMakeGrouped[hashKey].ids.push(request.id)
        requestsToMakeGrouped[hashKey].datas.push(request.request)
      })
    })

    const goodPods = []
    const badPods = []

    eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
      const requestToMake = requestsToMakeGrouped[hashKey]

      // FIXME: SQL request inside a loop :/
      self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
        if (err) {
          logger.error('Error finding pod by id.', { err: err })
          return callbackEach()
        }

        // Maybe the pod is not our friend anymore so simply remove it
        if (!toPod) {
          const requestIdsToDelete = requestToMake.ids

          logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
          // Call the class method on RequestToPod itself (as the success path
          // below does) instead of rebinding it to the Request model.
          RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPodId)
          return callbackEach()
        }

        makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
          if (success === true) {
            logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids })

            goodPods.push(requestToMake.toPodId)

            // Remove the pod id of these request ids
            RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
          } else {
            badPods.push(requestToMake.toPodId)
            callbackEach()
          }
        })
      })
    }, function () {
      // All the requests were made, we update the pods score
      updatePodsScore.call(self, goodPods, badPods)
      // Flush requests with no pod
      removeWithEmptyTo.call(self, function (err) {
        if (err) logger.error('Error when removing requests with no pods.', { error: err })
      })
    })
  })
}
0b697522 219
/**
 * Remove the pods returned by `Pod.listBadPods` (per the original comment:
 * pods whose score dropped to 0 after too many unreachable requests).
 *
 * Must be called with the model as `this`. The outcome is only logged.
 */
function removeBadPods () {
  const findBadPods = (next) => {
    this.sequelize.models.Pod.listBadPods((err, badPods) => {
      if (!err) return next(null, badPods)

      logger.error('Cannot find bad pods.', { error: err })
      return next(err)
    })
  }

  const removeTheseBadPods = (badPods, next) => {
    const destroyPod = (badPod, podDone) => {
      badPod.destroy().asCallback(podDone)
    }

    each(badPods, destroyPod, (err) => {
      return next(err, badPods.length)
    })
  }

  waterfall([ findBadPods, removeTheseBadPods ], (err, numberOfPodsRemoved) => {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
      return
    }

    if (numberOfPodsRemoved) {
      logger.info('Removed %d pods.', numberOfPodsRemoved)
      return
    }

    logger.info('No need to remove bad pods.')
  })
}
0b697522 253
/**
 * Apply the score bonus to the pods that answered and the malus to the pods
 * that failed, then remove the pods considered bad.
 *
 * Must be called with the model as `this`. Failures are only logged.
 *
 * @param {Array} goodPods - Ids of the pods whose requests succeeded
 * @param {Array} badPods - Ids of the pods whose requests failed
 */
function updatePodsScore (goodPods, badPods) {
  const self = this
  const Pod = this.sequelize.models.Pod

  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  if (goodPods.length !== 0) {
    Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
      // Keep the underlying error in the log, like the other handlers in this file
      if (err) logger.error('Cannot increment scores of good pods.', { error: err })
    })
  }

  if (badPods.length !== 0) {
    Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
      if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
      // The cleanup still runs when the update failed
      removeBadPods.call(self)
    })
  }
}
00057e85 273
43666d61
C
/**
 * List at most `limit` requests (with their pods), ordered by id, starting
 * at a random offset so the same requests are not always picked first.
 *
 * Must be called with the model as `this`.
 *
 * @param {number} limit - Maximum number of requests to return
 * @param {function} callback - Node-style callback `(err, requests)`
 */
function listWithLimitAndRandom (limit, callback) {
  const self = this

  self.count().asCallback(function (err, count) {
    if (err) return callback(err)

    // Optimization...
    if (count === 0) return callback(null, [])

    let start = Math.floor(Math.random() * count) - limit
    if (start < 0) start = 0

    const query = {
      order: [
        [ 'id', 'ASC' ]
      ],
      offset: start,
      limit: limit,
      // `this` is not the model inside this callback, so go through `self`
      include: [ self.sequelize.models.Pod ]
    }

    self.findAll(query).asCallback(callback)
  })
}
298
/**
 * Delete all the requests by truncating the table.
 *
 * Must be called with the model as `this`.
 *
 * @param {function} callback - Node-style callback handed to `asCallback`
 */
function removeAll (callback) {
  const truncateEverything = { truncate: true }
  const removal = this.destroy(truncateEverything)

  removal.asCallback(callback)
}
303
/**
 * Delete the requests whose id is not referenced by any row of
 * "RequestToPods", i.e. requests that have no destination pod left.
 *
 * Must be called with the model as `this`.
 *
 * @param {function} [callback] - Node-style callback; defaults to a no-op
 */
function removeWithEmptyTo (callback) {
  const done = callback || function () {}

  const linkedRequestIds = this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
  const query = {
    where: {
      id: { $notIn: [ linkedRequestIds ] }
    }
  }

  this.destroy(query).asCallback(done)
}