]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/request/abstract-request-scheduler.ts
Remove any typing from server
[github/Chocobozzz/PeerTube.git] / server / lib / request / abstract-request-scheduler.ts
index e81ab9c3688ed5097fbacaf66d1129fe4dda0cb8..ce4e2ffd2bc0d90e49c374a6162135c9faf5c094 100644 (file)
@@ -1,15 +1,25 @@
-import * as eachLimit from 'async/eachLimit'
+import { isEmpty } from 'lodash'
+import * as Promise from 'bluebird'
 
 import { database as db } from '../../initializers/database'
 import { logger, makeSecureRequest } from '../../helpers'
-import { PodInstance } from '../../models'
+import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models'
 import {
   API_VERSION,
   REQUESTS_IN_PARALLEL,
   REQUESTS_INTERVAL
 } from '../../initializers'
 
-abstract class AbstractRequestScheduler {
+interface RequestsObjects<U> {
+  [ id: string ]: {
+    toPod: PodInstance
+    endpoint: string
+    ids: number[] // ids
+    datas: U[]
+  }
+}
+
+abstract class AbstractRequestScheduler <T> {
   requestInterval: number
   limitPods: number
   limitPerPod: number
@@ -24,9 +34,9 @@ abstract class AbstractRequestScheduler {
     this.requestInterval = REQUESTS_INTERVAL
   }
 
-  abstract getRequestModel ()
-  abstract getRequestToPodModel ()
-  abstract buildRequestObjects (requests: any)
+  abstract getRequestModel (): AbstractRequestClass<T>
+  abstract getRequestToPodModel (): AbstractRequestToPodClass
+  abstract buildRequestsObjects (requestsGrouped: T): RequestsObjects<any>
 
   activate () {
     logger.info('Requests scheduler activated.')
@@ -55,23 +65,20 @@ abstract class AbstractRequestScheduler {
     return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
   }
 
-  remainingRequestsCount (callback: (err: Error, total: number) => void) {
-    return this.getRequestModel().countTotalRequests(callback)
+  remainingRequestsCount () {
+    return this.getRequestModel().countTotalRequests()
   }
 
-  flush (callback: (err: Error) => void) {
-    this.getRequestModel().removeAll(callback)
+  flush () {
+    return this.getRequestModel().removeAll()
   }
 
   // ---------------------------------------------------------------------------
 
   // Make a requests to friends of a certain type
-  protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) {
-    if (!callback) callback = function () { /* empty */ }
-
+  protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
     const params = {
       toPod: toPod,
-      sign: true, // Prove our identity
       method: 'POST' as 'POST',
       path: '/api/' + API_VERSION + '/remote/' + requestEndpoint,
       data: requestsToMake // Requests we need to make
@@ -79,65 +86,64 @@ abstract class AbstractRequestScheduler {
 
     // Make multiple retry requests to all of pods
     // The function fire some useful callbacks
-    makeSecureRequest(params, (err, res) => {
-      if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
-        err = err ? err.message : 'Status code not 20x : ' + res.statusCode
-        logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
-
-        return callback(err)
-      }
+    return makeSecureRequest(params)
+      .then(({ response, body }) => {
+        if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
+          throw new Error('Status code not 20x : ' + response.statusCode)
+        }
+      })
+      .catch(err => {
+        logger.error('Error sending secure request to %s pod.', toPod.host, err)
 
-      return callback(null)
-    })
+        throw err
+      })
   }
 
     // Make all the requests of the scheduler
   protected makeRequests () {
-    this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
-      if (err) {
-        logger.error('Cannot get the list of "%s".', this.description, { err: err })
-        return // Abort
-      }
-
-      // If there are no requests, abort
-      if (requests.length === 0) {
-        logger.info('No "%s" to make.', this.description)
-        return
-      }
-
-      // We want to group requests by destinations pod and endpoint
-      const requestsToMakeGrouped = this.buildRequestObjects(requests)
-
-      logger.info('Making "%s" to friends.', this.description)
-
-      const goodPods = []
-      const badPods = []
-
-      eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
-        const requestToMake = requestsToMakeGrouped[hashKey]
-        const toPod = requestToMake.toPod
-
-        this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => {
-          if (err) {
-            badPods.push(requestToMake.toPod.id)
-            return callbackEach()
-          }
-
-          logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
-          goodPods.push(requestToMake.toPod.id)
-
-          // Remove the pod id of these request ids
-          this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
+    return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
+      .then((requestsGrouped: T) => {
+        // We want to group requests by destinations pod and endpoint
+        const requestsToMake = this.buildRequestsObjects(requestsGrouped)
+
+        // If there are no requests, abort
+        if (isEmpty(requestsToMake) === true) {
+          logger.info('No "%s" to make.', this.description)
+          return { goodPods: [], badPods: [] }
+        }
+
+        logger.info('Making "%s" to friends.', this.description)
+
+        const goodPods: number[] = []
+        const badPods: number[] = []
+
+        return Promise.map(Object.keys(requestsToMake), hashKey => {
+          const requestToMake = requestsToMake[hashKey]
+          const toPod: PodInstance = requestToMake.toPod
+
+          return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
+            .then(() => {
+              logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
+              goodPods.push(requestToMake.toPod.id)
+
+              this.afterRequestHook()
+
+              // Remove the pod id of these request ids
+              return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
+            })
+            .catch(err => {
+              badPods.push(requestToMake.toPod.id)
+              logger.info('Cannot make request to %s.', toPod.host, err)
+            })
+        }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
+      })
+      .then(({ goodPods, badPods }) => {
+        this.afterRequestsHook()
 
-          this.afterRequestHook()
-        })
-      }, () => {
         // All the requests were made, we update the pods score
-        db.Pod.updatePodsScore(goodPods, badPods)
-
-        this.afterRequestsHook()
+        return db.Pod.updatePodsScore(goodPods, badPods)
       })
-    })
+      .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
   }
 
   protected afterRequestHook () {
@@ -152,5 +158,6 @@ abstract class AbstractRequestScheduler {
 // ---------------------------------------------------------------------------
 
 export {
-  AbstractRequestScheduler
+  AbstractRequestScheduler,
+  RequestsObjects
 }