]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Server: improve requests scheduler
authorChocobozzz <florian.bigard@gmail.com>
Tue, 10 Jan 2017 21:24:42 +0000 (22:24 +0100)
committerChocobozzz <florian.bigard@gmail.com>
Tue, 10 Jan 2017 21:24:42 +0000 (22:24 +0100)
server/initializers/constants.js
server/lib/friends.js
server/models/pod.js
server/models/request.js

index a6adb75bf6714334e2b6fda470d295e2da3e93ac..97e3c5296b761a30ddc36f0a7880809ae65611bd 100644 (file)
@@ -108,8 +108,10 @@ let REQUESTS_INTERVAL = 600000
 // Number of requests in parallel we can make
 const REQUESTS_IN_PARALLEL = 10
 
-// How many requests we put in request
-const REQUESTS_LIMIT = 10
+// To how many pods we send requests
+const REQUESTS_LIMIT_PODS = 10
+// How many requests we send to a pod per interval
+const REQUESTS_LIMIT_PER_POD = 5
 
 // Number of requests to retry for replay requests module
 const RETRY_REQUESTS = 5
@@ -184,7 +186,8 @@ module.exports = {
   REQUEST_ENDPOINTS,
   REQUESTS_IN_PARALLEL,
   REQUESTS_INTERVAL,
-  REQUESTS_LIMIT,
+  REQUESTS_LIMIT_PODS,
+  REQUESTS_LIMIT_PER_POD,
   RETRY_REQUESTS,
   SEARCHABLE_COLUMNS,
   SIGNATURE_ALGORITHM,
index 3d3d0fdeed1da06bb6f0cc37147a5b5f3a32f0a8..f0575ff2fc0b34d2c433e157e5892099f961f48c 100644 (file)
@@ -54,7 +54,13 @@ function removeVideoToFriends (videoParams) {
 }
 
 function reportAbuseVideoToFriend (reportData, video) {
-  createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, [ video.Author.podId ])
+  const options = {
+    type: 'report-abuse',
+    endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+    data: reportData,
+    toIds: [ video.Author.podId ]
+  }
+  createRequest(options)
 }
 
 function hasFriends (callback) {
@@ -161,7 +167,13 @@ function sendOwnedVideosToPod (podId) {
           return
         }
 
-        createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
+        const options = {
+          type: 'add',
+          endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+          data: remoteVideo,
+          toIds: [ podId ]
+        }
+        createRequest(options)
       })
     })
   })
index 8e7dd1fd8d7f71a805ff087fd98072ef4f3df9a7..b3c6db8e881aa45e09afe688310ce3d8de6d37c2 100644 (file)
@@ -50,6 +50,7 @@ module.exports = function (sequelize, DataTypes) {
         incrementScores,
         list,
         listAllIds,
+        listRandomPodIdsWithRequest,
         listBadPods,
         load,
         loadByHost,
@@ -134,6 +135,42 @@ function listAllIds (transaction, callback) {
   })
 }
 
+function listRandomPodIdsWithRequest (limit, callback) {
+  const self = this
+
+  self.count().asCallback(function (err, count) {
+    if (err) return callback(err)
+
+    // Optimization...
+    if (count === 0) return callback(null, [])
+
+    let start = Math.floor(Math.random() * count) - limit
+    if (start < 0) start = 0
+
+    const query = {
+      attributes: [ 'id' ],
+      order: [
+        [ 'id', 'ASC' ]
+      ],
+      offset: start,
+      limit: limit,
+      where: {
+        id: {
+          $in: [
+            this.sequelize.literal('SELECT "podId" FROM "RequestToPods"')
+          ]
+        }
+      }
+    }
+
+    return this.findAll(query).asCallback(function (err, pods) {
+      if (err) return callback(err)
+
+      return callback(null, map(pods, 'id'))
+    })
+  })
+}
+
 function listBadPods (callback) {
   const query = {
     where: {
index 1d60380449014dbdd5a1f65358ae614569077adc..26953e5f50d46d3781939f78674466f4606fb802 100644 (file)
@@ -138,9 +138,9 @@ function makeRequests () {
   const self = this
   const RequestToPod = this.sequelize.models.RequestToPod
 
-  // We limit the size of the requests (REQUESTS_LIMIT)
+  // We limit the size of the requests
   // We don't want to stuck with the same failing requests so we get a random list
-  listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) {
+  listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, function (err, requests) {
     if (err) {
       logger.error('Cannot get the list of requests.', { err: err })
       return // Abort
@@ -156,13 +156,15 @@ function makeRequests () {
 
     // We want to group requests by destinations pod and endpoint
     const requestsToMakeGrouped = {}
+    Object.keys(requests).forEach(function (toPodId) {
+      requests[toPodId].forEach(function (data) {
+        const request = data.request
+        const pod = data.pod
+        const hashKey = toPodId + request.endpoint
 
-    requests.forEach(function (request) {
-      request.Pods.forEach(function (toPod) {
-        const hashKey = toPod.id + request.endpoint
         if (!requestsToMakeGrouped[hashKey]) {
           requestsToMakeGrouped[hashKey] = {
-            toPodId: toPod.id,
+            toPod: pod,
             endpoint: request.endpoint,
             ids: [], // request ids, to delete them from the DB in the future
             datas: [] // requests data,
@@ -179,36 +181,29 @@ function makeRequests () {
 
     eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
       const requestToMake = requestsToMakeGrouped[hashKey]
+      const toPod = requestToMake.toPod
 
-      // FIXME: SQL request inside a loop :/
-      self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
-        if (err) {
-          logger.error('Error finding pod by id.', { err: err })
-          return callbackEach()
-        }
-
-        // Maybe the pod is not our friend anymore so simply remove it
-        if (!toPod) {
-          const requestIdsToDelete = requestToMake.ids
+      // Maybe the pod is not our friend anymore so simply remove it
+      if (!toPod) {
+        const requestIdsToDelete = requestToMake.ids
 
-          logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
-          RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
-          return callbackEach()
-        }
+        logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
+        RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPod.id)
+        return callbackEach()
+      }
 
-        makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
-          if (success === true) {
-            logger.debug('Removing requests for pod %s.', requestToMake.toPodId, { requestsIds: requestToMake.ids })
+      makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
+        if (success === true) {
+          logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
 
-            goodPods.push(requestToMake.toPodId)
+          goodPods.push(requestToMake.toPod.id)
 
-            // Remove the pod id of these request ids
-            RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
-          } else {
-            badPods.push(requestToMake.toPodId)
-            callbackEach()
-          }
-        })
+          // Remove the pod id of these request ids
+          RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
+        } else {
+          badPods.push(requestToMake.toPod.id)
+          callbackEach()
+        }
       })
     }, function () {
       // All the requests were made, we update the pods score
@@ -275,29 +270,60 @@ function updatePodsScore (goodPods, badPods) {
   }
 }
 
-function listWithLimitAndRandom (limit, callback) {
+function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
   const self = this
+  const Pod = this.sequelize.models.Pod
 
-  self.count().asCallback(function (err, count) {
+  Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) {
     if (err) return callback(err)
 
-    // Optimization...
-    if (count === 0) return callback(null, [])
-
-    let start = Math.floor(Math.random() * count) - limit
-    if (start < 0) start = 0
+    // We don't have friends that have requests
+    if (podIds.length === 0) return callback(null, [])
 
+    // The the first x requests of these pods
+    // It is very important to sort by id ASC to keep the requests order!
     const query = {
       order: [
         [ 'id', 'ASC' ]
       ],
-      // offset: start,
-      // limit: limit,
-      include: [ this.sequelize.models.Pod ]
+      include: [
+        {
+          model: self.sequelize.models.Pod,
+          where: {
+            id: {
+              $in: podIds
+            }
+          }
+        }
+      ]
     }
 
-    self.findAll(query).asCallback(callback)
+    self.findAll(query).asCallback(function (err, requests) {
+      if (err) return callback(err)
+
+      const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod)
+      return callback(err, requestsGrouped)
+    })
+  })
+}
+
+function groupAndTruncateRequests (requests, limitRequestsPerPod) {
+  const requestsGrouped = {}
+
+  requests.forEach(function (request) {
+    request.Pods.forEach(function (pod) {
+      if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
+
+      if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
+        requestsGrouped[pod.id].push({
+          request,
+          pod
+        })
+      }
+    })
   })
+
+  return requestsGrouped
 }
 
 function removeAll (callback) {