4 var async
= require('async')
6 var constants
= require('../initializers/constants')
7 var logger
= require('../helpers/logger')
8 var database
= require('../initializers/database')
9 var pluck
= require('lodash-node/compat/collection/pluck')
10 var PoolRequestsDB
= database
.PoolRequestsDB
11 var PodsDB
= database
.PodsDB
12 var utils
= require('../helpers/utils')
13 var VideosDB
= database
.VideosDB
17 // ----------- Private -----------
20 function removePoolRequestsFromDB (ids
) {
21 PoolRequestsDB
.remove({ _id: { $in: ids
} }, function (err
) {
23 logger
.error('Cannot remove requests from the pool requests database.', { error: err
})
27 logger
.info('Pool requests flushed.')
31 function makePoolRequests () {
32 logger
.info('Making pool requests to friends.')
34 PoolRequestsDB
.find({}, { _id: 1, type: 1, request: 1 }, function (err
, pool_requests
) {
37 if (pool_requests
.length
=== 0) return
50 async
.each(pool_requests
, function (pool_request
, callback_each
) {
51 if (pool_request
.type
=== 'add') {
52 requests
.add
.requests
.push(pool_request
.request
)
53 requests
.add
.ids
.push(pool_request
._id
)
54 } else if (pool_request
.type
=== 'remove') {
55 requests
.remove
.requests
.push(pool_request
.request
)
56 requests
.remove
.ids
.push(pool_request
._id
)
58 throw new Error('Unkown pool request type.')
63 // Send the add requests
64 if (requests
.add
.requests
.length
!== 0) {
65 makePoolRequest('add', requests
.add
.requests
, function (err
) {
66 if (err
) logger
.error('Errors when sent add pool requests.', { error: err
})
68 removePoolRequestsFromDB(requests
.add
.ids
)
72 // Send the remove requests
73 if (requests
.remove
.requests
.length
!== 0) {
74 makePoolRequest('remove', requests
.remove
.requests
, function (err
) {
75 if (err
) logger
.error('Errors when sent remove pool requests.', { error: err
})
77 removePoolRequestsFromDB(requests
.remove
.ids
)
84 function updatePodsScore (good_pods
, bad_pods
) {
85 logger
.info('Updating %d good pods and %d bad pods scores.', good_pods
.length
, bad_pods
.length
)
87 PodsDB
.update({ _id: { $in: good_pods
} }, { $inc: { score: constants
.PODS_SCORE
.BONUS
} }, { multi: true }).exec()
88 PodsDB
.update({ _id: { $in: bad_pods
} }, { $inc: { score: constants
.PODS_SCORE
.MALUS
} }, { multi: true }, function (err
) {
94 function removeBadPods () {
95 PodsDB
.find({ score: 0 }, { _id: 1, url: 1 }, function (err
, pods
) {
98 if (pods
.length
=== 0) return
100 var urls
= pluck(pods
, 'url')
101 var ids
= pluck(pods
, '_id')
103 VideosDB
.remove({ podUrl: { $in: urls
} }, function (err
, r
) {
104 if (err
) logger
.error('Cannot remove videos from a pod that we removing.', { error: err
})
105 var videos_removed
= r
.result
.n
106 logger
.info('Removed %d videos.', videos_removed
)
108 PodsDB
.remove({ _id: { $in: ids
} }, function (err
, r
) {
109 if (err
) logger
.error('Cannot remove bad pods.', { error: err
})
111 var pods_removed
= r
.result
.n
112 logger
.info('Removed %d pods.', pods_removed
)
118 function makePoolRequest (type
, requests
, callback
) {
119 if (!callback
) callback = function () {}
121 PodsDB
.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err
, pods
) {
132 if (type
=== 'add') {
133 params
.path
= '/api/' + constants
.API_VERSION
+ '/remotevideos/add'
134 } else if (type
=== 'remove') {
135 params
.path
= '/api/' + constants
.API_VERSION
+ '/remotevideos/remove'
137 throw new Error('Unkown pool request type.')
143 utils
.makeMultipleRetryRequest(params
, pods
, callbackEachPodFinished
, callbackAllPodsFinished
)
145 function callbackEachPodFinished (err
, response
, body
, url
, pod
, callback_each_pod_finished
) {
146 if (err
|| (response
.statusCode
!== 200 && response
.statusCode
!== 204)) {
147 bad_pods
.push(pod
._id
)
148 logger
.error('Error sending secure request to %s pod.', url
, { error: err
|| new Error('Status code not 20x') })
150 good_pods
.push(pod
._id
)
153 return callback_each_pod_finished()
156 function callbackAllPodsFinished (err
) {
157 if (err
) return callback(err
)
159 updatePodsScore(good_pods
, bad_pods
)
165 // ----------- Public -----------
166 poolRequests
.activate = function () {
167 logger
.info('Pool requests activated.')
168 timer
= setInterval(makePoolRequests
, constants
.INTERVAL
)
171 poolRequests
.addToPoolRequests = function (id
, type
, request
) {
172 logger
.debug('Add request to the pool requests.', { id: id
, type: type
, request: request
})
174 PoolRequestsDB
.findOne({ id: id
}, function (err
, entity
) {
175 if (err
) logger
.error(err
)
178 if (entity
.type
=== type
) {
179 logger
.error(new Error('Cannot insert two same requests.'))
183 // Remove the request of the other type
184 PoolRequestsDB
.remove({ id: id
}, function (err
) {
185 if (err
) logger
.error(err
)
188 PoolRequestsDB
.create({ id: id
, type: type
, request: request
}, function (err
) {
189 if (err
) logger
.error(err
)
195 poolRequests
.deactivate = function () {
196 logger
.info('Pool requests deactivated.')
200 poolRequests
.forceSend = function () {
201 logger
.info('Force pool requests sending.')
205 module
.exports
= poolRequests