diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-10-25 16:03:33 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-10-26 09:11:38 +0200 |
commit | f5028693a896a3076dd286ac0030e3d8f78f5ebf (patch) | |
tree | 09144ed6357e49ea575fb110247f933283ad235e /server/lib/friends.ts | |
parent | eb08047657e739bcd9e592d76307befa3998482b (diff) | |
download | PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.gz PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.zst PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.zip |
Use async/await in lib and initializers
Diffstat (limited to 'server/lib/friends.ts')
-rw-r--r-- | server/lib/friends.ts | 368 |
1 files changed, 189 insertions, 179 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts index f035b099b..a33432dc1 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import * as request from 'request' | 1 | import * as request from 'request' |
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | import * as Promise from 'bluebird' | 3 | import * as Bluebird from 'bluebird' |
4 | import { join } from 'path' | 4 | import { join } from 'path' |
5 | 5 | ||
6 | import { database as db } from '../initializers/database' | 6 | import { database as db } from '../initializers/database' |
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize. | |||
188 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { | 188 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { |
189 | const tasks = [] | 189 | const tasks = [] |
190 | 190 | ||
191 | eventsParams.forEach(eventParams => { | 191 | for (const eventParams of eventsParams) { |
192 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) | 192 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) |
193 | }) | 193 | } |
194 | 194 | ||
195 | return Promise.all(tasks) | 195 | return Promise.all(tasks) |
196 | } | 196 | } |
197 | 197 | ||
198 | function hasFriends () { | 198 | async function hasFriends () { |
199 | return db.Pod.countAll().then(count => count !== 0) | 199 | const count = await db.Pod.countAll() |
200 | |||
201 | return count !== 0 | ||
200 | } | 202 | } |
201 | 203 | ||
202 | function makeFriends (hosts: string[]) { | 204 | async function makeFriends (hosts: string[]) { |
203 | const podsScore = {} | 205 | const podsScore = {} |
204 | 206 | ||
205 | logger.info('Make friends!') | 207 | logger.info('Make friends!') |
206 | return getMyPublicCert() | 208 | const cert = await getMyPublicCert() |
207 | .then(cert => { | ||
208 | return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert) | ||
209 | }) | ||
210 | .then(cert => { | ||
211 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | ||
212 | const podsList = computeWinningPods(hosts, podsScore) | ||
213 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | ||
214 | 209 | ||
215 | return makeRequestsToWinningPods(cert, podsList) | 210 | for (const host of hosts) { |
216 | }) | 211 | await computeForeignPodsList(host, podsScore) |
212 | } | ||
213 | |||
214 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | ||
215 | |||
216 | const podsList = computeWinningPods(hosts, podsScore) | ||
217 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | ||
218 | |||
219 | return makeRequestsToWinningPods(cert, podsList) | ||
217 | } | 220 | } |
218 | 221 | ||
219 | function quitFriends () { | 222 | async function quitFriends () { |
220 | // Stop pool requests | 223 | // Stop pool requests |
221 | requestScheduler.deactivate() | 224 | requestScheduler.deactivate() |
222 | 225 | ||
223 | return requestScheduler.flush() | 226 | try { |
224 | .then(() => { | 227 | await requestScheduler.flush() |
225 | return requestVideoQaduScheduler.flush() | 228 | |
226 | }) | 229 | await requestVideoQaduScheduler.flush() |
227 | .then(() => { | 230 | |
228 | return db.Pod.list() | 231 | const pods = await db.Pod.list() |
229 | }) | 232 | const requestParams = { |
230 | .then(pods => { | 233 | method: 'POST' as 'POST', |
231 | const requestParams = { | 234 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
232 | method: 'POST' as 'POST', | 235 | toPod: null |
233 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 236 | } |
234 | toPod: null | ||
235 | } | ||
236 | 237 | ||
237 | // Announce we quit them | 238 | // Announce we quit them |
238 | // We don't care if the request fails | 239 | // We don't care if the request fails |
239 | // The other pod will exclude us automatically after a while | 240 | // The other pod will exclude us automatically after a while |
240 | return Promise.map(pods, pod => { | 241 | try { |
242 | await Bluebird.map(pods, pod => { | ||
241 | requestParams.toPod = pod | 243 | requestParams.toPod = pod |
242 | 244 | ||
243 | return makeSecureRequest(requestParams) | 245 | return makeSecureRequest(requestParams) |
244 | }, { concurrency: REQUESTS_IN_PARALLEL }) | 246 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
245 | .then(() => pods) | 247 | } catch (err) { // Don't stop the process |
246 | .catch(err => { | 248 | logger.error('Some errors while quitting friends.', err) |
247 | logger.error('Some errors while quitting friends.', err) | 249 | } |
248 | // Don't stop the process | ||
249 | return pods | ||
250 | }) | ||
251 | }) | ||
252 | .then(pods => { | ||
253 | const tasks = [] | ||
254 | pods.forEach(pod => tasks.push(pod.destroy())) | ||
255 | 250 | ||
256 | return Promise.all(pods) | 251 | const tasks = [] |
257 | }) | 252 | for (const pod of pods) { |
258 | .then(() => { | 253 | tasks.push(pod.destroy()) |
259 | logger.info('Removed all remote videos.') | 254 | } |
260 | // Don't forget to re activate the scheduler, even if there was an error | 255 | await Promise.all(pods) |
261 | return requestScheduler.activate() | 256 | |
262 | }) | 257 | logger.info('Removed all remote videos.') |
263 | .finally(() => requestScheduler.activate()) | 258 | |
259 | requestScheduler.activate() | ||
260 | } catch (err) { | ||
261 | // Don't forget to re activate the scheduler, even if there was an error | ||
262 | requestScheduler.activate() | ||
263 | |||
264 | throw err | ||
265 | } | ||
264 | } | 266 | } |
265 | 267 | ||
266 | function sendOwnedDataToPod (podId: number) { | 268 | async function sendOwnedDataToPod (podId: number) { |
267 | // First send authors | 269 | // First send authors |
268 | return sendOwnedAuthorsToPod(podId) | 270 | await sendOwnedAuthorsToPod(podId) |
269 | .then(() => sendOwnedChannelsToPod(podId)) | 271 | await sendOwnedChannelsToPod(podId) |
270 | .then(() => sendOwnedVideosToPod(podId)) | 272 | await sendOwnedVideosToPod(podId) |
273 | } | ||
274 | |||
275 | async function sendOwnedChannelsToPod (podId: number) { | ||
276 | const videoChannels = await db.VideoChannel.listOwned() | ||
277 | |||
278 | const tasks: Promise<any>[] = [] | ||
279 | for (const videoChannel of videoChannels) { | ||
280 | const remoteVideoChannel = videoChannel.toAddRemoteJSON() | ||
281 | const options = { | ||
282 | type: 'add-channel' as 'add-channel', | ||
283 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
284 | data: remoteVideoChannel, | ||
285 | toIds: [ podId ], | ||
286 | transaction: null | ||
287 | } | ||
288 | |||
289 | const p = createRequest(options) | ||
290 | tasks.push(p) | ||
291 | } | ||
292 | |||
293 | await Promise.all(tasks) | ||
271 | } | 294 | } |
272 | 295 | ||
273 | function sendOwnedChannelsToPod (podId: number) { | 296 | async function sendOwnedAuthorsToPod (podId: number) { |
274 | return db.VideoChannel.listOwned() | 297 | const authors = await db.Author.listOwned() |
275 | .then(videoChannels => { | 298 | const tasks: Promise<any>[] = [] |
276 | const tasks = [] | ||
277 | videoChannels.forEach(videoChannel => { | ||
278 | const remoteVideoChannel = videoChannel.toAddRemoteJSON() | ||
279 | const options = { | ||
280 | type: 'add-channel' as 'add-channel', | ||
281 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
282 | data: remoteVideoChannel, | ||
283 | toIds: [ podId ], | ||
284 | transaction: null | ||
285 | } | ||
286 | 299 | ||
287 | const p = createRequest(options) | 300 | for (const author of authors) { |
288 | tasks.push(p) | 301 | const remoteAuthor = author.toAddRemoteJSON() |
289 | }) | 302 | const options = { |
303 | type: 'add-author' as 'add-author', | ||
304 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
305 | data: remoteAuthor, | ||
306 | toIds: [ podId ], | ||
307 | transaction: null | ||
308 | } | ||
290 | 309 | ||
291 | return Promise.all(tasks) | 310 | const p = createRequest(options) |
292 | }) | 311 | tasks.push(p) |
312 | } | ||
313 | |||
314 | await Promise.all(tasks) | ||
293 | } | 315 | } |
294 | 316 | ||
295 | function sendOwnedAuthorsToPod (podId: number) { | 317 | async function sendOwnedVideosToPod (podId: number) { |
296 | return db.Author.listOwned() | 318 | const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags() |
297 | .then(authors => { | 319 | const tasks: Bluebird<any>[] = [] |
298 | const tasks = [] | 320 | |
299 | authors.forEach(author => { | 321 | for (const video of videosList) { |
300 | const remoteAuthor = author.toAddRemoteJSON() | 322 | const promise = video.toAddRemoteJSON() |
323 | .then(remoteVideo => { | ||
301 | const options = { | 324 | const options = { |
302 | type: 'add-author' as 'add-author', | 325 | type: 'add-video' as 'add-video', |
303 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 326 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
304 | data: remoteAuthor, | 327 | data: remoteVideo, |
305 | toIds: [ podId ], | 328 | toIds: [ podId ], |
306 | transaction: null | 329 | transaction: null |
307 | } | 330 | } |
308 | 331 | return createRequest(options) | |
309 | const p = createRequest(options) | ||
310 | tasks.push(p) | ||
311 | }) | 332 | }) |
312 | 333 | .catch(err => { | |
313 | return Promise.all(tasks) | 334 | logger.error('Cannot convert video to remote.', err) |
314 | }) | 335 | // Don't break the process |
315 | } | 336 | return undefined |
316 | |||
317 | function sendOwnedVideosToPod (podId: number) { | ||
318 | return db.Video.listOwnedAndPopulateAuthorAndTags() | ||
319 | .then(videosList => { | ||
320 | const tasks = [] | ||
321 | videosList.forEach(video => { | ||
322 | const promise = video.toAddRemoteJSON() | ||
323 | .then(remoteVideo => { | ||
324 | const options = { | ||
325 | type: 'add-video' as 'add-video', | ||
326 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
327 | data: remoteVideo, | ||
328 | toIds: [ podId ], | ||
329 | transaction: null | ||
330 | } | ||
331 | return createRequest(options) | ||
332 | }) | ||
333 | .catch(err => { | ||
334 | logger.error('Cannot convert video to remote.', err) | ||
335 | // Don't break the process | ||
336 | return undefined | ||
337 | }) | ||
338 | |||
339 | tasks.push(promise) | ||
340 | }) | 337 | }) |
341 | 338 | ||
342 | return Promise.all(tasks) | 339 | tasks.push(promise) |
343 | }) | 340 | } |
341 | |||
342 | await Promise.all(tasks) | ||
344 | } | 343 | } |
345 | 344 | ||
346 | function fetchRemotePreview (video: VideoInstance) { | 345 | function fetchRemotePreview (video: VideoInstance) { |
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) { | |||
350 | return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) | 349 | return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) |
351 | } | 350 | } |
352 | 351 | ||
353 | function removeFriend (pod: PodInstance) { | 352 | async function removeFriend (pod: PodInstance) { |
354 | const requestParams = { | 353 | const requestParams = { |
355 | method: 'POST' as 'POST', | 354 | method: 'POST' as 'POST', |
356 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 355 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
357 | toPod: pod | 356 | toPod: pod |
358 | } | 357 | } |
359 | 358 | ||
360 | return makeSecureRequest(requestParams) | 359 | try { |
361 | .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) | 360 | await makeSecureRequest(requestParams) |
362 | .then(() => pod.destroy()) | 361 | } catch (err) { |
363 | .then(() => logger.info('Removed friend %s.', pod.host)) | 362 | logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err) |
364 | .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) | 363 | } |
364 | |||
365 | try { | ||
366 | await pod.destroy() | ||
367 | |||
368 | logger.info('Removed friend %s.', pod.host) | ||
369 | } catch (err) { | ||
370 | logger.error('Cannot destroy friend %s.', pod.host, err) | ||
371 | } | ||
365 | } | 372 | } |
366 | 373 | ||
367 | function getRequestScheduler () { | 374 | function getRequestScheduler () { |
@@ -406,23 +413,21 @@ export { | |||
406 | 413 | ||
407 | // --------------------------------------------------------------------------- | 414 | // --------------------------------------------------------------------------- |
408 | 415 | ||
409 | function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { | 416 | async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { |
410 | // TODO: type res | 417 | const result = await getForeignPodsList(host) |
411 | return getForeignPodsList(host).then(res => { | 418 | const foreignPodsList: { host: string }[] = result.data |
412 | const foreignPodsList: { host: string }[] = res.data | ||
413 | 419 | ||
414 | // Let's give 1 point to the pod we ask the friends list | 420 | // Let's give 1 point to the pod we ask the friends list |
415 | foreignPodsList.push({ host }) | 421 | foreignPodsList.push({ host }) |
416 | 422 | ||
417 | foreignPodsList.forEach(foreignPod => { | 423 | for (const foreignPod of foreignPodsList) { |
418 | const foreignPodHost = foreignPod.host | 424 | const foreignPodHost = foreignPod.host |
419 | 425 | ||
420 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ | 426 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ |
421 | else podsScore[foreignPodHost] = 1 | 427 | else podsScore[foreignPodHost] = 1 |
422 | }) | 428 | } |
423 | 429 | ||
424 | return undefined | 430 | return undefined |
425 | }) | ||
426 | } | 431 | } |
427 | 432 | ||
428 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { | 433 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { |
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num | |||
431 | const podsList = [] | 436 | const podsList = [] |
432 | const baseScore = hosts.length / 2 | 437 | const baseScore = hosts.length / 2 |
433 | 438 | ||
434 | Object.keys(podsScore).forEach(podHost => { | 439 | for (const podHost of Object.keys(podsScore)) { |
435 | // If the pod is not me and with a good score we add it | 440 | // If the pod is not me and with a good score we add it |
436 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { | 441 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { |
437 | podsList.push({ host: podHost }) | 442 | podsList.push({ host: podHost }) |
438 | } | 443 | } |
439 | }) | 444 | } |
440 | 445 | ||
441 | return podsList | 446 | return podsList |
442 | } | 447 | } |
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) { | |||
449 | if (err) return rej(err) | 454 | if (err) return rej(err) |
450 | 455 | ||
451 | try { | 456 | try { |
452 | const json = JSON.parse(body) | 457 | const json: ResultList<FormattedPod> = JSON.parse(body) |
453 | return res(json) | 458 | return res(json) |
454 | } catch (err) { | 459 | } catch (err) { |
455 | return rej(err) | 460 | return rej(err) |
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) { | |||
458 | }) | 463 | }) |
459 | } | 464 | } |
460 | 465 | ||
461 | function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { | 466 | async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { |
462 | // Stop pool requests | 467 | // Stop pool requests |
463 | requestScheduler.deactivate() | 468 | requestScheduler.deactivate() |
464 | // Flush pool requests | 469 | // Flush pool requests |
465 | requestScheduler.forceSend() | 470 | requestScheduler.forceSend() |
466 | 471 | ||
467 | return Promise.map(podsList, pod => { | 472 | try { |
468 | const params = { | 473 | await Bluebird.map(podsList, async pod => { |
469 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', | 474 | const params = { |
470 | method: 'POST' as 'POST', | 475 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', |
471 | json: { | 476 | method: 'POST' as 'POST', |
472 | host: CONFIG.WEBSERVER.HOST, | 477 | json: { |
473 | email: CONFIG.ADMIN.EMAIL, | 478 | host: CONFIG.WEBSERVER.HOST, |
474 | publicKey: cert | 479 | email: CONFIG.ADMIN.EMAIL, |
480 | publicKey: cert | ||
481 | } | ||
475 | } | 482 | } |
476 | } | ||
477 | 483 | ||
478 | return makeRetryRequest(params) | 484 | const { response, body } = await makeRetryRequest(params) |
479 | .then(({ response, body }) => { | 485 | const typedBody = body as { cert: string, email: string } |
480 | body = body as { cert: string, email: string } | 486 | |
481 | 487 | if (response.statusCode === 200) { | |
482 | if (response.statusCode === 200) { | 488 | const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email }) |
483 | const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) | 489 | |
484 | return podObj.save() | 490 | let podCreated: PodInstance |
485 | .then(podCreated => { | 491 | try { |
486 | 492 | podCreated = await podObj.save() | |
487 | // Add our videos to the request scheduler | 493 | } catch (err) { |
488 | sendOwnedDataToPod(podCreated.id) | 494 | logger.error('Cannot add friend %s pod.', pod.host, err) |
489 | }) | ||
490 | .catch(err => { | ||
491 | logger.error('Cannot add friend %s pod.', pod.host, err) | ||
492 | }) | ||
493 | } else { | ||
494 | logger.error('Status not 200 for %s pod.', pod.host) | ||
495 | } | 495 | } |
496 | }) | 496 | |
497 | .catch(err => { | 497 | // Add our videos to the request scheduler |
498 | logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) | 498 | sendOwnedDataToPod(podCreated.id) |
499 | // Don't break the process | 499 | .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err)) |
500 | }) | 500 | } else { |
501 | }, { concurrency: REQUESTS_IN_PARALLEL }) | 501 | logger.error('Status not 200 for %s pod.', pod.host) |
502 | .then(() => logger.debug('makeRequestsToWinningPods finished.')) | 502 | } |
503 | .finally(() => { | 503 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
504 | |||
505 | logger.debug('makeRequestsToWinningPods finished.') | ||
506 | |||
507 | requestScheduler.activate() | ||
508 | } catch (err) { | ||
504 | // Final callback, we've ended all the requests | 509 | // Final callback, we've ended all the requests |
505 | // Now we made new friends, we can re activate the pool of requests | 510 | // Now we made new friends, we can re activate the pool of requests |
506 | requestScheduler.activate() | 511 | requestScheduler.activate() |
507 | }) | 512 | } |
508 | } | 513 | } |
509 | 514 | ||
510 | // Wrapper that populate "toIds" argument with all our friends if it is not specified | 515 | // Wrapper that populate "toIds" argument with all our friends if it is not specified |
@@ -515,14 +520,19 @@ type CreateRequestOptions = { | |||
515 | toIds?: number[] | 520 | toIds?: number[] |
516 | transaction: Sequelize.Transaction | 521 | transaction: Sequelize.Transaction |
517 | } | 522 | } |
518 | function createRequest (options: CreateRequestOptions) { | 523 | async function createRequest (options: CreateRequestOptions) { |
519 | if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) | 524 | if (options.toIds !== undefined) { |
525 | await requestScheduler.createRequest(options as RequestSchedulerOptions) | ||
526 | return undefined | ||
527 | } | ||
520 | 528 | ||
521 | // If the "toIds" pods is not specified, we send the request to all our friends | 529 | // If the "toIds" pods is not specified, we send the request to all our friends |
522 | return db.Pod.listAllIds(options.transaction).then(podIds => { | 530 | const podIds = await db.Pod.listAllIds(options.transaction) |
523 | const newOptions = Object.assign(options, { toIds: podIds }) | 531 | |
524 | return requestScheduler.createRequest(newOptions) | 532 | const newOptions = Object.assign(options, { toIds: podIds }) |
525 | }) | 533 | await requestScheduler.createRequest(newOptions) |
534 | |||
535 | return undefined | ||
526 | } | 536 | } |
527 | 537 | ||
528 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { | 538 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { |