aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/request-scheduler.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-02-18 10:29:36 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-02-18 10:29:36 +0100
commitc1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de (patch)
tree5d721432dcc25f43ebd72b12f675d4e208faf616 /server/lib/request-scheduler.js
parent361b7df2a257d1c44ec1d79128a9065b563090d8 (diff)
downloadPeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.gz
PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.tar.zst
PeerTube-c1a7ab7f04fdb1601cf1e41c4e372dbd3c81f3de.zip
Server: use a request scheduler object instance for friends
communication
Diffstat (limited to 'server/lib/request-scheduler.js')
-rw-r--r--server/lib/request-scheduler.js202
1 files changed, 202 insertions, 0 deletions
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js
new file mode 100644
index 000000000..c8bc4af28
--- /dev/null
+++ b/server/lib/request-scheduler.js
@@ -0,0 +1,202 @@
1'use strict'
2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../initializers/constants')
6const db = require('../initializers/database')
7const logger = require('../helpers/logger')
8const requests = require('../helpers/requests')
9
10module.exports = class RequestScheduler {
11
12 constructor (name) {
13 this.name = name
14
15 this.lastRequestTimestamp = 0
16 this.timer = null
17 }
18
19 activate () {
20 logger.info('Requests scheduler activated.')
21 this.lastRequestTimestamp = Date.now()
22
23 this.timer = setInterval(() => {
24 this.lastRequestTimestamp = Date.now()
25 this.makeRequests()
26 }, constants.REQUESTS_INTERVAL)
27 }
28
29 deactivate () {
30 logger.info('Requests scheduler deactivated.')
31 clearInterval(this.timer)
32 this.timer = null
33 }
34
35 forceSend () {
36 logger.info('Force requests scheduler sending.')
37 this.makeRequests()
38 }
39
40 remainingMilliSeconds () {
41 if (this.timer === null) return -1
42
43 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
44 }
45
46 // { type, endpoint, data, toIds, transaction }
47 createRequest (options, callback) {
48 const type = options.type
49 const endpoint = options.endpoint
50 const data = options.data
51 const toIds = options.toIds
52 const transaction = options.transaction
53
54 const pods = []
55
56 // If there are no destination pods abort
57 if (toIds.length === 0) return callback(null)
58
59 toIds.forEach(toPod => {
60 pods.push(db.Pod.build({ id: toPod }))
61 })
62
63 const createQuery = {
64 endpoint,
65 request: {
66 type: type,
67 data: data
68 }
69 }
70
71 const dbRequestOptions = {
72 transaction
73 }
74
75 return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => {
76 if (err) return callback(err)
77
78 return request.setPods(pods, dbRequestOptions).asCallback(callback)
79 })
80 }
81
82 // ---------------------------------------------------------------------------
83
84 // Make all the requests of the scheduler
85 makeRequests () {
86 // We limit the size of the requests
87 // We don't want to stuck with the same failing requests so we get a random list
88 db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
89 if (err) {
90 logger.error('Cannot get the list of requests.', { err: err })
91 return // Abort
92 }
93
94 // If there are no requests, abort
95 if (requests.length === 0) {
96 logger.info('No requests to make.')
97 return
98 }
99
100 // We want to group requests by destinations pod and endpoint
101 const requestsToMakeGrouped = this.buildRequestObjects(requests)
102
103 logger.info('Making requests to friends.')
104
105 const goodPods = []
106 const badPods = []
107
108 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
109 const requestToMake = requestsToMakeGrouped[hashKey]
110 const toPod = requestToMake.toPod
111
112 // Maybe the pod is not our friend anymore so simply remove it
113 if (!toPod) {
114 const requestIdsToDelete = requestToMake.ids
115
116 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
117 return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
118 }
119
120 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
121 if (success === false) {
122 badPods.push(requestToMake.toPod.id)
123 return callbackEach()
124 }
125
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
127 goodPods.push(requestToMake.toPod.id)
128
129 // Remove the pod id of these request ids
130 db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
131 })
132 }, () => {
133 // All the requests were made, we update the pods score
134 db.Request.updatePodsScore(goodPods, badPods)
135 // Flush requests with no pod
136 db.Request.removeWithEmptyTo(err => {
137 if (err) logger.error('Error when removing requests with no pods.', { error: err })
138 })
139 })
140 })
141 }
142
143 // Make a requests to friends of a certain type
144 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
145 if (!callback) callback = function () {}
146
147 const params = {
148 toPod: toPod,
149 sign: true, // Prove our identity
150 method: 'POST',
151 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
152 data: requestsToMake // Requests we need to make
153 }
154
155 // Make multiple retry requests to all of pods
156 // The function fire some useful callbacks
157 requests.makeSecureRequest(params, (err, res) => {
158 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
159 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
160 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
161
162 return callback(false)
163 }
164
165 return callback(true)
166 })
167 }
168
169 buildRequestObjects (requests) {
170 const requestsToMakeGrouped = {}
171
172 Object.keys(requests).forEach(toPodId => {
173 requests[toPodId].forEach(data => {
174 const request = data.request
175 const pod = data.pod
176 const hashKey = toPodId + request.endpoint
177
178 if (!requestsToMakeGrouped[hashKey]) {
179 requestsToMakeGrouped[hashKey] = {
180 toPod: pod,
181 endpoint: request.endpoint,
182 ids: [], // request ids, to delete them from the DB in the future
183 datas: [] // requests data,
184 }
185 }
186
187 requestsToMakeGrouped[hashKey].ids.push(request.id)
188 requestsToMakeGrouped[hashKey].datas.push(request.request)
189 })
190 })
191
192 return requestsToMakeGrouped
193 }
194
195 flush (callback) {
196 db.Request.removeAll(err => {
197 if (err) logger.error('Cannot flush the requests.', { error: err })
198
199 return callback(err)
200 })
201 }
202}