1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import request = require('request')
4 const db = require('../initializers/database')
10 REQUEST_ENDPOINT_ACTIONS,
12 } from '../initializers'
22 RequestVideoQaduScheduler,
23 RequestVideoEventScheduler
// Action names available for the videos endpoint, looked up in REQUEST_ENDPOINT_ACTIONS.
26 const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
// Module-level scheduler instances used by the functions in this file.
28 const requestScheduler = new RequestScheduler()
29 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
30 const requestVideoEventScheduler = new RequestVideoEventScheduler()
// Activates the three module-level schedulers: regular requests, video QADU requests
// and video event requests.
32 function activateSchedulers () {
33 requestScheduler.activate()
34 requestVideoQaduScheduler.activate()
35 requestVideoEventScheduler.activate()
// Creates an ADD request on the videos endpoint through createRequest.
// `callback` is handed to createRequest as-is.
// NOTE(review): the options literal is only partly visible in this excerpt; presumably
// `videoData` and `transaction` are forwarded in it — confirm.
38 function addVideoToFriends (videoData, transaction, callback) {
40 type: ENDPOINT_ACTIONS.ADD,
41 endpoint: REQUEST_ENDPOINTS.VIDEOS,
45 createRequest(options, callback)
// Creates an UPDATE request on the videos endpoint through createRequest.
// `callback` is handed to createRequest as-is.
// NOTE(review): the options literal is only partly visible in this excerpt; presumably
// `videoData` and `transaction` are forwarded in it — confirm.
48 function updateVideoToFriends (videoData, transaction, callback) {
50 type: ENDPOINT_ACTIONS.UPDATE,
51 endpoint: REQUEST_ENDPOINTS.VIDEOS,
55 createRequest(options, callback)
// Creates a REMOVE request on the videos endpoint through createRequest.
// No callback is passed, so createRequest falls back to its own no-op callback.
// NOTE(review): how `videoParams` is placed in the options is not visible here — confirm.
58 function removeVideoToFriends (videoParams) {
60 type: ENDPOINT_ACTIONS.REMOVE,
61 endpoint: REQUEST_ENDPOINTS.VIDEOS,
64 createRequest(options)
// Creates a REPORT_ABUSE request on the videos endpoint.
// Unlike the broadcast helpers, `toIds` is set explicitly to the single pod id found on
// `video.Author.podId`, so createRequest does not look up the full pod list.
// No callback is passed; createRequest substitutes a no-op one.
// NOTE(review): how `reportData` is placed in the options is not visible here — confirm.
67 function reportAbuseVideoToFriend (reportData, video) {
69 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
70 endpoint: REQUEST_ENDPOINTS.VIDEOS,
72 toIds: [ video.Author.podId ]
74 createRequest(options)
// Creates a single "quick and dirty update" (QADU) request for a video.
// `videoId` and `type` are copied from `qaduParams`; `transaction` and `callback` are optional.
// Returns whatever createVideoQaduRequest returns.
// NOTE(review): whether `transaction` is included in the options is not visible here — confirm.
77 function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction?, callback?) {
79 videoId: qaduParams.videoId,
80 type: qaduParams.type,
83 return createVideoQaduRequest(options, callback)
// Batch variant of quickAndDirtyUpdateVideoToFriends.
// For every entry of `qadusParams` a task function is built that issues one QADU request
// with the shared `transaction`; the tasks are then run with async `series` and
// `finalCallback` is given to `series` as its completion callback.
// NOTE(review): the declaration of `tasks` and the line adding `fun` to it are not visible
// in this excerpt — presumably each `fun` is pushed onto `tasks`; confirm.
86 function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) {
89 qadusParams.forEach(function (qaduParams) {
90 const fun = function (callback) {
91 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
97 series(tasks, finalCallback)
// Creates a single video event request.
// `videoId` and `type` are copied from `eventParams`; `transaction` and `callback` are optional.
// NOTE(review): any further option fields (and whether `transaction` is included) are not
// visible in this excerpt — confirm.
100 function addEventToRemoteVideo (eventParams, transaction?, callback?) {
102 videoId: eventParams.videoId,
103 type: eventParams.type,
106 createVideoEventRequest(options, callback)
// Batch variant of addEventToRemoteVideo.
// For every entry of `eventsParams` a task function is built that issues one event request
// with the shared `transaction`; the tasks are then run with async `series` and
// `finalCallback` is given to `series` as its completion callback.
// NOTE(review): the declaration of `tasks` and the line adding `fun` to it are not visible
// in this excerpt — presumably each `fun` is pushed onto `tasks`; confirm.
109 function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) {
112 eventsParams.forEach(function (eventParams) {
113 const fun = function (callback) {
114 addEventToRemoteVideo(eventParams, transaction, callback)
120 series(tasks, finalCallback)
// Reports whether at least one pod is stored.
// Calls `callback(err)` if db.Pod.countAll fails, otherwise `callback(null, boolean)`
// where the boolean is true when the count is not 0.
123 function hasFriends (callback) {
124 db.Pod.countAll(function (err, count) {
125 if (err) return callback(err)
// Local name shadows the function name; it only holds the boolean result.
127 const hasFriends = (count !== 0)
128 callback(null, hasFriends)
// Befriends pods starting from the given `hosts`.
// Visible flow: read our public certificate, ask each host in turn (eachSeries) for its
// pods list while accumulating scores in `podsScore`, select the winning pods, then send
// the friend requests; `callback` is passed on to makeRequestsToWinningPods.
// NOTE(review): the declaration of `podsScore` and the error guard around the
// 'Cannot read public cert.' log are not visible in this excerpt — confirm how that
// error is reported to `callback`.
132 function makeFriends (hosts, callback) {
135 logger.info('Make friends!')
136 getMyPublicCert(function (err, cert) {
138 logger.error('Cannot read public cert.')
// Hosts are queried one after another, each call updating the shared podsScore.
142 eachSeries(hosts, function (host, callbackEach) {
143 computeForeignPodsList(host, podsScore, callbackEach)
// An error from any host aborts the whole operation.
145 if (err) return callback(err)
147 logger.debug('Pods scores computed.', { podsScore: podsScore })
148 const podsList = computeWinningPods(hosts, podsScore)
149 logger.debug('Pods that we keep.', { podsToKeep: podsList })
151 makeRequestsToWinningPods(cert, podsList, callback)
// Leaves the network of friend pods.
// Visible steps: stop the request scheduler, flush pending regular and QADU requests,
// list the known pods, announce to each one that we quit, destroy the pods in the DB,
// then re-activate the scheduler and report to `callback`.
// NOTE(review): the construct that chains the nested step functions is not visible in this
// excerpt (presumably async `waterfall`, which the file imports) — confirm.
156 function quitFriends (callback) {
157 // Stop pool requests
158 requestScheduler.deactivate()
// Step: flush the regular request scheduler, forwarding only the error.
161 function flushRequests (callbackAsync) {
162 requestScheduler.flush(err => callbackAsync(err))
// Step: flush the video QADU request scheduler, forwarding only the error.
165 function flushVideoQaduRequests (callbackAsync) {
166 requestVideoQaduScheduler.flush(err => callbackAsync(err))
// Step: fetch the pods via db.Pod.list.
169 function getPodsList (callbackAsync) {
170 return db.Pod.list(callbackAsync)
// Step: send a request to the '/remote/pods/remove' API path of every pod,
// at most REQUESTS_IN_PARALLEL at a time.
173 function announceIQuitMyFriends (pods, callbackAsync) {
174 const requestParams = {
176 path: '/api/' + API_VERSION + '/remote/pods/remove',
181 // Announce we quit them
182 // We don't care if the request fails
183 // The other pod will exclude us automatically after a while
184 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): the same requestParams object is mutated for every pod; this is only
// safe if makeSecureRequest reads `toPod` synchronously — confirm.
185 requestParams.toPod = pod
186 makeSecureRequest(requestParams, callbackEach)
189 logger.error('Some errors while quitting friends.', { err: err })
190 // Don't stop the process
// The pods list is passed on with a null error (see the "Don't stop the process" note above).
193 return callbackAsync(null, pods)
// Step: destroy every pod record.
197 function removePodsFromDB (pods, callbackAsync) {
198 each(pods, function (pod: any, callbackEach) {
199 pod.destroy().asCallback(callbackEach)
203 // Don't forget to re activate the scheduler, even if there was an error
204 requestScheduler.activate()
// The error check comes after the re-activation above.
206 if (err) return callback(err)
208 logger.info('Removed all remote videos.')
209 return callback(null)
// Creates one request per video we own, each built from the video's remote JSON form.
// Takes no callback: failures are logged ('Cannot get the list of videos we own.',
// 'Cannot convert video to remote.') instead of being returned.
// NOTE(review): most of the options literal is not visible in this excerpt; presumably
// the request is targeted at `podId` through `toIds` — confirm.
213 function sendOwnedVideosToPod (podId) {
214 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
216 logger.error('Cannot get the list of videos we own.')
220 videosList.forEach(function (video) {
221 video.toAddRemoteJSON(function (err, remoteVideo) {
223 logger.error('Cannot convert video to remote.', { error: err })
224 // Don't break the process
230 endpoint: REQUEST_ENDPOINTS.VIDEOS,
// No callback given: createRequest uses its no-op default.
234 createRequest(options)
// Returns the module-level RequestScheduler instance.
240 function getRequestScheduler () {
241 return requestScheduler
// Returns the module-level RequestVideoQaduScheduler instance.
244 function getRequestVideoQaduScheduler () {
245 return requestVideoQaduScheduler
// Returns the module-level RequestVideoEventScheduler instance.
248 function getRequestVideoEventScheduler () {
249 return requestVideoEventScheduler
252 // ---------------------------------------------------------------------------
257 updateVideoToFriends,
258 reportAbuseVideoToFriend,
259 quickAndDirtyUpdateVideoToFriends,
260 quickAndDirtyUpdatesVideoToFriends,
261 addEventToRemoteVideo,
262 addEventsToRemoteVideo,
266 removeVideoToFriends,
267 sendOwnedVideosToPod,
269 getRequestVideoQaduScheduler,
270 getRequestVideoEventScheduler
273 // ---------------------------------------------------------------------------
// Fetches the pods list of `host` and adds one point in `podsScore` for every host in it.
// `podsScore` is mutated in place: keys are pod hosts, values are counters.
// Calls `callback(err)` if the list cannot be fetched.
// NOTE(review): the success call to `callback` is not visible in this excerpt — confirm.
275 function computeForeignPodsList (host, podsScore, callback) {
276 getForeignPodsList(host, function (err, res) {
277 if (err) return callback(err)
279 const foreignPodsList = res.data
281 // Let's give 1 point to the pod we ask the friends list
// This push appends to the `res.data` array itself (no copy is made).
282 foreignPodsList.push({ host })
284 foreignPodsList.forEach(function (foreignPod) {
285 const foreignPodHost = foreignPod.host
// First sighting of a host starts its counter at 1; later sightings increment it.
287 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
288 else podsScore[foreignPodHost] = 1
// Selects the pods to befriend from the scores gathered in `podsScore`.
// A host is kept when it is not our own host (isMe) and its score is strictly greater
// than half the number of `hosts`; kept hosts are pushed as `{ host }` objects.
// NOTE(review): the declaration and return of `podsList` are not visible in this excerpt;
// the caller uses the return value as the list of pods — confirm.
295 function computeWinningPods (hosts, podsScore) {
296 // Build the list of pods to add
297 // Only add a pod if it exists in more than a half base pods
// Not rounded: with an odd number of hosts this is a fractional threshold.
299 const baseScore = hosts.length / 2
301 Object.keys(podsScore).forEach(function (podHost) {
302 // If the pod is not me and with a good score we add it
303 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
304 podsList.push({ host: podHost })
// Requests the pods list of a remote `host` with a GET on its '/api/<API_VERSION>/pods' path.
// Calls `callback(err)` on a request error, otherwise `callback(null, json)` with the
// parsed response body.
// NOTE(review): the lines around JSON.parse are not visible in this excerpt; JSON.parse
// throws on a malformed body, so confirm the parse is guarded.
311 function getForeignPodsList (host, callback) {
312 const path = '/api/' + API_VERSION + '/pods'
314 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
315 if (err) return callback(err)
318 const json = JSON.parse(body)
319 return callback(null, json)
// Sends a friend request to every pod of `podsList`, at most REQUESTS_IN_PARALLEL at a time.
// The request scheduler is deactivated for the duration and re-activated once all pods
// have been processed. Per-pod failures are logged and skipped: every branch visible here
// calls `callbackEach()` without an error.
// NOTE(review): the request params literal (method, body, use of `cert`) and the final
// call to `callback` are not visible in this excerpt — confirm.
326 function makeRequestsToWinningPods (cert, podsList, callback) {
327 // Stop pool requests
328 requestScheduler.deactivate()
329 // Flush pool requests
330 requestScheduler.forceSend()
332 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: any, callbackEach) {
334 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
// Our own host and admin email are part of the request params.
337 host: CONFIG.WEBSERVER.HOST,
338 email: CONFIG.ADMIN.EMAIL,
343 makeRetryRequest(params, function (err, res, body) {
345 logger.error('Error with adding %s pod.', pod.host, { error: err })
346 // Don't break the process
347 return callbackEach()
// Only a 200 answer leads to a pod being stored; anything else is logged below.
350 if (res.statusCode === 200) {
// The stored pod takes its public key and email from the response body.
351 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
352 podObj.save().asCallback(function (err, podCreated) {
354 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
355 return callbackEach()
358 // Add our videos to the request scheduler
359 sendOwnedVideosToPod(podCreated.id)
361 return callbackEach()
364 logger.error('Status not 200 for %s pod.', pod.host)
365 return callbackEach()
368 }, function endRequests () {
369 // Final callback, we've ended all the requests
370 // Now we made new friends, we can re activate the pool of requests
371 requestScheduler.activate()
373 logger.debug('makeRequestsToWinningPods finished.')
378 // Wrapper that populates the "toIds" argument with all our friends if it is not specified
379 // { type, endpoint, data, toIds, transaction }
// `callback` is optional; a no-op function is used when it is omitted.
// NOTE(review): what happens after the 'Cannot get pod ids' log (whether `callback` is
// called with the error) is not visible in this excerpt — confirm.
380 function createRequest (options, callback?) {
381 if (!callback) callback = function () { /* empty */ }
// Explicit recipients: hand the options to the scheduler unchanged.
382 if (options.toIds) return requestScheduler.createRequest(options, callback)
384 // If the "toIds" pods is not specified, we send the request to all our friends
385 db.Pod.listAllIds(options.transaction, function (err, podIds) {
387 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes `toIds` onto the caller's `options` object; newOptions is that same object.
391 const newOptions = Object.assign(options, { toIds: podIds })
392 return requestScheduler.createRequest(newOptions, callback)
// Forwards `options` to the video QADU scheduler's createRequest.
// When `callback` is falsy it is replaced by the result of createEmptyCallback()
// (defined outside this excerpt).
396 function createVideoQaduRequest (options, callback) {
397 if (!callback) callback = createEmptyCallback()
399 requestVideoQaduScheduler.createRequest(options, callback)
// Forwards `options` to the video event scheduler's createRequest.
// When `callback` is falsy it is replaced by the result of createEmptyCallback()
// (defined outside this excerpt).
402 function createVideoEventRequest (options, callback) {
403 if (!callback) callback = createEmptyCallback()
405 requestVideoEventScheduler.createRequest(options, callback)
// Returns true when `host` is strictly equal to our own configured host (CONFIG.WEBSERVER.HOST).
408 function isMe (host) {
409 return host === CONFIG.WEBSERVER.HOST