3 const each
= require('async/each')
4 const eachLimit
= require('async/eachLimit')
5 const eachSeries
= require('async/eachSeries')
6 const request
= require('request')
7 const waterfall
= require('async/waterfall')
9 const constants
= require('../initializers/constants')
10 const db
= require('../initializers/database')
11 const logger
= require('../helpers/logger')
12 const peertubeCrypto
= require('../helpers/peertube-crypto')
13 const requests
= require('../helpers/requests')
14 const utils
= require('../helpers/utils')
15 const RequestScheduler
= require('./request-scheduler')
16 const RequestVideoQaduScheduler
= require('./request-video-qadu-scheduler')
17 const RequestVideoEventScheduler
= require('./request-video-event-scheduler')
// Actions registered in the constants for the videos request endpoint
19 const ENDPOINT_ACTIONS
= constants
.REQUEST_ENDPOINT_ACTIONS
[constants
.REQUEST_ENDPOINTS
.VIDEOS
]
// Module-level scheduler instances, one per scheduler class required above;
// the functions of this module all go through these three objects
21 const requestScheduler
= new RequestScheduler()
22 const requestSchedulerVideoQadu
= new RequestVideoQaduScheduler()
23 const requestSchedulerVideoEvent
= new RequestVideoEventScheduler()
// NOTE(review): these names look like entries of a larger list whose opening
// line is not visible in this view (presumably the exported `friends` object) — confirm
29 reportAbuseVideoToFriend
,
30 quickAndDirtyUpdateVideoToFriends
,
31 addEventToRemoteVideo
,
// Start the three request schedulers owned by this module
function activate () {
  const schedulers = [
    requestScheduler,
    requestSchedulerVideoQadu,
    requestSchedulerVideoEvent
  ]

  // Activated one after the other, in the order listed above
  schedulers.forEach(function (scheduler) {
    scheduler.activate()
  })
}
// Ask createRequest() for an ADD action on the videos endpoint; `callback` is passed along to it
// NOTE(review): only the `type` and `endpoint` entries of `options` are visible here;
// how `videoData` and `transaction` are attached is not shown — confirm
45 function addVideoToFriends (videoData
, transaction
, callback
) {
47 type: ENDPOINT_ACTIONS
.ADD
,
48 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
52 createRequest(options
, callback
)
// Ask createRequest() for an UPDATE action on the videos endpoint; `callback` is passed along to it
// NOTE(review): only the `type` and `endpoint` entries of `options` are visible here;
// how `videoData` and `transaction` are attached is not shown — confirm
55 function updateVideoToFriends (videoData
, transaction
, callback
) {
57 type: ENDPOINT_ACTIONS
.UPDATE
,
58 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
62 createRequest(options
, callback
)
// Ask createRequest() for a REMOVE action on the videos endpoint
// No callback is given, so createRequest() falls back to its own empty callback
// NOTE(review): how `videoParams` is attached to `options` is not visible here — confirm
65 function removeVideoToFriends (videoParams
) {
67 type: ENDPOINT_ACTIONS
.REMOVE
,
68 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
71 createRequest(options
)
// Ask createRequest() for a REPORT_ABUSE action on the videos endpoint
// The request targets a single pod: the one referenced by `video.Author.podId`
// NOTE(review): how `reportData` is attached to `options` is not visible here — confirm
74 function reportAbuseVideoToFriend (reportData
, video
) {
76 type: ENDPOINT_ACTIONS
.REPORT_ABUSE
,
77 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
// Explicit recipients, so createRequest() does not look up all the pod ids
79 toIds: [ video
.Author
.podId
]
81 createRequest(options
)
// Delegate to createVideoQaduRequest(options, callback) and return its result
// NOTE(review): the lines building `options` from `videoId`, `type` and `transaction`
// are not visible in this view — confirm
84 function quickAndDirtyUpdateVideoToFriends (videoId
, type
, transaction
, callback
) {
90 return createVideoQaduRequest(options
, callback
)
// Delegate to createVideoEventRequest(options, callback)
// NOTE(review): the lines building `options` from `videoId`, `type` and `transaction`
// are not visible in this view — confirm
93 function addEventToRemoteVideo (videoId
, type
, transaction
, callback
) {
99 createVideoEventRequest(options
, callback
)
// Tell the caller whether the pods count is different from zero
// callback(err) when counting fails, callback(null, boolean) otherwise
function hasFriends (callback) {
  db.Pod.countAll(function onPodsCounted (err, count) {
    if (err) {
      return callback(err)
    }

    // Any non-zero count means we have friends
    callback(null, count !== 0)
  })
}
// Make friends with the pods reachable through `hosts`
// Visible steps: read our public certificate, score the pods advertised by each
// host (one host after the other, via eachSeries), keep the winning pods and
// hand them to makeRequestsToWinningPods(), which receives `callback`
// NOTE(review): the declaration of `podsScore` and the guard around the
// certificate error log are not visible in this view — confirm
111 function makeFriends (hosts
, callback
) {
114 logger
.info('Make friends!')
115 peertubeCrypto
.getMyPublicCert(function (err
, cert
) {
117 logger
.error('Cannot read public cert.')
// Every host fills the same `podsScore` object
121 eachSeries(hosts
, function (host
, callbackEach
) {
122 computeForeignPodsList(host
, podsScore
, callbackEach
)
124 if (err
) return callback(err
)
126 logger
.debug('Pods scores computed.', { podsScore: podsScore
})
127 const podsList
= computeWinningPods(hosts
, podsScore
)
128 logger
.debug('Pods that we keep.', { podsToKeep: podsList
})
130 makeRequestsToWinningPods(cert
, podsList
, callback
)
// Stop being friends with every known pod
// Visible steps: deactivate the request scheduler, flush the regular and the
// video QADU schedulers, list the pods, announce to each of them that we quit,
// destroy them, then re-activate the request scheduler and report to `callback`
// NOTE(review): the call chaining the named step functions is not visible here
// (presumably `waterfall`, which this file requires) — confirm
135 function quitFriends (callback
) {
136 // Stop pool requests
137 requestScheduler
.deactivate()
140 function flushRequests (callbackAsync
) {
141 requestScheduler
.flush(err
=> callbackAsync(err
))
144 function flushVideoQaduRequests (callbackAsync
) {
145 requestSchedulerVideoQadu
.flush(err
=> callbackAsync(err
))
148 function getPodsList (callbackAsync
) {
149 return db
.Pod
.list(callbackAsync
)
152 function announceIQuitMyFriends (pods
, callbackAsync
) {
// NOTE(review): this single object is reused by every iteration below and its
// `toPod` property is overwritten each time while up to REQUESTS_IN_PARALLEL
// requests may be in flight; if makeSecureRequest() reads `toPod` later than
// the call itself, a request could target the wrong pod — confirm
153 const requestParams
= {
155 path: '/api/' + constants
.API_VERSION
+ '/pods/remove',
159 // Announce we quit them
160 // We don't care if the request fails
161 // The other pod will exclude us automatically after a while
162 eachLimit(pods
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
163 requestParams
.toPod
= pod
164 requests
.makeSecureRequest(requestParams
, callbackEach
)
167 logger
.error('Some errors while quitting friends.', { err: err
})
168 // Don't stop the process
// The pods are passed on without any error, whatever the announcements gave
171 return callbackAsync(null, pods
)
175 function removePodsFromDB (pods
, callbackAsync
) {
176 each(pods
, function (pod
, callbackEach
) {
177 pod
.destroy().asCallback(callbackEach
)
181 // Don't forget to re activate the scheduler, even if there was an error
182 requestScheduler
.activate()
184 if (err
) return callback(err
)
186 logger
.info('Removed all remote videos.')
187 return callback(null)
// List the videos we own, convert each of them with toAddRemoteJSON() and call
// createRequest() on the videos endpoint
// NOTE(review): the error guards and the way `podId` and `remoteVideo` end up in
// `options` are not visible in this view (presumably `toIds` and `data`) — confirm
191 function sendOwnedVideosToPod (podId
) {
192 db
.Video
.listOwnedAndPopulateAuthorAndTags(function (err
, videosList
) {
194 logger
.error('Cannot get the list of videos we own.')
198 videosList
.forEach(function (video
) {
199 video
.toAddRemoteJSON(function (err
, remoteVideo
) {
201 logger
.error('Cannot convert video to remote.', { error: err
})
202 // Don't break the process
208 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
// No callback: createRequest() uses its own empty callback
212 createRequest(options
)
218 // ---------------------------------------------------------------------------
// The `friends` object is the only thing exported by this module
220 module
.exports
= friends
222 // ---------------------------------------------------------------------------
// Ask `host` for its pods list and give one point in `podsScore` to every pod of that list
// `podsScore` is keyed by pod host and is mutated in place
// callback(err) when the list cannot be fetched
// NOTE(review): `{ host }` is pushed straight onto `res.data`, which assumes the
// remote answer holds an array there; the call of `callback` on success is not
// visible in this view — confirm
224 function computeForeignPodsList (host
, podsScore
, callback
) {
225 getForeignPodsList(host
, function (err
, res
) {
226 if (err
) return callback(err
)
228 const foreignPodsList
= res
.data
230 // Let's give 1 point to the pod we ask the friends list
231 foreignPodsList
.push({ host
})
233 foreignPodsList
.forEach(function (foreignPod
) {
234 const foreignPodHost
= foreignPod
.host
// First time we see this host: start at 1, otherwise increment
236 if (podsScore
[foreignPodHost
]) podsScore
[foreignPodHost
]++
237 else podsScore
[foreignPodHost
] = 1
// Select the pods to add: a pod is kept when it is not us (isMe) and its score
// is strictly greater than half the number of base `hosts`
// Each kept pod is pushed as `{ host }`
// NOTE(review): the declaration of `podsList` and the return statement are not
// visible in this view; makeFriends() uses the returned value as the list of
// pods to contact — confirm
244 function computeWinningPods (hosts
, podsScore
) {
245 // Build the list of pods to add
246 // Only add a pod if it exists in more than a half base pods
248 const baseScore
= hosts
.length
/ 2
250 Object
.keys(podsScore
).forEach(function (podHost
) {
251 // If the pod is not me and with a good score we add it
252 if (isMe(podHost
) === false && podsScore
[podHost
] > baseScore
) {
253 podsList
.push({ host: podHost
})
// Fetch the pods list exposed by a remote pod
//  - host: host of the remote pod, requested on `<scheme>://<host>/api/<version>/pods`
//  - callback(err) when the HTTP request fails or when the body is not valid JSON
//  - callback(null, json) with the parsed body otherwise
function getForeignPodsList (host, callback) {
  const path = '/api/' + constants.API_VERSION + '/pods'
  const url = constants.REMOTE_SCHEME.HTTP + '://' + host + path

  request.get(url, function (err, response, body) {
    if (err) return callback(err)

    // The body comes from a remote server: do not trust it to be valid JSON
    let json
    try {
      json = JSON.parse(body)
    } catch (parseErr) {
      return callback(parseErr)
    }

    // Called outside of the try block, so an exception thrown by the callback
    // is never mistaken for a parse error (which would call it a second time)
    return callback(null, json)
  })
}
// Contact the `/pods/` API endpoint of every pod of `podsList`, at most
// REQUESTS_IN_PARALLEL at a time
// On a 200 answer the pod is saved in the database and sendOwnedVideosToPod()
// is called with its id; failures are logged and the iteration goes on
// (callbackEach() is always called without an error)
// The request scheduler is deactivated and force sent before the loop, then
// re-activated in the final callback
// NOTE(review): the construction of `params` (including the use of `cert`), the
// error guards and the final use of `callback` are not visible in this view — confirm
275 function makeRequestsToWinningPods (cert
, podsList
, callback
) {
276 // Stop pool requests
277 requestScheduler
.deactivate()
278 // Flush pool requests
279 requestScheduler
.forceSend()
281 eachLimit(podsList
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
283 url: constants
.REMOTE_SCHEME
.HTTP
+ '://' + pod
.host
+ '/api/' + constants
.API_VERSION
+ '/pods/',
// Our own host and admin email, taken from the configuration
286 host: constants
.CONFIG
.WEBSERVER
.HOST
,
287 email: constants
.CONFIG
.ADMIN
.EMAIL
,
292 requests
.makeRetryRequest(params
, function (err
, res
, body
) {
294 logger
.error('Error with adding %s pod.', pod
.host
, { error: err
})
295 // Don't break the process
296 return callbackEach()
299 if (res
.statusCode
=== 200) {
// The certificate and email stored for the pod come from the response body
300 const podObj
= db
.Pod
.build({ host: pod
.host
, publicKey: body
.cert
, email: body
.email
})
301 podObj
.save().asCallback(function (err
, podCreated
) {
303 logger
.error('Cannot add friend %s pod.', pod
.host
, { error: err
})
304 return callbackEach()
307 // Add our videos to the request scheduler
308 sendOwnedVideosToPod(podCreated
.id
)
310 return callbackEach()
313 logger
.error('Status not 200 for %s pod.', pod
.host
)
314 return callbackEach()
317 }, function endRequests () {
318 // Final callback, we've ended all the requests
319 // Now we made new friends, we can re activate the pool of requests
320 requestScheduler
.activate()
322 logger
.debug('makeRequestsToWinningPods finished.')
// Wrapper that populates the "toIds" option with all our friends if it is not specified
//  - options: { type, endpoint, data, toIds, transaction }, never modified by this function
//  - callback: optional; receives the error when the pod ids cannot be listed,
//    otherwise it is handed over to requestScheduler.createRequest()
function createRequest (options, callback) {
  if (!callback) callback = function () {}
  if (options.toIds) return requestScheduler.createRequest(options, callback)

  // If the "toIds" pods is not specified, we send the request to all our friends
  db.Pod.listAllIds(options.transaction, function (err, podIds) {
    if (err) {
      logger.error('Cannot get pod ids', { error: err })
      // Always report the failure, so that the caller is never left waiting
      return callback(err)
    }

    // Work on a copy: "options" belongs to the caller
    const newOptions = Object.assign({}, options, { toIds: podIds })
    return requestScheduler.createRequest(newOptions, callback)
  })
}
// Hand a request over to the video QADU scheduler
// When no callback is given, the one built by utils.createEmptyCallback() is used
function createVideoQaduRequest (options, callback) {
  const done = callback || utils.createEmptyCallback()

  requestSchedulerVideoQadu.createRequest(options, done)
}
// Hand a request over to the video event scheduler
// When no callback is given, the one built by utils.createEmptyCallback() is used
function createVideoEventRequest (options, callback) {
  const done = callback || utils.createEmptyCallback()

  requestSchedulerVideoEvent.createRequest(options, done)
}
// True when `host` is strictly equal to the host of our own web server
function isMe (host) {
  const ownHost = constants.CONFIG.WEBSERVER.HOST

  return ownHost === host
}