aboutsummaryrefslogtreecommitdiffhomepage
path: root/lib/poolRequests.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/poolRequests.js')
-rw-r--r--lib/poolRequests.js358
1 files changed, 178 insertions, 180 deletions
diff --git a/lib/poolRequests.js b/lib/poolRequests.js
index 9ea41f383..f786c3c7a 100644
--- a/lib/poolRequests.js
+++ b/lib/poolRequests.js
@@ -1,223 +1,221 @@
1;(function () { 1'use strict'
2 'use strict' 2
3 3var async = require('async')
4 var async = require('async') 4var pluck = require('lodash-node/compat/collection/pluck')
5 var pluck = require('lodash-node/compat/collection/pluck') 5
6 6var constants = require('../initializers/constants')
7 var constants = require('../initializers/constants') 7var logger = require('../helpers/logger')
8 var logger = require('../helpers/logger') 8var Pods = require('../models/pods')
9 var Pods = require('../models/pods') 9var PoolRequests = require('../models/poolRequests')
10 var PoolRequests = require('../models/poolRequests') 10var requests = require('../helpers/requests')
11 var requests = require('../helpers/requests') 11var Videos = require('../models/videos')
12 var Videos = require('../models/videos') 12
13 13var timer = null
14 var timer = null 14
15 15var poolRequests = {
16 var poolRequests = { 16 activate: activate,
17 activate: activate, 17 addRequest: addRequest,
18 addRequest: addRequest, 18 deactivate: deactivate,
19 deactivate: deactivate, 19 forceSend: forceSend
20 forceSend: forceSend 20}
21 } 21
22 22function activate () {
23 function activate () { 23 logger.info('Pool requests activated.')
24 logger.info('Pool requests activated.') 24 timer = setInterval(makePoolRequests, constants.INTERVAL)
25 timer = setInterval(makePoolRequests, constants.INTERVAL) 25}
26 } 26
27 27function addRequest (id, type, request) {
28 function addRequest (id, type, request) { 28 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
29 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) 29
30 30 PoolRequests.findById(id, function (err, entity) {
31 PoolRequests.findById(id, function (err, entity) { 31 if (err) {
32 if (err) { 32 logger.error('Cannot find one pool request.', { error: err })
33 logger.error('Cannot find one pool request.', { error: err }) 33 return // Abort
34 }
35
36 if (entity) {
37 if (entity.type === type) {
38 logger.error('Cannot insert two same requests.')
34 return // Abort 39 return // Abort
35 } 40 }
36 41
37 if (entity) { 42 // Remove the request of the other type
38 if (entity.type === type) { 43 PoolRequests.removeRequestById(id, function (err) {
39 logger.error('Cannot insert two same requests.') 44 if (err) {
45 logger.error('Cannot remove a pool request.', { error: err })
40 return // Abort 46 return // Abort
41 } 47 }
48 })
49 } else {
50 PoolRequests.create(id, type, request, function (err) {
51 if (err) logger.error('Cannot create a pool request.', { error: err })
52 return // Abort
53 })
54 }
55 })
56}
42 57
43 // Remove the request of the other type 58function deactivate () {
44 PoolRequests.removeRequestById(id, function (err) { 59 logger.info('Pool requests deactivated.')
45 if (err) { 60 clearInterval(timer)
46 logger.error('Cannot remove a pool request.', { error: err }) 61}
47 return // Abort
48 }
49 })
50 } else {
51 PoolRequests.create(id, type, request, function (err) {
52 if (err) logger.error('Cannot create a pool request.', { error: err })
53 return // Abort
54 })
55 }
56 })
57 }
58 62
59 function deactivate () { 63function forceSend () {
60 logger.info('Pool requests deactivated.') 64 logger.info('Force pool requests sending.')
61 clearInterval(timer) 65 makePoolRequests()
62 } 66}
63 67
64 function forceSend () { 68// ---------------------------------------------------------------------------
65 logger.info('Force pool requests sending.')
66 makePoolRequests()
67 }
68 69
69 // --------------------------------------------------------------------------- 70module.exports = poolRequests
70 71
71 module.exports = poolRequests 72// ---------------------------------------------------------------------------
72 73
73 // --------------------------------------------------------------------------- 74function makePoolRequest (type, requests_to_make, callback) {
75 if (!callback) callback = function () {}
74 76
75 function makePoolRequest (type, requests_to_make, callback) { 77 Pods.list(function (err, pods) {
76 if (!callback) callback = function () {} 78 if (err) return callback(err)
77 79
78 Pods.list(function (err, pods) { 80 var params = {
79 if (err) return callback(err) 81 encrypt: true,
82 sign: true,
83 method: 'POST',
84 path: null,
85 data: requests_to_make
86 }
80 87
81 var params = { 88 if (type === 'add') {
82 encrypt: true, 89 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
83 sign: true, 90 } else if (type === 'remove') {
84 method: 'POST', 91 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
85 path: null, 92 } else {
86 data: requests_to_make 93 return callback(new Error('Unkown pool request type.'))
87 } 94 }
95
96 var bad_pods = []
97 var good_pods = []
88 98
89 if (type === 'add') { 99 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
90 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' 100
91 } else if (type === 'remove') { 101 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
92 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' 102 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
103 bad_pods.push(pod._id)
104 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
93 } else { 105 } else {
94 return callback(new Error('Unkown pool request type.')) 106 good_pods.push(pod._id)
95 } 107 }
96 108
97 var bad_pods = [] 109 return callback_each_pod_finished()
98 var good_pods = [] 110 }
99
100 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
101 111
102 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { 112 function callbackAllPodsFinished (err) {
103 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { 113 if (err) return callback(err)
104 bad_pods.push(pod._id)
105 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
106 } else {
107 good_pods.push(pod._id)
108 }
109 114
110 return callback_each_pod_finished() 115 updatePodsScore(good_pods, bad_pods)
116 callback(null)
117 }
118 })
119}
120
121function makePoolRequests () {
122 logger.info('Making pool requests to friends.')
123
124 PoolRequests.list(function (err, pool_requests) {
125 if (err) {
126 logger.error('Cannot get the list of pool requests.', { err: err })
127 return // Abort
128 }
129
130 if (pool_requests.length === 0) return
131
132 var requests_to_make = {
133 add: {
134 ids: [],
135 requests: []
136 },
137 remove: {
138 ids: [],
139 requests: []
111 } 140 }
112 141 }
113 function callbackAllPodsFinished (err) { 142
114 if (err) return callback(err) 143 async.each(pool_requests, function (pool_request, callback_each) {
115 144 if (pool_request.type === 'add') {
116 updatePodsScore(good_pods, bad_pods) 145 requests_to_make.add.requests.push(pool_request.request)
117 callback(null) 146 requests_to_make.add.ids.push(pool_request._id)
147 } else if (pool_request.type === 'remove') {
148 requests_to_make.remove.requests.push(pool_request.request)
149 requests_to_make.remove.ids.push(pool_request._id)
150 } else {
151 logger.error('Unkown pool request type.', { request_type: pool_request.type })
152 return // abort
118 } 153 }
119 })
120 }
121 154
122 function makePoolRequests () { 155 callback_each()
123 logger.info('Making pool requests to friends.') 156 }, function () {
157 // Send the add requests
158 if (requests_to_make.add.requests.length !== 0) {
159 makePoolRequest('add', requests_to_make.add.requests, function (err) {
160 if (err) logger.error('Errors when sent add pool requests.', { error: err })
124 161
125 PoolRequests.list(function (err, pool_requests) { 162 PoolRequests.removeRequests(requests_to_make.add.ids)
126 if (err) { 163 })
127 logger.error('Cannot get the list of pool requests.', { err: err })
128 return // Abort
129 } 164 }
130 165
131 if (pool_requests.length === 0) return 166 // Send the remove requests
167 if (requests_to_make.remove.requests.length !== 0) {
168 makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
169 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
132 170
133 var requests_to_make = { 171 PoolRequests.removeRequests(requests_to_make.remove.ids)
134 add: { 172 })
135 ids: [],
136 requests: []
137 },
138 remove: {
139 ids: [],
140 requests: []
141 }
142 } 173 }
174 })
175 })
176}
143 177
144 async.each(pool_requests, function (pool_request, callback_each) { 178function removeBadPods () {
145 if (pool_request.type === 'add') { 179 Pods.findBadPods(function (err, pods) {
146 requests_to_make.add.requests.push(pool_request.request) 180 if (err) {
147 requests_to_make.add.ids.push(pool_request._id) 181 logger.error('Cannot find bad pods.', { error: err })
148 } else if (pool_request.type === 'remove') { 182 return // abort
149 requests_to_make.remove.requests.push(pool_request.request) 183 }
150 requests_to_make.remove.ids.push(pool_request._id)
151 } else {
152 logger.error('Unkown pool request type.', { request_type: pool_request.type })
153 return // abort
154 }
155
156 callback_each()
157 }, function () {
158 // Send the add requests
159 if (requests_to_make.add.requests.length !== 0) {
160 makePoolRequest('add', requests_to_make.add.requests, function (err) {
161 if (err) logger.error('Errors when sent add pool requests.', { error: err })
162
163 PoolRequests.removeRequests(requests_to_make.add.ids)
164 })
165 }
166 184
167 // Send the remove requests 185 if (pods.length === 0) return
168 if (requests_to_make.remove.requests.length !== 0) {
169 makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
170 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
171 186
172 PoolRequests.removeRequests(requests_to_make.remove.ids) 187 var urls = pluck(pods, 'url')
173 }) 188 var ids = pluck(pods, '_id')
174 }
175 })
176 })
177 }
178 189
179 function removeBadPods () { 190 Videos.removeAllRemotesOf(urls, function (err, r) {
180 Pods.findBadPods(function (err, pods) {
181 if (err) { 191 if (err) {
182 logger.error('Cannot find bad pods.', { error: err }) 192 logger.error('Cannot remove videos from a pod that we removing.', { error: err })
183 return // abort 193 } else {
194 var videos_removed = r.result.n
195 logger.info('Removed %d videos.', videos_removed)
184 } 196 }
185 197
186 if (pods.length === 0) return 198 Pods.removeAllByIds(ids, function (err, r) {
187
188 var urls = pluck(pods, 'url')
189 var ids = pluck(pods, '_id')
190
191 Videos.removeAllRemotesOf(urls, function (err, r) {
192 if (err) { 199 if (err) {
193 logger.error('Cannot remove videos from a pod that we removing.', { error: err }) 200 logger.error('Cannot remove bad pods.', { error: err })
194 } else { 201 } else {
195 var videos_removed = r.result.n 202 var pods_removed = r.result.n
196 logger.info('Removed %d videos.', videos_removed) 203 logger.info('Removed %d pods.', pods_removed)
197 } 204 }
198
199 Pods.removeAllByIds(ids, function (err, r) {
200 if (err) {
201 logger.error('Cannot remove bad pods.', { error: err })
202 } else {
203 var pods_removed = r.result.n
204 logger.info('Removed %d pods.', pods_removed)
205 }
206 })
207 }) 205 })
208 }) 206 })
209 } 207 })
208}
210 209
211 function updatePodsScore (good_pods, bad_pods) { 210function updatePodsScore (good_pods, bad_pods) {
212 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) 211 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
213 212
214 Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) { 213 Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
215 if (err) logger.error('Cannot increment scores of good pods.') 214 if (err) logger.error('Cannot increment scores of good pods.')
216 }) 215 })
217 216
218 Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { 217 Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
219 if (err) logger.error('Cannot increment scores of bad pods.') 218 if (err) logger.error('Cannot increment scores of bad pods.')
220 removeBadPods() 219 removeBadPods()
221 }) 220 })
222 } 221}
223})()