diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-07-05 13:26:25 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-07-05 14:14:16 +0200 |
commit | 6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch) | |
tree | 3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib/request | |
parent | 5fe7e898316e18369c3e1aba307b55077adc7bfb (diff) | |
download | PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip |
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib/request')
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 124 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.ts | 32 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.ts | 13 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.ts | 13 |
4 files changed, 88 insertions, 94 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index e81ab9c36..dd77fddb7 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -1,15 +1,16 @@ | |||
1 | import * as eachLimit from 'async/eachLimit' | 1 | import { isEmpty } from 'lodash' |
2 | import * as Promise from 'bluebird' | ||
2 | 3 | ||
3 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
4 | import { logger, makeSecureRequest } from '../../helpers' | 5 | import { logger, makeSecureRequest } from '../../helpers' |
5 | import { PodInstance } from '../../models' | 6 | import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' |
6 | import { | 7 | import { |
7 | API_VERSION, | 8 | API_VERSION, |
8 | REQUESTS_IN_PARALLEL, | 9 | REQUESTS_IN_PARALLEL, |
9 | REQUESTS_INTERVAL | 10 | REQUESTS_INTERVAL |
10 | } from '../../initializers' | 11 | } from '../../initializers' |
11 | 12 | ||
12 | abstract class AbstractRequestScheduler { | 13 | abstract class AbstractRequestScheduler <T> { |
13 | requestInterval: number | 14 | requestInterval: number |
14 | limitPods: number | 15 | limitPods: number |
15 | limitPerPod: number | 16 | limitPerPod: number |
@@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler { | |||
24 | this.requestInterval = REQUESTS_INTERVAL | 25 | this.requestInterval = REQUESTS_INTERVAL |
25 | } | 26 | } |
26 | 27 | ||
27 | abstract getRequestModel () | 28 | abstract getRequestModel (): AbstractRequestClass<T> |
28 | abstract getRequestToPodModel () | 29 | abstract getRequestToPodModel (): AbstractRequestToPodClass |
29 | abstract buildRequestObjects (requests: any) | 30 | abstract buildRequestObjects (requestsGrouped: T): {} |
30 | 31 | ||
31 | activate () { | 32 | activate () { |
32 | logger.info('Requests scheduler activated.') | 33 | logger.info('Requests scheduler activated.') |
@@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler { | |||
55 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | 56 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) |
56 | } | 57 | } |
57 | 58 | ||
58 | remainingRequestsCount (callback: (err: Error, total: number) => void) { | 59 | remainingRequestsCount () { |
59 | return this.getRequestModel().countTotalRequests(callback) | 60 | return this.getRequestModel().countTotalRequests() |
60 | } | 61 | } |
61 | 62 | ||
62 | flush (callback: (err: Error) => void) { | 63 | flush () { |
63 | this.getRequestModel().removeAll(callback) | 64 | return this.getRequestModel().removeAll() |
64 | } | 65 | } |
65 | 66 | ||
66 | // --------------------------------------------------------------------------- | 67 | // --------------------------------------------------------------------------- |
67 | 68 | ||
68 | // Make a requests to friends of a certain type | 69 | // Make a requests to friends of a certain type |
69 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { | 70 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) { |
70 | if (!callback) callback = function () { /* empty */ } | ||
71 | |||
72 | const params = { | 71 | const params = { |
73 | toPod: toPod, | 72 | toPod: toPod, |
74 | sign: true, // Prove our identity | 73 | sign: true, // Prove our identity |
@@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler { | |||
79 | 78 | ||
80 | // Make multiple retry requests to all of pods | 79 | // Make multiple retry requests to all of pods |
81 | // The function fire some useful callbacks | 80 | // The function fire some useful callbacks |
82 | makeSecureRequest(params, (err, res) => { | 81 | return makeSecureRequest(params) |
83 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | 82 | .then(({ response, body }) => { |
84 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | 83 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { |
84 | throw new Error('Status code not 20x : ' + response.statusCode) | ||
85 | } | ||
86 | }) | ||
87 | .catch(err => { | ||
85 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | 88 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) |
86 | 89 | ||
87 | return callback(err) | 90 | throw err |
88 | } | 91 | }) |
89 | |||
90 | return callback(null) | ||
91 | }) | ||
92 | } | 92 | } |
93 | 93 | ||
94 | // Make all the requests of the scheduler | 94 | // Make all the requests of the scheduler |
95 | protected makeRequests () { | 95 | protected makeRequests () { |
96 | this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { | 96 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) |
97 | if (err) { | 97 | .then((requestsGrouped: T) => { |
98 | logger.error('Cannot get the list of "%s".', this.description, { err: err }) | 98 | // We want to group requests by destinations pod and endpoint |
99 | return // Abort | 99 | const requestsToMake = this.buildRequestObjects(requestsGrouped) |
100 | } | 100 | |
101 | 101 | // If there are no requests, abort | |
102 | // If there are no requests, abort | 102 | if (isEmpty(requestsToMake) === true) { |
103 | if (requests.length === 0) { | 103 | logger.info('No "%s" to make.', this.description) |
104 | logger.info('No "%s" to make.', this.description) | 104 | return { goodPods: [], badPods: [] } |
105 | return | 105 | } |
106 | } | 106 | |
107 | 107 | logger.info('Making "%s" to friends.', this.description) | |
108 | // We want to group requests by destinations pod and endpoint | 108 | |
109 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | 109 | const goodPods = [] |
110 | 110 | const badPods = [] | |
111 | logger.info('Making "%s" to friends.', this.description) | 111 | |
112 | 112 | return Promise.map(Object.keys(requestsToMake), hashKey => { | |
113 | const goodPods = [] | 113 | const requestToMake = requestsToMake[hashKey] |
114 | const badPods = [] | 114 | const toPod: PodInstance = requestToMake.toPod |
115 | 115 | ||
116 | eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | 116 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) |
117 | const requestToMake = requestsToMakeGrouped[hashKey] | 117 | .then(() => { |
118 | const toPod = requestToMake.toPod | 118 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
119 | 119 | goodPods.push(requestToMake.toPod.id) | |
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { | 120 | |
121 | if (err) { | 121 | this.afterRequestHook() |
122 | badPods.push(requestToMake.toPod.id) | 122 | |
123 | return callbackEach() | 123 | // Remove the pod id of these request ids |
124 | } | 124 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) |
125 | 125 | }) | |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | 126 | .catch(err => { |
127 | goodPods.push(requestToMake.toPod.id) | 127 | badPods.push(requestToMake.toPod.id) |
128 | 128 | logger.info('Cannot make request to %s.', toPod.host, { error: err }) | |
129 | // Remove the pod id of these request ids | 129 | }) |
130 | this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) | 130 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) |
131 | }) | ||
132 | .then(({ goodPods, badPods }) => { | ||
133 | this.afterRequestsHook() | ||
131 | 134 | ||
132 | this.afterRequestHook() | ||
133 | }) | ||
134 | }, () => { | ||
135 | // All the requests were made, we update the pods score | 135 | // All the requests were made, we update the pods score |
136 | db.Pod.updatePodsScore(goodPods, badPods) | 136 | return db.Pod.updatePodsScore(goodPods, badPods) |
137 | |||
138 | this.afterRequestsHook() | ||
139 | }) | 137 | }) |
140 | }) | 138 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) |
141 | } | 139 | } |
142 | 140 | ||
143 | protected afterRequestHook () { | 141 | protected afterRequestHook () { |
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 575e0227c..0dd796fb0 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts | |||
@@ -3,10 +3,8 @@ import * as Sequelize from 'sequelize' | |||
3 | import { database as db } from '../../initializers/database' | 3 | import { database as db } from '../../initializers/database' |
4 | import { AbstractRequestScheduler } from './abstract-request-scheduler' | 4 | import { AbstractRequestScheduler } from './abstract-request-scheduler' |
5 | import { logger } from '../../helpers' | 5 | import { logger } from '../../helpers' |
6 | import { | 6 | import { REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PER_POD } from '../../initializers' |
7 | REQUESTS_LIMIT_PODS, | 7 | import { RequestsGrouped } from '../../models' |
8 | REQUESTS_LIMIT_PER_POD | ||
9 | } from '../../initializers' | ||
10 | import { RequestEndpoint } from '../../../shared' | 8 | import { RequestEndpoint } from '../../../shared' |
11 | 9 | ||
12 | export type RequestSchedulerOptions = { | 10 | export type RequestSchedulerOptions = { |
@@ -17,7 +15,7 @@ export type RequestSchedulerOptions = { | |||
17 | transaction: Sequelize.Transaction | 15 | transaction: Sequelize.Transaction |
18 | } | 16 | } |
19 | 17 | ||
20 | class RequestScheduler extends AbstractRequestScheduler { | 18 | class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { |
21 | constructor () { | 19 | constructor () { |
22 | super() | 20 | super() |
23 | 21 | ||
@@ -36,11 +34,11 @@ class RequestScheduler extends AbstractRequestScheduler { | |||
36 | return db.RequestToPod | 34 | return db.RequestToPod |
37 | } | 35 | } |
38 | 36 | ||
39 | buildRequestObjects (requests: { [ toPodId: number ]: any }) { | 37 | buildRequestObjects (requestsGrouped: RequestsGrouped) { |
40 | const requestsToMakeGrouped = {} | 38 | const requestsToMakeGrouped = {} |
41 | 39 | ||
42 | Object.keys(requests).forEach(toPodId => { | 40 | Object.keys(requestsGrouped).forEach(toPodId => { |
43 | requests[toPodId].forEach(data => { | 41 | requestsGrouped[toPodId].forEach(data => { |
44 | const request = data.request | 42 | const request = data.request |
45 | const pod = data.pod | 43 | const pod = data.pod |
46 | const hashKey = toPodId + request.endpoint | 44 | const hashKey = toPodId + request.endpoint |
@@ -62,12 +60,12 @@ class RequestScheduler extends AbstractRequestScheduler { | |||
62 | return requestsToMakeGrouped | 60 | return requestsToMakeGrouped |
63 | } | 61 | } |
64 | 62 | ||
65 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions, callback: (err: Error) => void) { | 63 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { |
66 | // TODO: check the setPods works | 64 | // TODO: check the setPods works |
67 | const podIds = [] | 65 | const podIds = [] |
68 | 66 | ||
69 | // If there are no destination pods abort | 67 | // If there are no destination pods abort |
70 | if (toIds.length === 0) return callback(null) | 68 | if (toIds.length === 0) return undefined |
71 | 69 | ||
72 | toIds.forEach(toPod => { | 70 | toIds.forEach(toPod => { |
73 | podIds.push(toPod) | 71 | podIds.push(toPod) |
@@ -85,20 +83,18 @@ class RequestScheduler extends AbstractRequestScheduler { | |||
85 | transaction | 83 | transaction |
86 | } | 84 | } |
87 | 85 | ||
88 | return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { | 86 | return db.Request.create(createQuery, dbRequestOptions) |
89 | if (err) return callback(err) | 87 | .then(request => { |
90 | 88 | return request.setPods(podIds, dbRequestOptions) | |
91 | return request.setPods(podIds, dbRequestOptions).asCallback(callback) | 89 | }) |
92 | }) | ||
93 | } | 90 | } |
94 | 91 | ||
95 | // --------------------------------------------------------------------------- | 92 | // --------------------------------------------------------------------------- |
96 | 93 | ||
97 | afterRequestsHook () { | 94 | afterRequestsHook () { |
98 | // Flush requests with no pod | 95 | // Flush requests with no pod |
99 | this.getRequestModel().removeWithEmptyTo(err => { | 96 | this.getRequestModel().removeWithEmptyTo() |
100 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | 97 | .catch(err => logger.error('Error when removing requests with no pods.', { error: err })) |
101 | }) | ||
102 | } | 98 | } |
103 | } | 99 | } |
104 | 100 | ||
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 4bb76f4c9..d4d714c02 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts | |||
@@ -7,6 +7,7 @@ import { | |||
7 | REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, | 7 | REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, |
8 | REQUEST_VIDEO_EVENT_ENDPOINT | 8 | REQUEST_VIDEO_EVENT_ENDPOINT |
9 | } from '../../initializers' | 9 | } from '../../initializers' |
10 | import { RequestsVideoEventGrouped } from '../../models' | ||
10 | import { RequestVideoEventType } from '../../../shared' | 11 | import { RequestVideoEventType } from '../../../shared' |
11 | 12 | ||
12 | export type RequestVideoEventSchedulerOptions = { | 13 | export type RequestVideoEventSchedulerOptions = { |
@@ -16,7 +17,7 @@ export type RequestVideoEventSchedulerOptions = { | |||
16 | transaction?: Sequelize.Transaction | 17 | transaction?: Sequelize.Transaction |
17 | } | 18 | } |
18 | 19 | ||
19 | class RequestVideoEventScheduler extends AbstractRequestScheduler { | 20 | class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoEventGrouped> { |
20 | constructor () { | 21 | constructor () { |
21 | super() | 22 | super() |
22 | 23 | ||
@@ -35,7 +36,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
35 | return db.RequestVideoEvent | 36 | return db.RequestVideoEvent |
36 | } | 37 | } |
37 | 38 | ||
38 | buildRequestObjects (eventsToProcess: { [ toPodId: number ]: any }[]) { | 39 | buildRequestObjects (eventRequests: RequestsVideoEventGrouped) { |
39 | const requestsToMakeGrouped = {} | 40 | const requestsToMakeGrouped = {} |
40 | 41 | ||
41 | /* Example: | 42 | /* Example: |
@@ -50,8 +51,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
50 | 51 | ||
51 | // We group video events per video and per pod | 52 | // We group video events per video and per pod |
52 | // We add the counts of the same event types | 53 | // We add the counts of the same event types |
53 | Object.keys(eventsToProcess).forEach(toPodId => { | 54 | Object.keys(eventRequests).forEach(toPodId => { |
54 | eventsToProcess[toPodId].forEach(eventToProcess => { | 55 | eventRequests[toPodId].forEach(eventToProcess => { |
55 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | 56 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} |
56 | 57 | ||
57 | if (!requestsToMakeGrouped[toPodId]) { | 58 | if (!requestsToMakeGrouped[toPodId]) { |
@@ -97,7 +98,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
97 | return requestsToMakeGrouped | 98 | return requestsToMakeGrouped |
98 | } | 99 | } |
99 | 100 | ||
100 | createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { | 101 | createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions) { |
101 | if (count === undefined) count = 1 | 102 | if (count === undefined) count = 1 |
102 | 103 | ||
103 | const dbRequestOptions: Sequelize.CreateOptions = {} | 104 | const dbRequestOptions: Sequelize.CreateOptions = {} |
@@ -109,7 +110,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
109 | videoId | 110 | videoId |
110 | } | 111 | } |
111 | 112 | ||
112 | return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) | 113 | return db.RequestVideoEvent.create(createQuery, dbRequestOptions) |
113 | } | 114 | } |
114 | } | 115 | } |
115 | 116 | ||
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index d7169cc81..5ec7de9c2 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts | |||
@@ -9,6 +9,7 @@ import { | |||
9 | REQUEST_VIDEO_QADU_ENDPOINT, | 9 | REQUEST_VIDEO_QADU_ENDPOINT, |
10 | REQUEST_VIDEO_QADU_TYPES | 10 | REQUEST_VIDEO_QADU_TYPES |
11 | } from '../../initializers' | 11 | } from '../../initializers' |
12 | import { RequestsVideoQaduGrouped } from '../../models' | ||
12 | import { RequestVideoQaduType } from '../../../shared' | 13 | import { RequestVideoQaduType } from '../../../shared' |
13 | 14 | ||
14 | export type RequestVideoQaduSchedulerOptions = { | 15 | export type RequestVideoQaduSchedulerOptions = { |
@@ -17,7 +18,7 @@ export type RequestVideoQaduSchedulerOptions = { | |||
17 | transaction?: Sequelize.Transaction | 18 | transaction?: Sequelize.Transaction |
18 | } | 19 | } |
19 | 20 | ||
20 | class RequestVideoQaduScheduler extends AbstractRequestScheduler { | 21 | class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQaduGrouped> { |
21 | constructor () { | 22 | constructor () { |
22 | super() | 23 | super() |
23 | 24 | ||
@@ -36,7 +37,7 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { | |||
36 | return db.RequestVideoQadu | 37 | return db.RequestVideoQadu |
37 | } | 38 | } |
38 | 39 | ||
39 | buildRequestObjects (requests: { [ toPodId: number ]: any }[]) { | 40 | buildRequestObjects (requests: RequestsVideoQaduGrouped) { |
40 | const requestsToMakeGrouped = {} | 41 | const requestsToMakeGrouped = {} |
41 | 42 | ||
42 | Object.keys(requests).forEach(toPodId => { | 43 | Object.keys(requests).forEach(toPodId => { |
@@ -105,20 +106,18 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { | |||
105 | return requestsToMakeGrouped | 106 | return requestsToMakeGrouped |
106 | } | 107 | } |
107 | 108 | ||
108 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { | 109 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { |
109 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} | 110 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} |
110 | if (transaction) dbRequestOptions.transaction = transaction | 111 | if (transaction) dbRequestOptions.transaction = transaction |
111 | 112 | ||
112 | // Send the update to all our friends | 113 | // Send the update to all our friends |
113 | db.Pod.listAllIds(transaction, function (err, podIds) { | 114 | return db.Pod.listAllIds(transaction).then(podIds => { |
114 | if (err) return callback(err) | ||
115 | |||
116 | const queries = [] | 115 | const queries = [] |
117 | podIds.forEach(podId => { | 116 | podIds.forEach(podId => { |
118 | queries.push({ type, videoId, podId }) | 117 | queries.push({ type, videoId, podId }) |
119 | }) | 118 | }) |
120 | 119 | ||
121 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) | 120 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) |
122 | }) | 121 | }) |
123 | } | 122 | } |
124 | } | 123 | } |