aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/friends.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-10-25 16:03:33 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-10-26 09:11:38 +0200
commitf5028693a896a3076dd286ac0030e3d8f78f5ebf (patch)
tree09144ed6357e49ea575fb110247f933283ad235e /server/lib/friends.ts
parenteb08047657e739bcd9e592d76307befa3998482b (diff)
downloadPeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.gz
PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.zst
PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.zip
Use async/await in lib and initializers
Diffstat (limited to 'server/lib/friends.ts')
-rw-r--r--server/lib/friends.ts368
1 files changed, 189 insertions, 179 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts
index f035b099b..a33432dc1 100644
--- a/server/lib/friends.ts
+++ b/server/lib/friends.ts
@@ -1,6 +1,6 @@
1import * as request from 'request' 1import * as request from 'request'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import * as Promise from 'bluebird' 3import * as Bluebird from 'bluebird'
4import { join } from 'path' 4import { join } from 'path'
5 5
6import { database as db } from '../initializers/database' 6import { database as db } from '../initializers/database'
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.
188function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { 188function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
189 const tasks = [] 189 const tasks = []
190 190
191 eventsParams.forEach(eventParams => { 191 for (const eventParams of eventsParams) {
192 tasks.push(addEventToRemoteVideo(eventParams, transaction)) 192 tasks.push(addEventToRemoteVideo(eventParams, transaction))
193 }) 193 }
194 194
195 return Promise.all(tasks) 195 return Promise.all(tasks)
196} 196}
197 197
198function hasFriends () { 198async function hasFriends () {
199 return db.Pod.countAll().then(count => count !== 0) 199 const count = await db.Pod.countAll()
200
201 return count !== 0
200} 202}
201 203
202function makeFriends (hosts: string[]) { 204async function makeFriends (hosts: string[]) {
203 const podsScore = {} 205 const podsScore = {}
204 206
205 logger.info('Make friends!') 207 logger.info('Make friends!')
206 return getMyPublicCert() 208 const cert = await getMyPublicCert()
207 .then(cert => {
208 return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
209 })
210 .then(cert => {
211 logger.debug('Pods scores computed.', { podsScore: podsScore })
212 const podsList = computeWinningPods(hosts, podsScore)
213 logger.debug('Pods that we keep.', { podsToKeep: podsList })
214 209
215 return makeRequestsToWinningPods(cert, podsList) 210 for (const host of hosts) {
216 }) 211 await computeForeignPodsList(host, podsScore)
212 }
213
214 logger.debug('Pods scores computed.', { podsScore: podsScore })
215
216 const podsList = computeWinningPods(hosts, podsScore)
217 logger.debug('Pods that we keep.', { podsToKeep: podsList })
218
219 return makeRequestsToWinningPods(cert, podsList)
217} 220}
218 221
219function quitFriends () { 222async function quitFriends () {
220 // Stop pool requests 223 // Stop pool requests
221 requestScheduler.deactivate() 224 requestScheduler.deactivate()
222 225
223 return requestScheduler.flush() 226 try {
224 .then(() => { 227 await requestScheduler.flush()
225 return requestVideoQaduScheduler.flush() 228
226 }) 229 await requestVideoQaduScheduler.flush()
227 .then(() => { 230
228 return db.Pod.list() 231 const pods = await db.Pod.list()
229 }) 232 const requestParams = {
230 .then(pods => { 233 method: 'POST' as 'POST',
231 const requestParams = { 234 path: '/api/' + API_VERSION + '/remote/pods/remove',
232 method: 'POST' as 'POST', 235 toPod: null
233 path: '/api/' + API_VERSION + '/remote/pods/remove', 236 }
234 toPod: null
235 }
236 237
237 // Announce we quit them 238 // Announce we quit them
238 // We don't care if the request fails 239 // We don't care if the request fails
239 // The other pod will exclude us automatically after a while 240 // The other pod will exclude us automatically after a while
240 return Promise.map(pods, pod => { 241 try {
242 await Bluebird.map(pods, pod => {
241 requestParams.toPod = pod 243 requestParams.toPod = pod
242 244
243 return makeSecureRequest(requestParams) 245 return makeSecureRequest(requestParams)
244 }, { concurrency: REQUESTS_IN_PARALLEL }) 246 }, { concurrency: REQUESTS_IN_PARALLEL })
245 .then(() => pods) 247 } catch (err) { // Don't stop the process
246 .catch(err => { 248 logger.error('Some errors while quitting friends.', err)
247 logger.error('Some errors while quitting friends.', err) 249 }
248 // Don't stop the process
249 return pods
250 })
251 })
252 .then(pods => {
253 const tasks = []
254 pods.forEach(pod => tasks.push(pod.destroy()))
255 250
256 return Promise.all(pods) 251 const tasks = []
257 }) 252 for (const pod of pods) {
258 .then(() => { 253 tasks.push(pod.destroy())
259 logger.info('Removed all remote videos.') 254 }
260 // Don't forget to re activate the scheduler, even if there was an error 255 await Promise.all(pods)
261 return requestScheduler.activate() 256
262 }) 257 logger.info('Removed all remote videos.')
263 .finally(() => requestScheduler.activate()) 258
259 requestScheduler.activate()
260 } catch (err) {
261 // Don't forget to re activate the scheduler, even if there was an error
262 requestScheduler.activate()
263
264 throw err
265 }
264} 266}
265 267
266function sendOwnedDataToPod (podId: number) { 268async function sendOwnedDataToPod (podId: number) {
267 // First send authors 269 // First send authors
268 return sendOwnedAuthorsToPod(podId) 270 await sendOwnedAuthorsToPod(podId)
269 .then(() => sendOwnedChannelsToPod(podId)) 271 await sendOwnedChannelsToPod(podId)
270 .then(() => sendOwnedVideosToPod(podId)) 272 await sendOwnedVideosToPod(podId)
273}
274
275async function sendOwnedChannelsToPod (podId: number) {
276 const videoChannels = await db.VideoChannel.listOwned()
277
278 const tasks: Promise<any>[] = []
279 for (const videoChannel of videoChannels) {
280 const remoteVideoChannel = videoChannel.toAddRemoteJSON()
281 const options = {
282 type: 'add-channel' as 'add-channel',
283 endpoint: REQUEST_ENDPOINTS.VIDEOS,
284 data: remoteVideoChannel,
285 toIds: [ podId ],
286 transaction: null
287 }
288
289 const p = createRequest(options)
290 tasks.push(p)
291 }
292
293 await Promise.all(tasks)
271} 294}
272 295
273function sendOwnedChannelsToPod (podId: number) { 296async function sendOwnedAuthorsToPod (podId: number) {
274 return db.VideoChannel.listOwned() 297 const authors = await db.Author.listOwned()
275 .then(videoChannels => { 298 const tasks: Promise<any>[] = []
276 const tasks = []
277 videoChannels.forEach(videoChannel => {
278 const remoteVideoChannel = videoChannel.toAddRemoteJSON()
279 const options = {
280 type: 'add-channel' as 'add-channel',
281 endpoint: REQUEST_ENDPOINTS.VIDEOS,
282 data: remoteVideoChannel,
283 toIds: [ podId ],
284 transaction: null
285 }
286 299
287 const p = createRequest(options) 300 for (const author of authors) {
288 tasks.push(p) 301 const remoteAuthor = author.toAddRemoteJSON()
289 }) 302 const options = {
303 type: 'add-author' as 'add-author',
304 endpoint: REQUEST_ENDPOINTS.VIDEOS,
305 data: remoteAuthor,
306 toIds: [ podId ],
307 transaction: null
308 }
290 309
291 return Promise.all(tasks) 310 const p = createRequest(options)
292 }) 311 tasks.push(p)
312 }
313
314 await Promise.all(tasks)
293} 315}
294 316
295function sendOwnedAuthorsToPod (podId: number) { 317async function sendOwnedVideosToPod (podId: number) {
296 return db.Author.listOwned() 318 const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
297 .then(authors => { 319 const tasks: Bluebird<any>[] = []
298 const tasks = [] 320
299 authors.forEach(author => { 321 for (const video of videosList) {
300 const remoteAuthor = author.toAddRemoteJSON() 322 const promise = video.toAddRemoteJSON()
323 .then(remoteVideo => {
301 const options = { 324 const options = {
302 type: 'add-author' as 'add-author', 325 type: 'add-video' as 'add-video',
303 endpoint: REQUEST_ENDPOINTS.VIDEOS, 326 endpoint: REQUEST_ENDPOINTS.VIDEOS,
304 data: remoteAuthor, 327 data: remoteVideo,
305 toIds: [ podId ], 328 toIds: [ podId ],
306 transaction: null 329 transaction: null
307 } 330 }
308 331 return createRequest(options)
309 const p = createRequest(options)
310 tasks.push(p)
311 }) 332 })
312 333 .catch(err => {
313 return Promise.all(tasks) 334 logger.error('Cannot convert video to remote.', err)
314 }) 335 // Don't break the process
315} 336 return undefined
316
317function sendOwnedVideosToPod (podId: number) {
318 return db.Video.listOwnedAndPopulateAuthorAndTags()
319 .then(videosList => {
320 const tasks = []
321 videosList.forEach(video => {
322 const promise = video.toAddRemoteJSON()
323 .then(remoteVideo => {
324 const options = {
325 type: 'add-video' as 'add-video',
326 endpoint: REQUEST_ENDPOINTS.VIDEOS,
327 data: remoteVideo,
328 toIds: [ podId ],
329 transaction: null
330 }
331 return createRequest(options)
332 })
333 .catch(err => {
334 logger.error('Cannot convert video to remote.', err)
335 // Don't break the process
336 return undefined
337 })
338
339 tasks.push(promise)
340 }) 337 })
341 338
342 return Promise.all(tasks) 339 tasks.push(promise)
343 }) 340 }
341
342 await Promise.all(tasks)
344} 343}
345 344
346function fetchRemotePreview (video: VideoInstance) { 345function fetchRemotePreview (video: VideoInstance) {
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) {
350 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) 349 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
351} 350}
352 351
353function removeFriend (pod: PodInstance) { 352async function removeFriend (pod: PodInstance) {
354 const requestParams = { 353 const requestParams = {
355 method: 'POST' as 'POST', 354 method: 'POST' as 'POST',
356 path: '/api/' + API_VERSION + '/remote/pods/remove', 355 path: '/api/' + API_VERSION + '/remote/pods/remove',
357 toPod: pod 356 toPod: pod
358 } 357 }
359 358
360 return makeSecureRequest(requestParams) 359 try {
361 .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) 360 await makeSecureRequest(requestParams)
362 .then(() => pod.destroy()) 361 } catch (err) {
363 .then(() => logger.info('Removed friend %s.', pod.host)) 362 logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
364 .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) 363 }
364
365 try {
366 await pod.destroy()
367
368 logger.info('Removed friend %s.', pod.host)
369 } catch (err) {
370 logger.error('Cannot destroy friend %s.', pod.host, err)
371 }
365} 372}
366 373
367function getRequestScheduler () { 374function getRequestScheduler () {
@@ -406,23 +413,21 @@ export {
406 413
407// --------------------------------------------------------------------------- 414// ---------------------------------------------------------------------------
408 415
409function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { 416async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
410 // TODO: type res 417 const result = await getForeignPodsList(host)
411 return getForeignPodsList(host).then(res => { 418 const foreignPodsList: { host: string }[] = result.data
412 const foreignPodsList: { host: string }[] = res.data
413 419
414 // Let's give 1 point to the pod we ask the friends list 420 // Let's give 1 point to the pod we ask the friends list
415 foreignPodsList.push({ host }) 421 foreignPodsList.push({ host })
416 422
417 foreignPodsList.forEach(foreignPod => { 423 for (const foreignPod of foreignPodsList) {
418 const foreignPodHost = foreignPod.host 424 const foreignPodHost = foreignPod.host
419 425
420 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ 426 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
421 else podsScore[foreignPodHost] = 1 427 else podsScore[foreignPodHost] = 1
422 }) 428 }
423 429
424 return undefined 430 return undefined
425 })
426} 431}
427 432
428function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { 433function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
431 const podsList = [] 436 const podsList = []
432 const baseScore = hosts.length / 2 437 const baseScore = hosts.length / 2
433 438
434 Object.keys(podsScore).forEach(podHost => { 439 for (const podHost of Object.keys(podsScore)) {
435 // If the pod is not me and with a good score we add it 440 // If the pod is not me and with a good score we add it
436 if (isMe(podHost) === false && podsScore[podHost] > baseScore) { 441 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
437 podsList.push({ host: podHost }) 442 podsList.push({ host: podHost })
438 } 443 }
439 }) 444 }
440 445
441 return podsList 446 return podsList
442} 447}
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) {
449 if (err) return rej(err) 454 if (err) return rej(err)
450 455
451 try { 456 try {
452 const json = JSON.parse(body) 457 const json: ResultList<FormattedPod> = JSON.parse(body)
453 return res(json) 458 return res(json)
454 } catch (err) { 459 } catch (err) {
455 return rej(err) 460 return rej(err)
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) {
458 }) 463 })
459} 464}
460 465
461function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { 466async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
462 // Stop pool requests 467 // Stop pool requests
463 requestScheduler.deactivate() 468 requestScheduler.deactivate()
464 // Flush pool requests 469 // Flush pool requests
465 requestScheduler.forceSend() 470 requestScheduler.forceSend()
466 471
467 return Promise.map(podsList, pod => { 472 try {
468 const params = { 473 await Bluebird.map(podsList, async pod => {
469 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', 474 const params = {
470 method: 'POST' as 'POST', 475 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
471 json: { 476 method: 'POST' as 'POST',
472 host: CONFIG.WEBSERVER.HOST, 477 json: {
473 email: CONFIG.ADMIN.EMAIL, 478 host: CONFIG.WEBSERVER.HOST,
474 publicKey: cert 479 email: CONFIG.ADMIN.EMAIL,
480 publicKey: cert
481 }
475 } 482 }
476 }
477 483
478 return makeRetryRequest(params) 484 const { response, body } = await makeRetryRequest(params)
479 .then(({ response, body }) => { 485 const typedBody = body as { cert: string, email: string }
480 body = body as { cert: string, email: string } 486
481 487 if (response.statusCode === 200) {
482 if (response.statusCode === 200) { 488 const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
483 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) 489
484 return podObj.save() 490 let podCreated: PodInstance
485 .then(podCreated => { 491 try {
486 492 podCreated = await podObj.save()
487 // Add our videos to the request scheduler 493 } catch (err) {
488 sendOwnedDataToPod(podCreated.id) 494 logger.error('Cannot add friend %s pod.', pod.host, err)
489 })
490 .catch(err => {
491 logger.error('Cannot add friend %s pod.', pod.host, err)
492 })
493 } else {
494 logger.error('Status not 200 for %s pod.', pod.host)
495 } 495 }
496 }) 496
497 .catch(err => { 497 // Add our videos to the request scheduler
498 logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) 498 sendOwnedDataToPod(podCreated.id)
499 // Don't break the process 499 .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
500 }) 500 } else {
501 }, { concurrency: REQUESTS_IN_PARALLEL }) 501 logger.error('Status not 200 for %s pod.', pod.host)
502 .then(() => logger.debug('makeRequestsToWinningPods finished.')) 502 }
503 .finally(() => { 503 }, { concurrency: REQUESTS_IN_PARALLEL })
504
505 logger.debug('makeRequestsToWinningPods finished.')
506
507 requestScheduler.activate()
508 } catch (err) {
504 // Final callback, we've ended all the requests 509 // Final callback, we've ended all the requests
505 // Now we made new friends, we can re activate the pool of requests 510 // Now we made new friends, we can re activate the pool of requests
506 requestScheduler.activate() 511 requestScheduler.activate()
507 }) 512 }
508} 513}
509 514
510// Wrapper that populate "toIds" argument with all our friends if it is not specified 515// Wrapper that populate "toIds" argument with all our friends if it is not specified
@@ -515,14 +520,19 @@ type CreateRequestOptions = {
515 toIds?: number[] 520 toIds?: number[]
516 transaction: Sequelize.Transaction 521 transaction: Sequelize.Transaction
517} 522}
518function createRequest (options: CreateRequestOptions) { 523async function createRequest (options: CreateRequestOptions) {
519 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) 524 if (options.toIds !== undefined) {
525 await requestScheduler.createRequest(options as RequestSchedulerOptions)
526 return undefined
527 }
520 528
521 // If the "toIds" pods is not specified, we send the request to all our friends 529 // If the "toIds" pods is not specified, we send the request to all our friends
522 return db.Pod.listAllIds(options.transaction).then(podIds => { 530 const podIds = await db.Pod.listAllIds(options.transaction)
523 const newOptions = Object.assign(options, { toIds: podIds }) 531
524 return requestScheduler.createRequest(newOptions) 532 const newOptions = Object.assign(options, { toIds: podIds })
525 }) 533 await requestScheduler.createRequest(newOptions)
534
535 return undefined
526} 536}
527 537
528function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { 538function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {