aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/friends.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-07-05 13:26:25 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-07-05 14:14:16 +0200
commit6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch)
tree3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib/friends.ts
parent5fe7e898316e18369c3e1aba307b55077adc7bfb (diff)
downloadPeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib/friends.ts')
-rw-r--r--server/lib/friends.ts318
1 files changed, 136 insertions, 182 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts
index 522cb82b3..498144318 100644
--- a/server/lib/friends.ts
+++ b/server/lib/friends.ts
@@ -1,6 +1,6 @@
1import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2import * as request from 'request' 1import * as request from 'request'
3import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import * as Promise from 'bluebird'
4 4
5import { database as db } from '../initializers/database' 5import { database as db } from '../initializers/database'
6import { 6import {
@@ -15,8 +15,7 @@ import {
15 logger, 15 logger,
16 getMyPublicCert, 16 getMyPublicCert,
17 makeSecureRequest, 17 makeSecureRequest,
18 makeRetryRequest, 18 makeRetryRequest
19 createEmptyCallback
20} from '../helpers' 19} from '../helpers'
21import { 20import {
22 RequestScheduler, 21 RequestScheduler,
@@ -53,24 +52,24 @@ function activateSchedulers () {
53 requestVideoEventScheduler.activate() 52 requestVideoEventScheduler.activate()
54} 53}
55 54
56function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { 55function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) {
57 const options = { 56 const options = {
58 type: ENDPOINT_ACTIONS.ADD, 57 type: ENDPOINT_ACTIONS.ADD,
59 endpoint: REQUEST_ENDPOINTS.VIDEOS, 58 endpoint: REQUEST_ENDPOINTS.VIDEOS,
60 data: videoData, 59 data: videoData,
61 transaction 60 transaction
62 } 61 }
63 createRequest(options, callback) 62 return createRequest(options)
64} 63}
65 64
66function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { 65function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) {
67 const options = { 66 const options = {
68 type: ENDPOINT_ACTIONS.UPDATE, 67 type: ENDPOINT_ACTIONS.UPDATE,
69 endpoint: REQUEST_ENDPOINTS.VIDEOS, 68 endpoint: REQUEST_ENDPOINTS.VIDEOS,
70 data: videoData, 69 data: videoData,
71 transaction 70 transaction
72 } 71 }
73 createRequest(options, callback) 72 return createRequest(options)
74} 73}
75 74
76function removeVideoToFriends (videoParams: Object) { 75function removeVideoToFriends (videoParams: Object) {
@@ -80,121 +79,93 @@ function removeVideoToFriends (videoParams: Object) {
80 data: videoParams, 79 data: videoParams,
81 transaction: null 80 transaction: null
82 } 81 }
83 createRequest(options) 82 return createRequest(options)
84} 83}
85 84
86function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) { 85function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance, transaction: Sequelize.Transaction) {
87 const options = { 86 const options = {
88 type: ENDPOINT_ACTIONS.REPORT_ABUSE, 87 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
89 endpoint: REQUEST_ENDPOINTS.VIDEOS, 88 endpoint: REQUEST_ENDPOINTS.VIDEOS,
90 data: reportData, 89 data: reportData,
91 toIds: [ video.Author.podId ], 90 toIds: [ video.Author.podId ],
92 transaction: null 91 transaction
93 } 92 }
94 createRequest(options) 93 return createRequest(options)
95} 94}
96 95
97function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { 96function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction) {
98 const options = { 97 const options = {
99 videoId: qaduParam.videoId, 98 videoId: qaduParam.videoId,
100 type: qaduParam.type, 99 type: qaduParam.type,
101 transaction 100 transaction
102 } 101 }
103 return createVideoQaduRequest(options, callback) 102 return createVideoQaduRequest(options)
104} 103}
105 104
106function quickAndDirtyUpdatesVideoToFriends ( 105function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction) {
107 qadusParams: QaduParam[],
108 transaction: Sequelize.Transaction,
109 finalCallback: (err: Error) => void
110) {
111 const tasks = [] 106 const tasks = []
112 107
113 qadusParams.forEach(function (qaduParams) { 108 qadusParams.forEach(function (qaduParams) {
114 const fun = function (callback) { 109 tasks.push(quickAndDirtyUpdateVideoToFriends(qaduParams, transaction))
115 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
116 }
117
118 tasks.push(fun)
119 }) 110 })
120 111
121 series(tasks, finalCallback) 112 return Promise.all(tasks)
122} 113}
123 114
124function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { 115function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction) {
125 const options = { 116 const options = {
126 videoId: eventParam.videoId, 117 videoId: eventParam.videoId,
127 type: eventParam.type, 118 type: eventParam.type,
128 transaction 119 transaction
129 } 120 }
130 createVideoEventRequest(options, callback) 121 return createVideoEventRequest(options)
131} 122}
132 123
133function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { 124function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
134 const tasks = [] 125 const tasks = []
135 126
136 eventsParams.forEach(function (eventParams) { 127 eventsParams.forEach(function (eventParams) {
137 const fun = function (callback) { 128 tasks.push(addEventToRemoteVideo(eventParams, transaction))
138 addEventToRemoteVideo(eventParams, transaction, callback)
139 }
140
141 tasks.push(fun)
142 }) 129 })
143 130
144 series(tasks, finalCallback) 131 return Promise.all(tasks)
145} 132}
146 133
147function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) { 134function hasFriends () {
148 db.Pod.countAll(function (err, count) { 135 return db.Pod.countAll().then(count => count !== 0)
149 if (err) return callback(err)
150
151 const hasFriends = (count !== 0)
152 callback(null, hasFriends)
153 })
154} 136}
155 137
156function makeFriends (hosts: string[], callback: (err: Error) => void) { 138function makeFriends (hosts: string[]) {
157 const podsScore = {} 139 const podsScore = {}
158 140
159 logger.info('Make friends!') 141 logger.info('Make friends!')
160 getMyPublicCert(function (err, cert) { 142 return getMyPublicCert()
161 if (err) { 143 .then(cert => {
162 logger.error('Cannot read public cert.') 144 return Promise.mapSeries(hosts, host => {
163 return callback(err) 145 return computeForeignPodsList(host, podsScore)
164 } 146 }).then(() => cert)
165 147 })
166 eachSeries(hosts, function (host, callbackEach) { 148 .then(cert => {
167 computeForeignPodsList(host, podsScore, callbackEach)
168 }, function (err: Error) {
169 if (err) return callback(err)
170
171 logger.debug('Pods scores computed.', { podsScore: podsScore }) 149 logger.debug('Pods scores computed.', { podsScore: podsScore })
172 const podsList = computeWinningPods(hosts, podsScore) 150 const podsList = computeWinningPods(hosts, podsScore)
173 logger.debug('Pods that we keep.', { podsToKeep: podsList }) 151 logger.debug('Pods that we keep.', { podsToKeep: podsList })
174 152
175 makeRequestsToWinningPods(cert, podsList, callback) 153 return makeRequestsToWinningPods(cert, podsList)
176 }) 154 })
177 })
178} 155}
179 156
180function quitFriends (callback: (err: Error) => void) { 157function quitFriends () {
181 // Stop pool requests 158 // Stop pool requests
182 requestScheduler.deactivate() 159 requestScheduler.deactivate()
183 160
184 waterfall([ 161 return requestScheduler.flush()
185 function flushRequests (callbackAsync) { 162 .then(() => {
186 requestScheduler.flush(err => callbackAsync(err)) 163 return requestVideoQaduScheduler.flush()
187 }, 164 })
188 165 .then(() => {
189 function flushVideoQaduRequests (callbackAsync) { 166 return db.Pod.list()
190 requestVideoQaduScheduler.flush(err => callbackAsync(err)) 167 })
191 }, 168 .then(pods => {
192
193 function getPodsList (callbackAsync) {
194 return db.Pod.list(callbackAsync)
195 },
196
197 function announceIQuitMyFriends (pods, callbackAsync) {
198 const requestParams = { 169 const requestParams = {
199 method: 'POST' as 'POST', 170 method: 'POST' as 'POST',
200 path: '/api/' + API_VERSION + '/remote/pods/remove', 171 path: '/api/' + API_VERSION + '/remote/pods/remove',
@@ -205,61 +176,57 @@ function quitFriends (callback: (err: Error) => void) {
205 // Announce we quit them 176 // Announce we quit them
206 // We don't care if the request fails 177 // We don't care if the request fails
207 // The other pod will exclude us automatically after a while 178 // The other pod will exclude us automatically after a while
208 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 179 return Promise.map(pods, pod => {
209 requestParams.toPod = pod 180 requestParams.toPod = pod
210 makeSecureRequest(requestParams, callbackEach) 181 return makeSecureRequest(requestParams)
211 }, function (err) { 182 }, { concurrency: REQUESTS_IN_PARALLEL })
212 if (err) { 183 .then(() => pods)
213 logger.error('Some errors while quitting friends.', { err: err }) 184 .catch(err => {
214 // Don't stop the process 185 logger.error('Some errors while quitting friends.', { err: err })
215 } 186 // Don't stop the process
216
217 return callbackAsync(null, pods)
218 }) 187 })
219 }, 188 })
220 189 .then(pods => {
221 function removePodsFromDB (pods, callbackAsync) { 190 const tasks = []
222 each(pods, function (pod: any, callbackEach) { 191 pods.forEach(pod => tasks.push(pod.destroy()))
223 pod.destroy().asCallback(callbackEach)
224 }, callbackAsync)
225 }
226 ], function (err: Error) {
227 // Don't forget to re activate the scheduler, even if there was an error
228 requestScheduler.activate()
229
230 if (err) return callback(err)
231 192
232 logger.info('Removed all remote videos.') 193 return Promise.all(pods)
233 return callback(null) 194 })
234 }) 195 .then(() => {
196 logger.info('Removed all remote videos.')
197 // Don't forget to re activate the scheduler, even if there was an error
198 return requestScheduler.activate()
199 })
200 .finally(() => requestScheduler.activate())
235} 201}
236 202
237function sendOwnedVideosToPod (podId: number) { 203function sendOwnedVideosToPod (podId: number) {
238 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { 204 db.Video.listOwnedAndPopulateAuthorAndTags()
239 if (err) { 205 .then(videosList => {
240 logger.error('Cannot get the list of videos we own.') 206 const tasks = []
241 return 207 videosList.forEach(video => {
242 } 208 const promise = video.toAddRemoteJSON()
243 209 .then(remoteVideo => {
244 videosList.forEach(function (video) { 210 const options = {
245 video.toAddRemoteJSON(function (err, remoteVideo) { 211 type: 'add',
246 if (err) { 212 endpoint: REQUEST_ENDPOINTS.VIDEOS,
247 logger.error('Cannot convert video to remote.', { error: err }) 213 data: remoteVideo,
248 // Don't break the process 214 toIds: [ podId ],
249 return 215 transaction: null
250 } 216 }
251 217 return createRequest(options)
252 const options = { 218 })
253 type: 'add', 219 .catch(err => {
254 endpoint: REQUEST_ENDPOINTS.VIDEOS, 220 logger.error('Cannot convert video to remote.', { error: err })
255 data: remoteVideo, 221 // Don't break the process
256 toIds: [ podId ], 222 return undefined
257 transaction: null 223 })
258 } 224
259 createRequest(options) 225 tasks.push(promise)
260 }) 226 })
227
228 return Promise.all(tasks)
261 }) 229 })
262 })
263} 230}
264 231
265function getRequestScheduler () { 232function getRequestScheduler () {
@@ -297,23 +264,22 @@ export {
297 264
298// --------------------------------------------------------------------------- 265// ---------------------------------------------------------------------------
299 266
300function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) { 267function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
301 getForeignPodsList(host, function (err, res) { 268 // TODO: type res
302 if (err) return callback(err) 269 return getForeignPodsList(host).then((res: any) => {
303
304 const foreignPodsList = res.data 270 const foreignPodsList = res.data
305 271
306 // Let's give 1 point to the pod we ask the friends list 272 // Let's give 1 point to the pod we ask the friends list
307 foreignPodsList.push({ host }) 273 foreignPodsList.push({ host })
308 274
309 foreignPodsList.forEach(function (foreignPod) { 275 foreignPodsList.forEach(foreignPod => {
310 const foreignPodHost = foreignPod.host 276 const foreignPodHost = foreignPod.host
311 277
312 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ 278 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
313 else podsScore[foreignPodHost] = 1 279 else podsScore[foreignPodHost] = 1
314 }) 280 })
315 281
316 return callback(null) 282 return undefined
317 }) 283 })
318} 284}
319 285
@@ -323,7 +289,7 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
323 const podsList = [] 289 const podsList = []
324 const baseScore = hosts.length / 2 290 const baseScore = hosts.length / 2
325 291
326 Object.keys(podsScore).forEach(function (podHost) { 292 Object.keys(podsScore).forEach(podHost => {
327 // If the pod is not me and with a good score we add it 293 // If the pod is not me and with a good score we add it
328 if (isMe(podHost) === false && podsScore[podHost] > baseScore) { 294 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
329 podsList.push({ host: podHost }) 295 podsList.push({ host: podHost })
@@ -333,28 +299,30 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
333 return podsList 299 return podsList
334} 300}
335 301
336function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) { 302function getForeignPodsList (host: string) {
337 const path = '/api/' + API_VERSION + '/pods' 303 return new Promise((res, rej) => {
304 const path = '/api/' + API_VERSION + '/pods'
338 305
339 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { 306 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
340 if (err) return callback(err) 307 if (err) return rej(err)
341 308
342 try { 309 try {
343 const json = JSON.parse(body) 310 const json = JSON.parse(body)
344 return callback(null, json) 311 return res(json)
345 } catch (err) { 312 } catch (err) {
346 return callback(err) 313 return rej(err)
347 } 314 }
315 })
348 }) 316 })
349} 317}
350 318
351function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) { 319function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
352 // Stop pool requests 320 // Stop pool requests
353 requestScheduler.deactivate() 321 requestScheduler.deactivate()
354 // Flush pool requests 322 // Flush pool requests
355 requestScheduler.forceSend() 323 requestScheduler.forceSend()
356 324
357 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) { 325 return Promise.map(podsList, pod => {
358 const params = { 326 const params = {
359 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', 327 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
360 method: 'POST' as 'POST', 328 method: 'POST' as 'POST',
@@ -365,38 +333,35 @@ function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callb
365 } 333 }
366 } 334 }
367 335
368 makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) { 336 return makeRetryRequest(params)
369 if (err) { 337 .then(({ response, body }) => {
370 logger.error('Error with adding %s pod.', pod.host, { error: err }) 338 body = body as { cert: string, email: string }
339
340 if (response.statusCode === 200) {
341 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
342 return podObj.save()
343 .then(podCreated => {
344
345 // Add our videos to the request scheduler
346 sendOwnedVideosToPod(podCreated.id)
347 })
348 .catch(err => {
349 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
350 })
351 } else {
352 logger.error('Status not 200 for %s pod.', pod.host)
353 }
354 })
355 .catch(err => {
356 logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
371 // Don't break the process 357 // Don't break the process
372 return callbackEach() 358 })
373 } 359 }, { concurrency: REQUESTS_IN_PARALLEL })
374 360 .then(() => logger.debug('makeRequestsToWinningPods finished.'))
375 if (res.statusCode === 200) { 361 .finally(() => {
376 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
377 podObj.save().asCallback(function (err, podCreated) {
378 if (err) {
379 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
380 return callbackEach()
381 }
382
383 // Add our videos to the request scheduler
384 sendOwnedVideosToPod(podCreated.id)
385
386 return callbackEach()
387 })
388 } else {
389 logger.error('Status not 200 for %s pod.', pod.host)
390 return callbackEach()
391 }
392 })
393 }, function endRequests () {
394 // Final callback, we've ended all the requests 362 // Final callback, we've ended all the requests
395 // Now we made new friends, we can re activate the pool of requests 363 // Now we made new friends, we can re activate the pool of requests
396 requestScheduler.activate() 364 requestScheduler.activate()
397
398 logger.debug('makeRequestsToWinningPods finished.')
399 return callback(null)
400 }) 365 })
401} 366}
402 367
@@ -408,33 +373,22 @@ type CreateRequestOptions = {
408 toIds?: number[] 373 toIds?: number[]
409 transaction: Sequelize.Transaction 374 transaction: Sequelize.Transaction
410} 375}
411function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) { 376function createRequest (options: CreateRequestOptions) {
412 if (!callback) callback = function () { /* empty */ } 377 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
413
414 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)
415 378
416 // If the "toIds" pods is not specified, we send the request to all our friends 379 // If the "toIds" pods is not specified, we send the request to all our friends
417 db.Pod.listAllIds(options.transaction, function (err, podIds) { 380 return db.Pod.listAllIds(options.transaction).then(podIds => {
418 if (err) {
419 logger.error('Cannot get pod ids', { error: err })
420 return
421 }
422
423 const newOptions = Object.assign(options, { toIds: podIds }) 381 const newOptions = Object.assign(options, { toIds: podIds })
424 return requestScheduler.createRequest(newOptions, callback) 382 return requestScheduler.createRequest(newOptions)
425 }) 383 })
426} 384}
427 385
428function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { 386function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {
429 if (!callback) callback = createEmptyCallback() 387 return requestVideoQaduScheduler.createRequest(options)
430
431 requestVideoQaduScheduler.createRequest(options, callback)
432} 388}
433 389
434function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { 390function createVideoEventRequest (options: RequestVideoEventSchedulerOptions) {
435 if (!callback) callback = createEmptyCallback() 391 return requestVideoEventScheduler.createRequest(options)
436
437 requestVideoEventScheduler.createRequest(options, callback)
438} 392}
439 393
440function isMe (host: string) { 394function isMe (host: string) {