aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/base-request-scheduler.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-02-21 21:35:59 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-02-26 20:01:26 +0100
commit9e167724f7e933f41d9ea2e1c31772bf4c560a28 (patch)
tree093cb7c1b088f35aaf847f859a313a121c8cd233 /server/lib/base-request-scheduler.js
parent0150b17e51df3e9fad8a59133d828c68f8ba672b (diff)
downloadPeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.tar.gz
PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.tar.zst
PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.zip
Server: make a basic "quick and dirty update" for videos
This system will be useful to to update some int video attributes (likes, dislikes, views...) The classic system is not used because we need some optimization for scaling
Diffstat (limited to 'server/lib/base-request-scheduler.js')
-rw-r--r--server/lib/base-request-scheduler.js140
1 files changed, 140 insertions, 0 deletions
diff --git a/server/lib/base-request-scheduler.js b/server/lib/base-request-scheduler.js
new file mode 100644
index 000000000..d15680c25
--- /dev/null
+++ b/server/lib/base-request-scheduler.js
@@ -0,0 +1,140 @@
1'use strict'
2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../initializers/constants')
6const db = require('../initializers/database')
7const logger = require('../helpers/logger')
8const requests = require('../helpers/requests')
9
10module.exports = class BaseRequestScheduler {
11
12 constructor (options) {
13 this.lastRequestTimestamp = 0
14 this.timer = null
15 }
16
17 activate () {
18 logger.info('Requests scheduler activated.')
19 this.lastRequestTimestamp = Date.now()
20
21 this.timer = setInterval(() => {
22 this.lastRequestTimestamp = Date.now()
23 this.makeRequests()
24 }, constants.REQUESTS_INTERVAL)
25 }
26
27 deactivate () {
28 logger.info('Requests scheduler deactivated.')
29 clearInterval(this.timer)
30 this.timer = null
31 }
32
33 forceSend () {
34 logger.info('Force requests scheduler sending.')
35 this.makeRequests()
36 }
37
38 remainingMilliSeconds () {
39 if (this.timer === null) return -1
40
41 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
42 }
43
44 // ---------------------------------------------------------------------------
45
46 // Make a requests to friends of a certain type
47 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
48 if (!callback) callback = function () {}
49
50 const params = {
51 toPod: toPod,
52 sign: true, // Prove our identity
53 method: 'POST',
54 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
55 data: requestsToMake // Requests we need to make
56 }
57
58 // Make multiple retry requests to all of pods
59 // The function fire some useful callbacks
60 requests.makeSecureRequest(params, (err, res) => {
61 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
62 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
63 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
64
65 return callback(false)
66 }
67
68 return callback(true)
69 })
70 }
71
72 // Make all the requests of the scheduler
73 makeRequests () {
74 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
75 if (err) {
76 logger.error('Cannot get the list of "%s".', this.description, { err: err })
77 return // Abort
78 }
79
80 // If there are no requests, abort
81 if (requests.length === 0) {
82 logger.info('No "%s" to make.', this.description)
83 return
84 }
85
86 // We want to group requests by destinations pod and endpoint
87 const requestsToMakeGrouped = this.buildRequestObjects(requests)
88
89 logger.info('Making "%s" to friends.', this.description)
90
91 const goodPods = []
92 const badPods = []
93
94 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
95 const requestToMake = requestsToMakeGrouped[hashKey]
96 const toPod = requestToMake.toPod
97
98 // Maybe the pod is not our friend anymore so simply remove it
99 if (!toPod) {
100 const requestIdsToDelete = requestToMake.ids
101
102 logger.info('Removing %d "%s" of unexisting pod %s.', requestIdsToDelete.length, this.description, requestToMake.toPod.id)
103 return this.getRequestToPodModel().removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
104 }
105
106 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
107 if (success === false) {
108 badPods.push(requestToMake.toPod.id)
109 return callbackEach()
110 }
111
112 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
113 goodPods.push(requestToMake.toPod.id)
114
115 // Remove the pod id of these request ids
116 this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
117
118 this.afterRequestHook()
119 })
120 }, () => {
121 // All the requests were made, we update the pods score
122 db.Pod.updatePodsScore(goodPods, badPods)
123
124 this.afterRequestsHook()
125 })
126 })
127 }
128
129 flush (callback) {
130 this.getRequestModel().removeAll(callback)
131 }
132
133 afterRequestHook () {
134 // Nothing to do, let children reimplement it
135 }
136
137 afterRequestsHook () {
138 // Nothing to do, let children reimplement it
139 }
140}