1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import * as request from 'request'
3 import * as Sequelize from 'sequelize'
5 import { database as db } from '../initializers/database'
11 REQUEST_ENDPOINT_ACTIONS,
13 } from '../initializers'
23 RequestSchedulerOptions,
25 RequestVideoQaduScheduler,
26 RequestVideoQaduSchedulerOptions,
28 RequestVideoEventScheduler,
29 RequestVideoEventSchedulerOptions
37 RequestVideoEventType,
// Parameters describing one "quick and dirty update" (QADU) of a video
type QaduParam = {
  videoId: string
  type: RequestVideoQaduType
}

// Parameters describing one event attached to a video
type EventParam = {
  videoId: string
  type: RequestVideoEventType
}

// Actions available on the videos endpoint
const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]

// Module-level scheduler singletons, shared by every function of this file
const requestScheduler = new RequestScheduler()
const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
const requestVideoEventScheduler = new RequestVideoEventScheduler()
/**
 * Activates the module's three schedulers, in this order: the generic request
 * scheduler, the video QADU scheduler, then the video event scheduler.
 */
function activateSchedulers () {
  const schedulers = [ requestScheduler, requestVideoQaduScheduler, requestVideoEventScheduler ]

  for (const scheduler of schedulers) {
    scheduler.activate()
  }
}
/**
 * Creates an ADD request on the videos endpoint through createRequest and
 * forwards `callback` to it.
 *
 * NOTE(review): the lines opening/closing the `options` literal are not
 * visible in this extract; presumably `videoData` and `transaction` are
 * attached to it as well — confirm against the full file.
 */
56 function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
58 type: ENDPOINT_ACTIONS.ADD,
59 endpoint: REQUEST_ENDPOINTS.VIDEOS,
63 createRequest(options, callback)
/**
 * Creates an UPDATE request on the videos endpoint through createRequest and
 * forwards `callback` to it.
 *
 * NOTE(review): the lines opening/closing the `options` literal are not
 * visible in this extract; presumably `videoData` and `transaction` are
 * attached to it as well — confirm against the full file.
 */
66 function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
68 type: ENDPOINT_ACTIONS.UPDATE,
69 endpoint: REQUEST_ENDPOINTS.VIDEOS,
73 createRequest(options, callback)
/**
 * Creates a REMOVE request on the videos endpoint through createRequest.
 * No callback is passed, so the caller is not told about the outcome.
 *
 * NOTE(review): the rest of the `options` literal is not visible in this
 * extract; presumably `videoParams` is attached to it — confirm.
 */
76 function removeVideoToFriends (videoParams: Object) {
78 type: ENDPOINT_ACTIONS.REMOVE,
79 endpoint: REQUEST_ENDPOINTS.VIDEOS,
83 createRequest(options)
/**
 * Creates a REPORT_ABUSE request on the videos endpoint through createRequest.
 * The request is addressed to a single pod: the one referenced by
 * `video.Author.podId`. No callback is passed.
 *
 * NOTE(review): the rest of the `options` literal is not visible in this
 * extract; presumably `reportData` is attached to it — confirm.
 */
86 function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) {
88 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
89 endpoint: REQUEST_ENDPOINTS.VIDEOS,
// Explicit recipient list: only the pod of the video's author
91 toIds: [ video.Author.podId ],
94 createRequest(options)
/**
 * Creates a single video QADU request from `qaduParam` (its `videoId` and
 * `type`) through createVideoQaduRequest, forwarding the optional `callback`.
 *
 * NOTE(review): the rest of the `options` literal is not visible in this
 * extract; presumably the optional `transaction` is attached to it — confirm.
 */
97 function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
99 videoId: qaduParam.videoId,
100 type: qaduParam.type,
103 return createVideoQaduRequest(options, callback)
/**
 * Batch variant of quickAndDirtyUpdateVideoToFriends: builds one task per
 * entry of `qadusParams`, each calling quickAndDirtyUpdateVideoToFriends with
 * the shared `transaction`, then hands `tasks` and `finalCallback` to async's
 * `series`.
 *
 * NOTE(review): the declaration of `tasks` and the statement adding each
 * `fun` to it are not visible in this extract — confirm against the full file.
 */
106 function quickAndDirtyUpdatesVideoToFriends (
107 qadusParams: QaduParam[],
108 transaction: Sequelize.Transaction,
109 finalCallback: (err: Error) => void
113 qadusParams.forEach(function (qaduParams) {
114 const fun = function (callback) {
115 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
121 series(tasks, finalCallback)
/**
 * Creates a single video event request from `eventParam` (its `videoId` and
 * `type`) through createVideoEventRequest, forwarding the optional `callback`.
 *
 * NOTE(review): the rest of the `options` literal is not visible in this
 * extract; presumably the optional `transaction` is attached to it — confirm.
 */
124 function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
126 videoId: eventParam.videoId,
127 type: eventParam.type,
130 createVideoEventRequest(options, callback)
/**
 * Batch variant of addEventToRemoteVideo: builds one task per entry of
 * `eventsParams`, each calling addEventToRemoteVideo with the shared
 * `transaction`, then hands `tasks` and `finalCallback` to async's `series`.
 *
 * NOTE(review): the declaration of `tasks` and the statement adding each
 * `fun` to it are not visible in this extract — confirm against the full file.
 */
133 function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
136 eventsParams.forEach(function (eventParams) {
137 const fun = function (callback) {
138 addEventToRemoteVideo(eventParams, transaction, callback)
144 series(tasks, finalCallback)
/**
 * Tells whether we currently have friends, i.e. whether db.Pod.countAll
 * reports a count different from 0.
 *
 * @param callback receives the counting error, or `null` and the boolean answer
 */
function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) {
  db.Pod.countAll((err, podsCount) => {
    if (err) return callback(err)

    callback(null, podsCount !== 0)
  })
}
/**
 * Makes friends starting from a list of base `hosts`:
 *  1. reads our public certificate with getMyPublicCert,
 *  2. asks each host, one after the other, for its pods list and accumulates
 *     the scores in `podsScore` (computeForeignPodsList),
 *  3. keeps the pods selected by computeWinningPods,
 *  4. sends the friend requests with makeRequestsToWinningPods, which receives
 *     `callback`.
 *
 * NOTE(review): the declaration of `podsScore` and the branch handling a
 * certificate read error are only partly visible in this extract — confirm.
 */
156 function makeFriends (hosts: string[], callback: (err: Error) => void) {
159 logger.info('Make friends!')
160 getMyPublicCert(function (err, cert) {
162 logger.error('Cannot read public cert.')
// Hosts are queried sequentially; the first error aborts the whole operation
166 eachSeries(hosts, function (host, callbackEach) {
167 computeForeignPodsList(host, podsScore, callbackEach)
168 }, function (err: Error) {
169 if (err) return callback(err)
171 logger.debug('Pods scores computed.', { podsScore: podsScore })
172 const podsList = computeWinningPods(hosts, podsScore)
173 logger.debug('Pods that we keep.', { podsToKeep: podsList })
175 makeRequestsToWinningPods(cert, podsList, callback)
/**
 * Quits all our friends. The request scheduler is deactivated first, then the
 * visible steps are:
 *  - flushRequests: flushes the generic request scheduler,
 *  - flushVideoQaduRequests: flushes the video QADU scheduler,
 *  - getPodsList: lists the pods from the database,
 *  - announceIQuitMyFriends: POSTs to the remote "pods/remove" path of each pod,
 *  - removePodsFromDB: destroys every pod instance.
 * The final handler always re-activates the request scheduler before
 * reporting the result through `callback`.
 *
 * NOTE(review): the call that chains these steps is not visible in this
 * extract; the closing `], function (err)` and the steps' arguments suggest
 * async's `waterfall` — confirm against the full file.
 */
180 function quitFriends (callback: (err: Error) => void) {
181 // Stop pool requests
182 requestScheduler.deactivate()
185 function flushRequests (callbackAsync) {
186 requestScheduler.flush(err => callbackAsync(err))
189 function flushVideoQaduRequests (callbackAsync) {
190 requestVideoQaduScheduler.flush(err => callbackAsync(err))
193 function getPodsList (callbackAsync) {
194 return db.Pod.list(callbackAsync)
197 function announceIQuitMyFriends (pods, callbackAsync) {
198 const requestParams = {
199 method: 'POST' as 'POST',
200 path: '/api/' + API_VERSION + '/remote/pods/remove',
205 // Announce we quit them
206 // We don't care if the request fails
207 // The other pod will exclude us automatically after a while
208 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// The same requestParams object is reused: only its target pod changes
209 requestParams.toPod = pod
210 makeSecureRequest(requestParams, callbackEach)
213 logger.error('Some errors while quitting friends.', { err: err })
214 // Don't stop the process
// The pods list is passed on with a null error, so the next step still runs
217 return callbackAsync(null, pods)
221 function removePodsFromDB (pods, callbackAsync) {
222 each(pods, function (pod: any, callbackEach) {
223 pod.destroy().asCallback(callbackEach)
226 ], function (err: Error) {
227 // Don't forget to re activate the scheduler, even if there was an error
228 requestScheduler.activate()
230 if (err) return callback(err)
232 logger.info('Removed all remote videos.')
233 return callback(null)
/**
 * Lists the videos we own (db.Video.listOwnedAndPopulateAuthorAndTags),
 * converts each of them with `toAddRemoteJSON` and creates a request on the
 * videos endpoint through createRequest. Errors are logged; nothing is
 * reported to the caller (the function takes no callback).
 *
 * NOTE(review): most of the `options` literal is not visible in this extract;
 * presumably the request targets only `podId` — confirm against the full file.
 */
237 function sendOwnedVideosToPod (podId: number) {
238 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
240 logger.error('Cannot get the list of videos we own.')
244 videosList.forEach(function (video) {
245 video.toAddRemoteJSON(function (err, remoteVideo) {
247 logger.error('Cannot convert video to remote.', { error: err })
248 // Don't break the process
254 endpoint: REQUEST_ENDPOINTS.VIDEOS,
259 createRequest(options)
/** Gives access to the module-level generic request scheduler. */
function getRequestScheduler () {
  return requestScheduler
}
/** Gives access to the module-level video QADU request scheduler. */
function getRequestVideoQaduScheduler () {
  return requestVideoQaduScheduler
}
/** Gives access to the module-level video event request scheduler. */
function getRequestVideoEventScheduler () {
  return requestVideoEventScheduler
}
277 // ---------------------------------------------------------------------------
282 updateVideoToFriends,
283 reportAbuseVideoToFriend,
284 quickAndDirtyUpdateVideoToFriends,
285 quickAndDirtyUpdatesVideoToFriends,
286 addEventToRemoteVideo,
287 addEventsToRemoteVideo,
291 removeVideoToFriends,
292 sendOwnedVideosToPod,
294 getRequestVideoQaduScheduler,
295 getRequestVideoEventScheduler
298 // ---------------------------------------------------------------------------
/**
 * Asks `host` for its pods list and credits one point in `podsScore` to every
 * pod host it reports, plus one point to `host` itself.
 *
 * The response comes from a remote pod, so its shape is checked before use:
 *  - a response whose `data` is not an array is reported through `callback`
 *    as an error instead of throwing inside the request callback,
 *  - entries without a string `host` are ignored,
 *  - the received list is left untouched.
 *
 * @param host      pod we ask the friends list to
 * @param podsScore score accumulator, keyed by pod host; updated in place
 * @param callback  called with the request/validation error, or `null` on success
 */
function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) {
  getForeignPodsList(host, function (err, res) {
    if (err) return callback(err)

    const foreignPodsList = res ? res.data : undefined
    if (Array.isArray(foreignPodsList) === false) {
      return callback(new Error('Invalid pods list received from ' + host))
    }

    const hostsToCredit: string[] = []
    for (const foreignPod of foreignPodsList) {
      if (foreignPod && typeof foreignPod.host === 'string') hostsToCredit.push(foreignPod.host)
    }

    // Let's give 1 point to the pod we ask the friends list
    hostsToCredit.push(host)

    for (const foreignPodHost of hostsToCredit) {
      // Own-property check: a host named like an Object.prototype member
      // ("constructor", "toString"...) must start from 0, not from the inherited value
      const alreadyScored = Object.prototype.hasOwnProperty.call(podsScore, foreignPodHost)
      const currentScore = alreadyScored ? podsScore[foreignPodHost] : 0

      podsScore[foreignPodHost] = (currentScore || 0) + 1
    }

    return callback(null)
  })
}
/**
 * Selects the pods to add as friends from the computed scores.
 * A pod is kept when it is not ourselves (isMe) and its score is strictly
 * greater than half the number of base `hosts`.
 *
 * NOTE(review): the declaration of `podsList` and the return statement are
 * not visible in this extract; makeFriends uses the returned value as the
 * list of pods to contact — confirm against the full file.
 */
320 function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
321 // Build the list of pods to add
322 // Only add a pod if it exists in more than a half base pods
// Threshold is a plain division: with 3 hosts it is 1.5, so a score of 2 is required
324 const baseScore = hosts.length / 2
326 Object.keys(podsScore).forEach(function (podHost) {
327 // If the pod is not me and with a good score we add it
328 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
329 podsList.push({ host: podHost })
/**
 * Fetches the pods list of a remote `host` with a GET on its
 * '/api/<API_VERSION>/pods' path and passes the JSON-decoded body to `callback`.
 * Request errors are forwarded to `callback`.
 *
 * NOTE(review): JSON.parse throws on a malformed body; the lines around it
 * are not visible in this extract, presumably they guard it — confirm.
 */
336 function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) {
337 const path = '/api/' + API_VERSION + '/pods'
339 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
340 if (err) return callback(err)
343 const json = JSON.parse(body)
344 return callback(null, json)
/**
 * Sends a friend request to every pod of `podsList`, with at most
 * REQUESTS_IN_PARALLEL requests running at the same time.
 *
 * The request scheduler is deactivated and force-sent before starting, and
 * re-activated once every pod has been processed. For each pod:
 *  - a POST is sent to its '/api/<API_VERSION>/pods/' URL through makeRetryRequest,
 *  - on a 200 answer the pod is saved in the database with the certificate
 *    and email found in the response body, then sendOwnedVideosToPod is
 *    called with the id of the created pod,
 *  - any failure (request error, save error, status other than 200) is logged
 *    and the iteration continues.
 * In the visible code `callback` is only ever called with `null`.
 *
 * NOTE(review): several lines (request parameters, error conditions) are not
 * visible in this extract; presumably `cert` is sent in the request body —
 * confirm against the full file.
 */
351 function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) {
352 // Stop pool requests
353 requestScheduler.deactivate()
354 // Flush pool requests
355 requestScheduler.forceSend()
357 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) {
359 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
360 method: 'POST' as 'POST',
362 host: CONFIG.WEBSERVER.HOST,
363 email: CONFIG.ADMIN.EMAIL,
368 makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) {
370 logger.error('Error with adding %s pod.', pod.host, { error: err })
371 // Don't break the process
372 return callbackEach()
375 if (res.statusCode === 200) {
// The remote certificate is stored as the pod's public key
376 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
377 podObj.save().asCallback(function (err, podCreated) {
379 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
380 return callbackEach()
383 // Add our videos to the request scheduler
384 sendOwnedVideosToPod(podCreated.id)
386 return callbackEach()
389 logger.error('Status not 200 for %s pod.', pod.host)
390 return callbackEach()
393 }, function endRequests () {
394 // Final callback, we've ended all the requests
395 // Now we made new friends, we can re activate the pool of requests
396 requestScheduler.activate()
398 logger.debug('makeRequestsToWinningPods finished.')
399 return callback(null)
403 // Options accepted by createRequest, a wrapper that populates the "toIds" argument with all our friends when it is not specified
// NOTE(review): only part of the fields is visible in this extract; createRequest also reads an optional "toIds" — confirm the full shape
404 type CreateRequestOptions = {
406 endpoint: RequestEndpoint
409 transaction: Sequelize.Transaction
/**
 * Creates a request in the generic request scheduler.
 * When `options.toIds` is defined the request is created as is; otherwise the
 * ids of all our pods are fetched (within `options.transaction`) and used as
 * recipients. `callback` is optional: a no-op is used when it is omitted.
 *
 * NOTE(review): the branch handling a db.Pod.listAllIds error is only partly
 * visible in this extract — confirm how the error reaches `callback`.
 */
411 function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) {
412 if (!callback) callback = function () { /* empty */ }
414 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)
416 // If the "toIds" pods is not specified, we send the request to all our friends
417 db.Pod.listAllIds(options.transaction, function (err, podIds) {
419 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes "toIds" into the caller's `options` object (it is the assign target)
423 const newOptions = Object.assign(options, { toIds: podIds })
424 return requestScheduler.createRequest(newOptions, callback)
/**
 * Creates a request in the video QADU scheduler.
 * A callback obtained from createEmptyCallback is used when none is provided.
 */
function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) {
  const done = callback ? callback : createEmptyCallback()

  requestVideoQaduScheduler.createRequest(options, done)
}
/**
 * Creates a request in the video event scheduler.
 * A callback obtained from createEmptyCallback is used when none is provided.
 */
function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) {
  const done = callback ? callback : createEmptyCallback()

  requestVideoEventScheduler.createRequest(options, done)
}
/** Tells whether `host` is strictly equal to our own configured webserver host. */
function isMe (host: string) {
  const ownHost = CONFIG.WEBSERVER.HOST

  return ownHost === host
}