]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/requestsScheduler.js
Video model refractoring -> use mongoose api
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
CommitLineData
9f10b292
C
1'use strict'
2
f0f5567b 3const async = require('async')
35f1c54e 4const map = require('lodash/map')
aaf61f38 5const mongoose = require('mongoose')
9f10b292 6
f0f5567b
C
7const constants = require('../initializers/constants')
8const logger = require('../helpers/logger')
9const Pods = require('../models/pods')
e3647ae2 10const Requests = require('../models/requests')
f0f5567b 11const requests = require('../helpers/requests')
aaf61f38
C
12
13const Video = mongoose.model('Video')
9f10b292 14
f0f5567b 15let timer = null
9f10b292 16
// Public interface of the requests scheduler (this object is what the module exports)
const requestsScheduler = {
  activate,
  addRequest,
  addRequestTo,
  deactivate,
  flush,
  forceSend
}
25
/**
 * Start the scheduler: makeRequests is invoked every constants.INTERVAL
 * through setInterval, and the interval handle is kept in `timer`.
 *
 * Calling it again while already active restarts the interval instead of
 * stacking a second one.
 */
function activate () {
  logger.info('Requests scheduler activated.')

  // A previous activation may still be running: stop it first, otherwise its
  // handle would be overwritten and the interval could never be cleared.
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makeRequests, constants.INTERVAL)
}
30
/**
 * Add a request to the scheduler, addressed to every pod id returned by
 * Pods.listAllIds.
 *
 * Nothing is stored when the pod ids cannot be listed or when there are none.
 * Failures are only logged; nothing is reported back to the caller.
 *
 * @param {*} type - request type, stored as `request.type`
 * @param {*} data - request payload, stored as `request.data`
 */
function addRequest (type, data) {
  logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })

  const request = {
    type: type,
    data: data
  }

  Pods.listAllIds(function (err, podIds) {
    if (err) {
      logger.error('Cannot list pod ids.', { error: err })
      return
    }

    // No friends: an empty list is truthy, so check its length explicitly
    // to avoid storing a request that has no recipient
    if (!podIds || podIds.length === 0) return

    Requests.create(request, podIds, function (err) {
      if (err) logger.error('Cannot create a request.', { error: err })
    })
  })
}
54
/**
 * Add a request to the scheduler for the given pods only.
 *
 * A creation failure is logged; nothing is reported back to the caller.
 *
 * @param {*} podIds - recipients, passed as-is to Requests.create
 * @param {*} type - request type, stored as `request.type`
 * @param {*} data - request payload, stored as `request.data`
 */
function addRequestTo (podIds, type, data) {
  Requests.create({ type: type, data: data }, podIds, function onCreated (createErr) {
    if (!createErr) return

    logger.error('Cannot create a request.', { error: createErr })
  })
}
1fe5076f 65
/**
 * Stop the scheduler: clear the interval stored in `timer`.
 * Safe to call when the scheduler is not active.
 */
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  // Drop the stale handle so `timer` reflects that nothing is scheduled
  timer = null
}
1fe5076f 70
528a9efa
C
/**
 * Remove every stored request through Requests.removeAll.
 * A failure is only logged.
 */
function flush () {
  Requests.removeAll(function onRemoved (removeErr) {
    if (!removeErr) return

    logger.error('Cannot flush the requests.', { error: removeErr })
  })
}
78
/**
 * Run makeRequests right away instead of waiting for the next
 * interval tick.
 */
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
c45f7f84 83
9f10b292 84// ---------------------------------------------------------------------------
c45f7f84 85
e3647ae2 86module.exports = requestsScheduler
c45f7f84 87
9f10b292 88// ---------------------------------------------------------------------------
c45f7f84 89
/**
 * Send a batch of requests to one pod with a single secure POST on the
 * remote videos endpoint.
 *
 * @param {Object} toPod - destination pod (its `url` is used in the error log)
 * @param {*} requestsToMake - payload sent as the request data
 * @param {Function} [callback] - called with `true` when the pod answered
 *   200, 201 or 204, with `false` otherwise
 */
function makeRequest (toPod, requestsToMake, callback) {
  const done = callback || function () {}

  const remoteVideosPath = '/api/' + constants.API_VERSION + '/remote/videos'
  const successCodes = [ 200, 201, 204 ]

  const secureRequestOptions = {
    toPod: toPod,
    encrypt: true, // Security
    sign: true, // To prove our identity
    method: 'POST',
    path: remoteVideosPath,
    data: requestsToMake // Requests we need to make
  }

  requests.makeSecureRequest(secureRequestOptions, function onResponse (err, res) {
    // `res` is only inspected when there is no transport error
    const failed = err || successCodes.indexOf(res.statusCode) === -1

    if (failed) {
      logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })

      return done(false)
    }

    return done(true)
  })
}
115
/**
 * Make all the requests of the scheduler.
 *
 * Stored requests are grouped by destination pod id, then each pod receives
 * its batch (at most constants.REQUESTS_IN_PARALLEL pods at a time).
 * Pods that answered successfully are removed from the recipients of the
 * requests they received. Once every pod has been processed, pod scores are
 * updated and requests left without recipient are flushed.
 */
function makeRequests () {
  // Named pendingRequests so it does not shadow the module-level `requests` helper
  Requests.list(function (err, pendingRequests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (pendingRequests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // Requests by pods id
    const requestsToMake = {}

    pendingRequests.forEach(function (poolRequest) {
      poolRequest.to.forEach(function (toPodId) {
        if (!requestsToMake[toPodId]) {
          requestsToMake[toPodId] = {
            ids: [],
            datas: []
          }
        }

        requestsToMake[toPodId].ids.push(poolRequest._id)
        requestsToMake[toPodId].datas.push(poolRequest.request)
      })
    })

    const goodPods = []
    const badPods = []

    async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
      const requestToMake = requestsToMake[toPodId]

      // FIXME: mongodb request inside a loop :/
      Pods.findById(toPodId, function (err, toPod) {
        if (err) {
          logger.error('Error finding pod by id.', { err: err })
          // Still signal completion: otherwise eachLimit never finishes and
          // neither the scores update nor the final cleanup would run.
          // The requests stay stored and are retried on the next run.
          return callbackEach()
        }

        // Maybe the pod is not our friend anymore so simply remove them
        if (!toPod) {
          Requests.removePodOf(requestToMake.ids, toPodId)
          return callbackEach()
        }

        makeRequest(toPod, requestToMake.datas, function (success) {
          if (success === true) {
            logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })

            // Remove the pod id of these request ids
            Requests.removePodOf(requestToMake.ids, toPodId)
            goodPods.push(toPodId)
          } else {
            badPods.push(toPodId)
          }

          callbackEach()
        })
      })
    }, function () {
      // All the requests were made, we update the pods score
      updatePodsScore(goodPods, badPods)
      // Flush requests with no pod
      Requests.removeWithEmptyTo()
    })
  })
}
0b697522 193
/**
 * Remove the pods returned by Pods.findBadPods, together with the videos
 * listed for their urls.
 *
 * Steps: find the bad pods, list their videos, remove these videos, then
 * remove the pods themselves. A failure while listing or removing videos is
 * logged but does not prevent the pods from being removed.
 * The outcome is only logged; nothing is returned to the caller.
 */
function removeBadPods () {
  async.waterfall([
    function findBadPods (callback) {
      Pods.findBadPods(function (err, pods) {
        if (err) {
          logger.error('Cannot find bad pods.', { error: err })
          return callback(err)
        }

        return callback(null, pods)
      })
    },

    function listVideosOfTheseBadPods (pods, callback) {
      // No bad pods: hand explicit empty lists to the next steps
      if (pods.length === 0) return callback(null, [], [])

      const urls = map(pods, 'url')
      const ids = map(pods, '_id')

      Video.listByUrls(urls, function (err, videosList) {
        if (err) {
          // Don't stop the process: the pods are still removed
          logger.error('Cannot list videos urls.', { error: err, urls: urls })
          return callback(null, ids, [])
        }

        return callback(null, ids, videosList)
      })
    },

    function removeVideosOfTheseBadPods (podIds, videosList, callback) {
      async.each(videosList, function (video, callbackEach) {
        video.remove(callbackEach)
      }, function (err) {
        if (err) {
          // Don't stop the process: log and keep going, otherwise the
          // waterfall would stall and the bad pods would never be removed
          logger.error('Error while removing videos of bad pods.', { error: err })
        }

        return callback(null, podIds)
      })
    },

    function removeBadPodsFromDB (podIds, callback) {
      // We don't have to remove pods, skip
      if (podIds.length === 0) return callback(null)

      Pods.removeAllByIds(podIds, callback)
    }
  ], function (err, removeResult) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
    } else if (removeResult) {
      const podsRemoved = removeResult.result.n
      logger.info('Removed %d pods.', podsRemoved)
    } else {
      logger.info('No need to remove bad pods.')
    }
  })
}
0b697522 258
bc503c2a
C
/**
 * Apply the score bonus to the pods that answered correctly and the malus
 * to the ones that did not, then trigger the removal of bad pods.
 *
 * Score update failures are logged and do not prevent removeBadPods from
 * running.
 *
 * @param {Array} goodPods - ids of the pods whose request succeeded
 * @param {Array} badPods - ids of the pods whose request failed
 */
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
    // Attach the error, like the other error logs of this file
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })

    // Runs whether or not the malus could be applied
    removeBadPods()
  })
}