aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/friends.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/friends.ts')
-rw-r--r--server/lib/friends.ts410
1 files changed, 410 insertions, 0 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts
new file mode 100644
index 000000000..b32783019
--- /dev/null
+++ b/server/lib/friends.ts
@@ -0,0 +1,410 @@
1import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2import request = require('request')
3
4const db = require('../initializers/database')
5import {
6 API_VERSION,
7 CONFIG,
8 REQUESTS_IN_PARALLEL,
9 REQUEST_ENDPOINTS,
10 REQUEST_ENDPOINT_ACTIONS,
11 REMOTE_SCHEME
12} from '../initializers'
13import {
14 logger,
15 getMyPublicCert,
16 makeSecureRequest,
17 makeRetryRequest,
18 createEmptyCallback
19} from '../helpers'
20import {
21 RequestScheduler,
22 RequestVideoQaduScheduler,
23 RequestVideoEventScheduler
24} from './request'
25
26const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
27
28const requestScheduler = new RequestScheduler()
29const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
30const requestVideoEventScheduler = new RequestVideoEventScheduler()
31
32function activateSchedulers () {
33 requestScheduler.activate()
34 requestVideoQaduScheduler.activate()
35 requestVideoEventScheduler.activate()
36}
37
38function addVideoToFriends (videoData, transaction, callback) {
39 const options = {
40 type: ENDPOINT_ACTIONS.ADD,
41 endpoint: REQUEST_ENDPOINTS.VIDEOS,
42 data: videoData,
43 transaction
44 }
45 createRequest(options, callback)
46}
47
48function updateVideoToFriends (videoData, transaction, callback) {
49 const options = {
50 type: ENDPOINT_ACTIONS.UPDATE,
51 endpoint: REQUEST_ENDPOINTS.VIDEOS,
52 data: videoData,
53 transaction
54 }
55 createRequest(options, callback)
56}
57
58function removeVideoToFriends (videoParams) {
59 const options = {
60 type: ENDPOINT_ACTIONS.REMOVE,
61 endpoint: REQUEST_ENDPOINTS.VIDEOS,
62 data: videoParams
63 }
64 createRequest(options)
65}
66
67function reportAbuseVideoToFriend (reportData, video) {
68 const options = {
69 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
70 endpoint: REQUEST_ENDPOINTS.VIDEOS,
71 data: reportData,
72 toIds: [ video.Author.podId ]
73 }
74 createRequest(options)
75}
76
77function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction?, callback?) {
78 const options = {
79 videoId: qaduParams.videoId,
80 type: qaduParams.type,
81 transaction
82 }
83 return createVideoQaduRequest(options, callback)
84}
85
86function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) {
87 const tasks = []
88
89 qadusParams.forEach(function (qaduParams) {
90 const fun = function (callback) {
91 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
92 }
93
94 tasks.push(fun)
95 })
96
97 series(tasks, finalCallback)
98}
99
100function addEventToRemoteVideo (eventParams, transaction?, callback?) {
101 const options = {
102 videoId: eventParams.videoId,
103 type: eventParams.type,
104 transaction
105 }
106 createVideoEventRequest(options, callback)
107}
108
109function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) {
110 const tasks = []
111
112 eventsParams.forEach(function (eventParams) {
113 const fun = function (callback) {
114 addEventToRemoteVideo(eventParams, transaction, callback)
115 }
116
117 tasks.push(fun)
118 })
119
120 series(tasks, finalCallback)
121}
122
123function hasFriends (callback) {
124 db.Pod.countAll(function (err, count) {
125 if (err) return callback(err)
126
127 const hasFriends = (count !== 0)
128 callback(null, hasFriends)
129 })
130}
131
132function makeFriends (hosts, callback) {
133 const podsScore = {}
134
135 logger.info('Make friends!')
136 getMyPublicCert(function (err, cert) {
137 if (err) {
138 logger.error('Cannot read public cert.')
139 return callback(err)
140 }
141
142 eachSeries(hosts, function (host, callbackEach) {
143 computeForeignPodsList(host, podsScore, callbackEach)
144 }, function (err) {
145 if (err) return callback(err)
146
147 logger.debug('Pods scores computed.', { podsScore: podsScore })
148 const podsList = computeWinningPods(hosts, podsScore)
149 logger.debug('Pods that we keep.', { podsToKeep: podsList })
150
151 makeRequestsToWinningPods(cert, podsList, callback)
152 })
153 })
154}
155
156function quitFriends (callback) {
157 // Stop pool requests
158 requestScheduler.deactivate()
159
160 waterfall([
161 function flushRequests (callbackAsync) {
162 requestScheduler.flush(err => callbackAsync(err))
163 },
164
165 function flushVideoQaduRequests (callbackAsync) {
166 requestVideoQaduScheduler.flush(err => callbackAsync(err))
167 },
168
169 function getPodsList (callbackAsync) {
170 return db.Pod.list(callbackAsync)
171 },
172
173 function announceIQuitMyFriends (pods, callbackAsync) {
174 const requestParams = {
175 method: 'POST',
176 path: '/api/' + API_VERSION + '/remote/pods/remove',
177 sign: true,
178 toPod: null
179 }
180
181 // Announce we quit them
182 // We don't care if the request fails
183 // The other pod will exclude us automatically after a while
184 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
185 requestParams.toPod = pod
186 makeSecureRequest(requestParams, callbackEach)
187 }, function (err) {
188 if (err) {
189 logger.error('Some errors while quitting friends.', { err: err })
190 // Don't stop the process
191 }
192
193 return callbackAsync(null, pods)
194 })
195 },
196
197 function removePodsFromDB (pods, callbackAsync) {
198 each(pods, function (pod: any, callbackEach) {
199 pod.destroy().asCallback(callbackEach)
200 }, callbackAsync)
201 }
202 ], function (err) {
203 // Don't forget to re activate the scheduler, even if there was an error
204 requestScheduler.activate()
205
206 if (err) return callback(err)
207
208 logger.info('Removed all remote videos.')
209 return callback(null)
210 })
211}
212
213function sendOwnedVideosToPod (podId) {
214 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
215 if (err) {
216 logger.error('Cannot get the list of videos we own.')
217 return
218 }
219
220 videosList.forEach(function (video) {
221 video.toAddRemoteJSON(function (err, remoteVideo) {
222 if (err) {
223 logger.error('Cannot convert video to remote.', { error: err })
224 // Don't break the process
225 return
226 }
227
228 const options = {
229 type: 'add',
230 endpoint: REQUEST_ENDPOINTS.VIDEOS,
231 data: remoteVideo,
232 toIds: [ podId ]
233 }
234 createRequest(options)
235 })
236 })
237 })
238}
239
240function getRequestScheduler () {
241 return requestScheduler
242}
243
244function getRequestVideoQaduScheduler () {
245 return requestVideoQaduScheduler
246}
247
248function getRequestVideoEventScheduler () {
249 return requestVideoEventScheduler
250}
251
252// ---------------------------------------------------------------------------
253
254export {
255 activateSchedulers,
256 addVideoToFriends,
257 updateVideoToFriends,
258 reportAbuseVideoToFriend,
259 quickAndDirtyUpdateVideoToFriends,
260 quickAndDirtyUpdatesVideoToFriends,
261 addEventToRemoteVideo,
262 addEventsToRemoteVideo,
263 hasFriends,
264 makeFriends,
265 quitFriends,
266 removeVideoToFriends,
267 sendOwnedVideosToPod,
268 getRequestScheduler,
269 getRequestVideoQaduScheduler,
270 getRequestVideoEventScheduler
271}
272
273// ---------------------------------------------------------------------------
274
275function computeForeignPodsList (host, podsScore, callback) {
276 getForeignPodsList(host, function (err, res) {
277 if (err) return callback(err)
278
279 const foreignPodsList = res.data
280
281 // Let's give 1 point to the pod we ask the friends list
282 foreignPodsList.push({ host })
283
284 foreignPodsList.forEach(function (foreignPod) {
285 const foreignPodHost = foreignPod.host
286
287 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
288 else podsScore[foreignPodHost] = 1
289 })
290
291 return callback()
292 })
293}
294
295function computeWinningPods (hosts, podsScore) {
296 // Build the list of pods to add
297 // Only add a pod if it exists in more than a half base pods
298 const podsList = []
299 const baseScore = hosts.length / 2
300
301 Object.keys(podsScore).forEach(function (podHost) {
302 // If the pod is not me and with a good score we add it
303 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
304 podsList.push({ host: podHost })
305 }
306 })
307
308 return podsList
309}
310
311function getForeignPodsList (host, callback) {
312 const path = '/api/' + API_VERSION + '/pods'
313
314 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
315 if (err) return callback(err)
316
317 try {
318 const json = JSON.parse(body)
319 return callback(null, json)
320 } catch (err) {
321 return callback(err)
322 }
323 })
324}
325
326function makeRequestsToWinningPods (cert, podsList, callback) {
327 // Stop pool requests
328 requestScheduler.deactivate()
329 // Flush pool requests
330 requestScheduler.forceSend()
331
332 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: any, callbackEach) {
333 const params = {
334 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
335 method: 'POST',
336 json: {
337 host: CONFIG.WEBSERVER.HOST,
338 email: CONFIG.ADMIN.EMAIL,
339 publicKey: cert
340 }
341 }
342
343 makeRetryRequest(params, function (err, res, body) {
344 if (err) {
345 logger.error('Error with adding %s pod.', pod.host, { error: err })
346 // Don't break the process
347 return callbackEach()
348 }
349
350 if (res.statusCode === 200) {
351 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
352 podObj.save().asCallback(function (err, podCreated) {
353 if (err) {
354 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
355 return callbackEach()
356 }
357
358 // Add our videos to the request scheduler
359 sendOwnedVideosToPod(podCreated.id)
360
361 return callbackEach()
362 })
363 } else {
364 logger.error('Status not 200 for %s pod.', pod.host)
365 return callbackEach()
366 }
367 })
368 }, function endRequests () {
369 // Final callback, we've ended all the requests
370 // Now we made new friends, we can re activate the pool of requests
371 requestScheduler.activate()
372
373 logger.debug('makeRequestsToWinningPods finished.')
374 return callback()
375 })
376}
377
378// Wrapper that populate "toIds" argument with all our friends if it is not specified
379// { type, endpoint, data, toIds, transaction }
380function createRequest (options, callback?) {
381 if (!callback) callback = function () { /* empty */ }
382 if (options.toIds) return requestScheduler.createRequest(options, callback)
383
384 // If the "toIds" pods is not specified, we send the request to all our friends
385 db.Pod.listAllIds(options.transaction, function (err, podIds) {
386 if (err) {
387 logger.error('Cannot get pod ids', { error: err })
388 return
389 }
390
391 const newOptions = Object.assign(options, { toIds: podIds })
392 return requestScheduler.createRequest(newOptions, callback)
393 })
394}
395
396function createVideoQaduRequest (options, callback) {
397 if (!callback) callback = createEmptyCallback()
398
399 requestVideoQaduScheduler.createRequest(options, callback)
400}
401
402function createVideoEventRequest (options, callback) {
403 if (!callback) callback = createEmptyCallback()
404
405 requestVideoEventScheduler.createRequest(options, callback)
406}
407
408function isMe (host) {
409 return host === CONFIG.WEBSERVER.HOST
410}