aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/request
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-07-05 13:26:25 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-07-05 14:14:16 +0200
commit6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch)
tree3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib/request
parent5fe7e898316e18369c3e1aba307b55077adc7bfb (diff)
downloadPeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib/request')
-rw-r--r--server/lib/request/abstract-request-scheduler.ts124
-rw-r--r--server/lib/request/request-scheduler.ts32
-rw-r--r--server/lib/request/request-video-event-scheduler.ts13
-rw-r--r--server/lib/request/request-video-qadu-scheduler.ts13
4 files changed, 88 insertions, 94 deletions
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts
index e81ab9c36..dd77fddb7 100644
--- a/server/lib/request/abstract-request-scheduler.ts
+++ b/server/lib/request/abstract-request-scheduler.ts
@@ -1,15 +1,16 @@
1import * as eachLimit from 'async/eachLimit' 1import { isEmpty } from 'lodash'
2import * as Promise from 'bluebird'
2 3
3import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
4import { logger, makeSecureRequest } from '../../helpers' 5import { logger, makeSecureRequest } from '../../helpers'
5import { PodInstance } from '../../models' 6import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models'
6import { 7import {
7 API_VERSION, 8 API_VERSION,
8 REQUESTS_IN_PARALLEL, 9 REQUESTS_IN_PARALLEL,
9 REQUESTS_INTERVAL 10 REQUESTS_INTERVAL
10} from '../../initializers' 11} from '../../initializers'
11 12
12abstract class AbstractRequestScheduler { 13abstract class AbstractRequestScheduler <T> {
13 requestInterval: number 14 requestInterval: number
14 limitPods: number 15 limitPods: number
15 limitPerPod: number 16 limitPerPod: number
@@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler {
24 this.requestInterval = REQUESTS_INTERVAL 25 this.requestInterval = REQUESTS_INTERVAL
25 } 26 }
26 27
27 abstract getRequestModel () 28 abstract getRequestModel (): AbstractRequestClass<T>
28 abstract getRequestToPodModel () 29 abstract getRequestToPodModel (): AbstractRequestToPodClass
29 abstract buildRequestObjects (requests: any) 30 abstract buildRequestObjects (requestsGrouped: T): {}
30 31
31 activate () { 32 activate () {
32 logger.info('Requests scheduler activated.') 33 logger.info('Requests scheduler activated.')
@@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler {
55 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) 56 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
56 } 57 }
57 58
58 remainingRequestsCount (callback: (err: Error, total: number) => void) { 59 remainingRequestsCount () {
59 return this.getRequestModel().countTotalRequests(callback) 60 return this.getRequestModel().countTotalRequests()
60 } 61 }
61 62
62 flush (callback: (err: Error) => void) { 63 flush () {
63 this.getRequestModel().removeAll(callback) 64 return this.getRequestModel().removeAll()
64 } 65 }
65 66
66 // --------------------------------------------------------------------------- 67 // ---------------------------------------------------------------------------
67 68
68 // Make a requests to friends of a certain type 69 // Make a requests to friends of a certain type
69 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { 70 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) {
70 if (!callback) callback = function () { /* empty */ }
71
72 const params = { 71 const params = {
73 toPod: toPod, 72 toPod: toPod,
74 sign: true, // Prove our identity 73 sign: true, // Prove our identity
@@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler {
79 78
80 // Make multiple retry requests to all of pods 79 // Make multiple retry requests to all of pods
81 // The function fire some useful callbacks 80 // The function fire some useful callbacks
82 makeSecureRequest(params, (err, res) => { 81 return makeSecureRequest(params)
83 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { 82 .then(({ response, body }) => {
84 err = err ? err.message : 'Status code not 20x : ' + res.statusCode 83 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
84 throw new Error('Status code not 20x : ' + response.statusCode)
85 }
86 })
87 .catch(err => {
85 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) 88 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
86 89
87 return callback(err) 90 throw err
88 } 91 })
89
90 return callback(null)
91 })
92 } 92 }
93 93
94 // Make all the requests of the scheduler 94 // Make all the requests of the scheduler
95 protected makeRequests () { 95 protected makeRequests () {
96 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { 96 return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
97 if (err) { 97 .then((requestsGrouped: T) => {
98 logger.error('Cannot get the list of "%s".', this.description, { err: err }) 98 // We want to group requests by destinations pod and endpoint
99 return // Abort 99 const requestsToMake = this.buildRequestObjects(requestsGrouped)
100 } 100
101 101 // If there are no requests, abort
102 // If there are no requests, abort 102 if (isEmpty(requestsToMake) === true) {
103 if (requests.length === 0) { 103 logger.info('No "%s" to make.', this.description)
104 logger.info('No "%s" to make.', this.description) 104 return { goodPods: [], badPods: [] }
105 return 105 }
106 } 106
107 107 logger.info('Making "%s" to friends.', this.description)
108 // We want to group requests by destinations pod and endpoint 108
109 const requestsToMakeGrouped = this.buildRequestObjects(requests) 109 const goodPods = []
110 110 const badPods = []
111 logger.info('Making "%s" to friends.', this.description) 111
112 112 return Promise.map(Object.keys(requestsToMake), hashKey => {
113 const goodPods = [] 113 const requestToMake = requestsToMake[hashKey]
114 const badPods = [] 114 const toPod: PodInstance = requestToMake.toPod
115 115
116 eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { 116 return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
117 const requestToMake = requestsToMakeGrouped[hashKey] 117 .then(() => {
118 const toPod = requestToMake.toPod 118 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
119 119 goodPods.push(requestToMake.toPod.id)
120 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { 120
121 if (err) { 121 this.afterRequestHook()
122 badPods.push(requestToMake.toPod.id) 122
123 return callbackEach() 123 // Remove the pod id of these request ids
124 } 124 return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
125 125 })
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) 126 .catch(err => {
127 goodPods.push(requestToMake.toPod.id) 127 badPods.push(requestToMake.toPod.id)
128 128 logger.info('Cannot make request to %s.', toPod.host, { error: err })
129 // Remove the pod id of these request ids 129 })
130 this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) 130 }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
131 })
132 .then(({ goodPods, badPods }) => {
133 this.afterRequestsHook()
131 134
132 this.afterRequestHook()
133 })
134 }, () => {
135 // All the requests were made, we update the pods score 135 // All the requests were made, we update the pods score
136 db.Pod.updatePodsScore(goodPods, badPods) 136 return db.Pod.updatePodsScore(goodPods, badPods)
137
138 this.afterRequestsHook()
139 }) 137 })
140 }) 138 .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
141 } 139 }
142 140
143 protected afterRequestHook () { 141 protected afterRequestHook () {
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts
index 575e0227c..0dd796fb0 100644
--- a/server/lib/request/request-scheduler.ts
+++ b/server/lib/request/request-scheduler.ts
@@ -3,10 +3,8 @@ import * as Sequelize from 'sequelize'
3import { database as db } from '../../initializers/database' 3import { database as db } from '../../initializers/database'
4import { AbstractRequestScheduler } from './abstract-request-scheduler' 4import { AbstractRequestScheduler } from './abstract-request-scheduler'
5import { logger } from '../../helpers' 5import { logger } from '../../helpers'
6import { 6import { REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PER_POD } from '../../initializers'
7 REQUESTS_LIMIT_PODS, 7import { RequestsGrouped } from '../../models'
8 REQUESTS_LIMIT_PER_POD
9} from '../../initializers'
10import { RequestEndpoint } from '../../../shared' 8import { RequestEndpoint } from '../../../shared'
11 9
12export type RequestSchedulerOptions = { 10export type RequestSchedulerOptions = {
@@ -17,7 +15,7 @@ export type RequestSchedulerOptions = {
17 transaction: Sequelize.Transaction 15 transaction: Sequelize.Transaction
18} 16}
19 17
20class RequestScheduler extends AbstractRequestScheduler { 18class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
21 constructor () { 19 constructor () {
22 super() 20 super()
23 21
@@ -36,11 +34,11 @@ class RequestScheduler extends AbstractRequestScheduler {
36 return db.RequestToPod 34 return db.RequestToPod
37 } 35 }
38 36
39 buildRequestObjects (requests: { [ toPodId: number ]: any }) { 37 buildRequestObjects (requestsGrouped: RequestsGrouped) {
40 const requestsToMakeGrouped = {} 38 const requestsToMakeGrouped = {}
41 39
42 Object.keys(requests).forEach(toPodId => { 40 Object.keys(requestsGrouped).forEach(toPodId => {
43 requests[toPodId].forEach(data => { 41 requestsGrouped[toPodId].forEach(data => {
44 const request = data.request 42 const request = data.request
45 const pod = data.pod 43 const pod = data.pod
46 const hashKey = toPodId + request.endpoint 44 const hashKey = toPodId + request.endpoint
@@ -62,12 +60,12 @@ class RequestScheduler extends AbstractRequestScheduler {
62 return requestsToMakeGrouped 60 return requestsToMakeGrouped
63 } 61 }
64 62
65 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions, callback: (err: Error) => void) { 63 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
66 // TODO: check the setPods works 64 // TODO: check the setPods works
67 const podIds = [] 65 const podIds = []
68 66
69 // If there are no destination pods abort 67 // If there are no destination pods abort
70 if (toIds.length === 0) return callback(null) 68 if (toIds.length === 0) return undefined
71 69
72 toIds.forEach(toPod => { 70 toIds.forEach(toPod => {
73 podIds.push(toPod) 71 podIds.push(toPod)
@@ -85,20 +83,18 @@ class RequestScheduler extends AbstractRequestScheduler {
85 transaction 83 transaction
86 } 84 }
87 85
88 return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { 86 return db.Request.create(createQuery, dbRequestOptions)
89 if (err) return callback(err) 87 .then(request => {
90 88 return request.setPods(podIds, dbRequestOptions)
91 return request.setPods(podIds, dbRequestOptions).asCallback(callback) 89 })
92 })
93 } 90 }
94 91
95 // --------------------------------------------------------------------------- 92 // ---------------------------------------------------------------------------
96 93
97 afterRequestsHook () { 94 afterRequestsHook () {
98 // Flush requests with no pod 95 // Flush requests with no pod
99 this.getRequestModel().removeWithEmptyTo(err => { 96 this.getRequestModel().removeWithEmptyTo()
100 if (err) logger.error('Error when removing requests with no pods.', { error: err }) 97 .catch(err => logger.error('Error when removing requests with no pods.', { error: err }))
101 })
102 } 98 }
103} 99}
104 100
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts
index 4bb76f4c9..d4d714c02 100644
--- a/server/lib/request/request-video-event-scheduler.ts
+++ b/server/lib/request/request-video-event-scheduler.ts
@@ -7,6 +7,7 @@ import {
7 REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, 7 REQUESTS_VIDEO_EVENT_LIMIT_PER_POD,
8 REQUEST_VIDEO_EVENT_ENDPOINT 8 REQUEST_VIDEO_EVENT_ENDPOINT
9} from '../../initializers' 9} from '../../initializers'
10import { RequestsVideoEventGrouped } from '../../models'
10import { RequestVideoEventType } from '../../../shared' 11import { RequestVideoEventType } from '../../../shared'
11 12
12export type RequestVideoEventSchedulerOptions = { 13export type RequestVideoEventSchedulerOptions = {
@@ -16,7 +17,7 @@ export type RequestVideoEventSchedulerOptions = {
16 transaction?: Sequelize.Transaction 17 transaction?: Sequelize.Transaction
17} 18}
18 19
19class RequestVideoEventScheduler extends AbstractRequestScheduler { 20class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoEventGrouped> {
20 constructor () { 21 constructor () {
21 super() 22 super()
22 23
@@ -35,7 +36,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
35 return db.RequestVideoEvent 36 return db.RequestVideoEvent
36 } 37 }
37 38
38 buildRequestObjects (eventsToProcess: { [ toPodId: number ]: any }[]) { 39 buildRequestObjects (eventRequests: RequestsVideoEventGrouped) {
39 const requestsToMakeGrouped = {} 40 const requestsToMakeGrouped = {}
40 41
41 /* Example: 42 /* Example:
@@ -50,8 +51,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
50 51
51 // We group video events per video and per pod 52 // We group video events per video and per pod
52 // We add the counts of the same event types 53 // We add the counts of the same event types
53 Object.keys(eventsToProcess).forEach(toPodId => { 54 Object.keys(eventRequests).forEach(toPodId => {
54 eventsToProcess[toPodId].forEach(eventToProcess => { 55 eventRequests[toPodId].forEach(eventToProcess => {
55 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} 56 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
56 57
57 if (!requestsToMakeGrouped[toPodId]) { 58 if (!requestsToMakeGrouped[toPodId]) {
@@ -97,7 +98,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
97 return requestsToMakeGrouped 98 return requestsToMakeGrouped
98 } 99 }
99 100
100 createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { 101 createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions) {
101 if (count === undefined) count = 1 102 if (count === undefined) count = 1
102 103
103 const dbRequestOptions: Sequelize.CreateOptions = {} 104 const dbRequestOptions: Sequelize.CreateOptions = {}
@@ -109,7 +110,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
109 videoId 110 videoId
110 } 111 }
111 112
112 return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) 113 return db.RequestVideoEvent.create(createQuery, dbRequestOptions)
113 } 114 }
114} 115}
115 116
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts
index d7169cc81..5ec7de9c2 100644
--- a/server/lib/request/request-video-qadu-scheduler.ts
+++ b/server/lib/request/request-video-qadu-scheduler.ts
@@ -9,6 +9,7 @@ import {
9 REQUEST_VIDEO_QADU_ENDPOINT, 9 REQUEST_VIDEO_QADU_ENDPOINT,
10 REQUEST_VIDEO_QADU_TYPES 10 REQUEST_VIDEO_QADU_TYPES
11} from '../../initializers' 11} from '../../initializers'
12import { RequestsVideoQaduGrouped } from '../../models'
12import { RequestVideoQaduType } from '../../../shared' 13import { RequestVideoQaduType } from '../../../shared'
13 14
14export type RequestVideoQaduSchedulerOptions = { 15export type RequestVideoQaduSchedulerOptions = {
@@ -17,7 +18,7 @@ export type RequestVideoQaduSchedulerOptions = {
17 transaction?: Sequelize.Transaction 18 transaction?: Sequelize.Transaction
18} 19}
19 20
20class RequestVideoQaduScheduler extends AbstractRequestScheduler { 21class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQaduGrouped> {
21 constructor () { 22 constructor () {
22 super() 23 super()
23 24
@@ -36,7 +37,7 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler {
36 return db.RequestVideoQadu 37 return db.RequestVideoQadu
37 } 38 }
38 39
39 buildRequestObjects (requests: { [ toPodId: number ]: any }[]) { 40 buildRequestObjects (requests: RequestsVideoQaduGrouped) {
40 const requestsToMakeGrouped = {} 41 const requestsToMakeGrouped = {}
41 42
42 Object.keys(requests).forEach(toPodId => { 43 Object.keys(requests).forEach(toPodId => {
@@ -105,20 +106,18 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler {
105 return requestsToMakeGrouped 106 return requestsToMakeGrouped
106 } 107 }
107 108
108 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { 109 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
109 const dbRequestOptions: Sequelize.BulkCreateOptions = {} 110 const dbRequestOptions: Sequelize.BulkCreateOptions = {}
110 if (transaction) dbRequestOptions.transaction = transaction 111 if (transaction) dbRequestOptions.transaction = transaction
111 112
112 // Send the update to all our friends 113 // Send the update to all our friends
113 db.Pod.listAllIds(transaction, function (err, podIds) { 114 return db.Pod.listAllIds(transaction).then(podIds => {
114 if (err) return callback(err)
115
116 const queries = [] 115 const queries = []
117 podIds.forEach(podId => { 116 podIds.forEach(podId => {
118 queries.push({ type, videoId, podId }) 117 queries.push({ type, videoId, podId })
119 }) 118 })
120 119
121 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) 120 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
122 }) 121 })
123 } 122 }
124} 123}