]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - lib/poolRequests.js
Move the creation of requests in lib instead of model for poolrequests
[github/Chocobozzz/PeerTube.git] / lib / poolRequests.js
1 ;(function () {
2 'use strict'
3
4 var async = require('async')
5 var pluck = require('lodash-node/compat/collection/pluck')
6
7 var constants = require('../initializers/constants')
8 var logger = require('../helpers/logger')
9 var Pods = require('../models/pods')
10 var PoolRequests = require('../models/poolRequests')
11 var requests = require('../helpers/requests')
12 var Videos = require('../models/videos')
13
// Handle of the interval registered by activate(); stays null until then
var timer = null

// Public interface of the module (this object is what module.exports receives)
var poolRequests = {
  activate: activate,
  addRequest: addRequest,
  deactivate: deactivate,
  forceSend: forceSend
}
22
// Start the periodic sending of the pooled requests: makePoolRequests is
// scheduled with setInterval using constants.INTERVAL as delay.
// Calling it while an interval is already registered replaces that interval
// instead of stacking a second one whose handle would be lost.
function activate () {
  logger.info('Pool requests activated.')

  // Only one handle is kept in `timer`, so release the previous interval first
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makePoolRequests, constants.INTERVAL)
}
27
// Queue a request so that it is sent with a later batch.
//  - id: identifier used to look up and to store the pool request
//  - type: kind of request (makePoolRequests only sends 'add' and 'remove')
//  - request: payload stored with the pool request
// When a pool request already exists for this id:
//  - same type: the new one is refused and an error is logged
//  - other type: the stored one is removed and nothing is inserted
// Nothing is returned; failures are only logged.
function addRequest (id, type, request) {
  logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })

  PoolRequests.findById(id, function (err, entity) {
    if (err) {
      logger.error('Cannot find one pool request.', { error: err })
      return // Abort
    }

    if (!entity) {
      PoolRequests.create(id, type, request, function (err) {
        // Only report real failures: the callback is expected to run on
        // success too (Node style callback), with a falsy err
        if (err) logger.error('Cannot create a pool request.', { error: err })
      })
      return
    }

    if (entity.type === type) {
      logger.error('Cannot insert two same requests.')
      return // Abort
    }

    // Remove the request of the other type
    PoolRequests.removeById(id, function (err) {
      if (err) logger.error('Cannot remove a pool request.', { error: err })
    })
  })
}
58
// Stop the periodic sending by clearing the interval whose handle is in `timer`
function deactivate () {
  logger.info('Pool requests deactivated.')

  clearInterval(timer)
}
63
// Run makePoolRequests right away instead of waiting for the next interval tick
function forceSend () {
  logger.info('Force pool requests sending.')

  makePoolRequests()
}
68
// ---------------------------------------------------------------------------

// Only the poolRequests object is exported; the functions declared after this
// point are not part of it
module.exports = poolRequests

// ---------------------------------------------------------------------------
74
// Send one batch of requests of a single type to the pods returned by
// Pods.list, then hand the ids of the pods that answered correctly / badly to
// updatePodsScore.
//  - type: 'add' or 'remove', selects the remote videos endpoint
//  - requests_to_make: sent as the `data` of the request
//  - callback: optional; called with an error, or with null once every pod
//    has been handled
// A pod is counted as bad when the request failed, when no response object
// came back, or when the status code is neither 200 nor 204.
function makePoolRequest (type, requests_to_make, callback) {
  if (!callback) callback = function () {}

  Pods.list(function (err, pods) {
    if (err) return callback(err)

    var params = {
      encrypt: true,
      sign: true,
      method: 'POST',
      path: null,
      data: requests_to_make
    }

    if (type === 'add') {
      params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
    } else if (type === 'remove') {
      params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
    } else {
      return callback(new Error('Unkown pool request type.'))
    }

    var bad_pods = []
    var good_pods = []

    requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)

    // Called once per pod: sort the pod into the good or the bad list
    function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
      // A missing response is a failure too; reading its statusCode would
      // throw inside this callback
      var succeeded = !err && !!response &&
        (response.statusCode === 200 || response.statusCode === 204)

      if (succeeded) {
        good_pods.push(pod._id)
      } else {
        bad_pods.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      }

      return callback_each_pod_finished()
    }

    // Called once after all the pods: scores are only updated without error
    function callbackAllPodsFinished (err) {
      if (err) return callback(err)

      updatePodsScore(good_pods, bad_pods)
      callback(null)
    }
  })
}
121
// Read every stored pool request, group them by type and send one batch per
// type through makePoolRequest ('add' first, then 'remove').
// The ids of a batch are passed to PoolRequests.removeRequests once
// makePoolRequest has called back, whether it reported an error or not.
// A pool request with an unknown type is logged and skipped; it does not
// prevent the other requests from being sent.
function makePoolRequests () {
  logger.info('Making pool requests to friends.')

  PoolRequests.list(function (err, pool_requests) {
    if (err) {
      logger.error('Cannot get the list of pool requests.', { err: err })
      return // Abort
    }

    if (pool_requests.length === 0) return

    var requests_to_make = {
      add: {
        ids: [],
        requests: []
      },
      remove: {
        ids: [],
        requests: []
      }
    }

    // Send the batch of the given type (if any) then drop its pool requests
    function sendBatch (type) {
      var batch = requests_to_make[type]
      if (batch.requests.length === 0) return

      makePoolRequest(type, batch.requests, function (err) {
        if (err) logger.error('Errors when sent ' + type + ' pool requests.', { error: err })

        PoolRequests.removeRequests(batch.ids)
      })
    }

    async.each(pool_requests, function (pool_request, callback_each) {
      if (pool_request.type === 'add' || pool_request.type === 'remove') {
        var batch = requests_to_make[pool_request.type]
        batch.requests.push(pool_request.request)
        batch.ids.push(pool_request._id)
      } else {
        // Skip this entry but keep iterating: without the callback_each call
        // below the final callback would never run and nothing would be sent
        logger.error('Unkown pool request type.', { request_type: pool_request.type })
      }

      callback_each()
    }, function () {
      sendBatch('add')
      sendBatch('remove')
    })
  })
}
178
// Remove the pods returned by Pods.findBadPods, after asking Videos to remove
// all the remote videos of their urls. The pods removal is attempted even when
// the videos removal reported an error. Failures are logged; nothing is
// returned.
function removeBadPods () {
  Pods.findBadPods(onBadPodsFound)

  function onBadPodsFound (err, bad_pods) {
    if (err) {
      logger.error('Cannot find bad pods.', { error: err })
      return // abort
    }

    if (bad_pods.length === 0) return

    var pod_urls = pluck(bad_pods, 'url')
    var pod_ids = pluck(bad_pods, '_id')

    Videos.removeAllRemotesOf(pod_urls, function (err, videos_result) {
      if (!err) {
        logger.info('Removed %d videos.', videos_result.result.n)
      } else {
        logger.error('Cannot remove videos from a pod that we removing.', { error: err })
      }

      removePods(pod_ids)
    })
  }

  function removePods (pod_ids) {
    Pods.removeAllByIds(pod_ids, function (err, pods_result) {
      if (!err) {
        logger.info('Removed %d pods.', pods_result.result.n)
      } else {
        logger.error('Cannot remove bad pods.', { error: err })
      }
    })
  }
}
210
// Apply constants.PODS_SCORE.BONUS to the good pods and
// constants.PODS_SCORE.MALUS to the bad ones through Pods.incrementScores.
//  - good_pods: ids of the pods that answered correctly
//  - bad_pods: ids of the pods that did not
// removeBadPods is run once the bad pods update has called back, even when
// that update reported an error. Failures are logged with the error attached.
function updatePodsScore (good_pods, bad_pods) {
  logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)

  Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot increment scores of bad pods.', { error: err })
    removeBadPods()
  })
}
223 })()