import 'rxjs/Subject';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
-import 'rxjs/add/operator/mergeMap';
+import 'rxjs/add/observable/throw';
import 'bootstrap-loader';
import 'ng2-file-upload';
const router = express.Router()
const podsController = require('./pods')
-const remoteVideosController = require('./remoteVideos')
+const remoteController = require('./remote')
const usersController = require('./users')
const videosController = require('./videos')
router.use('/pods', podsController)
-router.use('/remotevideos', remoteVideosController)
+router.use('/remote', remoteController)
router.use('/users', usersController)
router.use('/videos', videosController)
router.use('/*', badRequest)
const Pods = require('../../../models/pods')
const oAuth2 = middlewares.oauth2
const reqValidator = middlewares.reqValidators.pods
-const secureMiddleware = middlewares.secure
-const secureRequest = middlewares.reqValidators.remote.secureRequest
+const signatureValidator = middlewares.reqValidators.remote.signature
const videos = require('../../../lib/videos')
const Videos = require('../../../models/videos')
const router = express.Router()
-router.get('/', listPods)
+router.get('/', listPodsUrl)
router.post('/', reqValidator.podsAdd, addPods)
router.get('/makefriends', oAuth2.authenticate, reqValidator.makeFriends, makeFriends)
router.get('/quitfriends', oAuth2.authenticate, quitFriends)
// Post because this is a secured request
-router.post('/remove', secureRequest, secureMiddleware.decryptBody, removePods)
+router.post('/remove', signatureValidator, removePods)
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
function addPods (req, res, next) {
- const informations = req.body.data
+ const informations = req.body
async.waterfall([
function addPod (callback) {
- Pods.add(informations, function (err) {
- return callback(err)
- })
+ Pods.add(informations, callback)
},
- function createVideosOfThisPod (callback) {
- // Create the remote videos from the new pod
- videos.createRemoteVideos(informations.videos, function (err) {
- if (err) logger.error('Cannot create remote videos.', { error: err })
+ function sendMyVideos (podCreated, callback) {
+ friends.sendOwnedVideosToPod(podCreated._id)
- return callback(err)
- })
+ callback(null)
},
function fetchMyCertificate (callback) {
return callback(null, cert)
})
- },
-
- function getListOfMyVideos (cert, callback) {
- Videos.listOwned(function (err, videosList) {
- if (err) {
- logger.error('Cannot get the list of owned videos.')
- return callback(err)
- }
-
- return callback(null, cert, videosList)
- })
}
- ], function (err, cert, videosList) {
+ ], function (err, cert) {
if (err) return next(err)
- return res.json({ cert: cert, videos: videosList })
+ return res.json({ cert: cert })
})
}
-function listPods (req, res, next) {
- Pods.list(function (err, podsList) {
+function listPodsUrl (req, res, next) {
+ Pods.listAllUrls(function (err, podsUrlList) {
if (err) return next(err)
- res.json(podsList)
+ res.json(podsUrlList)
})
}
--- /dev/null
+'use strict'
+
+const async = require('async')
+const express = require('express')
+
+const middlewares = require('../../../middlewares')
+const secureMiddleware = middlewares.secure
+const reqValidator = middlewares.reqValidators.remote
+const logger = require('../../../helpers/logger')
+const Videos = require('../../../models/videos')
+const videos = require('../../../lib/videos')
+
+const router = express.Router()
+
+router.post('/videos',
+ reqValidator.signature,
+ reqValidator.dataToDecrypt,
+ secureMiddleware.decryptBody,
+ reqValidator.remoteVideos,
+ remoteVideos
+)
+
+// ---------------------------------------------------------------------------
+
+module.exports = router
+
+// ---------------------------------------------------------------------------
+
+function remoteVideos (req, res, next) {
+ const requests = req.body.data
+ const fromUrl = req.body.signature.url
+
+ // We need to process in the same order to keep consistency
+ // TODO: optimization
+ async.eachSeries(requests, function (request, callbackEach) {
+ const video = request.data
+
+ if (request.type === 'add') {
+ addRemoteVideo(video, callbackEach)
+ } else if (request.type === 'remove') {
+ removeRemoteVideo(video, fromUrl, callbackEach)
+ }
+ })
+
+ // We don't need to keep the other pod waiting
+ return res.type('json').status(204).end()
+}
+
+function addRemoteVideo (videoToCreate, callback) {
+ videos.createRemoteVideos([ videoToCreate ], function (err, remoteVideos) {
+ if (err) {
+ logger.error('Cannot create remote videos.', { error: err })
+ // Don't break the process
+ }
+
+ return callback()
+ })
+}
+
+function removeRemoteVideo (videoToRemove, fromUrl, callback) {
+ const magnetUris = [ videoToRemove.magnetUri ]
+
+ // We need the list because we have to remove some other stuffs (thumbnail etc)
+ Videos.listFromUrlAndMagnets(fromUrl, magnetUris, function (err, videosList) {
+ if (err) {
+ logger.error('Cannot list videos from url and magnets.', { error: err })
+ // Don't break the process
+ return callback()
+ }
+
+ videos.removeRemoteVideos(videosList, function (err) {
+ if (err) {
+ logger.error('Cannot remove remote videos.', { error: err })
+ // Don't break the process
+ }
+
+ return callback()
+ })
+ })
+}
+++ /dev/null
-'use strict'
-
-const express = require('express')
-const map = require('lodash/map')
-
-const middlewares = require('../../../middlewares')
-const secureMiddleware = middlewares.secure
-const reqValidator = middlewares.reqValidators.remote
-const logger = require('../../../helpers/logger')
-const Videos = require('../../../models/videos')
-const videos = require('../../../lib/videos')
-
-const router = express.Router()
-
-router.post('/add',
- reqValidator.secureRequest,
- secureMiddleware.decryptBody,
- reqValidator.remoteVideosAdd,
- addRemoteVideos
-)
-
-router.post('/remove',
- reqValidator.secureRequest,
- secureMiddleware.decryptBody,
- reqValidator.remoteVideosRemove,
- removeRemoteVideo
-)
-
-// ---------------------------------------------------------------------------
-
-module.exports = router
-
-// ---------------------------------------------------------------------------
-
-function addRemoteVideos (req, res, next) {
- const videosToCreate = req.body.data
- videos.createRemoteVideos(videosToCreate, function (err, remoteVideos) {
- if (err) {
- logger.error('Cannot create remote videos.', { error: err })
- return next(err)
- }
-
- res.type('json').status(201).end()
- })
-}
-
-function removeRemoteVideo (req, res, next) {
- const fromUrl = req.body.signature.url
- const magnetUris = map(req.body.data, 'magnetUri')
-
- Videos.listFromUrlAndMagnets(fromUrl, magnetUris, function (err, videosList) {
- if (err) {
- logger.error('Cannot list videos from url and magnets.', { error: err })
- return next(err)
- }
-
- videos.removeRemoteVideos(videosList, function (err) {
- if (err) {
- logger.error('Cannot remove remote videos.', { error: err })
- return next(err)
- }
-
- res.type('json').status(204).end()
- })
- })
-}
const async = require('async')
const config = require('config')
const express = require('express')
-const fs = require('fs')
-const path = require('path')
const multer = require('multer')
const constants = require('../../../initializers/constants')
})
const reqFiles = multer({ storage: storage }).fields([{ name: 'videofile', maxCount: 1 }])
-const thumbnailsDir = path.join(__dirname, '..', '..', '..', '..', config.get('storage.thumbnails'))
router.get('/',
reqValidatorPagination.pagination,
return callback(err)
}
- return callback(null, torrent, thumbnailName, videoData, insertedVideo)
+ return callback(null, insertedVideo)
})
},
- function getThumbnailBase64 (torrent, thumbnailName, videoData, insertedVideo, callback) {
- videoData.createdDate = insertedVideo.createdDate
-
- fs.readFile(thumbnailsDir + thumbnailName, function (err, thumbnailData) {
+ function sendToFriends (insertedVideo, callback) {
+ videos.convertVideoToRemote(insertedVideo, function (err, remoteVideo) {
if (err) {
// TODO unseed the video
// TODO remove thumbnail
- // TODO: remove video
- logger.error('Cannot read the thumbnail of the video')
+ // TODO delete from DB
+ logger.error('Cannot convert video to remote.')
return callback(err)
}
- return callback(null, videoData, thumbnailData)
- })
- },
-
- function sendToFriends (videoData, thumbnailData, callback) {
- // Set the image in base64
- videoData.thumbnailBase64 = new Buffer(thumbnailData).toString('base64')
+ // Now we'll add the video's meta data to our friends
+ friends.addVideoToFriends(remoteVideo)
- // Now we'll add the video's meta data to our friends
- friends.addVideoToFriends(videoData)
-
- return callback(null)
+ return callback(null)
+ })
}
], function andFinally (err) {
const customValidators = {
exists: exists,
- isEachAddRemoteVideosValid: isEachAddRemoteVideosValid,
- isEachRemoveRemoteVideosValid: isEachRemoveRemoteVideosValid,
+ isEachRemoteVideosValid: isEachRemoteVideosValid,
isArray: isArray,
isVideoAuthorValid: isVideoAuthorValid,
isVideoDateValid: isVideoDateValid,
return value !== undefined && value !== null
}
-function isEachAddRemoteVideosValid (videos) {
- return videos.every(function (video) {
- return isVideoAuthorValid(video.author) &&
- isVideoDateValid(video.createdDate) &&
- isVideoDescriptionValid(video.description) &&
- isVideoDurationValid(video.duration) &&
- isVideoMagnetUriValid(video.magnetUri) &&
- isVideoNameValid(video.name) &&
- isVideoPodUrlValid(video.podUrl) &&
- isVideoTagsValid(video.tags) &&
- isVideoThumbnailValid(video.thumbnailBase64)
- })
-}
-
-function isEachRemoveRemoteVideosValid (videos) {
- return videos.every(function (video) {
- return isVideoMagnetUriValid(video.magnetUri)
+function isEachRemoteVideosValid (requests) {
+ return requests.every(function (request) {
+ const video = request.data
+ return (
+ isRequestTypeAddValid(request.type) &&
+ isVideoAuthorValid(video.author) &&
+ isVideoDateValid(video.createdDate) &&
+ isVideoDescriptionValid(video.description) &&
+ isVideoDurationValid(video.duration) &&
+ isVideoMagnetUriValid(video.magnetUri) &&
+ isVideoNameValid(video.name) &&
+ isVideoPodUrlValid(video.podUrl) &&
+ isVideoTagsValid(video.tags) &&
+ isVideoThumbnailValid(video.thumbnailBase64)
+ ) ||
+ (
+ isRequestTypeRemoveValid(request.type) &&
+ isVideoNameValid(video.name) &&
+ isVideoMagnetUriValid(video.magnetUri)
+ )
})
}
return Array.isArray(value)
}
+function isRequestTypeAddValid (value) {
+ return value === 'add'
+}
+
+function isRequestTypeRemoveValid (value) {
+ return value === 'remove'
+}
+
function isVideoAuthorValid (value) {
return validator.isLength(value, VIDEOS_CONSTRAINTS_FIELDS.AUTHOR)
}
'use strict'
-const async = require('async')
const config = require('config')
-const request = require('request')
const replay = require('request-replay')
+const request = require('request')
const constants = require('../initializers/constants')
-const logger = require('./logger')
const peertubeCrypto = require('./peertubeCrypto')
const http = config.get('webserver.https') ? 'https' : 'http'
const port = config.get('webserver.port')
const requests = {
- makeMultipleRetryRequest: makeMultipleRetryRequest
+ makeRetryRequest: makeRetryRequest,
+ makeSecureRequest: makeSecureRequest
}
-function makeMultipleRetryRequest (allData, pods, callbackEach, callback) {
- if (!callback) {
- callback = callbackEach
- callbackEach = null
- }
+function makeRetryRequest (params, callback) {
+ replay(
+ request(params, callback),
+ {
+ retries: constants.RETRY_REQUESTS,
+ factor: 3,
+ maxTimeout: Infinity,
+ errorCodes: [ 'EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED' ]
+ }
+ )
+}
- const url = http + '://' + host + ':' + port
- let signature
+function makeSecureRequest (params, callback) {
+ const myUrl = http + '://' + host + ':' + port
- // Add signature if it is specified in the params
- if (allData.method === 'POST' && allData.data && allData.sign === true) {
- signature = peertubeCrypto.sign(url)
+ const requestParams = {
+ url: params.toPod.url + params.path
}
- // Make a request for each pod
- async.each(pods, function (pod, callbackEachAsync) {
- function callbackEachRetryRequest (err, response, body, url, pod) {
- if (callbackEach !== null) {
- callbackEach(err, response, body, url, pod, function () {
- callbackEachAsync()
- })
- } else {
- callbackEachAsync()
- }
- }
+ // Add data with POST requst ?
+ if (params.method === 'POST') {
+ requestParams.json = {}
- const params = {
- url: pod.url + allData.path,
- method: allData.method
+ // Add signature if it is specified in the params
+ if (params.sign === true) {
+ requestParams.json.signature = {
+ url: myUrl,
+ signature: peertubeCrypto.sign(myUrl)
+ }
}
- // Add data with POST requst ?
- if (allData.method === 'POST' && allData.data) {
- // Encrypt data ?
- if (allData.encrypt === true) {
- peertubeCrypto.encrypt(pod.publicKey, JSON.stringify(allData.data), function (err, encrypted) {
+ // If there are data informations
+ if (params.data) {
+ // Encrypt data
+ if (params.encrypt === true) {
+ peertubeCrypto.encrypt(params.toPod.publicKey, JSON.stringify(params.data), function (err, encrypted) {
if (err) return callback(err)
- params.json = {
- data: encrypted.data,
- key: encrypted.key
- }
+ requestParams.json.data = encrypted.data
+ requestParams.json.key = encrypted.key
- makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest)
+ request.post(requestParams, callback)
})
} else {
- params.json = { data: allData.data }
- makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest)
+ // No encryption
+ requestParams.json.data = params.data
+ request.post(requestParams, callback)
}
} else {
- makeRetryRequest(params, url, pod, signature, callbackEachRetryRequest)
+ // No data
+ request.post(requestParams, callback)
}
- }, callback)
+ } else {
+ request.get(requestParams, callback)
+ }
}
// ---------------------------------------------------------------------------
module.exports = requests
-
-// ---------------------------------------------------------------------------
-
-function makeRetryRequest (params, fromUrl, toPod, signature, callbackEach) {
- // Append the signature
- if (signature) {
- params.json.signature = {
- url: fromUrl,
- signature: signature
- }
- }
-
- logger.debug('Make retry requests to %s.', toPod.url)
-
- replay(
- request.post(params, function (err, response, body) {
- callbackEach(err, response, body, params.url, toPod)
- }),
- {
- retries: constants.REQUEST_RETRIES,
- factor: 3,
- maxTimeout: Infinity,
- errorCodes: [ 'EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED' ]
- }
- ).on('replay', function (replay) {
- logger.info('Replaying request to %s. Request failed: %d %s. Replay number: #%d. Will retry in: %d ms.',
- params.url, replay.error.code, replay.error.message, replay.number, replay.delay)
- })
-}
BONUS: 10
}
-// Number of retries we make for the make retry requests (to friends...)
-let REQUEST_RETRIES = 10
+// Number of requests in parallel we can make
+const REQUESTS_IN_PARALLEL = 10
-// Different types or requests for the request scheduler module
-const REQUEST_SCHEDULER_TYPE = [ 'add', 'remove' ]
+// Number of requests to retry for replay requests module
+const RETRY_REQUESTS = 5
// Sortable columns per schema
const SEARCHABLE_COLUMNS = {
FRIEND_BASE_SCORE = 20
INTERVAL = 10000
VIDEOS_CONSTRAINTS_FIELDS.DURATION.max = 14
- REQUEST_RETRIES = 2
}
// ---------------------------------------------------------------------------
INTERVAL: INTERVAL,
PAGINATION_COUNT_DEFAULT: PAGINATION_COUNT_DEFAULT,
PODS_SCORE: PODS_SCORE,
- REQUEST_RETRIES: REQUEST_RETRIES,
- REQUEST_SCHEDULER_TYPE: REQUEST_SCHEDULER_TYPE,
+ REQUESTS_IN_PARALLEL: REQUESTS_IN_PARALLEL,
+ RETRY_REQUESTS: RETRY_REQUESTS,
SEARCHABLE_COLUMNS: SEARCHABLE_COLUMNS,
SORTABLE_COLUMNS: SORTABLE_COLUMNS,
THUMBNAILS_SIZE: THUMBNAILS_SIZE,
getMyCertificate: getMyCertificate,
makeFriends: makeFriends,
quitFriends: quitFriends,
- removeVideoToFriends: removeVideoToFriends
+ removeVideoToFriends: removeVideoToFriends,
+ sendOwnedVideosToPod: sendOwnedVideosToPod
}
function addVideoToFriends (video) {
- // To avoid duplicates
- const id = video.name + video.magnetUri
// ensure namePath is null
video.namePath = null
- requestsScheduler.addRequest(id, 'add', video)
+
+ requestsScheduler.addRequest('add', video)
}
function hasFriends (callback) {
const urls = config.get('network.friends')
- async.each(urls, function (url, callbackEach) {
+ async.eachSeries(urls, function (url, callbackEach) {
computeForeignPodsList(url, podsScore, callbackEach)
}, function (err) {
if (err) return callback(err)
// Stop pool requests
requestsScheduler.deactivate()
// Flush pool requests
- requestsScheduler.forceSend()
+ requestsScheduler.flush()
async.waterfall([
function getPodsList (callbackAsync) {
},
function announceIQuitMyFriends (pods, callbackAsync) {
- const request = {
+ const requestParams = {
method: 'POST',
path: '/api/' + constants.API_VERSION + '/pods/remove',
- sign: true,
- encrypt: true,
- data: {
- url: 'me' // Fake data
- }
+ sign: true
}
// Announce we quit them
- requests.makeMultipleRetryRequest(request, pods, function (err) {
- return callbackAsync(err)
+ // We don't care if the request fails
+ // The other pod will exclude us automatically after a while
+ async.eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
+ requestParams.toPod = pod
+ requests.makeSecureRequest(requestParams, callbackEach)
+ }, function (err) {
+ if (err) {
+ logger.error('Some errors while quitting friends.', { err: err })
+ // Don't stop the process
+ }
+
+ return callbackAsync()
})
},
}
function removeVideoToFriends (video) {
- // To avoid duplicates
- const id = video.name + video.magnetUri
- requestsScheduler.addRequest(id, 'remove', video)
+ requestsScheduler.addRequest('remove', video)
+}
+
+function sendOwnedVideosToPod (podId) {
+ Videos.listOwned(function (err, videosList) {
+ if (err) {
+ logger.error('Cannot get the list of videos we own.')
+ return
+ }
+
+ videosList.forEach(function (video) {
+ videos.convertVideoToRemote(video, function (err, remoteVideo) {
+ if (err) {
+ logger.error('Cannot convert video to remote.', { error: err })
+ // Don't break the process
+ return
+ }
+
+ requestsScheduler.addRequestTo([ podId ], 'add', remoteVideo)
+ })
+ })
+ })
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
function computeForeignPodsList (url, podsScore, callback) {
- // Let's give 1 point to the pod we ask the friends list
- podsScore[url] = 1
-
getForeignPodsList(url, function (err, foreignPodsList) {
if (err) return callback(err)
- if (foreignPodsList.length === 0) return callback()
+
+ if (!foreignPodsList) foreignPodsList = []
+
+ // Let's give 1 point to the pod we ask the friends list
+ foreignPodsList.push({ url: url })
foreignPodsList.forEach(function (foreignPod) {
- const foreignUrl = foreignPod.url
+ const foreignPodUrl = foreignPod.url
- if (podsScore[foreignUrl]) podsScore[foreignUrl]++
- else podsScore[foreignUrl] = 1
+ if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++
+ else podsScore[foreignPodUrl] = 1
})
callback()
// Flush pool requests
requestsScheduler.forceSend()
- // Get the list of our videos to send to our new friends
- Videos.listOwned(function (err, videosList) {
- if (err) {
- logger.error('Cannot get the list of videos we own.')
- return callback(err)
- }
-
- const data = {
- url: http + '://' + host + ':' + port,
- publicKey: cert,
- videos: videosList
+ async.eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
+ const params = {
+ url: pod.url + '/api/' + constants.API_VERSION + '/pods/',
+ method: 'POST',
+ json: {
+ url: http + '://' + host + ':' + port,
+ publicKey: cert
+ }
}
- requests.makeMultipleRetryRequest(
- { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
-
- podsList,
-
- // Callback called after each request
- function eachRequest (err, response, body, url, pod, callbackEachRequest) {
- // We add the pod if it responded correctly with its public certificate
- if (!err && response.statusCode === 200) {
- Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
- if (err) {
- logger.error('Error with adding %s pod.', pod.url, { error: err })
- return callbackEachRequest()
- }
-
- videos.createRemoteVideos(body.videos, function (err) {
- if (err) {
- logger.error('Error with adding videos of pod.', pod.url, { error: err })
- return callbackEachRequest()
- }
-
- logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
- return callbackEachRequest()
- })
- })
- } else {
- logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
- return callbackEachRequest()
- }
- },
+ requests.makeRetryRequest(params, function (err, res, body) {
+ if (err) {
+ logger.error('Error with adding %s pod.', pod.url, { error: err })
+ // Don't break the process
+ return callbackEach()
+ }
- // Final callback, we've ended all the requests
- function endRequests (err) {
- // Now we made new friends, we can re activate the pool of requests
- requestsScheduler.activate()
+ if (res.statusCode === 200) {
+ Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err, podCreated) {
+ if (err) logger.error('Cannot add friend %s pod.', pod.url)
- if (err) {
- logger.error('There was some errors when we wanted to make friends.')
- return callback(err)
- }
+ // Add our videos to the request scheduler
+ sendOwnedVideosToPod(podCreated._id)
- logger.debug('makeRequestsToWinningPods finished.')
- return callback(null)
+ return callbackEach()
+ })
+ } else {
+ logger.error('Status not 200 for %s pod.', pod.url)
+ return callbackEach()
}
- )
+ })
+ }, function endRequests () {
+ // Final callback, we've ended all the requests
+ // Now we made new friends, we can re activate the pool of requests
+ requestsScheduler.activate()
+
+ logger.debug('makeRequestsToWinningPods finished.')
+ return callback()
})
}
const videos = require('../lib/videos')
const Videos = require('../models/videos')
-const REQUEST_SCHEDULER_TYPE = constants.REQUEST_SCHEDULER_TYPE
let timer = null
const requestsScheduler = {
activate: activate,
addRequest: addRequest,
+ addRequestTo: addRequestTo,
deactivate: deactivate,
+ flush: flush,
forceSend: forceSend
}
}
// Add request to the scheduler
-function addRequest (id, type, request) {
- logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
+function addRequest (type, data) {
+ logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
- Requests.findById(id, function (err, entity) {
+ const request = {
+ type: type,
+ data: data
+ }
+
+ Pods.listAllIds(function (err, podIds) {
if (err) {
- logger.error('Error when trying to find a request.', { error: err })
- return // Abort
+ logger.debug('Cannot list pod ids.')
+ return
}
- // If there were already a request with this id in the scheduler...
- if (entity) {
- if (entity.type === type) {
- logger.error('Cannot insert two same requests.')
- return // Abort
- }
+ // No friends
+ if (!podIds) return
- // Remove the request of the other type
- Requests.removeRequestById(id, function (err) {
- if (err) {
- logger.error('Cannot remove a request.', { error: err })
- return // Abort
- }
- })
- } else {
- Requests.create(id, type, request, function (err) {
- if (err) logger.error('Cannot create a request.', { error: err })
- return // Abort
- })
- }
+ Requests.create(request, podIds, function (err) {
+ if (err) logger.error('Cannot create a request.', { error: err })
+ })
+ })
+}
+
+function addRequestTo (podIds, type, data) {
+ const request = {
+ type: type,
+ data: data
+ }
+
+ Requests.create(request, podIds, function (err) {
+ if (err) logger.error('Cannot create a request.', { error: err })
})
}
clearInterval(timer)
}
+function flush () {
+ Requests.removeAll(function (err) {
+ if (err) {
+ logger.error('Cannot flush the requests.', { error: err })
+ }
+ })
+}
+
function forceSend () {
logger.info('Force requests scheduler sending.')
makeRequests()
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
-function makeRequest (type, requestsToMake, callback) {
+function makeRequest (toPod, requestsToMake, callback) {
if (!callback) callback = function () {}
- Pods.list(function (err, pods) {
- if (err) return callback(err)
-
- const params = {
- encrypt: true, // Security
- sign: true, // To prove our identity
- method: 'POST',
- path: null, // We build the path later
- data: requestsToMake // Requests we need to make
- }
-
- // If this is a valid type, we build the path
- if (REQUEST_SCHEDULER_TYPE.indexOf(type) > -1) {
- params.path = '/api/' + constants.API_VERSION + '/remotevideos/' + type
- } else {
- return callback(new Error('Unkown pool request type.'))
- }
-
- const badPods = []
- const goodPods = []
-
- // Make multiple retry requests to all of pods
- // The function fire some useful callbacks
- requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
-
- function callbackEachPodFinished (err, response, body, url, pod, callbackEachPodFinished) {
- // We failed the request, add the pod unreachable to the bad pods list
- if (err || (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204)) {
- badPods.push(pod._id)
- logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
- } else {
- // Request success
- goodPods.push(pod._id)
- }
-
- return callbackEachPodFinished()
+ const params = {
+ toPod: toPod,
+ encrypt: true, // Security
+ sign: true, // To prove our identity
+ method: 'POST',
+ path: '/api/' + constants.API_VERSION + '/remote/videos',
+ data: requestsToMake // Requests we need to make
+ }
+
+ // Make multiple retry requests to all of pods
+ // The function fire some useful callbacks
+ requests.makeSecureRequest(params, function (err, res) {
+ if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
+ logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
+
+ return callback(false)
}
- function callbackAllPodsFinished (err) {
- if (err) return callback(err)
-
- // All the requests were made, we update the pods score
- updatePodsScore(goodPods, badPods)
- callback(null)
- }
+ return callback(true)
})
}
logger.info('Making requests to friends.')
+ // Requests by pods id
const requestsToMake = {}
- for (const type of REQUEST_SCHEDULER_TYPE) {
- requestsToMake[type] = {
- ids: [],
- requests: []
- }
- }
- // For each requests to make, we add it to the correct request type
requests.forEach(function (poolRequest) {
- if (REQUEST_SCHEDULER_TYPE.indexOf(poolRequest.type) > -1) {
- const requestTypeToMake = requestsToMake[poolRequest.type]
- requestTypeToMake.requests.push(poolRequest.request)
- requestTypeToMake.ids.push(poolRequest._id)
- } else {
- logger.error('Unkown request type.', { request_type: poolRequest.type })
- return // abort
- }
+ poolRequest.to.forEach(function (toPodId) {
+ if (!requestsToMake[toPodId]) {
+ requestsToMake[toPodId] = {
+ ids: [],
+ datas: []
+ }
+ }
+
+ requestsToMake[toPodId].ids.push(poolRequest._id)
+ requestsToMake[toPodId].datas.push(poolRequest.request)
+ })
})
- for (let type of Object.keys(requestsToMake)) {
- const requestTypeToMake = requestsToMake[type]
- // If there are requests for this type
- if (requestTypeToMake.requests.length !== 0) {
- makeRequest(type, requestTypeToMake.requests, function (err) {
- if (err) logger.error('Errors when sent ' + type + ' requests.', { error: err })
+ const goodPods = []
+ const badPods = []
- // We made the requests, so we can remove them from the scheduler
- Requests.removeRequests(requestTypeToMake.ids)
+ async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
+ const requestToMake = requestsToMake[toPodId]
+
+ // FIXME: mongodb request inside a loop :/
+ Pods.findById(toPodId, function (err, toPod) {
+ if (err) return logger.error('Error finding pod by id.', { err: err })
+
+ // Maybe the pod is not our friend anymore so simply remove them
+ if (!toPod) {
+ Requests.removePodOf(requestToMake.ids, toPodId)
+ return callbackEach()
+ }
+
+ makeRequest(toPod, requestToMake.datas, function (success) {
+ if (err) {
+ logger.error('Errors when sent request to %s.', toPod.url, { error: err })
+ // Do not stop the process just for one error
+ return callbackEach()
+ }
+
+ if (success === true) {
+ logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
+
+ // Remove the pod id of these request ids
+ Requests.removePodOf(requestToMake.ids, toPodId)
+ goodPods.push(toPodId)
+ } else {
+ badPods.push(toPodId)
+ }
+
+ callbackEach()
})
- }
- }
+ })
+ }, function () {
+ // All the requests were made, we update the pods score
+ updatePodsScore(goodPods, badPods)
+ // Flush requests with no pod
+ Requests.removeWithEmptyTo()
+ })
})
}
const thumbnailsDir = pathUtils.join(__dirname, '..', '..', config.get('storage.thumbnails'))
const videos = {
+ convertVideoToRemote: convertVideoToRemote,
createRemoteVideos: createRemoteVideos,
getVideoDuration: getVideoDuration,
getVideoState: getVideoState,
seedAllExisting: seedAllExisting
}
+function convertVideoToRemote (video, callback) {
+ fs.readFile(thumbnailsDir + video.thumbnail, function (err, thumbnailData) {
+ if (err) {
+ logger.error('Cannot read the thumbnail of the video')
+ return callback(err)
+ }
+
+ const remoteVideo = {
+ name: video.name,
+ description: video.description,
+ magnetUri: video.magnetUri,
+ author: video.author,
+ duration: video.duration,
+ thumbnailBase64: new Buffer(thumbnailData).toString('base64'),
+ tags: video.tags,
+ createdDate: video.createdDate,
+ podUrl: video.podUrl
+ }
+
+ return callback(null, remoteVideo)
+ })
+}
+
function createRemoteVideos (videos, callback) {
// Create the remote videos from the new pod
createRemoteVideoObjects(videos, function (err, remoteVideos) {
podUrl: video.podUrl,
duration: video.duration,
thumbnail: thumbnailName,
- tags: video.tags
+ tags: video.tags,
+ author: video.author
}
remoteVideos.push(params)
}
function podsAdd (req, res, next) {
- req.checkBody('data.url', 'Should have an url').notEmpty().isURL({ require_protocol: true })
- req.checkBody('data.publicKey', 'Should have a public key').notEmpty()
+ req.checkBody('url', 'Should have an url').notEmpty().isURL({ require_protocol: true })
+ req.checkBody('publicKey', 'Should have a public key').notEmpty()
+
+ // TODO: check we don't have it already
logger.debug('Checking podsAdd parameters', { parameters: req.body })
const logger = require('../../helpers/logger')
const reqValidatorsRemote = {
- remoteVideosAdd: remoteVideosAdd,
- remoteVideosRemove: remoteVideosRemove,
- secureRequest: secureRequest
+ dataToDecrypt: dataToDecrypt,
+ remoteVideos: remoteVideos,
+ signature: signature
}
-function remoteVideosAdd (req, res, next) {
- req.checkBody('data').isArray()
- req.checkBody('data').isEachAddRemoteVideosValid()
+function dataToDecrypt (req, res, next) {
+ req.checkBody('key', 'Should have a key').notEmpty()
+ req.checkBody('data', 'Should have data').notEmpty()
- logger.debug('Checking remoteVideosAdd parameters', { parameters: req.body })
+ logger.debug('Checking dataToDecrypt parameters', { parameters: { keyLength: req.body.key.length, bodyLength: req.body.data.length } })
checkErrors(req, res, next)
}
-function remoteVideosRemove (req, res, next) {
+function remoteVideos (req, res, next) {
req.checkBody('data').isArray()
- req.checkBody('data').isEachRemoveRemoteVideosValid()
+ req.checkBody('data').isEachRemoteVideosValid()
- logger.debug('Checking remoteVideosRemove parameters', { parameters: req.body })
+ logger.debug('Checking remoteVideosAdd parameters', { parameters: req.body })
checkErrors(req, res, next)
}
-function secureRequest (req, res, next) {
+function signature (req, res, next) {
req.checkBody('signature.url', 'Should have a signature url').isURL()
req.checkBody('signature.signature', 'Should have a signature').notEmpty()
- req.checkBody('key', 'Should have a key').notEmpty()
- req.checkBody('data', 'Should have data').notEmpty()
- logger.debug('Checking secureRequest parameters', { parameters: { data: req.body.data, keyLength: req.body.key.length } })
+ logger.debug('Checking signature parameters', { parameters: { signatureUrl: req.body.signature.url } })
checkErrors(req, res, next)
}
const Pods = {
add: add,
count: count,
+ findById: findById,
findByUrl: findByUrl,
findBadPods: findBadPods,
incrementScores: incrementScores,
list: list,
+ listAllIds: listAllIds,
+ listAllUrls: listAllUrls,
remove: remove,
removeAll: removeAll,
removeAllByIds: removeAllByIds
PodsDB.find({ score: 0 }, callback)
}
+function findById (id, callback) {
+ PodsDB.findById(id, callback)
+}
+
function findByUrl (url, callback) {
PodsDB.findOne({ url: url }, callback)
}
})
}
+function listAllIds (callback) {
+ return PodsDB.find({}, { _id: 1 }, callback)
+}
+
+function listAllUrls (callback) {
+ return PodsDB.find({}, { _id: 0, url: 1 }, callback)
+}
+
function remove (url, callback) {
if (!callback) callback = function () {}
PodsDB.remove({ url: url }, callback)
// ---------------------------------------------------------------------------
const requestsSchema = mongoose.Schema({
- type: String,
- id: String, // Special id to find duplicates (video created we want to remove...)
- request: mongoose.Schema.Types.Mixed
+ request: mongoose.Schema.Types.Mixed,
+ to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'users' } ]
})
const RequestsDB = mongoose.model('requests', requestsSchema)
create: create,
findById: findById,
list: list,
+ removeAll: removeAll,
+ removePodOf: removePodOf,
removeRequestById: removeRequestById,
- removeRequests: removeRequests
+ removeRequests: removeRequests,
+ removeWithEmptyTo: removeWithEmptyTo
}
-function create (id, type, request, callback) {
- RequestsDB.create({ id: id, type: type, request: request }, callback)
+function create (request, to, callback) {
+ RequestsDB.create({ request: request, to: to }, callback)
}
function findById (id, callback) {
}
function list (callback) {
- RequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback)
+ RequestsDB.find({}, { _id: 1, request: 1, to: 1 }, callback)
+}
+
+function removeAll (callback) {
+ RequestsDB.remove({ }, callback)
+}
+
+function removePodOf (requestsIds, podId, callback) {
+ if (!callback) callback = function () {}
+
+ RequestsDB.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
}
function removeRequestById (id, callback) {
})
}
+function removeWithEmptyTo (callback) {
+ if (!callback) callback = function () {}
+
+ RequestsDB.remove({ to: { $size: 0 } }, callback)
+}
+
// ---------------------------------------------------------------------------
module.exports = Requests
it('Should fail without public key', function (done) {
const data = {
- data: {
- url: 'http://coucou.com'
- }
+ url: 'http://coucou.com'
}
makePostBodyRequest(path, data, done)
})
it('Should fail without an url', function (done) {
const data = {
- data: {
- publicKey: 'mysuperpublickey'
- }
+ publicKey: 'mysuperpublickey'
}
makePostBodyRequest(path, data, done)
})
it('Should fail with an incorrect url', function (done) {
const data = {
- data: {
- url: 'coucou.com',
- publicKey: 'mysuperpublickey'
- }
+ url: 'coucou.com',
+ publicKey: 'mysuperpublickey'
}
makePostBodyRequest(path, data, function () {
- data.data.url = 'http://coucou'
+ data.url = 'http://coucou'
makePostBodyRequest(path, data, function () {
- data.data.url = 'coucou'
+ data.url = 'coucou'
makePostBodyRequest(path, data, done)
})
})
it('Should succeed with the correct parameters', function (done) {
const data = {
- data: {
- url: 'http://coucou.com',
- publicKey: 'mysuperpublickey'
- }
+ url: 'http://coucou.com',
+ publicKey: 'mysuperpublickey'
}
makePostBodyRequest(path, data, done, false)
})
function (next) {
makeFriends(4, next)
},
+ // Check the pods 1, 2, 3 and 4 are friends
+ function (next) {
+ async.each([ 1, 2, 3, 4 ], function (i, callback) {
+ getFriendsList(i, function (err, res) {
+ if (err) throw err
+
+ expect(res.body.length).to.equal(3)
+
+ callback()
+ })
+ }, next)
+ },
// Kill pod 4
function (next) {
servers[3].app.kill()
uploadVideo(2, next)
},
function (next) {
- setTimeout(next, 20000)
+ setTimeout(next, 11000)
},
// Rerun server 4
function (next) {
// Pod 6 ask pod 1, 2 and 3
function (next) {
makeFriends(6, next)
+ },
+ function (next) {
+ setTimeout(next, 11000)
}],
function (err) {
if (err) throw err
done()
})
- }, 5000)
+ }, 11000)
})
})
if (err) throw err
const result = res.body
- const resultUrls = [ result[0].url, result[1].url ]
expect(result).to.be.an('array')
expect(result.length).to.equal(2)
+
+ const resultUrls = [ result[0].url, result[1].url ]
expect(resultUrls[0]).to.not.equal(resultUrls[1])
const errorString = 'Friends url do not correspond for ' + serverToTest.url
expect(video.duration).to.equal(10)
expect(video.tags).to.deep.equal([ 'tag1p1', 'tag2p1' ])
expect(utils.dateIsValid(video.createdDate)).to.be.true
+ expect(video.author).to.equal('root')
if (server.url !== 'http://localhost:9001') {
expect(video.isLocal).to.be.false
expect(video.duration).to.equal(5)
expect(video.tags).to.deep.equal([ 'tag1p2', 'tag2p2', 'tag3p2' ])
expect(utils.dateIsValid(video.createdDate)).to.be.true
+ expect(video.author).to.equal('root')
if (server.url !== 'http://localhost:9002') {
expect(video.isLocal).to.be.false
expect(video1.magnetUri).to.exist
expect(video1.duration).to.equal(5)
expect(video1.tags).to.deep.equal([ 'tag1p3' ])
+ expect(video1.author).to.equal('root')
expect(utils.dateIsValid(video1.createdDate)).to.be.true
expect(video2.name).to.equal('my super name for pod 3-2')
expect(video2.magnetUri).to.exist
expect(video2.duration).to.equal(5)
expect(video2.tags).to.deep.equal([ 'tag2p3', 'tag3p3', 'tag4p3' ])
+ expect(video2.author).to.equal('root')
expect(utils.dateIsValid(video2.createdDate)).to.be.true
if (server.url !== 'http://localhost:9003') {