X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Ffriends.ts;h=a658201913d9a080c1e34b097d25f7a08b5c8f26;hb=e6d4b0ff2404dcf0b3a755c3fcc415ffeb6e754d;hp=e097f925488c8c108f19b24c69a7a8da1fdb7014;hpb=69818c9394366b954b6ba3bd697bd9d2b09f2a16;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/friends.ts b/server/lib/friends.ts index e097f9254..a65820191 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts @@ -1,6 +1,6 @@ -import { each, eachLimit, eachSeries, series, waterfall } from 'async' import * as request from 'request' import * as Sequelize from 'sequelize' +import * as Promise from 'bluebird' import { database as db } from '../initializers/database' import { @@ -15,8 +15,7 @@ import { logger, getMyPublicCert, makeSecureRequest, - makeRetryRequest, - createEmptyCallback + makeRetryRequest } from '../helpers' import { RequestScheduler, @@ -28,10 +27,24 @@ import { RequestVideoEventScheduler, RequestVideoEventSchedulerOptions } from './request' -import { PodInstance, VideoInstance } from '../models' - -type QaduParam = { videoId: string, type: string } -type EventParam = { videoId: string, type: string } +import { + PodInstance, + VideoInstance +} from '../models' +import { + RequestEndpoint, + RequestVideoEventType, + RequestVideoQaduType, + RemoteVideoCreateData, + RemoteVideoUpdateData, + RemoteVideoRemoveData, + RemoteVideoReportAbuseData, + ResultList, + Pod as FormatedPod +} from '../../shared' + +type QaduParam = { videoId: string, type: RequestVideoQaduType } +type EventParam = { videoId: string, type: RequestVideoEventType } const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] @@ -45,209 +58,179 @@ function activateSchedulers () { requestVideoEventScheduler.activate() } -function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { +function addVideoToFriends (videoData: RemoteVideoCreateData, transaction: Sequelize.Transaction) { const options = { type: ENDPOINT_ACTIONS.ADD, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: videoData, transaction } - createRequest(options, callback) + return createRequest(options) } -function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { +function updateVideoToFriends (videoData: RemoteVideoUpdateData, transaction: Sequelize.Transaction) { const options = { type: ENDPOINT_ACTIONS.UPDATE, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: videoData, transaction } - createRequest(options, callback) + return createRequest(options) } -function removeVideoToFriends (videoParams: Object) { +function removeVideoToFriends (videoParams: RemoteVideoRemoveData) { const options = { type: ENDPOINT_ACTIONS.REMOVE, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: videoParams, transaction: null } - createRequest(options) + return createRequest(options) } -function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) { +function reportAbuseVideoToFriend (reportData: RemoteVideoReportAbuseData, video: VideoInstance, transaction: Sequelize.Transaction) { const options = { type: ENDPOINT_ACTIONS.REPORT_ABUSE, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: reportData, toIds: [ video.Author.podId ], - transaction: null + transaction } - createRequest(options) + return createRequest(options) } -function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { +function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction) { const options = { videoId: qaduParam.videoId, type: qaduParam.type, transaction } - return createVideoQaduRequest(options, callback) + return createVideoQaduRequest(options) } -function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { +function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction) { const tasks = [] qadusParams.forEach(function (qaduParams) { - const fun = function (callback) { - quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback) - } - - tasks.push(fun) + tasks.push(quickAndDirtyUpdateVideoToFriends(qaduParams, transaction)) }) - series(tasks, finalCallback) + return Promise.all(tasks) } -function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { +function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction) { const options = { videoId: eventParam.videoId, type: eventParam.type, transaction } - createVideoEventRequest(options, callback) + return createVideoEventRequest(options) } -function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { +function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { const tasks = [] eventsParams.forEach(function (eventParams) { - const fun = function (callback) { - addEventToRemoteVideo(eventParams, transaction, callback) - } - - tasks.push(fun) + tasks.push(addEventToRemoteVideo(eventParams, transaction)) }) - series(tasks, finalCallback) + return Promise.all(tasks) } -function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) { - db.Pod.countAll(function (err, count) { - if (err) return callback(err) - - const hasFriends = (count !== 0) - callback(null, hasFriends) - }) +function hasFriends () { + return db.Pod.countAll().then(count => count !== 0) } -function makeFriends (hosts: string[], callback: (err: Error) => void) { +function makeFriends (hosts: string[]) { const podsScore = {} logger.info('Make friends!') - getMyPublicCert(function (err, cert) { - if (err) { - logger.error('Cannot read public cert.') - return callback(err) - } - - eachSeries(hosts, function (host, callbackEach) { - computeForeignPodsList(host, podsScore, callbackEach) - }, function (err: Error) { - if (err) return callback(err) - + return getMyPublicCert() + .then(cert => { + return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert) + }) + .then(cert => { logger.debug('Pods scores computed.', { podsScore: podsScore }) const podsList = computeWinningPods(hosts, podsScore) logger.debug('Pods that we keep.', { podsToKeep: podsList }) - makeRequestsToWinningPods(cert, podsList, callback) + return makeRequestsToWinningPods(cert, podsList) }) - }) } -function quitFriends (callback: (err: Error) => void) { +function quitFriends () { // Stop pool requests requestScheduler.deactivate() - waterfall([ - function flushRequests (callbackAsync) { - requestScheduler.flush(err => callbackAsync(err)) - }, - - function flushVideoQaduRequests (callbackAsync) { - requestVideoQaduScheduler.flush(err => callbackAsync(err)) - }, - - function getPodsList (callbackAsync) { - return db.Pod.list(callbackAsync) - }, - - function announceIQuitMyFriends (pods, callbackAsync) { + return requestScheduler.flush() + .then(() => { + return requestVideoQaduScheduler.flush() + }) + .then(() => { + return db.Pod.list() + }) + .then(pods => { const requestParams = { method: 'POST' as 'POST', path: '/api/' + API_VERSION + '/remote/pods/remove', - sign: true, toPod: null } // Announce we quit them // We don't care if the request fails // The other pod will exclude us automatically after a while - eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + return Promise.map(pods, pod => { requestParams.toPod = pod - makeSecureRequest(requestParams, callbackEach) - }, function (err) { - if (err) { - logger.error('Some errors while quitting friends.', { err: err }) - // Don't stop the process - } - return callbackAsync(null, pods) + return makeSecureRequest(requestParams) + }, { concurrency: REQUESTS_IN_PARALLEL }) + .then(() => pods) + .catch(err => { + logger.error('Some errors while quitting friends.', err) + // Don't stop the process }) - }, - - function removePodsFromDB (pods, callbackAsync) { - each(pods, function (pod: any, callbackEach) { - pod.destroy().asCallback(callbackEach) - }, callbackAsync) - } - ], function (err: Error) { - // Don't forget to re activate the scheduler, even if there was an error - requestScheduler.activate() - - if (err) return callback(err) + }) + .then(pods => { + const tasks = [] + pods.forEach(pod => tasks.push(pod.destroy())) - logger.info('Removed all remote videos.') - return callback(null) - }) + return Promise.all(pods) + }) + .then(() => { + logger.info('Removed all remote videos.') + // Don't forget to re activate the scheduler, even if there was an error + return requestScheduler.activate() + }) + .finally(() => requestScheduler.activate()) } function sendOwnedVideosToPod (podId: number) { - db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { - if (err) { - logger.error('Cannot get the list of videos we own.') - return - } - - videosList.forEach(function (video) { - video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) { - logger.error('Cannot convert video to remote.', { error: err }) - // Don't break the process - return - } - - const options = { - type: 'add', - endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideo, - toIds: [ podId ], - transaction: null - } - createRequest(options) + db.Video.listOwnedAndPopulateAuthorAndTags() + .then(videosList => { + const tasks = [] + videosList.forEach(video => { + const promise = video.toAddRemoteJSON() + .then(remoteVideo => { + const options = { + type: 'add', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteVideo, + toIds: [ podId ], + transaction: null + } + return createRequest(options) + }) + .catch(err => { + logger.error('Cannot convert video to remote.', err) + // Don't break the process + return undefined + }) + + tasks.push(promise) }) + + return Promise.all(tasks) }) - }) } function getRequestScheduler () { @@ -285,23 +268,22 @@ export { // --------------------------------------------------------------------------- -function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) { - getForeignPodsList(host, function (err, res) { - if (err) return callback(err) - - const foreignPodsList = res.data +function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { + // TODO: type res + return getForeignPodsList(host).then(res => { + const foreignPodsList: { host: string }[] = res.data // Let's give 1 point to the pod we ask the friends list foreignPodsList.push({ host }) - foreignPodsList.forEach(function (foreignPod) { + foreignPodsList.forEach(foreignPod => { const foreignPodHost = foreignPod.host if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ else podsScore[foreignPodHost] = 1 }) - return callback(null) + return undefined }) } @@ -311,7 +293,7 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num const podsList = [] const baseScore = hosts.length / 2 - Object.keys(podsScore).forEach(function (podHost) { + Object.keys(podsScore).forEach(podHost => { // If the pod is not me and with a good score we add it if (isMe(podHost) === false && podsScore[podHost] > baseScore) { podsList.push({ host: podHost }) @@ -321,28 +303,30 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num return podsList } -function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) { - const path = '/api/' + API_VERSION + '/pods' +function getForeignPodsList (host: string) { + return new Promise< ResultList >((res, rej) => { + const path = '/api/' + API_VERSION + '/pods' - request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { - if (err) return callback(err) + request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { + if (err) return rej(err) - try { - const json = JSON.parse(body) - return callback(null, json) - } catch (err) { - return callback(err) - } + try { + const json = JSON.parse(body) + return res(json) + } catch (err) { + return rej(err) + } + }) }) } -function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) { +function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { // Stop pool requests requestScheduler.deactivate() // Flush pool requests requestScheduler.forceSend() - eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) { + return Promise.map(podsList, pod => { const params = { url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', method: 'POST' as 'POST', @@ -353,76 +337,62 @@ function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callb } } - makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) { - if (err) { - logger.error('Error with adding %s pod.', pod.host, { error: err }) + return makeRetryRequest(params) + .then(({ response, body }) => { + body = body as { cert: string, email: string } + + if (response.statusCode === 200) { + const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) + return podObj.save() + .then(podCreated => { + + // Add our videos to the request scheduler + sendOwnedVideosToPod(podCreated.id) + }) + .catch(err => { + logger.error('Cannot add friend %s pod.', pod.host, err) + }) + } else { + logger.error('Status not 200 for %s pod.', pod.host) + } + }) + .catch(err => { + logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) // Don't break the process - return callbackEach() - } - - if (res.statusCode === 200) { - const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) - podObj.save().asCallback(function (err, podCreated) { - if (err) { - logger.error('Cannot add friend %s pod.', pod.host, { error: err }) - return callbackEach() - } - - // Add our videos to the request scheduler - sendOwnedVideosToPod(podCreated.id) - - return callbackEach() - }) - } else { - logger.error('Status not 200 for %s pod.', pod.host) - return callbackEach() - } - }) - }, function endRequests () { + }) + }, { concurrency: REQUESTS_IN_PARALLEL }) + .then(() => logger.debug('makeRequestsToWinningPods finished.')) + .finally(() => { // Final callback, we've ended all the requests // Now we made new friends, we can re activate the pool of requests requestScheduler.activate() - - logger.debug('makeRequestsToWinningPods finished.') - return callback(null) }) } // Wrapper that populate "toIds" argument with all our friends if it is not specified type CreateRequestOptions = { type: string - endpoint: string + endpoint: RequestEndpoint data: Object toIds?: number[] transaction: Sequelize.Transaction } -function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) { - if (!callback) callback = function () { /* empty */ } - - if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback) +function createRequest (options: CreateRequestOptions) { + if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) // If the "toIds" pods is not specified, we send the request to all our friends - db.Pod.listAllIds(options.transaction, function (err, podIds) { - if (err) { - logger.error('Cannot get pod ids', { error: err }) - return - } - + return db.Pod.listAllIds(options.transaction).then(podIds => { const newOptions = Object.assign(options, { toIds: podIds }) - return requestScheduler.createRequest(newOptions, callback) + return requestScheduler.createRequest(newOptions) }) } -function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { - if (!callback) callback = createEmptyCallback() - - requestVideoQaduScheduler.createRequest(options, callback) +function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { + return requestVideoQaduScheduler.createRequest(options) } -function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { - if (!callback) callback = createEmptyCallback() - - requestVideoEventScheduler.createRequest(options, callback) +function createVideoEventRequest (options: RequestVideoEventSchedulerOptions) { + return requestVideoEventScheduler.createRequest(options) } function isMe (host: string) {