]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - lib/poolRequests.js
Move the creation of requests in lib instead of model for poolrequests
[github/Chocobozzz/PeerTube.git] / lib / poolRequests.js
index 9c7f3238bb03af059615433d2110dc3fae1d6814..5b7d5489d071766f6b91e3870045704c61d8f6a3 100644 (file)
   'use strict'
 
   var async = require('async')
+  var pluck = require('lodash-node/compat/collection/pluck')
 
   var constants = require('../initializers/constants')
   var logger = require('../helpers/logger')
-  var database = require('../initializers/database')
-  var pluck = require('lodash-node/compat/collection/pluck')
-  var PoolRequestsDB = database.PoolRequestsDB
-  var PodsDB = database.PodsDB
-  var utils = require('../helpers/utils')
-  var VideosDB = database.VideosDB
-
-  var poolRequests = {}
+  var Pods = require('../models/pods')
+  var PoolRequests = require('../models/poolRequests')
+  var requests = require('../helpers/requests')
+  var Videos = require('../models/videos')
 
-  // ----------- Private -----------
   var timer = null
 
-  function removePoolRequestsFromDB (ids) {
-    PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) {
-      if (err) {
-        logger.error('Cannot remove requests from the pool requests database.', { error: err })
-        return
-      }
-
-      logger.info('Pool requests flushed.')
-    })
+  var poolRequests = {
+    activate: activate,
+    addRequest: addRequest,
+    deactivate: deactivate,
+    forceSend: forceSend
   }
 
-  function makePoolRequests () {
-    logger.info('Making pool requests to friends.')
-
-    PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) {
-      if (err) throw err
+  function activate () {
+    logger.info('Pool requests activated.')
+    timer = setInterval(makePoolRequests, constants.INTERVAL)
+  }
 
-      if (pool_requests.length === 0) return
+  function addRequest (id, type, request) {
+    logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
 
-      var requests = {
-        add: {
-          ids: [],
-          requests: []
-        },
-        remove: {
-          ids: [],
-          requests: []
-        }
+    PoolRequests.findById(id, function (err, entity) {
+      if (err) {
+        logger.error('Cannot find one pool request.', { error: err })
+        return // Abort
       }
 
-      async.each(pool_requests, function (pool_request, callback_each) {
-        if (pool_request.type === 'add') {
-          requests.add.requests.push(pool_request.request)
-          requests.add.ids.push(pool_request._id)
-        } else if (pool_request.type === 'remove') {
-          requests.remove.requests.push(pool_request.request)
-          requests.remove.ids.push(pool_request._id)
-        } else {
-          throw new Error('Unkown pool request type.')
-        }
-
-        callback_each()
-      }, function () {
-        // Send the add requests
-        if (requests.add.requests.length !== 0) {
-          makePoolRequest('add', requests.add.requests, function (err) {
-            if (err) logger.error('Errors when sent add pool requests.', { error: err })
-
-            removePoolRequestsFromDB(requests.add.ids)
-          })
+      if (entity) {
+        if (entity.type === type) {
+          logger.error('Cannot insert two same requests.')
+          return // Abort
         }
 
-        // Send the remove requests
-        if (requests.remove.requests.length !== 0) {
-          makePoolRequest('remove', requests.remove.requests, function (err) {
-            if (err) logger.error('Errors when sent remove pool requests.', { error: err })
-
-            removePoolRequestsFromDB(requests.remove.ids)
-          })
-        }
-      })
+        // Remove the request of the other type
+        PoolRequests.removeById(id, function (err) {
+          if (err) {
+            logger.error('Cannot remove a pool request.', { error: err })
+            return // Abort
+          }
+        })
+      } else {
+        PoolRequests.create(id, type, request, function (err) {
+          logger.error('Cannot create a pool request.', { error: err })
+          return // Abort
+        })
+      }
     })
   }
 
-  function updatePodsScore (good_pods, bad_pods) {
-    logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
-
-    PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec()
-    PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) {
-      if (err) throw err
-      removeBadPods()
-    })
+  function deactivate () {
+    logger.info('Pool requests deactivated.')
+    clearInterval(timer)
   }
 
-  function removeBadPods () {
-    PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) {
-      if (err) throw err
-
-      if (pods.length === 0) return
-
-      var urls = pluck(pods, 'url')
-      var ids = pluck(pods, '_id')
+  function forceSend () {
+    logger.info('Force pool requests sending.')
+    makePoolRequests()
+  }
 
-      VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) {
-        if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err })
-        var videos_removed = r.result.n
-        logger.info('Removed %d videos.', videos_removed)
+  // ---------------------------------------------------------------------------
 
-        PodsDB.remove({ _id: { $in: ids } }, function (err, r) {
-          if (err) logger.error('Cannot remove bad pods.', { error: err })
+  module.exports = poolRequests
 
-          var pods_removed = r.result.n
-          logger.info('Removed %d pods.', pods_removed)
-        })
-      })
-    })
-  }
+  // ---------------------------------------------------------------------------
 
-  function makePoolRequest (type, requests, callback) {
+  function makePoolRequest (type, requests_to_make, callback) {
     if (!callback) callback = function () {}
 
-    PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
-      if (err) throw err
+    Pods.list(function (err, pods) {
+      if (err) return callback(err)
 
       var params = {
         encrypt: true,
         sign: true,
         method: 'POST',
         path: null,
-        data: requests
+        data: requests_to_make
       }
 
       if (type === 'add') {
       } else if (type === 'remove') {
         params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
       } else {
-        throw new Error('Unkown pool request type.')
+        return callback(new Error('Unkown pool request type.'))
       }
 
       var bad_pods = []
       var good_pods = []
 
-      utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
+      requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
 
       function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
         if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
     })
   }
 
-  // ----------- Public -----------
-  poolRequests.activate = function () {
-    logger.info('Pool requests activated.')
-    timer = setInterval(makePoolRequests, constants.INTERVAL)
+  function makePoolRequests () {
+    logger.info('Making pool requests to friends.')
+
+    PoolRequests.list(function (err, pool_requests) {
+      if (err) {
+        logger.error('Cannot get the list of pool requests.', { err: err })
+        return // Abort
+      }
+
+      if (pool_requests.length === 0) return
+
+      var requests_to_make = {
+        add: {
+          ids: [],
+          requests: []
+        },
+        remove: {
+          ids: [],
+          requests: []
+        }
+      }
+
+      async.each(pool_requests, function (pool_request, callback_each) {
+        if (pool_request.type === 'add') {
+          requests_to_make.add.requests.push(pool_request.request)
+          requests_to_make.add.ids.push(pool_request._id)
+        } else if (pool_request.type === 'remove') {
+          requests_to_make.remove.requests.push(pool_request.request)
+          requests_to_make.remove.ids.push(pool_request._id)
+        } else {
+          logger.error('Unkown pool request type.', { request_type: pool_request.type })
+          return // abort
+        }
+
+        callback_each()
+      }, function () {
+        // Send the add requests
+        if (requests_to_make.add.requests.length !== 0) {
+          makePoolRequest('add', requests_to_make.add.requests, function (err) {
+            if (err) logger.error('Errors when sent add pool requests.', { error: err })
+
+            PoolRequests.removeRequests(requests_to_make.add.ids)
+          })
+        }
+
+        // Send the remove requests
+        if (requests_to_make.remove.requests.length !== 0) {
+          makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
+            if (err) logger.error('Errors when sent remove pool requests.', { error: err })
+
+            PoolRequests.removeRequests(requests_to_make.remove.ids)
+          })
+        }
+      })
+    })
   }
 
-  poolRequests.addToPoolRequests = function (id, type, request) {
-    logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
+  function removeBadPods () {
+    Pods.findBadPods(function (err, pods) {
+      if (err) {
+        logger.error('Cannot find bad pods.', { error: err })
+        return // abort
+      }
 
-    PoolRequestsDB.findOne({ id: id }, function (err, entity) {
-      if (err) logger.error(err)
+      if (pods.length === 0) return
 
-      if (entity) {
-        if (entity.type === type) {
-          logger.error(new Error('Cannot insert two same requests.'))
-          return
+      var urls = pluck(pods, 'url')
+      var ids = pluck(pods, '_id')
+
+      Videos.removeAllRemotesOf(urls, function (err, r) {
+        if (err) {
+          logger.error('Cannot remove videos from a pod that we removing.', { error: err })
+        } else {
+          var videos_removed = r.result.n
+          logger.info('Removed %d videos.', videos_removed)
         }
 
-        // Remove the request of the other type
-        PoolRequestsDB.remove({ id: id }, function (err) {
-          if (err) logger.error(err)
+        Pods.removeAllByIds(ids, function (err, r) {
+          if (err) {
+            logger.error('Cannot remove bad pods.', { error: err })
+          } else {
+            var pods_removed = r.result.n
+            logger.info('Removed %d pods.', pods_removed)
+          }
         })
-      } else {
-        PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
-          if (err) logger.error(err)
-        })
-      }
+      })
     })
   }
 
-  poolRequests.deactivate = function () {
-    logger.info('Pool requests deactivated.')
-    clearInterval(timer)
-  }
+  function updatePodsScore (good_pods, bad_pods) {
+    logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
 
-  poolRequests.forceSend = function () {
-    logger.info('Force pool requests sending.')
-    makePoolRequests()
-  }
+    Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
+      if (err) logger.error('Cannot increment scores of good pods.')
+    })
 
-  module.exports = poolRequests
+    Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
+      if (err) logger.error('Cannot increment scores of bad pods.')
+      removeBadPods()
+    })
+  }
 })()