]>
git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/models/request.js
3 const async
= require('async')
4 const map
= require('lodash/map')
5 const mongoose
= require('mongoose')
7 const constants
= require('../initializers/constants')
8 const logger
= require('../helpers/logger')
9 const requests
= require('../helpers/requests')
11 const Pod
= mongoose
.model('Pod')
12 const Video
= mongoose
.model('Video')
16 // ---------------------------------------------------------------------------
18 const RequestSchema
= mongoose
.Schema({
19 request: mongoose
.Schema
.Types
.Mixed
,
20 to: [ { type: mongoose
.Schema
.Types
.ObjectId
, ref: 'users' } ]
23 RequestSchema
.statics
= {
30 RequestSchema
.pre('save', function (next
) {
33 if (self
.to
.length
=== 0) {
34 Pod
.listAllIds(function (err
, podIds
) {
35 if (err
) return next(err
)
38 if (podIds
.length
=== 0) return
48 mongoose
.model('Request', RequestSchema
)
50 // ------------------------------ STATICS ------------------------------
52 function activate () {
53 logger
.info('Requests scheduler activated.')
54 timer
= setInterval(makeRequests
.bind(this), constants
.INTERVAL
)
57 function deactivate () {
58 logger
.info('Requests scheduler deactivated.')
63 removeAll
.call(this, function (err
) {
64 if (err
) logger
.error('Cannot flush the requests.', { error: err
})
68 function forceSend () {
69 logger
.info('Force requests scheduler sending.')
70 makeRequests
.call(this)
73 // ---------------------------------------------------------------------------
75 // Make a requests to friends of a certain type
76 function makeRequest (toPod
, requestsToMake
, callback
) {
77 if (!callback
) callback = function () {}
81 encrypt: true, // Security
82 sign: true, // To prove our identity
84 path: '/api/' + constants
.API_VERSION
+ '/remote/videos',
85 data: requestsToMake
// Requests we need to make
88 // Make multiple retry requests to all of pods
89 // The function fire some useful callbacks
90 requests
.makeSecureRequest(params
, function (err
, res
) {
91 if (err
|| (res
.statusCode
!== 200 && res
.statusCode
!== 201 && res
.statusCode
!== 204)) {
92 logger
.error('Error sending secure request to %s pod.', toPod
.url
, { error: err
|| new Error('Status code not 20x') })
94 return callback(false)
101 // Make all the requests of the scheduler
102 function makeRequests () {
105 list
.call(self
, function (err
, requests
) {
107 logger
.error('Cannot get the list of requests.', { err: err
})
111 // If there are no requests, abort
112 if (requests
.length
=== 0) {
113 logger
.info('No requests to make.')
117 logger
.info('Making requests to friends.')
119 // Requests by pods id
120 const requestsToMake
= {}
122 requests
.forEach(function (poolRequest
) {
123 poolRequest
.to
.forEach(function (toPodId
) {
124 if (!requestsToMake
[toPodId
]) {
125 requestsToMake
[toPodId
] = {
131 requestsToMake
[toPodId
].ids
.push(poolRequest
._id
)
132 requestsToMake
[toPodId
].datas
.push(poolRequest
.request
)
139 async
.eachLimit(Object
.keys(requestsToMake
), constants
.REQUESTS_IN_PARALLEL
, function (toPodId
, callbackEach
) {
140 const requestToMake
= requestsToMake
[toPodId
]
142 // FIXME: mongodb request inside a loop :/
143 Pod
.load(toPodId
, function (err
, toPod
) {
145 logger
.error('Error finding pod by id.', { err: err
})
146 return callbackEach()
149 // Maybe the pod is not our friend anymore so simply remove them
151 removePodOf
.call(self
, requestToMake
.ids
, toPodId
)
152 return callbackEach()
155 makeRequest(toPod
, requestToMake
.datas
, function (success
) {
157 logger
.error('Errors when sent request to %s.', toPod
.url
, { error: err
})
158 // Do not stop the process just for one error
159 return callbackEach()
162 if (success
=== true) {
163 logger
.debug('Removing requests for %s pod.', toPodId
, { requestsIds: requestToMake
.ids
})
165 // Remove the pod id of these request ids
166 removePodOf
.call(self
, requestToMake
.ids
, toPodId
)
167 goodPods
.push(toPodId
)
169 badPods
.push(toPodId
)
176 // All the requests were made, we update the pods score
177 updatePodsScore(goodPods
, badPods
)
178 // Flush requests with no pod
179 removeWithEmptyTo
.call(self
)
184 // Remove pods with a score of 0 (too many requests where they were unreachable)
185 function removeBadPods () {
187 function findBadPods (callback
) {
188 Pod
.listBadPods(function (err
, pods
) {
190 logger
.error('Cannot find bad pods.', { error: err
})
194 return callback(null, pods
)
198 function listVideosOfTheseBadPods (pods
, callback
) {
199 if (pods
.length
=== 0) return callback(null)
201 const urls
= map(pods
, 'url')
203 Video
.listByUrls(urls
, function (err
, videosList
) {
205 logger
.error('Cannot list videos urls.', { error: err
, urls: urls
})
206 return callback(null, pods
, [])
209 return callback(null, pods
, videosList
)
213 function removeVideosOfTheseBadPods (pods
, videosList
, callback
) {
214 // We don't have to remove pods, skip
215 if (typeof pods
=== 'function') {
217 return callback(null)
220 async
.each(videosList
, function (video
, callbackEach
) {
221 video
.remove(callbackEach
)
224 // Don't stop the process
225 logger
.error('Error while removing videos of bad pods.', { error: err
})
229 return callback(null, pods
)
233 function removeBadPodsFromDB (pods
, callback
) {
234 // We don't have to remove pods, skip
235 if (typeof pods
=== 'function') {
237 return callback(null)
240 async
.each(pods
, function (pod
, callbackEach
) {
241 pod
.remove(callbackEach
)
243 if (err
) return callback(err
)
245 return callback(null, pods
.length
)
248 ], function (err
, numberOfPodsRemoved
) {
250 logger
.error('Cannot remove bad pods.', { error: err
})
251 } else if (numberOfPodsRemoved
) {
252 logger
.info('Removed %d pods.', numberOfPodsRemoved
)
254 logger
.info('No need to remove bad pods.')
259 function updatePodsScore (goodPods
, badPods
) {
260 logger
.info('Updating %d good pods and %d bad pods scores.', goodPods
.length
, badPods
.length
)
262 Pod
.incrementScores(goodPods
, constants
.PODS_SCORE
.BONUS
, function (err
) {
263 if (err
) logger
.error('Cannot increment scores of good pods.')
266 Pod
.incrementScores(badPods
, constants
.PODS_SCORE
.MALUS
, function (err
) {
267 if (err
) logger
.error('Cannot decrement scores of bad pods.')
272 function list (callback
) {
273 this.find({ }, { _id: 1, request: 1, to: 1 }, callback
)
276 function removeAll (callback
) {
277 this.remove({ }, callback
)
280 function removePodOf (requestsIds
, podId
, callback
) {
281 if (!callback
) callback = function () {}
283 this.update({ _id: { $in: requestsIds
} }, { $pull: { to: podId
} }, { multi: true }, callback
)
286 function removeWithEmptyTo (callback
) {
287 if (!callback
) callback = function () {}
289 this.remove({ to: { $size: 0 } }, callback
)