]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/requestsScheduler.js
Try to make a better communication (between pods) module
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
CommitLineData
9f10b292
C
1'use strict'
2
f0f5567b 3const async = require('async')
35f1c54e 4const map = require('lodash/map')
9f10b292 5
f0f5567b
C
6const constants = require('../initializers/constants')
7const logger = require('../helpers/logger')
8const Pods = require('../models/pods')
e3647ae2 9const Requests = require('../models/requests')
f0f5567b 10const requests = require('../helpers/requests')
cbe2f7c3 11const videos = require('../lib/videos')
f0f5567b 12const Videos = require('../models/videos')
9f10b292 13
// Handle of the interval started by activate(); null until the scheduler is activated
let timer = null

// Public API of the requests scheduler (this object is what the module exports)
const requestsScheduler = {
  activate,
  addRequest,
  addRequestTo,
  deactivate,
  flush,
  forceSend
}
24
// Start the scheduler: makeRequests is run every constants.INTERVAL milliseconds.
// Calling it again while an interval is already registered replaces that interval
// instead of leaking it.
function activate () {
  logger.info('Requests scheduler activated.')

  // A previous interval would otherwise keep firing with no handle left to stop it
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makeRequests, constants.INTERVAL)
}
29
// Add a request to the scheduler.
// The request ({ type, data }) is stored for all the pod ids returned by Pods.listAllIds.
// Nothing is stored when the ids cannot be listed or when there is no pod at all.
// Failures are only logged; nothing is reported back to the caller.
function addRequest (type, data) {
  logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })

  const request = {
    type: type,
    data: data
  }

  Pods.listAllIds(function (err, podIds) {
    if (err) {
      logger.error('Cannot list pod ids.', { error: err })
      return
    }

    // No friends: an empty list is truthy, so its length has to be checked explicitly,
    // otherwise a request addressed to nobody would be stored
    if (!podIds || podIds.length === 0) return

    Requests.create(request, podIds, function (err) {
      if (err) logger.error('Cannot create a request.', { error: err })
    })
  })
}
53
// Queue a request ({ type, data }) for the given pod ids only.
// A failure to store it is logged; nothing is reported back to the caller.
function addRequestTo (podIds, type, data) {
  const onCreated = function (err) {
    if (err) logger.error('Cannot create a request.', { error: err })
  }

  Requests.create({ type: type, data: data }, podIds, onCreated)
}
1fe5076f 64
// Stop the periodic sending started by activate().
// The handle is reset to null so a cleared interval is not kept around.
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  timer = null
}
1fe5076f 69
528a9efa
C
// Ask the Requests model to remove all the stored requests.
// A failure is logged and otherwise ignored.
function flush () {
  Requests.removeAll(function (err) {
    if (!err) return

    logger.error('Cannot flush the requests.', { error: err })
  })
}
77
// Run makeRequests right away instead of waiting for the interval set by activate()
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
c45f7f84 82
9f10b292 83// ---------------------------------------------------------------------------
c45f7f84 84
e3647ae2 85module.exports = requestsScheduler
c45f7f84 86
9f10b292 87// ---------------------------------------------------------------------------
c45f7f84 88
// Send a batch of queued requests to one pod.
// makeSecureRequest is asked for a signed and encrypted POST on the remote videos API path.
// callback is optional: it receives true when the pod answered 200, 201 or 204,
// false on any error or other status code.
function makeRequest (toPod, requestsToMake, callback) {
  const done = callback || function () {}
  const acceptedStatusCodes = [ 200, 201, 204 ]

  const params = {
    toPod: toPod,
    encrypt: true, // Security
    sign: true, // To prove our identity
    method: 'POST',
    path: '/api/' + constants.API_VERSION + '/remote/videos',
    data: requestsToMake // Requests we need to make
  }

  requests.makeSecureRequest(params, function (err, res) {
    // res is only inspected when there is no error
    const succeeded = !err && acceptedStatusCodes.indexOf(res.statusCode) !== -1
    if (succeeded) return done(true)

    logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })

    return done(false)
  })
}
114
// Make all the requests of the scheduler.
// The stored requests are grouped by destination pod, then each pod receives its batch
// (at most constants.REQUESTS_IN_PARALLEL pods are handled at the same time).
// Once every pod has been handled, the pods scores are updated and the requests
// that have no destination left are removed.
function makeRequests () {
  Requests.list(function (err, pendingRequests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (pendingRequests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // Requests by pods id
    const requestsToMake = groupRequestsByPod(pendingRequests)

    const goodPods = []
    const badPods = []

    async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
      const requestToMake = requestsToMake[toPodId]

      // FIXME: mongodb request inside a loop :/
      Pods.findById(toPodId, function (err, toPod) {
        if (err) {
          logger.error('Error finding pod by id.', { err: err })
          // The iteration must always be completed, otherwise the final callback
          // (scores update and cleanup) would never run.
          // The pod is neither good nor bad here: the lookup failed on our side.
          return callbackEach()
        }

        // Maybe the pod is not our friend anymore so simply remove them
        if (!toPod) {
          Requests.removePodOf(requestToMake.ids, toPodId)
          return callbackEach()
        }

        makeRequest(toPod, requestToMake.datas, function (success) {
          if (success === true) {
            logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })

            // Remove the pod id of these request ids
            Requests.removePodOf(requestToMake.ids, toPodId)
            goodPods.push(toPodId)
          } else {
            badPods.push(toPodId)
          }

          callbackEach()
        })
      })
    }, function () {
      // All the requests were made, we update the pods score
      updatePodsScore(goodPods, badPods)
      // Flush requests with no pod
      Requests.removeWithEmptyTo()
    })
  })
}

// Group the stored requests by destination pod id.
// Returns an object whose keys are pod ids and whose values are
// { ids: [ids of the stored requests], datas: [their request payloads] }.
function groupRequestsByPod (pendingRequests) {
  const requestsByPod = {}

  pendingRequests.forEach(function (poolRequest) {
    poolRequest.to.forEach(function (toPodId) {
      if (!requestsByPod[toPodId]) {
        requestsByPod[toPodId] = {
          ids: [],
          datas: []
        }
      }

      requestsByPod[toPodId].ids.push(poolRequest._id)
      requestsByPod[toPodId].datas.push(poolRequest.request)
    })
  })

  return requestsByPod
}
0b697522 192
// Remove the pods returned by Pods.findBadPods, together with their remote videos.
// According to the original note these are the pods with a score of 0
// (too many requests where they were unreachable).
// The outcome is only logged.
function removeBadPods () {
  // Step 1: fetch the pods to remove
  function findBadPods (next) {
    Pods.findBadPods(function (err, pods) {
      if (!err) return next(null, pods)

      logger.error('Cannot find bad pods.', { error: err })
      return next(err)
    })
  }

  // Step 2: list the videos that come from these pods
  function listVideosOfTheseBadPods (pods, next) {
    // Nothing to remove: the following steps then receive their callback as only argument
    if (pods.length === 0) return next(null)

    const urls = map(pods, 'url')
    const ids = map(pods, '_id')

    Videos.listFromUrls(urls, function (err, videosList) {
      if (!err) return next(null, ids, videosList)

      // Go on with an empty videos list so the pods themselves are still removed
      logger.error('Cannot list videos urls.', { error: err, urls: urls })
      return next(null, ids, [])
    })
  }

  // Step 3: remove the remote videos
  function removeVideosOfTheseBadPods (podIds, videosList, next) {
    // We don't have to remove pods, skip (podIds is actually the step callback here)
    if (typeof podIds === 'function') return podIds(null)

    videos.removeRemoteVideos(videosList, function (err) {
      if (err) logger.error('Cannot remove remote videos.', { error: err })

      return next(null, podIds)
    })
  }

  // Step 4: remove the pods themselves
  function removeBadPodsFromDB (podIds, next) {
    // We don't have to remove pods, skip (podIds is actually the step callback here)
    if (typeof podIds === 'function') return podIds(null)

    Pods.removeAllByIds(podIds, next)
  }

  // Final step: log what happened
  function reportResult (err, removeResult) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
      return
    }

    if (!removeResult) {
      logger.info('No need to remove bad pods.')
      return
    }

    logger.info('Removed %d pods.', removeResult.result.n)
  }

  async.waterfall([
    findBadPods,
    listVideosOfTheseBadPods,
    removeVideosOfTheseBadPods,
    removeBadPodsFromDB
  ], reportResult)
}
0b697522 252
bc503c2a
C
// Update the scores after a sending round: constants.PODS_SCORE.BONUS is applied to goodPods
// and constants.PODS_SCORE.MALUS to badPods (both are lists of pod ids).
// Failures are logged with the error details, like everywhere else in this file.
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })

    // The cleanup runs whether or not the scores update succeeded
    removeBadPods()
  })
}