]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.js
Server: requests refractoring
[github/Chocobozzz/PeerTube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
16
17 const friends = {
18 addVideoToFriends,
19 updateVideoToFriends,
20 reportAbuseVideoToFriend,
21 hasFriends,
22 getMyCertificate,
23 makeFriends,
24 quitFriends,
25 removeVideoToFriends,
26 sendOwnedVideosToPod
27 }
28
29 function addVideoToFriends (videoData, transaction, callback) {
30 const options = {
31 type: ENDPOINT_ACTIONS.ADD,
32 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
33 data: videoData,
34 transaction
35 }
36 createRequest(options, callback)
37 }
38
39 function updateVideoToFriends (videoData, transaction, callback) {
40 const options = {
41 type: ENDPOINT_ACTIONS.UPDATE,
42 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
43 data: videoData,
44 transaction
45 }
46 createRequest(options, callback)
47 }
48
49 function removeVideoToFriends (videoParams) {
50 const options = {
51 type: ENDPOINT_ACTIONS.REMOVE,
52 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
53 data: videoParams
54 }
55 createRequest(options)
56 }
57
58 function reportAbuseVideoToFriend (reportData, video) {
59 const options = {
60 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
61 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
62 data: reportData,
63 toIds: [ video.Author.podId ]
64 }
65 createRequest(options)
66 }
67
68 function hasFriends (callback) {
69 db.Pod.countAll(function (err, count) {
70 if (err) return callback(err)
71
72 const hasFriends = (count !== 0)
73 callback(null, hasFriends)
74 })
75 }
76
77 function getMyCertificate (callback) {
78 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
79 }
80
81 function makeFriends (hosts, callback) {
82 const podsScore = {}
83
84 logger.info('Make friends!')
85 getMyCertificate(function (err, cert) {
86 if (err) {
87 logger.error('Cannot read public cert.')
88 return callback(err)
89 }
90
91 eachSeries(hosts, function (host, callbackEach) {
92 computeForeignPodsList(host, podsScore, callbackEach)
93 }, function (err) {
94 if (err) return callback(err)
95
96 logger.debug('Pods scores computed.', { podsScore: podsScore })
97 const podsList = computeWinningPods(hosts, podsScore)
98 logger.debug('Pods that we keep.', { podsToKeep: podsList })
99
100 makeRequestsToWinningPods(cert, podsList, callback)
101 })
102 })
103 }
104
105 function quitFriends (callback) {
106 // Stop pool requests
107 db.Request.deactivate()
108
109 waterfall([
110 function flushRequests (callbackAsync) {
111 db.Request.flush(callbackAsync)
112 },
113
114 function getPodsList (callbackAsync) {
115 return db.Pod.list(callbackAsync)
116 },
117
118 function announceIQuitMyFriends (pods, callbackAsync) {
119 const requestParams = {
120 method: 'POST',
121 path: '/api/' + constants.API_VERSION + '/pods/remove',
122 sign: true
123 }
124
125 // Announce we quit them
126 // We don't care if the request fails
127 // The other pod will exclude us automatically after a while
128 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
129 requestParams.toPod = pod
130 requests.makeSecureRequest(requestParams, callbackEach)
131 }, function (err) {
132 if (err) {
133 logger.error('Some errors while quitting friends.', { err: err })
134 // Don't stop the process
135 }
136
137 return callbackAsync(null, pods)
138 })
139 },
140
141 function removePodsFromDB (pods, callbackAsync) {
142 each(pods, function (pod, callbackEach) {
143 pod.destroy().asCallback(callbackEach)
144 }, callbackAsync)
145 }
146 ], function (err) {
147 // Don't forget to re activate the scheduler, even if there was an error
148 db.Request.activate()
149
150 if (err) return callback(err)
151
152 logger.info('Removed all remote videos.')
153 return callback(null)
154 })
155 }
156
157 function sendOwnedVideosToPod (podId) {
158 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
159 if (err) {
160 logger.error('Cannot get the list of videos we own.')
161 return
162 }
163
164 videosList.forEach(function (video) {
165 video.toAddRemoteJSON(function (err, remoteVideo) {
166 if (err) {
167 logger.error('Cannot convert video to remote.', { error: err })
168 // Don't break the process
169 return
170 }
171
172 const options = {
173 type: 'add',
174 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
175 data: remoteVideo,
176 toIds: [ podId ]
177 }
178 createRequest(options)
179 })
180 })
181 })
182 }
183
184 // ---------------------------------------------------------------------------
185
186 module.exports = friends
187
188 // ---------------------------------------------------------------------------
189
190 function computeForeignPodsList (host, podsScore, callback) {
191 getForeignPodsList(host, function (err, res) {
192 if (err) return callback(err)
193
194 const foreignPodsList = res.data
195
196 // Let's give 1 point to the pod we ask the friends list
197 foreignPodsList.push({ host })
198
199 foreignPodsList.forEach(function (foreignPod) {
200 const foreignPodHost = foreignPod.host
201
202 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
203 else podsScore[foreignPodHost] = 1
204 })
205
206 return callback()
207 })
208 }
209
210 function computeWinningPods (hosts, podsScore) {
211 // Build the list of pods to add
212 // Only add a pod if it exists in more than a half base pods
213 const podsList = []
214 const baseScore = hosts.length / 2
215
216 Object.keys(podsScore).forEach(function (podHost) {
217 // If the pod is not me and with a good score we add it
218 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
219 podsList.push({ host: podHost })
220 }
221 })
222
223 return podsList
224 }
225
226 function getForeignPodsList (host, callback) {
227 const path = '/api/' + constants.API_VERSION + '/pods'
228
229 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
230 if (err) return callback(err)
231
232 try {
233 const json = JSON.parse(body)
234 return callback(null, json)
235 } catch (err) {
236 return callback(err)
237 }
238 })
239 }
240
241 function makeRequestsToWinningPods (cert, podsList, callback) {
242 // Stop pool requests
243 db.Request.deactivate()
244 // Flush pool requests
245 db.Request.forceSend()
246
247 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
248 const params = {
249 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
250 method: 'POST',
251 json: {
252 host: constants.CONFIG.WEBSERVER.HOST,
253 publicKey: cert
254 }
255 }
256
257 requests.makeRetryRequest(params, function (err, res, body) {
258 if (err) {
259 logger.error('Error with adding %s pod.', pod.host, { error: err })
260 // Don't break the process
261 return callbackEach()
262 }
263
264 if (res.statusCode === 200) {
265 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
266 podObj.save().asCallback(function (err, podCreated) {
267 if (err) {
268 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
269 return callbackEach()
270 }
271
272 // Add our videos to the request scheduler
273 sendOwnedVideosToPod(podCreated.id)
274
275 return callbackEach()
276 })
277 } else {
278 logger.error('Status not 200 for %s pod.', pod.host)
279 return callbackEach()
280 }
281 })
282 }, function endRequests () {
283 // Final callback, we've ended all the requests
284 // Now we made new friends, we can re activate the pool of requests
285 db.Request.activate()
286
287 logger.debug('makeRequestsToWinningPods finished.')
288 return callback()
289 })
290 }
291
292 // Wrapper that populate "toIds" argument with all our friends if it is not specified
293 // { type, endpoint, data, toIds, transaction }
294 function createRequest (options, callback) {
295 if (!callback) callback = function () {}
296 if (options.toIds) return _createRequest(options, callback)
297
298 // If the "toIds" pods is not specified, we send the request to all our friends
299 db.Pod.listAllIds(options.transaction, function (err, podIds) {
300 if (err) {
301 logger.error('Cannot get pod ids', { error: err })
302 return
303 }
304
305 const newOptions = Object.assign(options, { toIds: podIds })
306 return _createRequest(newOptions, callback)
307 })
308 }
309
310 // { type, endpoint, data, toIds, transaction }
311 function _createRequest (options, callback) {
312 const type = options.type
313 const endpoint = options.endpoint
314 const data = options.data
315 const toIds = options.toIds
316 const transaction = options.transaction
317
318 const pods = []
319
320 // If there are no destination pods abort
321 if (toIds.length === 0) return callback(null)
322
323 toIds.forEach(function (toPod) {
324 pods.push(db.Pod.build({ id: toPod }))
325 })
326
327 const createQuery = {
328 endpoint,
329 request: {
330 type: type,
331 data: data
332 }
333 }
334
335 const dbRequestOptions = {
336 transaction
337 }
338
339 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
340 if (err) return callback(err)
341
342 return request.setPods(pods, dbRequestOptions).asCallback(callback)
343 })
344 }
345
346 function isMe (host) {
347 return host === constants.CONFIG.WEBSERVER.HOST
348 }