// Source: git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/requestsScheduler.js
'use strict'

const async = require('async')
const map = require('lodash/map')

const constants = require('../initializers/constants')
const logger = require('../helpers/logger')
const Pods = require('../models/pods')
const Requests = require('../models/requests')
const requests = require('../helpers/requests')
const videos = require('../lib/videos')
const Videos = require('../models/videos')

// Valid request types: iterated to build the per-type batches and searched
// with indexOf to validate a type before sending
const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
// Handle returned by setInterval in activate(); null while no interval is running
let timer = null
// Public API of the module (this object is what module.exports exposes)
// NOTE(review): the activate and forceSend entries were restored because both
// functions are defined in this module and are not referenced anywhere else;
// confirm against the upstream file
const requestsScheduler = {
  activate: activate,
  addRequest: addRequest,
  deactivate: deactivate,
  forceSend: forceSend
}
// Start the scheduler: makeRequests is run every constants.INTERVAL
// Calling it again restarts the interval instead of stacking a second one
function activate () {
  logger.info('Requests scheduler activated.')

  // Drop the interval of a previous activation (no-op when there is none)
  clearInterval(timer)
  timer = setInterval(makeRequests, constants.INTERVAL)
}
// Add request to the scheduler
// - id: identifier the request is looked up and stored under
// - type: request type, compared with the type of an already stored request
// - request: payload stored with the request
// Nothing is returned; every failure is only logged.
// When a request with the same id but another type is already stored, the
// stored one is removed and the new one is not created.
function addRequest (id, type, request) {
  logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })

  Requests.findById(id, function (err, entity) {
    if (err) {
      logger.error('Error when trying to find a request.', { error: err })
      return // Abort
    }

    // No request with this id in the scheduler yet: store the new one
    if (!entity) {
      Requests.create(id, type, request, function (err) {
        if (err) logger.error('Cannot create a request.', { error: err })
      })
      return
    }

    // If there were already a request with this id in the scheduler...
    if (entity.type === type) {
      logger.error('Cannot insert two same requests.')
      return // Abort
    }

    // Remove the request of the other type
    Requests.removeRequestById(id, function (err) {
      if (err) logger.error('Cannot remove a request.', { error: err })
    })
  })
}
// Stop the scheduler
// NOTE(review): clearing the interval started by activate() was restored here
// (the statement is not visible in the extracted listing); confirm upstream
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  timer = null
}
// Run makeRequests right away instead of waiting for the next interval tick
// NOTE(review): the makeRequests() call was restored (the statement is not
// visible in the extracted listing); confirm upstream
function forceSend () {
  logger.info('Force requests scheduler sending.')
  makeRequests()
}
// ---------------------------------------------------------------------------

module.exports = requestsScheduler

// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
// - type: request type, must be listed in REQUEST_SCHEDULER_TYPE
// - requestsToMake: data sent to every pod
// - callback (optional): called with an error when the pods cannot be listed,
//   when the type is unknown or when the batch reports an error; called with
//   null once every pod answered and the scores update was triggered
function makeRequest (type, requestsToMake, callback) {
  if (!callback) callback = function () {}

  Pods.list(function (err, pods) {
    if (err) return callback(err)

    // Only valid types have a remote path
    if (REQUEST_SCHEDULER_TYPE.indexOf(type) === -1) {
      return callback(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true, // Security
      sign: true, // To prove our identity
      method: 'POST', // NOTE(review): restored value, confirm what makeMultipleRetryRequest expects
      path: '/api/' + constants.API_VERSION + '/remotevideos/' + type,
      data: requestsToMake // Requests we need to make
    }

    // Ids of the pods, split by the outcome of their request
    const badPods = []
    const goodPods = []

    // Make multiple retry requests to all of pods
    // The function fire some useful callbacks
    requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)

    // Called once per pod; done() signals that this pod has been handled
    function callbackEachPodFinished (err, response, body, url, pod, done) {
      const succeeded = !err && [ 200, 201, 204 ].indexOf(response.statusCode) !== -1

      if (succeeded) {
        goodPods.push(pod._id)
      } else {
        // We failed the request, add the pod unreachable to the bad pods list
        badPods.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      }

      return done()
    }

    function callbackAllPodsFinished (err) {
      if (err) return callback(err)

      // All the requests were made, we update the pods score
      updatePodsScore(goodPods, badPods)
      return callback(null)
    }
  })
}
// Make all the requests of the scheduler
// Stored requests are grouped by type, each non-empty group is sent with
// makeRequest and is then removed from the scheduler
function makeRequests () {
  Requests.list(function (err, pendingRequests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (pendingRequests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // One bucket per valid type: payloads to send and ids of the stored requests
    const requestsToMake = {}
    for (const type of REQUEST_SCHEDULER_TYPE) {
      requestsToMake[type] = {
        ids: [],
        requests: []
      }
    }

    // For each requests to make, we add it to the correct request type
    async.each(pendingRequests, function (poolRequest, callbackEach) {
      if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) {
        const requestTypeToMake = requestsToMake[poolRequest.type]
        requestTypeToMake.requests.push(poolRequest.request)
        requestTypeToMake.ids.push(poolRequest._id)
      } else {
        logger.error('Unkown request type.', { request_type: poolRequest.type })
      }

      // Always signal completion: a request with an unknown type is skipped
      // but must not prevent the final callback from running
      callbackEach()
    }, function () {
      for (const type of Object.keys(requestsToMake)) {
        const requestTypeToMake = requestsToMake[type]

        // Skip the types without any request
        if (requestTypeToMake.requests.length === 0) continue

        makeRequest(type, requestTypeToMake.requests, function (err) {
          if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })

          // We made the requests, so we can remove them from the scheduler
          // (done whether or not sending reported an error)
          Requests.removeRequests(requestTypeToMake.ids)
        })
      }
    })
  })
}
// Remove pods with a score of 0 (too many requests where they were unreachable)
// Steps run in sequence: find the bad pods, list their videos, remove these
// videos, then remove the pods. Nothing is returned; the outcome is only logged.
function removeBadPods () {
  async.waterfall([
    function findBadPods (callback) {
      Pods.findBadPods(function (err, pods) {
        if (err) {
          logger.error('Cannot find bad pods.', { error: err })
          return callback(err)
        }

        return callback(null, pods)
      })
    },

    function listVideosOfTheseBadPods (pods, callback) {
      // No bad pod: call back without values so that the next steps skip their work
      if (pods.length === 0) return callback(null)

      const urls = map(pods, 'url')
      const ids = map(pods, '_id')

      Videos.listFromUrls(urls, function (err, videosList) {
        if (err) {
          // Not fatal: the pods are still removed, without any video to clean
          logger.error('Cannot list videos urls.', { error: err, urls: urls })
          return callback(null, ids, [])
        }

        return callback(null, ids, videosList)
      })
    },

    function removeVideosOfTheseBadPods (podIds, videosList, callback) {
      // We don't have to remove pods, skip
      // (the previous step passed no value, so the first argument is the callback)
      if (typeof podIds === 'function') return podIds(null)

      // Remove the remote videos
      videos.removeRemoteVideos(videosList, function (err) {
        if (err) logger.error('Cannot remove remote videos.', { error: err })

        return callback(null, podIds)
      })
    },

    function removeBadPodsFromDB (podIds, callback) {
      // We don't have to remove pods, skip
      // (the previous step passed no value, so the first argument is the callback)
      if (typeof podIds === 'function') return podIds(null)

      Pods.removeAllByIds(podIds, callback)
    }
  ], function (err, removeResult) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
    } else if (removeResult) {
      const podsRemoved = removeResult.result.n
      logger.info('Removed %d pods.', podsRemoved)
    } else {
      logger.info('No need to remove bad pods.')
    }
  })
}
// Update the scores of the pods after a sending round
// - goodPods: ids of the pods whose score is changed by constants.PODS_SCORE.BONUS
// - badPods: ids of the pods whose score is changed by constants.PODS_SCORE.MALUS
// Failures are only logged.
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })

    // NOTE(review): this call was restored (the statement is not visible in the
    // extracted listing) and is the only caller of removeBadPods in this module;
    // confirm upstream
    removeBadPods()
  })
}