X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Ffriends.ts;fp=server%2Flib%2Ffriends.ts;h=a33432dc19bd8a7bddef630c8cd5f6fa84b2af7c;hb=f5028693a896a3076dd286ac0030e3d8f78f5ebf;hp=f035b099ba68a7b44c9f1fa47e2e1e38694b5278;hpb=eb08047657e739bcd9e592d76307befa3998482b;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/friends.ts b/server/lib/friends.ts index f035b099b..a33432dc1 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts @@ -1,6 +1,6 @@ import * as request from 'request' import * as Sequelize from 'sequelize' -import * as Promise from 'bluebird' +import * as Bluebird from 'bluebird' import { join } from 'path' import { database as db } from '../initializers/database' @@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize. function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { const tasks = [] - eventsParams.forEach(eventParams => { + for (const eventParams of eventsParams) { tasks.push(addEventToRemoteVideo(eventParams, transaction)) - }) + } return Promise.all(tasks) } -function hasFriends () { - return db.Pod.countAll().then(count => count !== 0) +async function hasFriends () { + const count = await db.Pod.countAll() + + return count !== 0 } -function makeFriends (hosts: string[]) { +async function makeFriends (hosts: string[]) { const podsScore = {} logger.info('Make friends!') - return getMyPublicCert() - .then(cert => { - return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert) - }) - .then(cert => { - logger.debug('Pods scores computed.', { podsScore: podsScore }) - const podsList = computeWinningPods(hosts, podsScore) - logger.debug('Pods that we keep.', { podsToKeep: podsList }) + const cert = await getMyPublicCert() - return makeRequestsToWinningPods(cert, podsList) - }) + for (const host of hosts) { + await computeForeignPodsList(host, podsScore) + } + + logger.debug('Pods scores computed.', { podsScore: podsScore }) + + const podsList = computeWinningPods(hosts, podsScore) + logger.debug('Pods that we keep.', { podsToKeep: podsList }) + + return makeRequestsToWinningPods(cert, podsList) } -function quitFriends () { +async function quitFriends () { // Stop pool requests requestScheduler.deactivate() - return requestScheduler.flush() - .then(() => { - return requestVideoQaduScheduler.flush() - }) - .then(() => { - return db.Pod.list() - }) - .then(pods => { - const requestParams = { - method: 'POST' as 'POST', - path: '/api/' + API_VERSION + '/remote/pods/remove', - toPod: null - } + try { + await requestScheduler.flush() + + await requestVideoQaduScheduler.flush() + + const pods = await db.Pod.list() + const requestParams = { + method: 'POST' as 'POST', + path: '/api/' + API_VERSION + '/remote/pods/remove', + toPod: null + } - // Announce we quit them - // We don't care if the request fails - // The other pod will exclude us automatically after a while - return Promise.map(pods, pod => { + // Announce we quit them + // We don't care if the request fails + // The other pod will exclude us automatically after a while + try { + await Bluebird.map(pods, pod => { requestParams.toPod = pod return makeSecureRequest(requestParams) }, { concurrency: REQUESTS_IN_PARALLEL }) - .then(() => pods) - .catch(err => { - logger.error('Some errors while quitting friends.', err) - // Don't stop the process - return pods - }) - }) - .then(pods => { - const tasks = [] - pods.forEach(pod => tasks.push(pod.destroy())) + } catch (err) { // Don't stop the process + logger.error('Some errors while quitting friends.', err) + } - return Promise.all(pods) - }) - .then(() => { - logger.info('Removed all remote videos.') - // Don't forget to re activate the scheduler, even if there was an error - return requestScheduler.activate() - }) - .finally(() => requestScheduler.activate()) + const tasks = [] + for (const pod of pods) { + tasks.push(pod.destroy()) + } + await Promise.all(pods) + + logger.info('Removed all remote videos.') + + requestScheduler.activate() + } catch (err) { + // Don't forget to re activate the scheduler, even if there was an error + requestScheduler.activate() + + throw err + } } -function sendOwnedDataToPod (podId: number) { +async function sendOwnedDataToPod (podId: number) { // First send authors - return sendOwnedAuthorsToPod(podId) - .then(() => sendOwnedChannelsToPod(podId)) - .then(() => sendOwnedVideosToPod(podId)) + await sendOwnedAuthorsToPod(podId) + await sendOwnedChannelsToPod(podId) + await sendOwnedVideosToPod(podId) +} + +async function sendOwnedChannelsToPod (podId: number) { + const videoChannels = await db.VideoChannel.listOwned() + + const tasks: Promise[] = [] + for (const videoChannel of videoChannels) { + const remoteVideoChannel = videoChannel.toAddRemoteJSON() + const options = { + type: 'add-channel' as 'add-channel', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteVideoChannel, + toIds: [ podId ], + transaction: null + } + + const p = createRequest(options) + tasks.push(p) + } + + await Promise.all(tasks) } -function sendOwnedChannelsToPod (podId: number) { - return db.VideoChannel.listOwned() - .then(videoChannels => { - const tasks = [] - videoChannels.forEach(videoChannel => { - const remoteVideoChannel = videoChannel.toAddRemoteJSON() - const options = { - type: 'add-channel' as 'add-channel', - endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideoChannel, - toIds: [ podId ], - transaction: null - } +async function sendOwnedAuthorsToPod (podId: number) { + const authors = await db.Author.listOwned() + const tasks: Promise[] = [] - const p = createRequest(options) - tasks.push(p) - }) + for (const author of authors) { + const remoteAuthor = author.toAddRemoteJSON() + const options = { + type: 'add-author' as 'add-author', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteAuthor, + toIds: [ podId ], + transaction: null + } - return Promise.all(tasks) - }) + const p = createRequest(options) + tasks.push(p) + } + + await Promise.all(tasks) } -function sendOwnedAuthorsToPod (podId: number) { - return db.Author.listOwned() - .then(authors => { - const tasks = [] - authors.forEach(author => { - const remoteAuthor = author.toAddRemoteJSON() +async function sendOwnedVideosToPod (podId: number) { + const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags() + const tasks: Bluebird[] = [] + + for (const video of videosList) { + const promise = video.toAddRemoteJSON() + .then(remoteVideo => { const options = { - type: 'add-author' as 'add-author', + type: 'add-video' as 'add-video', endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteAuthor, + data: remoteVideo, toIds: [ podId ], transaction: null } - - const p = createRequest(options) - tasks.push(p) + return createRequest(options) }) - - return Promise.all(tasks) - }) -} - -function sendOwnedVideosToPod (podId: number) { - return db.Video.listOwnedAndPopulateAuthorAndTags() - .then(videosList => { - const tasks = [] - videosList.forEach(video => { - const promise = video.toAddRemoteJSON() - .then(remoteVideo => { - const options = { - type: 'add-video' as 'add-video', - endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideo, - toIds: [ podId ], - transaction: null - } - return createRequest(options) - }) - .catch(err => { - logger.error('Cannot convert video to remote.', err) - // Don't break the process - return undefined - }) - - tasks.push(promise) + .catch(err => { + logger.error('Cannot convert video to remote.', err) + // Don't break the process + return undefined }) - return Promise.all(tasks) - }) + tasks.push(promise) + } + + await Promise.all(tasks) } function fetchRemotePreview (video: VideoInstance) { @@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) { return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) } -function removeFriend (pod: PodInstance) { +async function removeFriend (pod: PodInstance) { const requestParams = { method: 'POST' as 'POST', path: '/api/' + API_VERSION + '/remote/pods/remove', toPod: pod } - return makeSecureRequest(requestParams) - .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) - .then(() => pod.destroy()) - .then(() => logger.info('Removed friend %s.', pod.host)) - .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) + try { + await makeSecureRequest(requestParams) + } catch (err) { + logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err) + } + + try { + await pod.destroy() + + logger.info('Removed friend %s.', pod.host) + } catch (err) { + logger.error('Cannot destroy friend %s.', pod.host, err) + } } function getRequestScheduler () { @@ -406,23 +413,21 @@ export { // --------------------------------------------------------------------------- -function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { - // TODO: type res - return getForeignPodsList(host).then(res => { - const foreignPodsList: { host: string }[] = res.data +async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { + const result = await getForeignPodsList(host) + const foreignPodsList: { host: string }[] = result.data - // Let's give 1 point to the pod we ask the friends list - foreignPodsList.push({ host }) + // Let's give 1 point to the pod we ask the friends list + foreignPodsList.push({ host }) - foreignPodsList.forEach(foreignPod => { - const foreignPodHost = foreignPod.host + for (const foreignPod of foreignPodsList) { + const foreignPodHost = foreignPod.host - if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ - else podsScore[foreignPodHost] = 1 - }) + if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ + else podsScore[foreignPodHost] = 1 + } - return undefined - }) + return undefined } function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { @@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num const podsList = [] const baseScore = hosts.length / 2 - Object.keys(podsScore).forEach(podHost => { + for (const podHost of Object.keys(podsScore)) { // If the pod is not me and with a good score we add it if (isMe(podHost) === false && podsScore[podHost] > baseScore) { podsList.push({ host: podHost }) } - }) + } return podsList } @@ -449,7 +454,7 @@ function getForeignPodsList (host: string) { if (err) return rej(err) try { - const json = JSON.parse(body) + const json: ResultList = JSON.parse(body) return res(json) } catch (err) { return rej(err) @@ -458,53 +463,53 @@ function getForeignPodsList (host: string) { }) } -function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { +async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { // Stop pool requests requestScheduler.deactivate() // Flush pool requests requestScheduler.forceSend() - return Promise.map(podsList, pod => { - const params = { - url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', - method: 'POST' as 'POST', - json: { - host: CONFIG.WEBSERVER.HOST, - email: CONFIG.ADMIN.EMAIL, - publicKey: cert + try { + await Bluebird.map(podsList, async pod => { + const params = { + url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', + method: 'POST' as 'POST', + json: { + host: CONFIG.WEBSERVER.HOST, + email: CONFIG.ADMIN.EMAIL, + publicKey: cert + } } - } - return makeRetryRequest(params) - .then(({ response, body }) => { - body = body as { cert: string, email: string } - - if (response.statusCode === 200) { - const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) - return podObj.save() - .then(podCreated => { - - // Add our videos to the request scheduler - sendOwnedDataToPod(podCreated.id) - }) - .catch(err => { - logger.error('Cannot add friend %s pod.', pod.host, err) - }) - } else { - logger.error('Status not 200 for %s pod.', pod.host) + const { response, body } = await makeRetryRequest(params) + const typedBody = body as { cert: string, email: string } + + if (response.statusCode === 200) { + const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email }) + + let podCreated: PodInstance + try { + podCreated = await podObj.save() + } catch (err) { + logger.error('Cannot add friend %s pod.', pod.host, err) } - }) - .catch(err => { - logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) - // Don't break the process - }) - }, { concurrency: REQUESTS_IN_PARALLEL }) - .then(() => logger.debug('makeRequestsToWinningPods finished.')) - .finally(() => { + + // Add our videos to the request scheduler + sendOwnedDataToPod(podCreated.id) + .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err)) + } else { + logger.error('Status not 200 for %s pod.', pod.host) + } + }, { concurrency: REQUESTS_IN_PARALLEL }) + + logger.debug('makeRequestsToWinningPods finished.') + + requestScheduler.activate() + } catch (err) { // Final callback, we've ended all the requests // Now we made new friends, we can re activate the pool of requests requestScheduler.activate() - }) + } } // Wrapper that populate "toIds" argument with all our friends if it is not specified @@ -515,14 +520,19 @@ type CreateRequestOptions = { toIds?: number[] transaction: Sequelize.Transaction } -function createRequest (options: CreateRequestOptions) { - if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) +async function createRequest (options: CreateRequestOptions) { + if (options.toIds !== undefined) { + await requestScheduler.createRequest(options as RequestSchedulerOptions) + return undefined + } // If the "toIds" pods is not specified, we send the request to all our friends - return db.Pod.listAllIds(options.transaction).then(podIds => { - const newOptions = Object.assign(options, { toIds: podIds }) - return requestScheduler.createRequest(newOptions) - }) + const podIds = await db.Pod.listAllIds(options.transaction) + + const newOptions = Object.assign(options, { toIds: podIds }) + await requestScheduler.createRequest(newOptions) + + return undefined } function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {