1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import * as request from 'request'
3 import * as Sequelize from 'sequelize'
5 import { database as db } from '../initializers/database'
11 REQUEST_ENDPOINT_ACTIONS,
13 } from '../initializers'
23 RequestSchedulerOptions,
25 RequestVideoQaduScheduler,
26 RequestVideoQaduSchedulerOptions,
28 RequestVideoEventScheduler,
29 RequestVideoEventSchedulerOptions
31 import { PodInstance, VideoInstance } from '../models'
33 type QaduParam = { videoId: string, type: string }
34 type EventParam = { videoId: string, type: string }
36 const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
38 const requestScheduler = new RequestScheduler()
39 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
40 const requestVideoEventScheduler = new RequestVideoEventScheduler()
42 function activateSchedulers () {
43 requestScheduler.activate()
44 requestVideoQaduScheduler.activate()
45 requestVideoEventScheduler.activate()
48 function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
50 type: ENDPOINT_ACTIONS.ADD,
51 endpoint: REQUEST_ENDPOINTS.VIDEOS,
55 createRequest(options, callback)
58 function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
60 type: ENDPOINT_ACTIONS.UPDATE,
61 endpoint: REQUEST_ENDPOINTS.VIDEOS,
65 createRequest(options, callback)
68 function removeVideoToFriends (videoParams: Object) {
70 type: ENDPOINT_ACTIONS.REMOVE,
71 endpoint: REQUEST_ENDPOINTS.VIDEOS,
75 createRequest(options)
78 function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) {
80 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
81 endpoint: REQUEST_ENDPOINTS.VIDEOS,
83 toIds: [ video.Author.podId ],
86 createRequest(options)
89 function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
91 videoId: qaduParam.videoId,
95 return createVideoQaduRequest(options, callback)
98 function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
101 qadusParams.forEach(function (qaduParams) {
102 const fun = function (callback) {
103 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
109 series(tasks, finalCallback)
112 function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
114 videoId: eventParam.videoId,
115 type: eventParam.type,
118 createVideoEventRequest(options, callback)
121 function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
124 eventsParams.forEach(function (eventParams) {
125 const fun = function (callback) {
126 addEventToRemoteVideo(eventParams, transaction, callback)
132 series(tasks, finalCallback)
135 function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) {
136 db.Pod.countAll(function (err, count) {
137 if (err) return callback(err)
139 const hasFriends = (count !== 0)
140 callback(null, hasFriends)
144 function makeFriends (hosts: string[], callback: (err: Error) => void) {
147 logger.info('Make friends!')
148 getMyPublicCert(function (err, cert) {
150 logger.error('Cannot read public cert.')
154 eachSeries(hosts, function (host, callbackEach) {
155 computeForeignPodsList(host, podsScore, callbackEach)
156 }, function (err: Error) {
157 if (err) return callback(err)
159 logger.debug('Pods scores computed.', { podsScore: podsScore })
160 const podsList = computeWinningPods(hosts, podsScore)
161 logger.debug('Pods that we keep.', { podsToKeep: podsList })
163 makeRequestsToWinningPods(cert, podsList, callback)
168 function quitFriends (callback: (err: Error) => void) {
169 // Stop pool requests
170 requestScheduler.deactivate()
173 function flushRequests (callbackAsync) {
174 requestScheduler.flush(err => callbackAsync(err))
177 function flushVideoQaduRequests (callbackAsync) {
178 requestVideoQaduScheduler.flush(err => callbackAsync(err))
181 function getPodsList (callbackAsync) {
182 return db.Pod.list(callbackAsync)
185 function announceIQuitMyFriends (pods, callbackAsync) {
186 const requestParams = {
187 method: 'POST' as 'POST',
188 path: '/api/' + API_VERSION + '/remote/pods/remove',
193 // Announce we quit them
194 // We don't care if the request fails
195 // The other pod will exclude us automatically after a while
196 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
197 requestParams.toPod = pod
198 makeSecureRequest(requestParams, callbackEach)
201 logger.error('Some errors while quitting friends.', { err: err })
202 // Don't stop the process
205 return callbackAsync(null, pods)
209 function removePodsFromDB (pods, callbackAsync) {
210 each(pods, function (pod: any, callbackEach) {
211 pod.destroy().asCallback(callbackEach)
214 ], function (err: Error) {
215 // Don't forget to re activate the scheduler, even if there was an error
216 requestScheduler.activate()
218 if (err) return callback(err)
220 logger.info('Removed all remote videos.')
221 return callback(null)
225 function sendOwnedVideosToPod (podId: number) {
226 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
228 logger.error('Cannot get the list of videos we own.')
232 videosList.forEach(function (video) {
233 video.toAddRemoteJSON(function (err, remoteVideo) {
235 logger.error('Cannot convert video to remote.', { error: err })
236 // Don't break the process
242 endpoint: REQUEST_ENDPOINTS.VIDEOS,
247 createRequest(options)
253 function getRequestScheduler () {
254 return requestScheduler
257 function getRequestVideoQaduScheduler () {
258 return requestVideoQaduScheduler
261 function getRequestVideoEventScheduler () {
262 return requestVideoEventScheduler
265 // ---------------------------------------------------------------------------
270 updateVideoToFriends,
271 reportAbuseVideoToFriend,
272 quickAndDirtyUpdateVideoToFriends,
273 quickAndDirtyUpdatesVideoToFriends,
274 addEventToRemoteVideo,
275 addEventsToRemoteVideo,
279 removeVideoToFriends,
280 sendOwnedVideosToPod,
282 getRequestVideoQaduScheduler,
283 getRequestVideoEventScheduler
286 // ---------------------------------------------------------------------------
288 function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) {
289 getForeignPodsList(host, function (err, res) {
290 if (err) return callback(err)
292 const foreignPodsList = res.data
294 // Let's give 1 point to the pod we ask the friends list
295 foreignPodsList.push({ host })
297 foreignPodsList.forEach(function (foreignPod) {
298 const foreignPodHost = foreignPod.host
300 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
301 else podsScore[foreignPodHost] = 1
304 return callback(null)
308 function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
309 // Build the list of pods to add
310 // Only add a pod if it exists in more than a half base pods
312 const baseScore = hosts.length / 2
314 Object.keys(podsScore).forEach(function (podHost) {
315 // If the pod is not me and with a good score we add it
316 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
317 podsList.push({ host: podHost })
324 function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) {
325 const path = '/api/' + API_VERSION + '/pods'
327 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
328 if (err) return callback(err)
331 const json = JSON.parse(body)
332 return callback(null, json)
339 function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) {
340 // Stop pool requests
341 requestScheduler.deactivate()
342 // Flush pool requests
343 requestScheduler.forceSend()
345 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) {
347 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
348 method: 'POST' as 'POST',
350 host: CONFIG.WEBSERVER.HOST,
351 email: CONFIG.ADMIN.EMAIL,
356 makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) {
358 logger.error('Error with adding %s pod.', pod.host, { error: err })
359 // Don't break the process
360 return callbackEach()
363 if (res.statusCode === 200) {
364 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
365 podObj.save().asCallback(function (err, podCreated) {
367 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
368 return callbackEach()
371 // Add our videos to the request scheduler
372 sendOwnedVideosToPod(podCreated.id)
374 return callbackEach()
377 logger.error('Status not 200 for %s pod.', pod.host)
378 return callbackEach()
381 }, function endRequests () {
382 // Final callback, we've ended all the requests
383 // Now we made new friends, we can re activate the pool of requests
384 requestScheduler.activate()
386 logger.debug('makeRequestsToWinningPods finished.')
387 return callback(null)
391 // Wrapper that populate "toIds" argument with all our friends if it is not specified
392 type CreateRequestOptions = {
397 transaction: Sequelize.Transaction
399 function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) {
400 if (!callback) callback = function () { /* empty */ }
402 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)
404 // If the "toIds" pods is not specified, we send the request to all our friends
405 db.Pod.listAllIds(options.transaction, function (err, podIds) {
407 logger.error('Cannot get pod ids', { error: err })
411 const newOptions = Object.assign(options, { toIds: podIds })
412 return requestScheduler.createRequest(newOptions, callback)
416 function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) {
417 if (!callback) callback = createEmptyCallback()
419 requestVideoQaduScheduler.createRequest(options, callback)
422 function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) {
423 if (!callback) callback = createEmptyCallback()
425 requestVideoEventScheduler.createRequest(options, callback)
428 function isMe (host: string) {
429 return host === CONFIG.WEBSERVER.HOST