From 528a9efa8272532bbd0dafc35c3e05e57c50f61e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Sat, 18 Jun 2016 16:13:54 +0200 Subject: Try to make a better communication (between pods) module --- server/lib/friends.js | 158 ++++++++++++++++--------------- server/lib/requestsScheduler.js | 200 +++++++++++++++++++++------------------- server/lib/videos.js | 27 +++++- 3 files changed, 214 insertions(+), 171 deletions(-) (limited to 'server/lib') diff --git a/server/lib/friends.js b/server/lib/friends.js index e986fa006..d81a603ad 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -24,15 +24,15 @@ const pods = { getMyCertificate: getMyCertificate, makeFriends: makeFriends, quitFriends: quitFriends, - removeVideoToFriends: removeVideoToFriends + removeVideoToFriends: removeVideoToFriends, + sendOwnedVideosToPod: sendOwnedVideosToPod } function addVideoToFriends (video) { - // To avoid duplicates - const id = video.name + video.magnetUri // ensure namePath is null video.namePath = null - requestsScheduler.addRequest(id, 'add', video) + + requestsScheduler.addRequest('add', video) } function hasFriends (callback) { @@ -60,7 +60,7 @@ function makeFriends (callback) { const urls = config.get('network.friends') - async.each(urls, function (url, callbackEach) { + async.eachSeries(urls, function (url, callbackEach) { computeForeignPodsList(url, podsScore, callbackEach) }, function (err) { if (err) return callback(err) @@ -78,7 +78,7 @@ function quitFriends (callback) { // Stop pool requests requestsScheduler.deactivate() // Flush pool requests - requestsScheduler.forceSend() + requestsScheduler.flush() async.waterfall([ function getPodsList (callbackAsync) { @@ -86,19 +86,25 @@ function quitFriends (callback) { }, function announceIQuitMyFriends (pods, callbackAsync) { - const request = { + const requestParams = { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/remove', - sign: true, - encrypt: true, - data: { - url: 'me' // Fake data - } + sign: true } // Announce we quit them - requests.makeMultipleRetryRequest(request, pods, function (err) { - return callbackAsync(err) + // We don't care if the request fails + // The other pod will exclude us automatically after a while + async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + requestParams.toPod = pod + requests.makeSecureRequest(requestParams, callbackEach) + }, function (err) { + if (err) { + logger.error('Some errors while quitting friends.', { err: err }) + // Don't stop the process + } + + return callbackAsync() }) }, @@ -136,9 +142,28 @@ function quitFriends (callback) { } function removeVideoToFriends (video) { - // To avoid duplicates - const id = video.name + video.magnetUri - requestsScheduler.addRequest(id, 'remove', video) + requestsScheduler.addRequest('remove', video) +} + +function sendOwnedVideosToPod (podId) { + Videos.listOwned(function (err, videosList) { + if (err) { + logger.error('Cannot get the list of videos we own.') + return + } + + videosList.forEach(function (video) { + videos.convertVideoToRemote(video, function (err, remoteVideo) { + if (err) { + logger.error('Cannot convert video to remote.', { error: err }) + // Don't break the process + return + } + + requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) + }) + }) + }) } // --------------------------------------------------------------------------- @@ -148,18 +173,19 @@ module.exports = pods // --------------------------------------------------------------------------- function computeForeignPodsList (url, podsScore, callback) { - // Let's give 1 point to the pod we ask the friends list - podsScore[url] = 1 - getForeignPodsList(url, function (err, foreignPodsList) { if (err) return callback(err) - if (foreignPodsList.length === 0) return callback() + + if (!foreignPodsList) foreignPodsList = [] + + // Let's give 1 point to the pod we ask the friends list + foreignPodsList.push({ url: url }) foreignPodsList.forEach(function (foreignPod) { - const foreignUrl = foreignPod.url + const foreignPodUrl = foreignPod.url - if (podsScore[foreignUrl]) podsScore[foreignUrl]++ - else podsScore[foreignUrl] = 1 + if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++ + else podsScore[foreignPodUrl] = 1 }) callback() @@ -194,63 +220,43 @@ function makeRequestsToWinningPods (cert, podsList, callback) { // Flush pool requests requestsScheduler.forceSend() - // Get the list of our videos to send to our new friends - Videos.listOwned(function (err, videosList) { - if (err) { - logger.error('Cannot get the list of videos we own.') - return callback(err) - } - - const data = { - url: http + '://' + host + ':' + port, - publicKey: cert, - videos: videosList + async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + const params = { + url: pod.url + '/api/' + constants.API_VERSION + '/pods/', + method: 'POST', + json: { + url: http + '://' + host + ':' + port, + publicKey: cert + } } - requests.makeMultipleRetryRequest( - { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, - - podsList, - - // Callback called after each request - function eachRequest (err, response, body, url, pod, callbackEachRequest) { - // We add the pod if it responded correctly with its public certificate - if (!err && response.statusCode === 200) { - Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { - if (err) { - logger.error('Error with adding %s pod.', pod.url, { error: err }) - return callbackEachRequest() - } - - videos.createRemoteVideos(body.videos, function (err) { - if (err) { - logger.error('Error with adding videos of pod.', pod.url, { error: err }) - return callbackEachRequest() - } - - logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) - return callbackEachRequest() - }) - }) - } else { - logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) - return callbackEachRequest() - } - }, + requests.makeRetryRequest(params, function (err, res, body) { + if (err) { + logger.error('Error with adding %s pod.', pod.url, { error: err }) + // Don't break the process + return callbackEach() + } - // Final callback, we've ended all the requests - function endRequests (err) { - // Now we made new friends, we can re activate the pool of requests - requestsScheduler.activate() + if (res.statusCode === 200) { + Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) { + if (err) logger.error('Cannot add friend %s pod.', pod.url) - if (err) { - logger.error('There was some errors when we wanted to make friends.') - return callback(err) - } + // Add our videos to the request scheduler + sendOwnedVideosToPod(podCreated._id) - logger.debug('makeRequestsToWinningPods finished.') - return callback(null) + return callbackEach() + }) + } else { + logger.error('Status not 200 for %s pod.', pod.url) + return callbackEach() } - ) + }) + }, function endRequests () { + // Final callback, we've ended all the requests + // Now we made new friends, we can re activate the pool of requests + requestsScheduler.activate() + + logger.debug('makeRequestsToWinningPods finished.') + return callback() }) } diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 78570209d..ac75e5b93 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js @@ -11,13 +11,14 @@ const requests = require('../helpers/requests') const videos = require('../lib/videos') const Videos = require('../models/videos') -const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE let timer = null const requestsScheduler = { activate: activate, addRequest: addRequest, + addRequestTo: addRequestTo, deactivate: deactivate, + flush: flush, forceSend: forceSend } @@ -27,35 +28,37 @@ function activate () { } // Add request to the scheduler -function addRequest (id, type, request) { - logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) +function addRequest (type, data) { + logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) - Requests.findById(id, function (err, entity) { + const request = { + type: type, + data: data + } + + Pods.listAllIds(function (err, podIds) { if (err) { - logger.error('Error when trying to find a request.', { error: err }) - return // Abort + logger.debug('Cannot list pod ids.') + return } - // If there were already a request with this id in the scheduler... - if (entity) { - if (entity.type === type) { - logger.error('Cannot insert two same requests.') - return // Abort - } + // No friends + if (!podIds) return - // Remove the request of the other type - Requests.removeRequestById(id, function (err) { - if (err) { - logger.error('Cannot remove a request.', { error: err }) - return // Abort - } - }) - } else { - Requests.create(id, type, request, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - return // Abort - }) - } + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) + }) + }) +} + +function addRequestTo (podIds, type, data) { + const request = { + type: type, + data: data + } + + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) }) } @@ -64,6 +67,14 @@ function deactivate () { clearInterval(timer) } +function flush () { + Requests.removeAll(function (err) { + if (err) { + logger.error('Cannot flush the requests.', { error: err }) + } + }) +} + function forceSend () { logger.info('Force requests scheduler sending.') makeRequests() @@ -76,54 +87,28 @@ module.exports = requestsScheduler // --------------------------------------------------------------------------- // Make a requests to friends of a certain type -function makeRequest (type, requestsToMake, callback) { +function makeRequest (toPod, requestsToMake, callback) { if (!callback) callback = function () {} - Pods.list(function (err, pods) { - if (err) return callback(err) - - const params = { - encrypt: true, // Security - sign: true, // To prove our identity - method: 'POST', - path: null, // We build the path later - data: requestsToMake // Requests we need to make - } - - // If this is a valid type, we build the path - if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type - } else { - return callback(new Error('Unkown pool request type.')) - } - - const badPods = [] - const goodPods = [] - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { - // We failed the request, add the pod unreachable to the bad pods list - if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { - badPods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - // Request success - goodPods.push(pod._id) - } - - return callbackEachPodFinished() + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) } - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - // All the requests were made, we update the pods score - updatePodsScore(goodPods, badPods) - callback(null) - } + return callback(true) }) } @@ -143,38 +128,65 @@ function makeRequests () { logger.info('Making requests to friends.') + // Requests by pods id const requestsToMake = {} - for (const type of REQUEST_SCHEDULER_TYPE) { - requestsToMake[type] = { - ids: [], - requests: [] - } - } - // For each requests to make, we add it to the correct request type requests.forEach(function (poolRequest) { - if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { - const requestTypeToMake = requestsToMake[poolRequest.type] - requestTypeToMake.requests.push(poolRequest.request) - requestTypeToMake.ids.push(poolRequest._id) - } else { - logger.error('Unkown request type.', { request_type: poolRequest.type }) - return // abort - } + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) }) - for (let type of Object.keys(requestsToMake)) { - const requestTypeToMake = requestsToMake[type] - // If there are requests for this type - if (requestTypeToMake.requests.length !== 0) { - makeRequest(type, requestTypeToMake.requests, function (err) { - if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err }) + const goodPods = [] + const badPods = [] - // We made the requests, so we can remove them from the scheduler - Requests.removeRequests(requestTypeToMake.ids) + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) return logger.error('Error finding pod by id.', { err: err }) + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + Requests.removePodOf(requestToMake.ids, toPodId) + return callbackEach() + } + + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + Requests.removePodOf(requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } + + callbackEach() }) - } - } + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + Requests.removeWithEmptyTo() + }) }) } diff --git a/server/lib/videos.js b/server/lib/videos.js index e0db0e1d5..a74c77dc4 100644 --- a/server/lib/videos.js +++ b/server/lib/videos.js @@ -17,6 +17,7 @@ const uploadDir = pathUtils.join(__dirname, '..', '..', config.get('storage.uplo const thumbnailsDir = pathUtils.join(__dirname, '..', '..', config.get('storage.thumbnails')) const videos = { + convertVideoToRemote: convertVideoToRemote, createRemoteVideos: createRemoteVideos, getVideoDuration: getVideoDuration, getVideoState: getVideoState, @@ -27,6 +28,29 @@ const videos = { seedAllExisting: seedAllExisting } +function convertVideoToRemote (video, callback) { + fs.readFile(thumbnailsDir + video.thumbnail, function (err, thumbnailData) { + if (err) { + logger.error('Cannot read the thumbnail of the video') + return callback(err) + } + + const remoteVideo = { + name: video.name, + description: video.description, + magnetUri: video.magnetUri, + author: video.author, + duration: video.duration, + thumbnailBase64: new Buffer(thumbnailData).toString('base64'), + tags: video.tags, + createdDate: video.createdDate, + podUrl: video.podUrl + } + + return callback(null, remoteVideo) + }) +} + function createRemoteVideos (videos, callback) { // Create the remote videos from the new pod createRemoteVideoObjects(videos, function (err, remoteVideos) { @@ -154,7 +178,8 @@ function createRemoteVideoObjects (videos, callback) { podUrl: video.podUrl, duration: video.duration, thumbnail: thumbnailName, - tags: video.tags + tags: video.tags, + author: video.author } remoteVideos.push(params) -- cgit v1.2.3