3 const each
= require('async/each')
4 const eachLimit
= require('async/eachLimit')
5 const eachSeries
= require('async/eachSeries')
6 const series
= require('async/series')
7 const request
= require('request')
8 const waterfall
= require('async/waterfall')
10 const constants
= require('../initializers/constants')
11 const db
= require('../initializers/database')
12 const logger
= require('../helpers/logger')
13 const peertubeCrypto
= require('../helpers/peertube-crypto')
14 const requests
= require('../helpers/requests')
15 const utils
= require('../helpers/utils')
16 const RequestScheduler
= require('./request-scheduler')
17 const RequestVideoQaduScheduler
= require('./request-video-qadu-scheduler')
18 const RequestVideoEventScheduler
= require('./request-video-event-scheduler')
20 const ENDPOINT_ACTIONS
= constants
.REQUEST_ENDPOINT_ACTIONS
[constants
.REQUEST_ENDPOINTS
.VIDEOS
]
22 const requestScheduler
= new RequestScheduler()
23 const requestVideoQaduScheduler
= new RequestVideoQaduScheduler()
24 const requestVideoEventScheduler
= new RequestVideoEventScheduler()
30 reportAbuseVideoToFriend
,
31 quickAndDirtyUpdateVideoToFriends
,
32 quickAndDirtyUpdatesVideoToFriends
,
33 addEventToRemoteVideo
,
34 addEventsToRemoteVideo
,
41 getRequestVideoQaduScheduler
,
42 getRequestVideoEventScheduler
// Start the three request schedulers owned by this module:
// the generic one, the video "quick and dirty update" one and the video event one.
function activate () {
  requestScheduler.activate()
  requestVideoQaduScheduler.activate()
  requestVideoEventScheduler.activate()
}
51 function addVideoToFriends (videoData
, transaction
, callback
) {
53 type: ENDPOINT_ACTIONS
.ADD
,
54 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
58 createRequest(options
, callback
)
61 function updateVideoToFriends (videoData
, transaction
, callback
) {
63 type: ENDPOINT_ACTIONS
.UPDATE
,
64 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
68 createRequest(options
, callback
)
71 function removeVideoToFriends (videoParams
) {
73 type: ENDPOINT_ACTIONS
.REMOVE
,
74 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
77 createRequest(options
)
80 function reportAbuseVideoToFriend (reportData
, video
) {
82 type: ENDPOINT_ACTIONS
.REPORT_ABUSE
,
83 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
85 toIds: [ video
.Author
.podId
]
87 createRequest(options
)
90 function quickAndDirtyUpdateVideoToFriends (qaduParams
, transaction
, callback
) {
92 videoId: qaduParams
.videoId
,
93 type: qaduParams
.type
,
96 return createVideoQaduRequest(options
, callback
)
99 function quickAndDirtyUpdatesVideoToFriends (qadusParams
, transaction
, finalCallback
) {
102 qadusParams
.forEach(function (qaduParams
) {
103 const fun = function (callback
) {
104 quickAndDirtyUpdateVideoToFriends(qaduParams
, transaction
, callback
)
110 series(tasks
, finalCallback
)
113 function addEventToRemoteVideo (eventParams
, transaction
, callback
) {
115 videoId: eventParams
.videoId
,
116 type: eventParams
.type
,
119 createVideoEventRequest(options
, callback
)
122 function addEventsToRemoteVideo (eventsParams
, transaction
, finalCallback
) {
125 eventsParams
.forEach(function (eventParams
) {
126 const fun = function (callback
) {
127 addEventToRemoteVideo(eventParams
, transaction
, callback
)
133 series(tasks
, finalCallback
)
// Tell the caller whether at least one pod is stored.
// callback(err) when the count query fails, otherwise callback(null, boolean)
// where the boolean is true as soon as db.Pod.countAll reports a non-zero count.
function hasFriends (callback) {
  db.Pod.countAll(function (err, count) {
    if (err) return callback(err)

    // Named differently from the enclosing function to avoid shadowing it
    const atLeastOnePod = (count !== 0)
    callback(null, atLeastOnePod)
  })
}
145 function makeFriends (hosts
, callback
) {
148 logger
.info('Make friends!')
149 peertubeCrypto
.getMyPublicCert(function (err
, cert
) {
151 logger
.error('Cannot read public cert.')
155 eachSeries(hosts
, function (host
, callbackEach
) {
156 computeForeignPodsList(host
, podsScore
, callbackEach
)
158 if (err
) return callback(err
)
160 logger
.debug('Pods scores computed.', { podsScore: podsScore
})
161 const podsList
= computeWinningPods(hosts
, podsScore
)
162 logger
.debug('Pods that we keep.', { podsToKeep: podsList
})
164 makeRequestsToWinningPods(cert
, podsList
, callback
)
169 function quitFriends (callback
) {
170 // Stop pool requests
171 requestScheduler
.deactivate()
174 function flushRequests (callbackAsync
) {
175 requestScheduler
.flush(err
=> callbackAsync(err
))
178 function flushVideoQaduRequests (callbackAsync
) {
179 requestVideoQaduScheduler
.flush(err
=> callbackAsync(err
))
182 function getPodsList (callbackAsync
) {
183 return db
.Pod
.list(callbackAsync
)
186 function announceIQuitMyFriends (pods
, callbackAsync
) {
187 const requestParams
= {
189 path: '/api/' + constants
.API_VERSION
+ '/pods/remove',
193 // Announce we quit them
194 // We don't care if the request fails
195 // The other pod will exclude us automatically after a while
196 eachLimit(pods
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
197 requestParams
.toPod
= pod
198 requests
.makeSecureRequest(requestParams
, callbackEach
)
201 logger
.error('Some errors while quitting friends.', { err: err
})
202 // Don't stop the process
205 return callbackAsync(null, pods
)
209 function removePodsFromDB (pods
, callbackAsync
) {
210 each(pods
, function (pod
, callbackEach
) {
211 pod
.destroy().asCallback(callbackEach
)
215 // Don't forget to re activate the scheduler, even if there was an error
216 requestScheduler
.activate()
218 if (err
) return callback(err
)
220 logger
.info('Removed all remote videos.')
221 return callback(null)
225 function sendOwnedVideosToPod (podId
) {
226 db
.Video
.listOwnedAndPopulateAuthorAndTags(function (err
, videosList
) {
228 logger
.error('Cannot get the list of videos we own.')
232 videosList
.forEach(function (video
) {
233 video
.toAddRemoteJSON(function (err
, remoteVideo
) {
235 logger
.error('Cannot convert video to remote.', { error: err
})
236 // Don't break the process
242 endpoint: constants
.REQUEST_ENDPOINTS
.VIDEOS
,
246 createRequest(options
)
// Expose the module-level RequestScheduler instance.
function getRequestScheduler () {
  return requestScheduler
}
// Expose the module-level RequestVideoQaduScheduler instance.
function getRequestVideoQaduScheduler () {
  return requestVideoQaduScheduler
}
// Expose the module-level RequestVideoEventScheduler instance.
function getRequestVideoEventScheduler () {
  return requestVideoEventScheduler
}
264 // ---------------------------------------------------------------------------
266 module
.exports
= friends
268 // ---------------------------------------------------------------------------
// Ask `host` for its pods list and add one point in `podsScore` (a host -> score map,
// mutated in place) for every pod it reports, plus one point for `host` itself.
// callback(err) on failure, callback() once the scores are updated.
// NOTE(review): the success-path callback was not visible in the extracted source;
// it is restored here because the eachSeries iteration in makeFriends needs it to continue.
function computeForeignPodsList (host, podsScore, callback) {
  getForeignPodsList(host, function (err, res) {
    if (err) return callback(err)

    // The answer comes from a remote pod: do not trust its shape
    const foreignPodsList = res ? res.data : undefined
    if (!Array.isArray(foreignPodsList)) {
      return callback(new Error('Invalid pods list received from ' + host))
    }

    // Let's give 1 point to the pod we ask the friends list
    foreignPodsList.push({ host })

    foreignPodsList.forEach(function (foreignPod) {
      const foreignPodHost = foreignPod.host

      // Own-property check so a host named like an Object.prototype member
      // (e.g. "constructor") starts at 1 instead of becoming NaN
      const alreadyScored = Object.prototype.hasOwnProperty.call(podsScore, foreignPodHost)
      if (alreadyScored && podsScore[foreignPodHost]) podsScore[foreignPodHost]++
      else podsScore[foreignPodHost] = 1
    })

    return callback()
  })
}
// Build the list of pods to add.
// A pod is kept only if its score is strictly greater than half the number of
// base hosts we asked, and if it is not our own host (see isMe).
// Returns an array of { host } objects.
function computeWinningPods (hosts, podsScore) {
  const baseScore = hosts.length / 2

  return Object.keys(podsScore)
    .filter(function (podHost) {
      // If the pod is not me and with a good score we add it
      return isMe(podHost) === false && podsScore[podHost] > baseScore
    })
    .map(function (podHost) {
      return { host: podHost }
    })
}
// Fetch the pods list exposed by a remote pod at /api/<API_VERSION>/pods.
// callback(err) on request failure or when the body is not valid JSON,
// callback(null, json) with the parsed body otherwise.
function getForeignPodsList (host, callback) {
  const path = '/api/' + constants.API_VERSION + '/pods'
  const url = constants.REMOTE_SCHEME.HTTP + '://' + host + path

  request.get(url, function (err, response, body) {
    if (err) return callback(err)

    // Only the parsing is guarded: if the callback itself throws we must not
    // catch that and invoke the callback a second time
    let json
    try {
      json = JSON.parse(body)
    } catch (parseErr) {
      return callback(parseErr)
    }

    return callback(null, json)
  })
}
321 function makeRequestsToWinningPods (cert
, podsList
, callback
) {
322 // Stop pool requests
323 requestScheduler
.deactivate()
324 // Flush pool requests
325 requestScheduler
.forceSend()
327 eachLimit(podsList
, constants
.REQUESTS_IN_PARALLEL
, function (pod
, callbackEach
) {
329 url: constants
.REMOTE_SCHEME
.HTTP
+ '://' + pod
.host
+ '/api/' + constants
.API_VERSION
+ '/pods/',
332 host: constants
.CONFIG
.WEBSERVER
.HOST
,
333 email: constants
.CONFIG
.ADMIN
.EMAIL
,
338 requests
.makeRetryRequest(params
, function (err
, res
, body
) {
340 logger
.error('Error with adding %s pod.', pod
.host
, { error: err
})
341 // Don't break the process
342 return callbackEach()
345 if (res
.statusCode
=== 200) {
346 const podObj
= db
.Pod
.build({ host: pod
.host
, publicKey: body
.cert
, email: body
.email
})
347 podObj
.save().asCallback(function (err
, podCreated
) {
349 logger
.error('Cannot add friend %s pod.', pod
.host
, { error: err
})
350 return callbackEach()
353 // Add our videos to the request scheduler
354 sendOwnedVideosToPod(podCreated
.id
)
356 return callbackEach()
359 logger
.error('Status not 200 for %s pod.', pod
.host
)
360 return callbackEach()
363 }, function endRequests () {
364 // Final callback, we've ended all the requests
365 // Now we made new friends, we can re activate the pool of requests
366 requestScheduler
.activate()
368 logger
.debug('makeRequestsToWinningPods finished.')
// Wrapper that populates the "toIds" option with all our friends if it is not specified
// options: { type, endpoint, data, toIds, transaction }
// callback is optional; it is handed to requestScheduler.createRequest, or called with
// the error when the pod ids cannot be listed.
// NOTE(review): the original error branch was not fully visible in the extracted source;
// forwarding the error to the callback is the behaviour chosen here.
function createRequest (options, callback) {
  if (!callback) callback = function () {}
  if (options.toIds) return requestScheduler.createRequest(options, callback)

  // If the "toIds" pods is not specified, we send the request to all our friends
  db.Pod.listAllIds(options.transaction, function (err, podIds) {
    if (err) {
      logger.error('Cannot get pod ids', { error: err })
      return callback(err)
    }

    // Copy instead of mutating the caller's options object
    const newOptions = Object.assign({}, options, { toIds: podIds })
    return requestScheduler.createRequest(newOptions, callback)
  })
}
// Queue a request on the video "quick and dirty update" scheduler.
// When no callback is given, the one built by utils.createEmptyCallback() is used.
function createVideoQaduRequest (options, callback) {
  if (!callback) callback = utils.createEmptyCallback()

  requestVideoQaduScheduler.createRequest(options, callback)
}
// Queue a request on the video event scheduler.
// When no callback is given, the one built by utils.createEmptyCallback() is used.
function createVideoEventRequest (options, callback) {
  if (!callback) callback = utils.createEmptyCallback()

  requestVideoEventScheduler.createRequest(options, callback)
}
// True when `host` is strictly equal to our own configured webserver host
// (constants.CONFIG.WEBSERVER.HOST).
function isMe (host) {
  return host === constants.CONFIG.WEBSERVER.HOST
}