]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/requestsScheduler.js
Refactoring and add thumbnails support (without tests)
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
1 'use strict'
2
3 const async = require('async')
4 const map = require('lodash/map')
5
6 const constants = require('../initializers/constants')
7 const logger = require('../helpers/logger')
8 const Pods = require('../models/pods')
9 const Requests = require('../models/requests')
10 const requests = require('../helpers/requests')
11 const videos = require('../lib/videos')
12 const Videos = require('../models/videos')
13
// Handle of the interval that periodically sends the pending requests
// (stays null until activate() stores one)
let timer = null

// Public API of the requests scheduler
const requestsScheduler = {
  activate,
  addRequest,
  deactivate,
  forceSend
}
22
// Start sending the pending requests every constants.INTERVAL milliseconds.
// Calling it while the scheduler is already active restarts the interval
// instead of stacking a second one.
function activate () {
  logger.info('Requests scheduler activated.')

  // Reassigning `timer` would lose the previous handle, leaving an interval
  // that deactivate() could never stop
  if (timer !== null) clearInterval(timer)

  timer = setInterval(makeRequests, constants.INTERVAL)
}
27
// Register a request in the scheduler for the entity identified by `id`.
// - nothing stored for this id: the request is created
// - a request of the same type is already stored: refused (error logged)
// - a request of another type is stored: the stored one is removed and the
//   new one is not created
// Failures are only logged, nothing is reported to the caller.
function addRequest (id, type, request) {
  logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })

  Requests.findById(id, function (find_err, stored_request) {
    if (find_err) {
      logger.error('Cannot find one request.', { error: find_err })
      return
    }

    // Nothing pending for this id yet: simply store the new request
    if (!stored_request) {
      Requests.create(id, type, request, function (create_err) {
        if (create_err) logger.error('Cannot create a request.', { error: create_err })
      })
      return
    }

    if (stored_request.type === type) {
      logger.error('Cannot insert two same requests.')
      return
    }

    // A request of the other type is pending: remove it
    Requests.removeRequestById(id, function (remove_err) {
      if (remove_err) logger.error('Cannot remove a request.', { error: remove_err })
    })
  })
}
58
// Stop the periodic sending of the pending requests.
// Safe to call when the scheduler is not active.
function deactivate () {
  logger.info('Requests scheduler deactivated.')

  clearInterval(timer)
  // Forget the cleared handle so `timer` reflects that nothing is scheduled
  timer = null
}
63
// Send the pending requests right now, without waiting for the next tick
// of the interval. The interval itself is left untouched.
function forceSend () {
  logger.info('Force requests scheduler sending.')

  makeRequests()
}
68
69 // ---------------------------------------------------------------------------
70
71 module.exports = requestsScheduler
72
73 // ---------------------------------------------------------------------------
74
// Send one batch of requests to every pod returned by Pods.list, then update
// the pods scores according to how each pod answered.
// - type: 'add' or 'remove', selects the remote videos endpoint
// - requests_to_make: sent as the `data` of the request
// - callback (optional): receives an error when the pods cannot be listed,
//   the type is unknown or the multiple request helper reports a failure;
//   receives null otherwise
function makeRequest (type, requests_to_make, callback) {
  const done = callback || function () {}

  Pods.list(function (list_err, pods) {
    if (list_err) return done(list_err)

    let path = null
    switch (type) {
      case 'add':
        path = '/api/' + constants.API_VERSION + '/remotevideos/add'
        break
      case 'remove':
        path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
        break
      default:
        return done(new Error('Unkown pool request type.'))
    }

    const params = {
      encrypt: true,
      sign: true,
      method: 'POST',
      path: path,
      data: requests_to_make
    }

    // Ids of the pods, sorted by the outcome of their request
    const succeeded_ids = []
    const failed_ids = []

    requests.makeMultipleRetryRequest(params, pods, onPodFinished, onAllPodsFinished)

    // Only these status codes count as a successful answer
    function isAccepted (status_code) {
      return status_code === 200 || status_code === 201 || status_code === 204
    }

    function onPodFinished (err, response, body, url, pod, next) {
      // The response is only inspected when there is no transport error
      const succeeded = !err && isAccepted(response.statusCode)

      if (succeeded) {
        succeeded_ids.push(pod._id)
      } else {
        failed_ids.push(pod._id)
        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
      }

      return next()
    }

    function onAllPodsFinished (err) {
      if (err) return done(err)

      updatePodsScore(succeeded_ids, failed_ids)
      done(null)
    }
  })
}
121
// Send every request stored in the scheduler to the friends: the requests
// are grouped by type ('add' / 'remove'), each non empty group is sent in a
// single batch through makeRequest and its requests are then removed from
// the scheduler, whether or not the batch reported an error.
// Requests with an unknown type are logged and skipped; they are not removed.
function makeRequests () {
  logger.info('Making requests to friends.')

  // Named pool_requests so the `requests` helper module is not shadowed
  Requests.list(function (err, pool_requests) {
    if (err) {
      logger.error('Cannot get the list of requests.', { err: err })
      return // Abort
    }

    if (pool_requests.length === 0) return

    const requests_to_make = {
      add: {
        ids: [],
        requests: []
      },
      remove: {
        ids: [],
        requests: []
      }
    }

    async.each(pool_requests, function (pool_request, callback_each) {
      if (pool_request.type === 'add') {
        requests_to_make.add.requests.push(pool_request.request)
        requests_to_make.add.ids.push(pool_request._id)
      } else if (pool_request.type === 'remove') {
        requests_to_make.remove.requests.push(pool_request.request)
        requests_to_make.remove.ids.push(pool_request._id)
      } else {
        // Only this request is skipped. The iteration callback must still be
        // called below, otherwise the final callback never fires and none of
        // the valid requests would ever be sent.
        logger.error('Unkown request type.', { request_type: pool_request.type })
      }

      callback_each()
    }, function () {
      // Send the add requests
      if (requests_to_make.add.requests.length !== 0) {
        makeRequest('add', requests_to_make.add.requests, function (err) {
          if (err) logger.error('Errors when sent add requests.', { error: err })

          Requests.removeRequests(requests_to_make.add.ids)
        })
      }

      // Send the remove requests
      if (requests_to_make.remove.requests.length !== 0) {
        makeRequest('remove', requests_to_make.remove.requests, function (err) {
          if (err) logger.error('Errors when sent remove pool requests.', { error: err })

          Requests.removeRequests(requests_to_make.remove.ids)
        })
      }
    })
  })
}
178
// Remove the pods returned by Pods.findBadPods (presumably the pods whose
// score became too low - confirm in the Pods model) together with the remote
// videos listed for their urls.
// The pods are removed even when their videos could not be listed.
// Failures are only logged.
function removeBadPods () {
  Pods.findBadPods(function (find_err, bad_pods) {
    if (find_err) {
      logger.error('Cannot find bad pods.', { error: find_err })
      return
    }

    if (bad_pods.length === 0) return

    const urls = map(bad_pods, 'url')
    const ids = map(bad_pods, '_id')

    Videos.listFromUrls(urls, function (list_err, videos_list) {
      if (!list_err) {
        videos.removeRemoteVideos(videos_list, function (remove_err) {
          if (remove_err) logger.error('Cannot remove remote videos.', { error: remove_err })
        })
      } else {
        logger.error('Cannot list videos urls.', { error: list_err, urls: urls })
      }

      // Done in both cases, without waiting for the videos removal
      Pods.removeAllByIds(ids, function (remove_err, r) {
        if (remove_err) {
          logger.error('Cannot remove bad pods.', { error: remove_err })
          return
        }

        logger.info('Removed %d pods.', r.result.n)
      })
    })
  })
}
211
// Apply constants.PODS_SCORE.BONUS to the score of the pods that answered
// correctly and constants.PODS_SCORE.MALUS to the others, then run
// removeBadPods once the malus has been applied (even if applying it failed).
// - good_pods / bad_pods: arrays of pod ids
// Failures are only logged, with the underlying error attached.
function updatePodsScore (good_pods, bad_pods) {
  logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)

  Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
    if (err) logger.error('Cannot increment scores of good pods.', { error: err })
  })

  Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
    if (err) logger.error('Cannot increment scores of bad pods.', { error: err })
    removeBadPods()
  })
}