diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-02-21 21:35:59 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-02-26 20:01:26 +0100 |
commit | 9e167724f7e933f41d9ea2e1c31772bf4c560a28 (patch) | |
tree | 093cb7c1b088f35aaf847f859a313a121c8cd233 /server/lib | |
parent | 0150b17e51df3e9fad8a59133d828c68f8ba672b (diff) | |
download | PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.tar.gz PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.tar.zst PeerTube-9e167724f7e933f41d9ea2e1c31772bf4c560a28.zip |
Server: make a basic "quick and dirty update" for videos
This system will be useful to to update some int video attributes
(likes, dislikes, views...)
The classic system is not used because we need some optimization for
scaling
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/base-request-scheduler.js | 140 | ||||
-rw-r--r-- | server/lib/friends.js | 26 | ||||
-rw-r--r-- | server/lib/request-scheduler.js | 178 | ||||
-rw-r--r-- | server/lib/request-video-qadu-scheduler.js | 116 |
4 files changed, 319 insertions, 141 deletions
diff --git a/server/lib/base-request-scheduler.js b/server/lib/base-request-scheduler.js new file mode 100644 index 000000000..d15680c25 --- /dev/null +++ b/server/lib/base-request-scheduler.js | |||
@@ -0,0 +1,140 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../initializers/constants') | ||
6 | const db = require('../initializers/database') | ||
7 | const logger = require('../helpers/logger') | ||
8 | const requests = require('../helpers/requests') | ||
9 | |||
10 | module.exports = class BaseRequestScheduler { | ||
11 | |||
12 | constructor (options) { | ||
13 | this.lastRequestTimestamp = 0 | ||
14 | this.timer = null | ||
15 | } | ||
16 | |||
17 | activate () { | ||
18 | logger.info('Requests scheduler activated.') | ||
19 | this.lastRequestTimestamp = Date.now() | ||
20 | |||
21 | this.timer = setInterval(() => { | ||
22 | this.lastRequestTimestamp = Date.now() | ||
23 | this.makeRequests() | ||
24 | }, constants.REQUESTS_INTERVAL) | ||
25 | } | ||
26 | |||
27 | deactivate () { | ||
28 | logger.info('Requests scheduler deactivated.') | ||
29 | clearInterval(this.timer) | ||
30 | this.timer = null | ||
31 | } | ||
32 | |||
33 | forceSend () { | ||
34 | logger.info('Force requests scheduler sending.') | ||
35 | this.makeRequests() | ||
36 | } | ||
37 | |||
38 | remainingMilliSeconds () { | ||
39 | if (this.timer === null) return -1 | ||
40 | |||
41 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
42 | } | ||
43 | |||
44 | // --------------------------------------------------------------------------- | ||
45 | |||
46 | // Make a requests to friends of a certain type | ||
47 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
48 | if (!callback) callback = function () {} | ||
49 | |||
50 | const params = { | ||
51 | toPod: toPod, | ||
52 | sign: true, // Prove our identity | ||
53 | method: 'POST', | ||
54 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
55 | data: requestsToMake // Requests we need to make | ||
56 | } | ||
57 | |||
58 | // Make multiple retry requests to all of pods | ||
59 | // The function fire some useful callbacks | ||
60 | requests.makeSecureRequest(params, (err, res) => { | ||
61 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
62 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
63 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
64 | |||
65 | return callback(false) | ||
66 | } | ||
67 | |||
68 | return callback(true) | ||
69 | }) | ||
70 | } | ||
71 | |||
72 | // Make all the requests of the scheduler | ||
73 | makeRequests () { | ||
74 | this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { | ||
75 | if (err) { | ||
76 | logger.error('Cannot get the list of "%s".', this.description, { err: err }) | ||
77 | return // Abort | ||
78 | } | ||
79 | |||
80 | // If there are no requests, abort | ||
81 | if (requests.length === 0) { | ||
82 | logger.info('No "%s" to make.', this.description) | ||
83 | return | ||
84 | } | ||
85 | |||
86 | // We want to group requests by destinations pod and endpoint | ||
87 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
88 | |||
89 | logger.info('Making "%s" to friends.', this.description) | ||
90 | |||
91 | const goodPods = [] | ||
92 | const badPods = [] | ||
93 | |||
94 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
95 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
96 | const toPod = requestToMake.toPod | ||
97 | |||
98 | // Maybe the pod is not our friend anymore so simply remove it | ||
99 | if (!toPod) { | ||
100 | const requestIdsToDelete = requestToMake.ids | ||
101 | |||
102 | logger.info('Removing %d "%s" of unexisting pod %s.', requestIdsToDelete.length, this.description, requestToMake.toPod.id) | ||
103 | return this.getRequestToPodModel().removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
104 | } | ||
105 | |||
106 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { | ||
107 | if (success === false) { | ||
108 | badPods.push(requestToMake.toPod.id) | ||
109 | return callbackEach() | ||
110 | } | ||
111 | |||
112 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
113 | goodPods.push(requestToMake.toPod.id) | ||
114 | |||
115 | // Remove the pod id of these request ids | ||
116 | this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
117 | |||
118 | this.afterRequestHook() | ||
119 | }) | ||
120 | }, () => { | ||
121 | // All the requests were made, we update the pods score | ||
122 | db.Pod.updatePodsScore(goodPods, badPods) | ||
123 | |||
124 | this.afterRequestsHook() | ||
125 | }) | ||
126 | }) | ||
127 | } | ||
128 | |||
129 | flush (callback) { | ||
130 | this.getRequestModel().removeAll(callback) | ||
131 | } | ||
132 | |||
133 | afterRequestHook () { | ||
134 | // Nothing to do, let children reimplement it | ||
135 | } | ||
136 | |||
137 | afterRequestsHook () { | ||
138 | // Nothing to do, let children reimplement it | ||
139 | } | ||
140 | } | ||
diff --git a/server/lib/friends.js b/server/lib/friends.js index d53ab4553..424a30801 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js | |||
@@ -12,15 +12,19 @@ const logger = require('../helpers/logger') | |||
12 | const peertubeCrypto = require('../helpers/peertube-crypto') | 12 | const peertubeCrypto = require('../helpers/peertube-crypto') |
13 | const requests = require('../helpers/requests') | 13 | const requests = require('../helpers/requests') |
14 | const RequestScheduler = require('./request-scheduler') | 14 | const RequestScheduler = require('./request-scheduler') |
15 | const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler') | ||
15 | 16 | ||
16 | const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] | 17 | const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] |
18 | |||
17 | const requestScheduler = new RequestScheduler() | 19 | const requestScheduler = new RequestScheduler() |
20 | const requestSchedulerVideoQadu = new RequestVideoQaduScheduler() | ||
18 | 21 | ||
19 | const friends = { | 22 | const friends = { |
20 | activate, | 23 | activate, |
21 | addVideoToFriends, | 24 | addVideoToFriends, |
22 | updateVideoToFriends, | 25 | updateVideoToFriends, |
23 | reportAbuseVideoToFriend, | 26 | reportAbuseVideoToFriend, |
27 | quickAndDirtyUpdateVideoToFriends, | ||
24 | hasFriends, | 28 | hasFriends, |
25 | makeFriends, | 29 | makeFriends, |
26 | quitFriends, | 30 | quitFriends, |
@@ -30,6 +34,7 @@ const friends = { | |||
30 | 34 | ||
31 | function activate () { | 35 | function activate () { |
32 | requestScheduler.activate() | 36 | requestScheduler.activate() |
37 | requestSchedulerVideoQadu.activate() | ||
33 | } | 38 | } |
34 | 39 | ||
35 | function addVideoToFriends (videoData, transaction, callback) { | 40 | function addVideoToFriends (videoData, transaction, callback) { |
@@ -71,6 +76,15 @@ function reportAbuseVideoToFriend (reportData, video) { | |||
71 | createRequest(options) | 76 | createRequest(options) |
72 | } | 77 | } |
73 | 78 | ||
79 | function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) { | ||
80 | const options = { | ||
81 | videoId, | ||
82 | type, | ||
83 | transaction | ||
84 | } | ||
85 | return createVideoQaduRequest(options, callback) | ||
86 | } | ||
87 | |||
74 | function hasFriends (callback) { | 88 | function hasFriends (callback) { |
75 | db.Pod.countAll(function (err, count) { | 89 | db.Pod.countAll(function (err, count) { |
76 | if (err) return callback(err) | 90 | if (err) return callback(err) |
@@ -110,7 +124,11 @@ function quitFriends (callback) { | |||
110 | 124 | ||
111 | waterfall([ | 125 | waterfall([ |
112 | function flushRequests (callbackAsync) { | 126 | function flushRequests (callbackAsync) { |
113 | requestScheduler.flush(callbackAsync) | 127 | requestScheduler.flush(err => callbackAsync(err)) |
128 | }, | ||
129 | |||
130 | function flushVideoQaduRequests (callbackAsync) { | ||
131 | requestSchedulerVideoQadu.flush(err => callbackAsync(err)) | ||
114 | }, | 132 | }, |
115 | 133 | ||
116 | function getPodsList (callbackAsync) { | 134 | function getPodsList (callbackAsync) { |
@@ -310,6 +328,12 @@ function createRequest (options, callback) { | |||
310 | }) | 328 | }) |
311 | } | 329 | } |
312 | 330 | ||
331 | function createVideoQaduRequest (options, callback) { | ||
332 | if (!callback) callback = function () {} | ||
333 | |||
334 | requestSchedulerVideoQadu.createRequest(options, callback) | ||
335 | } | ||
336 | |||
313 | function isMe (host) { | 337 | function isMe (host) { |
314 | return host === constants.CONFIG.WEBSERVER.HOST | 338 | return host === constants.CONFIG.WEBSERVER.HOST |
315 | } | 339 | } |
diff --git a/server/lib/request-scheduler.js b/server/lib/request-scheduler.js index 28dabe339..6b6535519 100644 --- a/server/lib/request-scheduler.js +++ b/server/lib/request-scheduler.js | |||
@@ -1,44 +1,54 @@ | |||
1 | 'use strict' | 1 | 'use strict' |
2 | 2 | ||
3 | const eachLimit = require('async/eachLimit') | ||
4 | |||
5 | const constants = require('../initializers/constants') | 3 | const constants = require('../initializers/constants') |
4 | const BaseRequestScheduler = require('./base-request-scheduler') | ||
6 | const db = require('../initializers/database') | 5 | const db = require('../initializers/database') |
7 | const logger = require('../helpers/logger') | 6 | const logger = require('../helpers/logger') |
8 | const requests = require('../helpers/requests') | ||
9 | 7 | ||
10 | module.exports = class RequestScheduler { | 8 | module.exports = class RequestScheduler extends BaseRequestScheduler { |
11 | 9 | ||
12 | constructor () { | 10 | constructor () { |
13 | this.lastRequestTimestamp = 0 | 11 | super() |
14 | this.timer = null | ||
15 | } | ||
16 | 12 | ||
17 | activate () { | 13 | // We limit the size of the requests |
18 | logger.info('Requests scheduler activated.') | 14 | this.limitPods = constants.REQUESTS_LIMIT_PODS |
19 | this.lastRequestTimestamp = Date.now() | 15 | this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD |
20 | 16 | ||
21 | this.timer = setInterval(() => { | 17 | this.description = 'requests' |
22 | this.lastRequestTimestamp = Date.now() | ||
23 | this.makeRequests() | ||
24 | }, constants.REQUESTS_INTERVAL) | ||
25 | } | 18 | } |
26 | 19 | ||
27 | deactivate () { | 20 | getRequestModel () { |
28 | logger.info('Requests scheduler deactivated.') | 21 | return db.Request |
29 | clearInterval(this.timer) | ||
30 | this.timer = null | ||
31 | } | 22 | } |
32 | 23 | ||
33 | forceSend () { | 24 | getRequestToPodModel () { |
34 | logger.info('Force requests scheduler sending.') | 25 | return db.RequestToPod |
35 | this.makeRequests() | ||
36 | } | 26 | } |
37 | 27 | ||
38 | remainingMilliSeconds () { | 28 | buildRequestObjects (requests) { |
39 | if (this.timer === null) return -1 | 29 | const requestsToMakeGrouped = {} |
30 | |||
31 | Object.keys(requests).forEach(toPodId => { | ||
32 | requests[toPodId].forEach(data => { | ||
33 | const request = data.request | ||
34 | const pod = data.pod | ||
35 | const hashKey = toPodId + request.endpoint | ||
36 | |||
37 | if (!requestsToMakeGrouped[hashKey]) { | ||
38 | requestsToMakeGrouped[hashKey] = { | ||
39 | toPod: pod, | ||
40 | endpoint: request.endpoint, | ||
41 | ids: [], // request ids, to delete them from the DB in the future | ||
42 | datas: [] // requests data, | ||
43 | } | ||
44 | } | ||
45 | |||
46 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
47 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
48 | }) | ||
49 | }) | ||
40 | 50 | ||
41 | return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | 51 | return requestsToMakeGrouped |
42 | } | 52 | } |
43 | 53 | ||
44 | // { type, endpoint, data, toIds, transaction } | 54 | // { type, endpoint, data, toIds, transaction } |
@@ -79,122 +89,10 @@ module.exports = class RequestScheduler { | |||
79 | 89 | ||
80 | // --------------------------------------------------------------------------- | 90 | // --------------------------------------------------------------------------- |
81 | 91 | ||
82 | // Make all the requests of the scheduler | 92 | afterRequestsHook () { |
83 | makeRequests () { | 93 | // Flush requests with no pod |
84 | // We limit the size of the requests | 94 | this.getRequestModel().removeWithEmptyTo(err => { |
85 | // We don't want to stuck with the same failing requests so we get a random list | 95 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) |
86 | db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => { | ||
87 | if (err) { | ||
88 | logger.error('Cannot get the list of requests.', { err: err }) | ||
89 | return // Abort | ||
90 | } | ||
91 | |||
92 | // If there are no requests, abort | ||
93 | if (requests.length === 0) { | ||
94 | logger.info('No requests to make.') | ||
95 | return | ||
96 | } | ||
97 | |||
98 | // We want to group requests by destinations pod and endpoint | ||
99 | const requestsToMakeGrouped = this.buildRequestObjects(requests) | ||
100 | |||
101 | logger.info('Making requests to friends.') | ||
102 | |||
103 | const goodPods = [] | ||
104 | const badPods = [] | ||
105 | |||
106 | eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { | ||
107 | const requestToMake = requestsToMakeGrouped[hashKey] | ||
108 | const toPod = requestToMake.toPod | ||
109 | |||
110 | // Maybe the pod is not our friend anymore so simply remove it | ||
111 | if (!toPod) { | ||
112 | const requestIdsToDelete = requestToMake.ids | ||
113 | |||
114 | logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id) | ||
115 | return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach) | ||
116 | } | ||
117 | |||
118 | this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => { | ||
119 | if (success === false) { | ||
120 | badPods.push(requestToMake.toPod.id) | ||
121 | return callbackEach() | ||
122 | } | ||
123 | |||
124 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
125 | goodPods.push(requestToMake.toPod.id) | ||
126 | |||
127 | // Remove the pod id of these request ids | ||
128 | db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach) | ||
129 | }) | ||
130 | }, () => { | ||
131 | // All the requests were made, we update the pods score | ||
132 | db.Request.updatePodsScore(goodPods, badPods) | ||
133 | // Flush requests with no pod | ||
134 | db.Request.removeWithEmptyTo(err => { | ||
135 | if (err) logger.error('Error when removing requests with no pods.', { error: err }) | ||
136 | }) | ||
137 | }) | ||
138 | }) | ||
139 | } | ||
140 | |||
141 | // Make a requests to friends of a certain type | ||
142 | makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | ||
143 | if (!callback) callback = function () {} | ||
144 | |||
145 | const params = { | ||
146 | toPod: toPod, | ||
147 | sign: true, // Prove our identity | ||
148 | method: 'POST', | ||
149 | path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, | ||
150 | data: requestsToMake // Requests we need to make | ||
151 | } | ||
152 | |||
153 | // Make multiple retry requests to all of pods | ||
154 | // The function fire some useful callbacks | ||
155 | requests.makeSecureRequest(params, (err, res) => { | ||
156 | if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { | ||
157 | err = err ? err.message : 'Status code not 20x : ' + res.statusCode | ||
158 | logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) | ||
159 | |||
160 | return callback(false) | ||
161 | } | ||
162 | |||
163 | return callback(true) | ||
164 | }) | ||
165 | } | ||
166 | |||
167 | buildRequestObjects (requests) { | ||
168 | const requestsToMakeGrouped = {} | ||
169 | |||
170 | Object.keys(requests).forEach(toPodId => { | ||
171 | requests[toPodId].forEach(data => { | ||
172 | const request = data.request | ||
173 | const pod = data.pod | ||
174 | const hashKey = toPodId + request.endpoint | ||
175 | |||
176 | if (!requestsToMakeGrouped[hashKey]) { | ||
177 | requestsToMakeGrouped[hashKey] = { | ||
178 | toPod: pod, | ||
179 | endpoint: request.endpoint, | ||
180 | ids: [], // request ids, to delete them from the DB in the future | ||
181 | datas: [] // requests data, | ||
182 | } | ||
183 | } | ||
184 | |||
185 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
186 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
187 | }) | ||
188 | }) | ||
189 | |||
190 | return requestsToMakeGrouped | ||
191 | } | ||
192 | |||
193 | flush (callback) { | ||
194 | db.Request.removeAll(err => { | ||
195 | if (err) logger.error('Cannot flush the requests.', { error: err }) | ||
196 | |||
197 | return callback(err) | ||
198 | }) | 96 | }) |
199 | } | 97 | } |
200 | } | 98 | } |
diff --git a/server/lib/request-video-qadu-scheduler.js b/server/lib/request-video-qadu-scheduler.js new file mode 100644 index 000000000..401b2fb44 --- /dev/null +++ b/server/lib/request-video-qadu-scheduler.js | |||
@@ -0,0 +1,116 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const BaseRequestScheduler = require('./base-request-scheduler') | ||
4 | const constants = require('../initializers/constants') | ||
5 | const db = require('../initializers/database') | ||
6 | const logger = require('../helpers/logger') | ||
7 | |||
8 | module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler { | ||
9 | |||
10 | constructor () { | ||
11 | super() | ||
12 | |||
13 | // We limit the size of the requests | ||
14 | this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS | ||
15 | this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS | ||
16 | |||
17 | this.description = 'video QADU requests' | ||
18 | } | ||
19 | |||
20 | getRequestModel () { | ||
21 | return db.RequestVideoQadu | ||
22 | } | ||
23 | |||
24 | getRequestToPodModel () { | ||
25 | return db.RequestVideoQadu | ||
26 | } | ||
27 | |||
28 | buildRequestObjects (requests) { | ||
29 | const requestsToMakeGrouped = {} | ||
30 | |||
31 | Object.keys(requests).forEach(toPodId => { | ||
32 | requests[toPodId].forEach(data => { | ||
33 | const request = data.request | ||
34 | const video = data.video | ||
35 | const pod = data.pod | ||
36 | const hashKey = toPodId | ||
37 | |||
38 | if (!requestsToMakeGrouped[hashKey]) { | ||
39 | requestsToMakeGrouped[hashKey] = { | ||
40 | toPod: pod, | ||
41 | endpoint: constants.REQUEST_ENDPOINTS.QADU, | ||
42 | ids: [], // request ids, to delete them from the DB in the future | ||
43 | datas: [], // requests data | ||
44 | videos: {} | ||
45 | } | ||
46 | } | ||
47 | |||
48 | if (!requestsToMakeGrouped[hashKey].videos[video.id]) { | ||
49 | requestsToMakeGrouped[hashKey].videos[video.id] = {} | ||
50 | } | ||
51 | |||
52 | const videoData = requestsToMakeGrouped[hashKey].videos[video.id] | ||
53 | |||
54 | switch (request.type) { | ||
55 | case constants.REQUEST_VIDEO_QADU_TYPES.LIKES: | ||
56 | videoData.likes = video.likes | ||
57 | break | ||
58 | |||
59 | case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES: | ||
60 | videoData.likes = video.dislikes | ||
61 | break | ||
62 | |||
63 | case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS: | ||
64 | videoData.views = video.views | ||
65 | break | ||
66 | |||
67 | default: | ||
68 | logger.error('Unknown request video QADU type %s.', request.type) | ||
69 | return | ||
70 | } | ||
71 | |||
72 | // Do not forget the remoteId so the remote pod can identify the video | ||
73 | videoData.remoteId = video.id | ||
74 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
75 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData | ||
76 | }) | ||
77 | }) | ||
78 | |||
79 | Object.keys(requestsToMakeGrouped).forEach(hashKey => { | ||
80 | Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => { | ||
81 | const videoData = requestsToMakeGrouped[hashKey].videos[videoId] | ||
82 | |||
83 | requestsToMakeGrouped[hashKey].datas.push({ | ||
84 | data: videoData | ||
85 | }) | ||
86 | }) | ||
87 | |||
88 | // We don't need it anymore, it was just to build our datas array | ||
89 | delete requestsToMakeGrouped[hashKey].videos | ||
90 | }) | ||
91 | |||
92 | return requestsToMakeGrouped | ||
93 | } | ||
94 | |||
95 | // { type, videoId, transaction? } | ||
96 | createRequest (options, callback) { | ||
97 | const type = options.type | ||
98 | const videoId = options.videoId | ||
99 | const transaction = options.transaction | ||
100 | |||
101 | const dbRequestOptions = {} | ||
102 | if (transaction) dbRequestOptions.transaction = transaction | ||
103 | |||
104 | // Send the update to all our friends | ||
105 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | ||
106 | if (err) return callback(err) | ||
107 | |||
108 | const queries = [] | ||
109 | podIds.forEach(podId => { | ||
110 | queries.push({ type, videoId, podId }) | ||
111 | }) | ||
112 | |||
113 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) | ||
114 | }) | ||
115 | } | ||
116 | } | ||