]>
Commit | Line | Data |
---|---|---|
1 | ;(function () { | |
2 | 'use strict' | |
3 | ||
4 | var async = require('async') | |
5 | var pluck = require('lodash-node/compat/collection/pluck') | |
6 | ||
7 | var constants = require('../initializers/constants') | |
8 | var logger = require('../helpers/logger') | |
9 | var Pods = require('../models/pods') | |
10 | var PoolRequests = require('../models/poolRequests') | |
11 | var requests = require('../helpers/requests') | |
12 | var Videos = require('../models/videos') | |
13 | ||
// Handle returned by setInterval in activate(); null until the first activation.
var timer = null

// Functions exposed by this module (assigned to module.exports further down).
var poolRequests = {
  activate: activate,
  addRequest: addRequest,
  deactivate: deactivate,
  forceSend: forceSend
}
22 | ||
/**
 * Start the periodic sending of pooled requests.
 *
 * Schedules makePoolRequests every constants.INTERVAL and stores the interval
 * handle in the module-level `timer` so that deactivate() can stop it.
 * Calling activate() while a handle is already stored restarts the schedule
 * instead of stacking a second interval on top of the first one.
 */
function activate () {
  logger.info('Pool requests activated.')

  // A repeated call would otherwise overwrite the handle, leaving the previous
  // interval running with no way to clear it.
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makePoolRequests, constants.INTERVAL)
}
27 | ||
/**
 * Queue a request in the pool so it is sent on a later pool run.
 *
 * An existing pool request with the same id is looked up first:
 *  - none found: the new request is created;
 *  - found with the same type: an error is logged and nothing is stored;
 *  - found with another type: the stored request is removed and the new one
 *    is not stored.
 *
 * Failures are only logged; nothing is returned or thrown to the caller.
 *
 * @param {*} id - Identifier handed to PoolRequests for lookup/creation.
 * @param {string} type - Request type (makePoolRequests handles 'add' and 'remove').
 * @param {*} request - Payload stored alongside the pool request.
 */
function addRequest (id, type, request) {
  logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })

  PoolRequests.findById(id, function (err, entity) {
    if (err) {
      logger.error('Cannot find one pool request.', { error: err })
      return // Abort
    }

    if (!entity) {
      PoolRequests.create(id, type, request, function (err) {
        // Report a failure only when there is one: a successful insert must
        // not show up in the error log.
        if (err) logger.error('Cannot create a pool request.', { error: err })
      })
      return
    }

    if (entity.type === type) {
      logger.error('Cannot insert two same requests.')
      return // Abort
    }

    // Remove the request of the other type
    PoolRequests.removeById(id, function (err) {
      if (err) logger.error('Cannot remove a pool request.', { error: err })
    })
  })
}
58 | ||
/**
 * Stop the periodic sending of pooled requests.
 *
 * Clears the interval stored in the module-level `timer` and resets the
 * handle to null so the module state no longer refers to a dead interval.
 * When nothing is scheduled only the log line is emitted.
 */
function deactivate () {
  logger.info('Pool requests deactivated.')

  if (timer === null) return

  clearInterval(timer)
  // Drop the stale handle: it must not be mistaken for a running interval.
  timer = null
}
63 | ||
/**
 * Send the pooled requests immediately by running makePoolRequests once.
 * The interval handled by activate()/deactivate() is not touched here.
 */
function forceSend () {
  logger.info('Force pool requests sending.')
  makePoolRequests()
}
68 | ||
69 | // --------------------------------------------------------------------------- | |
70 | ||
71 | module.exports = poolRequests | |
72 | ||
73 | // --------------------------------------------------------------------------- | |
74 | ||
/**
 * Send one batch of pooled requests of a single type to every listed pod.
 *
 * The pods are fetched with Pods.list, the batch is posted (encrypted and
 * signed) through requests.makeMultipleRetryRequest, and each pod is recorded
 * as good or bad depending on the outcome. Once every pod has answered the
 * two lists are passed to updatePodsScore.
 *
 * @param {string} type - 'add' or 'remove'; any other value yields an error.
 * @param {*} requests_to_make - Payload sent as the `data` of the request.
 * @param {Function} [callback] - Receives an error, or null on completion.
 */
function makePoolRequest (type, requests_to_make, callback) {
  var done = callback || function () {}

  Pods.list(function (err, pods) {
    if (err) return done(err)

    // Resolve the remote endpoint for this batch type.
    var path
    switch (type) {
      case 'add':
        path = '/api/' + constants.API_VERSION + '/remotevideos/add'
        break
      case 'remove':
        path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
        break
      default:
        return done(new Error('Unkown pool request type.'))
    }

    var params = {
      encrypt: true,
      sign: true,
      method: 'POST',
      path: path,
      data: requests_to_make
    }

    var reachable_pods = []
    var failing_pods = []

    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)

    // Classify a single pod by the result of its request.
    function onPodFinished (err, response, body, url, pod, next_pod) {
      var succeeded = !err && (response.statusCode === 200 || response.statusCode === 204)

      if (succeeded) {
        reachable_pods.push(pod._id)
      } else {
        failing_pods.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      }

      return next_pod()
    }

    // Scores are only updated when the whole run finished without error.
    function onAllPodsFinished (err) {
      if (err) return done(err)

      updatePodsScore(reachable_pods, failing_pods)
      done(null)
    }
  })
}
121 | ||
/**
 * Send every request currently stored in the pool to the other pods.
 *
 * The stored requests are split by type into an 'add' batch and a 'remove'
 * batch. Each non-empty batch is sent with makePoolRequest, after which its
 * entries are removed from the pool. The removal happens even when the send
 * reported an error; the error is only logged.
 *
 * Entries whose type is neither 'add' nor 'remove' are logged and skipped;
 * they are neither sent nor removed.
 */
function makePoolRequests () {
  logger.info('Making pool requests to friends.')

  PoolRequests.list(function (err, pool_requests) {
    if (err) {
      logger.error('Cannot get the list of pool requests.', { error: err })
      return // Abort
    }

    if (pool_requests.length === 0) return

    var requests_to_make = {
      add: {
        ids: [],
        requests: []
      },
      remove: {
        ids: [],
        requests: []
      }
    }

    // Sorting the entries is purely synchronous, so a plain loop is enough.
    for (var i = 0; i < pool_requests.length; i++) {
      var pool_request = pool_requests[i]
      var batch = null

      if (pool_request.type === 'add') {
        batch = requests_to_make.add
      } else if (pool_request.type === 'remove') {
        batch = requests_to_make.remove
      }

      if (batch === null) {
        // Skip instead of aborting: one malformed entry must not prevent the
        // valid ones from being sent.
        logger.error('Unkown pool request type.', { request_type: pool_request.type })
        continue
      }

      batch.requests.push(pool_request.request)
      batch.ids.push(pool_request._id)
    }

    sendBatch('add', requests_to_make.add)
    sendBatch('remove', requests_to_make.remove)
  })

  // Send one batch (if it has any request) and then drop it from the pool.
  function sendBatch (type, batch) {
    if (batch.requests.length === 0) return

    makePoolRequest(type, batch.requests, function (err) {
      if (err) logger.error('Errors when sent ' + type + ' pool requests.', { error: err })

      PoolRequests.removeRequests(batch.ids)
    })
  }
}
178 | ||
/**
 * Remove the pods reported by Pods.findBadPods together with their videos.
 *
 * The remote videos belonging to the pods' urls are removed first; the pods
 * themselves are removed afterwards, whether or not the video removal
 * succeeded. Every failure is logged and nothing is reported to the caller.
 */
function removeBadPods () {
  Pods.findBadPods(onBadPodsFound)

  function onBadPodsFound (err, pods) {
    if (err) {
      logger.error('Cannot find bad pods.', { error: err })
      return // abort
    }

    if (pods.length === 0) return

    var pod_urls = pluck(pods, 'url')
    var pod_ids = pluck(pods, '_id')

    Videos.removeAllRemotesOf(pod_urls, function (err, r) {
      if (!err) {
        logger.info('Removed %d videos.', r.result.n)
      } else {
        logger.error('Cannot remove videos from a pod that we removing.', { error: err })
      }

      // The pods are removed regardless of the outcome above.
      Pods.removeAllByIds(pod_ids, onPodsRemoved)
    })
  }

  function onPodsRemoved (err, r) {
    if (!err) {
      logger.info('Removed %d pods.', r.result.n)
    } else {
      logger.error('Cannot remove bad pods.', { error: err })
    }
  }
}
210 | ||
/**
 * Apply the outcome of a pool run to the pods' scores.
 *
 * Pods that answered correctly get constants.PODS_SCORE.BONUS, the others get
 * constants.PODS_SCORE.MALUS. Once the bad pods' update has completed (even
 * with an error) removeBadPods is run.
 *
 * @param {Array} good_pods - Ids of the pods that answered correctly.
 * @param {Array} bad_pods - Ids of the pods that failed.
 */
function updatePodsScore (good_pods, bad_pods) {
  logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)

  Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
    // Attach the error itself, as the other error logs of this module do,
    // so the cause of the failure is not lost.
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot increment scores of bad pods.', { error: err })

    removeBadPods()
  })
}
223 | })() |