]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/requestsScheduler.js
Use async waterfall in request scheduler for better readability
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
1 'use strict'
2
3 const async = require('async')
4 const map = require('lodash/map')
5
6 const constants = require('../initializers/constants')
7 const logger = require('../helpers/logger')
8 const Pods = require('../models/pods')
9 const Requests = require('../models/requests')
10 const requests = require('../helpers/requests')
11 const videos = require('../lib/videos')
12 const Videos = require('../models/videos')
13
14 let timer = null
15
16 const requestsScheduler = {
17 activate: activate,
18 addRequest: addRequest,
19 deactivate: deactivate,
20 forceSend: forceSend
21 }
22
23 function activate () {
24 logger.info('Requests scheduler activated.')
25 timer = setInterval(makeRequests, constants.INTERVAL)
26 }
27
28 function addRequest (id, type, request) {
29 logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
30
31 Requests.findById(id, function (err, entity) {
32 if (err) {
33 logger.error('Cannot find one request.', { error: err })
34 return // Abort
35 }
36
37 if (entity) {
38 if (entity.type === type) {
39 logger.error('Cannot insert two same requests.')
40 return // Abort
41 }
42
43 // Remove the request of the other type
44 Requests.removeRequestById(id, function (err) {
45 if (err) {
46 logger.error('Cannot remove a request.', { error: err })
47 return // Abort
48 }
49 })
50 } else {
51 Requests.create(id, type, request, function (err) {
52 if (err) logger.error('Cannot create a request.', { error: err })
53 return // Abort
54 })
55 }
56 })
57 }
58
59 function deactivate () {
60 logger.info('Requests scheduler deactivated.')
61 clearInterval(timer)
62 }
63
64 function forceSend () {
65 logger.info('Force requests scheduler sending.')
66 makeRequests()
67 }
68
69 // ---------------------------------------------------------------------------
70
71 module.exports = requestsScheduler
72
73 // ---------------------------------------------------------------------------
74
75 function makeRequest (type, requestsToMake, callback) {
76 if (!callback) callback = function () {}
77
78 Pods.list(function (err, pods) {
79 if (err) return callback(err)
80
81 const params = {
82 encrypt: true,
83 sign: true,
84 method: 'POST',
85 path: null,
86 data: requestsToMake
87 }
88
89 if (type === 'add') {
90 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
91 } else if (type === 'remove') {
92 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
93 } else {
94 return callback(new Error('Unkown pool request type.'))
95 }
96
97 const badPods = []
98 const goodPods = []
99
100 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
101
102 function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
103 if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
104 badPods.push(pod._id)
105 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
106 } else {
107 goodPods.push(pod._id)
108 }
109
110 return callbackEachPodFinished()
111 }
112
113 function callbackAllPodsFinished (err) {
114 if (err) return callback(err)
115
116 updatePodsScore(goodPods, badPods)
117 callback(null)
118 }
119 })
120 }
121
122 function makeRequests () {
123 logger.info('Making requests to friends.')
124
125 Requests.list(function (err, requests) {
126 if (err) {
127 logger.error('Cannot get the list of requests.', { err: err })
128 return // Abort
129 }
130
131 if (requests.length === 0) return
132
133 const requestsToMake = {
134 add: {
135 ids: [],
136 requests: []
137 },
138 remove: {
139 ids: [],
140 requests: []
141 }
142 }
143
144 async.each(requests, function (poolRequest, callbackEach) {
145 if (poolRequest.type === 'add') {
146 requestsToMake.add.requests.push(poolRequest.request)
147 requestsToMake.add.ids.push(poolRequest._id)
148 } else if (poolRequest.type === 'remove') {
149 requestsToMake.remove.requests.push(poolRequest.request)
150 requestsToMake.remove.ids.push(poolRequest._id)
151 } else {
152 logger.error('Unkown request type.', { request_type: poolRequest.type })
153 return // abort
154 }
155
156 callbackEach()
157 }, function () {
158 // Send the add requests
159 if (requestsToMake.add.requests.length !== 0) {
160 makeRequest('add', requestsToMake.add.requests, function (err) {
161 if (err) logger.error('Errors when sent add requests.', { error: err })
162
163 Requests.removeRequests(requestsToMake.add.ids)
164 })
165 }
166
167 // Send the remove requests
168 if (requestsToMake.remove.requests.length !== 0) {
169 makeRequest('remove', requestsToMake.remove.requests, function (err) {
170 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
171
172 Requests.removeRequests(requestsToMake.remove.ids)
173 })
174 }
175 })
176 })
177 }
178
179 function removeBadPods () {
180 async.waterfall([
181 function findBadPods (callback) {
182 Pods.findBadPods(function (err, pods) {
183 if (err) {
184 logger.error('Cannot find bad pods.', { error: err })
185 return callback(err)
186 }
187
188 return callback(null, pods)
189 })
190 },
191
192 function listVideosOfTheseBadPods (pods, callback) {
193 if (pods.length === 0) return callback(null)
194
195 const urls = map(pods, 'url')
196 const ids = map(pods, '_id')
197
198 Videos.listFromUrls(urls, function (err, videosList) {
199 if (err) {
200 logger.error('Cannot list videos urls.', { error: err, urls: urls })
201 return callback(null, ids, [])
202 }
203
204 return callback(null, ids, videosList)
205 })
206 },
207
208 function removeVideosOfTheseBadPods (podIds, videosList, callback) {
209 // We don't have to remove pods, skip
210 if (typeof podIds === 'function') return podIds(null)
211
212 // Remove the remote videos
213 videos.removeRemoteVideos(videosList, function (err) {
214 if (err) logger.error('Cannot remove remote videos.', { error: err })
215
216 return callback(null, podIds)
217 })
218 },
219
220 function removeBadPodsFromDB (podIds, callback) {
221 // We don't have to remove pods, skip
222 if (typeof podIds === 'function') return podIds(null)
223
224 Pods.removeAllByIds(podIds, callback)
225 }
226 ], function (err, removeResult) {
227 if (err) {
228 logger.error('Cannot remove bad pods.', { error: err })
229 } else if (removeResult) {
230 const podsRemoved = removeResult.result.n
231 logger.info('Removed %d pods.', podsRemoved)
232 } else {
233 logger.info('No need to remove bad pods.')
234 }
235 })
236 }
237
238 function updatePodsScore (goodPods, badPods) {
239 logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
240
241 Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
242 if (err) logger.error('Cannot increment scores of good pods.')
243 })
244
245 Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
246 if (err) logger.error('Cannot increment scores of bad pods.')
247 removeBadPods()
248 })
249 }