]>
Commit | Line | Data |
---|---|---|
1 | 'use strict' | |
2 | ||
3 | const async = require('async') | |
4 | const map = require('lodash/map') | |
5 | ||
6 | const constants = require('../initializers/constants') | |
7 | const logger = require('../helpers/logger') | |
8 | const Pods = require('../models/pods') | |
9 | const Requests = require('../models/requests') | |
10 | const requests = require('../helpers/requests') | |
11 | const videos = require('../lib/videos') | |
12 | const Videos = require('../models/videos') | |
13 | ||
// Interval handle written by activate() and read by deactivate()
let timer = null

// Functions exposed by this module
const requestsScheduler = {
  activate,
  addRequest,
  deactivate,
  forceSend
}
22 | ||
/**
 * Start the scheduler: makeRequests is invoked repeatedly, with
 * constants.INTERVAL as the setInterval delay.
 *
 * Calling it again while an interval is already registered replaces that
 * interval instead of stacking a second one.
 */
function activate () {
  logger.info('Requests scheduler activated.')

  // Without this, a second activate() would overwrite the handle and leave
  // the previous interval running with no way for deactivate() to stop it
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makeRequests, constants.INTERVAL)
}
27 | ||
/**
 * Register a request in the Requests model.
 *
 * The model is first queried by id:
 *  - nothing stored under this id: the request is created;
 *  - a request of the same type is stored: an error is logged, nothing changes;
 *  - a request of another type is stored: that stored request is removed and
 *    the new one is not created.
 *
 * Failures are only logged; nothing is reported back to the caller.
 *
 * @param {*} id - identifier used to look up, create or remove the request
 * @param {string} type - request type, compared with the stored entity's type
 * @param {*} request - payload handed to Requests.create
 */
function addRequest (id, type, request) {
  logger.debug('Add request to the requests scheduler.', { id, type, request })

  Requests.findById(id, onRequestLookedUp)

  function onRequestLookedUp (err, entity) {
    if (err) {
      logger.error('Cannot find one request.', { error: err })
      return
    }

    // Nothing pending for this id: store the new request
    if (!entity) {
      Requests.create(id, type, request, onRequestCreated)
      return
    }

    if (entity.type === type) {
      logger.error('Cannot insert two same requests.')
      return
    }

    // A request of the other type is pending: remove it
    Requests.removeRequestById(id, onRequestRemoved)
  }

  function onRequestCreated (err) {
    if (err) logger.error('Cannot create a request.', { error: err })
  }

  function onRequestRemoved (err) {
    if (err) logger.error('Cannot remove a request.', { error: err })
  }
}
58 | ||
/**
 * Stop the scheduler by clearing the interval stored in `timer`.
 * Safe to call when no interval is registered.
 */
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  // Forget the cleared handle so the module state shows nothing is scheduled
  timer = null
}
63 | ||
/**
 * Run makeRequests right away, independently of the interval set up by
 * activate().
 */
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
68 | ||
69 | // --------------------------------------------------------------------------- | |
70 | ||
71 | module.exports = requestsScheduler | |
72 | ||
73 | // --------------------------------------------------------------------------- | |
74 | ||
/**
 * Send one batch of requests of a given type to the pods returned by
 * Pods.list, then update the pods scores according to each pod's answer.
 *
 * A pod is counted as bad when the request errored or when the response
 * status code is not 200, 201 or 204; otherwise it is counted as good.
 *
 * @param {string} type - 'add' or 'remove'; any other value produces an error
 * @param {*} requestsToMake - value sent as the `data` of the request
 * @param {Function} [callback] - called with an error, or with null once all
 *   pods have been processed; defaults to a no-op
 */
function makeRequest (type, requestsToMake, callback) {
  const done = callback || function () {}

  Pods.list(function (err, pods) {
    if (err) return done(err)

    let path = null
    switch (type) {
      case 'add':
        path = '/api/' + constants.API_VERSION + '/remotevideos/add'
        break
      case 'remove':
        path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
        break
      default:
        return done(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true,
      sign: true,
      method: 'POST',
      path: path,
      data: requestsToMake
    }

    // Status codes for which a pod is considered to have accepted the request
    const acceptedStatusCodes = [ 200, 201, 204 ]
    const badPods = []
    const goodPods = []

    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)

    // Sort the pod into the good or bad list, then hand control back
    function onPodFinished (err, response, body, url, pod, next) {
      const failed = err || acceptedStatusCodes.indexOf(response.statusCode) === -1

      if (failed) {
        badPods.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      } else {
        goodPods.push(pod._id)
      }

      return next()
    }

    function onAllPodsFinished (err) {
      if (err) return done(err)

      updatePodsScore(goodPods, badPods)
      done(null)
    }
  })
}
121 | ||
/**
 * Fetch the stored requests, group them by type ('add' / 'remove') and send
 * each non-empty group with makeRequest.
 *
 * Once makeRequest has called back for a group, the requests of that group
 * are removed from the Requests model, whether or not an error was reported
 * (the error is only logged).
 *
 * A request with an unknown type is logged and skipped; it does not prevent
 * the other requests from being sent.
 */
function makeRequests () {
  logger.info('Making requests to friends.')

  // Named pendingRequests so the module-level `requests` helper is not shadowed
  Requests.list(function (err, pendingRequests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    if (pendingRequests.length === 0) return

    const requestsToMake = {
      add: {
        ids: [],
        requests: []
      },
      remove: {
        ids: [],
        requests: []
      }
    }

    async.each(pendingRequests, function (poolRequest, callbackEach) {
      if (poolRequest.type === 'add') {
        requestsToMake.add.requests.push(poolRequest.request)
        requestsToMake.add.ids.push(poolRequest._id)
      } else if (poolRequest.type === 'remove') {
        requestsToMake.remove.requests.push(poolRequest.request)
        requestsToMake.remove.ids.push(poolRequest._id)
      } else {
        // Skip this entry but keep iterating: returning without calling
        // callbackEach would prevent the completion callback from ever
        // running, so nothing at all would be sent
        logger.error('Unkown request type.', { request_type: poolRequest.type })
      }

      callbackEach()
    }, function () {
      sendBatch('add', requestsToMake.add, 'Errors when sent add requests.')
      sendBatch('remove', requestsToMake.remove, 'Errors when sent remove pool requests.')
    })
  })

  // Send one group of requests (if any), then remove them from the model
  function sendBatch (type, batch, errorMessage) {
    if (batch.requests.length === 0) return

    makeRequest(type, batch.requests, function (err) {
      if (err) logger.error(errorMessage, { error: err })

      Requests.removeRequests(batch.ids)
    })
  }
}
178 | ||
/**
 * Remove the pods returned by Pods.findBadPods, together with the videos
 * listed for their urls.
 *
 * Every failure is only logged. The pods removal is started even when the
 * videos could not be listed, and without waiting for the videos removal
 * to finish.
 */
function removeBadPods () {
  Pods.findBadPods(function (err, badPods) {
    if (err) {
      logger.error('Cannot find bad pods.', { error: err })
      return
    }

    if (badPods.length === 0) return

    const podUrls = map(badPods, 'url')
    const podIds = map(badPods, '_id')

    Videos.listFromUrls(podUrls, function (err, videosList) {
      if (!err) {
        videos.removeRemoteVideos(videosList, onRemoteVideosRemoved)
      } else {
        logger.error('Cannot list videos urls.', { error: err, urls: podUrls })
      }

      Pods.removeAllByIds(podIds, onPodsRemoved)
    })
  })

  function onRemoteVideosRemoved (err) {
    if (err) logger.error('Cannot remove remote videos.', { error: err })
  }

  function onPodsRemoved (err, r) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
      return
    }

    logger.info('Removed %d pods.', r.result.n)
  }
}
211 | ||
/**
 * Apply constants.PODS_SCORE.BONUS to the good pods and
 * constants.PODS_SCORE.MALUS to the bad pods, then trigger removeBadPods
 * once the bad pods update has called back (even if it reported an error).
 *
 * @param {Array} goodPods - ids of the pods that answered correctly
 * @param {Array} badPods - ids of the pods that failed
 */
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  const scores = constants.PODS_SCORE

  Pods.incrementScores(goodPods, scores.BONUS, onGoodPodsUpdated)
  Pods.incrementScores(badPods, scores.MALUS, onBadPodsUpdated)

  function onGoodPodsUpdated (err) {
    if (err) logger.error('Cannot increment scores of good pods.')
  }

  function onBadPodsUpdated (err) {
    if (err) logger.error('Cannot increment scores of bad pods.')

    removeBadPods()
  }
}