From cda021079ff455cc0fd0eb95a5395fa808ab63d1 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Sat, 30 Jan 2016 17:05:22 +0100 Subject: New directory organization --- models/pods.js | 274 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ models/videos.js | 272 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 546 insertions(+) create mode 100644 models/pods.js create mode 100644 models/videos.js (limited to 'models') diff --git a/models/pods.js b/models/pods.js new file mode 100644 index 000000000..c8d08b26f --- /dev/null +++ b/models/pods.js @@ -0,0 +1,274 @@ +;(function () { + 'use strict' + + var async = require('async') + var config = require('config') + var fs = require('fs') + var request = require('request') + + var constants = require('../initializers/constants') + var logger = require('../helpers/logger') + var PodsDB = require('../initializers/database').PodsDB + var poolRequests = require('../lib/poolRequests') + var utils = require('../helpers/utils') + + var pods = {} + + var http = config.get('webserver.https') ? 'https' : 'http' + var host = config.get('webserver.host') + var port = config.get('webserver.port') + + // ----------- Private functions ----------- + + function getForeignPodsList (url, callback) { + var path = '/api/' + constants.API_VERSION + '/pods' + + request.get(url + path, function (err, response, body) { + if (err) throw err + callback(JSON.parse(body)) + }) + } + + // ----------- Public functions ----------- + + pods.list = function (callback) { + PodsDB.find(function (err, pods_list) { + if (err) { + logger.error('Cannot get the list of the pods.', { error: err }) + return callback(err) + } + + return callback(null, pods_list) + }) + } + + // { url } + // TODO: check if the pod is not already a friend + pods.add = function (data, callback) { + var videos = require('./videos') + logger.info('Adding pod: %s', data.url) + + var params = { + url: data.url, + publicKey: data.publicKey, + score: constants.FRIEND_BASE_SCORE + } + + PodsDB.create(params, function (err, pod) { + if (err) { + logger.error('Cannot insert the pod.', { error: err }) + return callback(err) + } + + videos.addRemotes(data.videos) + + fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { + if (err) { + logger.error('Cannot read cert file.', { error: err }) + return callback(err) + } + + videos.listOwned(function (err, videos_list) { + if (err) { + logger.error('Cannot get the list of owned videos.', { error: err }) + return callback(err) + } + + return callback(null, { cert: cert, videos: videos_list }) + }) + }) + }) + } + + pods.remove = function (url, callback) { + var videos = require('./videos') + logger.info('Removing %s pod.', url) + + videos.removeAllRemotesOf(url, function (err) { + if (err) logger.error('Cannot remove all remote videos of %s.', url) + + PodsDB.remove({ url: url }, function (err) { + if (err) return callback(err) + + logger.info('%s pod removed.', url) + callback(null) + }) + }) + } + + pods.addVideoToFriends = function (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + poolRequests.addToPoolRequests(id, 'add', video) + } + + pods.removeVideoToFriends = function (video) { + // To avoid duplicates + var id = video.name + video.magnetUri + poolRequests.addToPoolRequests(id, 'remove', video) + } + + pods.makeFriends = function (callback) { + var videos = require('./videos') + var pods_score = {} + + logger.info('Make friends!') + fs.readFile(utils.certDir + 'peertube.pub', 'utf8', function (err, cert) { + if (err) { + logger.error('Cannot read public cert.', { error: err }) + return callback(err) + } + + var urls = config.get('network.friends') + + async.each(urls, computeForeignPodsList, function () { + logger.debug('Pods scores computed.', { pods_score: pods_score }) + var pods_list = computeWinningPods(urls, pods_score) + logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list }) + + makeRequestsToWinningPods(cert, pods_list) + }) + }) + + // ----------------------------------------------------------------------- + + function computeForeignPodsList (url, callback) { + // Let's give 1 point to the pod we ask the friends list + pods_score[url] = 1 + + getForeignPodsList(url, function (foreign_pods_list) { + if (foreign_pods_list.length === 0) return callback() + + async.each(foreign_pods_list, function (foreign_pod, callback_each) { + var foreign_url = foreign_pod.url + + if (pods_score[foreign_url]) pods_score[foreign_url]++ + else pods_score[foreign_url] = 1 + + callback_each() + }, function () { + callback() + }) + }) + } + + function computeWinningPods (urls, pods_score) { + // Build the list of pods to add + // Only add a pod if it exists in more than a half base pods + var pods_list = [] + var base_score = urls.length / 2 + Object.keys(pods_score).forEach(function (pod) { + if (pods_score[pod] > base_score) pods_list.push({ url: pod }) + }) + + return pods_list + } + + function makeRequestsToWinningPods (cert, pods_list) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + // Get the list of our videos to send to our new friends + videos.listOwned(function (err, videos_list) { + if (err) throw err + + var data = { + url: http + '://' + host + ':' + port, + publicKey: cert, + videos: videos_list + } + + utils.makeMultipleRetryRequest( + { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, + + pods_list, + + function eachRequest (err, response, body, url, pod, callback_each_request) { + // We add the pod if it responded correctly with its public certificate + if (!err && response.statusCode === 200) { + pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { + if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) + + videos.addRemotes(body.videos, function (err) { + if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) + + logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) + return callback_each_request() + }) + }) + } else { + logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) + return callback_each_request() + } + }, + + function endRequests (err) { + // Now we made new friends, we can re activate the pool of requests + poolRequests.activate() + + if (err) { + logger.error('There was some errors when we wanted to make friends.', { error: err }) + return callback(err) + } + + logger.debug('makeRequestsToWinningPods finished.') + return callback(null) + } + ) + }) + } + } + + pods.quitFriends = function (callback) { + // Stop pool requests + poolRequests.deactivate() + // Flush pool requests + poolRequests.forceSend() + + PodsDB.find(function (err, pods) { + if (err) return callback(err) + + var request = { + method: 'POST', + path: '/api/' + constants.API_VERSION + '/pods/remove', + sign: true, + encrypt: true, + data: { + url: 'me' // Fake data + } + } + + // Announce we quit them + utils.makeMultipleRetryRequest(request, pods, function () { + PodsDB.remove(function (err) { + poolRequests.activate() + + if (err) return callback(err) + + logger.info('Broke friends, so sad :(') + + var videos = require('./videos') + videos.removeAllRemotes(function (err) { + if (err) return callback(err) + + logger.info('Removed all remote videos.') + callback(null) + }) + }) + }) + }) + } + + pods.hasFriends = function (callback) { + PodsDB.count(function (err, count) { + if (err) return callback(err) + + var has_friends = (count !== 0) + callback(null, has_friends) + }) + } + + module.exports = pods +})() diff --git a/models/videos.js b/models/videos.js new file mode 100644 index 000000000..626c55819 --- /dev/null +++ b/models/videos.js @@ -0,0 +1,272 @@ +;(function () { + 'use strict' + + var async = require('async') + var config = require('config') + var dz = require('dezalgo') + var fs = require('fs') + var webtorrent = require('../lib/webTorrentNode') + + var logger = require('../helpers/logger') + var pods = require('./pods') + var VideosDB = require('../initializers/database').VideosDB + + var videos = {} + + var http = config.get('webserver.https') === true ? 'https' : 'http' + var host = config.get('webserver.host') + var port = config.get('webserver.port') + + // ----------- Private functions ----------- + function seedVideo (path, callback) { + logger.info('Seeding %s...', path) + + webtorrent.seed(path, function (torrent) { + logger.info('%s seeded (%s).', path, torrent.magnetURI) + + return callback(null, torrent) + }) + } + + // ----------- Public attributes ---------- + videos.uploadDir = __dirname + '/../' + config.get('storage.uploads') + + // ----------- Public functions ----------- + videos.list = function (callback) { + VideosDB.find(function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos.', { error: err }) + return callback(err) + } + + return callback(null, videos_list) + }) + } + + videos.listOwned = function (callback) { + // If namePath is not null this is *our* video + VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos.', { error: err }) + return callback(err) + } + + return callback(null, videos_list) + }) + } + + videos.add = function (data, callback) { + var video_file = data.video + var video_data = data.data + + logger.info('Adding %s video.', video_file.path) + seedVideo(video_file.path, function (err, torrent) { + if (err) { + logger.error('Cannot seed this video.', { error: err }) + return callback(err) + } + + var params = { + name: video_data.name, + namePath: video_file.filename, + description: video_data.description, + magnetUri: torrent.magnetURI, + podUrl: http + '://' + host + ':' + port + } + + VideosDB.create(params, function (err, video) { + if (err) { + logger.error('Cannot insert this video.', { error: err }) + return callback(err) + } + + // Now we'll add the video's meta data to our friends + params.namePath = null + + pods.addVideoToFriends(params) + callback(null) + }) + }) + } + + videos.remove = function (id, callback) { + // Maybe the torrent is not seeded, but we catch the error to don't stop the removing process + function removeTorrent (magnetUri, callback) { + try { + webtorrent.remove(magnetUri, callback) + } catch (err) { + logger.warn('Cannot remove the torrent from WebTorrent', { err: err }) + return callback(null) + } + } + + VideosDB.findById(id, function (err, video) { + if (err || !video) { + if (!err) err = new Error('Cannot find this video.') + logger.error('Cannot find this video.', { error: err }) + return callback(err) + } + + if (video.namePath === null) { + var error_string = 'Cannot remove the video of another pod.' + logger.error(error_string) + return callback(new Error(error_string)) + } + + logger.info('Removing %s video', video.name) + + removeTorrent(video.magnetUri, function () { + VideosDB.findByIdAndRemove(id, function (err) { + if (err) { + logger.error('Cannot remove the torrent.', { error: err }) + return callback(err) + } + + fs.unlink(videos.uploadDir + video.namePath, function (err) { + if (err) { + logger.error('Cannot remove this video file.', { error: err }) + return callback(err) + } + + var params = { + name: video.name, + magnetUri: video.magnetUri + } + + pods.removeVideoToFriends(params) + callback(null) + }) + }) + }) + }) + } + + // Use the magnet Uri because the _id field is not the same on different servers + videos.removeRemotes = function (fromUrl, magnetUris, callback) { + if (callback === undefined) callback = function () {} + + VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) { + if (err || !videos) { + logger.error('Cannot find the torrent URI of these remote videos.') + return callback(err) + } + + var to_remove = [] + async.each(videos, function (video, callback_async) { + callback_async = dz(callback_async) + + if (video.podUrl !== fromUrl) { + logger.error('The pod %s has not the rights on the video of %s.', fromUrl, video.podUrl) + } else { + to_remove.push(video._id) + } + + callback_async() + }, function () { + VideosDB.remove({ _id: { $in: to_remove } }, function (err) { + if (err) { + logger.error('Cannot remove the remote videos.') + return callback(err) + } + + logger.info('Removed remote videos from %s.', fromUrl) + callback(null) + }) + }) + }) + } + + videos.removeAllRemotes = function (callback) { + VideosDB.remove({ namePath: null }, function (err) { + if (err) return callback(err) + + callback(null) + }) + } + + videos.removeAllRemotesOf = function (fromUrl, callback) { + VideosDB.remove({ podUrl: fromUrl }, function (err) { + if (err) return callback(err) + + callback(null) + }) + } + + // { name, magnetUri, podUrl } + // TODO: avoid doublons + videos.addRemotes = function (videos, callback) { + if (callback === undefined) callback = function () {} + + var to_add = [] + + async.each(videos, function (video, callback_each) { + callback_each = dz(callback_each) + logger.debug('Add remote video from pod: %s', video.podUrl) + + var params = { + name: video.name, + namePath: null, + description: video.description, + magnetUri: video.magnetUri, + podUrl: video.podUrl + } + + to_add.push(params) + + callback_each() + }, function () { + VideosDB.create(to_add, function (err, videos) { + if (err) { + logger.error('Cannot insert this remote video.', { error: err }) + return callback(err) + } + + return callback(null, videos) + }) + }) + } + + videos.get = function (id, callback) { + VideosDB.findById(id, function (err, video) { + if (err) { + logger.error('Cannot get this video.', { error: err }) + return callback(err) + } + + return callback(null, video) + }) + } + + videos.search = function (name, callback) { + VideosDB.find({ name: new RegExp(name) }, function (err, videos) { + if (err) { + logger.error('Cannot search the videos.', { error: err }) + return callback(err) + } + + return callback(null, videos) + }) + } + + videos.seedAll = function (callback) { + VideosDB.find({ namePath: { $ne: null } }, function (err, videos_list) { + if (err) { + logger.error('Cannot get list of the videos to seed.', { error: err }) + return callback(err) + } + + async.each(videos_list, function (video, each_callback) { + seedVideo(videos.uploadDir + video.namePath, function (err) { + if (err) { + logger.error('Cannot seed this video.', { error: err }) + return callback(err) + } + + each_callback(null) + }) + }, callback) + }) + } + + module.exports = videos +})() -- cgit v1.2.3