1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import * as request from 'request'
3 import * as Sequelize from 'sequelize'
5 import { database as db } from '../initializers/database'
11 REQUEST_ENDPOINT_ACTIONS,
13 } from '../initializers'
23 RequestSchedulerOptions,
25 RequestVideoQaduScheduler,
26 RequestVideoQaduSchedulerOptions,
28 RequestVideoEventScheduler,
29 RequestVideoEventSchedulerOptions
37 RequestVideoEventType,
// Pairs a video id with the kind of quick-and-dirty update (QADU) concerned
41 type QaduParam = { videoId: string, type: RequestVideoQaduType }
// Pairs a video id with the kind of video event concerned
42 type EventParam = { videoId: string, type: RequestVideoEventType }
// Actions registered for the videos endpoint (ADD, UPDATE, REMOVE and REPORT_ABUSE are used below)
44 const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
// Module-level scheduler instances shared by every function in this file:
// generic requests, video quick-and-dirty updates (QADU) and video events
46 const requestScheduler = new RequestScheduler()
47 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
48 const requestVideoEventScheduler = new RequestVideoEventScheduler()
// Activates the three module-level schedulers (generic requests, video QADU, video events)
50 function activateSchedulers () {
51 requestScheduler.activate()
52 requestVideoQaduScheduler.activate()
53 requestVideoEventScheduler.activate()
// Creates an ADD request on the videos endpoint through createRequest.
// - videoData / transaction: presumably forwarded inside the options object — TODO confirm
// - callback: handed as-is to createRequest
56 function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
58 type: ENDPOINT_ACTIONS.ADD,
59 endpoint: REQUEST_ENDPOINTS.VIDEOS,
63 createRequest(options, callback)
// Creates an UPDATE request on the videos endpoint through createRequest.
// - videoData / transaction: presumably forwarded inside the options object — TODO confirm
// - callback: handed as-is to createRequest
66 function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
68 type: ENDPOINT_ACTIONS.UPDATE,
69 endpoint: REQUEST_ENDPOINTS.VIDEOS,
73 createRequest(options, callback)
// Creates a REMOVE request on the videos endpoint through createRequest.
// No callback is supplied, so createRequest falls back to its own no-op callback.
// - videoParams: presumably forwarded as the request data — TODO confirm
76 function removeVideoToFriends (videoParams: Object) {
78 type: ENDPOINT_ACTIONS.REMOVE,
79 endpoint: REQUEST_ENDPOINTS.VIDEOS,
83 createRequest(options)
// Creates a REPORT_ABUSE request on the videos endpoint through createRequest.
// Unlike the add/update/remove helpers, the recipient is set explicitly.
// - reportData: presumably forwarded as the request data — TODO confirm
// - video: only video.Author.podId is read here
86 function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) {
88 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
89 endpoint: REQUEST_ENDPOINTS.VIDEOS,
// Single recipient: the pod of the video's author
91 toIds: [ video.Author.podId ],
94 createRequest(options)
// Creates one quick-and-dirty update (QADU) request for a video.
// - qaduParam: its videoId and type are copied into the request options
// - transaction (optional): presumably forwarded in the options — TODO confirm
// - callback (optional): createVideoQaduRequest substitutes an empty callback when it is missing
// Returns whatever createVideoQaduRequest returns.
97 function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
99 videoId: qaduParam.videoId,
100 type: qaduParam.type,
103 return createVideoQaduRequest(options, callback)
// Batch version of quickAndDirtyUpdateVideoToFriends.
// Wraps each entry of qadusParams in a task that calls the single-item function with the
// shared transaction, then hands the tasks to async's series() with finalCallback.
106 function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
109 qadusParams.forEach(function (qaduParams) {
// Deferred task: nothing is sent until series() invokes it
110 const fun = function (callback) {
111 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
117 series(tasks, finalCallback)
// Creates one video event request.
// - eventParam: its videoId and type are copied into the request options
// - transaction (optional): presumably forwarded in the options — TODO confirm
// - callback (optional): createVideoEventRequest substitutes an empty callback when it is missing
120 function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
122 videoId: eventParam.videoId,
123 type: eventParam.type,
126 createVideoEventRequest(options, callback)
// Batch version of addEventToRemoteVideo.
// Wraps each entry of eventsParams in a task that calls the single-item function with the
// shared transaction, then hands the tasks to async's series() with finalCallback.
129 function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
132 eventsParams.forEach(function (eventParams) {
// Deferred task: nothing is sent until series() invokes it
133 const fun = function (callback) {
134 addEventToRemoteVideo(eventParams, transaction, callback)
140 series(tasks, finalCallback)
// Tells the caller whether at least one pod is stored.
// callback receives (err) when db.Pod.countAll fails, otherwise (null, hasFriends)
// where hasFriends is true when the count is not 0.
143 function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) {
144 db.Pod.countAll(function (err, count) {
145 if (err) return callback(err)
147 const hasFriends = (count !== 0)
148 callback(null, hasFriends)
// Makes friends starting from a list of hosts:
//  1. reads our public cert (getMyPublicCert),
//  2. asks each host in turn for its pods and accumulates a score per pod (computeForeignPodsList),
//  3. keeps the pods selected by computeWinningPods,
//  4. contacts them with makeRequestsToWinningPods, which receives the final callback.
// An error raised while computing the scores is handed straight to callback.
152 function makeFriends (hosts: string[], callback: (err: Error) => void) {
155 logger.info('Make friends!')
156 getMyPublicCert(function (err, cert) {
158 logger.error('Cannot read public cert.')
// eachSeries: hosts are queried one after the other, all writing into the same podsScore map
162 eachSeries(hosts, function (host, callbackEach) {
163 computeForeignPodsList(host, podsScore, callbackEach)
164 }, function (err: Error) {
165 if (err) return callback(err)
167 logger.debug('Pods scores computed.', { podsScore: podsScore })
168 const podsList = computeWinningPods(hosts, podsScore)
169 logger.debug('Pods that we keep.', { podsToKeep: podsList })
171 makeRequestsToWinningPods(cert, podsList, callback)
// Quits all friends. The visible steps are: flush the pending generic and QADU requests,
// list the pods, announce to each of them that we quit, then destroy the pods.
// requestScheduler is deactivated first and re-activated in the final handler, even on error.
// callback receives the error of the step chain, or null on success.
// NOTE(review): no flush of requestVideoEventScheduler is visible here — confirm whether that is intended.
176 function quitFriends (callback: (err: Error) => void) {
177 // Stop pool requests
178 requestScheduler.deactivate()
// Step: flush the generic request scheduler, forwarding only the error
181 function flushRequests (callbackAsync) {
182 requestScheduler.flush(err => callbackAsync(err))
// Step: flush the video QADU request scheduler, forwarding only the error
185 function flushVideoQaduRequests (callbackAsync) {
186 requestVideoQaduScheduler.flush(err => callbackAsync(err))
// Step: fetch the pods; the result of db.Pod.list goes to the next step
189 function getPodsList (callbackAsync) {
190 return db.Pod.list(callbackAsync)
// Step: POST the removal notice to every pod, at most REQUESTS_IN_PARALLEL at a time
193 function announceIQuitMyFriends (pods, callbackAsync) {
194 const requestParams = {
195 method: 'POST' as 'POST',
196 path: '/api/' + API_VERSION + '/remote/pods/remove',
201 // Announce we quit them
202 // We don't care if the request fails
203 // The other pod will exclude us automatically after a while
204 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// The same requestParams object is reused; only its toPod field changes per pod
205 requestParams.toPod = pod
206 makeSecureRequest(requestParams, callbackEach)
209 logger.error('Some errors while quitting friends.', { err: err })
210 // Don't stop the process
// Always continue with the pods list, whatever happened during the announcement
213 return callbackAsync(null, pods)
// Step: destroy every pod instance
217 function removePodsFromDB (pods, callbackAsync) {
218 each(pods, function (pod: any, callbackEach) {
219 pod.destroy().asCallback(callbackEach)
222 ], function (err: Error) {
223 // Don't forget to re activate the scheduler, even if there was an error
224 requestScheduler.activate()
226 if (err) return callback(err)
// NOTE(review): the message mentions videos although the visible steps remove pods — confirm wording
228 logger.info('Removed all remote videos.')
229 return callback(null)
// Creates one request on the videos endpoint for each video we own.
// Videos come from db.Video.listOwnedAndPopulateAuthorAndTags and are converted with
// video.toAddRemoteJSON; listing and conversion failures are logged only, and nothing is
// reported back to the caller (the function takes no callback).
// - podId: presumably used as the only recipient of the created requests — TODO confirm
233 function sendOwnedVideosToPod (podId: number) {
234 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
236 logger.error('Cannot get the list of videos we own.')
240 videosList.forEach(function (video) {
241 video.toAddRemoteJSON(function (err, remoteVideo) {
243 logger.error('Cannot convert video to remote.', { error: err })
244 // Don't break the process
250 endpoint: REQUEST_ENDPOINTS.VIDEOS,
// Fire and forget: createRequest is called without a callback
255 createRequest(options)
// Returns the module-level generic request scheduler instance
261 function getRequestScheduler () {
262 return requestScheduler
// Returns the module-level video QADU request scheduler instance
265 function getRequestVideoQaduScheduler () {
266 return requestVideoQaduScheduler
// Returns the module-level video event request scheduler instance
269 function getRequestVideoEventScheduler () {
270 return requestVideoEventScheduler
273 // ---------------------------------------------------------------------------
278 updateVideoToFriends,
279 reportAbuseVideoToFriend,
280 quickAndDirtyUpdateVideoToFriends,
281 quickAndDirtyUpdatesVideoToFriends,
282 addEventToRemoteVideo,
283 addEventsToRemoteVideo,
287 removeVideoToFriends,
288 sendOwnedVideosToPod,
290 getRequestVideoQaduScheduler,
291 getRequestVideoEventScheduler
294 // ---------------------------------------------------------------------------
// Fetches the pods known by `host` and adds one point in podsScore for each of them,
// plus one point for `host` itself. podsScore is mutated in place.
// callback receives the error from getForeignPodsList, or null once the scores are updated.
296 function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) {
297 getForeignPodsList(host, function (err, res) {
298 if (err) return callback(err)
// The pods are read from the `data` field of the parsed response
300 const foreignPodsList = res.data
302 // Let's give 1 point to the pod we ask the friends list
303 foreignPodsList.push({ host })
305 foreignPodsList.forEach(function (foreignPod) {
306 const foreignPodHost = foreignPod.host
// Increment an existing score, or start at 1 for a host seen for the first time
308 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
309 else podsScore[foreignPodHost] = 1
312 return callback(null)
// Selects the pods to befriend: every scored host that is not ourselves (isMe) and whose
// score is strictly greater than half the number of base hosts is pushed as { host } into podsList.
// presumably podsList is what the function returns — TODO confirm
316 function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
317 // Build the list of pods to add
318 // Only add a pod if it exists in more than a half base pods
// Not rounded: with 3 hosts the threshold is 1.5, so a score of 2 is required
320 const baseScore = hosts.length / 2
322 Object.keys(podsScore).forEach(function (podHost) {
323 // If the pod is not me and with a good score we add it
324 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
325 podsList.push({ host: podHost })
// GETs '/api/<API_VERSION>/pods' on `host` over REMOTE_SCHEME.HTTP and parses the body as JSON.
// callback receives (err) when the request fails, otherwise (null, parsedJson).
332 function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) {
333 const path = '/api/' + API_VERSION + '/pods'
335 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
336 if (err) return callback(err)
// The parsed value is passed on as-is; its shape is not validated here
339 const json = JSON.parse(body)
340 return callback(null, json)
// Contacts each pod of podsList (at most REQUESTS_IN_PARALLEL at a time) by POSTing to its
// '/api/<API_VERSION>/pods/' route. For a pod answering 200, a Pod row is built from its
// host and the returned cert/email, saved, and our owned videos are queued for it.
// Failures for one pod (request error, non-200 status, save error) are logged and skipped,
// so callback is always invoked with null once every pod has been processed.
// requestScheduler is paused for the whole operation and re-activated at the end.
// - cert: presumably sent in the request payload — TODO confirm
347 function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) {
348 // Stop pool requests
349 requestScheduler.deactivate()
350 // Flush pool requests
351 requestScheduler.forceSend()
353 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) {
355 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
356 method: 'POST' as 'POST',
// We introduce ourselves with our own host and admin email
358 host: CONFIG.WEBSERVER.HOST,
359 email: CONFIG.ADMIN.EMAIL,
364 makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) {
366 logger.error('Error with adding %s pod.', pod.host, { error: err })
367 // Don't break the process
368 return callbackEach()
371 if (res.statusCode === 200) {
// The cert returned by the remote pod is stored as its public key
372 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
373 podObj.save().asCallback(function (err, podCreated) {
375 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
376 return callbackEach()
379 // Add our videos to the request scheduler
380 sendOwnedVideosToPod(podCreated.id)
382 return callbackEach()
385 logger.error('Status not 200 for %s pod.', pod.host)
386 return callbackEach()
389 }, function endRequests () {
390 // Final callback, we've ended all the requests
391 // Now we made new friends, we can re activate the pool of requests
392 requestScheduler.activate()
394 logger.debug('makeRequestsToWinningPods finished.')
395 return callback(null)
399 // Wrapper that populates the "toIds" argument with all our friends if it is not specified
// Options accepted by createRequest; it reads `toIds` and `transaction`, and the whole
// object is forwarded to requestScheduler.createRequest
400 type CreateRequestOptions = {
402 endpoint: RequestEndpoint
405 transaction: Sequelize.Transaction
// Hands a request to requestScheduler, choosing the recipients:
// - options.toIds defined: the options are forwarded unchanged;
// - otherwise: every pod id returned by db.Pod.listAllIds (queried with options.transaction)
//   becomes the recipient list.
// callback is optional; a no-op is substituted when it is missing.
407 function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) {
408 if (!callback) callback = function () { /* empty */ }
410 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)
412 // If the "toIds" pods is not specified, we send the request to all our friends
413 db.Pod.listAllIds(options.transaction, function (err, podIds) {
415 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes toIds onto the caller's options object (no copy is made)
419 const newOptions = Object.assign(options, { toIds: podIds })
420 return requestScheduler.createRequest(newOptions, callback)
// Forwards the options to requestVideoQaduScheduler.createRequest,
// substituting createEmptyCallback() when no callback is given
424 function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) {
425 if (!callback) callback = createEmptyCallback()
427 requestVideoQaduScheduler.createRequest(options, callback)
// Forwards the options to requestVideoEventScheduler.createRequest,
// substituting createEmptyCallback() when no callback is given
430 function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) {
431 if (!callback) callback = createEmptyCallback()
433 requestVideoEventScheduler.createRequest(options, callback)
// True when `host` is strictly equal to our own configured host (CONFIG.WEBSERVER.HOST)
436 function isMe (host: string) {
437 return host === CONFIG.WEBSERVER.HOST