]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/friends.ts
Use async/await in lib and initializers
[github/Chocobozzz/PeerTube.git] / server / lib / friends.ts
index f035b099ba68a7b44c9f1fa47e2e1e38694b5278..a33432dc19bd8a7bddef630c8cd5f6fa84b2af7c 100644 (file)
@@ -1,6 +1,6 @@
 import * as request from 'request'
 import * as Sequelize from 'sequelize'
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
 import { join } from 'path'
 
 import { database as db } from '../initializers/database'
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.
 function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
   const tasks = []
 
-  eventsParams.forEach(eventParams => {
+  for (const eventParams of eventsParams) {
     tasks.push(addEventToRemoteVideo(eventParams, transaction))
-  })
+  }
 
   return Promise.all(tasks)
 }
 
-function hasFriends () {
-  return db.Pod.countAll().then(count => count !== 0)
+async function hasFriends () {
+  const count = await db.Pod.countAll()
+
+  return count !== 0
 }
 
-function makeFriends (hosts: string[]) {
+async function makeFriends (hosts: string[]) {
   const podsScore = {}
 
   logger.info('Make friends!')
-  return getMyPublicCert()
-    .then(cert => {
-      return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
-    })
-    .then(cert => {
-      logger.debug('Pods scores computed.', { podsScore: podsScore })
-      const podsList = computeWinningPods(hosts, podsScore)
-      logger.debug('Pods that we keep.', { podsToKeep: podsList })
+  const cert = await getMyPublicCert()
 
-      return makeRequestsToWinningPods(cert, podsList)
-    })
+  for (const host of hosts) {
+    await computeForeignPodsList(host, podsScore)
+  }
+
+  logger.debug('Pods scores computed.', { podsScore: podsScore })
+
+  const podsList = computeWinningPods(hosts, podsScore)
+  logger.debug('Pods that we keep.', { podsToKeep: podsList })
+
+  return makeRequestsToWinningPods(cert, podsList)
 }
 
-function quitFriends () {
+async function quitFriends () {
   // Stop pool requests
   requestScheduler.deactivate()
 
-  return requestScheduler.flush()
-    .then(() => {
-      return requestVideoQaduScheduler.flush()
-    })
-    .then(() => {
-      return db.Pod.list()
-    })
-    .then(pods => {
-      const requestParams = {
-        method: 'POST' as 'POST',
-        path: '/api/' + API_VERSION + '/remote/pods/remove',
-        toPod: null
-      }
+  try {
+    await requestScheduler.flush()
+
+    await requestVideoQaduScheduler.flush()
+
+    const pods = await db.Pod.list()
+    const requestParams = {
+      method: 'POST' as 'POST',
+      path: '/api/' + API_VERSION + '/remote/pods/remove',
+      toPod: null
+    }
 
-      // Announce we quit them
-      // We don't care if the request fails
-      // The other pod will exclude us automatically after a while
-      return Promise.map(pods, pod => {
+    // Announce we quit them
+    // We don't care if the request fails
+    // The other pod will exclude us automatically after a while
+    try {
+      await Bluebird.map(pods, pod => {
         requestParams.toPod = pod
 
         return makeSecureRequest(requestParams)
       }, { concurrency: REQUESTS_IN_PARALLEL })
-      .then(() => pods)
-      .catch(err => {
-        logger.error('Some errors while quitting friends.', err)
-        // Don't stop the process
-        return pods
-      })
-    })
-    .then(pods => {
-      const tasks = []
-      pods.forEach(pod => tasks.push(pod.destroy()))
+    } catch (err) { // Don't stop the process
+      logger.error('Some errors while quitting friends.', err)
+    }
 
-      return Promise.all(pods)
-    })
-    .then(() => {
-      logger.info('Removed all remote videos.')
-      // Don't forget to re activate the scheduler, even if there was an error
-      return requestScheduler.activate()
-    })
-    .finally(() => requestScheduler.activate())
+    const tasks = []
+    for (const pod of pods) {
+      tasks.push(pod.destroy())
+    }
+    await Promise.all(pods)
+
+    logger.info('Removed all remote videos.')
+
+    requestScheduler.activate()
+  } catch (err) {
+    // Don't forget to re activate the scheduler, even if there was an error
+    requestScheduler.activate()
+
+    throw err
+  }
 }
 
-function sendOwnedDataToPod (podId: number) {
+async function sendOwnedDataToPod (podId: number) {
   // First send authors
-  return sendOwnedAuthorsToPod(podId)
-    .then(() => sendOwnedChannelsToPod(podId))
-    .then(() => sendOwnedVideosToPod(podId))
+  await sendOwnedAuthorsToPod(podId)
+  await sendOwnedChannelsToPod(podId)
+  await sendOwnedVideosToPod(podId)
+}
+
+async function sendOwnedChannelsToPod (podId: number) {
+  const videoChannels = await db.VideoChannel.listOwned()
+
+  const tasks: Promise<any>[] = []
+  for (const videoChannel of videoChannels) {
+    const remoteVideoChannel = videoChannel.toAddRemoteJSON()
+    const options = {
+      type: 'add-channel' as 'add-channel',
+      endpoint: REQUEST_ENDPOINTS.VIDEOS,
+      data: remoteVideoChannel,
+      toIds: [ podId ],
+      transaction: null
+    }
+
+    const p = createRequest(options)
+    tasks.push(p)
+  }
+
+  await Promise.all(tasks)
 }
 
-function sendOwnedChannelsToPod (podId: number) {
-  return db.VideoChannel.listOwned()
-    .then(videoChannels => {
-      const tasks = []
-      videoChannels.forEach(videoChannel => {
-        const remoteVideoChannel = videoChannel.toAddRemoteJSON()
-        const options = {
-          type: 'add-channel' as 'add-channel',
-          endpoint: REQUEST_ENDPOINTS.VIDEOS,
-          data: remoteVideoChannel,
-          toIds: [ podId ],
-          transaction: null
-        }
+async function sendOwnedAuthorsToPod (podId: number) {
+  const authors = await db.Author.listOwned()
+  const tasks: Promise<any>[] = []
 
-        const p = createRequest(options)
-        tasks.push(p)
-      })
+  for (const author of authors) {
+    const remoteAuthor = author.toAddRemoteJSON()
+    const options = {
+      type: 'add-author' as 'add-author',
+      endpoint: REQUEST_ENDPOINTS.VIDEOS,
+      data: remoteAuthor,
+      toIds: [ podId ],
+      transaction: null
+    }
 
-      return Promise.all(tasks)
-    })
+    const p = createRequest(options)
+    tasks.push(p)
+  }
+
+  await Promise.all(tasks)
 }
 
-function sendOwnedAuthorsToPod (podId: number) {
-  return db.Author.listOwned()
-    .then(authors => {
-      const tasks = []
-      authors.forEach(author => {
-        const remoteAuthor = author.toAddRemoteJSON()
+async function sendOwnedVideosToPod (podId: number) {
+  const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
+  const tasks: Bluebird<any>[] = []
+
+  for (const video of videosList) {
+    const promise = video.toAddRemoteJSON()
+      .then(remoteVideo => {
         const options = {
-          type: 'add-author' as 'add-author',
+          type: 'add-video' as 'add-video',
           endpoint: REQUEST_ENDPOINTS.VIDEOS,
-          data: remoteAuthor,
+          data: remoteVideo,
           toIds: [ podId ],
           transaction: null
         }
-
-        const p = createRequest(options)
-        tasks.push(p)
+        return createRequest(options)
       })
-
-      return Promise.all(tasks)
-    })
-}
-
-function sendOwnedVideosToPod (podId: number) {
-  return db.Video.listOwnedAndPopulateAuthorAndTags()
-    .then(videosList => {
-      const tasks = []
-      videosList.forEach(video => {
-        const promise = video.toAddRemoteJSON()
-          .then(remoteVideo => {
-            const options = {
-              type: 'add-video' as 'add-video',
-              endpoint: REQUEST_ENDPOINTS.VIDEOS,
-              data: remoteVideo,
-              toIds: [ podId ],
-              transaction: null
-            }
-            return createRequest(options)
-          })
-          .catch(err => {
-            logger.error('Cannot convert video to remote.', err)
-            // Don't break the process
-            return undefined
-          })
-
-        tasks.push(promise)
+      .catch(err => {
+        logger.error('Cannot convert video to remote.', err)
+        // Don't break the process
+        return undefined
       })
 
-      return Promise.all(tasks)
-    })
+    tasks.push(promise)
+  }
+
+  await Promise.all(tasks)
 }
 
 function fetchRemotePreview (video: VideoInstance) {
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) {
   return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
 }
 
-function removeFriend (pod: PodInstance) {
+async function removeFriend (pod: PodInstance) {
   const requestParams = {
     method: 'POST' as 'POST',
     path: '/api/' + API_VERSION + '/remote/pods/remove',
     toPod: pod
   }
 
-  return makeSecureRequest(requestParams)
-    .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err))
-    .then(() => pod.destroy())
-    .then(() => logger.info('Removed friend %s.', pod.host))
-    .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err))
+  try {
+    await makeSecureRequest(requestParams)
+  } catch (err) {
+    logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
+  }
+
+  try {
+    await pod.destroy()
+
+    logger.info('Removed friend %s.', pod.host)
+  } catch (err) {
+    logger.error('Cannot destroy friend %s.', pod.host, err)
+  }
 }
 
 function getRequestScheduler () {
@@ -406,23 +413,21 @@ export {
 
 // ---------------------------------------------------------------------------
 
-function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
-  // TODO: type res
-  return getForeignPodsList(host).then(res => {
-    const foreignPodsList: { host: string }[] = res.data
+async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
+  const result = await getForeignPodsList(host)
+  const foreignPodsList: { host: string }[] = result.data
 
-    // Let's give 1 point to the pod we ask the friends list
-    foreignPodsList.push({ host })
+  // Let's give 1 point to the pod we ask the friends list
+  foreignPodsList.push({ host })
 
-    foreignPodsList.forEach(foreignPod => {
-      const foreignPodHost = foreignPod.host
+  for (const foreignPod of foreignPodsList) {
+    const foreignPodHost = foreignPod.host
 
-      if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
-      else podsScore[foreignPodHost] = 1
-    })
+    if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
+    else podsScore[foreignPodHost] = 1
+  }
 
-    return undefined
-  })
+  return undefined
 }
 
 function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
   const podsList = []
   const baseScore = hosts.length / 2
 
-  Object.keys(podsScore).forEach(podHost => {
+  for (const podHost of Object.keys(podsScore)) {
     // If the pod is not me and with a good score we add it
     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
       podsList.push({ host: podHost })
     }
-  })
+  }
 
   return podsList
 }
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) {
       if (err) return rej(err)
 
       try {
-        const json = JSON.parse(body)
+        const json: ResultList<FormattedPod> = JSON.parse(body)
         return res(json)
       } catch (err) {
         return rej(err)
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) {
   })
 }
 
-function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
+async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
   // Stop pool requests
   requestScheduler.deactivate()
   // Flush pool requests
   requestScheduler.forceSend()
 
-  return Promise.map(podsList, pod => {
-    const params = {
-      url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
-      method: 'POST' as 'POST',
-      json: {
-        host: CONFIG.WEBSERVER.HOST,
-        email: CONFIG.ADMIN.EMAIL,
-        publicKey: cert
+  try {
+    await Bluebird.map(podsList, async pod => {
+      const params = {
+        url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
+        method: 'POST' as 'POST',
+        json: {
+          host: CONFIG.WEBSERVER.HOST,
+          email: CONFIG.ADMIN.EMAIL,
+          publicKey: cert
+        }
       }
-    }
 
-    return makeRetryRequest(params)
-      .then(({ response, body }) => {
-        body = body as { cert: string, email: string }
-
-        if (response.statusCode === 200) {
-          const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
-          return podObj.save()
-            .then(podCreated => {
-
-              // Add our videos to the request scheduler
-              sendOwnedDataToPod(podCreated.id)
-            })
-            .catch(err => {
-              logger.error('Cannot add friend %s pod.', pod.host, err)
-            })
-        } else {
-          logger.error('Status not 200 for %s pod.', pod.host)
+      const { response, body } = await makeRetryRequest(params)
+      const typedBody = body as { cert: string, email: string }
+
+      if (response.statusCode === 200) {
+        const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
+
+        let podCreated: PodInstance
+        try {
+          podCreated = await podObj.save()
+        } catch (err) {
+          logger.error('Cannot add friend %s pod.', pod.host, err)
         }
-      })
-      .catch(err => {
-        logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
-        // Don't break the process
-      })
-  }, { concurrency: REQUESTS_IN_PARALLEL })
-  .then(() => logger.debug('makeRequestsToWinningPods finished.'))
-  .finally(() => {
+
+        // Add our videos to the request scheduler
+        sendOwnedDataToPod(podCreated.id)
+          .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
+      } else {
+        logger.error('Status not 200 for %s pod.', pod.host)
+      }
+    }, { concurrency: REQUESTS_IN_PARALLEL })
+
+    logger.debug('makeRequestsToWinningPods finished.')
+
+    requestScheduler.activate()
+  } catch (err) {
     // Final callback, we've ended all the requests
     // Now we made new friends, we can re activate the pool of requests
     requestScheduler.activate()
-  })
+  }
 }
 
 // Wrapper that populate "toIds" argument with all our friends if it is not specified
@@ -515,14 +520,19 @@ type CreateRequestOptions = {
   toIds?: number[]
   transaction: Sequelize.Transaction
 }
-function createRequest (options: CreateRequestOptions) {
-  if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
+async function createRequest (options: CreateRequestOptions) {
+  if (options.toIds !== undefined) {
+    await requestScheduler.createRequest(options as RequestSchedulerOptions)
+    return undefined
+  }
 
   // If the "toIds" pods is not specified, we send the request to all our friends
-  return db.Pod.listAllIds(options.transaction).then(podIds => {
-    const newOptions = Object.assign(options, { toIds: podIds })
-    return requestScheduler.createRequest(newOptions)
-  })
+  const podIds = await db.Pod.listAllIds(options.transaction)
+
+  const newOptions = Object.assign(options, { toIds: podIds })
+  await requestScheduler.createRequest(newOptions)
+
+  return undefined
 }
 
 function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {