aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-02-21 21:35:59 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-02-26 20:01:26 +0100
commit9e167724f7e933f41d9ea2e1c31772bf4c560a28 (patch)
tree093cb7c1b088f35aaf847f859a313a121c8cd233 /server/lib
parent0150b17e51df3e9fad8a59133d828c68f8ba672b (diff)
downloadPeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.tar.gz
PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.tar.zst
PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.zip
Server: make a basic "quick and dirty update" for videos
This system will be useful to to update some int video attributes (likes, dislikes, views...) The classic system is not used because we need some optimization for scaling
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/base-request-scheduler.js140
-rw-r--r--server/lib/friends.js26
-rw-r--r--server/lib/request-scheduler.js178
-rw-r--r--server/lib/request-video-qadu-scheduler.js116
4 files changed, 319 insertions, 141 deletions
diff --git a/server/lib/base-request-scheduler.js b/server/lib/base-request-scheduler.js
new file mode 100644
index 000000000..d15680c25
--- /dev/null
+++ b/server/lib/base-request-scheduler.js
@@ -0,0 +1,140 @@
1'use strict'
2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../initializers/constants')
6const db = require('../initializers/database')
7const logger = require('../helpers/logger')
8const requests = require('../helpers/requests')
9
10module.exports = class BaseRequestScheduler {
11
12 constructor (options) {
13 this.lastRequestTimestamp = 0
14 this.timer = null
15 }
16
17 activate () {
18 logger.info('Requests scheduler activated.')
19 this.lastRequestTimestamp = Date.now()
20
21 this.timer = setInterval(() => {
22 this.lastRequestTimestamp = Date.now()
23 this.makeRequests()
24 }, constants.REQUESTS_INTERVAL)
25 }
26
27 deactivate () {
28 logger.info('Requests scheduler deactivated.')
29 clearInterval(this.timer)
30 this.timer = null
31 }
32
33 forceSend () {
34 logger.info('Force requests scheduler sending.')
35 this.makeRequests()
36 }
37
38 remainingMilliSeconds () {
39 if (this.timer === null) return -1
40
41 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
42 }
43
44 // ---------------------------------------------------------------------------
45
46 // Make a requests to friends of a certain type
47 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
48 if (!callback) callback = function () {}
49
50 const params = {
51 toPod: toPod,
52 sign: true, // Prove our identity
53 method: 'POST',
54 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
55 data: requestsToMake // Requests we need to make
56 }
57
58 // Make multiple retry requests to all of pods
59 // The function fire some useful callbacks
60 requests.makeSecureRequest(params, (err, res) => {
61 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
62 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
63 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
64
65 return callback(false)
66 }
67
68 return callback(true)
69 })
70 }
71
72 // Make all the requests of the scheduler
73 makeRequests () {
74 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
75 if (err) {
76 logger.error('Cannot get the list of "%s".', this.description, { err: err })
77 return // Abort
78 }
79
80 // If there are no requests, abort
81 if (requests.length === 0) {
82 logger.info('No "%s" to make.', this.description)
83 return
84 }
85
86 // We want to group requests by destinations pod and endpoint
87 const requestsToMakeGrouped = this.buildRequestObjects(requests)
88
89 logger.info('Making "%s" to friends.', this.description)
90
91 const goodPods = []
92 const badPods = []
93
94 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
95 const requestToMake = requestsToMakeGrouped[hashKey]
96 const toPod = requestToMake.toPod
97
98 // Maybe the pod is not our friend anymore so simply remove it
99 if (!toPod) {
100 const requestIdsToDelete = requestToMake.ids
101
102 logger.info('Removing %d "%s" of unexisting pod %s.', requestIdsToDelete.length, this.description, requestToMake.toPod.id)
103 return this.getRequestToPodModel().removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
104 }
105
106 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
107 if (success === false) {
108 badPods.push(requestToMake.toPod.id)
109 return callbackEach()
110 }
111
112 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
113 goodPods.push(requestToMake.toPod.id)
114
115 // Remove the pod id of these request ids
116 this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
117
118 this.afterRequestHook()
119 })
120 }, () => {
121 // All the requests were made, we update the pods score
122 db.Pod.updatePodsScore(goodPods, badPods)
123
124 this.afterRequestsHook()
125 })
126 })
127 }
128
129 flush (callback) {
130 this.getRequestModel().removeAll(callback)
131 }
132
133 afterRequestHook () {
134 // Nothing to do, let children reimplement it
135 }
136
137 afterRequestsHook () {
138 // Nothing to do, let children reimplement it
139 }
140}
diff --git a/server/lib/friends.js b/server/lib/friends.js
index d53ab4553..424a30801 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.js
@@ -12,15 +12,19 @@ const logger = require('../helpers/logger')
12const peertubeCrypto = require('../helpers/peertube-crypto') 12const peertubeCrypto = require('../helpers/peertube-crypto')
13const requests = require('../helpers/requests') 13const requests = require('../helpers/requests')
14const RequestScheduler = require('./request-scheduler') 14const RequestScheduler = require('./request-scheduler')
15const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
15 16
16const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] 17const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
18
17const requestScheduler = new RequestScheduler() 19const requestScheduler = new RequestScheduler()
20const requestSchedulerVideoQadu = new RequestVideoQaduScheduler()
18 21
19const friends = { 22const friends = {
20 activate, 23 activate,
21 addVideoToFriends, 24 addVideoToFriends,
22 updateVideoToFriends, 25 updateVideoToFriends,
23 reportAbuseVideoToFriend, 26 reportAbuseVideoToFriend,
27 quickAndDirtyUpdateVideoToFriends,
24 hasFriends, 28 hasFriends,
25 makeFriends, 29 makeFriends,
26 quitFriends, 30 quitFriends,
@@ -30,6 +34,7 @@ const friends = {
30 34
31function activate () { 35function activate () {
32 requestScheduler.activate() 36 requestScheduler.activate()
37 requestSchedulerVideoQadu.activate()
33} 38}
34 39
35function addVideoToFriends (videoData, transaction, callback) { 40function addVideoToFriends (videoData, transaction, callback) {
@@ -71,6 +76,15 @@ function reportAbuseVideoToFriend (reportData, video) {
71 createRequest(options) 76 createRequest(options)
72} 77}
73 78
79function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
80 const options = {
81 videoId,
82 type,
83 transaction
84 }
85 return createVideoQaduRequest(options, callback)
86}
87
74function hasFriends (callback) { 88function hasFriends (callback) {
75 db.Pod.countAll(function (err, count) { 89 db.Pod.countAll(function (err, count) {
76 if (err) return callback(err) 90 if (err) return callback(err)
@@ -110,7 +124,11 @@ function quitFriends (callback) {
110 124
111 waterfall([ 125 waterfall([
112 function flushRequests (callbackAsync) { 126 function flushRequests (callbackAsync) {
113 requestScheduler.flush(callbackAsync) 127 requestScheduler.flush(err => callbackAsync(err))
128 },
129
130 function flushVideoQaduRequests (callbackAsync) {
131 requestSchedulerVideoQadu.flush(err => callbackAsync(err))
114 }, 132 },
115 133
116 function getPodsList (callbackAsync) { 134 function getPodsList (callbackAsync) {
@@ -310,6 +328,12 @@ function createRequest (options, callback) {
310 }) 328 })
311} 329}
312 330
331function createVideoQaduRequest (options, callback) {
332 if (!callback) callback = function () {}
333
334 requestSchedulerVideoQadu.createRequest(options, callback)
335}
336
313function isMe (host) { 337function isMe (host) {
314 return host === constants.CONFIG.WEBSERVER.HOST 338 return host === constants.CONFIG.WEBSERVER.HOST
315} 339}
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js
index 28dabe339..6b6535519 100644
--- a/server/lib/request-scheduler.js
+++ b/server/lib/request-scheduler.js
@@ -1,44 +1,54 @@
1'use strict' 1'use strict'
2 2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../initializers/constants') 3const constants = require('../initializers/constants')
4const BaseRequestScheduler = require('./base-request-scheduler')
6const db = require('../initializers/database') 5const db = require('../initializers/database')
7const logger = require('../helpers/logger') 6const logger = require('../helpers/logger')
8const requests = require('../helpers/requests')
9 7
10module.exports = class RequestScheduler { 8module.exports = class RequestScheduler extends BaseRequestScheduler {
11 9
12 constructor () { 10 constructor () {
13 this.lastRequestTimestamp = 0 11 super()
14 this.timer = null
15 }
16 12
17 activate () { 13 // We limit the size of the requests
18 logger.info('Requests scheduler activated.') 14 this.limitPods = constants.REQUESTS_LIMIT_PODS
19 this.lastRequestTimestamp = Date.now() 15 this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD
20 16
21 this.timer = setInterval(() => { 17 this.description = 'requests'
22 this.lastRequestTimestamp = Date.now()
23 this.makeRequests()
24 }, constants.REQUESTS_INTERVAL)
25 } 18 }
26 19
27 deactivate () { 20 getRequestModel () {
28 logger.info('Requests scheduler deactivated.') 21 return db.Request
29 clearInterval(this.timer)
30 this.timer = null
31 } 22 }
32 23
33 forceSend () { 24 getRequestToPodModel () {
34 logger.info('Force requests scheduler sending.') 25 return db.RequestToPod
35 this.makeRequests()
36 } 26 }
37 27
38 remainingMilliSeconds () { 28 buildRequestObjects (requests) {
39 if (this.timer === null) return -1 29 const requestsToMakeGrouped = {}
30
31 Object.keys(requests).forEach(toPodId => {
32 requests[toPodId].forEach(data => {
33 const request = data.request
34 const pod = data.pod
35 const hashKey = toPodId + request.endpoint
36
37 if (!requestsToMakeGrouped[hashKey]) {
38 requestsToMakeGrouped[hashKey] = {
39 toPod: pod,
40 endpoint: request.endpoint,
41 ids: [], // request ids, to delete them from the DB in the future
42 datas: [] // requests data,
43 }
44 }
45
46 requestsToMakeGrouped[hashKey].ids.push(request.id)
47 requestsToMakeGrouped[hashKey].datas.push(request.request)
48 })
49 })
40 50
41 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) 51 return requestsToMakeGrouped
42 } 52 }
43 53
44 // { type, endpoint, data, toIds, transaction } 54 // { type, endpoint, data, toIds, transaction }
@@ -79,122 +89,10 @@ module.exports = class RequestScheduler {
79 89
80 // --------------------------------------------------------------------------- 90 // ---------------------------------------------------------------------------
81 91
82 // Make all the requests of the scheduler 92 afterRequestsHook () {
83 makeRequests () { 93 // Flush requests with no pod
84 // We limit the size of the requests 94 this.getRequestModel().removeWithEmptyTo(err => {
85 // We don't want to stuck with the same failing requests so we get a random list 95 if (err) logger.error('Error when removing requests with no pods.', { error: err })
86 db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
87 if (err) {
88 logger.error('Cannot get the list of requests.', { err: err })
89 return // Abort
90 }
91
92 // If there are no requests, abort
93 if (requests.length === 0) {
94 logger.info('No requests to make.')
95 return
96 }
97
98 // We want to group requests by destinations pod and endpoint
99 const requestsToMakeGrouped = this.buildRequestObjects(requests)
100
101 logger.info('Making requests to friends.')
102
103 const goodPods = []
104 const badPods = []
105
106 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
107 const requestToMake = requestsToMakeGrouped[hashKey]
108 const toPod = requestToMake.toPod
109
110 // Maybe the pod is not our friend anymore so simply remove it
111 if (!toPod) {
112 const requestIdsToDelete = requestToMake.ids
113
114 logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
115 return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
116 }
117
118 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
119 if (success === false) {
120 badPods.push(requestToMake.toPod.id)
121 return callbackEach()
122 }
123
124 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
125 goodPods.push(requestToMake.toPod.id)
126
127 // Remove the pod id of these request ids
128 db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
129 })
130 }, () => {
131 // All the requests were made, we update the pods score
132 db.Request.updatePodsScore(goodPods, badPods)
133 // Flush requests with no pod
134 db.Request.removeWithEmptyTo(err => {
135 if (err) logger.error('Error when removing requests with no pods.', { error: err })
136 })
137 })
138 })
139 }
140
141 // Make a requests to friends of a certain type
142 makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
143 if (!callback) callback = function () {}
144
145 const params = {
146 toPod: toPod,
147 sign: true, // Prove our identity
148 method: 'POST',
149 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
150 data: requestsToMake // Requests we need to make
151 }
152
153 // Make multiple retry requests to all of pods
154 // The function fire some useful callbacks
155 requests.makeSecureRequest(params, (err, res) => {
156 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
157 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
158 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
159
160 return callback(false)
161 }
162
163 return callback(true)
164 })
165 }
166
167 buildRequestObjects (requests) {
168 const requestsToMakeGrouped = {}
169
170 Object.keys(requests).forEach(toPodId => {
171 requests[toPodId].forEach(data => {
172 const request = data.request
173 const pod = data.pod
174 const hashKey = toPodId + request.endpoint
175
176 if (!requestsToMakeGrouped[hashKey]) {
177 requestsToMakeGrouped[hashKey] = {
178 toPod: pod,
179 endpoint: request.endpoint,
180 ids: [], // request ids, to delete them from the DB in the future
181 datas: [] // requests data,
182 }
183 }
184
185 requestsToMakeGrouped[hashKey].ids.push(request.id)
186 requestsToMakeGrouped[hashKey].datas.push(request.request)
187 })
188 })
189
190 return requestsToMakeGrouped
191 }
192
193 flush (callback) {
194 db.Request.removeAll(err => {
195 if (err) logger.error('Cannot flush the requests.', { error: err })
196
197 return callback(err)
198 }) 96 })
199 } 97 }
200} 98}
diff --git a/server/lib/request-video-qadu-scheduler.js b/server/lib/request-video-qadu-scheduler.js
new file mode 100644
index 000000000..401b2fb44
--- /dev/null
+++ b/server/lib/request-video-qadu-scheduler.js
@@ -0,0 +1,116 @@
1'use strict'
2
3const BaseRequestScheduler = require('./base-request-scheduler')
4const constants = require('../initializers/constants')
5const db = require('../initializers/database')
6const logger = require('../helpers/logger')
7
8module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
9
10 constructor () {
11 super()
12
13 // We limit the size of the requests
14 this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
15 this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
16
17 this.description = 'video QADU requests'
18 }
19
20 getRequestModel () {
21 return db.RequestVideoQadu
22 }
23
24 getRequestToPodModel () {
25 return db.RequestVideoQadu
26 }
27
28 buildRequestObjects (requests) {
29 const requestsToMakeGrouped = {}
30
31 Object.keys(requests).forEach(toPodId => {
32 requests[toPodId].forEach(data => {
33 const request = data.request
34 const video = data.video
35 const pod = data.pod
36 const hashKey = toPodId
37
38 if (!requestsToMakeGrouped[hashKey]) {
39 requestsToMakeGrouped[hashKey] = {
40 toPod: pod,
41 endpoint: constants.REQUEST_ENDPOINTS.QADU,
42 ids: [], // request ids, to delete them from the DB in the future
43 datas: [], // requests data
44 videos: {}
45 }
46 }
47
48 if (!requestsToMakeGrouped[hashKey].videos[video.id]) {
49 requestsToMakeGrouped[hashKey].videos[video.id] = {}
50 }
51
52 const videoData = requestsToMakeGrouped[hashKey].videos[video.id]
53
54 switch (request.type) {
55 case constants.REQUEST_VIDEO_QADU_TYPES.LIKES:
56 videoData.likes = video.likes
57 break
58
59 case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES:
60 videoData.likes = video.dislikes
61 break
62
63 case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS:
64 videoData.views = video.views
65 break
66
67 default:
68 logger.error('Unknown request video QADU type %s.', request.type)
69 return
70 }
71
72 // Do not forget the remoteId so the remote pod can identify the video
73 videoData.remoteId = video.id
74 requestsToMakeGrouped[hashKey].ids.push(request.id)
75 requestsToMakeGrouped[hashKey].videos[video.id] = videoData
76 })
77 })
78
79 Object.keys(requestsToMakeGrouped).forEach(hashKey => {
80 Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => {
81 const videoData = requestsToMakeGrouped[hashKey].videos[videoId]
82
83 requestsToMakeGrouped[hashKey].datas.push({
84 data: videoData
85 })
86 })
87
88 // We don't need it anymore, it was just to build our datas array
89 delete requestsToMakeGrouped[hashKey].videos
90 })
91
92 return requestsToMakeGrouped
93 }
94
95 // { type, videoId, transaction? }
96 createRequest (options, callback) {
97 const type = options.type
98 const videoId = options.videoId
99 const transaction = options.transaction
100
101 const dbRequestOptions = {}
102 if (transaction) dbRequestOptions.transaction = transaction
103
104 // Send the update to all our friends
105 db.Pod.listAllIds(options.transaction, function (err, podIds) {
106 if (err) return callback(err)
107
108 const queries = []
109 podIds.forEach(podId => {
110 queries.push({ type, videoId, podId })
111 })
112
113 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback)
114 })
115 }
116}