diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2016-06-28 20:10:32 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2016-06-28 20:10:32 +0200 |
commit | 00057e85a703713a8f0d96e01c49978be0987eb2 (patch) | |
tree | 54735435062d2ededa72448de9fcaa677cf2c79c /server/models/request.js | |
parent | aaf61f3810e6d57c5130af959bd2860df32775e7 (diff) | |
download | PeerTube-00057e85a703713a8f0d96e01c49978be0987eb2.tar.gz PeerTube-00057e85a703713a8f0d96e01c49978be0987eb2.tar.zst PeerTube-00057e85a703713a8f0d96e01c49978be0987eb2.zip |
Request model refractoring -> use mongoose api
Diffstat (limited to 'server/models/request.js')
-rw-r--r-- | server/models/request.js | 280 |
1 files changed, 280 insertions, 0 deletions
diff --git a/server/models/request.js b/server/models/request.js new file mode 100644 index 000000000..2a407388a --- /dev/null +++ b/server/models/request.js | |||
@@ -0,0 +1,280 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const async = require('async') | ||
4 | const map = require('lodash/map') | ||
5 | const mongoose = require('mongoose') | ||
6 | |||
7 | const constants = require('../initializers/constants') | ||
8 | const logger = require('../helpers/logger') | ||
9 | const Pods = require('../models/pods') | ||
10 | const requests = require('../helpers/requests') | ||
11 | |||
12 | const Video = mongoose.model('Video') | ||
13 | |||
14 | let timer = null | ||
15 | |||
16 | // --------------------------------------------------------------------------- | ||
17 | |||
18 | const RequestSchema = mongoose.Schema({ | ||
19 | request: mongoose.Schema.Types.Mixed, | ||
20 | to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ] | ||
21 | }) | ||
22 | |||
23 | RequestSchema.statics = { | ||
24 | activate, | ||
25 | deactivate, | ||
26 | flush, | ||
27 | forceSend | ||
28 | } | ||
29 | |||
30 | RequestSchema.pre('save', function (next) { | ||
31 | const self = this | ||
32 | |||
33 | if (self.to.length === 0) { | ||
34 | Pods.listAllIds(function (err, podIds) { | ||
35 | if (err) return next(err) | ||
36 | |||
37 | // No friends | ||
38 | if (podIds.length === 0) return | ||
39 | |||
40 | self.to = podIds | ||
41 | return next() | ||
42 | }) | ||
43 | } else { | ||
44 | return next() | ||
45 | } | ||
46 | }) | ||
47 | |||
48 | mongoose.model('Request', RequestSchema) | ||
49 | |||
50 | // ------------------------------ STATICS ------------------------------ | ||
51 | |||
52 | function activate () { | ||
53 | logger.info('Requests scheduler activated.') | ||
54 | timer = setInterval(makeRequests.bind(this), constants.INTERVAL) | ||
55 | } | ||
56 | |||
57 | function deactivate () { | ||
58 | logger.info('Requests scheduler deactivated.') | ||
59 | clearInterval(timer) | ||
60 | } | ||
61 | |||
62 | function flush () { | ||
63 | removeAll.call(this, function (err) { | ||
64 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
65 | }) | ||
66 | } | ||
67 | |||
68 | function forceSend () { | ||
69 | logger.info('Force requests scheduler sending.') | ||
70 | makeRequests.call(this) | ||
71 | } | ||
72 | |||
73 | // --------------------------------------------------------------------------- | ||
74 | |||
75 | // Make a requests to friends of a certain type | ||
76 | function makeRequest (toPod, requestsToMake, callback) { | ||
77 | if (!callback) callback = function () {} | ||
78 | |||
79 | const params = { | ||
80 | toPod: toPod, | ||
81 | encrypt: true, // Security | ||
82 | sign: true, // To prove our identity | ||
83 | method: 'POST', | ||
84 | path: '/api/' + constants.API_VERSION + '/remote/videos', | ||
85 | data: requestsToMake // Requests we need to make | ||
86 | } | ||
87 | |||
88 | // Make multiple retry requests to all of pods | ||
89 | // The function fire some useful callbacks | ||
90 | requests.makeSecureRequest(params, function (err, res) { | ||
91 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
92 | logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) | ||
93 | |||
94 | return callback(false) | ||
95 | } | ||
96 | |||
97 | return callback(true) | ||
98 | }) | ||
99 | } | ||
100 | |||
101 | // Make all the requests of the scheduler | ||
102 | function makeRequests () { | ||
103 | const self = this | ||
104 | |||
105 | list.call(self, function (err, requests) { | ||
106 | if (err) { | ||
107 | logger.error('Cannot get the list of requests.', { err: err }) | ||
108 | return // Abort | ||
109 | } | ||
110 | |||
111 | // If there are no requests, abort | ||
112 | if (requests.length === 0) { | ||
113 | logger.info('No requests to make.') | ||
114 | return | ||
115 | } | ||
116 | |||
117 | logger.info('Making requests to friends.') | ||
118 | |||
119 | // Requests by pods id | ||
120 | const requestsToMake = {} | ||
121 | |||
122 | requests.forEach(function (poolRequest) { | ||
123 | poolRequest.to.forEach(function (toPodId) { | ||
124 | if (!requestsToMake[toPodId]) { | ||
125 | requestsToMake[toPodId] = { | ||
126 | ids: [], | ||
127 | datas: [] | ||
128 | } | ||
129 | } | ||
130 | |||
131 | requestsToMake[toPodId].ids.push(poolRequest._id) | ||
132 | requestsToMake[toPodId].datas.push(poolRequest.request) | ||
133 | }) | ||
134 | }) | ||
135 | |||
136 | const goodPods = [] | ||
137 | const badPods = [] | ||
138 | |||
139 | async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { | ||
140 | const requestToMake = requestsToMake[toPodId] | ||
141 | |||
142 | // FIXME: mongodb request inside a loop :/ | ||
143 | Pods.findById(toPodId, function (err, toPod) { | ||
144 | if (err) { | ||
145 | logger.error('Error finding pod by id.', { err: err }) | ||
146 | return callbackEach() | ||
147 | } | ||
148 | |||
149 | // Maybe the pod is not our friend anymore so simply remove them | ||
150 | if (!toPod) { | ||
151 | removePodOf.call(self, requestToMake.ids, toPodId) | ||
152 | return callbackEach() | ||
153 | } | ||
154 | |||
155 | makeRequest(toPod, requestToMake.datas, function (success) { | ||
156 | if (err) { | ||
157 | logger.error('Errors when sent request to %s.', toPod.url, { error: err }) | ||
158 | // Do not stop the process just for one error | ||
159 | return callbackEach() | ||
160 | } | ||
161 | |||
162 | if (success === true) { | ||
163 | logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) | ||
164 | |||
165 | // Remove the pod id of these request ids | ||
166 | removePodOf.call(self, requestToMake.ids, toPodId) | ||
167 | goodPods.push(toPodId) | ||
168 | } else { | ||
169 | badPods.push(toPodId) | ||
170 | } | ||
171 | |||
172 | callbackEach() | ||
173 | }) | ||
174 | }) | ||
175 | }, function () { | ||
176 | // All the requests were made, we update the pods score | ||
177 | updatePodsScore(goodPods, badPods) | ||
178 | // Flush requests with no pod | ||
179 | removeWithEmptyTo.call(self) | ||
180 | }) | ||
181 | }) | ||
182 | } | ||
183 | |||
184 | // Remove pods with a score of 0 (too many requests where they were unreachable) | ||
185 | function removeBadPods () { | ||
186 | async.waterfall([ | ||
187 | function findBadPods (callback) { | ||
188 | Pods.findBadPods(function (err, pods) { | ||
189 | if (err) { | ||
190 | logger.error('Cannot find bad pods.', { error: err }) | ||
191 | return callback(err) | ||
192 | } | ||
193 | |||
194 | return callback(null, pods) | ||
195 | }) | ||
196 | }, | ||
197 | |||
198 | function listVideosOfTheseBadPods (pods, callback) { | ||
199 | if (pods.length === 0) return callback(null) | ||
200 | |||
201 | const urls = map(pods, 'url') | ||
202 | const ids = map(pods, '_id') | ||
203 | |||
204 | Video.listByUrls(urls, function (err, videosList) { | ||
205 | if (err) { | ||
206 | logger.error('Cannot list videos urls.', { error: err, urls: urls }) | ||
207 | return callback(null, ids, []) | ||
208 | } | ||
209 | |||
210 | return callback(null, ids, videosList) | ||
211 | }) | ||
212 | }, | ||
213 | |||
214 | function removeVideosOfTheseBadPods (podIds, videosList, callback) { | ||
215 | // We don't have to remove pods, skip | ||
216 | if (typeof podIds === 'function') return podIds(null) | ||
217 | |||
218 | async.each(videosList, function (video, callbackEach) { | ||
219 | video.remove(callbackEach) | ||
220 | }, function (err) { | ||
221 | if (err) { | ||
222 | // Don't stop the process | ||
223 | logger.error('Error while removing videos of bad pods.', { error: err }) | ||
224 | return | ||
225 | } | ||
226 | |||
227 | return callback(null, podIds) | ||
228 | }) | ||
229 | }, | ||
230 | |||
231 | function removeBadPodsFromDB (podIds, callback) { | ||
232 | // We don't have to remove pods, skip | ||
233 | if (typeof podIds === 'function') return podIds(null) | ||
234 | |||
235 | Pods.removeAllByIds(podIds, callback) | ||
236 | } | ||
237 | ], function (err, removeResult) { | ||
238 | if (err) { | ||
239 | logger.error('Cannot remove bad pods.', { error: err }) | ||
240 | } else if (removeResult) { | ||
241 | const podsRemoved = removeResult.result.n | ||
242 | logger.info('Removed %d pods.', podsRemoved) | ||
243 | } else { | ||
244 | logger.info('No need to remove bad pods.') | ||
245 | } | ||
246 | }) | ||
247 | } | ||
248 | |||
249 | function updatePodsScore (goodPods, badPods) { | ||
250 | logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length) | ||
251 | |||
252 | Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) { | ||
253 | if (err) logger.error('Cannot increment scores of good pods.') | ||
254 | }) | ||
255 | |||
256 | Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) { | ||
257 | if (err) logger.error('Cannot decrement scores of bad pods.') | ||
258 | removeBadPods() | ||
259 | }) | ||
260 | } | ||
261 | |||
262 | function list (callback) { | ||
263 | this.find({ }, { _id: 1, request: 1, to: 1 }, callback) | ||
264 | } | ||
265 | |||
266 | function removeAll (callback) { | ||
267 | this.remove({ }, callback) | ||
268 | } | ||
269 | |||
270 | function removePodOf (requestsIds, podId, callback) { | ||
271 | if (!callback) callback = function () {} | ||
272 | |||
273 | this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) | ||
274 | } | ||
275 | |||
276 | function removeWithEmptyTo (callback) { | ||
277 | if (!callback) callback = function () {} | ||
278 | |||
279 | this.remove({ to: { $size: 0 } }, callback) | ||
280 | } | ||