diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-07-05 13:26:25 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-07-05 14:14:16 +0200 |
commit | 6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch) | |
tree | 3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib | |
parent | 5fe7e898316e18369c3e1aba307b55077adc7bfb (diff) | |
download | PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip |
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/friends.ts | 318 | ||||
-rw-r--r-- | server/lib/jobs/handlers/index.ts | 8 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-transcoder.ts | 22 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 117 | ||||
-rw-r--r-- | server/lib/oauth-model.ts | 15 | ||||
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 124 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.ts | 32 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.ts | 13 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.ts | 13 |
9 files changed, 293 insertions, 369 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts index 522cb82b3..498144318 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import { each, eachLimit, eachSeries, series, waterfall } from 'async' | ||
2 | import * as request from 'request' | 1 | import * as request from 'request' |
3 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | import * as Promise from 'bluebird' | ||
4 | 4 | ||
5 | import { database as db } from '../initializers/database' | 5 | import { database as db } from '../initializers/database' |
6 | import { | 6 | import { |
@@ -15,8 +15,7 @@ import { | |||
15 | logger, | 15 | logger, |
16 | getMyPublicCert, | 16 | getMyPublicCert, |
17 | makeSecureRequest, | 17 | makeSecureRequest, |
18 | makeRetryRequest, | 18 | makeRetryRequest |
19 | createEmptyCallback | ||
20 | } from '../helpers' | 19 | } from '../helpers' |
21 | import { | 20 | import { |
22 | RequestScheduler, | 21 | RequestScheduler, |
@@ -53,24 +52,24 @@ function activateSchedulers () { | |||
53 | requestVideoEventScheduler.activate() | 52 | requestVideoEventScheduler.activate() |
54 | } | 53 | } |
55 | 54 | ||
56 | function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { | 55 | function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) { |
57 | const options = { | 56 | const options = { |
58 | type: ENDPOINT_ACTIONS.ADD, | 57 | type: ENDPOINT_ACTIONS.ADD, |
59 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 58 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
60 | data: videoData, | 59 | data: videoData, |
61 | transaction | 60 | transaction |
62 | } | 61 | } |
63 | createRequest(options, callback) | 62 | return createRequest(options) |
64 | } | 63 | } |
65 | 64 | ||
66 | function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { | 65 | function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) { |
67 | const options = { | 66 | const options = { |
68 | type: ENDPOINT_ACTIONS.UPDATE, | 67 | type: ENDPOINT_ACTIONS.UPDATE, |
69 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 68 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
70 | data: videoData, | 69 | data: videoData, |
71 | transaction | 70 | transaction |
72 | } | 71 | } |
73 | createRequest(options, callback) | 72 | return createRequest(options) |
74 | } | 73 | } |
75 | 74 | ||
76 | function removeVideoToFriends (videoParams: Object) { | 75 | function removeVideoToFriends (videoParams: Object) { |
@@ -80,121 +79,93 @@ function removeVideoToFriends (videoParams: Object) { | |||
80 | data: videoParams, | 79 | data: videoParams, |
81 | transaction: null | 80 | transaction: null |
82 | } | 81 | } |
83 | createRequest(options) | 82 | return createRequest(options) |
84 | } | 83 | } |
85 | 84 | ||
86 | function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) { | 85 | function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance, transaction: Sequelize.Transaction) { |
87 | const options = { | 86 | const options = { |
88 | type: ENDPOINT_ACTIONS.REPORT_ABUSE, | 87 | type: ENDPOINT_ACTIONS.REPORT_ABUSE, |
89 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 88 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
90 | data: reportData, | 89 | data: reportData, |
91 | toIds: [ video.Author.podId ], | 90 | toIds: [ video.Author.podId ], |
92 | transaction: null | 91 | transaction |
93 | } | 92 | } |
94 | createRequest(options) | 93 | return createRequest(options) |
95 | } | 94 | } |
96 | 95 | ||
97 | function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { | 96 | function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction) { |
98 | const options = { | 97 | const options = { |
99 | videoId: qaduParam.videoId, | 98 | videoId: qaduParam.videoId, |
100 | type: qaduParam.type, | 99 | type: qaduParam.type, |
101 | transaction | 100 | transaction |
102 | } | 101 | } |
103 | return createVideoQaduRequest(options, callback) | 102 | return createVideoQaduRequest(options) |
104 | } | 103 | } |
105 | 104 | ||
106 | function quickAndDirtyUpdatesVideoToFriends ( | 105 | function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction) { |
107 | qadusParams: QaduParam[], | ||
108 | transaction: Sequelize.Transaction, | ||
109 | finalCallback: (err: Error) => void | ||
110 | ) { | ||
111 | const tasks = [] | 106 | const tasks = [] |
112 | 107 | ||
113 | qadusParams.forEach(function (qaduParams) { | 108 | qadusParams.forEach(function (qaduParams) { |
114 | const fun = function (callback) { | 109 | tasks.push(quickAndDirtyUpdateVideoToFriends(qaduParams, transaction)) |
115 | quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback) | ||
116 | } | ||
117 | |||
118 | tasks.push(fun) | ||
119 | }) | 110 | }) |
120 | 111 | ||
121 | series(tasks, finalCallback) | 112 | return Promise.all(tasks) |
122 | } | 113 | } |
123 | 114 | ||
124 | function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { | 115 | function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction) { |
125 | const options = { | 116 | const options = { |
126 | videoId: eventParam.videoId, | 117 | videoId: eventParam.videoId, |
127 | type: eventParam.type, | 118 | type: eventParam.type, |
128 | transaction | 119 | transaction |
129 | } | 120 | } |
130 | createVideoEventRequest(options, callback) | 121 | return createVideoEventRequest(options) |
131 | } | 122 | } |
132 | 123 | ||
133 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { | 124 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { |
134 | const tasks = [] | 125 | const tasks = [] |
135 | 126 | ||
136 | eventsParams.forEach(function (eventParams) { | 127 | eventsParams.forEach(function (eventParams) { |
137 | const fun = function (callback) { | 128 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) |
138 | addEventToRemoteVideo(eventParams, transaction, callback) | ||
139 | } | ||
140 | |||
141 | tasks.push(fun) | ||
142 | }) | 129 | }) |
143 | 130 | ||
144 | series(tasks, finalCallback) | 131 | return Promise.all(tasks) |
145 | } | 132 | } |
146 | 133 | ||
147 | function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) { | 134 | function hasFriends () { |
148 | db.Pod.countAll(function (err, count) { | 135 | return db.Pod.countAll().then(count => count !== 0) |
149 | if (err) return callback(err) | ||
150 | |||
151 | const hasFriends = (count !== 0) | ||
152 | callback(null, hasFriends) | ||
153 | }) | ||
154 | } | 136 | } |
155 | 137 | ||
156 | function makeFriends (hosts: string[], callback: (err: Error) => void) { | 138 | function makeFriends (hosts: string[]) { |
157 | const podsScore = {} | 139 | const podsScore = {} |
158 | 140 | ||
159 | logger.info('Make friends!') | 141 | logger.info('Make friends!') |
160 | getMyPublicCert(function (err, cert) { | 142 | return getMyPublicCert() |
161 | if (err) { | 143 | .then(cert => { |
162 | logger.error('Cannot read public cert.') | 144 | return Promise.mapSeries(hosts, host => { |
163 | return callback(err) | 145 | return computeForeignPodsList(host, podsScore) |
164 | } | 146 | }).then(() => cert) |
165 | 147 | }) | |
166 | eachSeries(hosts, function (host, callbackEach) { | 148 | .then(cert => { |
167 | computeForeignPodsList(host, podsScore, callbackEach) | ||
168 | }, function (err: Error) { | ||
169 | if (err) return callback(err) | ||
170 | |||
171 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | 149 | logger.debug('Pods scores computed.', { podsScore: podsScore }) |
172 | const podsList = computeWinningPods(hosts, podsScore) | 150 | const podsList = computeWinningPods(hosts, podsScore) |
173 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | 151 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) |
174 | 152 | ||
175 | makeRequestsToWinningPods(cert, podsList, callback) | 153 | return makeRequestsToWinningPods(cert, podsList) |
176 | }) | 154 | }) |
177 | }) | ||
178 | } | 155 | } |
179 | 156 | ||
180 | function quitFriends (callback: (err: Error) => void) { | 157 | function quitFriends () { |
181 | // Stop pool requests | 158 | // Stop pool requests |
182 | requestScheduler.deactivate() | 159 | requestScheduler.deactivate() |
183 | 160 | ||
184 | waterfall([ | 161 | return requestScheduler.flush() |
185 | function flushRequests (callbackAsync) { | 162 | .then(() => { |
186 | requestScheduler.flush(err => callbackAsync(err)) | 163 | return requestVideoQaduScheduler.flush() |
187 | }, | 164 | }) |
188 | 165 | .then(() => { | |
189 | function flushVideoQaduRequests (callbackAsync) { | 166 | return db.Pod.list() |
190 | requestVideoQaduScheduler.flush(err => callbackAsync(err)) | 167 | }) |
191 | }, | 168 | .then(pods => { |
192 | |||
193 | function getPodsList (callbackAsync) { | ||
194 | return db.Pod.list(callbackAsync) | ||
195 | }, | ||
196 | |||
197 | function announceIQuitMyFriends (pods, callbackAsync) { | ||
198 | const requestParams = { | 169 | const requestParams = { |
199 | method: 'POST' as 'POST', | 170 | method: 'POST' as 'POST', |
200 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 171 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
@@ -205,61 +176,57 @@ function quitFriends (callback: (err: Error) => void) { | |||
205 | // Announce we quit them | 176 | // Announce we quit them |
206 | // We don't care if the request fails | 177 | // We don't care if the request fails |
207 | // The other pod will exclude us automatically after a while | 178 | // The other pod will exclude us automatically after a while |
208 | eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) { | 179 | return Promise.map(pods, pod => { |
209 | requestParams.toPod = pod | 180 | requestParams.toPod = pod |
210 | makeSecureRequest(requestParams, callbackEach) | 181 | return makeSecureRequest(requestParams) |
211 | }, function (err) { | 182 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
212 | if (err) { | 183 | .then(() => pods) |
213 | logger.error('Some errors while quitting friends.', { err: err }) | 184 | .catch(err => { |
214 | // Don't stop the process | 185 | logger.error('Some errors while quitting friends.', { err: err }) |
215 | } | 186 | // Don't stop the process |
216 | |||
217 | return callbackAsync(null, pods) | ||
218 | }) | 187 | }) |
219 | }, | 188 | }) |
220 | 189 | .then(pods => { | |
221 | function removePodsFromDB (pods, callbackAsync) { | 190 | const tasks = [] |
222 | each(pods, function (pod: any, callbackEach) { | 191 | pods.forEach(pod => tasks.push(pod.destroy())) |
223 | pod.destroy().asCallback(callbackEach) | ||
224 | }, callbackAsync) | ||
225 | } | ||
226 | ], function (err: Error) { | ||
227 | // Don't forget to re activate the scheduler, even if there was an error | ||
228 | requestScheduler.activate() | ||
229 | |||
230 | if (err) return callback(err) | ||
231 | 192 | ||
232 | logger.info('Removed all remote videos.') | 193 | return Promise.all(pods) |
233 | return callback(null) | 194 | }) |
234 | }) | 195 | .then(() => { |
196 | logger.info('Removed all remote videos.') | ||
197 | // Don't forget to re activate the scheduler, even if there was an error | ||
198 | return requestScheduler.activate() | ||
199 | }) | ||
200 | .finally(() => requestScheduler.activate()) | ||
235 | } | 201 | } |
236 | 202 | ||
237 | function sendOwnedVideosToPod (podId: number) { | 203 | function sendOwnedVideosToPod (podId: number) { |
238 | db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { | 204 | db.Video.listOwnedAndPopulateAuthorAndTags() |
239 | if (err) { | 205 | .then(videosList => { |
240 | logger.error('Cannot get the list of videos we own.') | 206 | const tasks = [] |
241 | return | 207 | videosList.forEach(video => { |
242 | } | 208 | const promise = video.toAddRemoteJSON() |
243 | 209 | .then(remoteVideo => { | |
244 | videosList.forEach(function (video) { | 210 | const options = { |
245 | video.toAddRemoteJSON(function (err, remoteVideo) { | 211 | type: 'add', |
246 | if (err) { | 212 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
247 | logger.error('Cannot convert video to remote.', { error: err }) | 213 | data: remoteVideo, |
248 | // Don't break the process | 214 | toIds: [ podId ], |
249 | return | 215 | transaction: null |
250 | } | 216 | } |
251 | 217 | return createRequest(options) | |
252 | const options = { | 218 | }) |
253 | type: 'add', | 219 | .catch(err => { |
254 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 220 | logger.error('Cannot convert video to remote.', { error: err }) |
255 | data: remoteVideo, | 221 | // Don't break the process |
256 | toIds: [ podId ], | 222 | return undefined |
257 | transaction: null | 223 | }) |
258 | } | 224 | |
259 | createRequest(options) | 225 | tasks.push(promise) |
260 | }) | 226 | }) |
227 | |||
228 | return Promise.all(tasks) | ||
261 | }) | 229 | }) |
262 | }) | ||
263 | } | 230 | } |
264 | 231 | ||
265 | function getRequestScheduler () { | 232 | function getRequestScheduler () { |
@@ -297,23 +264,22 @@ export { | |||
297 | 264 | ||
298 | // --------------------------------------------------------------------------- | 265 | // --------------------------------------------------------------------------- |
299 | 266 | ||
300 | function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) { | 267 | function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { |
301 | getForeignPodsList(host, function (err, res) { | 268 | // TODO: type res |
302 | if (err) return callback(err) | 269 | return getForeignPodsList(host).then((res: any) => { |
303 | |||
304 | const foreignPodsList = res.data | 270 | const foreignPodsList = res.data |
305 | 271 | ||
306 | // Let's give 1 point to the pod we ask the friends list | 272 | // Let's give 1 point to the pod we ask the friends list |
307 | foreignPodsList.push({ host }) | 273 | foreignPodsList.push({ host }) |
308 | 274 | ||
309 | foreignPodsList.forEach(function (foreignPod) { | 275 | foreignPodsList.forEach(foreignPod => { |
310 | const foreignPodHost = foreignPod.host | 276 | const foreignPodHost = foreignPod.host |
311 | 277 | ||
312 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ | 278 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ |
313 | else podsScore[foreignPodHost] = 1 | 279 | else podsScore[foreignPodHost] = 1 |
314 | }) | 280 | }) |
315 | 281 | ||
316 | return callback(null) | 282 | return undefined |
317 | }) | 283 | }) |
318 | } | 284 | } |
319 | 285 | ||
@@ -323,7 +289,7 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num | |||
323 | const podsList = [] | 289 | const podsList = [] |
324 | const baseScore = hosts.length / 2 | 290 | const baseScore = hosts.length / 2 |
325 | 291 | ||
326 | Object.keys(podsScore).forEach(function (podHost) { | 292 | Object.keys(podsScore).forEach(podHost => { |
327 | // If the pod is not me and with a good score we add it | 293 | // If the pod is not me and with a good score we add it |
328 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { | 294 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { |
329 | podsList.push({ host: podHost }) | 295 | podsList.push({ host: podHost }) |
@@ -333,28 +299,30 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num | |||
333 | return podsList | 299 | return podsList |
334 | } | 300 | } |
335 | 301 | ||
336 | function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) { | 302 | function getForeignPodsList (host: string) { |
337 | const path = '/api/' + API_VERSION + '/pods' | 303 | return new Promise((res, rej) => { |
304 | const path = '/api/' + API_VERSION + '/pods' | ||
338 | 305 | ||
339 | request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { | 306 | request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { |
340 | if (err) return callback(err) | 307 | if (err) return rej(err) |
341 | 308 | ||
342 | try { | 309 | try { |
343 | const json = JSON.parse(body) | 310 | const json = JSON.parse(body) |
344 | return callback(null, json) | 311 | return res(json) |
345 | } catch (err) { | 312 | } catch (err) { |
346 | return callback(err) | 313 | return rej(err) |
347 | } | 314 | } |
315 | }) | ||
348 | }) | 316 | }) |
349 | } | 317 | } |
350 | 318 | ||
351 | function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) { | 319 | function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { |
352 | // Stop pool requests | 320 | // Stop pool requests |
353 | requestScheduler.deactivate() | 321 | requestScheduler.deactivate() |
354 | // Flush pool requests | 322 | // Flush pool requests |
355 | requestScheduler.forceSend() | 323 | requestScheduler.forceSend() |
356 | 324 | ||
357 | eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) { | 325 | return Promise.map(podsList, pod => { |
358 | const params = { | 326 | const params = { |
359 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', | 327 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', |
360 | method: 'POST' as 'POST', | 328 | method: 'POST' as 'POST', |
@@ -365,38 +333,35 @@ function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callb | |||
365 | } | 333 | } |
366 | } | 334 | } |
367 | 335 | ||
368 | makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) { | 336 | return makeRetryRequest(params) |
369 | if (err) { | 337 | .then(({ response, body }) => { |
370 | logger.error('Error with adding %s pod.', pod.host, { error: err }) | 338 | body = body as { cert: string, email: string } |
339 | |||
340 | if (response.statusCode === 200) { | ||
341 | const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) | ||
342 | return podObj.save() | ||
343 | .then(podCreated => { | ||
344 | |||
345 | // Add our videos to the request scheduler | ||
346 | sendOwnedVideosToPod(podCreated.id) | ||
347 | }) | ||
348 | .catch(err => { | ||
349 | logger.error('Cannot add friend %s pod.', pod.host, { error: err }) | ||
350 | }) | ||
351 | } else { | ||
352 | logger.error('Status not 200 for %s pod.', pod.host) | ||
353 | } | ||
354 | }) | ||
355 | .catch(err => { | ||
356 | logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) | ||
371 | // Don't break the process | 357 | // Don't break the process |
372 | return callbackEach() | 358 | }) |
373 | } | 359 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
374 | 360 | .then(() => logger.debug('makeRequestsToWinningPods finished.')) | |
375 | if (res.statusCode === 200) { | 361 | .finally(() => { |
376 | const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) | ||
377 | podObj.save().asCallback(function (err, podCreated) { | ||
378 | if (err) { | ||
379 | logger.error('Cannot add friend %s pod.', pod.host, { error: err }) | ||
380 | return callbackEach() | ||
381 | } | ||
382 | |||
383 | // Add our videos to the request scheduler | ||
384 | sendOwnedVideosToPod(podCreated.id) | ||
385 | |||
386 | return callbackEach() | ||
387 | }) | ||
388 | } else { | ||
389 | logger.error('Status not 200 for %s pod.', pod.host) | ||
390 | return callbackEach() | ||
391 | } | ||
392 | }) | ||
393 | }, function endRequests () { | ||
394 | // Final callback, we've ended all the requests | 362 | // Final callback, we've ended all the requests |
395 | // Now we made new friends, we can re activate the pool of requests | 363 | // Now we made new friends, we can re activate the pool of requests |
396 | requestScheduler.activate() | 364 | requestScheduler.activate() |
397 | |||
398 | logger.debug('makeRequestsToWinningPods finished.') | ||
399 | return callback(null) | ||
400 | }) | 365 | }) |
401 | } | 366 | } |
402 | 367 | ||
@@ -408,33 +373,22 @@ type CreateRequestOptions = { | |||
408 | toIds?: number[] | 373 | toIds?: number[] |
409 | transaction: Sequelize.Transaction | 374 | transaction: Sequelize.Transaction |
410 | } | 375 | } |
411 | function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) { | 376 | function createRequest (options: CreateRequestOptions) { |
412 | if (!callback) callback = function () { /* empty */ } | 377 | if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) |
413 | |||
414 | if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback) | ||
415 | 378 | ||
416 | // If the "toIds" pods is not specified, we send the request to all our friends | 379 | // If the "toIds" pods is not specified, we send the request to all our friends |
417 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | 380 | return db.Pod.listAllIds(options.transaction).then(podIds => { |
418 | if (err) { | ||
419 | logger.error('Cannot get pod ids', { error: err }) | ||
420 | return | ||
421 | } | ||
422 | |||
423 | const newOptions = Object.assign(options, { toIds: podIds }) | 381 | const newOptions = Object.assign(options, { toIds: podIds }) |
424 | return requestScheduler.createRequest(newOptions, callback) | 382 | return requestScheduler.createRequest(newOptions) |
425 | }) | 383 | }) |
426 | } | 384 | } |
427 | 385 | ||
428 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { | 386 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { |
429 | if (!callback) callback = createEmptyCallback() | 387 | return requestVideoQaduScheduler.createRequest(options) |
430 | |||
431 | requestVideoQaduScheduler.createRequest(options, callback) | ||
432 | } | 388 | } |
433 | 389 | ||
434 | function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { | 390 | function createVideoEventRequest (options: RequestVideoEventSchedulerOptions) { |
435 | if (!callback) callback = createEmptyCallback() | 391 | return requestVideoEventScheduler.createRequest(options) |
436 | |||
437 | requestVideoEventScheduler.createRequest(options, callback) | ||
438 | } | 392 | } |
439 | 393 | ||
440 | function isMe (host: string) { | 394 | function isMe (host: string) { |
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts index 7d0263b15..8abddae35 100644 --- a/server/lib/jobs/handlers/index.ts +++ b/server/lib/jobs/handlers/index.ts | |||
@@ -1,11 +1,9 @@ | |||
1 | import * as videoTranscoder from './video-transcoder' | 1 | import * as videoTranscoder from './video-transcoder' |
2 | 2 | ||
3 | import { VideoInstance } from '../../../models' | ||
4 | |||
5 | export interface JobHandler<T> { | 3 | export interface JobHandler<T> { |
6 | process (data: object, callback: (err: Error, videoInstance?: T) => void) | 4 | process (data: object): T |
7 | onError (err: Error, jobId: number, video: T, callback: (err: Error) => void) | 5 | onError (err: Error, jobId: number) |
8 | onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void) | 6 | onSuccess (jobId: number, jobResult: T) |
9 | } | 7 | } |
10 | 8 | ||
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | 9 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { |
diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts index 6f606a7d3..e829ca813 100644 --- a/server/lib/jobs/handlers/video-transcoder.ts +++ b/server/lib/jobs/handlers/video-transcoder.ts | |||
@@ -3,29 +3,23 @@ import { logger } from '../../../helpers' | |||
3 | import { addVideoToFriends } from '../../../lib' | 3 | import { addVideoToFriends } from '../../../lib' |
4 | import { VideoInstance } from '../../../models' | 4 | import { VideoInstance } from '../../../models' |
5 | 5 | ||
6 | function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) { | 6 | function process (data: { id: string }) { |
7 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { | 7 | return db.Video.loadAndPopulateAuthorAndPodAndTags(data.id).then(video => { |
8 | if (err) return callback(err) | 8 | return video.transcodeVideofile().then(() => video) |
9 | |||
10 | video.transcodeVideofile(function (err) { | ||
11 | return callback(err, video) | ||
12 | }) | ||
13 | }) | 9 | }) |
14 | } | 10 | } |
15 | 11 | ||
16 | function onError (err: Error, jobId: number, video: VideoInstance, callback: (err: Error) => void) { | 12 | function onError (err: Error, jobId: number) { |
17 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) | 13 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) |
18 | return callback(null) | 14 | return Promise.resolve() |
19 | } | 15 | } |
20 | 16 | ||
21 | function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) { | 17 | function onSuccess (jobId: number, video: VideoInstance) { |
22 | logger.info('Job %d is a success.', jobId) | 18 | logger.info('Job %d is a success.', jobId) |
23 | 19 | ||
24 | video.toAddRemoteJSON(function (err, remoteVideo) { | 20 | video.toAddRemoteJSON().then(remoteVideo => { |
25 | if (err) return callback(err) | ||
26 | |||
27 | // Now we'll add the video's meta data to our friends | 21 | // Now we'll add the video's meta data to our friends |
28 | addVideoToFriends(remoteVideo, null, callback) | 22 | return addVideoToFriends(remoteVideo, null) |
29 | }) | 23 | }) |
30 | } | 24 | } |
31 | 25 | ||
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 2f01387e7..248dc7978 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -32,37 +32,35 @@ class JobScheduler { | |||
32 | 32 | ||
33 | // Finish processing jobs from a previous start | 33 | // Finish processing jobs from a previous start |
34 | const state = JOB_STATES.PROCESSING | 34 | const state = JOB_STATES.PROCESSING |
35 | db.Job.listWithLimit(limit, state, (err, jobs) => { | 35 | db.Job.listWithLimit(limit, state) |
36 | this.enqueueJobs(err, jobsQueue, jobs) | 36 | .then(jobs => { |
37 | 37 | this.enqueueJobs(jobsQueue, jobs) | |
38 | forever( | 38 | |
39 | next => { | 39 | forever( |
40 | if (jobsQueue.length() !== 0) { | 40 | next => { |
41 | // Finish processing the queue first | 41 | if (jobsQueue.length() !== 0) { |
42 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | 42 | // Finish processing the queue first |
43 | } | 43 | return setTimeout(next, JOBS_FETCHING_INTERVAL) |
44 | |||
45 | const state = JOB_STATES.PENDING | ||
46 | db.Job.listWithLimit(limit, state, (err, jobs) => { | ||
47 | if (err) { | ||
48 | logger.error('Cannot list pending jobs.', { error: err }) | ||
49 | } else { | ||
50 | jobs.forEach(job => { | ||
51 | jobsQueue.push(job) | ||
52 | }) | ||
53 | } | 44 | } |
54 | 45 | ||
55 | // Optimization: we could use "drain" from queue object | 46 | const state = JOB_STATES.PENDING |
56 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | 47 | db.Job.listWithLimit(limit, state) |
57 | }) | 48 | .then(jobs => { |
58 | }, | 49 | this.enqueueJobs(jobsQueue, jobs) |
59 | 50 | ||
60 | err => { logger.error('Error in job scheduler queue.', { error: err }) } | 51 | // Optimization: we could use "drain" from queue object |
61 | ) | 52 | return setTimeout(next, JOBS_FETCHING_INTERVAL) |
62 | }) | 53 | }) |
54 | .catch(err => logger.error('Cannot list pending jobs.', { error: err })) | ||
55 | }, | ||
56 | |||
57 | err => logger.error('Error in job scheduler queue.', { error: err }) | ||
58 | ) | ||
59 | }) | ||
60 | .catch(err => logger.error('Cannot list pending jobs.', { error: err })) | ||
63 | } | 61 | } |
64 | 62 | ||
65 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { | 63 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { |
66 | const createQuery = { | 64 | const createQuery = { |
67 | state: JOB_STATES.PENDING, | 65 | state: JOB_STATES.PENDING, |
68 | handlerName, | 66 | handlerName, |
@@ -70,67 +68,62 @@ class JobScheduler { | |||
70 | } | 68 | } |
71 | const options = { transaction } | 69 | const options = { transaction } |
72 | 70 | ||
73 | db.Job.create(createQuery, options).asCallback(callback) | 71 | return db.Job.create(createQuery, options) |
74 | } | 72 | } |
75 | 73 | ||
76 | private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { | 74 | private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { |
77 | if (err) { | 75 | jobs.forEach(job => jobsQueue.push(job)) |
78 | logger.error('Cannot list pending jobs.', { error: err }) | ||
79 | } else { | ||
80 | jobs.forEach(job => { | ||
81 | jobsQueue.push(job) | ||
82 | }) | ||
83 | } | ||
84 | } | 76 | } |
85 | 77 | ||
86 | private processJob (job: JobInstance, callback: (err: Error) => void) { | 78 | private processJob (job: JobInstance, callback: (err: Error) => void) { |
87 | const jobHandler = jobHandlers[job.handlerName] | 79 | const jobHandler = jobHandlers[job.handlerName] |
80 | if (jobHandler === undefined) { | ||
81 | logger.error('Unknown job handler for job %s.', job.handlerName) | ||
82 | return callback(null) | ||
83 | } | ||
88 | 84 | ||
89 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | 85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) |
90 | 86 | ||
91 | job.state = JOB_STATES.PROCESSING | 87 | job.state = JOB_STATES.PROCESSING |
92 | job.save().asCallback(err => { | 88 | return job.save() |
93 | if (err) return this.cannotSaveJobError(err, callback) | 89 | .then(() => { |
94 | 90 | return jobHandler.process(job.handlerInputData) | |
95 | if (jobHandler === undefined) { | 91 | }) |
96 | logger.error('Unknown job handler for job %s.', job.handlerName) | 92 | .then( |
97 | return callback(null) | 93 | result => { |
98 | } | 94 | return this.onJobSuccess(jobHandler, job, result) |
95 | }, | ||
99 | 96 | ||
100 | return jobHandler.process(job.handlerInputData, (err, result) => { | 97 | err => { |
101 | if (err) { | ||
102 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) | 98 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) |
103 | return this.onJobError(jobHandler, job, result, callback) | 99 | return this.onJobError(jobHandler, job, err) |
104 | } | 100 | } |
105 | 101 | ) | |
106 | return this.onJobSuccess(jobHandler, job, result, callback) | 102 | .then(() => callback(null)) |
103 | .catch(err => { | ||
104 | this.cannotSaveJobError(err) | ||
105 | return callback(err) | ||
107 | }) | 106 | }) |
108 | }) | ||
109 | } | 107 | } |
110 | 108 | ||
111 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { | 109 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { |
112 | job.state = JOB_STATES.ERROR | 110 | job.state = JOB_STATES.ERROR |
113 | 111 | ||
114 | job.save().asCallback(err => { | 112 | return job.save() |
115 | if (err) return this.cannotSaveJobError(err, callback) | 113 | .then(() => jobHandler.onError(err, job.id)) |
116 | 114 | .catch(err => this.cannotSaveJobError(err)) | |
117 | return jobHandler.onError(err, job.id, jobResult, callback) | ||
118 | }) | ||
119 | } | 115 | } |
120 | 116 | ||
121 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { | 117 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { |
122 | job.state = JOB_STATES.SUCCESS | 118 | job.state = JOB_STATES.SUCCESS |
123 | 119 | ||
124 | job.save().asCallback(err => { | 120 | return job.save() |
125 | if (err) return this.cannotSaveJobError(err, callback) | 121 | .then(() => jobHandler.onSuccess(job.id, jobResult)) |
126 | 122 | .catch(err => this.cannotSaveJobError(err)) | |
127 | return jobHandler.onSuccess(err, job.id, jobResult, callback) | ||
128 | }) | ||
129 | } | 123 | } |
130 | 124 | ||
131 | private cannotSaveJobError (err: Error, callback: (err: Error) => void) { | 125 | private cannotSaveJobError (err: Error) { |
132 | logger.error('Cannot save new job state.', { error: err }) | 126 | logger.error('Cannot save new job state.', { error: err }) |
133 | return callback(err) | ||
134 | } | 127 | } |
135 | } | 128 | } |
136 | 129 | ||
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts index 7cf42e94c..f34c9c667 100644 --- a/server/lib/oauth-model.ts +++ b/server/lib/oauth-model.ts | |||
@@ -30,17 +30,10 @@ function getUser (username: string, password: string) { | |||
30 | return db.User.getByUsername(username).then(function (user) { | 30 | return db.User.getByUsername(username).then(function (user) { |
31 | if (!user) return null | 31 | if (!user) return null |
32 | 32 | ||
33 | // We need to return a promise | 33 | return user.isPasswordMatch(password).then(passwordMatch => { |
34 | return new Promise(function (resolve, reject) { | 34 | if (passwordMatch === false) return null |
35 | return user.isPasswordMatch(password, function (err, isPasswordMatch) { | ||
36 | if (err) return reject(err) | ||
37 | 35 | ||
38 | if (isPasswordMatch === true) { | 36 | return user |
39 | return resolve(user) | ||
40 | } | ||
41 | |||
42 | return resolve(null) | ||
43 | }) | ||
44 | }) | 37 | }) |
45 | }) | 38 | }) |
46 | } | 39 | } |
@@ -80,8 +73,6 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns | |||
80 | tokenCreated.user = user | 73 | tokenCreated.user = user |
81 | 74 | ||
82 | return tokenCreated | 75 | return tokenCreated |
83 | }).catch(function (err) { | ||
84 | throw err | ||
85 | }) | 76 | }) |
86 | } | 77 | } |
87 | 78 | ||
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index e81ab9c36..dd77fddb7 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -1,15 +1,16 @@ | |||
1 | import * as eachLimit from 'async/eachLimit' | 1 | import { isEmpty } from 'lodash' |
2 | import * as Promise from 'bluebird' | ||
2 | 3 | ||
3 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
4 | import { logger, makeSecureRequest } from '../../helpers' | 5 | import { logger, makeSecureRequest } from '../../helpers' |
5 | import { PodInstance } from '../../models' | 6 | import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' |
6 | import { | 7 | import { |
7 | API_VERSION, | 8 | API_VERSION, |
8 | REQUESTS_IN_PARALLEL, | 9 | REQUESTS_IN_PARALLEL, |
9 | REQUESTS_INTERVAL | 10 | REQUESTS_INTERVAL |
10 | } from '../../initializers' | 11 | } from '../../initializers' |
11 | 12 | ||
12 | abstract class AbstractRequestScheduler { | 13 | abstract class AbstractRequestScheduler <T> { |
13 | requestInterval: number | 14 | requestInterval: number |
14 | limitPods: number | 15 | limitPods: number |
15 | limitPerPod: number | 16 | limitPerPod: number |
@@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler { | |||
24 | this.requestInterval = REQUESTS_INTERVAL | 25 | this.requestInterval = REQUESTS_INTERVAL |
25 | } | 26 | } |
26 | 27 | ||
27 | abstract getRequestModel () | 28 | abstract getRequestModel (): AbstractRequestClass<T> |
28 | abstract getRequestToPodModel () | 29 | abstract getRequestToPodModel (): AbstractRequestToPodClass |
29 | abstract buildRequestObjects (requests: any) | 30 | abstract buildRequestObjects (requestsGrouped: T): {} |
30 | 31 | ||
31 | activate () { | 32 | activate () { |
32 | logger.info('Requests scheduler activated.') | 33 | logger.info('Requests scheduler activated.') |
@@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler { | |||
55 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | 56 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) |
56 | } | 57 | } |
57 | 58 | ||
58 | remainingRequestsCount (callback: (err: Error, total: number) => void) { | 59 | remainingRequestsCount () { |
59 | return this.getRequestModel().countTotalRequests(callback) | 60 | return this.getRequestModel().countTotalRequests() |
60 | } | 61 | } |
61 | 62 | ||
62 | flush (callback: (err: Error) => void) { | 63 | flush () { |
63 | this.getRequestModel().removeAll(callback) | 64 | return this.getRequestModel().removeAll() |
64 | } | 65 | } |
65 | 66 | ||
66 | // --------------------------------------------------------------------------- | 67 | // --------------------------------------------------------------------------- |
67 | 68 | ||
68 | // Make a requests to friends of a certain type | 69 | // Make a requests to friends of a certain type |
69 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { | 70 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) { |
70 | if (!callback) callback = function () { /* empty */ } | ||
71 | |||
72 | const params = { | 71 | const params = { |
73 | toPod: toPod, | 72 | toPod: toPod, |
74 | sign: true, // Prove our identity | 73 | sign: true, // Prove our identity |
@@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler { | |||
79 | 78 | ||
80 | // Make multiple retry requests to all of pods | 79 | // Make multiple retry requests to all of pods |
81 | // The function fire some useful callbacks | 80 | // The function fire some useful callbacks |
82 | makeSecureRequest(params, (err, res) => { | 81 | return makeSecureRequest(params) |
83 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | 82 | .then(({ response, body }) => { |
84 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | 83 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { |
84 | throw new Error('Status code not 20x : ' + response.statusCode) | ||
85 | } | ||
86 | }) | ||
87 | .catch(err => { | ||
85 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | 88 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) |
86 | 89 | ||
87 | return callback(err) | 90 | throw err |
88 | } | 91 | }) |
89 | |||
90 | return callback(null) | ||
91 | }) | ||
92 | } | 92 | } |
93 | 93 | ||
94 | // Make all the requests of the scheduler | 94 | // Make all the requests of the scheduler |
95 | protected makeRequests () { | 95 | protected makeRequests () { |
96 | this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { | 96 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) |
97 | if (err) { | 97 | .then((requestsGrouped: T) => { |
98 | logger.error('Cannot get the list of "%s".', this.description, { err: err }) | 98 | // We want to group requests by destinations pod and endpoint |
99 | return // Abort | 99 | const requestsToMake = this.buildRequestObjects(requestsGrouped) |
100 | } | 100 | |
101 | 101 | // If there are no requests, abort | |
102 | // If there are no requests, abort | 102 | if (isEmpty(requestsToMake) === true) { |
103 | if (requests.length === 0) { | 103 | logger.info('No "%s" to make.', this.description) |
104 | logger.info('No "%s" to make.', this.description) | 104 | return { goodPods: [], badPods: [] } |
105 | return | 105 | } |
106 | } | 106 | |
107 | 107 | logger.info('Making "%s" to friends.', this.description) | |
108 | // We want to group requests by destinations pod and endpoint | 108 | |
109 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | 109 | const goodPods = [] |
110 | 110 | const badPods = [] | |
111 | logger.info('Making "%s" to friends.', this.description) | 111 | |
112 | 112 | return Promise.map(Object.keys(requestsToMake), hashKey => { | |
113 | const goodPods = [] | 113 | const requestToMake = requestsToMake[hashKey] |
114 | const badPods = [] | 114 | const toPod: PodInstance = requestToMake.toPod |
115 | 115 | ||
116 | eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | 116 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) |
117 | const requestToMake = requestsToMakeGrouped[hashKey] | 117 | .then(() => { |
118 | const toPod = requestToMake.toPod | 118 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
119 | 119 | goodPods.push(requestToMake.toPod.id) | |
120 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { | 120 | |
121 | if (err) { | 121 | this.afterRequestHook() |
122 | badPods.push(requestToMake.toPod.id) | 122 | |
123 | return callbackEach() | 123 | // Remove the pod id of these request ids |
124 | } | 124 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) |
125 | 125 | }) | |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | 126 | .catch(err => { |
127 | goodPods.push(requestToMake.toPod.id) | 127 | badPods.push(requestToMake.toPod.id) |
128 | 128 | logger.info('Cannot make request to %s.', toPod.host, { error: err }) | |
129 | // Remove the pod id of these request ids | 129 | }) |
130 | this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) | 130 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) |
131 | }) | ||
132 | .then(({ goodPods, badPods }) => { | ||
133 | this.afterRequestsHook() | ||
131 | 134 | ||
132 | this.afterRequestHook() | ||
133 | }) | ||
134 | }, () => { | ||
135 | // All the requests were made, we update the pods score | 135 | // All the requests were made, we update the pods score |
136 | db.Pod.updatePodsScore(goodPods, badPods) | 136 | return db.Pod.updatePodsScore(goodPods, badPods) |
137 | |||
138 | this.afterRequestsHook() | ||
139 | }) | 137 | }) |
140 | }) | 138 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) |
141 | } | 139 | } |
142 | 140 | ||
143 | protected afterRequestHook () { | 141 | protected afterRequestHook () { |
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 575e0227c..0dd796fb0 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts | |||
@@ -3,10 +3,8 @@ import * as Sequelize from 'sequelize' | |||
3 | import { database as db } from '../../initializers/database' | 3 | import { database as db } from '../../initializers/database' |
4 | import { AbstractRequestScheduler } from './abstract-request-scheduler' | 4 | import { AbstractRequestScheduler } from './abstract-request-scheduler' |
5 | import { logger } from '../../helpers' | 5 | import { logger } from '../../helpers' |
6 | import { | 6 | import { REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PER_POD } from '../../initializers' |
7 | REQUESTS_LIMIT_PODS, | 7 | import { RequestsGrouped } from '../../models' |
8 | REQUESTS_LIMIT_PER_POD | ||
9 | } from '../../initializers' | ||
10 | import { RequestEndpoint } from '../../../shared' | 8 | import { RequestEndpoint } from '../../../shared' |
11 | 9 | ||
12 | export type RequestSchedulerOptions = { | 10 | export type RequestSchedulerOptions = { |
@@ -17,7 +15,7 @@ export type RequestSchedulerOptions = { | |||
17 | transaction: Sequelize.Transaction | 15 | transaction: Sequelize.Transaction |
18 | } | 16 | } |
19 | 17 | ||
20 | class RequestScheduler extends AbstractRequestScheduler { | 18 | class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { |
21 | constructor () { | 19 | constructor () { |
22 | super() | 20 | super() |
23 | 21 | ||
@@ -36,11 +34,11 @@ class RequestScheduler extends AbstractRequestScheduler { | |||
36 | return db.RequestToPod | 34 | return db.RequestToPod |
37 | } | 35 | } |
38 | 36 | ||
39 | buildRequestObjects (requests: { [ toPodId: number ]: any }) { | 37 | buildRequestObjects (requestsGrouped: RequestsGrouped) { |
40 | const requestsToMakeGrouped = {} | 38 | const requestsToMakeGrouped = {} |
41 | 39 | ||
42 | Object.keys(requests).forEach(toPodId => { | 40 | Object.keys(requestsGrouped).forEach(toPodId => { |
43 | requests[toPodId].forEach(data => { | 41 | requestsGrouped[toPodId].forEach(data => { |
44 | const request = data.request | 42 | const request = data.request |
45 | const pod = data.pod | 43 | const pod = data.pod |
46 | const hashKey = toPodId + request.endpoint | 44 | const hashKey = toPodId + request.endpoint |
@@ -62,12 +60,12 @@ class RequestScheduler extends AbstractRequestScheduler { | |||
62 | return requestsToMakeGrouped | 60 | return requestsToMakeGrouped |
63 | } | 61 | } |
64 | 62 | ||
65 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions, callback: (err: Error) => void) { | 63 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { |
66 | // TODO: check the setPods works | 64 | // TODO: check the setPods works |
67 | const podIds = [] | 65 | const podIds = [] |
68 | 66 | ||
69 | // If there are no destination pods abort | 67 | // If there are no destination pods abort |
70 | if (toIds.length === 0) return callback(null) | 68 | if (toIds.length === 0) return undefined |
71 | 69 | ||
72 | toIds.forEach(toPod => { | 70 | toIds.forEach(toPod => { |
73 | podIds.push(toPod) | 71 | podIds.push(toPod) |
@@ -85,20 +83,18 @@ class RequestScheduler extends AbstractRequestScheduler { | |||
85 | transaction | 83 | transaction |
86 | } | 84 | } |
87 | 85 | ||
88 | return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { | 86 | return db.Request.create(createQuery, dbRequestOptions) |
89 | if (err) return callback(err) | 87 | .then(request => { |
90 | 88 | return request.setPods(podIds, dbRequestOptions) | |
91 | return request.setPods(podIds, dbRequestOptions).asCallback(callback) | 89 | }) |
92 | }) | ||
93 | } | 90 | } |
94 | 91 | ||
95 | // --------------------------------------------------------------------------- | 92 | // --------------------------------------------------------------------------- |
96 | 93 | ||
97 | afterRequestsHook () { | 94 | afterRequestsHook () { |
98 | // Flush requests with no pod | 95 | // Flush requests with no pod |
99 | this.getRequestModel().removeWithEmptyTo(err => { | 96 | this.getRequestModel().removeWithEmptyTo() |
100 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | 97 | .catch(err => logger.error('Error when removing requests with no pods.', { error: err })) |
101 | }) | ||
102 | } | 98 | } |
103 | } | 99 | } |
104 | 100 | ||
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 4bb76f4c9..d4d714c02 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts | |||
@@ -7,6 +7,7 @@ import { | |||
7 | REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, | 7 | REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, |
8 | REQUEST_VIDEO_EVENT_ENDPOINT | 8 | REQUEST_VIDEO_EVENT_ENDPOINT |
9 | } from '../../initializers' | 9 | } from '../../initializers' |
10 | import { RequestsVideoEventGrouped } from '../../models' | ||
10 | import { RequestVideoEventType } from '../../../shared' | 11 | import { RequestVideoEventType } from '../../../shared' |
11 | 12 | ||
12 | export type RequestVideoEventSchedulerOptions = { | 13 | export type RequestVideoEventSchedulerOptions = { |
@@ -16,7 +17,7 @@ export type RequestVideoEventSchedulerOptions = { | |||
16 | transaction?: Sequelize.Transaction | 17 | transaction?: Sequelize.Transaction |
17 | } | 18 | } |
18 | 19 | ||
19 | class RequestVideoEventScheduler extends AbstractRequestScheduler { | 20 | class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoEventGrouped> { |
20 | constructor () { | 21 | constructor () { |
21 | super() | 22 | super() |
22 | 23 | ||
@@ -35,7 +36,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
35 | return db.RequestVideoEvent | 36 | return db.RequestVideoEvent |
36 | } | 37 | } |
37 | 38 | ||
38 | buildRequestObjects (eventsToProcess: { [ toPodId: number ]: any }[]) { | 39 | buildRequestObjects (eventRequests: RequestsVideoEventGrouped) { |
39 | const requestsToMakeGrouped = {} | 40 | const requestsToMakeGrouped = {} |
40 | 41 | ||
41 | /* Example: | 42 | /* Example: |
@@ -50,8 +51,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
50 | 51 | ||
51 | // We group video events per video and per pod | 52 | // We group video events per video and per pod |
52 | // We add the counts of the same event types | 53 | // We add the counts of the same event types |
53 | Object.keys(eventsToProcess).forEach(toPodId => { | 54 | Object.keys(eventRequests).forEach(toPodId => { |
54 | eventsToProcess[toPodId].forEach(eventToProcess => { | 55 | eventRequests[toPodId].forEach(eventToProcess => { |
55 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | 56 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} |
56 | 57 | ||
57 | if (!requestsToMakeGrouped[toPodId]) { | 58 | if (!requestsToMakeGrouped[toPodId]) { |
@@ -97,7 +98,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
97 | return requestsToMakeGrouped | 98 | return requestsToMakeGrouped |
98 | } | 99 | } |
99 | 100 | ||
100 | createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { | 101 | createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions) { |
101 | if (count === undefined) count = 1 | 102 | if (count === undefined) count = 1 |
102 | 103 | ||
103 | const dbRequestOptions: Sequelize.CreateOptions = {} | 104 | const dbRequestOptions: Sequelize.CreateOptions = {} |
@@ -109,7 +110,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { | |||
109 | videoId | 110 | videoId |
110 | } | 111 | } |
111 | 112 | ||
112 | return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) | 113 | return db.RequestVideoEvent.create(createQuery, dbRequestOptions) |
113 | } | 114 | } |
114 | } | 115 | } |
115 | 116 | ||
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index d7169cc81..5ec7de9c2 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts | |||
@@ -9,6 +9,7 @@ import { | |||
9 | REQUEST_VIDEO_QADU_ENDPOINT, | 9 | REQUEST_VIDEO_QADU_ENDPOINT, |
10 | REQUEST_VIDEO_QADU_TYPES | 10 | REQUEST_VIDEO_QADU_TYPES |
11 | } from '../../initializers' | 11 | } from '../../initializers' |
12 | import { RequestsVideoQaduGrouped } from '../../models' | ||
12 | import { RequestVideoQaduType } from '../../../shared' | 13 | import { RequestVideoQaduType } from '../../../shared' |
13 | 14 | ||
14 | export type RequestVideoQaduSchedulerOptions = { | 15 | export type RequestVideoQaduSchedulerOptions = { |
@@ -17,7 +18,7 @@ export type RequestVideoQaduSchedulerOptions = { | |||
17 | transaction?: Sequelize.Transaction | 18 | transaction?: Sequelize.Transaction |
18 | } | 19 | } |
19 | 20 | ||
20 | class RequestVideoQaduScheduler extends AbstractRequestScheduler { | 21 | class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQaduGrouped> { |
21 | constructor () { | 22 | constructor () { |
22 | super() | 23 | super() |
23 | 24 | ||
@@ -36,7 +37,7 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { | |||
36 | return db.RequestVideoQadu | 37 | return db.RequestVideoQadu |
37 | } | 38 | } |
38 | 39 | ||
39 | buildRequestObjects (requests: { [ toPodId: number ]: any }[]) { | 40 | buildRequestObjects (requests: RequestsVideoQaduGrouped) { |
40 | const requestsToMakeGrouped = {} | 41 | const requestsToMakeGrouped = {} |
41 | 42 | ||
42 | Object.keys(requests).forEach(toPodId => { | 43 | Object.keys(requests).forEach(toPodId => { |
@@ -105,20 +106,18 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { | |||
105 | return requestsToMakeGrouped | 106 | return requestsToMakeGrouped |
106 | } | 107 | } |
107 | 108 | ||
108 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { | 109 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { |
109 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} | 110 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} |
110 | if (transaction) dbRequestOptions.transaction = transaction | 111 | if (transaction) dbRequestOptions.transaction = transaction |
111 | 112 | ||
112 | // Send the update to all our friends | 113 | // Send the update to all our friends |
113 | db.Pod.listAllIds(transaction, function (err, podIds) { | 114 | return db.Pod.listAllIds(transaction).then(podIds => { |
114 | if (err) return callback(err) | ||
115 | |||
116 | const queries = [] | 115 | const queries = [] |
117 | podIds.forEach(podId => { | 116 | podIds.forEach(podId => { |
118 | queries.push({ type, videoId, podId }) | 117 | queries.push({ type, videoId, podId }) |
119 | }) | 118 | }) |
120 | 119 | ||
121 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) | 120 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) |
122 | }) | 121 | }) |
123 | } | 122 | } |
124 | } | 123 | } |