diff options
Diffstat (limited to 'server/lib/request')
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 121 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.ts | 16 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.ts | 20 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.ts | 34 |
4 files changed, 96 insertions, 95 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index ce4e2ffd2..f46682824 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import { isEmpty } from 'lodash' | 1 | import { isEmpty } from 'lodash' |
2 | import * as Promise from 'bluebird' | 2 | import * as Bluebird from 'bluebird' |
3 | 3 | ||
4 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
5 | import { logger, makeSecureRequest } from '../../helpers' | 5 | import { logger, makeSecureRequest } from '../../helpers' |
@@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> { | |||
76 | // --------------------------------------------------------------------------- | 76 | // --------------------------------------------------------------------------- |
77 | 77 | ||
78 | // Make a requests to friends of a certain type | 78 | // Make a requests to friends of a certain type |
79 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { | 79 | protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { |
80 | const params = { | 80 | const params = { |
81 | toPod: toPod, | 81 | toPod: toPod, |
82 | method: 'POST' as 'POST', | 82 | method: 'POST' as 'POST', |
@@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> { | |||
86 | 86 | ||
87 | // Make multiple retry requests to all of pods | 87 | // Make multiple retry requests to all of pods |
88 | // The function fire some useful callbacks | 88 | // The function fire some useful callbacks |
89 | return makeSecureRequest(params) | 89 | try { |
90 | .then(({ response, body }) => { | 90 | const { response } = await makeSecureRequest(params) |
91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { | 91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { |
92 | throw new Error('Status code not 20x : ' + response.statusCode) | 92 | throw new Error('Status code not 20x : ' + response.statusCode) |
93 | } | 93 | } |
94 | }) | 94 | } catch (err) { |
95 | .catch(err => { | 95 | logger.error('Error sending secure request to %s pod.', toPod.host, err) |
96 | logger.error('Error sending secure request to %s pod.', toPod.host, err) | 96 | |
97 | 97 | throw err | |
98 | throw err | 98 | } |
99 | }) | ||
100 | } | 99 | } |
101 | 100 | ||
102 | // Make all the requests of the scheduler | 101 | // Make all the requests of the scheduler |
103 | protected makeRequests () { | 102 | protected async makeRequests () { |
104 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) | 103 | let requestsGrouped: T |
105 | .then((requestsGrouped: T) => { | 104 | |
106 | // We want to group requests by destinations pod and endpoint | 105 | try { |
107 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | 106 | requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) |
108 | 107 | } catch (err) { | |
109 | // If there are no requests, abort | 108 | logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }) |
110 | if (isEmpty(requestsToMake) === true) { | 109 | throw err |
111 | logger.info('No "%s" to make.', this.description) | 110 | } |
112 | return { goodPods: [], badPods: [] } | 111 | |
113 | } | 112 | // We want to group requests by destinations pod and endpoint |
114 | 113 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | |
115 | logger.info('Making "%s" to friends.', this.description) | 114 | |
116 | 115 | // If there are no requests, abort | |
117 | const goodPods: number[] = [] | 116 | if (isEmpty(requestsToMake) === true) { |
118 | const badPods: number[] = [] | 117 | logger.info('No "%s" to make.', this.description) |
119 | 118 | return { goodPods: [], badPods: [] } | |
120 | return Promise.map(Object.keys(requestsToMake), hashKey => { | 119 | } |
121 | const requestToMake = requestsToMake[hashKey] | 120 | |
122 | const toPod: PodInstance = requestToMake.toPod | 121 | logger.info('Making "%s" to friends.', this.description) |
123 | 122 | ||
124 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) | 123 | const goodPods: number[] = [] |
125 | .then(() => { | 124 | const badPods: number[] = [] |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | 125 | |
127 | goodPods.push(requestToMake.toPod.id) | 126 | await Bluebird.map(Object.keys(requestsToMake), async hashKey => { |
128 | 127 | const requestToMake = requestsToMake[hashKey] | |
129 | this.afterRequestHook() | 128 | const toPod: PodInstance = requestToMake.toPod |
130 | 129 | ||
131 | // Remove the pod id of these request ids | 130 | try { |
132 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) | 131 | await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) |
133 | }) | 132 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
134 | .catch(err => { | 133 | goodPods.push(requestToMake.toPod.id) |
135 | badPods.push(requestToMake.toPod.id) | 134 | |
136 | logger.info('Cannot make request to %s.', toPod.host, err) | 135 | this.afterRequestHook() |
137 | }) | 136 | |
138 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) | 137 | // Remove the pod id of these request ids |
139 | }) | 138 | await this.getRequestToPodModel() |
140 | .then(({ goodPods, badPods }) => { | 139 | .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) |
141 | this.afterRequestsHook() | 140 | } catch (err) { |
142 | 141 | badPods.push(requestToMake.toPod.id) | |
143 | // All the requests were made, we update the pods score | 142 | logger.info('Cannot make request to %s.', toPod.host, err) |
144 | return db.Pod.updatePodsScore(goodPods, badPods) | 143 | } |
145 | }) | 144 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
146 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) | 145 | |
146 | this.afterRequestsHook() | ||
147 | |||
148 | // All the requests were made, we update the pods score | ||
149 | await db.Pod.updatePodsScore(goodPods, badPods) | ||
147 | } | 150 | } |
148 | 151 | ||
149 | protected afterRequestHook () { | 152 | protected afterRequestHook () { |
150 | // Nothing to do, let children reimplement it | 153 | // Nothing to do, let children re-implement it |
151 | } | 154 | } |
152 | 155 | ||
153 | protected afterRequestsHook () { | 156 | protected afterRequestsHook () { |
154 | // Nothing to do, let children reimplement it | 157 | // Nothing to do, let children re-implement it |
155 | } | 158 | } |
156 | } | 159 | } |
157 | 160 | ||
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 696875dcf..c3f7f6429 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts | |||
@@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { | 37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { |
38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} | 38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} |
39 | 39 | ||
40 | Object.keys(requestsGrouped).forEach(toPodId => { | 40 | for (const toPodId of Object.keys(requestsGrouped)) { |
41 | requestsGrouped[toPodId].forEach(data => { | 41 | for (const data of requestsGrouped[toPodId]) { |
42 | const request = data.request | 42 | const request = data.request |
43 | const pod = data.pod | 43 | const pod = data.pod |
44 | const hashKey = toPodId + request.endpoint | 44 | const hashKey = toPodId + request.endpoint |
@@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
54 | 54 | ||
55 | requestsToMakeGrouped[hashKey].ids.push(request.id) | 55 | requestsToMakeGrouped[hashKey].ids.push(request.id) |
56 | requestsToMakeGrouped[hashKey].datas.push(request.request) | 56 | requestsToMakeGrouped[hashKey].datas.push(request.request) |
57 | }) | 57 | } |
58 | }) | 58 | } |
59 | 59 | ||
60 | return requestsToMakeGrouped | 60 | return requestsToMakeGrouped |
61 | } | 61 | } |
62 | 62 | ||
63 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { | 63 | async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { |
64 | // If there are no destination pods abort | 64 | // If there are no destination pods abort |
65 | if (toIds.length === 0) return undefined | 65 | if (toIds.length === 0) return undefined |
66 | 66 | ||
@@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
76 | transaction | 76 | transaction |
77 | } | 77 | } |
78 | 78 | ||
79 | return db.Request.create(createQuery, dbRequestOptions) | 79 | const request = await db.Request.create(createQuery, dbRequestOptions) |
80 | .then(request => { | 80 | await request.setPods(toIds, dbRequestOptions) |
81 | return request.setPods(toIds, dbRequestOptions) | ||
82 | }) | ||
83 | } | 81 | } |
84 | 82 | ||
85 | // --------------------------------------------------------------------------- | 83 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 680232732..5f21287f0 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts | |||
@@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
59 | 59 | ||
60 | // We group video events per video and per pod | 60 | // We group video events per video and per pod |
61 | // We add the counts of the same event types | 61 | // We add the counts of the same event types |
62 | Object.keys(eventRequests).forEach(toPodId => { | 62 | for (const toPodId of Object.keys(eventRequests)) { |
63 | eventRequests[toPodId].forEach(eventToProcess => { | 63 | for (const eventToProcess of eventRequests[toPodId]) { |
64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | 64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} |
65 | 65 | ||
66 | if (!requestsToMakeGrouped[toPodId]) { | 66 | if (!requestsToMakeGrouped[toPodId]) { |
@@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 | 81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 |
82 | 82 | ||
83 | events[eventToProcess.type] += eventToProcess.count | 83 | events[eventToProcess.type] += eventToProcess.count |
84 | }) | 84 | } |
85 | }) | 85 | } |
86 | 86 | ||
87 | // Now we build our requests array per pod | 87 | // Now we build our requests array per pod |
88 | Object.keys(eventsPerVideoPerPod).forEach(toPodId => { | 88 | for (const toPodId of Object.keys(eventsPerVideoPerPod)) { |
89 | const eventsForPod = eventsPerVideoPerPod[toPodId] | 89 | const eventsForPod = eventsPerVideoPerPod[toPodId] |
90 | 90 | ||
91 | Object.keys(eventsForPod).forEach(uuid => { | 91 | for (const uuid of Object.keys(eventsForPod)) { |
92 | const eventsForVideo = eventsForPod[uuid] | 92 | const eventsForVideo = eventsForPod[uuid] |
93 | 93 | ||
94 | Object.keys(eventsForVideo).forEach(eventType => { | 94 | for (const eventType of Object.keys(eventsForVideo)) { |
95 | requestsToMakeGrouped[toPodId].datas.push({ | 95 | requestsToMakeGrouped[toPodId].datas.push({ |
96 | data: { | 96 | data: { |
97 | uuid, | 97 | uuid, |
@@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
99 | count: +eventsForVideo[eventType] | 99 | count: +eventsForVideo[eventType] |
100 | } | 100 | } |
101 | }) | 101 | }) |
102 | }) | 102 | } |
103 | }) | 103 | } |
104 | }) | 104 | } |
105 | 105 | ||
106 | return requestsToMakeGrouped | 106 | return requestsToMakeGrouped |
107 | } | 107 | } |
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index d7e1d5e31..a54efc111 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts | |||
@@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa | |||
59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { | 59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { |
60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} | 60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} |
61 | 61 | ||
62 | Object.keys(requests).forEach(toPodId => { | 62 | for (const toPodId of Object.keys(requests)) { |
63 | requests[toPodId].forEach(data => { | 63 | for (const data of requests[toPodId]) { |
64 | const request = data.request | 64 | const request = data.request |
65 | const video = data.video | 65 | const video = data.video |
66 | const pod = data.pod | 66 | const pod = data.pod |
@@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa | |||
105 | // Maybe there are multiple quick and dirty update for the same video | 105 | // Maybe there are multiple quick and dirty update for the same video |
106 | // We use this hash map to dedupe them | 106 | // We use this hash map to dedupe them |
107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData | 107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData |
108 | }) | 108 | } |
109 | }) | 109 | } |
110 | 110 | ||
111 | // Now we deduped similar quick and dirty updates, we can build our requests data | 111 | // Now we deduped similar quick and dirty updates, we can build our requests data |
112 | Object.keys(requestsToMakeGrouped).forEach(hashKey => { | 112 | for (const hashKey of Object.keys(requestsToMakeGrouped)) { |
113 | Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { | 113 | for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) { |
114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] | 114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] |
115 | 115 | ||
116 | requestsToMakeGrouped[hashKey].datas.push({ | 116 | requestsToMakeGrouped[hashKey].datas.push({ |
117 | data: videoData | 117 | data: videoData |
118 | }) | 118 | }) |
119 | }) | 119 | } |
120 | 120 | ||
121 | // We don't need it anymore, it was just to build our data array | 121 | // We don't need it anymore, it was just to build our data array |
122 | delete requestsToMakeGrouped[hashKey].videos | 122 | delete requestsToMakeGrouped[hashKey].videos |
123 | }) | 123 | } |
124 | 124 | ||
125 | return requestsToMakeGrouped | 125 | return requestsToMakeGrouped |
126 | } | 126 | } |
127 | 127 | ||
128 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { | 128 | async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { |
129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} | 129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} |
130 | if (transaction) dbRequestOptions.transaction = transaction | 130 | if (transaction) dbRequestOptions.transaction = transaction |
131 | 131 | ||
132 | // Send the update to all our friends | 132 | // Send the update to all our friends |
133 | return db.Pod.listAllIds(transaction).then(podIds => { | 133 | const podIds = await db.Pod.listAllIds(transaction) |
134 | const queries = [] | 134 | const queries = [] |
135 | podIds.forEach(podId => { | 135 | for (const podId of podIds) { |
136 | queries.push({ type, videoId, podId }) | 136 | queries.push({ type, videoId, podId }) |
137 | }) | 137 | } |
138 | 138 | ||
139 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) | 139 | await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) |
140 | }) | 140 | return undefined |
141 | } | 141 | } |
142 | } | 142 | } |
143 | 143 | ||