From c173e56520b0fe4206b9ea8049b6add40bfeabcd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 4 Feb 2016 21:10:33 +0100 Subject: [PATCH] Split models --- controllers/api/v1/pods.js | 56 ++++-- controllers/api/v1/remoteVideos.js | 5 +- controllers/api/v1/videos.js | 77 ++++++-- initializers/database.js | 58 ++---- lib/friends.js | 218 ++++++++++++++++++++++ lib/poolRequests.js | 61 ++---- lib/videos.js | 51 +++++ lib/webtorrent.js | 2 +- middlewares/misc.js | 9 +- middlewares/reqValidators/pods.js | 15 ++ middlewares/reqValidators/videos.js | 32 ++-- models/pods.js | 276 +++++----------------------- models/poolRequests.js | 67 +++++++ models/videos.js | 202 ++++++++------------ server.js | 8 +- 15 files changed, 629 insertions(+), 508 deletions(-) create mode 100644 lib/friends.js create mode 100644 lib/videos.js create mode 100644 models/poolRequests.js diff --git a/controllers/api/v1/pods.js b/controllers/api/v1/pods.js index b073e85af..82d8d7f08 100644 --- a/controllers/api/v1/pods.js +++ b/controllers/api/v1/pods.js @@ -2,18 +2,23 @@ 'use strict' var express = require('express') + var fs = require('fs') + var logger = require('../../../helpers/logger') + var friends = require('../../../lib/friends') var middleware = require('../../../middlewares') var miscMiddleware = middleware.misc - var pods = require('../../../models/pods') + var Pods = require('../../../models/pods') var reqValidator = middleware.reqValidators.pods var secureRequest = middleware.reqValidators.remote.secureRequest + var utils = require('../../../helpers/utils') + var Videos = require('../../../models/videos') var router = express.Router() router.get('/', miscMiddleware.cache(false), listPods) router.post('/', reqValidator.podsAdd, miscMiddleware.cache(false), addPods) - router.get('/makefriends', miscMiddleware.cache(false), makeFriends) + router.get('/makefriends', reqValidator.makeFriends, miscMiddleware.cache(false), makeFriends) router.get('/quitfriends', miscMiddleware.cache(false), quitFriends) // Post because this is a secured request router.post('/remove', secureRequest, miscMiddleware.decryptBody, removePods) @@ -25,15 +30,32 @@ // --------------------------------------------------------------------------- function addPods (req, res, next) { - pods.add(req.body.data, function (err, json) { + var informations = req.body.data + Pods.add(informations, function (err) { if (err) return next(err) - res.json(json) + Videos.addRemotes(informations.videos) + + fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { + if (err) { + logger.error('Cannot read cert file.', { error: err }) + return next(err) + } + + Videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get the list of owned videos.', { error: err }) + return next(err) + } + + res.json({ cert: cert, videos: videos_list }) + }) + }) }) } function listPods (req, res, next) { - pods.list(function (err, pods_list) { + Pods.list(function (err, pods_list) { if (err) return next(err) res.json(pods_list) @@ -41,32 +63,28 @@ } function makeFriends (req, res, next) { - pods.hasFriends(function (err, has_friends) { + friends.makeFriends(function (err) { if (err) return next(err) - if (has_friends === true) { - // We need to quit our friends before make new ones - res.sendStatus(409) - } else { - pods.makeFriends(function (err) { - if (err) return next(err) - - res.sendStatus(204) - }) - } + res.sendStatus(204) }) } function removePods (req, res, next) { - pods.remove(req.body.signature.url, function (err) { + var url = req.body.signature.url + Pods.remove(url, function (err) { if (err) return next(err) - res.sendStatus(204) + Videos.removeAllRemotesOf(url, function (err) { + if (err) logger.error('Cannot remove all remote videos of %s.', url) + logger.info('%s pod removed.', url) + res.sendStatus(204) + }) }) } function quitFriends (req, res, next) { - pods.quitFriends(function (err) { + friends.quitFriends(function (err) { if (err) return next(err) res.sendStatus(204) diff --git a/controllers/api/v1/remoteVideos.js b/controllers/api/v1/remoteVideos.js index 2be2fc87e..d72db9836 100644 --- a/controllers/api/v1/remoteVideos.js +++ b/controllers/api/v1/remoteVideos.js @@ -42,7 +42,10 @@ } function removeRemoteVideo (req, res, next) { - videos.removeRemotes(req.body.signature.url, pluck(req.body.data, 'magnetUri'), function (err) { + var url = req.body.signature.url + var magnetUris = pluck(req.body.data, 'magnetUri') + + videos.removeRemotesOfByMagnetUris(url, magnetUris, function (err) { if (err) return next(err) res.sendStatus(204) diff --git a/controllers/api/v1/videos.js b/controllers/api/v1/videos.js index 64b05e32b..d2e7e8825 100644 --- a/controllers/api/v1/videos.js +++ b/controllers/api/v1/videos.js @@ -6,10 +6,14 @@ var express = require('express') var multer = require('multer') + var logger = require('../../../helpers/logger') + var friends = require('../../../lib/friends') var middleware = require('../../../middlewares') var miscMiddleware = middleware.misc var reqValidator = middleware.reqValidators.videos - var videos = require('../../../models/videos') + var Videos = require('../../../models/videos') // model + var videos = require('../../../lib/videos') + var webtorrent = require('../../../lib/webTorrentNode') var router = express.Router() var uploads = config.get('storage.uploads') @@ -35,7 +39,7 @@ var reqFiles = multer({ storage: storage }).fields([{ name: 'input_video', maxCount: 1 }]) router.get('/', miscMiddleware.cache(false), listVideos) - router.post('/', reqFiles, reqValidator.videosAdd, miscMiddleware.cache(false), addVideos) + router.post('/', reqFiles, reqValidator.videosAdd, miscMiddleware.cache(false), addVideo) router.get('/:id', reqValidator.videosGet, miscMiddleware.cache(false), getVideos) router.delete('/:id', reqValidator.videosRemove, miscMiddleware.cache(false), removeVideo) router.get('/search/:name', reqValidator.videosSearch, miscMiddleware.cache(false), searchVideos) @@ -46,17 +50,41 @@ // --------------------------------------------------------------------------- - function addVideos (req, res, next) { - videos.add({ video: req.files.input_video[0], data: req.body }, function (err) { - if (err) return next(err) + function addVideo (req, res, next) { + var video_file = req.files.input_video[0] + var video_infos = req.body + + videos.seed(video_file.path, function (err, torrent) { + if (err) { + logger.error('Cannot seed this video.', { error: err }) + return next(err) + } + + var video_data = { + name: video_infos.name, + namePath: video_file.filename, + description: video_infos.description, + magnetUri: torrent.magnetURI + } + + Videos.add(video_data, function (err) { + if (err) { + // TODO unseed the video + logger.error('Cannot insert this video in the database.', { error: err }) + return next(err) + } - // TODO : include Location of the new video - res.sendStatus(201) + // Now we'll add the video's meta data to our friends + friends.addVideoToFriends(video_data) + + // TODO : include Location of the new video + res.sendStatus(201) + }) }) } function getVideos (req, res, next) { - videos.get(req.params.id, function (err, video) { + Videos.get(req.params.id, function (err, video) { if (err) return next(err) if (video === null) { @@ -68,7 +96,7 @@ } function listVideos (req, res, next) { - videos.list(function (err, videos_list) { + Videos.list(function (err, videos_list) { if (err) return next(err) res.json(videos_list) @@ -76,18 +104,43 @@ } function removeVideo (req, res, next) { - videos.remove(req.params.id, function (err) { + var video_id = req.params.id + Videos.get(video_id, function (err, video) { if (err) return next(err) - res.sendStatus(204) + removeTorrent(video.magnetUri, function () { + Videos.removeOwned(req.params.id, function (err) { + if (err) return next(err) + + var params = { + name: video.name, + magnetUri: video.magnetUri + } + + friends.removeVideoToFriends(params) + res.sendStatus(204) + }) + }) }) } function searchVideos (req, res, next) { - videos.search(req.params.name, function (err, videos_list) { + Videos.search(req.params.name, function (err, videos_list) { if (err) return next(err) res.json(videos_list) }) } + + // --------------------------------------------------------------------------- + + // Maybe the torrent is not seeded, but we catch the error to don't stop the removing process + function removeTorrent (magnetUri, callback) { + try { + webtorrent.remove(magnetUri, callback) + } catch (err) { + logger.warn('Cannot remove the torrent from WebTorrent', { err: err }) + return callback(null) + } + } })() diff --git a/initializers/database.js b/initializers/database.js index e041d5c4b..96c172637 100644 --- a/initializers/database.js +++ b/initializers/database.js @@ -4,59 +4,29 @@ var config = require('config') var mongoose = require('mongoose') - var constants = require('./constants') var logger = require('../helpers/logger') var dbname = 'peertube' + config.get('database.suffix') var host = config.get('database.host') var port = config.get('database.port') - // ----------- Pods ----------- - var podsSchema = mongoose.Schema({ - url: String, - publicKey: String, - score: { type: Number, max: constants.FRIEND_BASE_SCORE } - }) - - var PodsDB = mongoose.model('pods', podsSchema) - - // ----------- PoolRequests ----------- - var poolRequestsSchema = mongoose.Schema({ - type: String, - id: String, // Special id to find duplicates (video created we want to remove...) - request: mongoose.Schema.Types.Mixed - }) - - var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema) - - // ----------- Videos ----------- - var videosSchema = mongoose.Schema({ - name: String, - namePath: String, - description: String, - magnetUri: String, - podUrl: String - }) - - var VideosDB = mongoose.model('videos', videosSchema) + var database = { + connect: connect + } - // --------------------------------------------------------------------------- + function connect () { + mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname) + mongoose.connection.on('error', function () { + logger.error('Mongodb connection error.') + process.exit(0) + }) - module.exports = { - PodsDB: PodsDB, - PoolRequestsDB: PoolRequestsDB, - VideosDB: VideosDB + mongoose.connection.on('open', function () { + logger.info('Connected to mongodb.') + }) } - // ----------- Connection ----------- - - mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname) - mongoose.connection.on('error', function () { - logger.error('Mongodb connection error.') - process.exit(0) - }) + // --------------------------------------------------------------------------- - mongoose.connection.on('open', function () { - logger.info('Connected to mongodb.') - }) + module.exports = database })() diff --git a/lib/friends.js b/lib/friends.js new file mode 100644 index 000000000..e093c85c4 --- /dev/null +++ b/lib/friends.js @@ -0,0 +1,218 @@ +;(function () { + 'use strict' + + var async = require('async') + var config = require('config') + var fs = require('fs') + var request = require('request') + + var constants = require('../initializers/constants') + var logger = require('../helpers/logger') + var Pods = require('../models/pods') + var PoolRequests = require('../models/poolRequests') + var poolRequests = require('../lib/poolRequests') + var utils = require('../helpers/utils') + var Videos = require('../models/videos') + + var http = config.get('webserver.https') ? 'https' : 'http' + var host = config.get('webserver.host') + var port = config.get('webserver.port') + + var pods = { + addVideoToFriends: addVideoToFriends, + hasFriends: hasFriends, + makeFriends: makeFriends, + quitFriends: quitFriends, + removeVideoToFriends: removeVideoToFriends + } + + function addVideoToFriends (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + // namePath is null + // TODO + video.namePath = null + PoolRequests.addRequest(id, 'add', video) + } + + function hasFriends (callback) { + Pods.count(function (err, count) { + if (err) return callback(err) + + var has_friends = (count !== 0) + callback(null, has_friends) + }) + } + + function makeFriends (callback) { + var pods_score = {} + + logger.info('Make friends!') + fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { + if (err) { + logger.error('Cannot read public cert.', { error: err }) + return callback(err) + } + + var urls = config.get('network.friends') + + async.each(urls, computeForeignPodsList, function () { + logger.debug('Pods scores computed.', { pods_score: pods_score }) + var pods_list = computeWinningPods(urls, pods_score) + logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list }) + + makeRequestsToWinningPods(cert, pods_list) + }) + }) + + // ----------------------------------------------------------------------- + + function computeForeignPodsList (url, callback) { + // Let's give 1 point to the pod we ask the friends list + pods_score[url] = 1 + + getForeignPodsList(url, function (foreign_pods_list) { + if (foreign_pods_list.length === 0) return callback() + + async.each(foreign_pods_list, function (foreign_pod, callback_each) { + var foreign_url = foreign_pod.url + + if (pods_score[foreign_url]) pods_score[foreign_url]++ + else pods_score[foreign_url] = 1 + + callback_each() + }, function () { + callback() + }) + }) + } + + function computeWinningPods (urls, pods_score) { + // Build the list of pods to add + // Only add a pod if it exists in more than a half base pods + var pods_list = [] + var base_score = urls.length / 2 + Object.keys(pods_score).forEach(function (pod) { + if (pods_score[pod] > base_score) pods_list.push({ url: pod }) + }) + + return pods_list + } + + function makeRequestsToWinningPods (cert, pods_list) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + // Get the list of our videos to send to our new friends + Videos.listOwned(function (err, videos_list) { + if (err) throw err + + var data = { + url: http + '://' + host + ':' + port, + publicKey: cert, + videos: videos_list + } + + utils.makeMultipleRetryRequest( + { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, + + pods_list, + + function eachRequest (err, response, body, url, pod, callback_each_request) { + // We add the pod if it responded correctly with its public certificate + if (!err && response.statusCode === 200) { + Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { + if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) + + Videos.addRemotes(body.videos, function (err) { + if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) + + logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) + return callback_each_request() + }) + }) + } else { + logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) + return callback_each_request() + } + }, + + function endRequests (err) { + // Now we made new friends, we can re activate the pool of requests + poolRequests.activate() + + if (err) { + logger.error('There was some errors when we wanted to make friends.', { error: err }) + return callback(err) + } + + logger.debug('makeRequestsToWinningPods finished.') + return callback(null) + } + ) + }) + } + } + + function quitFriends (callback) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + Pods.list(function (err, pods) { + if (err) return callback(err) + + var request = { + method: 'POST', + path: '/api/' + constants.API_VERSION + '/pods/remove', + sign: true, + encrypt: true, + data: { + url: 'me' // Fake data + } + } + + // Announce we quit them + utils.makeMultipleRetryRequest(request, pods, function () { + Pods.removeAll(function (err) { + poolRequests.activate() + + if (err) return callback(err) + + logger.info('Broke friends, so sad :(') + + Videos.removeAllRemotes(function (err) { + if (err) return callback(err) + + logger.info('Removed all remote videos.') + callback(null) + }) + }) + }) + }) + } + + function removeVideoToFriends (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + PoolRequests.addRequest(id, 'remove', video) + } + + // --------------------------------------------------------------------------- + + module.exports = pods + + // --------------------------------------------------------------------------- + + function getForeignPodsList (url, callback) { + var path = '/api/' + constants.API_VERSION + '/pods' + + request.get(url + path, function (err, response, body) { + if (err) throw err + callback(JSON.parse(body)) + }) + } +})() diff --git a/lib/poolRequests.js b/lib/poolRequests.js index 53f47d629..796f06149 100644 --- a/lib/poolRequests.js +++ b/lib/poolRequests.js @@ -5,18 +5,16 @@ var pluck = require('lodash-node/compat/collection/pluck') var constants = require('../initializers/constants') - var database = require('../initializers/database') var logger = require('../helpers/logger') - var PodsDB = database.PodsDB - var PoolRequestsDB = database.PoolRequestsDB + var Pods = require('../models/pods') + var PoolRequests = require('../models/poolRequests') var utils = require('../helpers/utils') - var VideosDB = database.VideosDB + var Videos = require('../models/videos') var timer = null var poolRequests = { activate: activate, - addToPoolRequests: addToPoolRequests, deactivate: deactivate, forceSend: forceSend } @@ -36,30 +34,6 @@ timer = setInterval(makePoolRequests, constants.INTERVAL) } - function addToPoolRequests (id, type, request) { - logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) - - PoolRequestsDB.findOne({ id: id }, function (err, entity) { - if (err) logger.error(err) - - if (entity) { - if (entity.type === type) { - logger.error(new Error('Cannot insert two same requests.')) - return - } - - // Remove the request of the other type - PoolRequestsDB.remove({ id: id }, function (err) { - if (err) logger.error(err) - }) - } else { - PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { - if (err) logger.error(err) - }) - } - }) - } - // --------------------------------------------------------------------------- module.exports = poolRequests @@ -69,7 +43,7 @@ function makePoolRequest (type, requests, callback) { if (!callback) callback = function () {} - PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) { + Pods.list(function (err, pods) { if (err) throw err var params = { @@ -116,7 +90,7 @@ function makePoolRequests () { logger.info('Making pool requests to friends.') - PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) { + PoolRequests.list(function (err, pool_requests) { if (err) throw err if (pool_requests.length === 0) return @@ -150,7 +124,7 @@ makePoolRequest('add', requests.add.requests, function (err) { if (err) logger.error('Errors when sent add pool requests.', { error: err }) - removePoolRequestsFromDB(requests.add.ids) + PoolRequests.removeRequests(requests.add.ids) }) } @@ -159,7 +133,7 @@ makePoolRequest('remove', requests.remove.requests, function (err) { if (err) logger.error('Errors when sent remove pool requests.', { error: err }) - removePoolRequestsFromDB(requests.remove.ids) + PoolRequests.removeRequests(requests.remove.ids) }) } }) @@ -167,7 +141,7 @@ } function removeBadPods () { - PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) { + Pods.findBadPods(function (err, pods) { if (err) throw err if (pods.length === 0) return @@ -175,12 +149,12 @@ var urls = pluck(pods, 'url') var ids = pluck(pods, '_id') - VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) { + Videos.removeAllRemotesOf(urls, function (err, r) { if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err }) var videos_removed = r.result.n logger.info('Removed %d videos.', videos_removed) - PodsDB.remove({ _id: { $in: ids } }, function (err, r) { + Pods.removeAllByIds(ids, function (err, r) { if (err) logger.error('Cannot remove bad pods.', { error: err }) var pods_removed = r.result.n @@ -190,22 +164,11 @@ }) } - function removePoolRequestsFromDB (ids) { - PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) { - if (err) { - logger.error('Cannot remove requests from the pool requests database.', { error: err }) - return - } - - logger.info('Pool requests flushed.') - }) - } - function updatePodsScore (good_pods, bad_pods) { logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) - PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec() - PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) { + Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS) + Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { if (err) throw err removeBadPods() }) diff --git a/lib/videos.js b/lib/videos.js new file mode 100644 index 000000000..5d23070a7 --- /dev/null +++ b/lib/videos.js @@ -0,0 +1,51 @@ +;(function () { + 'use strict' + + var async = require('async') + var config = require('config') + var webtorrent = require('../lib/webTorrentNode') + + var logger = require('../helpers/logger') + var Videos = require('../models/videos') + + var uploadDir = __dirname + '/../' + config.get('storage.uploads') + + var videos = { + seed: seed, + seedAllExisting: seedAllExisting + } + + function seed (path, callback) { + logger.info('Seeding %s...', path) + + webtorrent.seed(path, function (torrent) { + logger.info('%s seeded (%s).', path, torrent.magnetURI) + + return callback(null, torrent) + }) + } + + function seedAllExisting (callback) { + Videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos to seed.', { error: err }) + return callback(err) + } + + async.each(videos_list, function (video, each_callback) { + seed(uploadDir + video.namePath, function (err) { + if (err) { + logger.error('Cannot seed this video.', { error: err }) + return callback(err) + } + + each_callback(null) + }) + }, callback) + }) + } + + // --------------------------------------------------------------------------- + + module.exports = videos +})() diff --git a/lib/webtorrent.js b/lib/webtorrent.js index 41e60499f..d1ca3c9f2 100644 --- a/lib/webtorrent.js +++ b/lib/webtorrent.js @@ -62,7 +62,7 @@ try { wt.remove(magnetUri, callback) } catch (err) { - console.log('Cannot remove the torrent from WebTorrent', { err: err }) + console.log('Cannot remove the torrent from WebTorrent') return callback(null) } diff --git a/middlewares/misc.js b/middlewares/misc.js index dbb604db3..cc4e2e8a4 100644 --- a/middlewares/misc.js +++ b/middlewares/misc.js @@ -5,7 +5,7 @@ var ursa = require('ursa') var logger = require('../helpers/logger') - var PodsDB = require('../initializers/database').PodsDB + var Pods = require('../models/pods') var utils = require('../helpers/utils') var miscMiddleware = { @@ -28,18 +28,19 @@ } function decryptBody (req, res, next) { - PodsDB.findOne({ url: req.body.signature.url }, function (err, pod) { + var url = req.body.signature.url + Pods.findByUrl(url, function (err, pod) { if (err) { logger.error('Cannot get signed url in decryptBody.', { error: err }) return res.sendStatus(500) } if (pod === null) { - logger.error('Unknown pod %s.', req.body.signature.url) + logger.error('Unknown pod %s.', url) return res.sendStatus(403) } - logger.debug('Decrypting body from %s.', req.body.signature.url) + logger.debug('Decrypting body from %s.', url) var crt = ursa.createPublicKey(pod.publicKey) var signature_ok = crt.hashAndVerify('sha256', new Buffer(req.body.signature.url).toString('hex'), req.body.signature.signature, 'hex') diff --git a/middlewares/reqValidators/pods.js b/middlewares/reqValidators/pods.js index 6ccfd7361..499cafd8f 100644 --- a/middlewares/reqValidators/pods.js +++ b/middlewares/reqValidators/pods.js @@ -2,12 +2,27 @@ 'use strict' var checkErrors = require('./utils').checkErrors + var friends = require('../../lib/friends') var logger = require('../../helpers/logger') var reqValidatorsPod = { + makeFriends: makeFriends, podsAdd: podsAdd } + function makeFriends (req, res, next) { + friends.hasFriends(function (err, has_friends) { + if (err) return next(err) + + if (has_friends === true) { + // We need to quit our friends before make new ones + res.sendStatus(409) + } else { + next() + } + }) + } + function podsAdd (req, res, next) { req.checkBody('data.url', 'Should have an url').notEmpty().isURL({ require_protocol: true }) req.checkBody('data.publicKey', 'Should have a public key').notEmpty() diff --git a/middlewares/reqValidators/videos.js b/middlewares/reqValidators/videos.js index 3479c47c3..f7bd24658 100644 --- a/middlewares/reqValidators/videos.js +++ b/middlewares/reqValidators/videos.js @@ -3,7 +3,7 @@ var checkErrors = require('./utils').checkErrors var logger = require('../../helpers/logger') - var VideosDB = require('../../initializers/database').VideosDB + var Videos = require('../../models/videos') var reqValidatorsVideos = { videosAdd: videosAdd, @@ -29,8 +29,13 @@ logger.debug('Checking videosGet parameters', { parameters: req.params }) checkErrors(req, res, function () { - findVideoById(req.params.id, function (video) { - if (!video) return res.status(404).send('Video not found') + Videos.getVideoState(req.params.id, function (err, state) { + if (err) { + logger.error('Error in videosGet request validator.', { error: err }) + res.sendStatus(500) + } + + if (state.exist === false) return res.status(404).send('Video not found') next() }) @@ -43,9 +48,14 @@ logger.debug('Checking videosRemove parameters', { parameters: req.params }) checkErrors(req, res, function () { - findVideoById(req.params.id, function (video) { - if (!video) return res.status(404).send('Video not found') - else if (video.namePath === null) return res.status(403).send('Cannot remove video of another pod') + Videos.getVideoState(req.params.id, function (err, state) { + if (err) { + logger.error('Error in videosRemove request validator.', { error: err }) + res.sendStatus(500) + } + + if (state.exist === false) return res.status(404).send('Video not found') + else if (state.owned === false) return res.status(403).send('Cannot remove video of another pod') next() }) @@ -63,14 +73,4 @@ // --------------------------------------------------------------------------- module.exports = reqValidatorsVideos - - // --------------------------------------------------------------------------- - - function findVideoById (id, callback) { - VideosDB.findById(id, { _id: 1, namePath: 1 }).limit(1).exec(function (err, video) { - if (err) throw err - - callback(video) - }) - } })() diff --git a/models/pods.js b/models/pods.js index ed2f0d8ee..395b1e0b1 100644 --- a/models/pods.js +++ b/models/pods.js @@ -1,73 +1,61 @@ ;(function () { 'use strict' - var async = require('async') - var config = require('config') - var fs = require('fs') - var request = require('request') + var mongoose = require('mongoose') var constants = require('../initializers/constants') var logger = require('../helpers/logger') - var PodsDB = require('../initializers/database').PodsDB - var poolRequests = require('../lib/poolRequests') - var utils = require('../helpers/utils') - var http = config.get('webserver.https') ? 'https' : 'http' - var host = config.get('webserver.host') - var port = config.get('webserver.port') + // --------------------------------------------------------------------------- + + var podsSchema = mongoose.Schema({ + url: String, + publicKey: String, + score: { type: Number, max: constants.FRIEND_BASE_SCORE } + }) + var PodsDB = mongoose.model('pods', podsSchema) - var pods = { + // --------------------------------------------------------------------------- + + var Pods = { add: add, - addVideoToFriends: addVideoToFriends, + count: count, + findByUrl: findByUrl, + findBadPods: findBadPods, + incrementScores: incrementScores, list: list, - hasFriends: hasFriends, - makeFriends: makeFriends, - quitFriends: quitFriends, remove: remove, - removeVideoToFriends + removeAll: removeAll, + removeAllByIds: removeAllByIds } // TODO: check if the pod is not already a friend function add (data, callback) { - var videos = require('./videos') - logger.info('Adding pod: %s', data.url) - + if (!callback) callback = function () {} var params = { url: data.url, publicKey: data.publicKey, score: constants.FRIEND_BASE_SCORE } - PodsDB.create(params, function (err, pod) { - if (err) { - logger.error('Cannot insert the pod.', { error: err }) - return callback(err) - } - - videos.addRemotes(data.videos) + PodsDB.create(params, callback) + } - fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { - if (err) { - logger.error('Cannot read cert file.', { error: err }) - return callback(err) - } + function count (callback) { + return PodsDB.count(callback) + } - videos.listOwned(function (err, videos_list) { - if (err) { - logger.error('Cannot get the list of owned videos.', { error: err }) - return callback(err) - } + function findBadPods (callback) { + PodsDB.find({ score: 0 }, callback) + } - return callback(null, { cert: cert, videos: videos_list }) - }) - }) - }) + function findByUrl (url, callback) { + PodsDB.findOne({ url: url }, callback) } - function addVideoToFriends (video) { - // To avoid duplicates - var id = video.name + video.magnetUri - poolRequests.addToPoolRequests(id, 'add', video) + function incrementScores (ids, value, callback) { + if (!callback) callback = function () {} + PodsDB.update({ _id: { $in: ids } }, { $inc: { score: value } }, { multi: true }, callback) } function list (callback) { @@ -81,202 +69,22 @@ }) } - function hasFriends (callback) { - PodsDB.count(function (err, count) { - if (err) return callback(err) - - var has_friends = (count !== 0) - callback(null, has_friends) - }) - } - - function makeFriends (callback) { - var videos = require('./videos') - var pods_score = {} - - logger.info('Make friends!') - fs.readFile(utils.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { - if (err) { - logger.error('Cannot read public cert.', { error: err }) - return callback(err) - } - - var urls = config.get('network.friends') - - async.each(urls, computeForeignPodsList, function () { - logger.debug('Pods scores computed.', { pods_score: pods_score }) - var pods_list = computeWinningPods(urls, pods_score) - logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list }) - - makeRequestsToWinningPods(cert, pods_list) - }) - }) - - // ----------------------------------------------------------------------- - - function computeForeignPodsList (url, callback) { - // Let's give 1 point to the pod we ask the friends list - pods_score[url] = 1 - - getForeignPodsList(url, function (foreign_pods_list) { - if (foreign_pods_list.length === 0) return callback() - - async.each(foreign_pods_list, function (foreign_pod, callback_each) { - var foreign_url = foreign_pod.url - - if (pods_score[foreign_url]) pods_score[foreign_url]++ - else pods_score[foreign_url] = 1 - - callback_each() - }, function () { - callback() - }) - }) - } - - function computeWinningPods (urls, pods_score) { - // Build the list of pods to add - // Only add a pod if it exists in more than a half base pods - var pods_list = [] - var base_score = urls.length / 2 - Object.keys(pods_score).forEach(function (pod) { - if (pods_score[pod] > base_score) pods_list.push({ url: pod }) - }) - - return pods_list - } - - function makeRequestsToWinningPods (cert, pods_list) { - // Stop pool requests - poolRequests.deactivate() - // Flush pool requests - poolRequests.forceSend() - - // Get the list of our videos to send to our new friends - videos.listOwned(function (err, videos_list) { - if (err) throw err - - var data = { - url: http + '://' + host + ':' + port, - publicKey: cert, - videos: videos_list - } - - utils.makeMultipleRetryRequest( - { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, - - pods_list, - - function eachRequest (err, response, body, url, pod, callback_each_request) { - // We add the pod if it responded correctly with its public certificate - if (!err && response.statusCode === 200) { - add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { - if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) - - videos.addRemotes(body.videos, function (err) { - if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) - - logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) - return callback_each_request() - }) - }) - } else { - logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) - return callback_each_request() - } - }, - - function endRequests (err) { - // Now we made new friends, we can re activate the pool of requests - poolRequests.activate() - - if (err) { - logger.error('There was some errors when we wanted to make friends.', { error: err }) - return callback(err) - } - - logger.debug('makeRequestsToWinningPods finished.') - return callback(null) - } - ) - }) - } - } - - function quitFriends (callback) { - // Stop pool requests - poolRequests.deactivate() - // Flush pool requests - poolRequests.forceSend() - - PodsDB.find(function (err, pods) { - if (err) return callback(err) - - var request = { - method: 'POST', - path: '/api/' + constants.API_VERSION + '/pods/remove', - sign: true, - encrypt: true, - data: { - url: 'me' // Fake data - } - } - - // Announce we quit them - utils.makeMultipleRetryRequest(request, pods, function () { - PodsDB.remove(function (err) { - poolRequests.activate() - - if (err) return callback(err) - - logger.info('Broke friends, so sad :(') - - var videos = require('./videos') - videos.removeAllRemotes(function (err) { - if (err) return callback(err) - - logger.info('Removed all remote videos.') - callback(null) - }) - }) - }) - }) - } - function remove (url, callback) { - var videos = require('./videos') - logger.info('Removing %s pod.', url) - - videos.removeAllRemotesOf(url, function (err) { - if (err) logger.error('Cannot remove all remote videos of %s.', url) - - PodsDB.remove({ url: url }, function (err) { - if (err) return callback(err) - - logger.info('%s pod removed.', url) - callback(null) - }) - }) + if (!callback) callback = function () {} + PodsDB.remove({ url: url }, callback) } - function removeVideoToFriends (video) { - // To avoid duplicates - var id = video.name + video.magnetUri - poolRequests.addToPoolRequests(id, 'remove', video) + function removeAll (callback) { + if (!callback) callback = function () {} + PodsDB.remove(callback) } - // --------------------------------------------------------------------------- - - module.exports = pods + function removeAllByIds (ids, callback) { + if (!callback) callback = function () {} + PodsDB.remove({ _id: { $in: ids } }, callback) + } // --------------------------------------------------------------------------- - function getForeignPodsList (url, callback) { - var path = '/api/' + constants.API_VERSION + '/pods' - - request.get(url + path, function (err, response, body) { - if (err) throw err - callback(JSON.parse(body)) - }) - } + module.exports = Pods })() diff --git a/models/poolRequests.js b/models/poolRequests.js new file mode 100644 index 000000000..0f488ef04 --- /dev/null +++ b/models/poolRequests.js @@ -0,0 +1,67 @@ +;(function () { + 'use strict' + + var mongoose = require('mongoose') + + var logger = require('../helpers/logger') + + // --------------------------------------------------------------------------- + + var poolRequestsSchema = mongoose.Schema({ + type: String, + id: String, // Special id to find duplicates (video created we want to remove...) + request: mongoose.Schema.Types.Mixed + }) + var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema) + + // --------------------------------------------------------------------------- + + var PoolRequests = { + addRequest: addRequest, + list: list, + removeRequests: removeRequests + } + + function addRequest (id, type, request) { + logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) + + PoolRequestsDB.findOne({ id: id }, function (err, entity) { + if (err) logger.error(err) + + if (entity) { + if (entity.type === type) { + logger.error(new Error('Cannot insert two same requests.')) + return + } + + // Remove the request of the other type + PoolRequestsDB.remove({ id: id }, function (err) { + if (err) logger.error(err) + }) + } else { + PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) { + if (err) logger.error(err) + }) + } + }) + } + + function list (callback) { + PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback) + } + + function removeRequests (ids) { + PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) { + if (err) { + logger.error('Cannot remove requests from the pool requests database.', { error: err }) + return + } + + logger.info('Pool requests flushed.') + }) + } + + // --------------------------------------------------------------------------- + + module.exports = PoolRequests +})() diff --git a/models/videos.js b/models/videos.js index 5711c5657..10abee6e7 100644 --- a/models/videos.js +++ b/models/videos.js @@ -5,71 +5,62 @@ var config = require('config') var dz = require('dezalgo') var fs = require('fs') - var webtorrent = require('../lib/webTorrentNode') + var mongoose = require('mongoose') var logger = require('../helpers/logger') - var pods = require('./pods') - var VideosDB = require('../initializers/database').VideosDB var http = config.get('webserver.https') === true ? 'https' : 'http' var host = config.get('webserver.host') var port = config.get('webserver.port') + var uploadDir = __dirname + '/../' + config.get('storage.uploads') + + // --------------------------------------------------------------------------- + + var videosSchema = mongoose.Schema({ + name: String, + namePath: String, + description: String, + magnetUri: String, + podUrl: String + }) + var VideosDB = mongoose.model('videos', videosSchema) + + // --------------------------------------------------------------------------- - var videos = { + var Videos = { add: add, addRemotes: addRemotes, get: get, + getVideoState: getVideoState, + isOwned: isOwned, list: list, listOwned: listOwned, - remove: remove, + removeOwned: removeOwned, removeAllRemotes: removeAllRemotes, removeAllRemotesOf: removeAllRemotesOf, - removeRemotes: removeRemotes, - search: search, - seedAll: seedAll, - uploadDir: uploadDir + removeRemotesOfByMagnetUris: removeRemotesOfByMagnetUris, + search: search } - // ----------- Public attributes ---------- - var uploadDir = __dirname + '/../' + config.get('storage.uploads') + function add (video, callback) { + logger.info('Adding %s video to database.', video.name) - function add (data, callback) { - var video_file = data.video - var video_data = data.data + var params = video + params.podUrl = http + '://' + host + ':' + port - logger.info('Adding %s video.', video_file.path) - seedVideo(video_file.path, function (err, torrent) { + VideosDB.create(params, function (err, video) { if (err) { - logger.error('Cannot seed this video.', { error: err }) + logger.error('Cannot insert this video into database.', { error: err }) return callback(err) } - var params = { - name: video_data.name, - namePath: video_file.filename, - description: video_data.description, - magnetUri: torrent.magnetURI, - podUrl: http + '://' + host + ':' + port - } - - VideosDB.create(params, function (err, video) { - if (err) { - logger.error('Cannot insert this video.', { error: err }) - return callback(err) - } - - // Now we'll add the video's meta data to our friends - params.namePath = null - - pods.addVideoToFriends(params) - callback(null) - }) + callback(null) }) } // TODO: avoid doublons function addRemotes (videos, callback) { - if (callback === undefined) callback = function () {} + if (!callback) callback = function () {} var to_add = [] @@ -111,6 +102,38 @@ }) } + function getVideoState (id, callback) { + get(id, function (err, video) { + if (err) return callback(err) + + var exist = (video !== null) + var owned = false + if (exist === true) { + owned = (video.namePath !== null) + } + + return callback(null, { exist: exist, owned: owned }) + }) + } + + function isOwned (id, callback) { + VideosDB.findById(id, function (err, video) { + if (err || !video) { + if (!err) err = new Error('Cannot find this video.') + logger.error('Cannot find this video.', { error: err }) + return callback(err) + } + + if (video.namePath === null) { + var error_string = 'Cannot remove the video of another pod.' + logger.error(error_string) + return callback(null, false, video) + } + + callback(null, true, video) + }) + } + function list (callback) { VideosDB.find(function (err, videos_list) { if (err) { @@ -134,76 +157,35 @@ }) } - function remove (id, callback) { - // Maybe the torrent is not seeded, but we catch the error to don't stop the removing process - function removeTorrent (magnetUri, callback) { - try { - webtorrent.remove(magnetUri, callback) - } catch (err) { - logger.warn('Cannot remove the torrent from WebTorrent', { err: err }) - return callback(null) - } - } - - VideosDB.findById(id, function (err, video) { - if (err || !video) { - if (!err) err = new Error('Cannot find this video.') - logger.error('Cannot find this video.', { error: err }) + function removeOwned (id, callback) { + VideosDB.findByIdAndRemove(id, function (err, video) { + if (err) { + logger.error('Cannot remove the torrent.', { error: err }) return callback(err) } - if (video.namePath === null) { - var error_string = 'Cannot remove the video of another pod.' - logger.error(error_string) - return callback(new Error(error_string)) - } - - logger.info('Removing %s video', video.name) - - removeTorrent(video.magnetUri, function () { - VideosDB.findByIdAndRemove(id, function (err) { - if (err) { - logger.error('Cannot remove the torrent.', { error: err }) - return callback(err) - } - - fs.unlink(uploadDir + video.namePath, function (err) { - if (err) { - logger.error('Cannot remove this video file.', { error: err }) - return callback(err) - } - - var params = { - name: video.name, - magnetUri: video.magnetUri - } + fs.unlink(uploadDir + video.namePath, function (err) { + if (err) { + logger.error('Cannot remove this video file.', { error: err }) + return callback(err) + } - pods.removeVideoToFriends(params) - callback(null) - }) - }) + callback(null) }) }) } function removeAllRemotes (callback) { - VideosDB.remove({ namePath: null }, function (err) { - if (err) return callback(err) - - callback(null) - }) + VideosDB.remove({ namePath: null }, callback) } function removeAllRemotesOf (fromUrl, callback) { - VideosDB.remove({ podUrl: fromUrl }, function (err) { - if (err) return callback(err) - - callback(null) - }) + // TODO { podUrl: { $in: urls } } + VideosDB.remove({ podUrl: fromUrl }, callback) } // Use the magnet Uri because the _id field is not the same on different servers - function removeRemotes (fromUrl, magnetUris, callback) { + function removeRemotesOfByMagnetUris (fromUrl, magnetUris, callback) { if (callback === undefined) callback = function () {} VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { @@ -248,39 +230,7 @@ }) } - function seedAll (callback) { - VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) { - if (err) { - logger.error('Cannot get list of the videos to seed.', { error: err }) - return callback(err) - } - - async.each(videos_list, function (video, each_callback) { - seedVideo(uploadDir + video.namePath, function (err) { - if (err) { - logger.error('Cannot seed this video.', { error: err }) - return callback(err) - } - - each_callback(null) - }) - }, callback) - }) - } - // --------------------------------------------------------------------------- - module.exports = videos - - // --------------------------------------------------------------------------- - - function seedVideo (path, callback) { - logger.info('Seeding %s...', path) - - webtorrent.seed(path, function (torrent) { - logger.info('%s seeded (%s).', path, torrent.magnetURI) - - return callback(null, torrent) - }) - } + module.exports = Videos })() diff --git a/server.js b/server.js index 1e0222f4f..1153c9ef6 100644 --- a/server.js +++ b/server.js @@ -30,16 +30,20 @@ var config = require('config') var constants = require('./initializers/constants') var customValidators = require('./helpers/customValidators') + var database = require('./initializers/database') var logger = require('./helpers/logger') var poolRequests = require('./lib/poolRequests') var routes = require('./controllers') var utils = require('./helpers/utils') - var videos = require('./models/videos') + var videos = require('./lib/videos') var webtorrent = require('./lib/webTorrentNode') // Get configurations var port = config.get('listen.port') + // ----------- Database ----------- + database.connect() + // ----------- Command line ----------- // ----------- App ----------- @@ -153,7 +157,7 @@ // Activate the pool requests poolRequests.activate() - videos.seedAll(function () { + videos.seedAllExisting(function () { logger.info('Seeded all the videos') logger.info('Server listening on port %d', port) app.emit('ready') -- 2.41.0