]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/requestsScheduler.js
Try to make a better communication (between pods) module
[github/Chocobozzz/PeerTube.git] / server / lib / requestsScheduler.js
index 2c5474e51204ace54c6d376dfd303d7a2c9501b1..ac75e5b93646983ba2e74c7d4ea54aa47445fa15 100644 (file)
@@ -8,6 +8,7 @@ const logger = require('../helpers/logger')
 const Pods = require('../models/pods')
 const Requests = require('../models/requests')
 const requests = require('../helpers/requests')
+const videos = require('../lib/videos')
 const Videos = require('../models/videos')
 
 let timer = null
@@ -15,7 +16,9 @@ let timer = null
 const requestsScheduler = {
   activate: activate,
   addRequest: addRequest,
+  addRequestTo: addRequestTo,
   deactivate: deactivate,
+  flush: flush,
   forceSend: forceSend
 }
 
@@ -24,34 +27,38 @@ function activate () {
   timer = setInterval(makeRequests, constants.INTERVAL)
 }
 
-function addRequest (id, type, request) {
-  logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request })
+// Add request to the scheduler
+function addRequest (type, data) {
+  logger.debug('Add request of type %s to the requests scheduler.', type, { data: data })
 
-  Requests.findById(id, function (err, entity) {
+  const request = {
+    type: type,
+    data: data
+  }
+
+  Pods.listAllIds(function (err, podIds) {
     if (err) {
-      logger.error('Cannot find one request.', { error: err })
-      return // Abort
+      logger.debug('Cannot list pod ids.')
+      return
     }
 
-    if (entity) {
-      if (entity.type === type) {
-        logger.error('Cannot insert two same requests.')
-        return // Abort
-      }
+    // No friends
+    if (!podIds) return
 
-      // Remove the request of the other type
-      Requests.removeRequestById(id, function (err) {
-        if (err) {
-          logger.error('Cannot remove a request.', { error: err })
-          return // Abort
-        }
-      })
-    } else {
-      Requests.create(id, type, request, function (err) {
-        if (err) logger.error('Cannot create a request.', { error: err })
-        return // Abort
-      })
-    }
+    Requests.create(request, podIds, function (err) {
+      if (err) logger.error('Cannot create a request.', { error: err })
+    })
+  })
+}
+
+function addRequestTo (podIds, type, data) {
+  const request = {
+    type: type,
+    data: data
+  }
+
+  Requests.create(request, podIds, function (err) {
+    if (err) logger.error('Cannot create a request.', { error: err })
   })
 }
 
@@ -60,6 +67,14 @@ function deactivate () {
   clearInterval(timer)
 }
 
+function flush () {
+  Requests.removeAll(function (err) {
+    if (err) {
+      logger.error('Cannot flush the requests.', { error: err })
+    }
+  })
+}
+
 function forceSend () {
   logger.info('Force requests scheduler sending.')
   makeRequests()
@@ -71,151 +86,179 @@ module.exports = requestsScheduler
 
 // ---------------------------------------------------------------------------
 
-function makeRequest (type, requests_to_make, callback) {
+// Make a requests to friends of a certain type
+function makeRequest (toPod, requestsToMake, callback) {
   if (!callback) callback = function () {}
 
-  Pods.list(function (err, pods) {
-    if (err) return callback(err)
-
-    const params = {
-      encrypt: true,
-      sign: true,
-      method: 'POST',
-      path: null,
-      data: requests_to_make
-    }
-
-    if (type === 'add') {
-      params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
-    } else if (type === 'remove') {
-      params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
-    } else {
-      return callback(new Error('Unkown pool request type.'))
-    }
-
-    const bad_pods = []
-    const good_pods = []
-
-    requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
-
-    function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
-      if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
-        bad_pods.push(pod._id)
-        logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
-      } else {
-        good_pods.push(pod._id)
-      }
-
-      return callback_each_pod_finished()
+  const params = {
+    toPod: toPod,
+    encrypt: true, // Security
+    sign: true, // To prove our identity
+    method: 'POST',
+    path: '/api/' + constants.API_VERSION + '/remote/videos',
+    data: requestsToMake // Requests we need to make
+  }
+
+  // Make multiple retry requests to all of pods
+  // The function fire some useful callbacks
+  requests.makeSecureRequest(params, function (err, res) {
+    if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
+      logger.error('Error sending secure request to %s pod.', toPod.url, { error: err || new Error('Status code not 20x') })
+
+      return callback(false)
     }
 
-    function callbackAllPodsFinished (err) {
-      if (err) return callback(err)
-
-      updatePodsScore(good_pods, bad_pods)
-      callback(null)
-    }
+    return callback(true)
   })
 }
 
+// Make all the requests of the scheduler
 function makeRequests () {
-  logger.info('Making requests to friends.')
-
   Requests.list(function (err, requests) {
     if (err) {
       logger.error('Cannot get the list of requests.', { err: err })
       return // Abort
     }
 
-    if (requests.length === 0) return
-
-    const requests_to_make = {
-      add: {
-        ids: [],
-        requests: []
-      },
-      remove: {
-        ids: [],
-        requests: []
-      }
+    // If there are no requests, abort
+    if (requests.length === 0) {
+      logger.info('No requests to make.')
+      return
     }
 
-    async.each(requests, function (pool_request, callback_each) {
-      if (pool_request.type === 'add') {
-        requests_to_make.add.requests.push(pool_request.request)
-        requests_to_make.add.ids.push(pool_request._id)
-      } else if (pool_request.type === 'remove') {
-        requests_to_make.remove.requests.push(pool_request.request)
-        requests_to_make.remove.ids.push(pool_request._id)
-      } else {
-        logger.error('Unkown request type.', { request_type: pool_request.type })
-        return // abort
-      }
-
-      callback_each()
-    }, function () {
-      // Send the add requests
-      if (requests_to_make.add.requests.length !== 0) {
-        makeRequest('add', requests_to_make.add.requests, function (err) {
-          if (err) logger.error('Errors when sent add requests.', { error: err })
+    logger.info('Making requests to friends.')
 
-          Requests.removeRequests(requests_to_make.add.ids)
-        })
-      }
+    // Requests by pods id
+    const requestsToMake = {}
 
-      // Send the remove requests
-      if (requests_to_make.remove.requests.length !== 0) {
-        makeRequest('remove', requests_to_make.remove.requests, function (err) {
-          if (err) logger.error('Errors when sent remove pool requests.', { error: err })
+    requests.forEach(function (poolRequest) {
+      poolRequest.to.forEach(function (toPodId) {
+        if (!requestsToMake[toPodId]) {
+          requestsToMake[toPodId] = {
+            ids: [],
+            datas: []
+          }
+        }
+
+        requestsToMake[toPodId].ids.push(poolRequest._id)
+        requestsToMake[toPodId].datas.push(poolRequest.request)
+      })
+    })
+
+    const goodPods = []
+    const badPods = []
+
+    async.eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) {
+      const requestToMake = requestsToMake[toPodId]
+
+      // FIXME: mongodb request inside a loop :/
+      Pods.findById(toPodId, function (err, toPod) {
+        if (err) return logger.error('Error finding pod by id.', { err: err })
+
+        // Maybe the pod is not our friend anymore so simply remove them
+        if (!toPod) {
+          Requests.removePodOf(requestToMake.ids, toPodId)
+          return callbackEach()
+        }
+
+        makeRequest(toPod, requestToMake.datas, function (success) {
+          if (err) {
+            logger.error('Errors when sent request to %s.', toPod.url, { error: err })
+            // Do not stop the process just for one error
+            return callbackEach()
+          }
+
+          if (success === true) {
+            logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids })
+
+            // Remove the pod id of these request ids
+            Requests.removePodOf(requestToMake.ids, toPodId)
+            goodPods.push(toPodId)
+          } else {
+            badPods.push(toPodId)
+          }
 
-          Requests.removeRequests(requests_to_make.remove.ids)
+          callbackEach()
         })
-      }
+      })
+    }, function () {
+      // All the requests were made, we update the pods score
+      updatePodsScore(goodPods, badPods)
+      // Flush requests with no pod
+      Requests.removeWithEmptyTo()
     })
   })
 }
 
+// Remove pods with a score of 0 (too many requests where they were unreachable)
 function removeBadPods () {
-  Pods.findBadPods(function (err, pods) {
-    if (err) {
-      logger.error('Cannot find bad pods.', { error: err })
-      return // abort
-    }
+  async.waterfall([
+    function findBadPods (callback) {
+      Pods.findBadPods(function (err, pods) {
+        if (err) {
+          logger.error('Cannot find bad pods.', { error: err })
+          return callback(err)
+        }
 
-    if (pods.length === 0) return
+        return callback(null, pods)
+      })
+    },
 
-    const urls = map(pods, 'url')
-    const ids = map(pods, '_id')
+    function listVideosOfTheseBadPods (pods, callback) {
+      if (pods.length === 0) return callback(null)
 
-    Videos.removeAllRemotesOf(urls, function (err, r) {
-      if (err) {
-        logger.error('Cannot remove videos from a pod that we removing.', { error: err })
-      } else {
-        const videos_removed = r.result.n
-        logger.info('Removed %d videos.', videos_removed)
-      }
+      const urls = map(pods, 'url')
+      const ids = map(pods, '_id')
 
-      Pods.removeAllByIds(ids, function (err, r) {
+      Videos.listFromUrls(urls, function (err, videosList) {
         if (err) {
-          logger.error('Cannot remove bad pods.', { error: err })
-        } else {
-          const pods_removed = r.result.n
-          logger.info('Removed %d pods.', pods_removed)
+          logger.error('Cannot list videos urls.', { error: err, urls: urls })
+          return callback(null, ids, [])
         }
+
+        return callback(null, ids, videosList)
       })
-    })
+    },
+
+    function removeVideosOfTheseBadPods (podIds, videosList, callback) {
+      // We don't have to remove pods, skip
+      if (typeof podIds === 'function') return podIds(null)
+
+      // Remove the remote videos
+      videos.removeRemoteVideos(videosList, function (err) {
+        if (err) logger.error('Cannot remove remote videos.', { error: err })
+
+        return callback(null, podIds)
+      })
+    },
+
+    function removeBadPodsFromDB (podIds, callback) {
+      // We don't have to remove pods, skip
+      if (typeof podIds === 'function') return podIds(null)
+
+      Pods.removeAllByIds(podIds, callback)
+    }
+  ], function (err, removeResult) {
+    if (err) {
+      logger.error('Cannot remove bad pods.', { error: err })
+    } else if (removeResult) {
+      const podsRemoved = removeResult.result.n
+      logger.info('Removed %d pods.', podsRemoved)
+    } else {
+      logger.info('No need to remove bad pods.')
+    }
   })
 }
 
-function updatePodsScore (good_pods, bad_pods) {
-  logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
+function updatePodsScore (goodPods, badPods) {
+  logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
 
-  Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
+  Pods.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
     if (err) logger.error('Cannot increment scores of good pods.')
   })
 
-  Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
-    if (err) logger.error('Cannot increment scores of bad pods.')
+  Pods.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
+    if (err) logger.error('Cannot decrement scores of bad pods.')
     removeBadPods()
   })
 }