diff options
Diffstat (limited to 'server/lib/request')
-rw-r--r-- | server/lib/request/base-request-scheduler.js | 136 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.js | 97 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.js | 108 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.js | 117 |
4 files changed, 458 insertions, 0 deletions
diff --git a/server/lib/request/base-request-scheduler.js b/server/lib/request/base-request-scheduler.js new file mode 100644 index 000000000..782448340 --- /dev/null +++ b/server/lib/request/base-request-scheduler.js | |||
@@ -0,0 +1,136 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../../initializers/constants') | ||
6 | const db = require('../../initializers/database') | ||
7 | const logger = require('../../helpers/logger') | ||
8 | const requests = require('../../helpers/requests') | ||
9 | |||
10 | module.exports = class BaseRequestScheduler { | ||
11 | constructor (options) { | ||
12 | this.lastRequestTimestamp = 0 | ||
13 | this.timer = null | ||
14 | this.requestInterval = constants.REQUESTS_INTERVAL | ||
15 | } | ||
16 | |||
17 | activate () { | ||
18 | logger.info('Requests scheduler activated.') | ||
19 | this.lastRequestTimestamp = Date.now() | ||
20 | |||
21 | this.timer = setInterval(() => { | ||
22 | this.lastRequestTimestamp = Date.now() | ||
23 | this.makeRequests() | ||
24 | }, this.requestInterval) | ||
25 | } | ||
26 | |||
27 | deactivate () { | ||
28 | logger.info('Requests scheduler deactivated.') | ||
29 | clearInterval(this.timer) | ||
30 | this.timer = null | ||
31 | } | ||
32 | |||
33 | forceSend () { | ||
34 | logger.info('Force requests scheduler sending.') | ||
35 | this.makeRequests() | ||
36 | } | ||
37 | |||
38 | remainingMilliSeconds () { | ||
39 | if (this.timer === null) return -1 | ||
40 | |||
41 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
42 | } | ||
43 | |||
44 | remainingRequestsCount (callback) { | ||
45 | return this.getRequestModel().countTotalRequests(callback) | ||
46 | } | ||
47 | |||
48 | // --------------------------------------------------------------------------- | ||
49 | |||
50 | // Make a requests to friends of a certain type | ||
51 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
52 | if (!callback) callback = function () {} | ||
53 | |||
54 | const params = { | ||
55 | toPod: toPod, | ||
56 | sign: true, // Prove our identity | ||
57 | method: 'POST', | ||
58 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
59 | data: requestsToMake // Requests we need to make | ||
60 | } | ||
61 | |||
62 | // Make multiple retry requests to all of pods | ||
63 | // The function fire some useful callbacks | ||
64 | requests.makeSecureRequest(params, (err, res) => { | ||
65 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
66 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
67 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
68 | |||
69 | return callback(err) | ||
70 | } | ||
71 | |||
72 | return callback(null) | ||
73 | }) | ||
74 | } | ||
75 | |||
76 | // Make all the requests of the scheduler | ||
77 | makeRequests () { | ||
78 | this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { | ||
79 | if (err) { | ||
80 | logger.error('Cannot get the list of "%s".', this.description, { err: err }) | ||
81 | return // Abort | ||
82 | } | ||
83 | |||
84 | // If there are no requests, abort | ||
85 | if (requests.length === 0) { | ||
86 | logger.info('No "%s" to make.', this.description) | ||
87 | return | ||
88 | } | ||
89 | |||
90 | // We want to group requests by destinations pod and endpoint | ||
91 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
92 | |||
93 | logger.info('Making "%s" to friends.', this.description) | ||
94 | |||
95 | const goodPods = [] | ||
96 | const badPods = [] | ||
97 | |||
98 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
99 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
100 | const toPod = requestToMake.toPod | ||
101 | |||
102 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { | ||
103 | if (err) { | ||
104 | badPods.push(requestToMake.toPod.id) | ||
105 | return callbackEach() | ||
106 | } | ||
107 | |||
108 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
109 | goodPods.push(requestToMake.toPod.id) | ||
110 | |||
111 | // Remove the pod id of these request ids | ||
112 | this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
113 | |||
114 | this.afterRequestHook() | ||
115 | }) | ||
116 | }, () => { | ||
117 | // All the requests were made, we update the pods score | ||
118 | db.Pod.updatePodsScore(goodPods, badPods) | ||
119 | |||
120 | this.afterRequestsHook() | ||
121 | }) | ||
122 | }) | ||
123 | } | ||
124 | |||
125 | flush (callback) { | ||
126 | this.getRequestModel().removeAll(callback) | ||
127 | } | ||
128 | |||
129 | afterRequestHook () { | ||
130 | // Nothing to do, let children reimplement it | ||
131 | } | ||
132 | |||
133 | afterRequestsHook () { | ||
134 | // Nothing to do, let children reimplement it | ||
135 | } | ||
136 | } | ||
diff --git a/server/lib/request/request-scheduler.js b/server/lib/request/request-scheduler.js new file mode 100644 index 000000000..555ec3e54 --- /dev/null +++ b/server/lib/request/request-scheduler.js | |||
@@ -0,0 +1,97 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const constants = require('../../initializers/constants') | ||
4 | const BaseRequestScheduler = require('./base-request-scheduler') | ||
5 | const db = require('../../initializers/database') | ||
6 | const logger = require('../../helpers/logger') | ||
7 | |||
8 | module.exports = class RequestScheduler extends BaseRequestScheduler { | ||
9 | constructor () { | ||
10 | super() | ||
11 | |||
12 | // We limit the size of the requests | ||
13 | this.limitPods = constants.REQUESTS_LIMIT_PODS | ||
14 | this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD | ||
15 | |||
16 | this.description = 'requests' | ||
17 | } | ||
18 | |||
19 | getRequestModel () { | ||
20 | return db.Request | ||
21 | } | ||
22 | |||
23 | getRequestToPodModel () { | ||
24 | return db.RequestToPod | ||
25 | } | ||
26 | |||
27 | buildRequestObjects (requests) { | ||
28 | const requestsToMakeGrouped = {} | ||
29 | |||
30 | Object.keys(requests).forEach(toPodId => { | ||
31 | requests[toPodId].forEach(data => { | ||
32 | const request = data.request | ||
33 | const pod = data.pod | ||
34 | const hashKey = toPodId + request.endpoint | ||
35 | |||
36 | if (!requestsToMakeGrouped[hashKey]) { | ||
37 | requestsToMakeGrouped[hashKey] = { | ||
38 | toPod: pod, | ||
39 | endpoint: request.endpoint, | ||
40 | ids: [], // request ids, to delete them from the DB in the future | ||
41 | datas: [] // requests data, | ||
42 | } | ||
43 | } | ||
44 | |||
45 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
46 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
47 | }) | ||
48 | }) | ||
49 | |||
50 | return requestsToMakeGrouped | ||
51 | } | ||
52 | |||
53 | // { type, endpoint, data, toIds, transaction } | ||
54 | createRequest (options, callback) { | ||
55 | const type = options.type | ||
56 | const endpoint = options.endpoint | ||
57 | const data = options.data | ||
58 | const toIds = options.toIds | ||
59 | const transaction = options.transaction | ||
60 | |||
61 | const pods = [] | ||
62 | |||
63 | // If there are no destination pods abort | ||
64 | if (toIds.length === 0) return callback(null) | ||
65 | |||
66 | toIds.forEach(toPod => { | ||
67 | pods.push(db.Pod.build({ id: toPod })) | ||
68 | }) | ||
69 | |||
70 | const createQuery = { | ||
71 | endpoint, | ||
72 | request: { | ||
73 | type: type, | ||
74 | data: data | ||
75 | } | ||
76 | } | ||
77 | |||
78 | const dbRequestOptions = { | ||
79 | transaction | ||
80 | } | ||
81 | |||
82 | return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { | ||
83 | if (err) return callback(err) | ||
84 | |||
85 | return request.setPods(pods, dbRequestOptions).asCallback(callback) | ||
86 | }) | ||
87 | } | ||
88 | |||
89 | // --------------------------------------------------------------------------- | ||
90 | |||
91 | afterRequestsHook () { | ||
92 | // Flush requests with no pod | ||
93 | this.getRequestModel().removeWithEmptyTo(err => { | ||
94 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
95 | }) | ||
96 | } | ||
97 | } | ||
diff --git a/server/lib/request/request-video-event-scheduler.js b/server/lib/request/request-video-event-scheduler.js new file mode 100644 index 000000000..e54d34f4a --- /dev/null +++ b/server/lib/request/request-video-event-scheduler.js | |||
@@ -0,0 +1,108 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const BaseRequestScheduler = require('./base-request-scheduler') | ||
4 | const constants = require('../../initializers/constants') | ||
5 | const db = require('../../initializers/database') | ||
6 | |||
7 | module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler { | ||
8 | constructor () { | ||
9 | super() | ||
10 | |||
11 | // We limit the size of the requests | ||
12 | this.limitPods = constants.REQUESTS_VIDEO_EVENT_LIMIT_PODS | ||
13 | this.limitPerPod = constants.REQUESTS_VIDEO_EVENT_LIMIT_PER_POD | ||
14 | |||
15 | this.description = 'video event requests' | ||
16 | } | ||
17 | |||
18 | getRequestModel () { | ||
19 | return db.RequestVideoEvent | ||
20 | } | ||
21 | |||
22 | getRequestToPodModel () { | ||
23 | return db.RequestVideoEvent | ||
24 | } | ||
25 | |||
26 | buildRequestObjects (eventsToProcess) { | ||
27 | const requestsToMakeGrouped = {} | ||
28 | |||
29 | /* Example: | ||
30 | { | ||
31 | pod1: { | ||
32 | video1: { views: 4, likes: 5 }, | ||
33 | video2: { likes: 5 } | ||
34 | } | ||
35 | } | ||
36 | */ | ||
37 | const eventsPerVideoPerPod = {} | ||
38 | |||
39 | // We group video events per video and per pod | ||
40 | // We add the counts of the same event types | ||
41 | Object.keys(eventsToProcess).forEach(toPodId => { | ||
42 | eventsToProcess[toPodId].forEach(eventToProcess => { | ||
43 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | ||
44 | |||
45 | if (!requestsToMakeGrouped[toPodId]) { | ||
46 | requestsToMakeGrouped[toPodId] = { | ||
47 | toPod: eventToProcess.pod, | ||
48 | endpoint: constants.REQUEST_VIDEO_EVENT_ENDPOINT, | ||
49 | ids: [], // request ids, to delete them from the DB in the future | ||
50 | datas: [] // requests data | ||
51 | } | ||
52 | } | ||
53 | requestsToMakeGrouped[toPodId].ids.push(eventToProcess.id) | ||
54 | |||
55 | const eventsPerVideo = eventsPerVideoPerPod[toPodId] | ||
56 | const remoteId = eventToProcess.video.remoteId | ||
57 | if (!eventsPerVideo[remoteId]) eventsPerVideo[remoteId] = {} | ||
58 | |||
59 | const events = eventsPerVideo[remoteId] | ||
60 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 | ||
61 | |||
62 | events[eventToProcess.type] += eventToProcess.count | ||
63 | }) | ||
64 | }) | ||
65 | |||
66 | // Now we build our requests array per pod | ||
67 | Object.keys(eventsPerVideoPerPod).forEach(toPodId => { | ||
68 | const eventsForPod = eventsPerVideoPerPod[toPodId] | ||
69 | |||
70 | Object.keys(eventsForPod).forEach(remoteId => { | ||
71 | const eventsForVideo = eventsForPod[remoteId] | ||
72 | |||
73 | Object.keys(eventsForVideo).forEach(eventType => { | ||
74 | requestsToMakeGrouped[toPodId].datas.push({ | ||
75 | data: { | ||
76 | remoteId, | ||
77 | eventType, | ||
78 | count: eventsForVideo[eventType] | ||
79 | } | ||
80 | }) | ||
81 | }) | ||
82 | }) | ||
83 | }) | ||
84 | |||
85 | return requestsToMakeGrouped | ||
86 | } | ||
87 | |||
88 | // { type, videoId, count?, transaction? } | ||
89 | createRequest (options, callback) { | ||
90 | const type = options.type | ||
91 | const videoId = options.videoId | ||
92 | const transaction = options.transaction | ||
93 | let count = options.count | ||
94 | |||
95 | if (count === undefined) count = 1 | ||
96 | |||
97 | const dbRequestOptions = {} | ||
98 | if (transaction) dbRequestOptions.transaction = transaction | ||
99 | |||
100 | const createQuery = { | ||
101 | type, | ||
102 | count, | ||
103 | videoId | ||
104 | } | ||
105 | |||
106 | return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) | ||
107 | } | ||
108 | } | ||
diff --git a/server/lib/request/request-video-qadu-scheduler.js b/server/lib/request/request-video-qadu-scheduler.js new file mode 100644 index 000000000..17402b556 --- /dev/null +++ b/server/lib/request/request-video-qadu-scheduler.js | |||
@@ -0,0 +1,117 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const BaseRequestScheduler = require('./base-request-scheduler') | ||
4 | const constants = require('../../initializers/constants') | ||
5 | const db = require('../../initializers/database') | ||
6 | const logger = require('../../helpers/logger') | ||
7 | |||
8 | module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler { | ||
9 | constructor () { | ||
10 | super() | ||
11 | |||
12 | // We limit the size of the requests | ||
13 | this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS | ||
14 | this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PER_POD | ||
15 | |||
16 | this.description = 'video QADU requests' | ||
17 | } | ||
18 | |||
19 | getRequestModel () { | ||
20 | return db.RequestVideoQadu | ||
21 | } | ||
22 | |||
23 | getRequestToPodModel () { | ||
24 | return db.RequestVideoQadu | ||
25 | } | ||
26 | |||
27 | buildRequestObjects (requests) { | ||
28 | const requestsToMakeGrouped = {} | ||
29 | |||
30 | Object.keys(requests).forEach(toPodId => { | ||
31 | requests[toPodId].forEach(data => { | ||
32 | const request = data.request | ||
33 | const video = data.video | ||
34 | const pod = data.pod | ||
35 | const hashKey = toPodId | ||
36 | |||
37 | if (!requestsToMakeGrouped[hashKey]) { | ||
38 | requestsToMakeGrouped[hashKey] = { | ||
39 | toPod: pod, | ||
40 | endpoint: constants.REQUEST_VIDEO_QADU_ENDPOINT, | ||
41 | ids: [], // request ids, to delete them from the DB in the future | ||
42 | datas: [], // requests data | ||
43 | videos: {} | ||
44 | } | ||
45 | } | ||
46 | |||
47 | // Maybe another attribute was filled for this video | ||
48 | let videoData = requestsToMakeGrouped[hashKey].videos[video.id] | ||
49 | if (!videoData) videoData = {} | ||
50 | |||
51 | switch (request.type) { | ||
52 | case constants.REQUEST_VIDEO_QADU_TYPES.LIKES: | ||
53 | videoData.likes = video.likes | ||
54 | break | ||
55 | |||
56 | case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES: | ||
57 | videoData.dislikes = video.dislikes | ||
58 | break | ||
59 | |||
60 | case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS: | ||
61 | videoData.views = video.views | ||
62 | break | ||
63 | |||
64 | default: | ||
65 | logger.error('Unknown request video QADU type %s.', request.type) | ||
66 | return | ||
67 | } | ||
68 | |||
69 | // Do not forget the remoteId so the remote pod can identify the video | ||
70 | videoData.remoteId = video.id | ||
71 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
72 | |||
73 | // Maybe there are multiple quick and dirty update for the same video | ||
74 | // We use this hashmap to dedupe them | ||
75 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData | ||
76 | }) | ||
77 | }) | ||
78 | |||
79 | // Now we deduped similar quick and dirty updates, we can build our requests datas | ||
80 | Object.keys(requestsToMakeGrouped).forEach(hashKey => { | ||
81 | Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => { | ||
82 | const videoData = requestsToMakeGrouped[hashKey].videos[videoId] | ||
83 | |||
84 | requestsToMakeGrouped[hashKey].datas.push({ | ||
85 | data: videoData | ||
86 | }) | ||
87 | }) | ||
88 | |||
89 | // We don't need it anymore, it was just to build our datas array | ||
90 | delete requestsToMakeGrouped[hashKey].videos | ||
91 | }) | ||
92 | |||
93 | return requestsToMakeGrouped | ||
94 | } | ||
95 | |||
96 | // { type, videoId, transaction? } | ||
97 | createRequest (options, callback) { | ||
98 | const type = options.type | ||
99 | const videoId = options.videoId | ||
100 | const transaction = options.transaction | ||
101 | |||
102 | const dbRequestOptions = {} | ||
103 | if (transaction) dbRequestOptions.transaction = transaction | ||
104 | |||
105 | // Send the update to all our friends | ||
106 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | ||
107 | if (err) return callback(err) | ||
108 | |||
109 | const queries = [] | ||
110 | podIds.forEach(podId => { | ||
111 | queries.push({ type, videoId, podId }) | ||
112 | }) | ||
113 | |||
114 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) | ||
115 | }) | ||
116 | } | ||
117 | } | ||