aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/request
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/request')
-rw-r--r--server/lib/request/base-request-scheduler.js136
-rw-r--r--server/lib/request/request-scheduler.js97
-rw-r--r--server/lib/request/request-video-event-scheduler.js108
-rw-r--r--server/lib/request/request-video-qadu-scheduler.js117
4 files changed, 458 insertions, 0 deletions
diff --git a/server/lib/request/base-request-scheduler.js b/server/lib/request/base-request-scheduler.js
new file mode 100644
index 000000000..782448340
--- /dev/null
+++ b/server/lib/request/base-request-scheduler.js
@@ -0,0 +1,136 @@
1'use strict'
2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../../initializers/constants')
6const db = require('../../initializers/database')
7const logger = require('../../helpers/logger')
8const requests = require('../../helpers/requests')
9
10module.exports = class BaseRequestScheduler {
11 constructor (options) {
12 this.lastRequestTimestamp = 0
13 this.timer = null
14 this.requestInterval = constants.REQUESTS_INTERVAL
15 }
16
17 activate () {
18 logger.info('Requests scheduler activated.')
19 this.lastRequestTimestamp = Date.now()
20
21 this.timer = setInterval(() => {
22 this.lastRequestTimestamp = Date.now()
23 this.makeRequests()
24 }, this.requestInterval)
25 }
26
27 deactivate () {
28 logger.info('Requests scheduler deactivated.')
29 clearInterval(this.timer)
30 this.timer = null
31 }
32
33 forceSend () {
34 logger.info('Force requests scheduler sending.')
35 this.makeRequests()
36 }
37
38 remainingMilliSeconds () {
39 if (this.timer === null) return -1
40
41 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
42 }
43
44 remainingRequestsCount (callback) {
45 return this.getRequestModel().countTotalRequests(callback)
46 }
47
48 // ---------------------------------------------------------------------------
49
50 // Make a requests to friends of a certain type
51 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
52 if (!callback) callback = function () {}
53
54 const params = {
55 toPod: toPod,
56 sign: true, // Prove our identity
57 method: 'POST',
58 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
59 data: requestsToMake // Requests we need to make
60 }
61
62 // Make multiple retry requests to all of pods
63 // The function fire some useful callbacks
64 requests.makeSecureRequest(params, (err, res) => {
65 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
66 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
67 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
68
69 return callback(err)
70 }
71
72 return callback(null)
73 })
74 }
75
76 // Make all the requests of the scheduler
77 makeRequests () {
78 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
79 if (err) {
80 logger.error('Cannot get the list of "%s".', this.description, { err: err })
81 return // Abort
82 }
83
84 // If there are no requests, abort
85 if (requests.length === 0) {
86 logger.info('No "%s" to make.', this.description)
87 return
88 }
89
90 // We want to group requests by destinations pod and endpoint
91 const requestsToMakeGrouped = this.buildRequestObjects(requests)
92
93 logger.info('Making "%s" to friends.', this.description)
94
95 const goodPods = []
96 const badPods = []
97
98 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
99 const requestToMake = requestsToMakeGrouped[hashKey]
100 const toPod = requestToMake.toPod
101
102 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => {
103 if (err) {
104 badPods.push(requestToMake.toPod.id)
105 return callbackEach()
106 }
107
108 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
109 goodPods.push(requestToMake.toPod.id)
110
111 // Remove the pod id of these request ids
112 this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
113
114 this.afterRequestHook()
115 })
116 }, () => {
117 // All the requests were made, we update the pods score
118 db.Pod.updatePodsScore(goodPods, badPods)
119
120 this.afterRequestsHook()
121 })
122 })
123 }
124
125 flush (callback) {
126 this.getRequestModel().removeAll(callback)
127 }
128
129 afterRequestHook () {
130 // Nothing to do, let children reimplement it
131 }
132
133 afterRequestsHook () {
134 // Nothing to do, let children reimplement it
135 }
136}
diff --git a/server/lib/request/request-scheduler.js b/server/lib/request/request-scheduler.js
new file mode 100644
index 000000000..555ec3e54
--- /dev/null
+++ b/server/lib/request/request-scheduler.js
@@ -0,0 +1,97 @@
1'use strict'
2
3const constants = require('../../initializers/constants')
4const BaseRequestScheduler = require('./base-request-scheduler')
5const db = require('../../initializers/database')
6const logger = require('../../helpers/logger')
7
8module.exports = class RequestScheduler extends BaseRequestScheduler {
9 constructor () {
10 super()
11
12 // We limit the size of the requests
13 this.limitPods = constants.REQUESTS_LIMIT_PODS
14 this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD
15
16 this.description = 'requests'
17 }
18
19 getRequestModel () {
20 return db.Request
21 }
22
23 getRequestToPodModel () {
24 return db.RequestToPod
25 }
26
27 buildRequestObjects (requests) {
28 const requestsToMakeGrouped = {}
29
30 Object.keys(requests).forEach(toPodId => {
31 requests[toPodId].forEach(data => {
32 const request = data.request
33 const pod = data.pod
34 const hashKey = toPodId + request.endpoint
35
36 if (!requestsToMakeGrouped[hashKey]) {
37 requestsToMakeGrouped[hashKey] = {
38 toPod: pod,
39 endpoint: request.endpoint,
40 ids: [], // request ids, to delete them from the DB in the future
41 datas: [] // requests data,
42 }
43 }
44
45 requestsToMakeGrouped[hashKey].ids.push(request.id)
46 requestsToMakeGrouped[hashKey].datas.push(request.request)
47 })
48 })
49
50 return requestsToMakeGrouped
51 }
52
53 // { type, endpoint, data, toIds, transaction }
54 createRequest (options, callback) {
55 const type = options.type
56 const endpoint = options.endpoint
57 const data = options.data
58 const toIds = options.toIds
59 const transaction = options.transaction
60
61 const pods = []
62
63 // If there are no destination pods abort
64 if (toIds.length === 0) return callback(null)
65
66 toIds.forEach(toPod => {
67 pods.push(db.Pod.build({ id: toPod }))
68 })
69
70 const createQuery = {
71 endpoint,
72 request: {
73 type: type,
74 data: data
75 }
76 }
77
78 const dbRequestOptions = {
79 transaction
80 }
81
82 return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => {
83 if (err) return callback(err)
84
85 return request.setPods(pods, dbRequestOptions).asCallback(callback)
86 })
87 }
88
89 // ---------------------------------------------------------------------------
90
91 afterRequestsHook () {
92 // Flush requests with no pod
93 this.getRequestModel().removeWithEmptyTo(err => {
94 if (err) logger.error('Error when removing requests with no pods.', { error: err })
95 })
96 }
97}
diff --git a/server/lib/request/request-video-event-scheduler.js b/server/lib/request/request-video-event-scheduler.js
new file mode 100644
index 000000000..e54d34f4a
--- /dev/null
+++ b/server/lib/request/request-video-event-scheduler.js
@@ -0,0 +1,108 @@
1'use strict'
2
3const BaseRequestScheduler = require('./base-request-scheduler')
4const constants = require('../../initializers/constants')
5const db = require('../../initializers/database')
6
7module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler {
8 constructor () {
9 super()
10
11 // We limit the size of the requests
12 this.limitPods = constants.REQUESTS_VIDEO_EVENT_LIMIT_PODS
13 this.limitPerPod = constants.REQUESTS_VIDEO_EVENT_LIMIT_PER_POD
14
15 this.description = 'video event requests'
16 }
17
18 getRequestModel () {
19 return db.RequestVideoEvent
20 }
21
22 getRequestToPodModel () {
23 return db.RequestVideoEvent
24 }
25
26 buildRequestObjects (eventsToProcess) {
27 const requestsToMakeGrouped = {}
28
29 /* Example:
30 {
31 pod1: {
32 video1: { views: 4, likes: 5 },
33 video2: { likes: 5 }
34 }
35 }
36 */
37 const eventsPerVideoPerPod = {}
38
39 // We group video events per video and per pod
40 // We add the counts of the same event types
41 Object.keys(eventsToProcess).forEach(toPodId => {
42 eventsToProcess[toPodId].forEach(eventToProcess => {
43 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
44
45 if (!requestsToMakeGrouped[toPodId]) {
46 requestsToMakeGrouped[toPodId] = {
47 toPod: eventToProcess.pod,
48 endpoint: constants.REQUEST_VIDEO_EVENT_ENDPOINT,
49 ids: [], // request ids, to delete them from the DB in the future
50 datas: [] // requests data
51 }
52 }
53 requestsToMakeGrouped[toPodId].ids.push(eventToProcess.id)
54
55 const eventsPerVideo = eventsPerVideoPerPod[toPodId]
56 const remoteId = eventToProcess.video.remoteId
57 if (!eventsPerVideo[remoteId]) eventsPerVideo[remoteId] = {}
58
59 const events = eventsPerVideo[remoteId]
60 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0
61
62 events[eventToProcess.type] += eventToProcess.count
63 })
64 })
65
66 // Now we build our requests array per pod
67 Object.keys(eventsPerVideoPerPod).forEach(toPodId => {
68 const eventsForPod = eventsPerVideoPerPod[toPodId]
69
70 Object.keys(eventsForPod).forEach(remoteId => {
71 const eventsForVideo = eventsForPod[remoteId]
72
73 Object.keys(eventsForVideo).forEach(eventType => {
74 requestsToMakeGrouped[toPodId].datas.push({
75 data: {
76 remoteId,
77 eventType,
78 count: eventsForVideo[eventType]
79 }
80 })
81 })
82 })
83 })
84
85 return requestsToMakeGrouped
86 }
87
88 // { type, videoId, count?, transaction? }
89 createRequest (options, callback) {
90 const type = options.type
91 const videoId = options.videoId
92 const transaction = options.transaction
93 let count = options.count
94
95 if (count === undefined) count = 1
96
97 const dbRequestOptions = {}
98 if (transaction) dbRequestOptions.transaction = transaction
99
100 const createQuery = {
101 type,
102 count,
103 videoId
104 }
105
106 return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback)
107 }
108}
diff --git a/server/lib/request/request-video-qadu-scheduler.js b/server/lib/request/request-video-qadu-scheduler.js
new file mode 100644
index 000000000..17402b556
--- /dev/null
+++ b/server/lib/request/request-video-qadu-scheduler.js
@@ -0,0 +1,117 @@
1'use strict'
2
3const BaseRequestScheduler = require('./base-request-scheduler')
4const constants = require('../../initializers/constants')
5const db = require('../../initializers/database')
6const logger = require('../../helpers/logger')
7
8module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
9 constructor () {
10 super()
11
12 // We limit the size of the requests
13 this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
14 this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PER_POD
15
16 this.description = 'video QADU requests'
17 }
18
19 getRequestModel () {
20 return db.RequestVideoQadu
21 }
22
23 getRequestToPodModel () {
24 return db.RequestVideoQadu
25 }
26
27 buildRequestObjects (requests) {
28 const requestsToMakeGrouped = {}
29
30 Object.keys(requests).forEach(toPodId => {
31 requests[toPodId].forEach(data => {
32 const request = data.request
33 const video = data.video
34 const pod = data.pod
35 const hashKey = toPodId
36
37 if (!requestsToMakeGrouped[hashKey]) {
38 requestsToMakeGrouped[hashKey] = {
39 toPod: pod,
40 endpoint: constants.REQUEST_VIDEO_QADU_ENDPOINT,
41 ids: [], // request ids, to delete them from the DB in the future
42 datas: [], // requests data
43 videos: {}
44 }
45 }
46
47 // Maybe another attribute was filled for this video
48 let videoData = requestsToMakeGrouped[hashKey].videos[video.id]
49 if (!videoData) videoData = {}
50
51 switch (request.type) {
52 case constants.REQUEST_VIDEO_QADU_TYPES.LIKES:
53 videoData.likes = video.likes
54 break
55
56 case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES:
57 videoData.dislikes = video.dislikes
58 break
59
60 case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS:
61 videoData.views = video.views
62 break
63
64 default:
65 logger.error('Unknown request video QADU type %s.', request.type)
66 return
67 }
68
69 // Do not forget the remoteId so the remote pod can identify the video
70 videoData.remoteId = video.id
71 requestsToMakeGrouped[hashKey].ids.push(request.id)
72
73 // Maybe there are multiple quick and dirty update for the same video
74 // We use this hashmap to dedupe them
75 requestsToMakeGrouped[hashKey].videos[video.id] = videoData
76 })
77 })
78
79 // Now we deduped similar quick and dirty updates, we can build our requests datas
80 Object.keys(requestsToMakeGrouped).forEach(hashKey => {
81 Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => {
82 const videoData = requestsToMakeGrouped[hashKey].videos[videoId]
83
84 requestsToMakeGrouped[hashKey].datas.push({
85 data: videoData
86 })
87 })
88
89 // We don't need it anymore, it was just to build our datas array
90 delete requestsToMakeGrouped[hashKey].videos
91 })
92
93 return requestsToMakeGrouped
94 }
95
96 // { type, videoId, transaction? }
97 createRequest (options, callback) {
98 const type = options.type
99 const videoId = options.videoId
100 const transaction = options.transaction
101
102 const dbRequestOptions = {}
103 if (transaction) dbRequestOptions.transaction = transaction
104
105 // Send the update to all our friends
106 db.Pod.listAllIds(options.transaction, function (err, podIds) {
107 if (err) return callback(err)
108
109 const queries = []
110 podIds.forEach(podId => {
111 queries.push({ type, videoId, podId })
112 })
113
114 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback)
115 })
116 }
117}