]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/requestsScheduler.js
Remove useless use of async.each
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
1 'use strict'
2
3 const async = require('async')
4 const map = require('lodash/map')
5
6 const constants = require('../initializers/constants')
7 const logger = require('../helpers/logger')
8 const Pods = require('../models/pods')
9 const Requests = require('../models/requests')
10 const requests = require('../helpers/requests')
11 const videos = require('../lib/videos')
12 const Videos = require('../models/videos')
13
14 const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
15 let timer = null
16
// Public interface of the requests scheduler (this object is what the module exports)
const requestsScheduler = {
  activate,
  addRequest,
  deactivate,
  forceSend
}
23
// Start the scheduler: makeRequests is run every constants.INTERVAL
// Activating an already active scheduler replaces the running interval
// instead of stacking a second one on top of it
function activate () {
  // Overwriting the handle without clearing it would leave the previous
  // interval running with no way to stop it afterwards
  if (timer !== null) clearInterval(timer)

  logger.info('Requests scheduler activated.')
  timer = setInterval(makeRequests, constants.INTERVAL)
}
28
// Add a request to the scheduler
// - no request stored under this id: the request is created
// - a request of the same type is already stored: nothing is done, an error is logged
// - a request of another type is already stored: the stored one is removed
//   and the new one is not inserted
// Failures are only logged, nothing is reported back to the caller
function addRequest (id, type, request) {
  logger.debug('Add request to the requests scheduler.', { id, type, request })

  Requests.findById(id, function onRequestLookedUp (findErr, existing) {
    if (findErr) {
      logger.error('Error when trying to find a request.', { error: findErr })
      return
    }

    // Nothing stored under this id yet: simply insert the request
    if (!existing) {
      Requests.create(id, type, request, function onRequestCreated (createErr) {
        if (createErr) logger.error('Cannot create a request.', { error: createErr })
      })
      return
    }

    if (existing.type === type) {
      logger.error('Cannot insert two same requests.')
      return
    }

    // A request of the other type is stored under this id: remove it
    Requests.removeRequestById(id, function onRequestRemoved (removeErr) {
      if (removeErr) logger.error('Cannot remove a request.', { error: removeErr })
    })
  })
}
61
// Stop the scheduler: the interval started by activate is cleared
// Safe to call when the scheduler is not active
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  // Forget the handle so the module state reflects that nothing is scheduled anymore
  timer = null
}
66
// Run a scheduler pass right now instead of waiting for the next interval tick
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
71
72 // ---------------------------------------------------------------------------
73
74 module.exports = requestsScheduler
75
76 // ---------------------------------------------------------------------------
77
// Send the requests of a given type to all the pods we know
// The optional callback gets an error when the pods cannot be listed,
// when the type is unknown or when the multiple retry helper reports one
function makeRequest (type, requestsToMake, callback) {
  const done = callback || function () {}

  Pods.list(function onPodsListed (listErr, pods) {
    if (listErr) return done(listErr)

    // We can only build the path of a known type
    if (REQUEST_SCHEDULER_TYPE.indexOf(type) === -1) {
      return done(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true, // Security
      sign: true, // To prove our identity
      method: 'POST',
      path: '/api/' + constants.API_VERSION + '/remotevideos/' + type,
      data: requestsToMake // Requests we need to make
    }

    // Status codes for which the request is considered as successful
    const successStatusCodes = [ 200, 201, 204 ]

    const goodPods = []
    const badPods = []

    // Called for each pod: sort it into the good or the bad list
    function onPodFinished (err, response, body, url, pod, next) {
      const failed = err || successStatusCodes.indexOf(response.statusCode) === -1

      if (failed) {
        // The pod was unreachable or did not accept the request
        badPods.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      } else {
        goodPods.push(pod._id)
      }

      return next()
    }

    // Called once, after every pod was handled
    function onAllPodsFinished (err) {
      if (err) return done(err)

      // All the requests were made, the scores can be updated
      updatePodsScore(goodPods, badPods)
      done(null)
    }

    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)
  })
}
129
// Make all the requests of the scheduler
// The stored requests are grouped by type and every non empty group is sent with makeRequest
// A group is removed from the scheduler only when it was sent without error,
// otherwise it stays stored and is picked up again by the next run
function makeRequests () {
  Requests.list(function (err, requests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    // If there are no requests, abort
    if (requests.length === 0) {
      logger.info('No requests to make.')
      return
    }

    logger.info('Making requests to friends.')

    // One bucket per known type
    const requestsToMake = {}
    for (const type of REQUEST_SCHEDULER_TYPE) {
      requestsToMake[type] = {
        ids: [],
        requests: []
      }
    }

    // For each request to make, we add it to the bucket of its type
    requests.forEach(function (poolRequest) {
      if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) === -1) {
        logger.error('Unkown request type.', { request_type: poolRequest.type })
        return // Skip this request
      }

      const requestTypeToMake = requestsToMake[poolRequest.type]
      requestTypeToMake.requests.push(poolRequest.request)
      requestTypeToMake.ids.push(poolRequest._id)
    })

    for (const type of Object.keys(requestsToMake)) {
      const requestTypeToMake = requestsToMake[type]

      // Nothing to send for this type
      if (requestTypeToMake.requests.length === 0) continue

      makeRequest(type, requestTypeToMake.requests, function (err) {
        if (err) {
          logger.error('Errors when sent ' + type + ' requests.', { error: err })
          // Removing the requests here would silently drop data that was not delivered
          return
        }

        // We made the requests, so we can remove them from the scheduler
        Requests.removeRequests(requestTypeToMake.ids)
      })
    }
  })
}
180
// Remove the pods returned by Pods.findBadPods (pods with a score of 0: too many
// requests where they were unreachable) together with the remote videos listed for their urls
// Nothing is returned: the outcome is only logged
function removeBadPods () {
  async.waterfall([
    findBadPods,
    listVideosOfTheseBadPods,
    removeVideosOfTheseBadPods,
    removeBadPodsFromDB
  ], logOutcome)

  function findBadPods (next) {
    Pods.findBadPods(function (err, pods) {
      if (!err) return next(null, pods)

      logger.error('Cannot find bad pods.', { error: err })
      return next(err)
    })
  }

  function listVideosOfTheseBadPods (pods, next) {
    // No bad pods: no value is forwarded, so the following steps get
    // their own callback as first argument and use that to skip their work
    if (pods.length === 0) return next(null)

    const urls = map(pods, 'url')
    const ids = map(pods, '_id')

    Videos.listFromUrls(urls, function (err, videosList) {
      if (!err) return next(null, ids, videosList)

      // We still want to remove the pods, so go on with an empty list of videos
      logger.error('Cannot list videos urls.', { error: err, urls: urls })
      return next(null, ids, [])
    })
  }

  function removeVideosOfTheseBadPods (podIds, videosList, next) {
    // We don't have to remove pods: the first argument is the step callback, skip
    if (typeof podIds === 'function') {
      const skip = podIds
      return skip(null)
    }

    // Remove the remote videos, a failure here does not prevent the pods removal
    videos.removeRemoteVideos(videosList, function (err) {
      if (err) logger.error('Cannot remove remote videos.', { error: err })

      return next(null, podIds)
    })
  }

  function removeBadPodsFromDB (podIds, next) {
    // We don't have to remove pods: the first argument is the step callback, skip
    if (typeof podIds === 'function') {
      const skip = podIds
      return skip(null)
    }

    Pods.removeAllByIds(podIds, next)
  }

  function logOutcome (err, removeResult) {
    if (err) {
      logger.error('Cannot remove bad pods.', { error: err })
      return
    }

    // No result means that the removal steps were skipped
    if (!removeResult) {
      logger.info('No need to remove bad pods.')
      return
    }

    const podsRemoved = removeResult.result.n
    logger.info('Removed %d pods.', podsRemoved)
  }
}
240
// Update the score of the pods after a round of requests
// - goodPods: ids of the pods that answered correctly, their score gets constants.PODS_SCORE.BONUS
// - badPods: ids of the pods that failed, their score gets constants.PODS_SCORE.MALUS
// Once the bad pods were handled (even on error), removeBadPods is run
// Errors are only logged, nothing is reported back to the caller
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
    // Keep the error in the log, like the other error logs of this file
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
    removeBadPods()
  })
}