const each = require('async/each')
const eachLimit = require('async/eachLimit')
const eachSeries = require('async/eachSeries')
-const fs = require('fs')
-const mongoose = require('mongoose')
const request = require('request')
const waterfall = require('async/waterfall')
const constants = require('../initializers/constants')
+const db = require('../initializers/database')
const logger = require('../helpers/logger')
+const peertubeCrypto = require('../helpers/peertube-crypto')
const requests = require('../helpers/requests')
+const utils = require('../helpers/utils')
+const RequestScheduler = require('./request-scheduler')
+const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
+const RequestVideoEventScheduler = require('./request-video-event-scheduler')
-const Pod = mongoose.model('Pod')
-const Request = mongoose.model('Request')
-const Video = mongoose.model('Video')
+const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
+
+const requestScheduler = new RequestScheduler()
+const requestSchedulerVideoQadu = new RequestVideoQaduScheduler()
+const requestSchedulerVideoEvent = new RequestVideoEventScheduler()
const friends = {
+ activate,
addVideoToFriends,
+ updateVideoToFriends,
+ reportAbuseVideoToFriend,
+ quickAndDirtyUpdateVideoToFriends,
+ addEventToRemoteVideo,
hasFriends,
- getMyCertificate,
makeFriends,
quitFriends,
removeVideoToFriends,
sendOwnedVideosToPod
}
-function addVideoToFriends (video) {
- createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
+function activate () {
+ requestScheduler.activate()
+ requestSchedulerVideoQadu.activate()
+ requestSchedulerVideoEvent.activate()
+}
+
+function addVideoToFriends (videoData, transaction, callback) {
+ const options = {
+ type: ENDPOINT_ACTIONS.ADD,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoData,
+ transaction
+ }
+ createRequest(options, callback)
+}
+
+function updateVideoToFriends (videoData, transaction, callback) {
+ const options = {
+ type: ENDPOINT_ACTIONS.UPDATE,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoData,
+ transaction
+ }
+ createRequest(options, callback)
+}
+
+function removeVideoToFriends (videoParams) {
+ const options = {
+ type: ENDPOINT_ACTIONS.REMOVE,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoParams
+ }
+ createRequest(options)
+}
+
+function reportAbuseVideoToFriend (reportData, video) {
+ const options = {
+ type: ENDPOINT_ACTIONS.REPORT_ABUSE,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: reportData,
+ toIds: [ video.Author.podId ]
+ }
+ createRequest(options)
+}
+
+function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
+ const options = {
+ videoId,
+ type,
+ transaction
+ }
+ return createVideoQaduRequest(options, callback)
+}
+
+function addEventToRemoteVideo (videoId, type, transaction, callback) {
+ const options = {
+ videoId,
+ type,
+ transaction
+ }
+ createVideoEventRequest(options, callback)
}
function hasFriends (callback) {
- Pod.countAll(function (err, count) {
+ db.Pod.countAll(function (err, count) {
if (err) return callback(err)
const hasFriends = (count !== 0)
})
}
-function getMyCertificate (callback) {
- fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
-}
-
function makeFriends (hosts, callback) {
const podsScore = {}
logger.info('Make friends!')
- getMyCertificate(function (err, cert) {
+ peertubeCrypto.getMyPublicCert(function (err, cert) {
if (err) {
logger.error('Cannot read public cert.')
return callback(err)
function quitFriends (callback) {
// Stop pool requests
- Request.deactivate()
- // Flush pool requests
- Request.flush()
+ requestScheduler.deactivate()
waterfall([
+ function flushRequests (callbackAsync) {
+ requestScheduler.flush(err => callbackAsync(err))
+ },
+
+ function flushVideoQaduRequests (callbackAsync) {
+ requestSchedulerVideoQadu.flush(err => callbackAsync(err))
+ },
+
function getPodsList (callbackAsync) {
- return Pod.list(callbackAsync)
+ return db.Pod.list(callbackAsync)
},
function announceIQuitMyFriends (pods, callbackAsync) {
function removePodsFromDB (pods, callbackAsync) {
each(pods, function (pod, callbackEach) {
- pod.remove(callbackEach)
+ pod.destroy().asCallback(callbackEach)
}, callbackAsync)
}
], function (err) {
// Don't forget to re activate the scheduler, even if there was an error
- Request.activate()
+ requestScheduler.activate()
if (err) return callback(err)
})
}
-function removeVideoToFriends (videoParams) {
- createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
-}
-
function sendOwnedVideosToPod (podId) {
- Video.listOwned(function (err, videosList) {
+ db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
if (err) {
logger.error('Cannot get the list of videos we own.')
return
}
videosList.forEach(function (video) {
- video.toRemoteJSON(function (err, remoteVideo) {
+ video.toAddRemoteJSON(function (err, remoteVideo) {
if (err) {
logger.error('Cannot convert video to remote.', { error: err })
// Don't break the process
return
}
- createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
+ const options = {
+ type: 'add',
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: remoteVideo,
+ toIds: [ podId ]
+ }
+ createRequest(options)
})
})
})
// ---------------------------------------------------------------------------
function computeForeignPodsList (host, podsScore, callback) {
- getForeignPodsList(host, function (err, foreignPodsList) {
+ getForeignPodsList(host, function (err, res) {
if (err) return callback(err)
- if (!foreignPodsList) foreignPodsList = []
+ const foreignPodsList = res.data
// Let's give 1 point to the pod we ask the friends list
foreignPodsList.push({ host })
else podsScore[foreignPodHost] = 1
})
- callback()
+ return callback()
})
}
// Only add a pod if it exists in more than a half base pods
const podsList = []
const baseScore = hosts.length / 2
+
Object.keys(podsScore).forEach(function (podHost) {
// If the pod is not me and with a good score we add it
if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
function makeRequestsToWinningPods (cert, podsList, callback) {
// Stop pool requests
- Request.deactivate()
+ requestScheduler.deactivate()
// Flush pool requests
- Request.forceSend()
+ requestScheduler.forceSend()
eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
const params = {
method: 'POST',
json: {
host: constants.CONFIG.WEBSERVER.HOST,
+ email: constants.CONFIG.ADMIN.EMAIL,
publicKey: cert
}
}
}
if (res.statusCode === 200) {
- const podObj = new Pod({ host: pod.host, publicKey: body.cert })
- podObj.save(function (err, podCreated) {
+ const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
+ podObj.save().asCallback(function (err, podCreated) {
if (err) {
logger.error('Cannot add friend %s pod.', pod.host, { error: err })
return callbackEach()
}
// Add our videos to the request scheduler
- sendOwnedVideosToPod(podCreated._id)
+ sendOwnedVideosToPod(podCreated.id)
return callbackEach()
})
}, function endRequests () {
// Final callback, we've ended all the requests
// Now we made new friends, we can re activate the pool of requests
- Request.activate()
+ requestScheduler.activate()
logger.debug('makeRequestsToWinningPods finished.')
return callback()
})
}
-function createRequest (type, endpoint, data, to) {
- const req = new Request({
- endpoint,
- request: {
- type: type,
- data: data
+// Wrapper that populate "toIds" argument with all our friends if it is not specified
+// { type, endpoint, data, toIds, transaction }
+function createRequest (options, callback) {
+ if (!callback) callback = function () {}
+ if (options.toIds) return requestScheduler.createRequest(options, callback)
+
+ // If the "toIds" pods is not specified, we send the request to all our friends
+ db.Pod.listAllIds(options.transaction, function (err, podIds) {
+ if (err) {
+ logger.error('Cannot get pod ids', { error: err })
+ return
}
+
+ const newOptions = Object.assign(options, { toIds: podIds })
+ return requestScheduler.createRequest(newOptions, callback)
})
+}
- if (to) {
- req.to = to
- }
+function createVideoQaduRequest (options, callback) {
+ if (!callback) callback = utils.createEmptyCallback()
- req.save(function (err) {
- if (err) logger.error('Cannot save the request.', { error: err })
- })
+ requestSchedulerVideoQadu.createRequest(options, callback)
+}
+
+function createVideoEventRequest (options, callback) {
+ if (!callback) callback = utils.createEmptyCallback()
+
+ requestSchedulerVideoEvent.createRequest(options, callback)
}
function isMe (host) {