]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/models/request.js
Server: improve requests scheduler
[github/Chocobozzz/PeerTube.git] / server / models / request.js
index 882f747b76f76eaaa4e984f3f86a87c05f733c26..26953e5f50d46d3781939f78674466f4606fb802 100644 (file)
@@ -3,6 +3,7 @@
 const each = require('async/each')
 const eachLimit = require('async/eachLimit')
 const waterfall = require('async/waterfall')
+const values = require('lodash/values')
 
 const constants = require('../initializers/constants')
 const logger = require('../helpers/logger')
@@ -17,11 +18,12 @@ module.exports = function (sequelize, DataTypes) {
   const Request = sequelize.define('Request',
     {
       request: {
-        type: DataTypes.JSON
+        type: DataTypes.JSON,
+        allowNull: false
       },
       endpoint: {
-        // TODO: enum?
-        type: DataTypes.STRING
+        type: DataTypes.ENUM(values(constants.REQUEST_ENDPOINTS)),
+        allowNull: false
       }
     },
     {
@@ -79,9 +81,11 @@ function deactivate () {
   timer = null
 }
 
-function flush () {
+function flush (callback) {
   removeAll.call(this, function (err) {
     if (err) logger.error('Cannot flush the requests.', { error: err })
+
+    return callback(err)
   })
 }
 
@@ -118,7 +122,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
         'Error sending secure request to %s pod.',
         toPod.host,
         {
-          error: err || new Error('Status code not 20x : ' + res.statusCode)
+          error: err ? err.message : 'Status code not 20x : ' + res.statusCode
         }
       )
 
@@ -134,9 +138,9 @@ function makeRequests () {
   const self = this
   const RequestToPod = this.sequelize.models.RequestToPod
 
-  // We limit the size of the requests (REQUESTS_LIMIT)
+  // We limit the size of the requests
   // We don't want to stuck with the same failing requests so we get a random list
-  listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) {
+  listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) {
     if (err) {
       logger.error('Cannot get the list of requests.', { err: err })
       return // Abort
@@ -152,13 +156,15 @@ function makeRequests () {
 
     // We want to group requests by destinations pod and endpoint
     const requestsToMakeGrouped = {}
+    Object.keys(requests).forEach(function (toPodId) {
+      requests[toPodId].forEach(function (data) {
+        const request = data.request
+        const pod = data.pod
+        const hashKey = toPodId + request.endpoint
 
-    requests.forEach(function (request) {
-      request.Pods.forEach(function (toPod) {
-        const hashKey = toPod.id + request.endpoint
         if (!requestsToMakeGrouped[hashKey]) {
           requestsToMakeGrouped[hashKey] = {
-            toPodId: toPod.id,
+            toPod: pod,
             endpoint: request.endpoint,
             ids: [], // request ids, to delete them from the DB in the future
             datas: [] // requests data,
@@ -175,36 +181,29 @@ function makeRequests () {
 
     eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
       const requestToMake = requestsToMakeGrouped[hashKey]
+      const toPod = requestToMake.toPod
 
-      // FIXME: SQL request inside a loop :/
-      self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
-        if (err) {
-          logger.error('Error finding pod by id.', { err: err })
-          return callbackEach()
-        }
-
-        // Maybe the pod is not our friend anymore so simply remove it
-        if (!toPod) {
-          const requestIdsToDelete = requestToMake.ids
+      // Maybe the pod is not our friend anymore so simply remove it
+      if (!toPod) {
+        const requestIdsToDelete = requestToMake.ids
 
-          logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
-          RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
-          return callbackEach()
-        }
+        logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
+        RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id)
+        return callbackEach()
+      }
 
-        makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
-          if (success === true) {
-            logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids })
+      makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
+        if (success === true) {
+          logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
 
-            goodPods.push(requestToMake.toPodId)
+          goodPods.push(requestToMake.toPod.id)
 
-            // Remove the pod id of these request ids
-            RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
-          } else {
-            badPods.push(requestToMake.toPodId)
-            callbackEach()
-          }
-        })
+          // Remove the pod id of these request ids
+          RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
+        } else {
+          badPods.push(requestToMake.toPod.id)
+          callbackEach()
+        }
       })
     }, function () {
       // All the requests were made, we update the pods score
@@ -259,46 +258,77 @@ function updatePodsScore (goodPods, badPods) {
 
   if (goodPods.length !== 0) {
     Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
-      if (err) logger.error('Cannot increment scores of good pods.')
+      if (err) logger.error('Cannot increment scores of good pods.', { error: err })
     })
   }
 
   if (badPods.length !== 0) {
     Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
-      if (err) logger.error('Cannot decrement scores of bad pods.')
+      if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
       removeBadPods.call(self)
     })
   }
 }
 
-function listWithLimitAndRandom (limit, callback) {
+function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
   const self = this
+  const Pod = this.sequelize.models.Pod
 
-  self.count().asCallback(function (err, count) {
+  Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) {
     if (err) return callback(err)
 
-    // Optimization...
-    if (count === 0) return callback(null, [])
-
-    let start = Math.floor(Math.random() * count) - limit
-    if (start < 0) start = 0
+    // We don't have friends that have requests
+    if (podIds.length === 0) return callback(null, [])
 
+    // The the first x requests of these pods
+    // It is very important to sort by id ASC to keep the requests order!
     const query = {
       order: [
         [ 'id', 'ASC' ]
       ],
-      offset: start,
-      limit: limit,
-      include: [ this.sequelize.models.Pod ]
+      include: [
+        {
+          model: self.sequelize.models.Pod,
+          where: {
+            id: {
+              $in: podIds
+            }
+          }
+        }
+      ]
     }
 
-    self.findAll(query).asCallback(callback)
+    self.findAll(query).asCallback(function (err, requests) {
+      if (err) return callback(err)
+
+      const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod)
+      return callback(err, requestsGrouped)
+    })
+  })
+}
+
+function groupAndTruncateRequests (requests, limitRequestsPerPod) {
+  const requestsGrouped = {}
+
+  requests.forEach(function (request) {
+    request.Pods.forEach(function (pod) {
+      if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
+
+      if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
+        requestsGrouped[pod.id].push({
+          request,
+          pod
+        })
+      }
+    })
   })
+
+  return requestsGrouped
 }
 
 function removeAll (callback) {
   // Delete all requests
-  this.destroy({ truncate: true }).asCallback(callback)
+  this.truncate({ cascade: true }).asCallback(callback)
 }
 
 function removeWithEmptyTo (callback) {