]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/requestsScheduler.js
Use async waterfall for better readability in friends lib
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
CommitLineData
9f10b292
C
1'use strict'
2
f0f5567b 3const async = require('async')
35f1c54e 4const map = require('lodash/map')
9f10b292 5
f0f5567b
C
6const constants = require('../initializers/constants')
7const logger = require('../helpers/logger')
8const Pods = require('../models/pods')
e3647ae2 9const Requests = require('../models/requests')
f0f5567b 10const requests = require('../helpers/requests')
cbe2f7c3 11const videos = require('../lib/videos')
f0f5567b 12const Videos = require('../models/videos')
9f10b292 13
// Handle of the running interval created by activate(); null until the
// scheduler has been activated.
let timer = null

// Public API of the requests scheduler (exported below).
const requestsScheduler = {
  activate,
  addRequest,
  deactivate,
  forceSend
}
22
/**
 * Start the scheduler: run makeRequests every constants.INTERVAL milliseconds.
 *
 * Safe to call more than once: an interval started by a previous call is
 * cleared first, so only one interval is ever running and deactivate() can
 * always stop it.
 */
function activate () {
  logger.info('Requests scheduler activated.')

  // Without this, a second activation would overwrite `timer` and leak the
  // previous interval, which could then never be cleared.
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makeRequests, constants.INTERVAL)
}
27
/**
 * Register a request in the persisted requests list.
 *
 * Looks up the stored request with the given id:
 *  - none stored: the new request is created;
 *  - one stored with the same type: an error is logged, nothing changes;
 *  - one stored with a different type: the stored request is removed and the
 *    new one is not created.
 *
 * Failures are only logged; nothing is reported back to the caller.
 *
 * @param {*} id - identifier used to look up, create or remove the request
 * @param {string} type - request type, compared against the stored one
 * @param {*} request - payload stored with the request
 */
function addRequest (id, type, request) {
  logger.debug('Add request to the requests scheduler.', { id, type, request })

  Requests.findById(id, function onRequestLookedUp (findErr, stored) {
    if (findErr) {
      logger.error('Cannot find one request.', { error: findErr })
      return
    }

    // Nothing stored under this id yet: simply persist the new request
    if (!stored) {
      Requests.create(id, type, request, function onRequestCreated (createErr) {
        if (createErr) logger.error('Cannot create a request.', { error: createErr })
      })
      return
    }

    if (stored.type === type) {
      logger.error('Cannot insert two same requests.')
      return
    }

    // A request of the other type is already stored: drop it
    Requests.removeRequestById(id, function onRequestRemoved (removeErr) {
      if (removeErr) logger.error('Cannot remove a request.', { error: removeErr })
    })
  })
}
1fe5076f 58
/**
 * Stop the periodic sending by clearing the interval held in `timer`.
 */
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
}
1fe5076f 63
/**
 * Run makeRequests right away instead of waiting for the next interval tick.
 */
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
c45f7f84 68
9f10b292 69// ---------------------------------------------------------------------------
c45f7f84 70
e3647ae2 71module.exports = requestsScheduler
c45f7f84 72
9f10b292 73// ---------------------------------------------------------------------------
c45f7f84 74
/**
 * Send one batch of requests of a given type to every pod returned by
 * Pods.list, through requests.makeMultipleRetryRequest (POST, with the
 * `encrypt` and `sign` flags set).
 *
 * Pods answering with status 200, 201 or 204 are collected as good pods, the
 * others (or those for which an error is reported) as bad pods; once all pods
 * are done both lists are handed to updatePodsScore.
 *
 * @param {string} type - 'add' or 'remove'; anything else ends with an error
 * @param {*} requestsToMake - data sent as the request body
 * @param {function(Error=)} [callback] - called with an error, or with null
 */
function makeRequest (type, requestsToMake, callback) {
  const done = callback || function () {}

  Pods.list(function onPodsListed (listErr, pods) {
    if (listErr) return done(listErr)

    if (type !== 'add' && type !== 'remove') {
      return done(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true,
      sign: true,
      method: 'POST',
      path: '/api/' + constants.API_VERSION + '/remotevideos/' + type,
      data: requestsToMake
    }

    const acceptedStatusCodes = [ 200, 201, 204 ]
    const badPods = []
    const goodPods = []

    // Sort each pod into the good or bad list depending on its answer
    function onPodFinished (err, response, body, url, pod, next) {
      const failed = err || acceptedStatusCodes.indexOf(response.statusCode) === -1

      if (failed) {
        badPods.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      } else {
        goodPods.push(pod._id)
      }

      return next()
    }

    // Every pod has been contacted: update the scores and report back
    function onAllPodsFinished (err) {
      if (err) return done(err)

      updatePodsScore(goodPods, badPods)
      done(null)
    }

    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)
  })
}
121
e3647ae2
C
/**
 * Send every stored request to the friends.
 *
 * Loads the stored requests with Requests.list, groups them by type ('add'
 * or 'remove') and sends each non-empty group with makeRequest. Once a group
 * has been sent (successfully or not) its stored requests are removed with
 * Requests.removeRequests.
 *
 * A stored request with an unknown type is logged and skipped: it must not
 * prevent the valid requests from being sent. It is not removed here.
 */
function makeRequests () {
  logger.info('Making requests to friends.')

  // Named `pendingRequests` so it does not shadow the `requests` helper module
  Requests.list(function (err, pendingRequests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    if (pendingRequests.length === 0) return

    const requestsToMake = {
      add: {
        ids: [],
        requests: []
      },
      remove: {
        ids: [],
        requests: []
      }
    }

    // Grouping is purely synchronous, a plain loop is enough
    for (const poolRequest of pendingRequests) {
      if (poolRequest.type !== 'add' && poolRequest.type !== 'remove') {
        logger.error('Unkown request type.', { request_type: poolRequest.type })
        continue // Skip this one but keep processing the others
      }

      const group = requestsToMake[poolRequest.type]
      group.requests.push(poolRequest.request)
      group.ids.push(poolRequest._id)
    }

    // Send the add requests
    if (requestsToMake.add.requests.length !== 0) {
      makeRequest('add', requestsToMake.add.requests, function (err) {
        if (err) logger.error('Errors when sent add requests.', { error: err })

        Requests.removeRequests(requestsToMake.add.ids)
      })
    }

    // Send the remove requests
    if (requestsToMake.remove.requests.length !== 0) {
      makeRequest('remove', requestsToMake.remove.requests, function (err) {
        if (err) logger.error('Errors when sent remove pool requests.', { error: err })

        Requests.removeRequests(requestsToMake.remove.ids)
      })
    }
  })
}
0b697522 178
9f10b292
C
/**
 * Remove the pods returned by Pods.findBadPods, along with their videos.
 *
 * The videos listed for the urls of these pods are handed to
 * videos.removeRemoteVideos, then the pods are removed by id. The pods are
 * removed even when listing the videos failed, and without waiting for the
 * videos removal to finish. Errors are only logged.
 */
function removeBadPods () {
  Pods.findBadPods(function onBadPodsFound (findErr, badPods) {
    if (findErr) {
      logger.error('Cannot find bad pods.', { error: findErr })
      return
    }

    if (badPods.length === 0) return

    const podUrls = map(badPods, 'url')
    const podIds = map(badPods, '_id')

    Videos.listFromUrls(podUrls, function onVideosListed (listErr, videosList) {
      if (!listErr) {
        videos.removeRemoteVideos(videosList, function onVideosRemoved (removeErr) {
          if (removeErr) logger.error('Cannot remove remote videos.', { error: removeErr })
        })
      } else {
        logger.error('Cannot list videos urls.', { error: listErr, urls: podUrls })
      }

      // Runs whatever happened with the videos above
      Pods.removeAllByIds(podIds, function onPodsRemoved (removeErr, r) {
        if (removeErr) {
          logger.error('Cannot remove bad pods.', { error: removeErr })
          return
        }

        const removedCount = r.result.n
        logger.info('Removed %d pods.', removedCount)
      })
    })
  })
}
0b697522 211
bc503c2a
C
/**
 * Update the score of the pods after a batch of requests.
 *
 * Good pods get constants.PODS_SCORE.BONUS and bad pods get
 * constants.PODS_SCORE.MALUS through Pods.incrementScores. Once the bad pods
 * update has completed (even with an error), removeBadPods is called.
 *
 * @param {Array} goodPods - ids of the pods that answered correctly
 * @param {Array} badPods - ids of the pods that failed
 */
function updatePodsScore (goodPods, badPods) {
  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)

  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
    // Attach the error itself, like the other error logs of this file
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot increment scores of bad pods.', { error: err })
    removeBadPods()
  })
}