diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-07-05 13:26:25 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-07-05 14:14:16 +0200 |
commit | 6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch) | |
tree | 3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib/request/abstract-request-scheduler.ts | |
parent | 5fe7e898316e18369c3e1aba307b55077adc7bfb (diff) | |
download | PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip |
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib/request/abstract-request-scheduler.ts')
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 124 |
1 files changed, 61 insertions, 63 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index e81ab9c36..dd77fddb7 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -1,15 +1,16 @@ | |||
1 | import * as eachLimit from 'async/eachLimit' | 1 | import { isEmpty } from 'lodash' |
2 | import * as Promise from 'bluebird' | ||
2 | 3 | ||
3 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
4 | import { logger, makeSecureRequest } from '../../helpers' | 5 | import { logger, makeSecureRequest } from '../../helpers' |
5 | import { PodInstance } from '../../models' | 6 | import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' |
6 | import { | 7 | import { |
7 | API_VERSION, | 8 | API_VERSION, |
8 | REQUESTS_IN_PARALLEL, | 9 | REQUESTS_IN_PARALLEL, |
9 | REQUESTS_INTERVAL | 10 | REQUESTS_INTERVAL |
10 | } from '../../initializers' | 11 | } from '../../initializers' |
11 | 12 | ||
12 | abstract class AbstractRequestScheduler { | 13 | abstract class AbstractRequestScheduler <T> { |
13 | requestInterval: number | 14 | requestInterval: number |
14 | limitPods: number | 15 | limitPods: number |
15 | limitPerPod: number | 16 | limitPerPod: number |
@@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler { | |||
24 | this.requestInterval = REQUESTS_INTERVAL | 25 | this.requestInterval = REQUESTS_INTERVAL |
25 | } | 26 | } |
26 | 27 | ||
27 | abstract getRequestModel () | 28 | abstract getRequestModel (): AbstractRequestClass<T> |
28 | abstract getRequestToPodModel () | 29 | abstract getRequestToPodModel (): AbstractRequestToPodClass |
29 | abstract buildRequestObjects (requests: any) | 30 | abstract buildRequestObjects (requestsGrouped: T): {} |
30 | 31 | ||
31 | activate () { | 32 | activate () { |
32 | logger.info('Requests scheduler activated.') | 33 | logger.info('Requests scheduler activated.') |
@@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler { | |||
55 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | 56 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) |
56 | } | 57 | } |
57 | 58 | ||
58 | remainingRequestsCount (callback: (err: Error, total: number) => void) { | 59 | remainingRequestsCount () { |
59 | return this.getRequestModel().countTotalRequests(callback) | 60 | return this.getRequestModel().countTotalRequests() |
60 | } | 61 | } |
61 | 62 | ||
62 | flush (callback: (err: Error) => void) { | 63 | flush () { |
63 | this.getRequestModel().removeAll(callback) | 64 | return this.getRequestModel().removeAll() |
64 | } | 65 | } |
65 | 66 | ||
66 | // --------------------------------------------------------------------------- | 67 | // --------------------------------------------------------------------------- |
67 | 68 | ||
68 | // Make a requests to friends of a certain type | 69 | // Make a requests to friends of a certain type |
69 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { | 70 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) { |
70 | if (!callback) callback = function () { /* empty */ } | ||
71 | |||
72 | const params = { | 71 | const params = { |
73 | toPod: toPod, | 72 | toPod: toPod, |
74 | sign: true, // Prove our identity | 73 | sign: true, // Prove our identity |
@@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler { | |||
79 | 78 | ||
80 | // Make multiple retry requests to all of pods | 79 | // Make multiple retry requests to all of pods |
81 | // The function fire some useful callbacks | 80 | // The function fire some useful callbacks |
82 | makeSecureRequest(params, (err, res) => { | 81 | return makeSecureRequest(params) |
83 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | 82 | .then(({ response, body }) => { |
84 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | 83 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { |
84 | throw new Error('Status code not 20x : ' + response.statusCode) | ||
85 | } | ||
86 | }) | ||
87 | .catch(err => { | ||
85 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | 88 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) |
86 | 89 | ||
87 | return callback(err) | 90 | throw err |
88 | } | 91 | }) |
89 | |||
90 | return callback(null) | ||
91 | }) | ||
92 | } | 92 | } |
93 | 93 | ||
94 | // Make all the requests of the scheduler | 94 | // Make all the requests of the scheduler |
95 | protected makeRequests () { | 95 | protected makeRequests () { |
96 | this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { | 96 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) |
97 | if (err) { | 97 | .then((requestsGrouped: T) => { |
98 | logger.error('Cannot get the list of "%s".', this.description, { err: err }) | 98 | // We want to group requests by destinations pod and endpoint |
99 | return // Abort | 99 | const requestsToMake = this.buildRequestObjects(requestsGrouped) |
100 | } | 100 | |
101 | 101 | // If there are no requests, abort | |
102 | // If there are no requests, abort | 102 | if (isEmpty(requestsToMake) === true) { |
103 | if (requests.length === 0) { | 103 | logger.info('No "%s" to make.', this.description) |
104 | logger.info('No "%s" to make.', this.description) | 104 | return { goodPods: [], badPods: [] } |
105 | return | 105 | } |
106 | } | 106 | |
107 | 107 | logger.info('Making "%s" to friends.', this.description) | |
108 | // We want to group requests by destinations pod and endpoint | 108 | |
109 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | 109 | const goodPods = [] |
110 | 110 | const badPods = [] | |
111 | logger.info('Making "%s" to friends.', this.description) | 111 | |
112 | 112 | return Promise.map(Object.keys(requestsToMake), hashKey => { | |
113 | const goodPods = [] | 113 | const requestToMake = requestsToMake[hashKey] |
114 | const badPods = [] | 114 | const toPod: PodInstance = requestToMake.toPod |
115 | 115 | ||
116 | eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | 116 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) |
117 | const requestToMake = requestsToMakeGrouped[hashKey] | 117 | .then(() => { |
118 | const toPod = requestToMake.toPod | 118 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
119 | 119 | goodPods.push(requestToMake.toPod.id) | |
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { | 120 | |
121 | if (err) { | 121 | this.afterRequestHook() |
122 | badPods.push(requestToMake.toPod.id) | 122 | |
123 | return callbackEach() | 123 | // Remove the pod id of these request ids |
124 | } | 124 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) |
125 | 125 | }) | |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | 126 | .catch(err => { |
127 | goodPods.push(requestToMake.toPod.id) | 127 | badPods.push(requestToMake.toPod.id) |
128 | 128 | logger.info('Cannot make request to %s.', toPod.host, { error: err }) | |
129 | // Remove the pod id of these request ids | 129 | }) |
130 | this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) | 130 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) |
131 | }) | ||
132 | .then(({ goodPods, badPods }) => { | ||
133 | this.afterRequestsHook() | ||
131 | 134 | ||
132 | this.afterRequestHook() | ||
133 | }) | ||
134 | }, () => { | ||
135 | // All the requests were made, we update the pods score | 135 | // All the requests were made, we update the pods score |
136 | db.Pod.updatePodsScore(goodPods, badPods) | 136 | return db.Pod.updatePodsScore(goodPods, badPods) |
137 | |||
138 | this.afterRequestsHook() | ||
139 | }) | 137 | }) |
140 | }) | 138 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) |
141 | } | 139 | } |
142 | 140 | ||
143 | protected afterRequestHook () { | 141 | protected afterRequestHook () { |