import * as request from 'request'
import * as Sequelize from 'sequelize'
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
import { join } from 'path'
import { database as db } from '../initializers/database'
function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
const tasks = []
- eventsParams.forEach(eventParams => {
+ for (const eventParams of eventsParams) {
tasks.push(addEventToRemoteVideo(eventParams, transaction))
- })
+ }
return Promise.all(tasks)
}
-function hasFriends () {
- return db.Pod.countAll().then(count => count !== 0)
+async function hasFriends () {
+ const count = await db.Pod.countAll()
+
+ return count !== 0
}
-function makeFriends (hosts: string[]) {
+async function makeFriends (hosts: string[]) {
const podsScore = {}
logger.info('Make friends!')
- return getMyPublicCert()
- .then(cert => {
- return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
- })
- .then(cert => {
- logger.debug('Pods scores computed.', { podsScore: podsScore })
- const podsList = computeWinningPods(hosts, podsScore)
- logger.debug('Pods that we keep.', { podsToKeep: podsList })
+ const cert = await getMyPublicCert()
- return makeRequestsToWinningPods(cert, podsList)
- })
+ for (const host of hosts) {
+ await computeForeignPodsList(host, podsScore)
+ }
+
+ logger.debug('Pods scores computed.', { podsScore: podsScore })
+
+ const podsList = computeWinningPods(hosts, podsScore)
+ logger.debug('Pods that we keep.', { podsToKeep: podsList })
+
+ return makeRequestsToWinningPods(cert, podsList)
}
-function quitFriends () {
+async function quitFriends () {
// Stop pool requests
requestScheduler.deactivate()
- return requestScheduler.flush()
- .then(() => {
- return requestVideoQaduScheduler.flush()
- })
- .then(() => {
- return db.Pod.list()
- })
- .then(pods => {
- const requestParams = {
- method: 'POST' as 'POST',
- path: '/api/' + API_VERSION + '/remote/pods/remove',
- toPod: null
- }
+ try {
+ await requestScheduler.flush()
+
+ await requestVideoQaduScheduler.flush()
+
+ const pods = await db.Pod.list()
+ const requestParams = {
+ method: 'POST' as 'POST',
+ path: '/api/' + API_VERSION + '/remote/pods/remove',
+ toPod: null
+ }
- // Announce we quit them
- // We don't care if the request fails
- // The other pod will exclude us automatically after a while
- return Promise.map(pods, pod => {
+ // Announce we quit them
+ // We don't care if the request fails
+ // The other pod will exclude us automatically after a while
+ try {
+ await Bluebird.map(pods, pod => {
requestParams.toPod = pod
return makeSecureRequest(requestParams)
}, { concurrency: REQUESTS_IN_PARALLEL })
- .then(() => pods)
- .catch(err => {
- logger.error('Some errors while quitting friends.', err)
- // Don't stop the process
- return pods
- })
- })
- .then(pods => {
- const tasks = []
- pods.forEach(pod => tasks.push(pod.destroy()))
+ } catch (err) { // Don't stop the process
+ logger.error('Some errors while quitting friends.', err)
+ }
- return Promise.all(pods)
- })
- .then(() => {
- logger.info('Removed all remote videos.')
- // Don't forget to re activate the scheduler, even if there was an error
- return requestScheduler.activate()
- })
- .finally(() => requestScheduler.activate())
+ const tasks = []
+ for (const pod of pods) {
+ tasks.push(pod.destroy())
+ }
+ await Promise.all(pods)
+
+ logger.info('Removed all remote videos.')
+
+ requestScheduler.activate()
+ } catch (err) {
+ // Don't forget to re activate the scheduler, even if there was an error
+ requestScheduler.activate()
+
+ throw err
+ }
}
-function sendOwnedDataToPod (podId: number) {
+async function sendOwnedDataToPod (podId: number) {
// First send authors
- return sendOwnedAuthorsToPod(podId)
- .then(() => sendOwnedChannelsToPod(podId))
- .then(() => sendOwnedVideosToPod(podId))
+ await sendOwnedAuthorsToPod(podId)
+ await sendOwnedChannelsToPod(podId)
+ await sendOwnedVideosToPod(podId)
+}
+
+async function sendOwnedChannelsToPod (podId: number) {
+ const videoChannels = await db.VideoChannel.listOwned()
+
+ const tasks: Promise<any>[] = []
+ for (const videoChannel of videoChannels) {
+ const remoteVideoChannel = videoChannel.toAddRemoteJSON()
+ const options = {
+ type: 'add-channel' as 'add-channel',
+ endpoint: REQUEST_ENDPOINTS.VIDEOS,
+ data: remoteVideoChannel,
+ toIds: [ podId ],
+ transaction: null
+ }
+
+ const p = createRequest(options)
+ tasks.push(p)
+ }
+
+ await Promise.all(tasks)
}
-function sendOwnedChannelsToPod (podId: number) {
- return db.VideoChannel.listOwned()
- .then(videoChannels => {
- const tasks = []
- videoChannels.forEach(videoChannel => {
- const remoteVideoChannel = videoChannel.toAddRemoteJSON()
- const options = {
- type: 'add-channel' as 'add-channel',
- endpoint: REQUEST_ENDPOINTS.VIDEOS,
- data: remoteVideoChannel,
- toIds: [ podId ],
- transaction: null
- }
+async function sendOwnedAuthorsToPod (podId: number) {
+ const authors = await db.Author.listOwned()
+ const tasks: Promise<any>[] = []
- const p = createRequest(options)
- tasks.push(p)
- })
+ for (const author of authors) {
+ const remoteAuthor = author.toAddRemoteJSON()
+ const options = {
+ type: 'add-author' as 'add-author',
+ endpoint: REQUEST_ENDPOINTS.VIDEOS,
+ data: remoteAuthor,
+ toIds: [ podId ],
+ transaction: null
+ }
- return Promise.all(tasks)
- })
+ const p = createRequest(options)
+ tasks.push(p)
+ }
+
+ await Promise.all(tasks)
}
-function sendOwnedAuthorsToPod (podId: number) {
- return db.Author.listOwned()
- .then(authors => {
- const tasks = []
- authors.forEach(author => {
- const remoteAuthor = author.toAddRemoteJSON()
+async function sendOwnedVideosToPod (podId: number) {
+ const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
+ const tasks: Bluebird<any>[] = []
+
+ for (const video of videosList) {
+ const promise = video.toAddRemoteJSON()
+ .then(remoteVideo => {
const options = {
- type: 'add-author' as 'add-author',
+ type: 'add-video' as 'add-video',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
- data: remoteAuthor,
+ data: remoteVideo,
toIds: [ podId ],
transaction: null
}
-
- const p = createRequest(options)
- tasks.push(p)
+ return createRequest(options)
})
-
- return Promise.all(tasks)
- })
-}
-
-function sendOwnedVideosToPod (podId: number) {
- return db.Video.listOwnedAndPopulateAuthorAndTags()
- .then(videosList => {
- const tasks = []
- videosList.forEach(video => {
- const promise = video.toAddRemoteJSON()
- .then(remoteVideo => {
- const options = {
- type: 'add-video' as 'add-video',
- endpoint: REQUEST_ENDPOINTS.VIDEOS,
- data: remoteVideo,
- toIds: [ podId ],
- transaction: null
- }
- return createRequest(options)
- })
- .catch(err => {
- logger.error('Cannot convert video to remote.', err)
- // Don't break the process
- return undefined
- })
-
- tasks.push(promise)
+ .catch(err => {
+ logger.error('Cannot convert video to remote.', err)
+ // Don't break the process
+ return undefined
})
- return Promise.all(tasks)
- })
+ tasks.push(promise)
+ }
+
+ await Promise.all(tasks)
}
function fetchRemotePreview (video: VideoInstance) {
return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
}
-function removeFriend (pod: PodInstance) {
+async function removeFriend (pod: PodInstance) {
const requestParams = {
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/pods/remove',
toPod: pod
}
- return makeSecureRequest(requestParams)
- .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err))
- .then(() => pod.destroy())
- .then(() => logger.info('Removed friend %s.', pod.host))
- .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err))
+ try {
+ await makeSecureRequest(requestParams)
+ } catch (err) {
+ logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
+ }
+
+ try {
+ await pod.destroy()
+
+ logger.info('Removed friend %s.', pod.host)
+ } catch (err) {
+ logger.error('Cannot destroy friend %s.', pod.host, err)
+ }
}
function getRequestScheduler () {
// ---------------------------------------------------------------------------
-function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
- // TODO: type res
- return getForeignPodsList(host).then(res => {
- const foreignPodsList: { host: string }[] = res.data
+async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
+ const result = await getForeignPodsList(host)
+ const foreignPodsList: { host: string }[] = result.data
- // Let's give 1 point to the pod we ask the friends list
- foreignPodsList.push({ host })
+ // Let's give 1 point to the pod we ask the friends list
+ foreignPodsList.push({ host })
- foreignPodsList.forEach(foreignPod => {
- const foreignPodHost = foreignPod.host
+ for (const foreignPod of foreignPodsList) {
+ const foreignPodHost = foreignPod.host
- if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
- else podsScore[foreignPodHost] = 1
- })
+ if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
+ else podsScore[foreignPodHost] = 1
+ }
- return undefined
- })
+ return undefined
}
function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
const podsList = []
const baseScore = hosts.length / 2
- Object.keys(podsScore).forEach(podHost => {
+ for (const podHost of Object.keys(podsScore)) {
// If the pod is not me and with a good score we add it
if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
podsList.push({ host: podHost })
}
- })
+ }
return podsList
}
if (err) return rej(err)
try {
- const json = JSON.parse(body)
+ const json: ResultList<FormattedPod> = JSON.parse(body)
return res(json)
} catch (err) {
return rej(err)
})
}
-function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
+async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
// Stop pool requests
requestScheduler.deactivate()
// Flush pool requests
requestScheduler.forceSend()
- return Promise.map(podsList, pod => {
- const params = {
- url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
- method: 'POST' as 'POST',
- json: {
- host: CONFIG.WEBSERVER.HOST,
- email: CONFIG.ADMIN.EMAIL,
- publicKey: cert
+ try {
+ await Bluebird.map(podsList, async pod => {
+ const params = {
+ url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
+ method: 'POST' as 'POST',
+ json: {
+ host: CONFIG.WEBSERVER.HOST,
+ email: CONFIG.ADMIN.EMAIL,
+ publicKey: cert
+ }
}
- }
- return makeRetryRequest(params)
- .then(({ response, body }) => {
- body = body as { cert: string, email: string }
-
- if (response.statusCode === 200) {
- const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
- return podObj.save()
- .then(podCreated => {
-
- // Add our videos to the request scheduler
- sendOwnedDataToPod(podCreated.id)
- })
- .catch(err => {
- logger.error('Cannot add friend %s pod.', pod.host, err)
- })
- } else {
- logger.error('Status not 200 for %s pod.', pod.host)
+ const { response, body } = await makeRetryRequest(params)
+ const typedBody = body as { cert: string, email: string }
+
+ if (response.statusCode === 200) {
+ const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
+
+ let podCreated: PodInstance
+ try {
+ podCreated = await podObj.save()
+ } catch (err) {
+ logger.error('Cannot add friend %s pod.', pod.host, err)
}
- })
- .catch(err => {
- logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
- // Don't break the process
- })
- }, { concurrency: REQUESTS_IN_PARALLEL })
- .then(() => logger.debug('makeRequestsToWinningPods finished.'))
- .finally(() => {
+
+ // Add our videos to the request scheduler
+ sendOwnedDataToPod(podCreated.id)
+ .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
+ } else {
+ logger.error('Status not 200 for %s pod.', pod.host)
+ }
+ }, { concurrency: REQUESTS_IN_PARALLEL })
+
+ logger.debug('makeRequestsToWinningPods finished.')
+
+ requestScheduler.activate()
+ } catch (err) {
// Final callback, we've ended all the requests
// Now we made new friends, we can re activate the pool of requests
requestScheduler.activate()
- })
+ }
}
// Wrapper that populate "toIds" argument with all our friends if it is not specified
toIds?: number[]
transaction: Sequelize.Transaction
}
-function createRequest (options: CreateRequestOptions) {
- if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
+async function createRequest (options: CreateRequestOptions) {
+ if (options.toIds !== undefined) {
+ await requestScheduler.createRequest(options as RequestSchedulerOptions)
+ return undefined
+ }
// If the "toIds" pods is not specified, we send the request to all our friends
- return db.Pod.listAllIds(options.transaction).then(podIds => {
- const newOptions = Object.assign(options, { toIds: podIds })
- return requestScheduler.createRequest(newOptions)
- })
+ const podIds = await db.Pod.listAllIds(options.transaction)
+
+ const newOptions = Object.assign(options, { toIds: podIds })
+ await requestScheduler.createRequest(newOptions)
+
+ return undefined
}
function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {