From 65fcc3119c334b75dd13bcfdebf186afdc580a8f Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 15 May 2017 22:22:03 +0200 Subject: First typescript iteration --- server/lib/friends.js | 405 -------------------- server/lib/friends.ts | 410 +++++++++++++++++++++ server/lib/index.ts | 4 + server/lib/jobs/handlers/index.js | 7 - server/lib/jobs/handlers/index.ts | 9 + server/lib/jobs/handlers/video-transcoder.js | 43 --- server/lib/jobs/handlers/video-transcoder.ts | 37 ++ server/lib/jobs/index.ts | 1 + server/lib/jobs/job-scheduler.js | 129 ------- server/lib/jobs/job-scheduler.ts | 137 +++++++ server/lib/oauth-model.js | 97 ----- server/lib/oauth-model.ts | 95 +++++ server/lib/request/base-request-scheduler.js | 136 ------- server/lib/request/base-request-scheduler.ts | 154 ++++++++ server/lib/request/index.ts | 3 + server/lib/request/request-scheduler.js | 97 ----- server/lib/request/request-scheduler.ts | 104 ++++++ .../lib/request/request-video-event-scheduler.js | 108 ------ .../lib/request/request-video-event-scheduler.ts | 116 ++++++ server/lib/request/request-video-qadu-scheduler.js | 117 ------ server/lib/request/request-video-qadu-scheduler.ts | 126 +++++++ 21 files changed, 1196 insertions(+), 1139 deletions(-) delete mode 100644 server/lib/friends.js create mode 100644 server/lib/friends.ts create mode 100644 server/lib/index.ts delete mode 100644 server/lib/jobs/handlers/index.js create mode 100644 server/lib/jobs/handlers/index.ts delete mode 100644 server/lib/jobs/handlers/video-transcoder.js create mode 100644 server/lib/jobs/handlers/video-transcoder.ts create mode 100644 server/lib/jobs/index.ts delete mode 100644 server/lib/jobs/job-scheduler.js create mode 100644 server/lib/jobs/job-scheduler.ts delete mode 100644 server/lib/oauth-model.js create mode 100644 server/lib/oauth-model.ts delete mode 100644 server/lib/request/base-request-scheduler.js create mode 100644 server/lib/request/base-request-scheduler.ts create mode 100644 server/lib/request/index.ts delete mode 100644 server/lib/request/request-scheduler.js create mode 100644 server/lib/request/request-scheduler.ts delete mode 100644 server/lib/request/request-video-event-scheduler.js create mode 100644 server/lib/request/request-video-event-scheduler.ts delete mode 100644 server/lib/request/request-video-qadu-scheduler.js create mode 100644 server/lib/request/request-video-qadu-scheduler.ts (limited to 'server/lib') diff --git a/server/lib/friends.js b/server/lib/friends.js deleted file mode 100644 index 6dd32406c..000000000 --- a/server/lib/friends.js +++ /dev/null @@ -1,405 +0,0 @@ -'use strict' - -const each = require('async/each') -const eachLimit = require('async/eachLimit') -const eachSeries = require('async/eachSeries') -const series = require('async/series') -const request = require('request') -const waterfall = require('async/waterfall') - -const constants = require('../initializers/constants') -const db = require('../initializers/database') -const logger = require('../helpers/logger') -const peertubeCrypto = require('../helpers/peertube-crypto') -const requests = require('../helpers/requests') -const utils = require('../helpers/utils') -const RequestScheduler = require('./request/request-scheduler') -const RequestVideoQaduScheduler = require('./request/request-video-qadu-scheduler') -const RequestVideoEventScheduler = require('./request/request-video-event-scheduler') - -const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] - -const requestScheduler = new RequestScheduler() -const requestVideoQaduScheduler = new RequestVideoQaduScheduler() -const requestVideoEventScheduler = new RequestVideoEventScheduler() - -const friends = { - activate, - addVideoToFriends, - updateVideoToFriends, - reportAbuseVideoToFriend, - quickAndDirtyUpdateVideoToFriends, - quickAndDirtyUpdatesVideoToFriends, - addEventToRemoteVideo, - addEventsToRemoteVideo, - hasFriends, - makeFriends, - quitFriends, - removeVideoToFriends, - sendOwnedVideosToPod, - getRequestScheduler, - getRequestVideoQaduScheduler, - getRequestVideoEventScheduler -} - -function activate () { - requestScheduler.activate() - requestVideoQaduScheduler.activate() - requestVideoEventScheduler.activate() -} - -function addVideoToFriends (videoData, transaction, callback) { - const options = { - type: ENDPOINT_ACTIONS.ADD, - endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, - data: videoData, - transaction - } - createRequest(options, callback) -} - -function updateVideoToFriends (videoData, transaction, callback) { - const options = { - type: ENDPOINT_ACTIONS.UPDATE, - endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, - data: videoData, - transaction - } - createRequest(options, callback) -} - -function removeVideoToFriends (videoParams) { - const options = { - type: ENDPOINT_ACTIONS.REMOVE, - endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, - data: videoParams - } - createRequest(options) -} - -function reportAbuseVideoToFriend (reportData, video) { - const options = { - type: ENDPOINT_ACTIONS.REPORT_ABUSE, - endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, - data: reportData, - toIds: [ video.Author.podId ] - } - createRequest(options) -} - -function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction, callback) { - const options = { - videoId: qaduParams.videoId, - type: qaduParams.type, - transaction - } - return createVideoQaduRequest(options, callback) -} - -function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) { - const tasks = [] - - qadusParams.forEach(function (qaduParams) { - const fun = function (callback) { - quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback) - } - - tasks.push(fun) - }) - - series(tasks, finalCallback) -} - -function addEventToRemoteVideo (eventParams, transaction, callback) { - const options = { - videoId: eventParams.videoId, - type: eventParams.type, - transaction - } - createVideoEventRequest(options, callback) -} - -function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) { - const tasks = [] - - eventsParams.forEach(function (eventParams) { - const fun = function (callback) { - addEventToRemoteVideo(eventParams, transaction, callback) - } - - tasks.push(fun) - }) - - series(tasks, finalCallback) -} - -function hasFriends (callback) { - db.Pod.countAll(function (err, count) { - if (err) return callback(err) - - const hasFriends = (count !== 0) - callback(null, hasFriends) - }) -} - -function makeFriends (hosts, callback) { - const podsScore = {} - - logger.info('Make friends!') - peertubeCrypto.getMyPublicCert(function (err, cert) { - if (err) { - logger.error('Cannot read public cert.') - return callback(err) - } - - eachSeries(hosts, function (host, callbackEach) { - computeForeignPodsList(host, podsScore, callbackEach) - }, function (err) { - if (err) return callback(err) - - logger.debug('Pods scores computed.', { podsScore: podsScore }) - const podsList = computeWinningPods(hosts, podsScore) - logger.debug('Pods that we keep.', { podsToKeep: podsList }) - - makeRequestsToWinningPods(cert, podsList, callback) - }) - }) -} - -function quitFriends (callback) { - // Stop pool requests - requestScheduler.deactivate() - - waterfall([ - function flushRequests (callbackAsync) { - requestScheduler.flush(err => callbackAsync(err)) - }, - - function flushVideoQaduRequests (callbackAsync) { - requestVideoQaduScheduler.flush(err => callbackAsync(err)) - }, - - function getPodsList (callbackAsync) { - return db.Pod.list(callbackAsync) - }, - - function announceIQuitMyFriends (pods, callbackAsync) { - const requestParams = { - method: 'POST', - path: '/api/' + constants.API_VERSION + '/remote/pods/remove', - sign: true - } - - // Announce we quit them - // We don't care if the request fails - // The other pod will exclude us automatically after a while - eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { - requestParams.toPod = pod - requests.makeSecureRequest(requestParams, callbackEach) - }, function (err) { - if (err) { - logger.error('Some errors while quitting friends.', { err: err }) - // Don't stop the process - } - - return callbackAsync(null, pods) - }) - }, - - function removePodsFromDB (pods, callbackAsync) { - each(pods, function (pod, callbackEach) { - pod.destroy().asCallback(callbackEach) - }, callbackAsync) - } - ], function (err) { - // Don't forget to re activate the scheduler, even if there was an error - requestScheduler.activate() - - if (err) return callback(err) - - logger.info('Removed all remote videos.') - return callback(null) - }) -} - -function sendOwnedVideosToPod (podId) { - db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { - if (err) { - logger.error('Cannot get the list of videos we own.') - return - } - - videosList.forEach(function (video) { - video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) { - logger.error('Cannot convert video to remote.', { error: err }) - // Don't break the process - return - } - - const options = { - type: 'add', - endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideo, - toIds: [ podId ] - } - createRequest(options) - }) - }) - }) -} - -function getRequestScheduler () { - return requestScheduler -} - -function getRequestVideoQaduScheduler () { - return requestVideoQaduScheduler -} - -function getRequestVideoEventScheduler () { - return requestVideoEventScheduler -} - -// --------------------------------------------------------------------------- - -module.exports = friends - -// --------------------------------------------------------------------------- - -function computeForeignPodsList (host, podsScore, callback) { - getForeignPodsList(host, function (err, res) { - if (err) return callback(err) - - const foreignPodsList = res.data - - // Let's give 1 point to the pod we ask the friends list - foreignPodsList.push({ host }) - - foreignPodsList.forEach(function (foreignPod) { - const foreignPodHost = foreignPod.host - - if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ - else podsScore[foreignPodHost] = 1 - }) - - return callback() - }) -} - -function computeWinningPods (hosts, podsScore) { - // Build the list of pods to add - // Only add a pod if it exists in more than a half base pods - const podsList = [] - const baseScore = hosts.length / 2 - - Object.keys(podsScore).forEach(function (podHost) { - // If the pod is not me and with a good score we add it - if (isMe(podHost) === false && podsScore[podHost] > baseScore) { - podsList.push({ host: podHost }) - } - }) - - return podsList -} - -function getForeignPodsList (host, callback) { - const path = '/api/' + constants.API_VERSION + '/pods' - - request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { - if (err) return callback(err) - - try { - const json = JSON.parse(body) - return callback(null, json) - } catch (err) { - return callback(err) - } - }) -} - -function makeRequestsToWinningPods (cert, podsList, callback) { - // Stop pool requests - requestScheduler.deactivate() - // Flush pool requests - requestScheduler.forceSend() - - eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { - const params = { - url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/', - method: 'POST', - json: { - host: constants.CONFIG.WEBSERVER.HOST, - email: constants.CONFIG.ADMIN.EMAIL, - publicKey: cert - } - } - - requests.makeRetryRequest(params, function (err, res, body) { - if (err) { - logger.error('Error with adding %s pod.', pod.host, { error: err }) - // Don't break the process - return callbackEach() - } - - if (res.statusCode === 200) { - const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) - podObj.save().asCallback(function (err, podCreated) { - if (err) { - logger.error('Cannot add friend %s pod.', pod.host, { error: err }) - return callbackEach() - } - - // Add our videos to the request scheduler - sendOwnedVideosToPod(podCreated.id) - - return callbackEach() - }) - } else { - logger.error('Status not 200 for %s pod.', pod.host) - return callbackEach() - } - }) - }, function endRequests () { - // Final callback, we've ended all the requests - // Now we made new friends, we can re activate the pool of requests - requestScheduler.activate() - - logger.debug('makeRequestsToWinningPods finished.') - return callback() - }) -} - -// Wrapper that populate "toIds" argument with all our friends if it is not specified -// { type, endpoint, data, toIds, transaction } -function createRequest (options, callback) { - if (!callback) callback = function () {} - if (options.toIds) return requestScheduler.createRequest(options, callback) - - // If the "toIds" pods is not specified, we send the request to all our friends - db.Pod.listAllIds(options.transaction, function (err, podIds) { - if (err) { - logger.error('Cannot get pod ids', { error: err }) - return - } - - const newOptions = Object.assign(options, { toIds: podIds }) - return requestScheduler.createRequest(newOptions, callback) - }) -} - -function createVideoQaduRequest (options, callback) { - if (!callback) callback = utils.createEmptyCallback() - - requestVideoQaduScheduler.createRequest(options, callback) -} - -function createVideoEventRequest (options, callback) { - if (!callback) callback = utils.createEmptyCallback() - - requestVideoEventScheduler.createRequest(options, callback) -} - -function isMe (host) { - return host === constants.CONFIG.WEBSERVER.HOST -} diff --git a/server/lib/friends.ts b/server/lib/friends.ts new file mode 100644 index 000000000..b32783019 --- /dev/null +++ b/server/lib/friends.ts @@ -0,0 +1,410 @@ +import { each, eachLimit, eachSeries, series, waterfall } from 'async' +import request = require('request') + +const db = require('../initializers/database') +import { + API_VERSION, + CONFIG, + REQUESTS_IN_PARALLEL, + REQUEST_ENDPOINTS, + REQUEST_ENDPOINT_ACTIONS, + REMOTE_SCHEME +} from '../initializers' +import { + logger, + getMyPublicCert, + makeSecureRequest, + makeRetryRequest, + createEmptyCallback +} from '../helpers' +import { + RequestScheduler, + RequestVideoQaduScheduler, + RequestVideoEventScheduler +} from './request' + +const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] + +const requestScheduler = new RequestScheduler() +const requestVideoQaduScheduler = new RequestVideoQaduScheduler() +const requestVideoEventScheduler = new RequestVideoEventScheduler() + +function activateSchedulers () { + requestScheduler.activate() + requestVideoQaduScheduler.activate() + requestVideoEventScheduler.activate() +} + +function addVideoToFriends (videoData, transaction, callback) { + const options = { + type: ENDPOINT_ACTIONS.ADD, + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: videoData, + transaction + } + createRequest(options, callback) +} + +function updateVideoToFriends (videoData, transaction, callback) { + const options = { + type: ENDPOINT_ACTIONS.UPDATE, + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: videoData, + transaction + } + createRequest(options, callback) +} + +function removeVideoToFriends (videoParams) { + const options = { + type: ENDPOINT_ACTIONS.REMOVE, + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: videoParams + } + createRequest(options) +} + +function reportAbuseVideoToFriend (reportData, video) { + const options = { + type: ENDPOINT_ACTIONS.REPORT_ABUSE, + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: reportData, + toIds: [ video.Author.podId ] + } + createRequest(options) +} + +function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction?, callback?) { + const options = { + videoId: qaduParams.videoId, + type: qaduParams.type, + transaction + } + return createVideoQaduRequest(options, callback) +} + +function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) { + const tasks = [] + + qadusParams.forEach(function (qaduParams) { + const fun = function (callback) { + quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback) + } + + tasks.push(fun) + }) + + series(tasks, finalCallback) +} + +function addEventToRemoteVideo (eventParams, transaction?, callback?) { + const options = { + videoId: eventParams.videoId, + type: eventParams.type, + transaction + } + createVideoEventRequest(options, callback) +} + +function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) { + const tasks = [] + + eventsParams.forEach(function (eventParams) { + const fun = function (callback) { + addEventToRemoteVideo(eventParams, transaction, callback) + } + + tasks.push(fun) + }) + + series(tasks, finalCallback) +} + +function hasFriends (callback) { + db.Pod.countAll(function (err, count) { + if (err) return callback(err) + + const hasFriends = (count !== 0) + callback(null, hasFriends) + }) +} + +function makeFriends (hosts, callback) { + const podsScore = {} + + logger.info('Make friends!') + getMyPublicCert(function (err, cert) { + if (err) { + logger.error('Cannot read public cert.') + return callback(err) + } + + eachSeries(hosts, function (host, callbackEach) { + computeForeignPodsList(host, podsScore, callbackEach) + }, function (err) { + if (err) return callback(err) + + logger.debug('Pods scores computed.', { podsScore: podsScore }) + const podsList = computeWinningPods(hosts, podsScore) + logger.debug('Pods that we keep.', { podsToKeep: podsList }) + + makeRequestsToWinningPods(cert, podsList, callback) + }) + }) +} + +function quitFriends (callback) { + // Stop pool requests + requestScheduler.deactivate() + + waterfall([ + function flushRequests (callbackAsync) { + requestScheduler.flush(err => callbackAsync(err)) + }, + + function flushVideoQaduRequests (callbackAsync) { + requestVideoQaduScheduler.flush(err => callbackAsync(err)) + }, + + function getPodsList (callbackAsync) { + return db.Pod.list(callbackAsync) + }, + + function announceIQuitMyFriends (pods, callbackAsync) { + const requestParams = { + method: 'POST', + path: '/api/' + API_VERSION + '/remote/pods/remove', + sign: true, + toPod: null + } + + // Announce we quit them + // We don't care if the request fails + // The other pod will exclude us automatically after a while + eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) { + requestParams.toPod = pod + makeSecureRequest(requestParams, callbackEach) + }, function (err) { + if (err) { + logger.error('Some errors while quitting friends.', { err: err }) + // Don't stop the process + } + + return callbackAsync(null, pods) + }) + }, + + function removePodsFromDB (pods, callbackAsync) { + each(pods, function (pod: any, callbackEach) { + pod.destroy().asCallback(callbackEach) + }, callbackAsync) + } + ], function (err) { + // Don't forget to re activate the scheduler, even if there was an error + requestScheduler.activate() + + if (err) return callback(err) + + logger.info('Removed all remote videos.') + return callback(null) + }) +} + +function sendOwnedVideosToPod (podId) { + db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { + if (err) { + logger.error('Cannot get the list of videos we own.') + return + } + + videosList.forEach(function (video) { + video.toAddRemoteJSON(function (err, remoteVideo) { + if (err) { + logger.error('Cannot convert video to remote.', { error: err }) + // Don't break the process + return + } + + const options = { + type: 'add', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteVideo, + toIds: [ podId ] + } + createRequest(options) + }) + }) + }) +} + +function getRequestScheduler () { + return requestScheduler +} + +function getRequestVideoQaduScheduler () { + return requestVideoQaduScheduler +} + +function getRequestVideoEventScheduler () { + return requestVideoEventScheduler +} + +// --------------------------------------------------------------------------- + +export { + activateSchedulers, + addVideoToFriends, + updateVideoToFriends, + reportAbuseVideoToFriend, + quickAndDirtyUpdateVideoToFriends, + quickAndDirtyUpdatesVideoToFriends, + addEventToRemoteVideo, + addEventsToRemoteVideo, + hasFriends, + makeFriends, + quitFriends, + removeVideoToFriends, + sendOwnedVideosToPod, + getRequestScheduler, + getRequestVideoQaduScheduler, + getRequestVideoEventScheduler +} + +// --------------------------------------------------------------------------- + +function computeForeignPodsList (host, podsScore, callback) { + getForeignPodsList(host, function (err, res) { + if (err) return callback(err) + + const foreignPodsList = res.data + + // Let's give 1 point to the pod we ask the friends list + foreignPodsList.push({ host }) + + foreignPodsList.forEach(function (foreignPod) { + const foreignPodHost = foreignPod.host + + if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ + else podsScore[foreignPodHost] = 1 + }) + + return callback() + }) +} + +function computeWinningPods (hosts, podsScore) { + // Build the list of pods to add + // Only add a pod if it exists in more than a half base pods + const podsList = [] + const baseScore = hosts.length / 2 + + Object.keys(podsScore).forEach(function (podHost) { + // If the pod is not me and with a good score we add it + if (isMe(podHost) === false && podsScore[podHost] > baseScore) { + podsList.push({ host: podHost }) + } + }) + + return podsList +} + +function getForeignPodsList (host, callback) { + const path = '/api/' + API_VERSION + '/pods' + + request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { + if (err) return callback(err) + + try { + const json = JSON.parse(body) + return callback(null, json) + } catch (err) { + return callback(err) + } + }) +} + +function makeRequestsToWinningPods (cert, podsList, callback) { + // Stop pool requests + requestScheduler.deactivate() + // Flush pool requests + requestScheduler.forceSend() + + eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: any, callbackEach) { + const params = { + url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', + method: 'POST', + json: { + host: CONFIG.WEBSERVER.HOST, + email: CONFIG.ADMIN.EMAIL, + publicKey: cert + } + } + + makeRetryRequest(params, function (err, res, body) { + if (err) { + logger.error('Error with adding %s pod.', pod.host, { error: err }) + // Don't break the process + return callbackEach() + } + + if (res.statusCode === 200) { + const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) + podObj.save().asCallback(function (err, podCreated) { + if (err) { + logger.error('Cannot add friend %s pod.', pod.host, { error: err }) + return callbackEach() + } + + // Add our videos to the request scheduler + sendOwnedVideosToPod(podCreated.id) + + return callbackEach() + }) + } else { + logger.error('Status not 200 for %s pod.', pod.host) + return callbackEach() + } + }) + }, function endRequests () { + // Final callback, we've ended all the requests + // Now we made new friends, we can re activate the pool of requests + requestScheduler.activate() + + logger.debug('makeRequestsToWinningPods finished.') + return callback() + }) +} + +// Wrapper that populate "toIds" argument with all our friends if it is not specified +// { type, endpoint, data, toIds, transaction } +function createRequest (options, callback?) { + if (!callback) callback = function () { /* empty */ } + if (options.toIds) return requestScheduler.createRequest(options, callback) + + // If the "toIds" pods is not specified, we send the request to all our friends + db.Pod.listAllIds(options.transaction, function (err, podIds) { + if (err) { + logger.error('Cannot get pod ids', { error: err }) + return + } + + const newOptions = Object.assign(options, { toIds: podIds }) + return requestScheduler.createRequest(newOptions, callback) + }) +} + +function createVideoQaduRequest (options, callback) { + if (!callback) callback = createEmptyCallback() + + requestVideoQaduScheduler.createRequest(options, callback) +} + +function createVideoEventRequest (options, callback) { + if (!callback) callback = createEmptyCallback() + + requestVideoEventScheduler.createRequest(options, callback) +} + +function isMe (host) { + return host === CONFIG.WEBSERVER.HOST +} diff --git a/server/lib/index.ts b/server/lib/index.ts new file mode 100644 index 000000000..b8697fb96 --- /dev/null +++ b/server/lib/index.ts @@ -0,0 +1,4 @@ +export * from './jobs' +export * from './request' +export * from './friends' +export * from './oauth-model' diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js deleted file mode 100644 index 59c1ccce5..000000000 --- a/server/lib/jobs/handlers/index.js +++ /dev/null @@ -1,7 +0,0 @@ -'use strict' - -const videoTranscoder = require('./video-transcoder') - -module.exports = { - videoTranscoder -} diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts new file mode 100644 index 000000000..ae5440031 --- /dev/null +++ b/server/lib/jobs/handlers/index.ts @@ -0,0 +1,9 @@ +import * as videoTranscoder from './video-transcoder' + +const jobHandlers = { + videoTranscoder +} + +export { + jobHandlers +} diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js deleted file mode 100644 index d2ad4f9c7..000000000 --- a/server/lib/jobs/handlers/video-transcoder.js +++ /dev/null @@ -1,43 +0,0 @@ -'use strict' - -const db = require('../../../initializers/database') -const logger = require('../../../helpers/logger') -const friends = require('../../../lib/friends') - -const VideoTranscoderHandler = { - process, - onError, - onSuccess -} - -// --------------------------------------------------------------------------- - -function process (data, callback) { - db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { - if (err) return callback(err) - - video.transcodeVideofile(function (err) { - return callback(err, video) - }) - }) -} - -function onError (err, jobId, video, callback) { - logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) - return callback() -} - -function onSuccess (data, jobId, video, callback) { - logger.info('Job %d is a success.', jobId) - - video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) return callback(err) - - // Now we'll add the video's meta data to our friends - friends.addVideoToFriends(remoteVideo, null, callback) - }) -} - -// --------------------------------------------------------------------------- - -module.exports = VideoTranscoderHandler diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts new file mode 100644 index 000000000..35db5fb96 --- /dev/null +++ b/server/lib/jobs/handlers/video-transcoder.ts @@ -0,0 +1,37 @@ +const db = require('../../../initializers/database') +import { logger } from '../../../helpers' +import { addVideoToFriends } from '../../../lib' + +function process (data, callback) { + db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { + if (err) return callback(err) + + video.transcodeVideofile(function (err) { + return callback(err, video) + }) + }) +} + +function onError (err, jobId, video, callback) { + logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) + return callback() +} + +function onSuccess (data, jobId, video, callback) { + logger.info('Job %d is a success.', jobId) + + video.toAddRemoteJSON(function (err, remoteVideo) { + if (err) return callback(err) + + // Now we'll add the video's meta data to our friends + addVideoToFriends(remoteVideo, null, callback) + }) +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts new file mode 100644 index 000000000..b18a3d845 --- /dev/null +++ b/server/lib/jobs/index.ts @@ -0,0 +1 @@ +export * from './job-scheduler' diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js deleted file mode 100644 index 7b239577f..000000000 --- a/server/lib/jobs/job-scheduler.js +++ /dev/null @@ -1,129 +0,0 @@ -'use strict' - -const forever = require('async/forever') -const queue = require('async/queue') - -const constants = require('../../initializers/constants') -const db = require('../../initializers/database') -const logger = require('../../helpers/logger') - -const jobHandlers = require('./handlers') - -const jobScheduler = { - activate, - createJob -} - -function activate () { - const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE - - logger.info('Jobs scheduler activated.') - - const jobsQueue = queue(processJob) - - // Finish processing jobs from a previous start - const state = constants.JOB_STATES.PROCESSING - db.Job.listWithLimit(limit, state, function (err, jobs) { - enqueueJobs(err, jobsQueue, jobs) - - forever( - function (next) { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - } - - const state = constants.JOB_STATES.PENDING - db.Job.listWithLimit(limit, state, function (err, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) - } - - // Optimization: we could use "drain" from queue object - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - }) - } - ) - }) -} - -// --------------------------------------------------------------------------- - -module.exports = jobScheduler - -// --------------------------------------------------------------------------- - -function enqueueJobs (err, jobsQueue, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) - } -} - -function createJob (transaction, handlerName, handlerInputData, callback) { - const createQuery = { - state: constants.JOB_STATES.PENDING, - handlerName, - handlerInputData - } - const options = { transaction } - - db.Job.create(createQuery, options).asCallback(callback) -} - -function processJob (job, callback) { - const jobHandler = jobHandlers[job.handlerName] - - logger.info('Processing job %d with handler %s.', job.id, job.handlerName) - - job.state = constants.JOB_STATES.PROCESSING - job.save().asCallback(function (err) { - if (err) return cannotSaveJobError(err, callback) - - if (jobHandler === undefined) { - logger.error('Unknown job handler for job %s.', jobHandler.handlerName) - return callback() - } - - return jobHandler.process(job.handlerInputData, function (err, result) { - if (err) { - logger.error('Error in job handler %s.', job.handlerName, { error: err }) - return onJobError(jobHandler, job, result, callback) - } - - return onJobSuccess(jobHandler, job, result, callback) - }) - }) -} - -function onJobError (jobHandler, job, jobResult, callback) { - job.state = constants.JOB_STATES.ERROR - - job.save().asCallback(function (err) { - if (err) return cannotSaveJobError(err, callback) - - return jobHandler.onError(err, job.id, jobResult, callback) - }) -} - -function onJobSuccess (jobHandler, job, jobResult, callback) { - job.state = constants.JOB_STATES.SUCCESS - - job.save().asCallback(function (err) { - if (err) return cannotSaveJobError(err, callback) - - return jobHandler.onSuccess(err, job.id, jobResult, callback) - }) -} - -function cannotSaveJobError (err, callback) { - logger.error('Cannot save new job state.', { error: err }) - return callback(err) -} diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts new file mode 100644 index 000000000..7b8c6faf9 --- /dev/null +++ b/server/lib/jobs/job-scheduler.ts @@ -0,0 +1,137 @@ +import { forever, queue } from 'async' + +const db = require('../../initializers/database') +import { + JOBS_FETCHING_INTERVAL, + JOBS_FETCH_LIMIT_PER_CYCLE, + JOB_STATES +} from '../../initializers' +import { logger } from '../../helpers' +import { jobHandlers } from './handlers' + +class JobScheduler { + + private static instance: JobScheduler + + private constructor () { } + + static get Instance () { + return this.instance || (this.instance = new this()) + } + + activate () { + const limit = JOBS_FETCH_LIMIT_PER_CYCLE + + logger.info('Jobs scheduler activated.') + + const jobsQueue = queue(this.processJob) + + // Finish processing jobs from a previous start + const state = JOB_STATES.PROCESSING + db.Job.listWithLimit(limit, state, (err, jobs) => { + this.enqueueJobs(err, jobsQueue, jobs) + + forever( + next => { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, JOBS_FETCHING_INTERVAL) + } + + const state = JOB_STATES.PENDING + db.Job.listWithLimit(limit, state, (err, jobs) => { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(job => { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, JOBS_FETCHING_INTERVAL) + }) + }, + + err => { logger.error('Error in job scheduler queue.', { error: err }) } + ) + }) + } + + createJob (transaction, handlerName, handlerInputData, callback) { + const createQuery = { + state: JOB_STATES.PENDING, + handlerName, + handlerInputData + } + const options = { transaction } + + db.Job.create(createQuery, options).asCallback(callback) + } + + private enqueueJobs (err, jobsQueue, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(job => { + jobsQueue.push(job) + }) + } + } + + private processJob (job, callback) { + const jobHandler = jobHandlers[job.handlerName] + + logger.info('Processing job %d with handler %s.', job.id, job.handlerName) + + job.state = JOB_STATES.PROCESSING + job.save().asCallback(err => { + if (err) return this.cannotSaveJobError(err, callback) + + if (jobHandler === undefined) { + logger.error('Unknown job handler for job %s.', jobHandler.handlerName) + return callback() + } + + return jobHandler.process(job.handlerInputData, (err, result) => { + if (err) { + logger.error('Error in job handler %s.', job.handlerName, { error: err }) + return this.onJobError(jobHandler, job, result, callback) + } + + return this.onJobSuccess(jobHandler, job, result, callback) + }) + }) + } + + private onJobError (jobHandler, job, jobResult, callback) { + job.state = JOB_STATES.ERROR + + job.save().asCallback(err => { + if (err) return this.cannotSaveJobError(err, callback) + + return jobHandler.onError(err, job.id, jobResult, callback) + }) + } + + private onJobSuccess (jobHandler, job, jobResult, callback) { + job.state = JOB_STATES.SUCCESS + + job.save().asCallback(err => { + if (err) return this.cannotSaveJobError(err, callback) + + return jobHandler.onSuccess(err, job.id, jobResult, callback) + }) + } + + private cannotSaveJobError (err, callback) { + logger.error('Cannot save new job state.', { error: err }) + return callback(err) + } +} + +// --------------------------------------------------------------------------- + +export { + JobScheduler +} diff --git a/server/lib/oauth-model.js b/server/lib/oauth-model.js deleted file mode 100644 index 1c12f1b14..000000000 --- a/server/lib/oauth-model.js +++ /dev/null @@ -1,97 +0,0 @@ -const db = require('../initializers/database') -const logger = require('../helpers/logger') - -// See https://github.com/oauthjs/node-oauth2-server/wiki/Model-specification for the model specifications -const OAuthModel = { - getAccessToken, - getClient, - getRefreshToken, - getUser, - revokeToken, - saveToken -} - -// --------------------------------------------------------------------------- - -function getAccessToken (bearerToken) { - logger.debug('Getting access token (bearerToken: ' + bearerToken + ').') - - return db.OAuthToken.getByTokenAndPopulateUser(bearerToken) -} - -function getClient (clientId, clientSecret) { - logger.debug('Getting Client (clientId: ' + clientId + ', clientSecret: ' + clientSecret + ').') - - return db.OAuthClient.getByIdAndSecret(clientId, clientSecret) -} - -function getRefreshToken (refreshToken) { - logger.debug('Getting RefreshToken (refreshToken: ' + refreshToken + ').') - - return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) -} - -function getUser (username, password) { - logger.debug('Getting User (username: ' + username + ', password: ' + password + ').') - - return db.User.getByUsername(username).then(function (user) { - if (!user) return null - - // We need to return a promise - return new Promise(function (resolve, reject) { - return user.isPasswordMatch(password, function (err, isPasswordMatch) { - if (err) return reject(err) - - if (isPasswordMatch === true) { - return resolve(user) - } - - return resolve(null) - }) - }) - }) -} - -function revokeToken (token) { - return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(function (tokenDB) { - if (tokenDB) tokenDB.destroy() - - /* - * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js - * "As per the discussion we need set older date - * revokeToken will expected return a boolean in future version - * https://github.com/oauthjs/node-oauth2-server/pull/274 - * https://github.com/oauthjs/node-oauth2-server/issues/290" - */ - const expiredToken = tokenDB - expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') - - return expiredToken - }) -} - -function saveToken (token, client, user) { - logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') - - const tokenToCreate = { - accessToken: token.accessToken, - accessTokenExpiresAt: token.accessTokenExpiresAt, - refreshToken: token.refreshToken, - refreshTokenExpiresAt: token.refreshTokenExpiresAt, - oAuthClientId: client.id, - userId: user.id - } - - return db.OAuthToken.create(tokenToCreate).then(function (tokenCreated) { - tokenCreated.client = client - tokenCreated.user = user - - return tokenCreated - }).catch(function (err) { - throw err - }) -} - -// --------------------------------------------------------------------------- - -module.exports = OAuthModel diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts new file mode 100644 index 000000000..00b1afcf5 --- /dev/null +++ b/server/lib/oauth-model.ts @@ -0,0 +1,95 @@ +const db = require('../initializers/database') +import { logger } from '../helpers' + +// --------------------------------------------------------------------------- + +function getAccessToken (bearerToken) { + logger.debug('Getting access token (bearerToken: ' + bearerToken + ').') + + return db.OAuthToken.getByTokenAndPopulateUser(bearerToken) +} + +function getClient (clientId, clientSecret) { + logger.debug('Getting Client (clientId: ' + clientId + ', clientSecret: ' + clientSecret + ').') + + return db.OAuthClient.getByIdAndSecret(clientId, clientSecret) +} + +function getRefreshToken (refreshToken) { + logger.debug('Getting RefreshToken (refreshToken: ' + refreshToken + ').') + + return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) +} + +function getUser (username, password) { + logger.debug('Getting User (username: ' + username + ', password: ' + password + ').') + + return db.User.getByUsername(username).then(function (user) { + if (!user) return null + + // We need to return a promise + return new Promise(function (resolve, reject) { + return user.isPasswordMatch(password, function (err, isPasswordMatch) { + if (err) return reject(err) + + if (isPasswordMatch === true) { + return resolve(user) + } + + return resolve(null) + }) + }) + }) +} + +function revokeToken (token) { + return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(function (tokenDB) { + if (tokenDB) tokenDB.destroy() + + /* + * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js + * "As per the discussion we need set older date + * revokeToken will expected return a boolean in future version + * https://github.com/oauthjs/node-oauth2-server/pull/274 + * https://github.com/oauthjs/node-oauth2-server/issues/290" + */ + const expiredToken = tokenDB + expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') + + return expiredToken + }) +} + +function saveToken (token, client, user) { + logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') + + const tokenToCreate = { + accessToken: token.accessToken, + accessTokenExpiresAt: token.accessTokenExpiresAt, + refreshToken: token.refreshToken, + refreshTokenExpiresAt: token.refreshTokenExpiresAt, + oAuthClientId: client.id, + userId: user.id + } + + return db.OAuthToken.create(tokenToCreate).then(function (tokenCreated) { + tokenCreated.client = client + tokenCreated.user = user + + return tokenCreated + }).catch(function (err) { + throw err + }) +} + +// --------------------------------------------------------------------------- + +// See https://github.com/oauthjs/node-oauth2-server/wiki/Model-specification for the model specifications +export { + getAccessToken, + getClient, + getRefreshToken, + getUser, + revokeToken, + saveToken +} diff --git a/server/lib/request/base-request-scheduler.js b/server/lib/request/base-request-scheduler.js deleted file mode 100644 index 782448340..000000000 --- a/server/lib/request/base-request-scheduler.js +++ /dev/null @@ -1,136 +0,0 @@ -'use strict' - -const eachLimit = require('async/eachLimit') - -const constants = require('../../initializers/constants') -const db = require('../../initializers/database') -const logger = require('../../helpers/logger') -const requests = require('../../helpers/requests') - -module.exports = class BaseRequestScheduler { - constructor (options) { - this.lastRequestTimestamp = 0 - this.timer = null - this.requestInterval = constants.REQUESTS_INTERVAL - } - - activate () { - logger.info('Requests scheduler activated.') - this.lastRequestTimestamp = Date.now() - - this.timer = setInterval(() => { - this.lastRequestTimestamp = Date.now() - this.makeRequests() - }, this.requestInterval) - } - - deactivate () { - logger.info('Requests scheduler deactivated.') - clearInterval(this.timer) - this.timer = null - } - - forceSend () { - logger.info('Force requests scheduler sending.') - this.makeRequests() - } - - remainingMilliSeconds () { - if (this.timer === null) return -1 - - return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) - } - - remainingRequestsCount (callback) { - return this.getRequestModel().countTotalRequests(callback) - } - - // --------------------------------------------------------------------------- - - // Make a requests to friends of a certain type - makeRequest (toPod, requestEndpoint, requestsToMake, callback) { - if (!callback) callback = function () {} - - const params = { - toPod: toPod, - sign: true, // Prove our identity - method: 'POST', - path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, - data: requestsToMake // Requests we need to make - } - - // Make multiple retry requests to all of pods - // The function fire some useful callbacks - requests.makeSecureRequest(params, (err, res) => { - if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { - err = err ? err.message : 'Status code not 20x : ' + res.statusCode - logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) - - return callback(err) - } - - return callback(null) - }) - } - - // Make all the requests of the scheduler - makeRequests () { - this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { - if (err) { - logger.error('Cannot get the list of "%s".', this.description, { err: err }) - return // Abort - } - - // If there are no requests, abort - if (requests.length === 0) { - logger.info('No "%s" to make.', this.description) - return - } - - // We want to group requests by destinations pod and endpoint - const requestsToMakeGrouped = this.buildRequestObjects(requests) - - logger.info('Making "%s" to friends.', this.description) - - const goodPods = [] - const badPods = [] - - eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { - const requestToMake = requestsToMakeGrouped[hashKey] - const toPod = requestToMake.toPod - - this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { - if (err) { - badPods.push(requestToMake.toPod.id) - return callbackEach() - } - - logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) - goodPods.push(requestToMake.toPod.id) - - // Remove the pod id of these request ids - this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) - - this.afterRequestHook() - }) - }, () => { - // All the requests were made, we update the pods score - db.Pod.updatePodsScore(goodPods, badPods) - - this.afterRequestsHook() - }) - }) - } - - flush (callback) { - this.getRequestModel().removeAll(callback) - } - - afterRequestHook () { - // Nothing to do, let children reimplement it - } - - afterRequestsHook () { - // Nothing to do, let children reimplement it - } -} diff --git a/server/lib/request/base-request-scheduler.ts b/server/lib/request/base-request-scheduler.ts new file mode 100644 index 000000000..7fc88b5f1 --- /dev/null +++ b/server/lib/request/base-request-scheduler.ts @@ -0,0 +1,154 @@ +import { eachLimit } from 'async/eachLimit' + +const db = require('../../initializers/database') +import { logger, makeSecureRequest } from '../../helpers' +import { + API_VERSION, + REQUESTS_IN_PARALLEL, + REQUESTS_INTERVAL +} from '../../initializers' + +abstract class BaseRequestScheduler { + protected lastRequestTimestamp: number + protected timer: NodeJS.Timer + protected requestInterval: number + protected limitPods: number + protected limitPerPod: number + protected description: string + + constructor () { + this.lastRequestTimestamp = 0 + this.timer = null + this.requestInterval = REQUESTS_INTERVAL + } + + abstract getRequestModel () + abstract getRequestToPodModel () + abstract buildRequestObjects (requests: any) + + activate () { + logger.info('Requests scheduler activated.') + this.lastRequestTimestamp = Date.now() + + this.timer = setInterval(() => { + this.lastRequestTimestamp = Date.now() + this.makeRequests() + }, this.requestInterval) + } + + deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(this.timer) + this.timer = null + } + + forceSend () { + logger.info('Force requests scheduler sending.') + this.makeRequests() + } + + remainingMilliSeconds () { + if (this.timer === null) return -1 + + return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) + } + + remainingRequestsCount (callback) { + return this.getRequestModel().countTotalRequests(callback) + } + + flush (callback) { + this.getRequestModel().removeAll(callback) + } + + // --------------------------------------------------------------------------- + + // Make a requests to friends of a certain type + protected makeRequest (toPod, requestEndpoint, requestsToMake, callback) { + if (!callback) callback = function () { /* empty */ } + + const params = { + toPod: toPod, + sign: true, // Prove our identity + method: 'POST', + path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, + data: requestsToMake // Requests we need to make + } + + // Make multiple retry requests to all of pods + // The function fire some useful callbacks + makeSecureRequest(params, (err, res) => { + if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { + err = err ? err.message : 'Status code not 20x : ' + res.statusCode + logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) + + return callback(err) + } + + return callback(null) + }) + } + + // Make all the requests of the scheduler + protected makeRequests () { + this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { + if (err) { + logger.error('Cannot get the list of "%s".', this.description, { err: err }) + return // Abort + } + + // If there are no requests, abort + if (requests.length === 0) { + logger.info('No "%s" to make.', this.description) + return + } + + // We want to group requests by destinations pod and endpoint + const requestsToMakeGrouped = this.buildRequestObjects(requests) + + logger.info('Making "%s" to friends.', this.description) + + const goodPods = [] + const badPods = [] + + eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { + const requestToMake = requestsToMakeGrouped[hashKey] + const toPod = requestToMake.toPod + + this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { + if (err) { + badPods.push(requestToMake.toPod.id) + return callbackEach() + } + + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + // Remove the pod id of these request ids + this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) + + this.afterRequestHook() + }) + }, () => { + // All the requests were made, we update the pods score + db.Pod.updatePodsScore(goodPods, badPods) + + this.afterRequestsHook() + }) + }) + } + + protected afterRequestHook () { + // Nothing to do, let children reimplement it + } + + protected afterRequestsHook () { + // Nothing to do, let children reimplement it + } +} + +// --------------------------------------------------------------------------- + +export { + BaseRequestScheduler +} diff --git a/server/lib/request/index.ts b/server/lib/request/index.ts new file mode 100644 index 000000000..c98f956db --- /dev/null +++ b/server/lib/request/index.ts @@ -0,0 +1,3 @@ +export * from './request-scheduler' +export * from './request-video-event-scheduler' +export * from './request-video-qadu-scheduler' diff --git a/server/lib/request/request-scheduler.js b/server/lib/request/request-scheduler.js deleted file mode 100644 index 555ec3e54..000000000 --- a/server/lib/request/request-scheduler.js +++ /dev/null @@ -1,97 +0,0 @@ -'use strict' - -const constants = require('../../initializers/constants') -const BaseRequestScheduler = require('./base-request-scheduler') -const db = require('../../initializers/database') -const logger = require('../../helpers/logger') - -module.exports = class RequestScheduler extends BaseRequestScheduler { - constructor () { - super() - - // We limit the size of the requests - this.limitPods = constants.REQUESTS_LIMIT_PODS - this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD - - this.description = 'requests' - } - - getRequestModel () { - return db.Request - } - - getRequestToPodModel () { - return db.RequestToPod - } - - buildRequestObjects (requests) { - const requestsToMakeGrouped = {} - - Object.keys(requests).forEach(toPodId => { - requests[toPodId].forEach(data => { - const request = data.request - const pod = data.pod - const hashKey = toPodId + request.endpoint - - if (!requestsToMakeGrouped[hashKey]) { - requestsToMakeGrouped[hashKey] = { - toPod: pod, - endpoint: request.endpoint, - ids: [], // request ids, to delete them from the DB in the future - datas: [] // requests data, - } - } - - requestsToMakeGrouped[hashKey].ids.push(request.id) - requestsToMakeGrouped[hashKey].datas.push(request.request) - }) - }) - - return requestsToMakeGrouped - } - - // { type, endpoint, data, toIds, transaction } - createRequest (options, callback) { - const type = options.type - const endpoint = options.endpoint - const data = options.data - const toIds = options.toIds - const transaction = options.transaction - - const pods = [] - - // If there are no destination pods abort - if (toIds.length === 0) return callback(null) - - toIds.forEach(toPod => { - pods.push(db.Pod.build({ id: toPod })) - }) - - const createQuery = { - endpoint, - request: { - type: type, - data: data - } - } - - const dbRequestOptions = { - transaction - } - - return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { - if (err) return callback(err) - - return request.setPods(pods, dbRequestOptions).asCallback(callback) - }) - } - - // --------------------------------------------------------------------------- - - afterRequestsHook () { - // Flush requests with no pod - this.getRequestModel().removeWithEmptyTo(err => { - if (err) logger.error('Error when removing requests with no pods.', { error: err }) - }) - } -} diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts new file mode 100644 index 000000000..2006a6f03 --- /dev/null +++ b/server/lib/request/request-scheduler.ts @@ -0,0 +1,104 @@ +const db = require('../../initializers/database') +import { BaseRequestScheduler } from './base-request-scheduler' +import { logger } from '../../helpers' +import { + REQUESTS_LIMIT_PODS, + REQUESTS_LIMIT_PER_POD +} from '../../initializers' + +class RequestScheduler extends BaseRequestScheduler { + constructor () { + super() + + // We limit the size of the requests + this.limitPods = REQUESTS_LIMIT_PODS + this.limitPerPod = REQUESTS_LIMIT_PER_POD + + this.description = 'requests' + } + + getRequestModel () { + return db.Request + } + + getRequestToPodModel () { + return db.RequestToPod + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const pod = data.pod + const hashKey = toPodId + request.endpoint + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: request.endpoint, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data, + } + } + + requestsToMakeGrouped[hashKey].ids.push(request.id) + requestsToMakeGrouped[hashKey].datas.push(request.request) + }) + }) + + return requestsToMakeGrouped + } + + // { type, endpoint, data, toIds, transaction } + createRequest (options, callback) { + const type = options.type + const endpoint = options.endpoint + const data = options.data + const toIds = options.toIds + const transaction = options.transaction + + const pods = [] + + // If there are no destination pods abort + if (toIds.length === 0) return callback(null) + + toIds.forEach(toPod => { + pods.push(db.Pod.build({ id: toPod })) + }) + + const createQuery = { + endpoint, + request: { + type: type, + data: data + } + } + + const dbRequestOptions = { + transaction + } + + return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { + if (err) return callback(err) + + return request.setPods(pods, dbRequestOptions).asCallback(callback) + }) + } + + // --------------------------------------------------------------------------- + + afterRequestsHook () { + // Flush requests with no pod + this.getRequestModel().removeWithEmptyTo(err => { + if (err) logger.error('Error when removing requests with no pods.', { error: err }) + }) + } +} + +// --------------------------------------------------------------------------- + +export { + RequestScheduler +} diff --git a/server/lib/request/request-video-event-scheduler.js b/server/lib/request/request-video-event-scheduler.js deleted file mode 100644 index e54d34f4a..000000000 --- a/server/lib/request/request-video-event-scheduler.js +++ /dev/null @@ -1,108 +0,0 @@ -'use strict' - -const BaseRequestScheduler = require('./base-request-scheduler') -const constants = require('../../initializers/constants') -const db = require('../../initializers/database') - -module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler { - constructor () { - super() - - // We limit the size of the requests - this.limitPods = constants.REQUESTS_VIDEO_EVENT_LIMIT_PODS - this.limitPerPod = constants.REQUESTS_VIDEO_EVENT_LIMIT_PER_POD - - this.description = 'video event requests' - } - - getRequestModel () { - return db.RequestVideoEvent - } - - getRequestToPodModel () { - return db.RequestVideoEvent - } - - buildRequestObjects (eventsToProcess) { - const requestsToMakeGrouped = {} - - /* Example: - { - pod1: { - video1: { views: 4, likes: 5 }, - video2: { likes: 5 } - } - } - */ - const eventsPerVideoPerPod = {} - - // We group video events per video and per pod - // We add the counts of the same event types - Object.keys(eventsToProcess).forEach(toPodId => { - eventsToProcess[toPodId].forEach(eventToProcess => { - if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} - - if (!requestsToMakeGrouped[toPodId]) { - requestsToMakeGrouped[toPodId] = { - toPod: eventToProcess.pod, - endpoint: constants.REQUEST_VIDEO_EVENT_ENDPOINT, - ids: [], // request ids, to delete them from the DB in the future - datas: [] // requests data - } - } - requestsToMakeGrouped[toPodId].ids.push(eventToProcess.id) - - const eventsPerVideo = eventsPerVideoPerPod[toPodId] - const remoteId = eventToProcess.video.remoteId - if (!eventsPerVideo[remoteId]) eventsPerVideo[remoteId] = {} - - const events = eventsPerVideo[remoteId] - if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 - - events[eventToProcess.type] += eventToProcess.count - }) - }) - - // Now we build our requests array per pod - Object.keys(eventsPerVideoPerPod).forEach(toPodId => { - const eventsForPod = eventsPerVideoPerPod[toPodId] - - Object.keys(eventsForPod).forEach(remoteId => { - const eventsForVideo = eventsForPod[remoteId] - - Object.keys(eventsForVideo).forEach(eventType => { - requestsToMakeGrouped[toPodId].datas.push({ - data: { - remoteId, - eventType, - count: eventsForVideo[eventType] - } - }) - }) - }) - }) - - return requestsToMakeGrouped - } - - // { type, videoId, count?, transaction? } - createRequest (options, callback) { - const type = options.type - const videoId = options.videoId - const transaction = options.transaction - let count = options.count - - if (count === undefined) count = 1 - - const dbRequestOptions = {} - if (transaction) dbRequestOptions.transaction = transaction - - const createQuery = { - type, - count, - videoId - } - - return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) - } -} diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts new file mode 100644 index 000000000..6e5306c7d --- /dev/null +++ b/server/lib/request/request-video-event-scheduler.ts @@ -0,0 +1,116 @@ +const db = require('../../initializers/database') +import { BaseRequestScheduler } from './base-request-scheduler' +import { + REQUESTS_VIDEO_EVENT_LIMIT_PODS, + REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, + REQUEST_VIDEO_EVENT_ENDPOINT +} from '../../initializers' + +class RequestVideoEventScheduler extends BaseRequestScheduler { + constructor () { + super() + + // We limit the size of the requests + this.limitPods = REQUESTS_VIDEO_EVENT_LIMIT_PODS + this.limitPerPod = REQUESTS_VIDEO_EVENT_LIMIT_PER_POD + + this.description = 'video event requests' + } + + getRequestModel () { + return db.RequestVideoEvent + } + + getRequestToPodModel () { + return db.RequestVideoEvent + } + + buildRequestObjects (eventsToProcess) { + const requestsToMakeGrouped = {} + + /* Example: + { + pod1: { + video1: { views: 4, likes: 5 }, + video2: { likes: 5 } + } + } + */ + const eventsPerVideoPerPod = {} + + // We group video events per video and per pod + // We add the counts of the same event types + Object.keys(eventsToProcess).forEach(toPodId => { + eventsToProcess[toPodId].forEach(eventToProcess => { + if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} + + if (!requestsToMakeGrouped[toPodId]) { + requestsToMakeGrouped[toPodId] = { + toPod: eventToProcess.pod, + endpoint: REQUEST_VIDEO_EVENT_ENDPOINT, + ids: [], // request ids, to delete them from the DB in the future + datas: [] // requests data + } + } + requestsToMakeGrouped[toPodId].ids.push(eventToProcess.id) + + const eventsPerVideo = eventsPerVideoPerPod[toPodId] + const remoteId = eventToProcess.video.remoteId + if (!eventsPerVideo[remoteId]) eventsPerVideo[remoteId] = {} + + const events = eventsPerVideo[remoteId] + if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 + + events[eventToProcess.type] += eventToProcess.count + }) + }) + + // Now we build our requests array per pod + Object.keys(eventsPerVideoPerPod).forEach(toPodId => { + const eventsForPod = eventsPerVideoPerPod[toPodId] + + Object.keys(eventsForPod).forEach(remoteId => { + const eventsForVideo = eventsForPod[remoteId] + + Object.keys(eventsForVideo).forEach(eventType => { + requestsToMakeGrouped[toPodId].datas.push({ + data: { + remoteId, + eventType, + count: eventsForVideo[eventType] + } + }) + }) + }) + }) + + return requestsToMakeGrouped + } + + // { type, videoId, count?, transaction? } + createRequest (options, callback) { + const type = options.type + const videoId = options.videoId + const transaction = options.transaction + let count = options.count + + if (count === undefined) count = 1 + + const dbRequestOptions: { transaction?: any } = {} + if (transaction) dbRequestOptions.transaction = transaction + + const createQuery = { + type, + count, + videoId + } + + return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) + } +} + +// --------------------------------------------------------------------------- + +export { + RequestVideoEventScheduler +} diff --git a/server/lib/request/request-video-qadu-scheduler.js b/server/lib/request/request-video-qadu-scheduler.js deleted file mode 100644 index 17402b556..000000000 --- a/server/lib/request/request-video-qadu-scheduler.js +++ /dev/null @@ -1,117 +0,0 @@ -'use strict' - -const BaseRequestScheduler = require('./base-request-scheduler') -const constants = require('../../initializers/constants') -const db = require('../../initializers/database') -const logger = require('../../helpers/logger') - -module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler { - constructor () { - super() - - // We limit the size of the requests - this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS - this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PER_POD - - this.description = 'video QADU requests' - } - - getRequestModel () { - return db.RequestVideoQadu - } - - getRequestToPodModel () { - return db.RequestVideoQadu - } - - buildRequestObjects (requests) { - const requestsToMakeGrouped = {} - - Object.keys(requests).forEach(toPodId => { - requests[toPodId].forEach(data => { - const request = data.request - const video = data.video - const pod = data.pod - const hashKey = toPodId - - if (!requestsToMakeGrouped[hashKey]) { - requestsToMakeGrouped[hashKey] = { - toPod: pod, - endpoint: constants.REQUEST_VIDEO_QADU_ENDPOINT, - ids: [], // request ids, to delete them from the DB in the future - datas: [], // requests data - videos: {} - } - } - - // Maybe another attribute was filled for this video - let videoData = requestsToMakeGrouped[hashKey].videos[video.id] - if (!videoData) videoData = {} - - switch (request.type) { - case constants.REQUEST_VIDEO_QADU_TYPES.LIKES: - videoData.likes = video.likes - break - - case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES: - videoData.dislikes = video.dislikes - break - - case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS: - videoData.views = video.views - break - - default: - logger.error('Unknown request video QADU type %s.', request.type) - return - } - - // Do not forget the remoteId so the remote pod can identify the video - videoData.remoteId = video.id - requestsToMakeGrouped[hashKey].ids.push(request.id) - - // Maybe there are multiple quick and dirty update for the same video - // We use this hashmap to dedupe them - requestsToMakeGrouped[hashKey].videos[video.id] = videoData - }) - }) - - // Now we deduped similar quick and dirty updates, we can build our requests datas - Object.keys(requestsToMakeGrouped).forEach(hashKey => { - Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => { - const videoData = requestsToMakeGrouped[hashKey].videos[videoId] - - requestsToMakeGrouped[hashKey].datas.push({ - data: videoData - }) - }) - - // We don't need it anymore, it was just to build our datas array - delete requestsToMakeGrouped[hashKey].videos - }) - - return requestsToMakeGrouped - } - - // { type, videoId, transaction? } - createRequest (options, callback) { - const type = options.type - const videoId = options.videoId - const transaction = options.transaction - - const dbRequestOptions = {} - if (transaction) dbRequestOptions.transaction = transaction - - // Send the update to all our friends - db.Pod.listAllIds(options.transaction, function (err, podIds) { - if (err) return callback(err) - - const queries = [] - podIds.forEach(podId => { - queries.push({ type, videoId, podId }) - }) - - return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) - }) - } -} diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts new file mode 100644 index 000000000..d81822723 --- /dev/null +++ b/server/lib/request/request-video-qadu-scheduler.ts @@ -0,0 +1,126 @@ +const db = require('../../initializers/database') +import { BaseRequestScheduler } from './base-request-scheduler' +import { logger } from '../../helpers' +import { + REQUESTS_VIDEO_QADU_LIMIT_PODS, + REQUESTS_VIDEO_QADU_LIMIT_PER_POD, + REQUEST_VIDEO_QADU_ENDPOINT, + REQUEST_VIDEO_QADU_TYPES +} from '../../initializers' + +class RequestVideoQaduScheduler extends BaseRequestScheduler { + constructor () { + super() + + // We limit the size of the requests + this.limitPods = REQUESTS_VIDEO_QADU_LIMIT_PODS + this.limitPerPod = REQUESTS_VIDEO_QADU_LIMIT_PER_POD + + this.description = 'video QADU requests' + } + + getRequestModel () { + return db.RequestVideoQadu + } + + getRequestToPodModel () { + return db.RequestVideoQadu + } + + buildRequestObjects (requests) { + const requestsToMakeGrouped = {} + + Object.keys(requests).forEach(toPodId => { + requests[toPodId].forEach(data => { + const request = data.request + const video = data.video + const pod = data.pod + const hashKey = toPodId + + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPod: pod, + endpoint: REQUEST_VIDEO_QADU_ENDPOINT, + ids: [], // request ids, to delete them from the DB in the future + datas: [], // requests data + videos: {} + } + } + + // Maybe another attribute was filled for this video + let videoData = requestsToMakeGrouped[hashKey].videos[video.id] + if (!videoData) videoData = {} + + switch (request.type) { + case REQUEST_VIDEO_QADU_TYPES.LIKES: + videoData.likes = video.likes + break + + case REQUEST_VIDEO_QADU_TYPES.DISLIKES: + videoData.dislikes = video.dislikes + break + + case REQUEST_VIDEO_QADU_TYPES.VIEWS: + videoData.views = video.views + break + + default: + logger.error('Unknown request video QADU type %s.', request.type) + return + } + + // Do not forget the remoteId so the remote pod can identify the video + videoData.remoteId = video.id + requestsToMakeGrouped[hashKey].ids.push(request.id) + + // Maybe there are multiple quick and dirty update for the same video + // We use this hashmap to dedupe them + requestsToMakeGrouped[hashKey].videos[video.id] = videoData + }) + }) + + // Now we deduped similar quick and dirty updates, we can build our requests datas + Object.keys(requestsToMakeGrouped).forEach(hashKey => { + Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => { + const videoData = requestsToMakeGrouped[hashKey].videos[videoId] + + requestsToMakeGrouped[hashKey].datas.push({ + data: videoData + }) + }) + + // We don't need it anymore, it was just to build our datas array + delete requestsToMakeGrouped[hashKey].videos + }) + + return requestsToMakeGrouped + } + + // { type, videoId, transaction? } + createRequest (options, callback) { + const type = options.type + const videoId = options.videoId + const transaction = options.transaction + + const dbRequestOptions: { transaction?: any } = {} + if (transaction) dbRequestOptions.transaction = transaction + + // Send the update to all our friends + db.Pod.listAllIds(options.transaction, function (err, podIds) { + if (err) return callback(err) + + const queries = [] + podIds.forEach(podId => { + queries.push({ type, videoId, podId }) + }) + + return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) + }) + } +} + +// --------------------------------------------------------------------------- + +export { + RequestVideoQaduScheduler +} -- cgit v1.2.3