]>
git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.js
3 const each
= require('async/each')
4 const eachLimit
= require('async/eachLimit')
5 const eachSeries
= require('async/eachSeries')
6 const fs
= require('fs')
7 const request
= require('request')
8 const waterfall
= require('async/waterfall')
10 const constants
= require('../initializers/constants')
11 const db
= require('../initializers/database')
12 const logger
= require('../helpers/logger')
13 const requests
= require('../helpers/requests')
25 function addVideoToFriends (video
) {
26 createRequest('add', constants
.REQUEST_ENDPOINTS
.VIDEOS
, video
)
29 function hasFriends (callback
) {
30 db
.Pod
.countAll(function (err
, count
) {
31 if (err
) return callback(err
)
33 const hasFriends
= (count
!== 0)
34 callback(null, hasFriends
)
38 function getMyCertificate (callback
) {
39 fs
.readFile(constants
.CONFIG
.STORAGE
.CERT_DIR
+ 'peertube.pub', 'utf8', callback
)
42 function makeFriends (hosts
, callback
) {
45 logger
.info('Make friends!')
46 getMyCertificate(function (err
, cert
) {
48 logger
.error('Cannot read public cert.')
52 eachSeries(hosts
, function (host
, callbackEach
) {
53 computeForeignPodsList(host
, podsScore
, callbackEach
)
55 if (err
) return callback(err
)
57 logger
.debug('Pods scores computed.', { podsScore: podsScore
})
58 const podsList
= computeWinningPods(hosts
, podsScore
)
59 logger
.debug('Pods that we keep.', { podsToKeep: podsList
})
61 makeRequestsToWinningPods(cert
, podsList
, callback
)
66 function quitFriends (callback
) {
68 db
.Request
.deactivate()
71 function flushRequests (callbackAsync
) {
72 db
.Request
.flush(callbackAsync
)
75 function getPodsList (callbackAsync
) {
76 return db
.Pod
.list(callbackAsync
)
79 function announceIQuitMyFriends (pods
, callbackAsync
) {
80 const requestParams
= {
82 path: '/api/' + constants
.API_VERSION
+ '/pods/remove',
86 // Announce we quit them
87 // We don't care if the request fails
88 // The other pod will exclude us automatically after a while
89 eachLimit(pods
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
90 requestParams
.toPod
= pod
91 requests
.makeSecureRequest(requestParams
, callbackEach
)
94 logger
.error('Some errors while quitting friends.', { err: err
})
95 // Don't stop the process
98 return callbackAsync(null, pods
)
102 function removePodsFromDB (pods
, callbackAsync
) {
103 each(pods
, function (pod
, callbackEach
) {
104 pod
.destroy().asCallback(callbackEach
)
108 // Don't forget to re activate the scheduler, even if there was an error
109 db
.Request
.activate()
111 if (err
) return callback(err
)
113 logger
.info('Removed all remote videos.')
114 return callback(null)
118 function removeVideoToFriends (videoParams
) {
119 createRequest('remove', constants
.REQUEST_ENDPOINTS
.VIDEOS
, videoParams
)
122 function sendOwnedVideosToPod (podId
) {
123 db
.Video
.listOwnedAndPopulateAuthorAndTags(function (err
, videosList
) {
125 logger
.error('Cannot get the list of videos we own.')
129 videosList
.forEach(function (video
) {
130 video
.toRemoteJSON(function (err
, remoteVideo
) {
132 logger
.error('Cannot convert video to remote.', { error: err
})
133 // Don't break the process
137 createRequest('add', constants
.REQUEST_ENDPOINTS
.VIDEOS
, remoteVideo
, [ podId
])
143 // ---------------------------------------------------------------------------
145 module
.exports
= friends
147 // ---------------------------------------------------------------------------
149 function computeForeignPodsList (host
, podsScore
, callback
) {
150 getForeignPodsList(host
, function (err
, foreignPodsList
) {
151 if (err
) return callback(err
)
153 if (!foreignPodsList
) foreignPodsList
= []
155 // Let's give 1 point to the pod we ask the friends list
156 foreignPodsList
.push({ host
})
158 foreignPodsList
.forEach(function (foreignPod
) {
159 const foreignPodHost
= foreignPod
.host
161 if (podsScore
[foreignPodHost
]) podsScore
[foreignPodHost
]++
162 else podsScore
[foreignPodHost
] = 1
169 function computeWinningPods (hosts
, podsScore
) {
170 // Build the list of pods to add
171 // Only add a pod if it exists in more than a half base pods
173 const baseScore
= hosts
.length
/ 2
174 Object
.keys(podsScore
).forEach(function (podHost
) {
175 // If the pod is not me and with a good score we add it
176 if (isMe(podHost
) === false && podsScore
[podHost
] > baseScore
) {
177 podsList
.push({ host: podHost
})
184 function getForeignPodsList (host
, callback
) {
185 const path
= '/api/' + constants
.API_VERSION
+ '/pods'
187 request
.get(constants
.REMOTE_SCHEME
.HTTP
+ '://' + host
+ path
, function (err
, response
, body
) {
188 if (err
) return callback(err
)
191 const json
= JSON
.parse(body
)
192 return callback(null, json
)
199 function makeRequestsToWinningPods (cert
, podsList
, callback
) {
200 // Stop pool requests
201 db
.Request
.deactivate()
202 // Flush pool requests
203 db
.Request
.forceSend()
205 eachLimit(podsList
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
207 url: constants
.REMOTE_SCHEME
.HTTP
+ '://' + pod
.host
+ '/api/' + constants
.API_VERSION
+ '/pods/',
210 host: constants
.CONFIG
.WEBSERVER
.HOST
,
215 requests
.makeRetryRequest(params
, function (err
, res
, body
) {
217 logger
.error('Error with adding %s pod.', pod
.host
, { error: err
})
218 // Don't break the process
219 return callbackEach()
222 if (res
.statusCode
=== 200) {
223 const podObj
= db
.Pod
.build({ host: pod
.host
, publicKey: body
.cert
})
224 podObj
.save().asCallback(function (err
, podCreated
) {
226 logger
.error('Cannot add friend %s pod.', pod
.host
, { error: err
})
227 return callbackEach()
230 // Add our videos to the request scheduler
231 sendOwnedVideosToPod(podCreated
.id
)
233 return callbackEach()
236 logger
.error('Status not 200 for %s pod.', pod
.host
)
237 return callbackEach()
240 }, function endRequests () {
241 // Final callback, we've ended all the requests
242 // Now we made new friends, we can re activate the pool of requests
243 db
.Request
.activate()
245 logger
.debug('makeRequestsToWinningPods finished.')
250 // Wrapper that populate "to" argument with all our friends if it is not specified
251 function createRequest (type
, endpoint
, data
, to
) {
252 if (to
) return _createRequest(type
, endpoint
, data
, to
)
254 // If the "to" pods is not specified, we send the request to all our friends
255 db
.Pod
.listAllIds(function (err
, podIds
) {
257 logger
.error('Cannot get pod ids', { error: err
})
261 return _createRequest(type
, endpoint
, data
, podIds
)
265 function _createRequest (type
, endpoint
, data
, to
) {
268 // If there are no destination pods abort
269 if (to
.length
=== 0) return
271 to
.forEach(function (toPod
) {
272 pods
.push(db
.Pod
.build({ id: toPod
}))
275 const createQuery
= {
283 // We run in transaction to keep coherency between Request and RequestToPod tables
284 db
.sequelize
.transaction(function (t
) {
285 const dbRequestOptions
= {
289 return db
.Request
.create(createQuery
, dbRequestOptions
).then(function (request
) {
290 return request
.setPods(pods
, dbRequestOptions
)
292 }).asCallback(function (err
) {
293 if (err
) logger
.error('Error in createRequest transaction.', { error: err
})
297 function isMe (host
) {
298 return host
=== constants
.CONFIG
.WEBSERVER
.HOST