From 6fcd19ba737f1f5614a56c6925adb882dea43b8d Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 5 Jul 2017 13:26:25 +0200 Subject: Move to promises Closes https://github.com/Chocobozzz/PeerTube/issues/74 --- server/lib/friends.ts | 318 +++++++++------------ server/lib/jobs/handlers/index.ts | 8 +- server/lib/jobs/handlers/video-transcoder.ts | 22 +- server/lib/jobs/job-scheduler.ts | 117 ++++---- server/lib/oauth-model.ts | 15 +- server/lib/request/abstract-request-scheduler.ts | 124 ++++---- server/lib/request/request-scheduler.ts | 32 +-- .../lib/request/request-video-event-scheduler.ts | 13 +- server/lib/request/request-video-qadu-scheduler.ts | 13 +- 9 files changed, 293 insertions(+), 369 deletions(-) (limited to 'server/lib') diff --git a/server/lib/friends.ts b/server/lib/friends.ts index 522cb82b3..498144318 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts @@ -1,6 +1,6 @@ -import { each, eachLimit, eachSeries, series, waterfall } from 'async' import * as request from 'request' import * as Sequelize from 'sequelize' +import * as Promise from 'bluebird' import { database as db } from '../initializers/database' import { @@ -15,8 +15,7 @@ import { logger, getMyPublicCert, makeSecureRequest, - makeRetryRequest, - createEmptyCallback + makeRetryRequest } from '../helpers' import { RequestScheduler, @@ -53,24 +52,24 @@ function activateSchedulers () { requestVideoEventScheduler.activate() } -function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { +function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) { const options = { type: ENDPOINT_ACTIONS.ADD, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: videoData, transaction } - createRequest(options, callback) + return createRequest(options) } -function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { +function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) { const options = { type: ENDPOINT_ACTIONS.UPDATE, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: videoData, transaction } - createRequest(options, callback) + return createRequest(options) } function removeVideoToFriends (videoParams: Object) { @@ -80,121 +79,93 @@ function removeVideoToFriends (videoParams: Object) { data: videoParams, transaction: null } - createRequest(options) + return createRequest(options) } -function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) { +function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance, transaction: Sequelize.Transaction) { const options = { type: ENDPOINT_ACTIONS.REPORT_ABUSE, endpoint: REQUEST_ENDPOINTS.VIDEOS, data: reportData, toIds: [ video.Author.podId ], - transaction: null + transaction } - createRequest(options) + return createRequest(options) } -function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { +function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction) { const options = { videoId: qaduParam.videoId, type: qaduParam.type, transaction } - return createVideoQaduRequest(options, callback) + return createVideoQaduRequest(options) } -function quickAndDirtyUpdatesVideoToFriends ( - qadusParams: QaduParam[], - transaction: Sequelize.Transaction, - finalCallback: (err: Error) => void -) { +function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction) { const tasks = [] qadusParams.forEach(function (qaduParams) { - const fun = function (callback) { - quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback) - } - - tasks.push(fun) + tasks.push(quickAndDirtyUpdateVideoToFriends(qaduParams, transaction)) }) - series(tasks, finalCallback) + return Promise.all(tasks) } -function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { +function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction) { const options = { videoId: eventParam.videoId, type: eventParam.type, transaction } - createVideoEventRequest(options, callback) + return createVideoEventRequest(options) } -function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { +function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { const tasks = [] eventsParams.forEach(function (eventParams) { - const fun = function (callback) { - addEventToRemoteVideo(eventParams, transaction, callback) - } - - tasks.push(fun) + tasks.push(addEventToRemoteVideo(eventParams, transaction)) }) - series(tasks, finalCallback) + return Promise.all(tasks) } -function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) { - db.Pod.countAll(function (err, count) { - if (err) return callback(err) - - const hasFriends = (count !== 0) - callback(null, hasFriends) - }) +function hasFriends () { + return db.Pod.countAll().then(count => count !== 0) } -function makeFriends (hosts: string[], callback: (err: Error) => void) { +function makeFriends (hosts: string[]) { const podsScore = {} logger.info('Make friends!') - getMyPublicCert(function (err, cert) { - if (err) { - logger.error('Cannot read public cert.') - return callback(err) - } - - eachSeries(hosts, function (host, callbackEach) { - computeForeignPodsList(host, podsScore, callbackEach) - }, function (err: Error) { - if (err) return callback(err) - + return getMyPublicCert() + .then(cert => { + return Promise.mapSeries(hosts, host => { + return computeForeignPodsList(host, podsScore) + }).then(() => cert) + }) + .then(cert => { logger.debug('Pods scores computed.', { podsScore: podsScore }) const podsList = computeWinningPods(hosts, podsScore) logger.debug('Pods that we keep.', { podsToKeep: podsList }) - makeRequestsToWinningPods(cert, podsList, callback) + return makeRequestsToWinningPods(cert, podsList) }) - }) } -function quitFriends (callback: (err: Error) => void) { +function quitFriends () { // Stop pool requests requestScheduler.deactivate() - waterfall([ - function flushRequests (callbackAsync) { - requestScheduler.flush(err => callbackAsync(err)) - }, - - function flushVideoQaduRequests (callbackAsync) { - requestVideoQaduScheduler.flush(err => callbackAsync(err)) - }, - - function getPodsList (callbackAsync) { - return db.Pod.list(callbackAsync) - }, - - function announceIQuitMyFriends (pods, callbackAsync) { + return requestScheduler.flush() + .then(() => { + return requestVideoQaduScheduler.flush() + }) + .then(() => { + return db.Pod.list() + }) + .then(pods => { const requestParams = { method: 'POST' as 'POST', path: '/api/' + API_VERSION + '/remote/pods/remove', @@ -205,61 +176,57 @@ function quitFriends (callback: (err: Error) => void) { // Announce we quit them // We don't care if the request fails // The other pod will exclude us automatically after a while - eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + return Promise.map(pods, pod => { requestParams.toPod = pod - makeSecureRequest(requestParams, callbackEach) - }, function (err) { - if (err) { - logger.error('Some errors while quitting friends.', { err: err }) - // Don't stop the process - } - - return callbackAsync(null, pods) + return makeSecureRequest(requestParams) + }, { concurrency: REQUESTS_IN_PARALLEL }) + .then(() => pods) + .catch(err => { + logger.error('Some errors while quitting friends.', { err: err }) + // Don't stop the process }) - }, - - function removePodsFromDB (pods, callbackAsync) { - each(pods, function (pod: any, callbackEach) { - pod.destroy().asCallback(callbackEach) - }, callbackAsync) - } - ], function (err: Error) { - // Don't forget to re activate the scheduler, even if there was an error - requestScheduler.activate() - - if (err) return callback(err) + }) + .then(pods => { + const tasks = [] + pods.forEach(pod => tasks.push(pod.destroy())) - logger.info('Removed all remote videos.') - return callback(null) - }) + return Promise.all(pods) + }) + .then(() => { + logger.info('Removed all remote videos.') + // Don't forget to re activate the scheduler, even if there was an error + return requestScheduler.activate() + }) + .finally(() => requestScheduler.activate()) } function sendOwnedVideosToPod (podId: number) { - db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { - if (err) { - logger.error('Cannot get the list of videos we own.') - return - } - - videosList.forEach(function (video) { - video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) { - logger.error('Cannot convert video to remote.', { error: err }) - // Don't break the process - return - } - - const options = { - type: 'add', - endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideo, - toIds: [ podId ], - transaction: null - } - createRequest(options) + db.Video.listOwnedAndPopulateAuthorAndTags() + .then(videosList => { + const tasks = [] + videosList.forEach(video => { + const promise = video.toAddRemoteJSON() + .then(remoteVideo => { + const options = { + type: 'add', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteVideo, + toIds: [ podId ], + transaction: null + } + return createRequest(options) + }) + .catch(err => { + logger.error('Cannot convert video to remote.', { error: err }) + // Don't break the process + return undefined + }) + + tasks.push(promise) }) + + return Promise.all(tasks) }) - }) } function getRequestScheduler () { @@ -297,23 +264,22 @@ export { // --------------------------------------------------------------------------- -function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) { - getForeignPodsList(host, function (err, res) { - if (err) return callback(err) - +function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { + // TODO: type res + return getForeignPodsList(host).then((res: any) => { const foreignPodsList = res.data // Let's give 1 point to the pod we ask the friends list foreignPodsList.push({ host }) - foreignPodsList.forEach(function (foreignPod) { + foreignPodsList.forEach(foreignPod => { const foreignPodHost = foreignPod.host if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ else podsScore[foreignPodHost] = 1 }) - return callback(null) + return undefined }) } @@ -323,7 +289,7 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num const podsList = [] const baseScore = hosts.length / 2 - Object.keys(podsScore).forEach(function (podHost) { + Object.keys(podsScore).forEach(podHost => { // If the pod is not me and with a good score we add it if (isMe(podHost) === false && podsScore[podHost] > baseScore) { podsList.push({ host: podHost }) @@ -333,28 +299,30 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num return podsList } -function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) { - const path = '/api/' + API_VERSION + '/pods' +function getForeignPodsList (host: string) { + return new Promise((res, rej) => { + const path = '/api/' + API_VERSION + '/pods' - request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { - if (err) return callback(err) + request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { + if (err) return rej(err) - try { - const json = JSON.parse(body) - return callback(null, json) - } catch (err) { - return callback(err) - } + try { + const json = JSON.parse(body) + return res(json) + } catch (err) { + return rej(err) + } + }) }) } -function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) { +function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { // Stop pool requests requestScheduler.deactivate() // Flush pool requests requestScheduler.forceSend() - eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) { + return Promise.map(podsList, pod => { const params = { url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', method: 'POST' as 'POST', @@ -365,38 +333,35 @@ function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callb } } - makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) { - if (err) { - logger.error('Error with adding %s pod.', pod.host, { error: err }) + return makeRetryRequest(params) + .then(({ response, body }) => { + body = body as { cert: string, email: string } + + if (response.statusCode === 200) { + const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) + return podObj.save() + .then(podCreated => { + + // Add our videos to the request scheduler + sendOwnedVideosToPod(podCreated.id) + }) + .catch(err => { + logger.error('Cannot add friend %s pod.', pod.host, { error: err }) + }) + } else { + logger.error('Status not 200 for %s pod.', pod.host) + } + }) + .catch(err => { + logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) // Don't break the process - return callbackEach() - } - - if (res.statusCode === 200) { - const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) - podObj.save().asCallback(function (err, podCreated) { - if (err) { - logger.error('Cannot add friend %s pod.', pod.host, { error: err }) - return callbackEach() - } - - // Add our videos to the request scheduler - sendOwnedVideosToPod(podCreated.id) - - return callbackEach() - }) - } else { - logger.error('Status not 200 for %s pod.', pod.host) - return callbackEach() - } - }) - }, function endRequests () { + }) + }, { concurrency: REQUESTS_IN_PARALLEL }) + .then(() => logger.debug('makeRequestsToWinningPods finished.')) + .finally(() => { // Final callback, we've ended all the requests // Now we made new friends, we can re activate the pool of requests requestScheduler.activate() - - logger.debug('makeRequestsToWinningPods finished.') - return callback(null) }) } @@ -408,33 +373,22 @@ type CreateRequestOptions = { toIds?: number[] transaction: Sequelize.Transaction } -function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) { - if (!callback) callback = function () { /* empty */ } - - if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback) +function createRequest (options: CreateRequestOptions) { + if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) // If the "toIds" pods is not specified, we send the request to all our friends - db.Pod.listAllIds(options.transaction, function (err, podIds) { - if (err) { - logger.error('Cannot get pod ids', { error: err }) - return - } - + return db.Pod.listAllIds(options.transaction).then(podIds => { const newOptions = Object.assign(options, { toIds: podIds }) - return requestScheduler.createRequest(newOptions, callback) + return requestScheduler.createRequest(newOptions) }) } -function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { - if (!callback) callback = createEmptyCallback() - - requestVideoQaduScheduler.createRequest(options, callback) +function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { + return requestVideoQaduScheduler.createRequest(options) } -function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { - if (!callback) callback = createEmptyCallback() - - requestVideoEventScheduler.createRequest(options, callback) +function createVideoEventRequest (options: RequestVideoEventSchedulerOptions) { + return requestVideoEventScheduler.createRequest(options) } function isMe (host: string) { diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts index 7d0263b15..8abddae35 100644 --- a/server/lib/jobs/handlers/index.ts +++ b/server/lib/jobs/handlers/index.ts @@ -1,11 +1,9 @@ import * as videoTranscoder from './video-transcoder' -import { VideoInstance } from '../../../models' - export interface JobHandler { - process (data: object, callback: (err: Error, videoInstance?: T) => void) - onError (err: Error, jobId: number, video: T, callback: (err: Error) => void) - onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void) + process (data: object): T + onError (err: Error, jobId: number) + onSuccess (jobId: number, jobResult: T) } const jobHandlers: { [ handlerName: string ]: JobHandler } = { diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts index 6f606a7d3..e829ca813 100644 --- a/server/lib/jobs/handlers/video-transcoder.ts +++ b/server/lib/jobs/handlers/video-transcoder.ts @@ -3,29 +3,23 @@ import { logger } from '../../../helpers' import { addVideoToFriends } from '../../../lib' import { VideoInstance } from '../../../models' -function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) { - db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { - if (err) return callback(err) - - video.transcodeVideofile(function (err) { - return callback(err, video) - }) +function process (data: { id: string }) { + return db.Video.loadAndPopulateAuthorAndPodAndTags(data.id).then(video => { + return video.transcodeVideofile().then(() => video) }) } -function onError (err: Error, jobId: number, video: VideoInstance, callback: (err: Error) => void) { +function onError (err: Error, jobId: number) { logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) - return callback(null) + return Promise.resolve() } -function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) { +function onSuccess (jobId: number, video: VideoInstance) { logger.info('Job %d is a success.', jobId) - video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) return callback(err) - + video.toAddRemoteJSON().then(remoteVideo => { // Now we'll add the video's meta data to our friends - addVideoToFriends(remoteVideo, null, callback) + return addVideoToFriends(remoteVideo, null) }) } diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 2f01387e7..248dc7978 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts @@ -32,37 +32,35 @@ class JobScheduler { // Finish processing jobs from a previous start const state = JOB_STATES.PROCESSING - db.Job.listWithLimit(limit, state, (err, jobs) => { - this.enqueueJobs(err, jobsQueue, jobs) - - forever( - next => { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, JOBS_FETCHING_INTERVAL) - } - - const state = JOB_STATES.PENDING - db.Job.listWithLimit(limit, state, (err, jobs) => { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(job => { - jobsQueue.push(job) - }) + db.Job.listWithLimit(limit, state) + .then(jobs => { + this.enqueueJobs(jobsQueue, jobs) + + forever( + next => { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, JOBS_FETCHING_INTERVAL) } - // Optimization: we could use "drain" from queue object - return setTimeout(next, JOBS_FETCHING_INTERVAL) - }) - }, + const state = JOB_STATES.PENDING + db.Job.listWithLimit(limit, state) + .then(jobs => { + this.enqueueJobs(jobsQueue, jobs) - err => { logger.error('Error in job scheduler queue.', { error: err }) } - ) - }) + // Optimization: we could use "drain" from queue object + return setTimeout(next, JOBS_FETCHING_INTERVAL) + }) + .catch(err => logger.error('Cannot list pending jobs.', { error: err })) + }, + + err => logger.error('Error in job scheduler queue.', { error: err }) + ) + }) + .catch(err => logger.error('Cannot list pending jobs.', { error: err })) } - createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { + createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { const createQuery = { state: JOB_STATES.PENDING, handlerName, @@ -70,67 +68,62 @@ class JobScheduler { } const options = { transaction } - db.Job.create(createQuery, options).asCallback(callback) + return db.Job.create(createQuery, options) } - private enqueueJobs (err: Error, jobsQueue: AsyncQueue, jobs: JobInstance[]) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(job => { - jobsQueue.push(job) - }) - } + private enqueueJobs (jobsQueue: AsyncQueue, jobs: JobInstance[]) { + jobs.forEach(job => jobsQueue.push(job)) } private processJob (job: JobInstance, callback: (err: Error) => void) { const jobHandler = jobHandlers[job.handlerName] + if (jobHandler === undefined) { + logger.error('Unknown job handler for job %s.', job.handlerName) + return callback(null) + } logger.info('Processing job %d with handler %s.', job.id, job.handlerName) job.state = JOB_STATES.PROCESSING - job.save().asCallback(err => { - if (err) return this.cannotSaveJobError(err, callback) - - if (jobHandler === undefined) { - logger.error('Unknown job handler for job %s.', job.handlerName) - return callback(null) - } + return job.save() + .then(() => { + return jobHandler.process(job.handlerInputData) + }) + .then( + result => { + return this.onJobSuccess(jobHandler, job, result) + }, - return jobHandler.process(job.handlerInputData, (err, result) => { - if (err) { + err => { logger.error('Error in job handler %s.', job.handlerName, { error: err }) - return this.onJobError(jobHandler, job, result, callback) + return this.onJobError(jobHandler, job, err) } - - return this.onJobSuccess(jobHandler, job, result, callback) + ) + .then(() => callback(null)) + .catch(err => { + this.cannotSaveJobError(err) + return callback(err) }) - }) } - private onJobError (jobHandler: JobHandler, job: JobInstance, jobResult: any, callback: (err: Error) => void) { + private onJobError (jobHandler: JobHandler, job: JobInstance, err: Error) { job.state = JOB_STATES.ERROR - job.save().asCallback(err => { - if (err) return this.cannotSaveJobError(err, callback) - - return jobHandler.onError(err, job.id, jobResult, callback) - }) + return job.save() + .then(() => jobHandler.onError(err, job.id)) + .catch(err => this.cannotSaveJobError(err)) } - private onJobSuccess (jobHandler: JobHandler, job: JobInstance, jobResult: any, callback: (err: Error) => void) { + private onJobSuccess (jobHandler: JobHandler, job: JobInstance, jobResult: any) { job.state = JOB_STATES.SUCCESS - job.save().asCallback(err => { - if (err) return this.cannotSaveJobError(err, callback) - - return jobHandler.onSuccess(err, job.id, jobResult, callback) - }) + return job.save() + .then(() => jobHandler.onSuccess(job.id, jobResult)) + .catch(err => this.cannotSaveJobError(err)) } - private cannotSaveJobError (err: Error, callback: (err: Error) => void) { + private cannotSaveJobError (err: Error) { logger.error('Cannot save new job state.', { error: err }) - return callback(err) } } diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts index 7cf42e94c..f34c9c667 100644 --- a/server/lib/oauth-model.ts +++ b/server/lib/oauth-model.ts @@ -30,17 +30,10 @@ function getUser (username: string, password: string) { return db.User.getByUsername(username).then(function (user) { if (!user) return null - // We need to return a promise - return new Promise(function (resolve, reject) { - return user.isPasswordMatch(password, function (err, isPasswordMatch) { - if (err) return reject(err) + return user.isPasswordMatch(password).then(passwordMatch => { + if (passwordMatch === false) return null - if (isPasswordMatch === true) { - return resolve(user) - } - - return resolve(null) - }) + return user }) }) } @@ -80,8 +73,6 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns tokenCreated.user = user return tokenCreated - }).catch(function (err) { - throw err }) } diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index e81ab9c36..dd77fddb7 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts @@ -1,15 +1,16 @@ -import * as eachLimit from 'async/eachLimit' +import { isEmpty } from 'lodash' +import * as Promise from 'bluebird' import { database as db } from '../../initializers/database' import { logger, makeSecureRequest } from '../../helpers' -import { PodInstance } from '../../models' +import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' import { API_VERSION, REQUESTS_IN_PARALLEL, REQUESTS_INTERVAL } from '../../initializers' -abstract class AbstractRequestScheduler { +abstract class AbstractRequestScheduler { requestInterval: number limitPods: number limitPerPod: number @@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler { this.requestInterval = REQUESTS_INTERVAL } - abstract getRequestModel () - abstract getRequestToPodModel () - abstract buildRequestObjects (requests: any) + abstract getRequestModel (): AbstractRequestClass + abstract getRequestToPodModel (): AbstractRequestToPodClass + abstract buildRequestObjects (requestsGrouped: T): {} activate () { logger.info('Requests scheduler activated.') @@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler { return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) } - remainingRequestsCount (callback: (err: Error, total: number) => void) { - return this.getRequestModel().countTotalRequests(callback) + remainingRequestsCount () { + return this.getRequestModel().countTotalRequests() } - flush (callback: (err: Error) => void) { - this.getRequestModel().removeAll(callback) + flush () { + return this.getRequestModel().removeAll() } // --------------------------------------------------------------------------- // Make a requests to friends of a certain type - protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { - if (!callback) callback = function () { /* empty */ } - + protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) { const params = { toPod: toPod, sign: true, // Prove our identity @@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler { // Make multiple retry requests to all of pods // The function fire some useful callbacks - makeSecureRequest(params, (err, res) => { - if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { - err = err ? err.message : 'Status code not 20x : ' + res.statusCode + return makeSecureRequest(params) + .then(({ response, body }) => { + if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { + throw new Error('Status code not 20x : ' + response.statusCode) + } + }) + .catch(err => { logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) - return callback(err) - } - - return callback(null) - }) + throw err + }) } // Make all the requests of the scheduler protected makeRequests () { - this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { - if (err) { - logger.error('Cannot get the list of "%s".', this.description, { err: err }) - return // Abort - } - - // If there are no requests, abort - if (requests.length === 0) { - logger.info('No "%s" to make.', this.description) - return - } - - // We want to group requests by destinations pod and endpoint - const requestsToMakeGrouped = this.buildRequestObjects(requests) - - logger.info('Making "%s" to friends.', this.description) - - const goodPods = [] - const badPods = [] - - eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { - const requestToMake = requestsToMakeGrouped[hashKey] - const toPod = requestToMake.toPod - - this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { - if (err) { - badPods.push(requestToMake.toPod.id) - return callbackEach() - } - - logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) - goodPods.push(requestToMake.toPod.id) - - // Remove the pod id of these request ids - this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) + return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) + .then((requestsGrouped: T) => { + // We want to group requests by destinations pod and endpoint + const requestsToMake = this.buildRequestObjects(requestsGrouped) + + // If there are no requests, abort + if (isEmpty(requestsToMake) === true) { + logger.info('No "%s" to make.', this.description) + return { goodPods: [], badPods: [] } + } + + logger.info('Making "%s" to friends.', this.description) + + const goodPods = [] + const badPods = [] + + return Promise.map(Object.keys(requestsToMake), hashKey => { + const requestToMake = requestsToMake[hashKey] + const toPod: PodInstance = requestToMake.toPod + + return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) + .then(() => { + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + this.afterRequestHook() + + // Remove the pod id of these request ids + return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) + }) + .catch(err => { + badPods.push(requestToMake.toPod.id) + logger.info('Cannot make request to %s.', toPod.host, { error: err }) + }) + }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) + }) + .then(({ goodPods, badPods }) => { + this.afterRequestsHook() - this.afterRequestHook() - }) - }, () => { // All the requests were made, we update the pods score - db.Pod.updatePodsScore(goodPods, badPods) - - this.afterRequestsHook() + return db.Pod.updatePodsScore(goodPods, badPods) }) - }) + .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) } protected afterRequestHook () { diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 575e0227c..0dd796fb0 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts @@ -3,10 +3,8 @@ import * as Sequelize from 'sequelize' import { database as db } from '../../initializers/database' import { AbstractRequestScheduler } from './abstract-request-scheduler' import { logger } from '../../helpers' -import { - REQUESTS_LIMIT_PODS, - REQUESTS_LIMIT_PER_POD -} from '../../initializers' +import { REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PER_POD } from '../../initializers' +import { RequestsGrouped } from '../../models' import { RequestEndpoint } from '../../../shared' export type RequestSchedulerOptions = { @@ -17,7 +15,7 @@ export type RequestSchedulerOptions = { transaction: Sequelize.Transaction } -class RequestScheduler extends AbstractRequestScheduler { +class RequestScheduler extends AbstractRequestScheduler { constructor () { super() @@ -36,11 +34,11 @@ class RequestScheduler extends AbstractRequestScheduler { return db.RequestToPod } - buildRequestObjects (requests: { [ toPodId: number ]: any }) { + buildRequestObjects (requestsGrouped: RequestsGrouped) { const requestsToMakeGrouped = {} - Object.keys(requests).forEach(toPodId => { - requests[toPodId].forEach(data => { + Object.keys(requestsGrouped).forEach(toPodId => { + requestsGrouped[toPodId].forEach(data => { const request = data.request const pod = data.pod const hashKey = toPodId + request.endpoint @@ -62,12 +60,12 @@ class RequestScheduler extends AbstractRequestScheduler { return requestsToMakeGrouped } - createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions, callback: (err: Error) => void) { + createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { // TODO: check the setPods works const podIds = [] // If there are no destination pods abort - if (toIds.length === 0) return callback(null) + if (toIds.length === 0) return undefined toIds.forEach(toPod => { podIds.push(toPod) @@ -85,20 +83,18 @@ class RequestScheduler extends AbstractRequestScheduler { transaction } - return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { - if (err) return callback(err) - - return request.setPods(podIds, dbRequestOptions).asCallback(callback) - }) + return db.Request.create(createQuery, dbRequestOptions) + .then(request => { + return request.setPods(podIds, dbRequestOptions) + }) } // --------------------------------------------------------------------------- afterRequestsHook () { // Flush requests with no pod - this.getRequestModel().removeWithEmptyTo(err => { - if (err) logger.error('Error when removing requests with no pods.', { error: err }) - }) + this.getRequestModel().removeWithEmptyTo() + .catch(err => logger.error('Error when removing requests with no pods.', { error: err })) } } diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 4bb76f4c9..d4d714c02 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts @@ -7,6 +7,7 @@ import { REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, REQUEST_VIDEO_EVENT_ENDPOINT } from '../../initializers' +import { RequestsVideoEventGrouped } from '../../models' import { RequestVideoEventType } from '../../../shared' export type RequestVideoEventSchedulerOptions = { @@ -16,7 +17,7 @@ export type RequestVideoEventSchedulerOptions = { transaction?: Sequelize.Transaction } -class RequestVideoEventScheduler extends AbstractRequestScheduler { +class RequestVideoEventScheduler extends AbstractRequestScheduler { constructor () { super() @@ -35,7 +36,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { return db.RequestVideoEvent } - buildRequestObjects (eventsToProcess: { [ toPodId: number ]: any }[]) { + buildRequestObjects (eventRequests: RequestsVideoEventGrouped) { const requestsToMakeGrouped = {} /* Example: @@ -50,8 +51,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { // We group video events per video and per pod // We add the counts of the same event types - Object.keys(eventsToProcess).forEach(toPodId => { - eventsToProcess[toPodId].forEach(eventToProcess => { + Object.keys(eventRequests).forEach(toPodId => { + eventRequests[toPodId].forEach(eventToProcess => { if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} if (!requestsToMakeGrouped[toPodId]) { @@ -97,7 +98,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { return requestsToMakeGrouped } - createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { + createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions) { if (count === undefined) count = 1 const dbRequestOptions: Sequelize.CreateOptions = {} @@ -109,7 +110,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { videoId } - return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) + return db.RequestVideoEvent.create(createQuery, dbRequestOptions) } } diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index d7169cc81..5ec7de9c2 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts @@ -9,6 +9,7 @@ import { REQUEST_VIDEO_QADU_ENDPOINT, REQUEST_VIDEO_QADU_TYPES } from '../../initializers' +import { RequestsVideoQaduGrouped } from '../../models' import { RequestVideoQaduType } from '../../../shared' export type RequestVideoQaduSchedulerOptions = { @@ -17,7 +18,7 @@ export type RequestVideoQaduSchedulerOptions = { transaction?: Sequelize.Transaction } -class RequestVideoQaduScheduler extends AbstractRequestScheduler { +class RequestVideoQaduScheduler extends AbstractRequestScheduler { constructor () { super() @@ -36,7 +37,7 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { return db.RequestVideoQadu } - buildRequestObjects (requests: { [ toPodId: number ]: any }[]) { + buildRequestObjects (requests: RequestsVideoQaduGrouped) { const requestsToMakeGrouped = {} Object.keys(requests).forEach(toPodId => { @@ -105,20 +106,18 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { return requestsToMakeGrouped } - createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { + createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { const dbRequestOptions: Sequelize.BulkCreateOptions = {} if (transaction) dbRequestOptions.transaction = transaction // Send the update to all our friends - db.Pod.listAllIds(transaction, function (err, podIds) { - if (err) return callback(err) - + return db.Pod.listAllIds(transaction).then(podIds => { const queries = [] podIds.forEach(podId => { queries.push({ type, videoId, podId }) }) - return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) + return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) }) } } -- cgit v1.2.3