From: Chocobozzz Date: Sat, 18 Jun 2016 14:13:54 +0000 (+0200) Subject: Try to make a better communication (between pods) module X-Git-Tag: v0.0.1-alpha~851 X-Git-Url: https://git.immae.eu/?a=commitdiff_plain;h=528a9efa8272532bbd0dafc35c3e05e57c50f61e;p=github%2FChocobozzz%2FPeerTube.git Try to make a better communication (between pods) module --- diff --git a/client/src/vendor.ts b/client/src/vendor.ts index cf1524578..8f029191a 100644 --- a/client/src/vendor.ts +++ b/client/src/vendor.ts @@ -16,7 +16,7 @@ import 'rxjs/Observable'; import 'rxjs/Subject'; import 'rxjs/add/operator/catch'; import 'rxjs/add/operator/map'; -import 'rxjs/add/operator/mergeMap'; +import 'rxjs/add/observable/throw'; import 'bootstrap-loader'; import 'ng2-file-upload'; diff --git a/server/controllers/api/v1/index.js b/server/controllers/api/v1/index.js index 7b3ec32c0..e0c29a8a2 100644 --- a/server/controllers/api/v1/index.js +++ b/server/controllers/api/v1/index.js @@ -5,12 +5,12 @@ const express = require('express') const router = express.Router() const podsController = require('./pods') -const remoteVideosController = require('./remoteVideos') +const remoteController = require('./remote') const usersController = require('./users') const videosController = require('./videos') router.use('/pods', podsController) -router.use('/remotevideos', remoteVideosController) +router.use('/remote', remoteController) router.use('/users', usersController) router.use('/videos', videosController) router.use('/*', badRequest) diff --git a/server/controllers/api/v1/pods.js b/server/controllers/api/v1/pods.js index ecaeba666..881b2090d 100644 --- a/server/controllers/api/v1/pods.js +++ b/server/controllers/api/v1/pods.js @@ -9,19 +9,18 @@ const middlewares = require('../../../middlewares') const Pods = require('../../../models/pods') const oAuth2 = middlewares.oauth2 const reqValidator = middlewares.reqValidators.pods -const secureMiddleware = middlewares.secure -const secureRequest = middlewares.reqValidators.remote.secureRequest +const signatureValidator = middlewares.reqValidators.remote.signature const videos = require('../../../lib/videos') const Videos = require('../../../models/videos') const router = express.Router() -router.get('/', listPods) +router.get('/', listPodsUrl) router.post('/', reqValidator.podsAdd, addPods) router.get('/makefriends', oAuth2.authenticate, reqValidator.makeFriends, makeFriends) router.get('/quitfriends', oAuth2.authenticate, quitFriends) // Post because this is a secured request -router.post('/remove', secureRequest, secureMiddleware.decryptBody, removePods) +router.post('/remove', signatureValidator, removePods) // --------------------------------------------------------------------------- @@ -30,22 +29,17 @@ module.exports = router // --------------------------------------------------------------------------- function addPods (req, res, next) { - const informations = req.body.data + const informations = req.body async.waterfall([ function addPod (callback) { - Pods.add(informations, function (err) { - return callback(err) - }) + Pods.add(informations, callback) }, - function createVideosOfThisPod (callback) { - // Create the remote videos from the new pod - videos.createRemoteVideos(informations.videos, function (err) { - if (err) logger.error('Cannot create remote videos.', { error: err }) + function sendMyVideos (podCreated, callback) { + friends.sendOwnedVideosToPod(podCreated._id) - return callback(err) - }) + callback(null) }, function fetchMyCertificate (callback) { @@ -57,30 +51,19 @@ function addPods (req, res, next) { return callback(null, cert) }) - }, - - function getListOfMyVideos (cert, callback) { - Videos.listOwned(function (err, videosList) { - if (err) { - logger.error('Cannot get the list of owned videos.') - return callback(err) - } - - return callback(null, cert, videosList) - }) } - ], function (err, cert, videosList) { + ], function (err, cert) { if (err) return next(err) - return res.json({ cert: cert, videos: videosList }) + return res.json({ cert: cert }) }) } -function listPods (req, res, next) { - Pods.list(function (err, podsList) { +function listPodsUrl (req, res, next) { + Pods.listAllUrls(function (err, podsUrlList) { if (err) return next(err) - res.json(podsList) + res.json(podsUrlList) }) } diff --git a/server/controllers/api/v1/remote.js b/server/controllers/api/v1/remote.js new file mode 100644 index 000000000..ced8470d7 --- /dev/null +++ b/server/controllers/api/v1/remote.js @@ -0,0 +1,80 @@ +'use strict' + +const async = require('async') +const express = require('express') + +const middlewares = require('../../../middlewares') +const secureMiddleware = middlewares.secure +const reqValidator = middlewares.reqValidators.remote +const logger = require('../../../helpers/logger') +const Videos = require('../../../models/videos') +const videos = require('../../../lib/videos') + +const router = express.Router() + +router.post('/videos', + reqValidator.signature, + reqValidator.dataToDecrypt, + secureMiddleware.decryptBody, + reqValidator.remoteVideos, + remoteVideos +) + +// --------------------------------------------------------------------------- + +module.exports = router + +// --------------------------------------------------------------------------- + +function remoteVideos (req, res, next) { + const requests = req.body.data + const fromUrl = req.body.signature.url + + // We need to process in the same order to keep consistency + // TODO: optimization + async.eachSeries(requests, function (request, callbackEach) { + const video = request.data + + if (request.type === 'add') { + addRemoteVideo(video, callbackEach) + } else if (request.type === 'remove') { + removeRemoteVideo(video, fromUrl, callbackEach) + } + }) + + // We don't need to keep the other pod waiting + return res.type('json').status(204).end() +} + +function addRemoteVideo (videoToCreate, callback) { + videos.createRemoteVideos([ videoToCreate ], function (err, remoteVideos) { + if (err) { + logger.error('Cannot create remote videos.', { error: err }) + // Don't break the process + } + + return callback() + }) +} + +function removeRemoteVideo (videoToRemove, fromUrl, callback) { + const magnetUris = [ videoToRemove.magnetUri ] + + // We need the list because we have to remove some other stuffs (thumbnail etc) + Videos.listFromUrlAndMagnets(fromUrl, magnetUris, function (err, videosList) { + if (err) { + logger.error('Cannot list videos from url and magnets.', { error: err }) + // Don't break the process + return callback() + } + + videos.removeRemoteVideos(videosList, function (err) { + if (err) { + logger.error('Cannot remove remote videos.', { error: err }) + // Don't break the process + } + + return callback() + }) + }) +} diff --git a/server/controllers/api/v1/remoteVideos.js b/server/controllers/api/v1/remoteVideos.js deleted file mode 100644 index 2f41c0411..000000000 --- a/server/controllers/api/v1/remoteVideos.js +++ /dev/null @@ -1,66 +0,0 @@ -'use strict' - -const express = require('express') -const map = require('lodash/map') - -const middlewares = require('../../../middlewares') -const secureMiddleware = middlewares.secure -const reqValidator = middlewares.reqValidators.remote -const logger = require('../../../helpers/logger') -const Videos = require('../../../models/videos') -const videos = require('../../../lib/videos') - -const router = express.Router() - -router.post('/add', - reqValidator.secureRequest, - secureMiddleware.decryptBody, - reqValidator.remoteVideosAdd, - addRemoteVideos -) - -router.post('/remove', - reqValidator.secureRequest, - secureMiddleware.decryptBody, - reqValidator.remoteVideosRemove, - removeRemoteVideo -) - -// --------------------------------------------------------------------------- - -module.exports = router - -// --------------------------------------------------------------------------- - -function addRemoteVideos (req, res, next) { - const videosToCreate = req.body.data - videos.createRemoteVideos(videosToCreate, function (err, remoteVideos) { - if (err) { - logger.error('Cannot create remote videos.', { error: err }) - return next(err) - } - - res.type('json').status(201).end() - }) -} - -function removeRemoteVideo (req, res, next) { - const fromUrl = req.body.signature.url - const magnetUris = map(req.body.data, 'magnetUri') - - Videos.listFromUrlAndMagnets(fromUrl, magnetUris, function (err, videosList) { - if (err) { - logger.error('Cannot list videos from url and magnets.', { error: err }) - return next(err) - } - - videos.removeRemoteVideos(videosList, function (err) { - if (err) { - logger.error('Cannot remove remote videos.', { error: err }) - return next(err) - } - - res.type('json').status(204).end() - }) - }) -} diff --git a/server/controllers/api/v1/videos.js b/server/controllers/api/v1/videos.js index 5449cbcfa..2edb31122 100644 --- a/server/controllers/api/v1/videos.js +++ b/server/controllers/api/v1/videos.js @@ -3,8 +3,6 @@ const async = require('async') const config = require('config') const express = require('express') -const fs = require('fs') -const path = require('path') const multer = require('multer') const constants = require('../../../initializers/constants') @@ -46,7 +44,6 @@ const storage = multer.diskStorage({ }) const reqFiles = multer({ storage: storage }).fields([{ name: 'videofile', maxCount: 1 }]) -const thumbnailsDir = path.join(__dirname, '..', '..', '..', '..', config.get('storage.thumbnails')) router.get('/', reqValidatorPagination.pagination, @@ -127,34 +124,25 @@ function addVideo (req, res, next) { return callback(err) } - return callback(null, torrent, thumbnailName, videoData, insertedVideo) + return callback(null, insertedVideo) }) }, - function getThumbnailBase64 (torrent, thumbnailName, videoData, insertedVideo, callback) { - videoData.createdDate = insertedVideo.createdDate - - fs.readFile(thumbnailsDir + thumbnailName, function (err, thumbnailData) { + function sendToFriends (insertedVideo, callback) { + videos.convertVideoToRemote(insertedVideo, function (err, remoteVideo) { if (err) { // TODO unseed the video // TODO remove thumbnail - // TODO: remove video - logger.error('Cannot read the thumbnail of the video') + // TODO delete from DB + logger.error('Cannot convert video to remote.') return callback(err) } - return callback(null, videoData, thumbnailData) - }) - }, - - function sendToFriends (videoData, thumbnailData, callback) { - // Set the image in base64 - videoData.thumbnailBase64 = new Buffer(thumbnailData).toString('base64') + // Now we'll add the video's meta data to our friends + friends.addVideoToFriends(remoteVideo) - // Now we'll add the video's meta data to our friends - friends.addVideoToFriends(videoData) - - return callback(null) + return callback(null) + }) } ], function andFinally (err) { diff --git a/server/helpers/customValidators.js b/server/helpers/customValidators.js index 9c3ff38ef..a6cf680e5 100644 --- a/server/helpers/customValidators.js +++ b/server/helpers/customValidators.js @@ -7,8 +7,7 @@ const VIDEOS_CONSTRAINTS_FIELDS = constants.VIDEOS_CONSTRAINTS_FIELDS const customValidators = { exists: exists, - isEachAddRemoteVideosValid: isEachAddRemoteVideosValid, - isEachRemoveRemoteVideosValid: isEachRemoveRemoteVideosValid, + isEachRemoteVideosValid: isEachRemoteVideosValid, isArray: isArray, isVideoAuthorValid: isVideoAuthorValid, isVideoDateValid: isVideoDateValid, @@ -25,23 +24,26 @@ function exists (value) { return value !== undefined && value !== null } -function isEachAddRemoteVideosValid (videos) { - return videos.every(function (video) { - return isVideoAuthorValid(video.author) && - isVideoDateValid(video.createdDate) && - isVideoDescriptionValid(video.description) && - isVideoDurationValid(video.duration) && - isVideoMagnetUriValid(video.magnetUri) && - isVideoNameValid(video.name) && - isVideoPodUrlValid(video.podUrl) && - isVideoTagsValid(video.tags) && - isVideoThumbnailValid(video.thumbnailBase64) - }) -} - -function isEachRemoveRemoteVideosValid (videos) { - return videos.every(function (video) { - return isVideoMagnetUriValid(video.magnetUri) +function isEachRemoteVideosValid (requests) { + return requests.every(function (request) { + const video = request.data + return ( + isRequestTypeAddValid(request.type) && + isVideoAuthorValid(video.author) && + isVideoDateValid(video.createdDate) && + isVideoDescriptionValid(video.description) && + isVideoDurationValid(video.duration) && + isVideoMagnetUriValid(video.magnetUri) && + isVideoNameValid(video.name) && + isVideoPodUrlValid(video.podUrl) && + isVideoTagsValid(video.tags) && + isVideoThumbnailValid(video.thumbnailBase64) + ) || + ( + isRequestTypeRemoveValid(request.type) && + isVideoNameValid(video.name) && + isVideoMagnetUriValid(video.magnetUri) + ) }) } @@ -49,6 +51,14 @@ function isArray (value) { return Array.isArray(value) } +function isRequestTypeAddValid (value) { + return value === 'add' +} + +function isRequestTypeRemoveValid (value) { + return value === 'remove' +} + function isVideoAuthorValid (value) { return validator.isLength(value, VIDEOS_CONSTRAINTS_FIELDS.AUTHOR) } diff --git a/server/helpers/requests.js b/server/helpers/requests.js index 1e1bb4111..871342d60 100644 --- a/server/helpers/requests.js +++ b/server/helpers/requests.js @@ -1,12 +1,10 @@ 'use strict' -const async = require('async') const config = require('config') -const request = require('request') const replay = require('request-replay') +const request = require('request') const constants = require('../initializers/constants') -const logger = require('./logger') const peertubeCrypto = require('./peertubeCrypto') const http = config.get('webserver.https') ? 'https' : 'http' @@ -14,93 +12,67 @@ const host = config.get('webserver.host') const port = config.get('webserver.port') const requests = { - makeMultipleRetryRequest: makeMultipleRetryRequest + makeRetryRequest: makeRetryRequest, + makeSecureRequest: makeSecureRequest } -function makeMultipleRetryRequest (allData, pods, callbackEach, callback) { - if (!callback) { - callback = callbackEach - callbackEach = null - } +function makeRetryRequest (params, callback) { + replay( + request(params, callback), + { + retries: constants.RETRY_REQUESTS, + factor: 3, + maxTimeout: Infinity, + errorCodes: [ 'EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED' ] + } + ) +} - const url = http + '://' + host + ':' + port - let signature +function makeSecureRequest (params, callback) { + const myUrl = http + '://' + host + ':' + port - // Add signature if it is specified in the params - if (allData.method === 'POST' && allData.data && allData.sign === true) { - signature = peertubeCrypto.sign(url) + const requestParams = { + url: params.toPod.url + params.path } - // Make a request for each pod - async.each(pods, function (pod, callbackEachAsync) { - function callbackEachRetryRequest (err, response, body, url, pod) { - if (callbackEach !== null) { - callbackEach(err, response, body, url, pod, function () { - callbackEachAsync() - }) - } else { - callbackEachAsync() - } - } + // Add data with POST requst ? + if (params.method === 'POST') { + requestParams.json = {} - const params = { - url: pod.url + allData.path, - method: allData.method + // Add signature if it is specified in the params + if (params.sign === true) { + requestParams.json.signature = { + url: myUrl, + signature: peertubeCrypto.sign(myUrl) + } } - // Add data with POST requst ? - if (allData.method === 'POST' && allData.data) { - // Encrypt data ? - if (allData.encrypt === true) { - peertubeCrypto.encrypt(pod.publicKey, JSON.stringify(allData.data), function (err, encrypted) { + // If there are data informations + if (params.data) { + // Encrypt data + if (params.encrypt === true) { + peertubeCrypto.encrypt(params.toPod.publicKey, JSON.stringify(params.data), function (err, encrypted) { if (err) return callback(err) - params.json = { - data: encrypted.data, - key: encrypted.key - } + requestParams.json.data = encrypted.data + requestParams.json.key = encrypted.key - makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest) + request.post(requestParams, callback) }) } else { - params.json = { data: allData.data } - makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest) + // No encryption + requestParams.json.data = params.data + request.post(requestParams, callback) } } else { - makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest) + // No data + request.post(requestParams, callback) } - }, callback) + } else { + request.get(requestParams, callback) + } } // --------------------------------------------------------------------------- module.exports = requests - -// --------------------------------------------------------------------------- - -function makeRetryRequest (params, fromUrl, toPod, signature, callbackEach) { - // Append the signature - if (signature) { - params.json.signature = { - url: fromUrl, - signature: signature - } - } - - logger.debug('Make retry requests to %s.', toPod.url) - - replay( - request.post(params, function (err, response, body) { - callbackEach(err, response, body, params.url, toPod) - }), - { - retries: constants.REQUEST_RETRIES, - factor: 3, - maxTimeout: Infinity, - errorCodes: [ 'EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED' ] - } - ).on('replay', function (replay) { - logger.info('Replaying request to %s. Request failed: %d %s. Replay number: #%d. Will retry in: %d ms.', - params.url, replay.error.code, replay.error.message, replay.number, replay.delay) - }) -} diff --git a/server/initializers/constants.js b/server/initializers/constants.js index 22cbb1361..caeb340cf 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js @@ -18,11 +18,11 @@ const PODS_SCORE = { BONUS: 10 } -// Number of retries we make for the make retry requests (to friends...) -let REQUEST_RETRIES = 10 +// Number of requests in parallel we can make +const REQUESTS_IN_PARALLEL = 10 -// Different types or requests for the request scheduler module -const REQUEST_SCHEDULER_TYPE = [ 'add', 'remove' ] +// Number of requests to retry for replay requests module +const RETRY_REQUESTS = 5 // Sortable columns per schema const SEARCHABLE_COLUMNS = { @@ -56,7 +56,6 @@ if (isTestInstance() === true) { FRIEND_BASE_SCORE = 20 INTERVAL = 10000 VIDEOS_CONSTRAINTS_FIELDS.DURATION.max = 14 - REQUEST_RETRIES = 2 } // --------------------------------------------------------------------------- @@ -67,8 +66,8 @@ module.exports = { INTERVAL: INTERVAL, PAGINATION_COUNT_DEFAULT: PAGINATION_COUNT_DEFAULT, PODS_SCORE: PODS_SCORE, - REQUEST_RETRIES: REQUEST_RETRIES, - REQUEST_SCHEDULER_TYPE: REQUEST_SCHEDULER_TYPE, + REQUESTS_IN_PARALLEL: REQUESTS_IN_PARALLEL, + RETRY_REQUESTS: RETRY_REQUESTS, SEARCHABLE_COLUMNS: SEARCHABLE_COLUMNS, SORTABLE_COLUMNS: SORTABLE_COLUMNS, THUMBNAILS_SIZE: THUMBNAILS_SIZE, diff --git a/server/lib/friends.js b/server/lib/friends.js index e986fa006..d81a603ad 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -24,15 +24,15 @@ const pods = { getMyCertificate: getMyCertificate, makeFriends: makeFriends, quitFriends: quitFriends, - removeVideoToFriends: removeVideoToFriends + removeVideoToFriends: removeVideoToFriends, + sendOwnedVideosToPod: sendOwnedVideosToPod } function addVideoToFriends (video) { - // To avoid duplicates - const id = video.name + video.magnetUri // ensure namePath is null video.namePath = null - requestsScheduler.addRequest(id, 'add', video) + + requestsScheduler.addRequest('add', video) } function hasFriends (callback) { @@ -60,7 +60,7 @@ function makeFriends (callback) { const urls = config.get('network.friends') - async.each(urls, function (url, callbackEach) { + async.eachSeries(urls, function (url, callbackEach) { computeForeignPodsList(url, podsScore, callbackEach) }, function (err) { if (err) return callback(err) @@ -78,7 +78,7 @@ function quitFriends (callback) { // Stop pool requests requestsScheduler.deactivate() // Flush pool requests - requestsScheduler.forceSend() + requestsScheduler.flush() async.waterfall([ function getPodsList (callbackAsync) { @@ -86,19 +86,25 @@ function quitFriends (callback) { }, function announceIQuitMyFriends (pods, callbackAsync) { - const request = { + const requestParams = { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/remove', - sign: true, - encrypt: true, - data: { - url: 'me' // Fake data - } + sign: true } // Announce we quit them - requests.makeMultipleRetryRequest(request, pods, function (err) { - return callbackAsync(err) + // We don't care if the request fails + // The other pod will exclude us automatically after a while + async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + requestParams.toPod = pod + requests.makeSecureRequest(requestParams, callbackEach) + }, function (err) { + if (err) { + logger.error('Some errors while quitting friends.', { err: err }) + // Don't stop the process + } + + return callbackAsync() }) }, @@ -136,9 +142,28 @@ function quitFriends (callback) { } function removeVideoToFriends (video) { - // To avoid duplicates - const id = video.name + video.magnetUri - requestsScheduler.addRequest(id, 'remove', video) + requestsScheduler.addRequest('remove', video) +} + +function sendOwnedVideosToPod (podId) { + Videos.listOwned(function (err, videosList) { + if (err) { + logger.error('Cannot get the list of videos we own.') + return + } + + videosList.forEach(function (video) { + videos.convertVideoToRemote(video, function (err, remoteVideo) { + if (err) { + logger.error('Cannot convert video to remote.', { error: err }) + // Don't break the process + return + } + + requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo) + }) + }) + }) } // --------------------------------------------------------------------------- @@ -148,18 +173,19 @@ module.exports = pods // --------------------------------------------------------------------------- function computeForeignPodsList (url, podsScore, callback) { - // Let's give 1 point to the pod we ask the friends list - podsScore[url] = 1 - getForeignPodsList(url, function (err, foreignPodsList) { if (err) return callback(err) - if (foreignPodsList.length === 0) return callback() + + if (!foreignPodsList) foreignPodsList = [] + + // Let's give 1 point to the pod we ask the friends list + foreignPodsList.push({ url: url }) foreignPodsList.forEach(function (foreignPod) { - const foreignUrl = foreignPod.url + const foreignPodUrl = foreignPod.url - if (podsScore[foreignUrl]) podsScore[foreignUrl]++ - else podsScore[foreignUrl] = 1 + if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++ + else podsScore[foreignPodUrl] = 1 }) callback() @@ -194,63 +220,43 @@ function makeRequestsToWinningPods (cert, podsList, callback) { // Flush pool requests requestsScheduler.forceSend() - // Get the list of our videos to send to our new friends - Videos.listOwned(function (err, videosList) { - if (err) { - logger.error('Cannot get the list of videos we own.') - return callback(err) - } - - const data = { - url: http + '://' + host + ':' + port, - publicKey: cert, - videos: videosList + async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + const params = { + url: pod.url + '/api/' + constants.API_VERSION + '/pods/', + method: 'POST', + json: { + url: http + '://' + host + ':' + port, + publicKey: cert + } } - requests.makeMultipleRetryRequest( - { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, - - podsList, - - // Callback called after each request - function eachRequest (err, response, body, url, pod, callbackEachRequest) { - // We add the pod if it responded correctly with its public certificate - if (!err && response.statusCode === 200) { - Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { - if (err) { - logger.error('Error with adding %s pod.', pod.url, { error: err }) - return callbackEachRequest() - } - - videos.createRemoteVideos(body.videos, function (err) { - if (err) { - logger.error('Error with adding videos of pod.', pod.url, { error: err }) - return callbackEachRequest() - } - - logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) - return callbackEachRequest() - }) - }) - } else { - logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) - return callbackEachRequest() - } - }, + requests.makeRetryRequest(params, function (err, res, body) { + if (err) { + logger.error('Error with adding %s pod.', pod.url, { error: err }) + // Don't break the process + return callbackEach() + } - // Final callback, we've ended all the requests - function endRequests (err) { - // Now we made new friends, we can re activate the pool of requests - requestsScheduler.activate() + if (res.statusCode === 200) { + Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) { + if (err) logger.error('Cannot add friend %s pod.', pod.url) - if (err) { - logger.error('There was some errors when we wanted to make friends.') - return callback(err) - } + // Add our videos to the request scheduler + sendOwnedVideosToPod(podCreated._id) - logger.debug('makeRequestsToWinningPods finished.') - return callback(null) + return callbackEach() + }) + } else { + logger.error('Status not 200 for %s pod.', pod.url) + return callbackEach() } - ) + }) + }, function endRequests () { + // Final callback, we've ended all the requests + // Now we made new friends, we can re activate the pool of requests + requestsScheduler.activate() + + logger.debug('makeRequestsToWinningPods finished.') + return callback() }) } diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js index 78570209d..ac75e5b93 100644 --- a/server/lib/requestsScheduler.js +++ b/server/lib/requestsScheduler.js @@ -11,13 +11,14 @@ const requests = require('../helpers/requests') const videos = require('../lib/videos') const Videos = require('../models/videos') -const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE let timer = null const requestsScheduler = { activate: activate, addRequest: addRequest, + addRequestTo: addRequestTo, deactivate: deactivate, + flush: flush, forceSend: forceSend } @@ -27,35 +28,37 @@ function activate () { } // Add request to the scheduler -function addRequest (id, type, request) { - logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) +function addRequest (type, data) { + logger.debug('Add request of type %s to the requests scheduler.', type, { data: data }) - Requests.findById(id, function (err, entity) { + const request = { + type: type, + data: data + } + + Pods.listAllIds(function (err, podIds) { if (err) { - logger.error('Error when trying to find a request.', { error: err }) - return // Abort + logger.debug('Cannot list pod ids.') + return } - // If there were already a request with this id in the scheduler... - if (entity) { - if (entity.type === type) { - logger.error('Cannot insert two same requests.') - return // Abort - } + // No friends + if (!podIds) return - // Remove the request of the other type - Requests.removeRequestById(id, function (err) { - if (err) { - logger.error('Cannot remove a request.', { error: err }) - return // Abort - } - }) - } else { - Requests.create(id, type, request, function (err) { - if (err) logger.error('Cannot create a request.', { error: err }) - return // Abort - }) - } + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) + }) + }) +} + +function addRequestTo (podIds, type, data) { + const request = { + type: type, + data: data + } + + Requests.create(request, podIds, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) }) } @@ -64,6 +67,14 @@ function deactivate () { clearInterval(timer) } +function flush () { + Requests.removeAll(function (err) { + if (err) { + logger.error('Cannot flush the requests.', { error: err }) + } + }) +} + function forceSend () { logger.info('Force requests scheduler sending.') makeRequests() @@ -76,54 +87,28 @@ module.exports = requestsScheduler // --------------------------------------------------------------------------- // Make a requests to friends of a certain type -function makeRequest (type, requestsToMake, callback) { +function makeRequest (toPod, requestsToMake, callback) { if (!callback) callback = function () {} - Pods.list(function (err, pods) { - if (err) return callback(err) - - const params = { - encrypt: true, // Security - sign: true, // To prove our identity - method: 'POST', - path: null, // We build the path later - data: requestsToMake // Requests we need to make - } - - // If this is a valid type, we build the path - if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type - } else { - return callback(new Error('Unkown pool request type.')) - } - - const badPods = [] - const goodPods = [] - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) { - // We failed the request, add the pod unreachable to the bad pods list - if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) { - badPods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - // Request success - goodPods.push(pod._id) - } - - return callbackEachPodFinished() + const params = { + toPod: toPod, + encrypt: true, // Security + sign: true, // To prove our identity + method: 'POST', + path: '/api/' + constants.API_VERSION + '/remote/videos', + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + requests.makeSecureRequest(params, function (err, res) { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') }) + + return callback(false) } - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - // All the requests were made, we update the pods score - updatePodsScore(goodPods, badPods) - callback(null) - } + return callback(true) }) } @@ -143,38 +128,65 @@ function makeRequests () { logger.info('Making requests to friends.') + // Requests by pods id const requestsToMake = {} - for (const type of REQUEST_SCHEDULER_TYPE) { - requestsToMake[type] = { - ids: [], - requests: [] - } - } - // For each requests to make, we add it to the correct request type requests.forEach(function (poolRequest) { - if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) { - const requestTypeToMake = requestsToMake[poolRequest.type] - requestTypeToMake.requests.push(poolRequest.request) - requestTypeToMake.ids.push(poolRequest._id) - } else { - logger.error('Unkown request type.', { request_type: poolRequest.type }) - return // abort - } + poolRequest.to.forEach(function (toPodId) { + if (!requestsToMake[toPodId]) { + requestsToMake[toPodId] = { + ids: [], + datas: [] + } + } + + requestsToMake[toPodId].ids.push(poolRequest._id) + requestsToMake[toPodId].datas.push(poolRequest.request) + }) }) - for (let type of Object.keys(requestsToMake)) { - const requestTypeToMake = requestsToMake[type] - // If there are requests for this type - if (requestTypeToMake.requests.length !== 0) { - makeRequest(type, requestTypeToMake.requests, function (err) { - if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err }) + const goodPods = [] + const badPods = [] - // We made the requests, so we can remove them from the scheduler - Requests.removeRequests(requestTypeToMake.ids) + async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { + const requestToMake = requestsToMake[toPodId] + + // FIXME: mongodb request inside a loop :/ + Pods.findById(toPodId, function (err, toPod) { + if (err) return logger.error('Error finding pod by id.', { err: err }) + + // Maybe the pod is not our friend anymore so simply remove them + if (!toPod) { + Requests.removePodOf(requestToMake.ids, toPodId) + return callbackEach() + } + + makeRequest(toPod, requestToMake.datas, function (success) { + if (err) { + logger.error('Errors when sent request to %s.', toPod.url, { error: err }) + // Do not stop the process just for one error + return callbackEach() + } + + if (success === true) { + logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + + // Remove the pod id of these request ids + Requests.removePodOf(requestToMake.ids, toPodId) + goodPods.push(toPodId) + } else { + badPods.push(toPodId) + } + + callbackEach() }) - } - } + }) + }, function () { + // All the requests were made, we update the pods score + updatePodsScore(goodPods, badPods) + // Flush requests with no pod + Requests.removeWithEmptyTo() + }) }) } diff --git a/server/lib/videos.js b/server/lib/videos.js index e0db0e1d5..a74c77dc4 100644 --- a/server/lib/videos.js +++ b/server/lib/videos.js @@ -17,6 +17,7 @@ const uploadDir = pathUtils.join(__dirname, '..', '..', config.get('storage.uplo const thumbnailsDir = pathUtils.join(__dirname, '..', '..', config.get('storage.thumbnails')) const videos = { + convertVideoToRemote: convertVideoToRemote, createRemoteVideos: createRemoteVideos, getVideoDuration: getVideoDuration, getVideoState: getVideoState, @@ -27,6 +28,29 @@ const videos = { seedAllExisting: seedAllExisting } +function convertVideoToRemote (video, callback) { + fs.readFile(thumbnailsDir + video.thumbnail, function (err, thumbnailData) { + if (err) { + logger.error('Cannot read the thumbnail of the video') + return callback(err) + } + + const remoteVideo = { + name: video.name, + description: video.description, + magnetUri: video.magnetUri, + author: video.author, + duration: video.duration, + thumbnailBase64: new Buffer(thumbnailData).toString('base64'), + tags: video.tags, + createdDate: video.createdDate, + podUrl: video.podUrl + } + + return callback(null, remoteVideo) + }) +} + function createRemoteVideos (videos, callback) { // Create the remote videos from the new pod createRemoteVideoObjects(videos, function (err, remoteVideos) { @@ -154,7 +178,8 @@ function createRemoteVideoObjects (videos, callback) { podUrl: video.podUrl, duration: video.duration, thumbnail: thumbnailName, - tags: video.tags + tags: video.tags, + author: video.author } remoteVideos.push(params) diff --git a/server/middlewares/reqValidators/pods.js b/server/middlewares/reqValidators/pods.js index 77449480c..78a4b76c1 100644 --- a/server/middlewares/reqValidators/pods.js +++ b/server/middlewares/reqValidators/pods.js @@ -26,8 +26,10 @@ function makeFriends (req, res, next) { } function podsAdd (req, res, next) { - req.checkBody('data.url', 'Should have an url').notEmpty().isURL({ require_protocol: true }) - req.checkBody('data.publicKey', 'Should have a public key').notEmpty() + req.checkBody('url', 'Should have an url').notEmpty().isURL({ require_protocol: true }) + req.checkBody('publicKey', 'Should have a public key').notEmpty() + + // TODO: check we don't have it already logger.debug('Checking podsAdd parameters', { parameters: req.body }) diff --git a/server/middlewares/reqValidators/remote.js b/server/middlewares/reqValidators/remote.js index b5f3118b0..a23673d89 100644 --- a/server/middlewares/reqValidators/remote.js +++ b/server/middlewares/reqValidators/remote.js @@ -4,36 +4,34 @@ const checkErrors = require('./utils').checkErrors const logger = require('../../helpers/logger') const reqValidatorsRemote = { - remoteVideosAdd: remoteVideosAdd, - remoteVideosRemove: remoteVideosRemove, - secureRequest: secureRequest + dataToDecrypt: dataToDecrypt, + remoteVideos: remoteVideos, + signature: signature } -function remoteVideosAdd (req, res, next) { - req.checkBody('data').isArray() - req.checkBody('data').isEachAddRemoteVideosValid() +function dataToDecrypt (req, res, next) { + req.checkBody('key', 'Should have a key').notEmpty() + req.checkBody('data', 'Should have data').notEmpty() - logger.debug('Checking remoteVideosAdd parameters', { parameters: req.body }) + logger.debug('Checking dataToDecrypt parameters', { parameters: { keyLength: req.body.key.length, bodyLength: req.body.data.length } }) checkErrors(req, res, next) } -function remoteVideosRemove (req, res, next) { +function remoteVideos (req, res, next) { req.checkBody('data').isArray() - req.checkBody('data').isEachRemoveRemoteVideosValid() + req.checkBody('data').isEachRemoteVideosValid() - logger.debug('Checking remoteVideosRemove parameters', { parameters: req.body }) + logger.debug('Checking remoteVideosAdd parameters', { parameters: req.body }) checkErrors(req, res, next) } -function secureRequest (req, res, next) { +function signature (req, res, next) { req.checkBody('signature.url', 'Should have a signature url').isURL() req.checkBody('signature.signature', 'Should have a signature').notEmpty() - req.checkBody('key', 'Should have a key').notEmpty() - req.checkBody('data', 'Should have data').notEmpty() - logger.debug('Checking secureRequest parameters', { parameters: { data: req.body.data, keyLength: req.body.key.length } }) + logger.debug('Checking signature parameters', { parameters: { signatureUrl: req.body.signature.url } }) checkErrors(req, res, next) } diff --git a/server/models/pods.js b/server/models/pods.js index 04cc2d6fc..daeadeb07 100644 --- a/server/models/pods.js +++ b/server/models/pods.js @@ -19,10 +19,13 @@ const PodsDB = mongoose.model('pods', podsSchema) const Pods = { add: add, count: count, + findById: findById, findByUrl: findByUrl, findBadPods: findBadPods, incrementScores: incrementScores, list: list, + listAllIds: listAllIds, + listAllUrls: listAllUrls, remove: remove, removeAll: removeAll, removeAllByIds: removeAllByIds @@ -48,6 +51,10 @@ function findBadPods (callback) { PodsDB.find({ score: 0 }, callback) } +function findById (id, callback) { + PodsDB.findById(id, callback) +} + function findByUrl (url, callback) { PodsDB.findOne({ url: url }, callback) } @@ -68,6 +75,14 @@ function list (callback) { }) } +function listAllIds (callback) { + return PodsDB.find({}, { _id: 1 }, callback) +} + +function listAllUrls (callback) { + return PodsDB.find({}, { _id: 0, url: 1 }, callback) +} + function remove (url, callback) { if (!callback) callback = function () {} PodsDB.remove({ url: url }, callback) diff --git a/server/models/requests.js b/server/models/requests.js index 2152ae0e9..e67ccad56 100644 --- a/server/models/requests.js +++ b/server/models/requests.js @@ -7,9 +7,8 @@ const logger = require('../helpers/logger') // --------------------------------------------------------------------------- const requestsSchema = mongoose.Schema({ - type: String, - id: String, // Special id to find duplicates (video created we want to remove...) - request: mongoose.Schema.Types.Mixed + request: mongoose.Schema.Types.Mixed, + to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ] }) const RequestsDB = mongoose.model('requests', requestsSchema) @@ -19,12 +18,15 @@ const Requests = { create: create, findById: findById, list: list, + removeAll: removeAll, + removePodOf: removePodOf, removeRequestById: removeRequestById, - removeRequests: removeRequests + removeRequests: removeRequests, + removeWithEmptyTo: removeWithEmptyTo } -function create (id, type, request, callback) { - RequestsDB.create({ id: id, type: type, request: request }, callback) +function create (request, to, callback) { + RequestsDB.create({ request: request, to: to }, callback) } function findById (id, callback) { @@ -32,7 +34,17 @@ function findById (id, callback) { } function list (callback) { - RequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback) + RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback) +} + +function removeAll (callback) { + RequestsDB.remove({ }, callback) +} + +function removePodOf (requestsIds, podId, callback) { + if (!callback) callback = function () {} + + RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback) } function removeRequestById (id, callback) { @@ -50,6 +62,12 @@ function removeRequests (ids) { }) } +function removeWithEmptyTo (callback) { + if (!callback) callback = function () {} + + RequestsDB.remove({ to: { $size: 0 } }, callback) +} + // --------------------------------------------------------------------------- module.exports = Requests diff --git a/server/tests/api/checkParams.js b/server/tests/api/checkParams.js index 95a7738f8..7f22a37cc 100644 --- a/server/tests/api/checkParams.js +++ b/server/tests/api/checkParams.js @@ -90,33 +90,27 @@ describe('Test parameters validator', function () { it('Should fail without public key', function (done) { const data = { - data: { - url: 'http://coucou.com' - } + url: 'http://coucou.com' } makePostBodyRequest(path, data, done) }) it('Should fail without an url', function (done) { const data = { - data: { - publicKey: 'mysuperpublickey' - } + publicKey: 'mysuperpublickey' } makePostBodyRequest(path, data, done) }) it('Should fail with an incorrect url', function (done) { const data = { - data: { - url: 'coucou.com', - publicKey: 'mysuperpublickey' - } + url: 'coucou.com', + publicKey: 'mysuperpublickey' } makePostBodyRequest(path, data, function () { - data.data.url = 'http://coucou' + data.url = 'http://coucou' makePostBodyRequest(path, data, function () { - data.data.url = 'coucou' + data.url = 'coucou' makePostBodyRequest(path, data, done) }) }) @@ -124,10 +118,8 @@ describe('Test parameters validator', function () { it('Should succeed with the correct parameters', function (done) { const data = { - data: { - url: 'http://coucou.com', - publicKey: 'mysuperpublickey' - } + url: 'http://coucou.com', + publicKey: 'mysuperpublickey' } makePostBodyRequest(path, data, done, false) }) diff --git a/server/tests/api/friendsAdvanced.js b/server/tests/api/friendsAdvanced.js index 86620254e..b082270ff 100644 --- a/server/tests/api/friendsAdvanced.js +++ b/server/tests/api/friendsAdvanced.js @@ -130,6 +130,18 @@ describe('Test advanced friends', function () { function (next) { makeFriends(4, next) }, + // Check the pods 1, 2, 3 and 4 are friends + function (next) { + async.each([ 1, 2, 3, 4 ], function (i, callback) { + getFriendsList(i, function (err, res) { + if (err) throw err + + expect(res.body.length).to.equal(3) + + callback() + }) + }, next) + }, // Kill pod 4 function (next) { servers[3].app.kill() @@ -152,7 +164,7 @@ describe('Test advanced friends', function () { uploadVideo(2, next) }, function (next) { - setTimeout(next, 20000) + setTimeout(next, 11000) }, // Rerun server 4 function (next) { @@ -173,6 +185,9 @@ describe('Test advanced friends', function () { // Pod 6 ask pod 1, 2 and 3 function (next) { makeFriends(6, next) + }, + function (next) { + setTimeout(next, 11000) }], function (err) { if (err) throw err @@ -247,7 +262,7 @@ describe('Test advanced friends', function () { done() }) - }, 5000) + }, 11000) }) }) diff --git a/server/tests/api/friendsBasic.js b/server/tests/api/friendsBasic.js index 68817e852..5b738ad39 100644 --- a/server/tests/api/friendsBasic.js +++ b/server/tests/api/friendsBasic.js @@ -25,9 +25,10 @@ describe('Test basic friends', function () { if (err) throw err const result = res.body - const resultUrls = [ result[0].url, result[1].url ] expect(result).to.be.an('array') expect(result.length).to.equal(2) + + const resultUrls = [ result[0].url, result[1].url ] expect(resultUrls[0]).to.not.equal(resultUrls[1]) const errorString = 'Friends url do not correspond for ' + serverToTest.url diff --git a/server/tests/api/multiplePods.js b/server/tests/api/multiplePods.js index 40326c260..2a1bc64e6 100644 --- a/server/tests/api/multiplePods.js +++ b/server/tests/api/multiplePods.js @@ -105,6 +105,7 @@ describe('Test multiple pods', function () { expect(video.duration).to.equal(10) expect(video.tags).to.deep.equal([ 'tag1p1', 'tag2p1' ]) expect(utils.dateIsValid(video.createdDate)).to.be.true + expect(video.author).to.equal('root') if (server.url !== 'http://localhost:9001') { expect(video.isLocal).to.be.false @@ -166,6 +167,7 @@ describe('Test multiple pods', function () { expect(video.duration).to.equal(5) expect(video.tags).to.deep.equal([ 'tag1p2', 'tag2p2', 'tag3p2' ]) expect(utils.dateIsValid(video.createdDate)).to.be.true + expect(video.author).to.equal('root') if (server.url !== 'http://localhost:9002') { expect(video.isLocal).to.be.false @@ -243,6 +245,7 @@ describe('Test multiple pods', function () { expect(video1.magnetUri).to.exist expect(video1.duration).to.equal(5) expect(video1.tags).to.deep.equal([ 'tag1p3' ]) + expect(video1.author).to.equal('root') expect(utils.dateIsValid(video1.createdDate)).to.be.true expect(video2.name).to.equal('my super name for pod 3-2') @@ -251,6 +254,7 @@ describe('Test multiple pods', function () { expect(video2.magnetUri).to.exist expect(video2.duration).to.equal(5) expect(video2.tags).to.deep.equal([ 'tag2p3', 'tag3p3', 'tag4p3' ]) + expect(video2.author).to.equal('root') expect(utils.dateIsValid(video2.createdDate)).to.be.true if (server.url !== 'http://localhost:9003') {