3 const each
= require('async/each')
4 const eachLimit
= require('async/eachLimit')
5 const eachSeries
= require('async/eachSeries')
6 const request
= require('request')
7 const waterfall
= require('async/waterfall')
9 const constants
= require('../initializers/constants')
10 const db
= require('../initializers/database')
11 const logger
= require('../helpers/logger')
12 const peertubeCrypto
= require('../helpers/peertube-crypto')
13 const requests
= require('../helpers/requests')
14 const utils
= require('../helpers/utils')
15 const RequestScheduler
= require('./request-scheduler')
16 const RequestVideoQaduScheduler
= require('./request-video-qadu-scheduler')
17 const RequestVideoEventScheduler
= require('./request-video-event-scheduler')
19 const ENDPOINT_ACTIONS
= constants
.REQUEST_ENDPOINT_ACTIONS
[constants
.REQUEST_ENDPOINTS
.VIDEOS
]
21 const requestScheduler
= new RequestScheduler()
22 const requestVideoQaduScheduler
= new RequestVideoQaduScheduler()
23 const requestVideoEventScheduler
= new RequestVideoEventScheduler()
29 reportAbuseVideoToFriend
,
30 quickAndDirtyUpdateVideoToFriends
,
31 addEventToRemoteVideo
,
38 getRequestVideoQaduScheduler
,
39 getRequestVideoEventScheduler
42 function activate () {
43 requestScheduler
.activate()
44 requestVideoQaduScheduler
.activate()
45 requestVideoEventScheduler
.activate()
48 function addVideoToFriends (videoData
, transaction
, callback
) {
50 type: ENDPOINT_ACTIONS
.ADD
,
51 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
55 createRequest(options
, callback
)
58 function updateVideoToFriends (videoData
, transaction
, callback
) {
60 type: ENDPOINT_ACTIONS
.UPDATE
,
61 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
65 createRequest(options
, callback
)
68 function removeVideoToFriends (videoParams
) {
70 type: ENDPOINT_ACTIONS
.REMOVE
,
71 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
74 createRequest(options
)
77 function reportAbuseVideoToFriend (reportData
, video
) {
79 type: ENDPOINT_ACTIONS
.REPORT_ABUSE
,
80 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
82 toIds: [ video
.Author
.podId
]
84 createRequest(options
)
87 function quickAndDirtyUpdateVideoToFriends (videoId
, type
, transaction
, callback
) {
93 return createVideoQaduRequest(options
, callback
)
96 function addEventToRemoteVideo (videoId
, type
, transaction
, callback
) {
102 createVideoEventRequest(options
, callback
)
105 function hasFriends (callback
) {
106 db
.Pod
.countAll(function (err
, count
) {
107 if (err
) return callback(err
)
109 const hasFriends
= (count
!== 0)
110 callback(null, hasFriends
)
114 function makeFriends (hosts
, callback
) {
117 logger
.info('Make friends!')
118 peertubeCrypto
.getMyPublicCert(function (err
, cert
) {
120 logger
.error('Cannot read public cert.')
124 eachSeries(hosts
, function (host
, callbackEach
) {
125 computeForeignPodsList(host
, podsScore
, callbackEach
)
127 if (err
) return callback(err
)
129 logger
.debug('Pods scores computed.', { podsScore: podsScore
})
130 const podsList
= computeWinningPods(hosts
, podsScore
)
131 logger
.debug('Pods that we keep.', { podsToKeep: podsList
})
133 makeRequestsToWinningPods(cert
, podsList
, callback
)
138 function quitFriends (callback
) {
139 // Stop pool requests
140 requestScheduler
.deactivate()
143 function flushRequests (callbackAsync
) {
144 requestScheduler
.flush(err
=> callbackAsync(err
))
147 function flushVideoQaduRequests (callbackAsync
) {
148 requestVideoQaduScheduler
.flush(err
=> callbackAsync(err
))
151 function getPodsList (callbackAsync
) {
152 return db
.Pod
.list(callbackAsync
)
155 function announceIQuitMyFriends (pods
, callbackAsync
) {
156 const requestParams
= {
158 path: '/api/' + constants
.API_VERSION
+ '/pods/remove',
162 // Announce we quit them
163 // We don't care if the request fails
164 // The other pod will exclude us automatically after a while
165 eachLimit(pods
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
166 requestParams
.toPod
= pod
167 requests
.makeSecureRequest(requestParams
, callbackEach
)
170 logger
.error('Some errors while quitting friends.', { err: err
})
171 // Don't stop the process
174 return callbackAsync(null, pods
)
178 function removePodsFromDB (pods
, callbackAsync
) {
179 each(pods
, function (pod
, callbackEach
) {
180 pod
.destroy().asCallback(callbackEach
)
184 // Don't forget to re activate the scheduler, even if there was an error
185 requestScheduler
.activate()
187 if (err
) return callback(err
)
189 logger
.info('Removed all remote videos.')
190 return callback(null)
194 function sendOwnedVideosToPod (podId
) {
195 db
.Video
.listOwnedAndPopulateAuthorAndTags(function (err
, videosList
) {
197 logger
.error('Cannot get the list of videos we own.')
201 videosList
.forEach(function (video
) {
202 video
.toAddRemoteJSON(function (err
, remoteVideo
) {
204 logger
.error('Cannot convert video to remote.', { error: err
})
205 // Don't break the process
211 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
215 createRequest(options
)
221 function getRequestScheduler () {
222 return requestScheduler
225 function getRequestVideoQaduScheduler () {
226 return requestVideoQaduScheduler
229 function getRequestVideoEventScheduler () {
230 return requestVideoEventScheduler
233 // ---------------------------------------------------------------------------
235 module
.exports
= friends
237 // ---------------------------------------------------------------------------
239 function computeForeignPodsList (host
, podsScore
, callback
) {
240 getForeignPodsList(host
, function (err
, res
) {
241 if (err
) return callback(err
)
243 const foreignPodsList
= res
.data
245 // Let's give 1 point to the pod we ask the friends list
246 foreignPodsList
.push({ host
})
248 foreignPodsList
.forEach(function (foreignPod
) {
249 const foreignPodHost
= foreignPod
.host
251 if (podsScore
[foreignPodHost
]) podsScore
[foreignPodHost
]++
252 else podsScore
[foreignPodHost
] = 1
259 function computeWinningPods (hosts
, podsScore
) {
260 // Build the list of pods to add
261 // Only add a pod if it exists in more than a half base pods
263 const baseScore
= hosts
.length
/ 2
265 Object
.keys(podsScore
).forEach(function (podHost
) {
266 // If the pod is not me and with a good score we add it
267 if (isMe(podHost
) === false && podsScore
[podHost
] > baseScore
) {
268 podsList
.push({ host: podHost
})
275 function getForeignPodsList (host
, callback
) {
276 const path
= '/api/' + constants
.API_VERSION
+ '/pods'
278 request
.get(constants
.REMOTE_SCHEME
.HTTP
+ '://' + host
+ path
, function (err
, response
, body
) {
279 if (err
) return callback(err
)
282 const json
= JSON
.parse(body
)
283 return callback(null, json
)
290 function makeRequestsToWinningPods (cert
, podsList
, callback
) {
291 // Stop pool requests
292 requestScheduler
.deactivate()
293 // Flush pool requests
294 requestScheduler
.forceSend()
296 eachLimit(podsList
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
298 url: constants
.REMOTE_SCHEME
.HTTP
+ '://' + pod
.host
+ '/api/' + constants
.API_VERSION
+ '/pods/',
301 host: constants
.CONFIG
.WEBSERVER
.HOST
,
302 email: constants
.CONFIG
.ADMIN
.EMAIL
,
307 requests
.makeRetryRequest(params
, function (err
, res
, body
) {
309 logger
.error('Error with adding %s pod.', pod
.host
, { error: err
})
310 // Don't break the process
311 return callbackEach()
314 if (res
.statusCode
=== 200) {
315 const podObj
= db
.Pod
.build({ host: pod
.host
, publicKey: body
.cert
, email: body
.email
})
316 podObj
.save().asCallback(function (err
, podCreated
) {
318 logger
.error('Cannot add friend %s pod.', pod
.host
, { error: err
})
319 return callbackEach()
322 // Add our videos to the request scheduler
323 sendOwnedVideosToPod(podCreated
.id
)
325 return callbackEach()
328 logger
.error('Status not 200 for %s pod.', pod
.host
)
329 return callbackEach()
332 }, function endRequests () {
333 // Final callback, we've ended all the requests
334 // Now we made new friends, we can re activate the pool of requests
335 requestScheduler
.activate()
337 logger
.debug('makeRequestsToWinningPods finished.')
342 // Wrapper that populate "toIds" argument with all our friends if it is not specified
343 // { type, endpoint, data, toIds, transaction }
344 function createRequest (options
, callback
) {
345 if (!callback
) callback = function () {}
346 if (options
.toIds
) return requestScheduler
.createRequest(options
, callback
)
348 // If the "toIds" pods is not specified, we send the request to all our friends
349 db
.Pod
.listAllIds(options
.transaction
, function (err
, podIds
) {
351 logger
.error('Cannot get pod ids', { error: err
})
355 const newOptions
= Object
.assign(options
, { toIds: podIds
})
356 return requestScheduler
.createRequest(newOptions
, callback
)
360 function createVideoQaduRequest (options
, callback
) {
361 if (!callback
) callback
= utils
.createEmptyCallback()
363 requestVideoQaduScheduler
.createRequest(options
, callback
)
366 function createVideoEventRequest (options
, callback
) {
367 if (!callback
) callback
= utils
.createEmptyCallback()
369 requestVideoEventScheduler
.createRequest(options
, callback
)
372 function isMe (host
) {
373 return host
=== constants
.CONFIG
.WEBSERVER
.HOST