]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - lib/poolRequests.js
Require on the top of the files
[github/Chocobozzz/PeerTube.git] / lib / poolRequests.js
index 9c7f3238bb03af059615433d2110dc3fae1d6814..f786c3c7a4c3657a657c0883f8d8035e88866762 100644 (file)
-;(function () {
-  'use strict'
-
-  var async = require('async')
-
-  var constants = require('../initializers/constants')
-  var logger = require('../helpers/logger')
-  var database = require('../initializers/database')
-  var pluck = require('lodash-node/compat/collection/pluck')
-  var PoolRequestsDB = database.PoolRequestsDB
-  var PodsDB = database.PodsDB
-  var utils = require('../helpers/utils')
-  var VideosDB = database.VideosDB
-
-  var poolRequests = {}
-
-  // ----------- Private -----------
-  var timer = null
-
-  function removePoolRequestsFromDB (ids) {
-    PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) {
-      if (err) {
-        logger.error('Cannot remove requests from the pool requests database.', { error: err })
-        return
+'use strict'
+
+var async = require('async')
+var pluck = require('lodash-node/compat/collection/pluck')
+
+var constants = require('../initializers/constants')
+var logger = require('../helpers/logger')
+var Pods = require('../models/pods')
+var PoolRequests = require('../models/poolRequests')
+var requests = require('../helpers/requests')
+var Videos = require('../models/videos')
+
+var timer = null
+
+var poolRequests = {
+  activate: activate,
+  addRequest: addRequest,
+  deactivate: deactivate,
+  forceSend: forceSend
+}
+
+function activate () {
+  logger.info('Pool requests activated.')
+  timer = setInterval(makePoolRequests, constants.INTERVAL)
+}
+
+function addRequest (id, type, request) {
+  logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
+
+  PoolRequests.findById(id, function (err, entity) {
+    if (err) {
+      logger.error('Cannot find one pool request.', { error: err })
+      return // Abort
+    }
+
+    if (entity) {
+      if (entity.type === type) {
+        logger.error('Cannot insert two same requests.')
+        return // Abort
       }
 
-      logger.info('Pool requests flushed.')
-    })
-  }
-
-  function makePoolRequests () {
-    logger.info('Making pool requests to friends.')
-
-    PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) {
-      if (err) throw err
-
-      if (pool_requests.length === 0) return
-
-      var requests = {
-        add: {
-          ids: [],
-          requests: []
-        },
-        remove: {
-          ids: [],
-          requests: []
-        }
-      }
-
-      async.each(pool_requests, function (pool_request, callback_each) {
-        if (pool_request.type === 'add') {
-          requests.add.requests.push(pool_request.request)
-          requests.add.ids.push(pool_request._id)
-        } else if (pool_request.type === 'remove') {
-          requests.remove.requests.push(pool_request.request)
-          requests.remove.ids.push(pool_request._id)
-        } else {
-          throw new Error('Unkown pool request type.')
-        }
-
-        callback_each()
-      }, function () {
-        // Send the add requests
-        if (requests.add.requests.length !== 0) {
-          makePoolRequest('add', requests.add.requests, function (err) {
-            if (err) logger.error('Errors when sent add pool requests.', { error: err })
-
-            removePoolRequestsFromDB(requests.add.ids)
-          })
-        }
-
-        // Send the remove requests
-        if (requests.remove.requests.length !== 0) {
-          makePoolRequest('remove', requests.remove.requests, function (err) {
-            if (err) logger.error('Errors when sent remove pool requests.', { error: err })
-
-            removePoolRequestsFromDB(requests.remove.ids)
-          })
+      // Remove the request of the other type
+      PoolRequests.removeRequestById(id, function (err) {
+        if (err) {
+          logger.error('Cannot remove a pool request.', { error: err })
+          return // Abort
         }
       })
-    })
-  }
+    } else {
+      PoolRequests.create(id, type, request, function (err) {
+        if (err) logger.error('Cannot create a pool request.', { error: err })
+        return // Abort
+      })
+    }
+  })
+}
 
-  function updatePodsScore (good_pods, bad_pods) {
-    logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
+function deactivate () {
+  logger.info('Pool requests deactivated.')
+  clearInterval(timer)
+}
 
-    PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec()
-    PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) {
-      if (err) throw err
-      removeBadPods()
-    })
-  }
+function forceSend () {
+  logger.info('Force pool requests sending.')
+  makePoolRequests()
+}
 
-  function removeBadPods () {
-    PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) {
-      if (err) throw err
+// ---------------------------------------------------------------------------
 
-      if (pods.length === 0) return
+module.exports = poolRequests
 
-      var urls = pluck(pods, 'url')
-      var ids = pluck(pods, '_id')
+// ---------------------------------------------------------------------------
 
-      VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) {
-        if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err })
-        var videos_removed = r.result.n
-        logger.info('Removed %d videos.', videos_removed)
+function makePoolRequest (type, requests_to_make, callback) {
+  if (!callback) callback = function () {}
 
-        PodsDB.remove({ _id: { $in: ids } }, function (err, r) {
-          if (err) logger.error('Cannot remove bad pods.', { error: err })
+  Pods.list(function (err, pods) {
+    if (err) return callback(err)
 
-          var pods_removed = r.result.n
-          logger.info('Removed %d pods.', pods_removed)
-        })
-      })
-    })
-  }
+    var params = {
+      encrypt: true,
+      sign: true,
+      method: 'POST',
+      path: null,
+      data: requests_to_make
+    }
 
-  function makePoolRequest (type, requests, callback) {
-    if (!callback) callback = function () {}
+    if (type === 'add') {
+      params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
+    } else if (type === 'remove') {
+      params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
+    } else {
+      return callback(new Error('Unkown pool request type.'))
+    }
 
-    PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
-      if (err) throw err
+    var bad_pods = []
+    var good_pods = []
 
-      var params = {
-        encrypt: true,
-        sign: true,
-        method: 'POST',
-        path: null,
-        data: requests
-      }
+    requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
 
-      if (type === 'add') {
-        params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
-      } else if (type === 'remove') {
-        params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
+    function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
+      if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
+        bad_pods.push(pod._id)
+        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
       } else {
-        throw new Error('Unkown pool request type.')
+        good_pods.push(pod._id)
       }
 
-      var bad_pods = []
-      var good_pods = []
-
-      utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
+      return callback_each_pod_finished()
+    }
+
+    function callbackAllPodsFinished (err) {
+      if (err) return callback(err)
+
+      updatePodsScore(good_pods, bad_pods)
+      callback(null)
+    }
+  })
+}
+
+function makePoolRequests () {
+  logger.info('Making pool requests to friends.')
+
+  PoolRequests.list(function (err, pool_requests) {
+    if (err) {
+      logger.error('Cannot get the list of pool requests.', { err: err })
+      return // Abort
+    }
+
+    if (pool_requests.length === 0) return
+
+    var requests_to_make = {
+      add: {
+        ids: [],
+        requests: []
+      },
+      remove: {
+        ids: [],
+        requests: []
+      }
+    }
+
+    async.each(pool_requests, function (pool_request, callback_each) {
+      if (pool_request.type === 'add') {
+        requests_to_make.add.requests.push(pool_request.request)
+        requests_to_make.add.ids.push(pool_request._id)
+      } else if (pool_request.type === 'remove') {
+        requests_to_make.remove.requests.push(pool_request.request)
+        requests_to_make.remove.ids.push(pool_request._id)
+      } else {
+        logger.error('Unkown pool request type.', { request_type: pool_request.type })
+        return // abort
+      }
 
-      function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
-        if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
-          bad_pods.push(pod._id)
-          logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
-        } else {
-          good_pods.push(pod._id)
-        }
+      callback_each()
+    }, function () {
+      // Send the add requests
+      if (requests_to_make.add.requests.length !== 0) {
+        makePoolRequest('add', requests_to_make.add.requests, function (err) {
+          if (err) logger.error('Errors when sent add pool requests.', { error: err })
 
-        return callback_each_pod_finished()
+          PoolRequests.removeRequests(requests_to_make.add.ids)
+        })
       }
 
-      function callbackAllPodsFinished (err) {
-        if (err) return callback(err)
+      // Send the remove requests
+      if (requests_to_make.remove.requests.length !== 0) {
+        makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
+          if (err) logger.error('Errors when sent remove pool requests.', { error: err })
 
-        updatePodsScore(good_pods, bad_pods)
-        callback(null)
+          PoolRequests.removeRequests(requests_to_make.remove.ids)
+        })
       }
     })
-  }
+  })
+}
 
-  // ----------- Public -----------
-  poolRequests.activate = function () {
-    logger.info('Pool requests activated.')
-    timer = setInterval(makePoolRequests, constants.INTERVAL)
-  }
+function removeBadPods () {
+  Pods.findBadPods(function (err, pods) {
+    if (err) {
+      logger.error('Cannot find bad pods.', { error: err })
+      return // abort
+    }
 
-  poolRequests.addToPoolRequests = function (id, type, request) {
-    logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
+    if (pods.length === 0) return
 
-    PoolRequestsDB.findOne({ id: id }, function (err, entity) {
-      if (err) logger.error(err)
+    var urls = pluck(pods, 'url')
+    var ids = pluck(pods, '_id')
 
-      if (entity) {
-        if (entity.type === type) {
-          logger.error(new Error('Cannot insert two same requests.'))
-          return
-        }
-
-        // Remove the request of the other type
-        PoolRequestsDB.remove({ id: id }, function (err) {
-          if (err) logger.error(err)
-        })
+    Videos.removeAllRemotesOf(urls, function (err, r) {
+      if (err) {
+        logger.error('Cannot remove videos from a pod that we removing.', { error: err })
       } else {
-        PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
-          if (err) logger.error(err)
-        })
+        var videos_removed = r.result.n
+        logger.info('Removed %d videos.', videos_removed)
       }
+
+      Pods.removeAllByIds(ids, function (err, r) {
+        if (err) {
+          logger.error('Cannot remove bad pods.', { error: err })
+        } else {
+          var pods_removed = r.result.n
+          logger.info('Removed %d pods.', pods_removed)
+        }
+      })
     })
-  }
+  })
+}
 
-  poolRequests.deactivate = function () {
-    logger.info('Pool requests deactivated.')
-    clearInterval(timer)
-  }
+function updatePodsScore (good_pods, bad_pods) {
+  logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
 
-  poolRequests.forceSend = function () {
-    logger.info('Force pool requests sending.')
-    makePoolRequests()
-  }
+  Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
+    if (err) logger.error('Cannot increment scores of good pods.')
+  })
 
-  module.exports = poolRequests
-})()
+  Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
+    if (err) logger.error('Cannot increment scores of bad pods.')
+    removeBadPods()
+  })
+}