aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/request
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/request')
-rw-r--r--server/lib/request/abstract-request-scheduler.ts121
-rw-r--r--server/lib/request/request-scheduler.ts16
-rw-r--r--server/lib/request/request-video-event-scheduler.ts20
-rw-r--r--server/lib/request/request-video-qadu-scheduler.ts34
4 files changed, 96 insertions, 95 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts
index ce4e2ffd2..f46682824 100644
--- a/server/lib/request/abstract-request-scheduler.ts
+++ b/server/lib/request/abstract-request-scheduler.ts
@@ -1,5 +1,5 @@
1import { isEmpty } from 'lodash' 1import { isEmpty } from 'lodash'
2import * as Promise from 'bluebird' 2import * as Bluebird from 'bluebird'
3 3
4import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
5import { logger, makeSecureRequest } from '../../helpers' 5import { logger, makeSecureRequest } from '../../helpers'
@@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> {
76 // --------------------------------------------------------------------------- 76 // ---------------------------------------------------------------------------
77 77
78 // Make a requests to friends of a certain type 78 // Make a requests to friends of a certain type
79 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { 79 protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
80 const params = { 80 const params = {
81 toPod: toPod, 81 toPod: toPod,
82 method: 'POST' as 'POST', 82 method: 'POST' as 'POST',
@@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> {
86 86
87 // Make multiple retry requests to all of pods 87 // Make multiple retry requests to all of pods
88 // The function fire some useful callbacks 88 // The function fire some useful callbacks
89 return makeSecureRequest(params) 89 try {
90 .then(({ response, body }) => { 90 const { response } = await makeSecureRequest(params)
91 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { 91 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
92 throw new Error('Status code not 20x : ' + response.statusCode) 92 throw new Error('Status code not 20x : ' + response.statusCode)
93 } 93 }
94 }) 94 } catch (err) {
95 .catch(err => { 95 logger.error('Error sending secure request to %s pod.', toPod.host, err)
96 logger.error('Error sending secure request to %s pod.', toPod.host, err) 96
97 97 throw err
98 throw err 98 }
99 })
100 } 99 }
101 100
102 // Make all the requests of the scheduler 101 // Make all the requests of the scheduler
103 protected makeRequests () { 102 protected async makeRequests () {
104 return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) 103 let requestsGrouped: T
105 .then((requestsGrouped: T) => { 104
106 // We want to group requests by destinations pod and endpoint 105 try {
107 const requestsToMake = this.buildRequestsObjects(requestsGrouped) 106 requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
108 107 } catch (err) {
109 // If there are no requests, abort 108 logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })
110 if (isEmpty(requestsToMake) === true) { 109 throw err
111 logger.info('No "%s" to make.', this.description) 110 }
112 return { goodPods: [], badPods: [] } 111
113 } 112 // We want to group requests by destinations pod and endpoint
114 113 const requestsToMake = this.buildRequestsObjects(requestsGrouped)
115 logger.info('Making "%s" to friends.', this.description) 114
116 115 // If there are no requests, abort
117 const goodPods: number[] = [] 116 if (isEmpty(requestsToMake) === true) {
118 const badPods: number[] = [] 117 logger.info('No "%s" to make.', this.description)
119 118 return { goodPods: [], badPods: [] }
120 return Promise.map(Object.keys(requestsToMake), hashKey => { 119 }
121 const requestToMake = requestsToMake[hashKey] 120
122 const toPod: PodInstance = requestToMake.toPod 121 logger.info('Making "%s" to friends.', this.description)
123 122
124 return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) 123 const goodPods: number[] = []
125 .then(() => { 124 const badPods: number[] = []
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) 125
127 goodPods.push(requestToMake.toPod.id) 126 await Bluebird.map(Object.keys(requestsToMake), async hashKey => {
128 127 const requestToMake = requestsToMake[hashKey]
129 this.afterRequestHook() 128 const toPod: PodInstance = requestToMake.toPod
130 129
131 // Remove the pod id of these request ids 130 try {
132 return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) 131 await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
133 }) 132 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
134 .catch(err => { 133 goodPods.push(requestToMake.toPod.id)
135 badPods.push(requestToMake.toPod.id) 134
136 logger.info('Cannot make request to %s.', toPod.host, err) 135 this.afterRequestHook()
137 }) 136
138 }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) 137 // Remove the pod id of these request ids
139 }) 138 await this.getRequestToPodModel()
140 .then(({ goodPods, badPods }) => { 139 .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
141 this.afterRequestsHook() 140 } catch (err) {
142 141 badPods.push(requestToMake.toPod.id)
143 // All the requests were made, we update the pods score 142 logger.info('Cannot make request to %s.', toPod.host, err)
144 return db.Pod.updatePodsScore(goodPods, badPods) 143 }
145 }) 144 }, { concurrency: REQUESTS_IN_PARALLEL })
146 .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) 145
146 this.afterRequestsHook()
147
148 // All the requests were made, we update the pods score
149 await db.Pod.updatePodsScore(goodPods, badPods)
147 } 150 }
148 151
149 protected afterRequestHook () { 152 protected afterRequestHook () {
150 // Nothing to do, let children reimplement it 153 // Nothing to do, let children re-implement it
151 } 154 }
152 155
153 protected afterRequestsHook () { 156 protected afterRequestsHook () {
154 // Nothing to do, let children reimplement it 157 // Nothing to do, let children re-implement it
155 } 158 }
156} 159}
157 160
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts
index 696875dcf..c3f7f6429 100644
--- a/server/lib/request/request-scheduler.ts
+++ b/server/lib/request/request-scheduler.ts
@@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
37 buildRequestsObjects (requestsGrouped: RequestsGrouped) { 37 buildRequestsObjects (requestsGrouped: RequestsGrouped) {
38 const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} 38 const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {}
39 39
40 Object.keys(requestsGrouped).forEach(toPodId => { 40 for (const toPodId of Object.keys(requestsGrouped)) {
41 requestsGrouped[toPodId].forEach(data => { 41 for (const data of requestsGrouped[toPodId]) {
42 const request = data.request 42 const request = data.request
43 const pod = data.pod 43 const pod = data.pod
44 const hashKey = toPodId + request.endpoint 44 const hashKey = toPodId + request.endpoint
@@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
54 54
55 requestsToMakeGrouped[hashKey].ids.push(request.id) 55 requestsToMakeGrouped[hashKey].ids.push(request.id)
56 requestsToMakeGrouped[hashKey].datas.push(request.request) 56 requestsToMakeGrouped[hashKey].datas.push(request.request)
57 }) 57 }
58 }) 58 }
59 59
60 return requestsToMakeGrouped 60 return requestsToMakeGrouped
61 } 61 }
62 62
63 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { 63 async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
64 // If there are no destination pods abort 64 // If there are no destination pods abort
65 if (toIds.length === 0) return undefined 65 if (toIds.length === 0) return undefined
66 66
@@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
76 transaction 76 transaction
77 } 77 }
78 78
79 return db.Request.create(createQuery, dbRequestOptions) 79 const request = await db.Request.create(createQuery, dbRequestOptions)
80 .then(request => { 80 await request.setPods(toIds, dbRequestOptions)
81 return request.setPods(toIds, dbRequestOptions)
82 })
83 } 81 }
84 82
85 // --------------------------------------------------------------------------- 83 // ---------------------------------------------------------------------------
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts
index 680232732..5f21287f0 100644
--- a/server/lib/request/request-video-event-scheduler.ts
+++ b/server/lib/request/request-video-event-scheduler.ts
@@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
59 59
60 // We group video events per video and per pod 60 // We group video events per video and per pod
61 // We add the counts of the same event types 61 // We add the counts of the same event types
62 Object.keys(eventRequests).forEach(toPodId => { 62 for (const toPodId of Object.keys(eventRequests)) {
63 eventRequests[toPodId].forEach(eventToProcess => { 63 for (const eventToProcess of eventRequests[toPodId]) {
64 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} 64 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
65 65
66 if (!requestsToMakeGrouped[toPodId]) { 66 if (!requestsToMakeGrouped[toPodId]) {
@@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
81 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 81 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0
82 82
83 events[eventToProcess.type] += eventToProcess.count 83 events[eventToProcess.type] += eventToProcess.count
84 }) 84 }
85 }) 85 }
86 86
87 // Now we build our requests array per pod 87 // Now we build our requests array per pod
88 Object.keys(eventsPerVideoPerPod).forEach(toPodId => { 88 for (const toPodId of Object.keys(eventsPerVideoPerPod)) {
89 const eventsForPod = eventsPerVideoPerPod[toPodId] 89 const eventsForPod = eventsPerVideoPerPod[toPodId]
90 90
91 Object.keys(eventsForPod).forEach(uuid => { 91 for (const uuid of Object.keys(eventsForPod)) {
92 const eventsForVideo = eventsForPod[uuid] 92 const eventsForVideo = eventsForPod[uuid]
93 93
94 Object.keys(eventsForVideo).forEach(eventType => { 94 for (const eventType of Object.keys(eventsForVideo)) {
95 requestsToMakeGrouped[toPodId].datas.push({ 95 requestsToMakeGrouped[toPodId].datas.push({
96 data: { 96 data: {
97 uuid, 97 uuid,
@@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
99 count: +eventsForVideo[eventType] 99 count: +eventsForVideo[eventType]
100 } 100 }
101 }) 101 })
102 }) 102 }
103 }) 103 }
104 }) 104 }
105 105
106 return requestsToMakeGrouped 106 return requestsToMakeGrouped
107 } 107 }
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts
index d7e1d5e31..a54efc111 100644
--- a/server/lib/request/request-video-qadu-scheduler.ts
+++ b/server/lib/request/request-video-qadu-scheduler.ts
@@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
59 buildRequestsObjects (requests: RequestsVideoQaduGrouped) { 59 buildRequestsObjects (requests: RequestsVideoQaduGrouped) {
60 const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} 60 const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {}
61 61
62 Object.keys(requests).forEach(toPodId => { 62 for (const toPodId of Object.keys(requests)) {
63 requests[toPodId].forEach(data => { 63 for (const data of requests[toPodId]) {
64 const request = data.request 64 const request = data.request
65 const video = data.video 65 const video = data.video
66 const pod = data.pod 66 const pod = data.pod
@@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
105 // Maybe there are multiple quick and dirty update for the same video 105 // Maybe there are multiple quick and dirty update for the same video
106 // We use this hash map to dedupe them 106 // We use this hash map to dedupe them
107 requestsToMakeGrouped[hashKey].videos[video.id] = videoData 107 requestsToMakeGrouped[hashKey].videos[video.id] = videoData
108 }) 108 }
109 }) 109 }
110 110
111 // Now we deduped similar quick and dirty updates, we can build our requests data 111 // Now we deduped similar quick and dirty updates, we can build our requests data
112 Object.keys(requestsToMakeGrouped).forEach(hashKey => { 112 for (const hashKey of Object.keys(requestsToMakeGrouped)) {
113 Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { 113 for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) {
114 const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] 114 const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID]
115 115
116 requestsToMakeGrouped[hashKey].datas.push({ 116 requestsToMakeGrouped[hashKey].datas.push({
117 data: videoData 117 data: videoData
118 }) 118 })
119 }) 119 }
120 120
121 // We don't need it anymore, it was just to build our data array 121 // We don't need it anymore, it was just to build our data array
122 delete requestsToMakeGrouped[hashKey].videos 122 delete requestsToMakeGrouped[hashKey].videos
123 }) 123 }
124 124
125 return requestsToMakeGrouped 125 return requestsToMakeGrouped
126 } 126 }
127 127
128 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { 128 async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
129 const dbRequestOptions: Sequelize.BulkCreateOptions = {} 129 const dbRequestOptions: Sequelize.BulkCreateOptions = {}
130 if (transaction) dbRequestOptions.transaction = transaction 130 if (transaction) dbRequestOptions.transaction = transaction
131 131
132 // Send the update to all our friends 132 // Send the update to all our friends
133 return db.Pod.listAllIds(transaction).then(podIds => { 133 const podIds = await db.Pod.listAllIds(transaction)
134 const queries = [] 134 const queries = []
135 podIds.forEach(podId => { 135 for (const podId of podIds) {
136 queries.push({ type, videoId, podId }) 136 queries.push({ type, videoId, podId })
137 }) 137 }
138 138
139 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) 139 await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
140 }) 140 return undefined
141 } 141 }
142} 142}
143 143