diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/friends.ts | 94 | ||||
-rw-r--r-- | server/lib/jobs/handlers/index.ts | 10 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-transcoder.ts | 7 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 24 | ||||
-rw-r--r-- | server/lib/oauth-model.ts | 15 | ||||
-rw-r--r-- | server/lib/request/base-request-scheduler.ts | 16 | ||||
-rw-r--r-- | server/lib/request/index.ts | 1 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.ts | 23 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.ts | 21 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.ts | 21 |
10 files changed, 140 insertions, 92 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts index 6b0fbd2bf..e097f9254 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts | |||
@@ -1,5 +1,6 @@ | |||
1 | import { each, eachLimit, eachSeries, series, waterfall } from 'async' | 1 | import { each, eachLimit, eachSeries, series, waterfall } from 'async' |
2 | import * as request from 'request' | 2 | import * as request from 'request' |
3 | import * as Sequelize from 'sequelize' | ||
3 | 4 | ||
4 | import { database as db } from '../initializers/database' | 5 | import { database as db } from '../initializers/database' |
5 | import { | 6 | import { |
@@ -19,9 +20,18 @@ import { | |||
19 | } from '../helpers' | 20 | } from '../helpers' |
20 | import { | 21 | import { |
21 | RequestScheduler, | 22 | RequestScheduler, |
23 | RequestSchedulerOptions, | ||
24 | |||
22 | RequestVideoQaduScheduler, | 25 | RequestVideoQaduScheduler, |
23 | RequestVideoEventScheduler | 26 | RequestVideoQaduSchedulerOptions, |
27 | |||
28 | RequestVideoEventScheduler, | ||
29 | RequestVideoEventSchedulerOptions | ||
24 | } from './request' | 30 | } from './request' |
31 | import { PodInstance, VideoInstance } from '../models' | ||
32 | |||
33 | type QaduParam = { videoId: string, type: string } | ||
34 | type EventParam = { videoId: string, type: string } | ||
25 | 35 | ||
26 | const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] | 36 | const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] |
27 | 37 | ||
@@ -35,7 +45,7 @@ function activateSchedulers () { | |||
35 | requestVideoEventScheduler.activate() | 45 | requestVideoEventScheduler.activate() |
36 | } | 46 | } |
37 | 47 | ||
38 | function addVideoToFriends (videoData, transaction, callback) { | 48 | function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { |
39 | const options = { | 49 | const options = { |
40 | type: ENDPOINT_ACTIONS.ADD, | 50 | type: ENDPOINT_ACTIONS.ADD, |
41 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 51 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
@@ -45,7 +55,7 @@ function addVideoToFriends (videoData, transaction, callback) { | |||
45 | createRequest(options, callback) | 55 | createRequest(options, callback) |
46 | } | 56 | } |
47 | 57 | ||
48 | function updateVideoToFriends (videoData, transaction, callback) { | 58 | function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { |
49 | const options = { | 59 | const options = { |
50 | type: ENDPOINT_ACTIONS.UPDATE, | 60 | type: ENDPOINT_ACTIONS.UPDATE, |
51 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 61 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
@@ -55,35 +65,37 @@ function updateVideoToFriends (videoData, transaction, callback) { | |||
55 | createRequest(options, callback) | 65 | createRequest(options, callback) |
56 | } | 66 | } |
57 | 67 | ||
58 | function removeVideoToFriends (videoParams) { | 68 | function removeVideoToFriends (videoParams: Object) { |
59 | const options = { | 69 | const options = { |
60 | type: ENDPOINT_ACTIONS.REMOVE, | 70 | type: ENDPOINT_ACTIONS.REMOVE, |
61 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 71 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
62 | data: videoParams | 72 | data: videoParams, |
73 | transaction: null | ||
63 | } | 74 | } |
64 | createRequest(options) | 75 | createRequest(options) |
65 | } | 76 | } |
66 | 77 | ||
67 | function reportAbuseVideoToFriend (reportData, video) { | 78 | function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) { |
68 | const options = { | 79 | const options = { |
69 | type: ENDPOINT_ACTIONS.REPORT_ABUSE, | 80 | type: ENDPOINT_ACTIONS.REPORT_ABUSE, |
70 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 81 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
71 | data: reportData, | 82 | data: reportData, |
72 | toIds: [ video.Author.podId ] | 83 | toIds: [ video.Author.podId ], |
84 | transaction: null | ||
73 | } | 85 | } |
74 | createRequest(options) | 86 | createRequest(options) |
75 | } | 87 | } |
76 | 88 | ||
77 | function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction?, callback?) { | 89 | function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { |
78 | const options = { | 90 | const options = { |
79 | videoId: qaduParams.videoId, | 91 | videoId: qaduParam.videoId, |
80 | type: qaduParams.type, | 92 | type: qaduParam.type, |
81 | transaction | 93 | transaction |
82 | } | 94 | } |
83 | return createVideoQaduRequest(options, callback) | 95 | return createVideoQaduRequest(options, callback) |
84 | } | 96 | } |
85 | 97 | ||
86 | function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) { | 98 | function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { |
87 | const tasks = [] | 99 | const tasks = [] |
88 | 100 | ||
89 | qadusParams.forEach(function (qaduParams) { | 101 | qadusParams.forEach(function (qaduParams) { |
@@ -97,16 +109,16 @@ function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCall | |||
97 | series(tasks, finalCallback) | 109 | series(tasks, finalCallback) |
98 | } | 110 | } |
99 | 111 | ||
100 | function addEventToRemoteVideo (eventParams, transaction?, callback?) { | 112 | function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { |
101 | const options = { | 113 | const options = { |
102 | videoId: eventParams.videoId, | 114 | videoId: eventParam.videoId, |
103 | type: eventParams.type, | 115 | type: eventParam.type, |
104 | transaction | 116 | transaction |
105 | } | 117 | } |
106 | createVideoEventRequest(options, callback) | 118 | createVideoEventRequest(options, callback) |
107 | } | 119 | } |
108 | 120 | ||
109 | function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) { | 121 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { |
110 | const tasks = [] | 122 | const tasks = [] |
111 | 123 | ||
112 | eventsParams.forEach(function (eventParams) { | 124 | eventsParams.forEach(function (eventParams) { |
@@ -120,7 +132,7 @@ function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) { | |||
120 | series(tasks, finalCallback) | 132 | series(tasks, finalCallback) |
121 | } | 133 | } |
122 | 134 | ||
123 | function hasFriends (callback) { | 135 | function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) { |
124 | db.Pod.countAll(function (err, count) { | 136 | db.Pod.countAll(function (err, count) { |
125 | if (err) return callback(err) | 137 | if (err) return callback(err) |
126 | 138 | ||
@@ -129,7 +141,7 @@ function hasFriends (callback) { | |||
129 | }) | 141 | }) |
130 | } | 142 | } |
131 | 143 | ||
132 | function makeFriends (hosts, callback) { | 144 | function makeFriends (hosts: string[], callback: (err: Error) => void) { |
133 | const podsScore = {} | 145 | const podsScore = {} |
134 | 146 | ||
135 | logger.info('Make friends!') | 147 | logger.info('Make friends!') |
@@ -141,7 +153,7 @@ function makeFriends (hosts, callback) { | |||
141 | 153 | ||
142 | eachSeries(hosts, function (host, callbackEach) { | 154 | eachSeries(hosts, function (host, callbackEach) { |
143 | computeForeignPodsList(host, podsScore, callbackEach) | 155 | computeForeignPodsList(host, podsScore, callbackEach) |
144 | }, function (err) { | 156 | }, function (err: Error) { |
145 | if (err) return callback(err) | 157 | if (err) return callback(err) |
146 | 158 | ||
147 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | 159 | logger.debug('Pods scores computed.', { podsScore: podsScore }) |
@@ -153,7 +165,7 @@ function makeFriends (hosts, callback) { | |||
153 | }) | 165 | }) |
154 | } | 166 | } |
155 | 167 | ||
156 | function quitFriends (callback) { | 168 | function quitFriends (callback: (err: Error) => void) { |
157 | // Stop pool requests | 169 | // Stop pool requests |
158 | requestScheduler.deactivate() | 170 | requestScheduler.deactivate() |
159 | 171 | ||
@@ -172,7 +184,7 @@ function quitFriends (callback) { | |||
172 | 184 | ||
173 | function announceIQuitMyFriends (pods, callbackAsync) { | 185 | function announceIQuitMyFriends (pods, callbackAsync) { |
174 | const requestParams = { | 186 | const requestParams = { |
175 | method: 'POST', | 187 | method: 'POST' as 'POST', |
176 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 188 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
177 | sign: true, | 189 | sign: true, |
178 | toPod: null | 190 | toPod: null |
@@ -199,7 +211,7 @@ function quitFriends (callback) { | |||
199 | pod.destroy().asCallback(callbackEach) | 211 | pod.destroy().asCallback(callbackEach) |
200 | }, callbackAsync) | 212 | }, callbackAsync) |
201 | } | 213 | } |
202 | ], function (err) { | 214 | ], function (err: Error) { |
203 | // Don't forget to re activate the scheduler, even if there was an error | 215 | // Don't forget to re activate the scheduler, even if there was an error |
204 | requestScheduler.activate() | 216 | requestScheduler.activate() |
205 | 217 | ||
@@ -210,7 +222,7 @@ function quitFriends (callback) { | |||
210 | }) | 222 | }) |
211 | } | 223 | } |
212 | 224 | ||
213 | function sendOwnedVideosToPod (podId) { | 225 | function sendOwnedVideosToPod (podId: number) { |
214 | db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { | 226 | db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { |
215 | if (err) { | 227 | if (err) { |
216 | logger.error('Cannot get the list of videos we own.') | 228 | logger.error('Cannot get the list of videos we own.') |
@@ -229,7 +241,8 @@ function sendOwnedVideosToPod (podId) { | |||
229 | type: 'add', | 241 | type: 'add', |
230 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 242 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
231 | data: remoteVideo, | 243 | data: remoteVideo, |
232 | toIds: [ podId ] | 244 | toIds: [ podId ], |
245 | transaction: null | ||
233 | } | 246 | } |
234 | createRequest(options) | 247 | createRequest(options) |
235 | }) | 248 | }) |
@@ -272,7 +285,7 @@ export { | |||
272 | 285 | ||
273 | // --------------------------------------------------------------------------- | 286 | // --------------------------------------------------------------------------- |
274 | 287 | ||
275 | function computeForeignPodsList (host, podsScore, callback) { | 288 | function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) { |
276 | getForeignPodsList(host, function (err, res) { | 289 | getForeignPodsList(host, function (err, res) { |
277 | if (err) return callback(err) | 290 | if (err) return callback(err) |
278 | 291 | ||
@@ -288,11 +301,11 @@ function computeForeignPodsList (host, podsScore, callback) { | |||
288 | else podsScore[foreignPodHost] = 1 | 301 | else podsScore[foreignPodHost] = 1 |
289 | }) | 302 | }) |
290 | 303 | ||
291 | return callback() | 304 | return callback(null) |
292 | }) | 305 | }) |
293 | } | 306 | } |
294 | 307 | ||
295 | function computeWinningPods (hosts, podsScore) { | 308 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { |
296 | // Build the list of pods to add | 309 | // Build the list of pods to add |
297 | // Only add a pod if it exists in more than a half base pods | 310 | // Only add a pod if it exists in more than a half base pods |
298 | const podsList = [] | 311 | const podsList = [] |
@@ -308,7 +321,7 @@ function computeWinningPods (hosts, podsScore) { | |||
308 | return podsList | 321 | return podsList |
309 | } | 322 | } |
310 | 323 | ||
311 | function getForeignPodsList (host, callback) { | 324 | function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) { |
312 | const path = '/api/' + API_VERSION + '/pods' | 325 | const path = '/api/' + API_VERSION + '/pods' |
313 | 326 | ||
314 | request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { | 327 | request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { |
@@ -323,16 +336,16 @@ function getForeignPodsList (host, callback) { | |||
323 | }) | 336 | }) |
324 | } | 337 | } |
325 | 338 | ||
326 | function makeRequestsToWinningPods (cert, podsList, callback) { | 339 | function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) { |
327 | // Stop pool requests | 340 | // Stop pool requests |
328 | requestScheduler.deactivate() | 341 | requestScheduler.deactivate() |
329 | // Flush pool requests | 342 | // Flush pool requests |
330 | requestScheduler.forceSend() | 343 | requestScheduler.forceSend() |
331 | 344 | ||
332 | eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: { host: string }, callbackEach) { | 345 | eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) { |
333 | const params = { | 346 | const params = { |
334 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', | 347 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', |
335 | method: 'POST', | 348 | method: 'POST' as 'POST', |
336 | json: { | 349 | json: { |
337 | host: CONFIG.WEBSERVER.HOST, | 350 | host: CONFIG.WEBSERVER.HOST, |
338 | email: CONFIG.ADMIN.EMAIL, | 351 | email: CONFIG.ADMIN.EMAIL, |
@@ -371,15 +384,22 @@ function makeRequestsToWinningPods (cert, podsList, callback) { | |||
371 | requestScheduler.activate() | 384 | requestScheduler.activate() |
372 | 385 | ||
373 | logger.debug('makeRequestsToWinningPods finished.') | 386 | logger.debug('makeRequestsToWinningPods finished.') |
374 | return callback() | 387 | return callback(null) |
375 | }) | 388 | }) |
376 | } | 389 | } |
377 | 390 | ||
378 | // Wrapper that populate "toIds" argument with all our friends if it is not specified | 391 | // Wrapper that populate "toIds" argument with all our friends if it is not specified |
379 | // { type, endpoint, data, toIds, transaction } | 392 | type CreateRequestOptions = { |
380 | function createRequest (options, callback?) { | 393 | type: string |
394 | endpoint: string | ||
395 | data: Object | ||
396 | toIds?: number[] | ||
397 | transaction: Sequelize.Transaction | ||
398 | } | ||
399 | function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) { | ||
381 | if (!callback) callback = function () { /* empty */ } | 400 | if (!callback) callback = function () { /* empty */ } |
382 | if (options.toIds) return requestScheduler.createRequest(options, callback) | 401 | |
402 | if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback) | ||
383 | 403 | ||
384 | // If the "toIds" pods is not specified, we send the request to all our friends | 404 | // If the "toIds" pods is not specified, we send the request to all our friends |
385 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | 405 | db.Pod.listAllIds(options.transaction, function (err, podIds) { |
@@ -393,18 +413,18 @@ function createRequest (options, callback?) { | |||
393 | }) | 413 | }) |
394 | } | 414 | } |
395 | 415 | ||
396 | function createVideoQaduRequest (options, callback) { | 416 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { |
397 | if (!callback) callback = createEmptyCallback() | 417 | if (!callback) callback = createEmptyCallback() |
398 | 418 | ||
399 | requestVideoQaduScheduler.createRequest(options, callback) | 419 | requestVideoQaduScheduler.createRequest(options, callback) |
400 | } | 420 | } |
401 | 421 | ||
402 | function createVideoEventRequest (options, callback) { | 422 | function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { |
403 | if (!callback) callback = createEmptyCallback() | 423 | if (!callback) callback = createEmptyCallback() |
404 | 424 | ||
405 | requestVideoEventScheduler.createRequest(options, callback) | 425 | requestVideoEventScheduler.createRequest(options, callback) |
406 | } | 426 | } |
407 | 427 | ||
408 | function isMe (host) { | 428 | function isMe (host: string) { |
409 | return host === CONFIG.WEBSERVER.HOST | 429 | return host === CONFIG.WEBSERVER.HOST |
410 | } | 430 | } |
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts index ae5440031..7d0263b15 100644 --- a/server/lib/jobs/handlers/index.ts +++ b/server/lib/jobs/handlers/index.ts | |||
@@ -1,6 +1,14 @@ | |||
1 | import * as videoTranscoder from './video-transcoder' | 1 | import * as videoTranscoder from './video-transcoder' |
2 | 2 | ||
3 | const jobHandlers = { | 3 | import { VideoInstance } from '../../../models' |
4 | |||
5 | export interface JobHandler<T> { | ||
6 | process (data: object, callback: (err: Error, videoInstance?: T) => void) | ||
7 | onError (err: Error, jobId: number, video: T, callback: (err: Error) => void) | ||
8 | onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void) | ||
9 | } | ||
10 | |||
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | ||
4 | videoTranscoder | 12 | videoTranscoder |
5 | } | 13 | } |
6 | 14 | ||
diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts index 43599356a..efa18ef2d 100644 --- a/server/lib/jobs/handlers/video-transcoder.ts +++ b/server/lib/jobs/handlers/video-transcoder.ts | |||
@@ -1,8 +1,9 @@ | |||
1 | import { database as db } from '../../../initializers/database' | 1 | import { database as db } from '../../../initializers/database' |
2 | import { logger } from '../../../helpers' | 2 | import { logger } from '../../../helpers' |
3 | import { addVideoToFriends } from '../../../lib' | 3 | import { addVideoToFriends } from '../../../lib' |
4 | import { VideoInstance } from '../../../models' | ||
4 | 5 | ||
5 | function process (data, callback) { | 6 | function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) { |
6 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { | 7 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { |
7 | if (err) return callback(err) | 8 | if (err) return callback(err) |
8 | 9 | ||
@@ -12,12 +13,12 @@ function process (data, callback) { | |||
12 | }) | 13 | }) |
13 | } | 14 | } |
14 | 15 | ||
15 | function onError (err, jobId, video, callback) { | 16 | function onError (err: Error, jobId: number, video: VideoInstance, callback: () => void) { |
16 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) | 17 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) |
17 | return callback() | 18 | return callback() |
18 | } | 19 | } |
19 | 20 | ||
20 | function onSuccess (data, jobId, video, callback) { | 21 | function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) { |
21 | logger.info('Job %d is a success.', jobId) | 22 | logger.info('Job %d is a success.', jobId) |
22 | 23 | ||
23 | video.toAddRemoteJSON(function (err, remoteVideo) { | 24 | video.toAddRemoteJSON(function (err, remoteVideo) { |
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index ad5f7f6d9..2f01387e7 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { forever, queue } from 'async' | 1 | import { forever, queue } from 'async' |
2 | import * as Sequelize from 'sequelize' | ||
2 | 3 | ||
3 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
4 | import { | 5 | import { |
@@ -7,7 +8,10 @@ import { | |||
7 | JOB_STATES | 8 | JOB_STATES |
8 | } from '../../initializers' | 9 | } from '../../initializers' |
9 | import { logger } from '../../helpers' | 10 | import { logger } from '../../helpers' |
10 | import { jobHandlers } from './handlers' | 11 | import { JobInstance } from '../../models' |
12 | import { JobHandler, jobHandlers } from './handlers' | ||
13 | |||
14 | type JobQueueCallback = (err: Error) => void | ||
11 | 15 | ||
12 | class JobScheduler { | 16 | class JobScheduler { |
13 | 17 | ||
@@ -24,7 +28,7 @@ class JobScheduler { | |||
24 | 28 | ||
25 | logger.info('Jobs scheduler activated.') | 29 | logger.info('Jobs scheduler activated.') |
26 | 30 | ||
27 | const jobsQueue = queue(this.processJob.bind(this)) | 31 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) |
28 | 32 | ||
29 | // Finish processing jobs from a previous start | 33 | // Finish processing jobs from a previous start |
30 | const state = JOB_STATES.PROCESSING | 34 | const state = JOB_STATES.PROCESSING |
@@ -58,7 +62,7 @@ class JobScheduler { | |||
58 | }) | 62 | }) |
59 | } | 63 | } |
60 | 64 | ||
61 | createJob (transaction, handlerName: string, handlerInputData: object, callback) { | 65 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { |
62 | const createQuery = { | 66 | const createQuery = { |
63 | state: JOB_STATES.PENDING, | 67 | state: JOB_STATES.PENDING, |
64 | handlerName, | 68 | handlerName, |
@@ -69,7 +73,7 @@ class JobScheduler { | |||
69 | db.Job.create(createQuery, options).asCallback(callback) | 73 | db.Job.create(createQuery, options).asCallback(callback) |
70 | } | 74 | } |
71 | 75 | ||
72 | private enqueueJobs (err, jobsQueue, jobs) { | 76 | private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { |
73 | if (err) { | 77 | if (err) { |
74 | logger.error('Cannot list pending jobs.', { error: err }) | 78 | logger.error('Cannot list pending jobs.', { error: err }) |
75 | } else { | 79 | } else { |
@@ -79,7 +83,7 @@ class JobScheduler { | |||
79 | } | 83 | } |
80 | } | 84 | } |
81 | 85 | ||
82 | private processJob (job, callback) { | 86 | private processJob (job: JobInstance, callback: (err: Error) => void) { |
83 | const jobHandler = jobHandlers[job.handlerName] | 87 | const jobHandler = jobHandlers[job.handlerName] |
84 | 88 | ||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | 89 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) |
@@ -89,8 +93,8 @@ class JobScheduler { | |||
89 | if (err) return this.cannotSaveJobError(err, callback) | 93 | if (err) return this.cannotSaveJobError(err, callback) |
90 | 94 | ||
91 | if (jobHandler === undefined) { | 95 | if (jobHandler === undefined) { |
92 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | 96 | logger.error('Unknown job handler for job %s.', job.handlerName) |
93 | return callback() | 97 | return callback(null) |
94 | } | 98 | } |
95 | 99 | ||
96 | return jobHandler.process(job.handlerInputData, (err, result) => { | 100 | return jobHandler.process(job.handlerInputData, (err, result) => { |
@@ -104,7 +108,7 @@ class JobScheduler { | |||
104 | }) | 108 | }) |
105 | } | 109 | } |
106 | 110 | ||
107 | private onJobError (jobHandler, job, jobResult, callback) { | 111 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { |
108 | job.state = JOB_STATES.ERROR | 112 | job.state = JOB_STATES.ERROR |
109 | 113 | ||
110 | job.save().asCallback(err => { | 114 | job.save().asCallback(err => { |
@@ -114,7 +118,7 @@ class JobScheduler { | |||
114 | }) | 118 | }) |
115 | } | 119 | } |
116 | 120 | ||
117 | private onJobSuccess (jobHandler, job, jobResult, callback) { | 121 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { |
118 | job.state = JOB_STATES.SUCCESS | 122 | job.state = JOB_STATES.SUCCESS |
119 | 123 | ||
120 | job.save().asCallback(err => { | 124 | job.save().asCallback(err => { |
@@ -124,7 +128,7 @@ class JobScheduler { | |||
124 | }) | 128 | }) |
125 | } | 129 | } |
126 | 130 | ||
127 | private cannotSaveJobError (err, callback) { | 131 | private cannotSaveJobError (err: Error, callback: (err: Error) => void) { |
128 | logger.error('Cannot save new job state.', { error: err }) | 132 | logger.error('Cannot save new job state.', { error: err }) |
129 | return callback(err) | 133 | return callback(err) |
130 | } | 134 | } |
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts index 3bdf0f478..7cf42e94c 100644 --- a/server/lib/oauth-model.ts +++ b/server/lib/oauth-model.ts | |||
@@ -1,27 +1,30 @@ | |||
1 | import { OAuthClientInstance, UserInstance } from '../models' | ||
1 | import { database as db } from '../initializers/database' | 2 | import { database as db } from '../initializers/database' |
2 | import { logger } from '../helpers' | 3 | import { logger } from '../helpers' |
3 | 4 | ||
5 | type TokenInfo = { accessToken: string, refreshToken: string, accessTokenExpiresAt: Date, refreshTokenExpiresAt: Date } | ||
6 | |||
4 | // --------------------------------------------------------------------------- | 7 | // --------------------------------------------------------------------------- |
5 | 8 | ||
6 | function getAccessToken (bearerToken) { | 9 | function getAccessToken (bearerToken: string) { |
7 | logger.debug('Getting access token (bearerToken: ' + bearerToken + ').') | 10 | logger.debug('Getting access token (bearerToken: ' + bearerToken + ').') |
8 | 11 | ||
9 | return db.OAuthToken.getByTokenAndPopulateUser(bearerToken) | 12 | return db.OAuthToken.getByTokenAndPopulateUser(bearerToken) |
10 | } | 13 | } |
11 | 14 | ||
12 | function getClient (clientId, clientSecret) { | 15 | function getClient (clientId: string, clientSecret: string) { |
13 | logger.debug('Getting Client (clientId: ' + clientId + ', clientSecret: ' + clientSecret + ').') | 16 | logger.debug('Getting Client (clientId: ' + clientId + ', clientSecret: ' + clientSecret + ').') |
14 | 17 | ||
15 | return db.OAuthClient.getByIdAndSecret(clientId, clientSecret) | 18 | return db.OAuthClient.getByIdAndSecret(clientId, clientSecret) |
16 | } | 19 | } |
17 | 20 | ||
18 | function getRefreshToken (refreshToken) { | 21 | function getRefreshToken (refreshToken: string) { |
19 | logger.debug('Getting RefreshToken (refreshToken: ' + refreshToken + ').') | 22 | logger.debug('Getting RefreshToken (refreshToken: ' + refreshToken + ').') |
20 | 23 | ||
21 | return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) | 24 | return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) |
22 | } | 25 | } |
23 | 26 | ||
24 | function getUser (username, password) { | 27 | function getUser (username: string, password: string) { |
25 | logger.debug('Getting User (username: ' + username + ', password: ' + password + ').') | 28 | logger.debug('Getting User (username: ' + username + ', password: ' + password + ').') |
26 | 29 | ||
27 | return db.User.getByUsername(username).then(function (user) { | 30 | return db.User.getByUsername(username).then(function (user) { |
@@ -42,7 +45,7 @@ function getUser (username, password) { | |||
42 | }) | 45 | }) |
43 | } | 46 | } |
44 | 47 | ||
45 | function revokeToken (token) { | 48 | function revokeToken (token: TokenInfo) { |
46 | return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(function (tokenDB) { | 49 | return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(function (tokenDB) { |
47 | if (tokenDB) tokenDB.destroy() | 50 | if (tokenDB) tokenDB.destroy() |
48 | 51 | ||
@@ -60,7 +63,7 @@ function revokeToken (token) { | |||
60 | }) | 63 | }) |
61 | } | 64 | } |
62 | 65 | ||
63 | function saveToken (token, client, user) { | 66 | function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { |
64 | logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') | 67 | logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') |
65 | 68 | ||
66 | const tokenToCreate = { | 69 | const tokenToCreate = { |
diff --git a/server/lib/request/base-request-scheduler.ts b/server/lib/request/base-request-scheduler.ts index b7ef6abf9..26bdc2bff 100644 --- a/server/lib/request/base-request-scheduler.ts +++ b/server/lib/request/base-request-scheduler.ts | |||
@@ -2,6 +2,7 @@ import * as eachLimit from 'async/eachLimit' | |||
2 | 2 | ||
3 | import { database as db } from '../../initializers/database' | 3 | import { database as db } from '../../initializers/database' |
4 | import { logger, makeSecureRequest } from '../../helpers' | 4 | import { logger, makeSecureRequest } from '../../helpers' |
5 | import { PodInstance } from '../../models' | ||
5 | import { | 6 | import { |
6 | API_VERSION, | 7 | API_VERSION, |
7 | REQUESTS_IN_PARALLEL, | 8 | REQUESTS_IN_PARALLEL, |
@@ -9,11 +10,12 @@ import { | |||
9 | } from '../../initializers' | 10 | } from '../../initializers' |
10 | 11 | ||
11 | abstract class BaseRequestScheduler { | 12 | abstract class BaseRequestScheduler { |
13 | requestInterval: number | ||
14 | limitPods: number | ||
15 | limitPerPod: number | ||
16 | |||
12 | protected lastRequestTimestamp: number | 17 | protected lastRequestTimestamp: number |
13 | protected timer: NodeJS.Timer | 18 | protected timer: NodeJS.Timer |
14 | protected requestInterval: number | ||
15 | protected limitPods: number | ||
16 | protected limitPerPod: number | ||
17 | protected description: string | 19 | protected description: string |
18 | 20 | ||
19 | constructor () { | 21 | constructor () { |
@@ -53,24 +55,24 @@ abstract class BaseRequestScheduler { | |||
53 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | 55 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) |
54 | } | 56 | } |
55 | 57 | ||
56 | remainingRequestsCount (callback) { | 58 | remainingRequestsCount (callback: (err: Error, total: number) => void) { |
57 | return this.getRequestModel().countTotalRequests(callback) | 59 | return this.getRequestModel().countTotalRequests(callback) |
58 | } | 60 | } |
59 | 61 | ||
60 | flush (callback) { | 62 | flush (callback: (err: Error) => void) { |
61 | this.getRequestModel().removeAll(callback) | 63 | this.getRequestModel().removeAll(callback) |
62 | } | 64 | } |
63 | 65 | ||
64 | // --------------------------------------------------------------------------- | 66 | // --------------------------------------------------------------------------- |
65 | 67 | ||
66 | // Make a requests to friends of a certain type | 68 | // Make a requests to friends of a certain type |
67 | protected makeRequest (toPod, requestEndpoint, requestsToMake, callback) { | 69 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { |
68 | if (!callback) callback = function () { /* empty */ } | 70 | if (!callback) callback = function () { /* empty */ } |
69 | 71 | ||
70 | const params = { | 72 | const params = { |
71 | toPod: toPod, | 73 | toPod: toPod, |
72 | sign: true, // Prove our identity | 74 | sign: true, // Prove our identity |
73 | method: 'POST', | 75 | method: 'POST' as 'POST', |
74 | path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, | 76 | path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, |
75 | data: requestsToMake // Requests we need to make | 77 | data: requestsToMake // Requests we need to make |
76 | } | 78 | } |
diff --git a/server/lib/request/index.ts b/server/lib/request/index.ts index c98f956db..110d0ed78 100644 --- a/server/lib/request/index.ts +++ b/server/lib/request/index.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | export * from './base-request-scheduler' | ||
1 | export * from './request-scheduler' | 2 | export * from './request-scheduler' |
2 | export * from './request-video-event-scheduler' | 3 | export * from './request-video-event-scheduler' |
3 | export * from './request-video-qadu-scheduler' | 4 | export * from './request-video-qadu-scheduler' |
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 26ffbfb86..69d840eeb 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts | |||
@@ -1,3 +1,5 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
1 | import { database as db } from '../../initializers/database' | 3 | import { database as db } from '../../initializers/database' |
2 | import { BaseRequestScheduler } from './base-request-scheduler' | 4 | import { BaseRequestScheduler } from './base-request-scheduler' |
3 | import { logger } from '../../helpers' | 5 | import { logger } from '../../helpers' |
@@ -6,6 +8,14 @@ import { | |||
6 | REQUESTS_LIMIT_PER_POD | 8 | REQUESTS_LIMIT_PER_POD |
7 | } from '../../initializers' | 9 | } from '../../initializers' |
8 | 10 | ||
11 | export type RequestSchedulerOptions = { | ||
12 | type: string | ||
13 | endpoint: string | ||
14 | data: Object | ||
15 | toIds: number[] | ||
16 | transaction: Sequelize.Transaction | ||
17 | } | ||
18 | |||
9 | class RequestScheduler extends BaseRequestScheduler { | 19 | class RequestScheduler extends BaseRequestScheduler { |
10 | constructor () { | 20 | constructor () { |
11 | super() | 21 | super() |
@@ -25,7 +35,7 @@ class RequestScheduler extends BaseRequestScheduler { | |||
25 | return db.RequestToPod | 35 | return db.RequestToPod |
26 | } | 36 | } |
27 | 37 | ||
28 | buildRequestObjects (requests) { | 38 | buildRequestObjects (requests: { [ toPodId: number ]: any }) { |
29 | const requestsToMakeGrouped = {} | 39 | const requestsToMakeGrouped = {} |
30 | 40 | ||
31 | Object.keys(requests).forEach(toPodId => { | 41 | Object.keys(requests).forEach(toPodId => { |
@@ -51,14 +61,7 @@ class RequestScheduler extends BaseRequestScheduler { | |||
51 | return requestsToMakeGrouped | 61 | return requestsToMakeGrouped |
52 | } | 62 | } |
53 | 63 | ||
54 | // { type, endpoint, data, toIds, transaction } | 64 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions, callback: (err: Error) => void) { |
55 | createRequest (options, callback) { | ||
56 | const type = options.type | ||
57 | const endpoint = options.endpoint | ||
58 | const data = options.data | ||
59 | const toIds = options.toIds | ||
60 | const transaction = options.transaction | ||
61 | |||
62 | // TODO: check the setPods works | 65 | // TODO: check the setPods works |
63 | const podIds = [] | 66 | const podIds = [] |
64 | 67 | ||
@@ -77,7 +80,7 @@ class RequestScheduler extends BaseRequestScheduler { | |||
77 | } | 80 | } |
78 | } | 81 | } |
79 | 82 | ||
80 | const dbRequestOptions = { | 83 | const dbRequestOptions: Sequelize.CreateOptions = { |
81 | transaction | 84 | transaction |
82 | } | 85 | } |
83 | 86 | ||
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index bde50b1d3..9da82585e 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts | |||
@@ -1,3 +1,5 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
1 | import { database as db } from '../../initializers/database' | 3 | import { database as db } from '../../initializers/database' |
2 | import { BaseRequestScheduler } from './base-request-scheduler' | 4 | import { BaseRequestScheduler } from './base-request-scheduler' |
3 | import { | 5 | import { |
@@ -6,6 +8,13 @@ import { | |||
6 | REQUEST_VIDEO_EVENT_ENDPOINT | 8 | REQUEST_VIDEO_EVENT_ENDPOINT |
7 | } from '../../initializers' | 9 | } from '../../initializers' |
8 | 10 | ||
11 | export type RequestVideoEventSchedulerOptions = { | ||
12 | type: string | ||
13 | videoId: string | ||
14 | count?: number | ||
15 | transaction?: Sequelize.Transaction | ||
16 | } | ||
17 | |||
9 | class RequestVideoEventScheduler extends BaseRequestScheduler { | 18 | class RequestVideoEventScheduler extends BaseRequestScheduler { |
10 | constructor () { | 19 | constructor () { |
11 | super() | 20 | super() |
@@ -25,7 +34,7 @@ class RequestVideoEventScheduler extends BaseRequestScheduler { | |||
25 | return db.RequestVideoEvent | 34 | return db.RequestVideoEvent |
26 | } | 35 | } |
27 | 36 | ||
28 | buildRequestObjects (eventsToProcess) { | 37 | buildRequestObjects (eventsToProcess: { [ toPodId: number ]: any }[]) { |
29 | const requestsToMakeGrouped = {} | 38 | const requestsToMakeGrouped = {} |
30 | 39 | ||
31 | /* Example: | 40 | /* Example: |
@@ -87,16 +96,10 @@ class RequestVideoEventScheduler extends BaseRequestScheduler { | |||
87 | return requestsToMakeGrouped | 96 | return requestsToMakeGrouped |
88 | } | 97 | } |
89 | 98 | ||
90 | // { type, videoId, count?, transaction? } | 99 | createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { |
91 | createRequest (options, callback) { | ||
92 | const type = options.type | ||
93 | const videoId = options.videoId | ||
94 | const transaction = options.transaction | ||
95 | let count = options.count | ||
96 | |||
97 | if (count === undefined) count = 1 | 100 | if (count === undefined) count = 1 |
98 | 101 | ||
99 | const dbRequestOptions: { transaction?: any } = {} | 102 | const dbRequestOptions: Sequelize.CreateOptions = {} |
100 | if (transaction) dbRequestOptions.transaction = transaction | 103 | if (transaction) dbRequestOptions.transaction = transaction |
101 | 104 | ||
102 | const createQuery = { | 105 | const createQuery = { |
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index dab526088..436fd8e50 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts | |||
@@ -1,3 +1,5 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
1 | import { database as db } from '../../initializers/database' | 3 | import { database as db } from '../../initializers/database' |
2 | import { BaseRequestScheduler } from './base-request-scheduler' | 4 | import { BaseRequestScheduler } from './base-request-scheduler' |
3 | import { logger } from '../../helpers' | 5 | import { logger } from '../../helpers' |
@@ -8,6 +10,12 @@ import { | |||
8 | REQUEST_VIDEO_QADU_TYPES | 10 | REQUEST_VIDEO_QADU_TYPES |
9 | } from '../../initializers' | 11 | } from '../../initializers' |
10 | 12 | ||
13 | export type RequestVideoQaduSchedulerOptions = { | ||
14 | type: string | ||
15 | videoId: string | ||
16 | transaction?: Sequelize.Transaction | ||
17 | } | ||
18 | |||
11 | class RequestVideoQaduScheduler extends BaseRequestScheduler { | 19 | class RequestVideoQaduScheduler extends BaseRequestScheduler { |
12 | constructor () { | 20 | constructor () { |
13 | super() | 21 | super() |
@@ -27,7 +35,7 @@ class RequestVideoQaduScheduler extends BaseRequestScheduler { | |||
27 | return db.RequestVideoQadu | 35 | return db.RequestVideoQadu |
28 | } | 36 | } |
29 | 37 | ||
30 | buildRequestObjects (requests) { | 38 | buildRequestObjects (requests: { [ toPodId: number ]: any }[]) { |
31 | const requestsToMakeGrouped = {} | 39 | const requestsToMakeGrouped = {} |
32 | 40 | ||
33 | Object.keys(requests).forEach(toPodId => { | 41 | Object.keys(requests).forEach(toPodId => { |
@@ -96,17 +104,12 @@ class RequestVideoQaduScheduler extends BaseRequestScheduler { | |||
96 | return requestsToMakeGrouped | 104 | return requestsToMakeGrouped |
97 | } | 105 | } |
98 | 106 | ||
99 | // { type, videoId, transaction? } | 107 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { |
100 | createRequest (options, callback) { | 108 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} |
101 | const type = options.type | ||
102 | const videoId = options.videoId | ||
103 | const transaction = options.transaction | ||
104 | |||
105 | const dbRequestOptions: { transaction?: any } = {} | ||
106 | if (transaction) dbRequestOptions.transaction = transaction | 109 | if (transaction) dbRequestOptions.transaction = transaction |
107 | 110 | ||
108 | // Send the update to all our friends | 111 | // Send the update to all our friends |
109 | db.Pod.listAllIds(options.transaction, function (err, podIds) { | 112 | db.Pod.listAllIds(transaction, function (err, podIds) { |
110 | if (err) return callback(err) | 113 | if (err) return callback(err) |
111 | 114 | ||
112 | const queries = [] | 115 | const queries = [] |