aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/friends.js
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/friends.js')
-rw-r--r--server/lib/friends.js73
1 files changed, 49 insertions, 24 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js
index eaea040ca..3ed29f651 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -4,18 +4,14 @@ const each = require('async/each')
4const eachLimit = require('async/eachLimit') 4const eachLimit = require('async/eachLimit')
5const eachSeries = require('async/eachSeries') 5const eachSeries = require('async/eachSeries')
6const fs = require('fs') 6const fs = require('fs')
7const mongoose = require('mongoose')
8const request = require('request') 7const request = require('request')
9const waterfall = require('async/waterfall') 8const waterfall = require('async/waterfall')
10 9
11const constants = require('../initializers/constants') 10const constants = require('../initializers/constants')
11const db = require('../initializers/database')
12const logger = require('../helpers/logger') 12const logger = require('../helpers/logger')
13const requests = require('../helpers/requests') 13const requests = require('../helpers/requests')
14 14
15const Pod = mongoose.model('Pod')
16const Request = mongoose.model('Request')
17const Video = mongoose.model('Video')
18
19const friends = { 15const friends = {
20 addVideoToFriends, 16 addVideoToFriends,
21 hasFriends, 17 hasFriends,
@@ -31,7 +27,7 @@ function addVideoToFriends (video) {
31} 27}
32 28
33function hasFriends (callback) { 29function hasFriends (callback) {
34 Pod.countAll(function (err, count) { 30 db.Pod.countAll(function (err, count) {
35 if (err) return callback(err) 31 if (err) return callback(err)
36 32
37 const hasFriends = (count !== 0) 33 const hasFriends = (count !== 0)
@@ -69,13 +65,13 @@ function makeFriends (hosts, callback) {
69 65
70function quitFriends (callback) { 66function quitFriends (callback) {
71 // Stop pool requests 67 // Stop pool requests
72 Request.deactivate() 68 db.Request.deactivate()
73 // Flush pool requests 69 // Flush pool requests
74 Request.flush() 70 db.Request.flush()
75 71
76 waterfall([ 72 waterfall([
77 function getPodsList (callbackAsync) { 73 function getPodsList (callbackAsync) {
78 return Pod.list(callbackAsync) 74 return db.Pod.list(callbackAsync)
79 }, 75 },
80 76
81 function announceIQuitMyFriends (pods, callbackAsync) { 77 function announceIQuitMyFriends (pods, callbackAsync) {
@@ -103,12 +99,12 @@ function quitFriends (callback) {
103 99
104 function removePodsFromDB (pods, callbackAsync) { 100 function removePodsFromDB (pods, callbackAsync) {
105 each(pods, function (pod, callbackEach) { 101 each(pods, function (pod, callbackEach) {
106 pod.remove(callbackEach) 102 pod.destroy().asCallback(callbackEach)
107 }, callbackAsync) 103 }, callbackAsync)
108 } 104 }
109 ], function (err) { 105 ], function (err) {
110 // Don't forget to re activate the scheduler, even if there was an error 106 // Don't forget to re activate the scheduler, even if there was an error
111 Request.activate() 107 db.Request.activate()
112 108
113 if (err) return callback(err) 109 if (err) return callback(err)
114 110
@@ -122,7 +118,7 @@ function removeVideoToFriends (videoParams) {
122} 118}
123 119
124function sendOwnedVideosToPod (podId) { 120function sendOwnedVideosToPod (podId) {
125 Video.listOwned(function (err, videosList) { 121 db.Video.listOwnedAndPopulateAuthor(function (err, videosList) {
126 if (err) { 122 if (err) {
127 logger.error('Cannot get the list of videos we own.') 123 logger.error('Cannot get the list of videos we own.')
128 return 124 return
@@ -200,9 +196,9 @@ function getForeignPodsList (host, callback) {
200 196
201function makeRequestsToWinningPods (cert, podsList, callback) { 197function makeRequestsToWinningPods (cert, podsList, callback) {
202 // Stop pool requests 198 // Stop pool requests
203 Request.deactivate() 199 db.Request.deactivate()
204 // Flush pool requests 200 // Flush pool requests
205 Request.forceSend() 201 db.Request.forceSend()
206 202
207 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 203 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
208 const params = { 204 const params = {
@@ -222,8 +218,8 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
222 } 218 }
223 219
224 if (res.statusCode === 200) { 220 if (res.statusCode === 200) {
225 const podObj = new Pod({ host: pod.host, publicKey: body.cert }) 221 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
226 podObj.save(function (err, podCreated) { 222 podObj.save().asCallback(function (err, podCreated) {
227 if (err) { 223 if (err) {
228 logger.error('Cannot add friend %s pod.', pod.host, { error: err }) 224 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
229 return callbackEach() 225 return callbackEach()
@@ -242,28 +238,57 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
242 }, function endRequests () { 238 }, function endRequests () {
243 // Final callback, we've ended all the requests 239 // Final callback, we've ended all the requests
244 // Now we made new friends, we can re activate the pool of requests 240 // Now we made new friends, we can re activate the pool of requests
245 Request.activate() 241 db.Request.activate()
246 242
247 logger.debug('makeRequestsToWinningPods finished.') 243 logger.debug('makeRequestsToWinningPods finished.')
248 return callback() 244 return callback()
249 }) 245 })
250} 246}
251 247
248// Wrapper that populate "to" argument with all our friends if it is not specified
252function createRequest (type, endpoint, data, to) { 249function createRequest (type, endpoint, data, to) {
253 const req = new Request({ 250 if (to) return _createRequest(type, endpoint, data, to)
251
252 // If the "to" pods is not specified, we send the request to all our friends
253 db.Pod.listAllIds(function (err, podIds) {
254 if (err) {
255 logger.error('Cannot get pod ids', { error: err })
256 return
257 }
258
259 return _createRequest(type, endpoint, data, podIds)
260 })
261}
262
263function _createRequest (type, endpoint, data, to) {
264 const pods = []
265
266 // If there are no destination pods abort
267 if (to.length === 0) return
268
269 to.forEach(function (toPod) {
270 pods.push(db.Pod.build({ id: toPod }))
271 })
272
273 const createQuery = {
254 endpoint, 274 endpoint,
255 request: { 275 request: {
256 type: type, 276 type: type,
257 data: data 277 data: data
258 } 278 }
259 })
260
261 if (to) {
262 req.to = to
263 } 279 }
264 280
265 req.save(function (err) { 281 // We run in transaction to keep coherency between Request and RequestToPod tables
266 if (err) logger.error('Cannot save the request.', { error: err }) 282 db.sequelize.transaction(function (t) {
283 const dbRequestOptions = {
284 transaction: t
285 }
286
287 return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
288 return request.setPods(pods, dbRequestOptions)
289 })
290 }).asCallback(function (err) {
291 if (err) logger.error('Error in createRequest transaction.', { error: err })
267 }) 292 })
268} 293}
269 294