aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/request/abstract-request-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-07-05 13:26:25 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-07-05 14:14:16 +0200
commit6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch)
tree3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib/request/abstract-request-scheduler.ts
parent5fe7e898316e18369c3e1aba307b55077adc7bfb (diff)
downloadPeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib/request/abstract-request-scheduler.ts')
-rw-r--r--server/lib/request/abstract-request-scheduler.ts124
1 files changed, 61 insertions, 63 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts
index e81ab9c36..dd77fddb7 100644
--- a/server/lib/request/abstract-request-scheduler.ts
+++ b/server/lib/request/abstract-request-scheduler.ts
@@ -1,15 +1,16 @@
1import * as eachLimit from 'async/eachLimit' 1import { isEmpty } from 'lodash'
2import * as Promise from 'bluebird'
2 3
3import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
4import { logger, makeSecureRequest } from '../../helpers' 5import { logger, makeSecureRequest } from '../../helpers'
5import { PodInstance } from '../../models' 6import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models'
6import { 7import {
7 API_VERSION, 8 API_VERSION,
8 REQUESTS_IN_PARALLEL, 9 REQUESTS_IN_PARALLEL,
9 REQUESTS_INTERVAL 10 REQUESTS_INTERVAL
10} from '../../initializers' 11} from '../../initializers'
11 12
12abstract class AbstractRequestScheduler { 13abstract class AbstractRequestScheduler <T> {
13 requestInterval: number 14 requestInterval: number
14 limitPods: number 15 limitPods: number
15 limitPerPod: number 16 limitPerPod: number
@@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler {
24 this.requestInterval = REQUESTS_INTERVAL 25 this.requestInterval = REQUESTS_INTERVAL
25 } 26 }
26 27
27 abstract getRequestModel () 28 abstract getRequestModel (): AbstractRequestClass<T>
28 abstract getRequestToPodModel () 29 abstract getRequestToPodModel (): AbstractRequestToPodClass
29 abstract buildRequestObjects (requests: any) 30 abstract buildRequestObjects (requestsGrouped: T): {}
30 31
31 activate () { 32 activate () {
32 logger.info('Requests scheduler activated.') 33 logger.info('Requests scheduler activated.')
@@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler {
55 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) 56 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
56 } 57 }
57 58
58 remainingRequestsCount (callback: (err: Error, total: number) => void) { 59 remainingRequestsCount () {
59 return this.getRequestModel().countTotalRequests(callback) 60 return this.getRequestModel().countTotalRequests()
60 } 61 }
61 62
62 flush (callback: (err: Error) => void) { 63 flush () {
63 this.getRequestModel().removeAll(callback) 64 return this.getRequestModel().removeAll()
64 } 65 }
65 66
66 // --------------------------------------------------------------------------- 67 // ---------------------------------------------------------------------------
67 68
68 // Make a requests to friends of a certain type 69 // Make a requests to friends of a certain type
69 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { 70 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) {
70 if (!callback) callback = function () { /* empty */ }
71
72 const params = { 71 const params = {
73 toPod: toPod, 72 toPod: toPod,
74 sign: true, // Prove our identity 73 sign: true, // Prove our identity
@@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler {
79 78
80 // Make multiple retry requests to all of pods 79 // Make multiple retry requests to all of pods
81 // The function fire some useful callbacks 80 // The function fire some useful callbacks
82 makeSecureRequest(params, (err, res) => { 81 return makeSecureRequest(params)
83 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { 82 .then(({ response, body }) => {
84 err = err ? err.message : 'Status code not 20x : ' + res.statusCode 83 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
84 throw new Error('Status code not 20x : ' + response.statusCode)
85 }
86 })
87 .catch(err => {
85 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) 88 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
86 89
87 return callback(err) 90 throw err
88 } 91 })
89
90 return callback(null)
91 })
92 } 92 }
93 93
94 // Make all the requests of the scheduler 94 // Make all the requests of the scheduler
95 protected makeRequests () { 95 protected makeRequests () {
96 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { 96 return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
97 if (err) { 97 .then((requestsGrouped: T) => {
98 logger.error('Cannot get the list of "%s".', this.description, { err: err }) 98 // We want to group requests by destinations pod and endpoint
99 return // Abort 99 const requestsToMake = this.buildRequestObjects(requestsGrouped)
100 } 100
101 101 // If there are no requests, abort
102 // If there are no requests, abort 102 if (isEmpty(requestsToMake) === true) {
103 if (requests.length === 0) { 103 logger.info('No "%s" to make.', this.description)
104 logger.info('No "%s" to make.', this.description) 104 return { goodPods: [], badPods: [] }
105 return 105 }
106 } 106
107 107 logger.info('Making "%s" to friends.', this.description)
108 // We want to group requests by destinations pod and endpoint 108
109 const requestsToMakeGrouped = this.buildRequestObjects(requests) 109 const goodPods = []
110 110 const badPods = []
111 logger.info('Making "%s" to friends.', this.description) 111
112 112 return Promise.map(Object.keys(requestsToMake), hashKey => {
113 const goodPods = [] 113 const requestToMake = requestsToMake[hashKey]
114 const badPods = [] 114 const toPod: PodInstance = requestToMake.toPod
115 115
116 eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { 116 return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
117 const requestToMake = requestsToMakeGrouped[hashKey] 117 .then(() => {
118 const toPod = requestToMake.toPod 118 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
119 119 goodPods.push(requestToMake.toPod.id)
120 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { 120
121 if (err) { 121 this.afterRequestHook()
122 badPods.push(requestToMake.toPod.id) 122
123 return callbackEach() 123 // Remove the pod id of these request ids
124 } 124 return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
125 125 })
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) 126 .catch(err => {
127 goodPods.push(requestToMake.toPod.id) 127 badPods.push(requestToMake.toPod.id)
128 128 logger.info('Cannot make request to %s.', toPod.host, { error: err })
129 // Remove the pod id of these request ids 129 })
130 this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) 130 }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
131 })
132 .then(({ goodPods, badPods }) => {
133 this.afterRequestsHook()
131 134
132 this.afterRequestHook()
133 })
134 }, () => {
135 // All the requests were made, we update the pods score 135 // All the requests were made, we update the pods score
136 db.Pod.updatePodsScore(goodPods, badPods) 136 return db.Pod.updatePodsScore(goodPods, badPods)
137
138 this.afterRequestsHook()
139 }) 137 })
140 }) 138 .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
141 } 139 }
142 140
143 protected afterRequestHook () { 141 protected afterRequestHook () {