]>
git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/requestsScheduler.js
3 const async
= require('async')
4 const map
= require('lodash/map')
6 const constants
= require('../initializers/constants')
7 const logger
= require('../helpers/logger')
8 const Pods
= require('../models/pods')
9 const Requests
= require('../models/requests')
10 const requests
= require('../helpers/requests')
11 const videos
= require('../lib/videos')
12 const Videos
= require('../models/videos')
16 const requestsScheduler
= {
18 addRequest: addRequest
,
19 addRequestTo: addRequestTo
,
20 deactivate: deactivate
,
25 function activate () {
26 logger
.info('Requests scheduler activated.')
27 timer
= setInterval(makeRequests
, constants
.INTERVAL
)
30 // Add request to the scheduler
31 function addRequest (type
, data
) {
32 logger
.debug('Add request of type %s to the requests scheduler.', type
, { data: data
})
39 Pods
.listAllIds(function (err
, podIds
) {
41 logger
.debug('Cannot list pod ids.')
48 Requests
.create(request
, podIds
, function (err
) {
49 if (err
) logger
.error('Cannot create a request.', { error: err
})
54 function addRequestTo (podIds
, type
, data
) {
60 Requests
.create(request
, podIds
, function (err
) {
61 if (err
) logger
.error('Cannot create a request.', { error: err
})
65 function deactivate () {
66 logger
.info('Requests scheduler deactivated.')
71 Requests
.removeAll(function (err
) {
73 logger
.error('Cannot flush the requests.', { error: err
})
78 function forceSend () {
79 logger
.info('Force requests scheduler sending.')
83 // ---------------------------------------------------------------------------
85 module
.exports
= requestsScheduler
87 // ---------------------------------------------------------------------------
89 // Make a requests to friends of a certain type
90 function makeRequest (toPod
, requestsToMake
, callback
) {
91 if (!callback
) callback = function () {}
95 encrypt: true, // Security
96 sign: true, // To prove our identity
98 path: '/api/' + constants
.API_VERSION
+ '/remote/videos',
99 data: requestsToMake
// Requests we need to make
102 // Make multiple retry requests to all of pods
103 // The function fire some useful callbacks
104 requests
.makeSecureRequest(params
, function (err
, res
) {
105 if (err
|| (res
.statusCode
!== 200 && res
.statusCode
!== 201 && res
.statusCode
!== 204)) {
106 logger
.error('Error sending secure request to %s pod.', toPod
.url
, { error: err
|| new Error('Status code not 20x') })
108 return callback(false)
111 return callback(true)
115 // Make all the requests of the scheduler
116 function makeRequests () {
117 Requests
.list(function (err
, requests
) {
119 logger
.error('Cannot get the list of requests.', { err: err
})
123 // If there are no requests, abort
124 if (requests
.length
=== 0) {
125 logger
.info('No requests to make.')
129 logger
.info('Making requests to friends.')
131 // Requests by pods id
132 const requestsToMake
= {}
134 requests
.forEach(function (poolRequest
) {
135 poolRequest
.to
.forEach(function (toPodId
) {
136 if (!requestsToMake
[toPodId
]) {
137 requestsToMake
[toPodId
] = {
143 requestsToMake
[toPodId
].ids
.push(poolRequest
._id
)
144 requestsToMake
[toPodId
].datas
.push(poolRequest
.request
)
151 async
.eachLimit(Object
.keys(requestsToMake
), constants
.REQUESTS_IN_PARALLEL
, function (toPodId
, callbackEach
) {
152 const requestToMake
= requestsToMake
[toPodId
]
154 // FIXME: mongodb request inside a loop :/
155 Pods
.findById(toPodId
, function (err
, toPod
) {
156 if (err
) return logger
.error('Error finding pod by id.', { err: err
})
158 // Maybe the pod is not our friend anymore so simply remove them
160 Requests
.removePodOf(requestToMake
.ids
, toPodId
)
161 return callbackEach()
164 makeRequest(toPod
, requestToMake
.datas
, function (success
) {
166 logger
.error('Errors when sent request to %s.', toPod
.url
, { error: err
})
167 // Do not stop the process just for one error
168 return callbackEach()
171 if (success
=== true) {
172 logger
.debug('Removing requests for %s pod.', toPodId
, { requestsIds: requestToMake
.ids
})
174 // Remove the pod id of these request ids
175 Requests
.removePodOf(requestToMake
.ids
, toPodId
)
176 goodPods
.push(toPodId
)
178 badPods
.push(toPodId
)
185 // All the requests were made, we update the pods score
186 updatePodsScore(goodPods
, badPods
)
187 // Flush requests with no pod
188 Requests
.removeWithEmptyTo()
193 // Remove pods with a score of 0 (too many requests where they were unreachable)
194 function removeBadPods () {
196 function findBadPods (callback
) {
197 Pods
.findBadPods(function (err
, pods
) {
199 logger
.error('Cannot find bad pods.', { error: err
})
203 return callback(null, pods
)
207 function listVideosOfTheseBadPods (pods
, callback
) {
208 if (pods
.length
=== 0) return callback(null)
210 const urls
= map(pods
, 'url')
211 const ids
= map(pods
, '_id')
213 Videos
.listFromUrls(urls
, function (err
, videosList
) {
215 logger
.error('Cannot list videos urls.', { error: err
, urls: urls
})
216 return callback(null, ids
, [])
219 return callback(null, ids
, videosList
)
223 function removeVideosOfTheseBadPods (podIds
, videosList
, callback
) {
224 // We don't have to remove pods, skip
225 if (typeof podIds
=== 'function') return podIds(null)
227 // Remove the remote videos
228 videos
.removeRemoteVideos(videosList
, function (err
) {
229 if (err
) logger
.error('Cannot remove remote videos.', { error: err
})
231 return callback(null, podIds
)
235 function removeBadPodsFromDB (podIds
, callback
) {
236 // We don't have to remove pods, skip
237 if (typeof podIds
=== 'function') return podIds(null)
239 Pods
.removeAllByIds(podIds
, callback
)
241 ], function (err
, removeResult
) {
243 logger
.error('Cannot remove bad pods.', { error: err
})
244 } else if (removeResult
) {
245 const podsRemoved
= removeResult
.result
.n
246 logger
.info('Removed %d pods.', podsRemoved
)
248 logger
.info('No need to remove bad pods.')
253 function updatePodsScore (goodPods
, badPods
) {
254 logger
.info('Updating %d good pods and %d bad pods scores.', goodPods
.length
, badPods
.length
)
256 Pods
.incrementScores(goodPods
, constants
.PODS_SCORE
.BONUS
, function (err
) {
257 if (err
) logger
.error('Cannot increment scores of good pods.')
260 Pods
.incrementScores(badPods
, constants
.PODS_SCORE
.MALUS
, function (err
) {
261 if (err
) logger
.error('Cannot decrement scores of bad pods.')