]>
Commit | Line | Data |
---|---|---|
1 | 'use strict' | |
2 | ||
3 | const async = require('async') | |
4 | const map = require('lodash/map') | |
5 | ||
6 | const constants = require('../initializers/constants') | |
7 | const logger = require('../helpers/logger') | |
8 | const Pods = require('../models/pods') | |
9 | const PoolRequests = require('../models/poolRequests') | |
10 | const requests = require('../helpers/requests') | |
11 | const Videos = require('../models/videos') | |
12 | ||
// Handle returned by setInterval in activate(); null until then.
let timer = null

// Functions exposed by this module.
const poolRequests = {
  activate,
  addRequest,
  deactivate,
  forceSend
}
21 | ||
// Start sending the pooled requests every constants.INTERVAL milliseconds.
//
// Safe to call more than once: an interval started by a previous call is
// cleared first, so at most one interval is ever running and deactivate()
// can always stop it.
function activate () {
  logger.info('Pool requests activated.')

  // Without this, a second activate() would overwrite the handle and leak
  // the first interval.
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makePoolRequests, constants.INTERVAL)
}
26 | ||
// Register a request in the pool, or cancel out a pending one.
//
// The pool entry with the given id is looked up first:
//  - no entry: a new one is created from (id, type, request);
//  - entry with the same type: an error is logged, nothing is changed;
//  - entry with another type: that entry is removed, nothing is created.
// Failures are only logged; nothing is reported back to the caller.
function addRequest (id, type, request) {
  logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })

  PoolRequests.findById(id, function (findErr, existing) {
    if (findErr) {
      logger.error('Cannot find one pool request.', { error: findErr })
      return
    }

    // Nothing pending for this id: store the new request.
    if (!existing) {
      PoolRequests.create(id, type, request, function (createErr) {
        if (createErr) logger.error('Cannot create a pool request.', { error: createErr })
      })
      return
    }

    if (existing.type === type) {
      logger.error('Cannot insert two same requests.')
      return
    }

    // A request of the other type is pending: drop it instead of adding this one.
    PoolRequests.removeRequestById(id, function (removeErr) {
      if (removeErr) logger.error('Cannot remove a pool request.', { error: removeErr })
    })
  })
}
57 | ||
// Stop the periodic sending started by activate().
//
// The stored handle is reset to null afterwards so the module state shows
// that no interval is running. Calling this when nothing is running is
// harmless: clearInterval ignores a null handle.
function deactivate () {
  logger.info('Pool requests deactivated.')
  clearInterval(timer)
  timer = null
}
62 | ||
// Run makePoolRequests right now instead of waiting for the next interval
// tick. The interval itself is left untouched.
function forceSend () {
  logger.info('Force pool requests sending.')

  makePoolRequests()
}
67 | ||
68 | // --------------------------------------------------------------------------- | |
69 | ||
70 | module.exports = poolRequests | |
71 | ||
72 | // --------------------------------------------------------------------------- | |
73 | ||
// Post one batch of pooled requests to every pod returned by Pods.list.
//
// type: 'add' or 'remove'; selects the remotevideos endpoint. Any other
//   value is reported through the callback as an Error.
// requests_to_make: sent as the data of the request.
// callback: optional. Receives the error from Pods.list, the unknown-type
//   error or the error given by requests.makeMultipleRetryRequest; receives
//   null once the pods scores update has been started.
function makePoolRequest (type, requests_to_make, callback) {
  const done = callback || function () {}

  Pods.list(function (listErr, pods) {
    if (listErr) return done(listErr)

    let action
    switch (type) {
      case 'add':
      case 'remove':
        action = type
        break
      default:
        return done(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true,
      sign: true,
      method: 'POST',
      path: '/api/' + constants.API_VERSION + '/remotevideos/' + action,
      data: requests_to_make
    }

    // Ids of the pods, sorted by the outcome of their request.
    const succeededIds = []
    const failedIds = []

    // Called for each pod: a pod is "good" only on a 200 or 204 without error.
    function onPodFinished (err, response, body, url, pod, next) {
      const succeeded = !err && (response.statusCode === 200 || response.statusCode === 204)

      if (succeeded) {
        succeededIds.push(pod._id)
      } else {
        failedIds.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      }

      return next()
    }

    // Final callback: scores are only updated when no error is reported.
    function onAllPodsFinished (err) {
      if (err) return done(err)

      updatePodsScore(succeededIds, failedIds)
      done(null)
    }

    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)
  })
}
120 | ||
// Flush the pool: load every pending pool request, group them by type and
// hand each non-empty group to makePoolRequest.
//
// Entries whose type is neither 'add' nor 'remove' are logged and skipped so
// that a single malformed entry cannot block the sending of the valid ones.
//
// Once makePoolRequest calls back for a group, the ids of that group are
// passed to PoolRequests.removeRequests, whether or not an error was
// reported (the error is only logged).
// NOTE(review): this means a failed batch is not retried - confirm this is
// the intended policy.
function makePoolRequests () {
  logger.info('Making pool requests to friends.')

  PoolRequests.list(function (err, pool_requests) {
    if (err) {
      logger.error('Cannot get the list of pool requests.', { err: err })
      return // Abort
    }

    if (pool_requests.length === 0) return

    const requests_to_make = {
      add: { ids: [], requests: [] },
      remove: { ids: [], requests: [] }
    }

    // Grouping is purely synchronous, a plain loop is enough here.
    for (const pool_request of pool_requests) {
      const type = pool_request.type

      if (type !== 'add' && type !== 'remove') {
        logger.error('Unkown pool request type.', { request_type: type })
        continue // Skip this entry only, keep processing the others
      }

      requests_to_make[type].requests.push(pool_request.request)
      requests_to_make[type].ids.push(pool_request._id)
    }

    sendGroup('add', 'Errors when sent add pool requests.')
    sendGroup('remove', 'Errors when sent remove pool requests.')

    // Send one group if it is not empty, then remove its entries from the pool.
    function sendGroup (type, error_message) {
      const group = requests_to_make[type]
      if (group.requests.length === 0) return

      makePoolRequest(type, group.requests, function (err) {
        if (err) logger.error(error_message, { error: err })

        PoolRequests.removeRequests(group.ids)
      })
    }
  })
}
177 | ||
// Remove the pods returned by Pods.findBadPods together with their remote
// videos.
//
// The videos are removed first (by pod url), then the pods (by id). The pods
// removal is attempted even when the videos removal reported an error.
// Errors are logged; successes log the count read from `result.n`.
function removeBadPods () {
  Pods.findBadPods(function (findErr, badPods) {
    if (findErr) {
      logger.error('Cannot find bad pods.', { error: findErr })
      return
    }

    if (badPods.length === 0) return

    const urls = map(badPods, 'url')
    const ids = map(badPods, '_id')

    Videos.removeAllRemotesOf(urls, onVideosRemoved)

    // Step 1 done: log the outcome, then remove the pods in any case.
    function onVideosRemoved (videosErr, videosResult) {
      if (videosErr) {
        logger.error('Cannot remove videos from a pod that we removing.', { error: videosErr })
      } else {
        logger.info('Removed %d videos.', videosResult.result.n)
      }

      Pods.removeAllByIds(ids, onPodsRemoved)
    }

    // Step 2 done: only the outcome is logged.
    function onPodsRemoved (podsErr, podsResult) {
      if (podsErr) {
        logger.error('Cannot remove bad pods.', { error: podsErr })
      } else {
        logger.info('Removed %d pods.', podsResult.result.n)
      }
    }
  })
}
209 | ||
// Apply the score changes that follow a batch of requests.
//
// good_pods: ids whose score is incremented by constants.PODS_SCORE.BONUS.
// bad_pods: ids whose score is incremented by constants.PODS_SCORE.MALUS.
//
// Both updates are started right away and their failures are only logged,
// with the underlying error attached like the other logs of this file.
// removeBadPods() runs once the bad pods update has called back, even if
// that update reported an error.
function updatePodsScore (good_pods, bad_pods) {
  logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)

  Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot increment scores of bad pods.', { error: err })
    removeBadPods()
  })
}