aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/friends.ts (renamed from server/lib/friends.js)133
-rw-r--r--server/lib/index.ts4
-rw-r--r--server/lib/jobs/handlers/index.js7
-rw-r--r--server/lib/jobs/handlers/index.ts9
-rw-r--r--server/lib/jobs/handlers/video-transcoder.ts (renamed from server/lib/jobs/handlers/video-transcoder.js)22
-rw-r--r--server/lib/jobs/index.ts1
-rw-r--r--server/lib/jobs/job-scheduler.js129
-rw-r--r--server/lib/jobs/job-scheduler.ts137
-rw-r--r--server/lib/oauth-model.ts (renamed from server/lib/oauth-model.js)22
-rw-r--r--server/lib/request/base-request-scheduler.ts (renamed from server/lib/request/base-request-scheduler.js)64
-rw-r--r--server/lib/request/index.ts3
-rw-r--r--server/lib/request/request-scheduler.ts (renamed from server/lib/request/request-scheduler.js)25
-rw-r--r--server/lib/request/request-video-event-scheduler.ts (renamed from server/lib/request/request-video-event-scheduler.js)28
-rw-r--r--server/lib/request/request-video-qadu-scheduler.ts (renamed from server/lib/request/request-video-qadu-scheduler.js)37
14 files changed, 339 insertions, 282 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.ts
index 6dd32406c..b32783019 100644
--- a/server/lib/friends.js
+++ b/server/lib/friends.ts
@@ -1,48 +1,35 @@
1'use strict' 1import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2import request = require('request')
2 3
3const each = require('async/each')
4const eachLimit = require('async/eachLimit')
5const eachSeries = require('async/eachSeries')
6const series = require('async/series')
7const request = require('request')
8const waterfall = require('async/waterfall')
9
10const constants = require('../initializers/constants')
11const db = require('../initializers/database') 4const db = require('../initializers/database')
12const logger = require('../helpers/logger') 5import {
13const peertubeCrypto = require('../helpers/peertube-crypto') 6 API_VERSION,
14const requests = require('../helpers/requests') 7 CONFIG,
15const utils = require('../helpers/utils') 8 REQUESTS_IN_PARALLEL,
16const RequestScheduler = require('./request/request-scheduler') 9 REQUEST_ENDPOINTS,
17const RequestVideoQaduScheduler = require('./request/request-video-qadu-scheduler') 10 REQUEST_ENDPOINT_ACTIONS,
18const RequestVideoEventScheduler = require('./request/request-video-event-scheduler') 11 REMOTE_SCHEME
19 12} from '../initializers'
20const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS] 13import {
14 logger,
15 getMyPublicCert,
16 makeSecureRequest,
17 makeRetryRequest,
18 createEmptyCallback
19} from '../helpers'
20import {
21 RequestScheduler,
22 RequestVideoQaduScheduler,
23 RequestVideoEventScheduler
24} from './request'
25
26const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
21 27
22const requestScheduler = new RequestScheduler() 28const requestScheduler = new RequestScheduler()
23const requestVideoQaduScheduler = new RequestVideoQaduScheduler() 29const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
24const requestVideoEventScheduler = new RequestVideoEventScheduler() 30const requestVideoEventScheduler = new RequestVideoEventScheduler()
25 31
26const friends = { 32function activateSchedulers () {
27 activate,
28 addVideoToFriends,
29 updateVideoToFriends,
30 reportAbuseVideoToFriend,
31 quickAndDirtyUpdateVideoToFriends,
32 quickAndDirtyUpdatesVideoToFriends,
33 addEventToRemoteVideo,
34 addEventsToRemoteVideo,
35 hasFriends,
36 makeFriends,
37 quitFriends,
38 removeVideoToFriends,
39 sendOwnedVideosToPod,
40 getRequestScheduler,
41 getRequestVideoQaduScheduler,
42 getRequestVideoEventScheduler
43}
44
45function activate () {
46 requestScheduler.activate() 33 requestScheduler.activate()
47 requestVideoQaduScheduler.activate() 34 requestVideoQaduScheduler.activate()
48 requestVideoEventScheduler.activate() 35 requestVideoEventScheduler.activate()
@@ -51,7 +38,7 @@ function activate () {
51function addVideoToFriends (videoData, transaction, callback) { 38function addVideoToFriends (videoData, transaction, callback) {
52 const options = { 39 const options = {
53 type: ENDPOINT_ACTIONS.ADD, 40 type: ENDPOINT_ACTIONS.ADD,
54 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, 41 endpoint: REQUEST_ENDPOINTS.VIDEOS,
55 data: videoData, 42 data: videoData,
56 transaction 43 transaction
57 } 44 }
@@ -61,7 +48,7 @@ function addVideoToFriends (videoData, transaction, callback) {
61function updateVideoToFriends (videoData, transaction, callback) { 48function updateVideoToFriends (videoData, transaction, callback) {
62 const options = { 49 const options = {
63 type: ENDPOINT_ACTIONS.UPDATE, 50 type: ENDPOINT_ACTIONS.UPDATE,
64 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, 51 endpoint: REQUEST_ENDPOINTS.VIDEOS,
65 data: videoData, 52 data: videoData,
66 transaction 53 transaction
67 } 54 }
@@ -71,7 +58,7 @@ function updateVideoToFriends (videoData, transaction, callback) {
71function removeVideoToFriends (videoParams) { 58function removeVideoToFriends (videoParams) {
72 const options = { 59 const options = {
73 type: ENDPOINT_ACTIONS.REMOVE, 60 type: ENDPOINT_ACTIONS.REMOVE,
74 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, 61 endpoint: REQUEST_ENDPOINTS.VIDEOS,
75 data: videoParams 62 data: videoParams
76 } 63 }
77 createRequest(options) 64 createRequest(options)
@@ -80,14 +67,14 @@ function removeVideoToFriends (videoParams) {
80function reportAbuseVideoToFriend (reportData, video) { 67function reportAbuseVideoToFriend (reportData, video) {
81 const options = { 68 const options = {
82 type: ENDPOINT_ACTIONS.REPORT_ABUSE, 69 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
83 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, 70 endpoint: REQUEST_ENDPOINTS.VIDEOS,
84 data: reportData, 71 data: reportData,
85 toIds: [ video.Author.podId ] 72 toIds: [ video.Author.podId ]
86 } 73 }
87 createRequest(options) 74 createRequest(options)
88} 75}
89 76
90function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction, callback) { 77function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction?, callback?) {
91 const options = { 78 const options = {
92 videoId: qaduParams.videoId, 79 videoId: qaduParams.videoId,
93 type: qaduParams.type, 80 type: qaduParams.type,
@@ -110,7 +97,7 @@ function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCall
110 series(tasks, finalCallback) 97 series(tasks, finalCallback)
111} 98}
112 99
113function addEventToRemoteVideo (eventParams, transaction, callback) { 100function addEventToRemoteVideo (eventParams, transaction?, callback?) {
114 const options = { 101 const options = {
115 videoId: eventParams.videoId, 102 videoId: eventParams.videoId,
116 type: eventParams.type, 103 type: eventParams.type,
@@ -146,7 +133,7 @@ function makeFriends (hosts, callback) {
146 const podsScore = {} 133 const podsScore = {}
147 134
148 logger.info('Make friends!') 135 logger.info('Make friends!')
149 peertubeCrypto.getMyPublicCert(function (err, cert) { 136 getMyPublicCert(function (err, cert) {
150 if (err) { 137 if (err) {
151 logger.error('Cannot read public cert.') 138 logger.error('Cannot read public cert.')
152 return callback(err) 139 return callback(err)
@@ -186,16 +173,17 @@ function quitFriends (callback) {
186 function announceIQuitMyFriends (pods, callbackAsync) { 173 function announceIQuitMyFriends (pods, callbackAsync) {
187 const requestParams = { 174 const requestParams = {
188 method: 'POST', 175 method: 'POST',
189 path: '/api/' + constants.API_VERSION + '/remote/pods/remove', 176 path: '/api/' + API_VERSION + '/remote/pods/remove',
190 sign: true 177 sign: true,
178 toPod: null
191 } 179 }
192 180
193 // Announce we quit them 181 // Announce we quit them
194 // We don't care if the request fails 182 // We don't care if the request fails
195 // The other pod will exclude us automatically after a while 183 // The other pod will exclude us automatically after a while
196 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 184 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
197 requestParams.toPod = pod 185 requestParams.toPod = pod
198 requests.makeSecureRequest(requestParams, callbackEach) 186 makeSecureRequest(requestParams, callbackEach)
199 }, function (err) { 187 }, function (err) {
200 if (err) { 188 if (err) {
201 logger.error('Some errors while quitting friends.', { err: err }) 189 logger.error('Some errors while quitting friends.', { err: err })
@@ -207,7 +195,7 @@ function quitFriends (callback) {
207 }, 195 },
208 196
209 function removePodsFromDB (pods, callbackAsync) { 197 function removePodsFromDB (pods, callbackAsync) {
210 each(pods, function (pod, callbackEach) { 198 each(pods, function (pod: any, callbackEach) {
211 pod.destroy().asCallback(callbackEach) 199 pod.destroy().asCallback(callbackEach)
212 }, callbackAsync) 200 }, callbackAsync)
213 } 201 }
@@ -239,7 +227,7 @@ function sendOwnedVideosToPod (podId) {
239 227
240 const options = { 228 const options = {
241 type: 'add', 229 type: 'add',
242 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS, 230 endpoint: REQUEST_ENDPOINTS.VIDEOS,
243 data: remoteVideo, 231 data: remoteVideo,
244 toIds: [ podId ] 232 toIds: [ podId ]
245 } 233 }
@@ -263,7 +251,24 @@ function getRequestVideoEventScheduler () {
263 251
264// --------------------------------------------------------------------------- 252// ---------------------------------------------------------------------------
265 253
266module.exports = friends 254export {
255 activateSchedulers,
256 addVideoToFriends,
257 updateVideoToFriends,
258 reportAbuseVideoToFriend,
259 quickAndDirtyUpdateVideoToFriends,
260 quickAndDirtyUpdatesVideoToFriends,
261 addEventToRemoteVideo,
262 addEventsToRemoteVideo,
263 hasFriends,
264 makeFriends,
265 quitFriends,
266 removeVideoToFriends,
267 sendOwnedVideosToPod,
268 getRequestScheduler,
269 getRequestVideoQaduScheduler,
270 getRequestVideoEventScheduler
271}
267 272
268// --------------------------------------------------------------------------- 273// ---------------------------------------------------------------------------
269 274
@@ -304,9 +309,9 @@ function computeWinningPods (hosts, podsScore) {
304} 309}
305 310
306function getForeignPodsList (host, callback) { 311function getForeignPodsList (host, callback) {
307 const path = '/api/' + constants.API_VERSION + '/pods' 312 const path = '/api/' + API_VERSION + '/pods'
308 313
309 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { 314 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
310 if (err) return callback(err) 315 if (err) return callback(err)
311 316
312 try { 317 try {
@@ -324,18 +329,18 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
324 // Flush pool requests 329 // Flush pool requests
325 requestScheduler.forceSend() 330 requestScheduler.forceSend()
326 331
327 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 332 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: any, callbackEach) {
328 const params = { 333 const params = {
329 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/', 334 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
330 method: 'POST', 335 method: 'POST',
331 json: { 336 json: {
332 host: constants.CONFIG.WEBSERVER.HOST, 337 host: CONFIG.WEBSERVER.HOST,
333 email: constants.CONFIG.ADMIN.EMAIL, 338 email: CONFIG.ADMIN.EMAIL,
334 publicKey: cert 339 publicKey: cert
335 } 340 }
336 } 341 }
337 342
338 requests.makeRetryRequest(params, function (err, res, body) { 343 makeRetryRequest(params, function (err, res, body) {
339 if (err) { 344 if (err) {
340 logger.error('Error with adding %s pod.', pod.host, { error: err }) 345 logger.error('Error with adding %s pod.', pod.host, { error: err })
341 // Don't break the process 346 // Don't break the process
@@ -372,8 +377,8 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
372 377
373// Wrapper that populate "toIds" argument with all our friends if it is not specified 378// Wrapper that populate "toIds" argument with all our friends if it is not specified
374// { type, endpoint, data, toIds, transaction } 379// { type, endpoint, data, toIds, transaction }
375function createRequest (options, callback) { 380function createRequest (options, callback?) {
376 if (!callback) callback = function () {} 381 if (!callback) callback = function () { /* empty */ }
377 if (options.toIds) return requestScheduler.createRequest(options, callback) 382 if (options.toIds) return requestScheduler.createRequest(options, callback)
378 383
379 // If the "toIds" pods is not specified, we send the request to all our friends 384 // If the "toIds" pods is not specified, we send the request to all our friends
@@ -389,17 +394,17 @@ function createRequest (options, callback) {
389} 394}
390 395
391function createVideoQaduRequest (options, callback) { 396function createVideoQaduRequest (options, callback) {
392 if (!callback) callback = utils.createEmptyCallback() 397 if (!callback) callback = createEmptyCallback()
393 398
394 requestVideoQaduScheduler.createRequest(options, callback) 399 requestVideoQaduScheduler.createRequest(options, callback)
395} 400}
396 401
397function createVideoEventRequest (options, callback) { 402function createVideoEventRequest (options, callback) {
398 if (!callback) callback = utils.createEmptyCallback() 403 if (!callback) callback = createEmptyCallback()
399 404
400 requestVideoEventScheduler.createRequest(options, callback) 405 requestVideoEventScheduler.createRequest(options, callback)
401} 406}
402 407
403function isMe (host) { 408function isMe (host) {
404 return host === constants.CONFIG.WEBSERVER.HOST 409 return host === CONFIG.WEBSERVER.HOST
405} 410}
diff --git a/server/lib/index.ts b/server/lib/index.ts
new file mode 100644
index 000000000..b8697fb96
--- /dev/null
+++ b/server/lib/index.ts
@@ -0,0 +1,4 @@
1export * from './jobs'
2export * from './request'
3export * from './friends'
4export * from './oauth-model'
diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js
deleted file mode 100644
index 59c1ccce5..000000000
--- a/server/lib/jobs/handlers/index.js
+++ /dev/null
@@ -1,7 +0,0 @@
1'use strict'
2
3const videoTranscoder = require('./video-transcoder')
4
5module.exports = {
6 videoTranscoder
7}
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts
new file mode 100644
index 000000000..ae5440031
--- /dev/null
+++ b/server/lib/jobs/handlers/index.ts
@@ -0,0 +1,9 @@
1import * as videoTranscoder from './video-transcoder'
2
3const jobHandlers = {
4 videoTranscoder
5}
6
7export {
8 jobHandlers
9}
diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.ts
index d2ad4f9c7..35db5fb96 100644
--- a/server/lib/jobs/handlers/video-transcoder.js
+++ b/server/lib/jobs/handlers/video-transcoder.ts
@@ -1,16 +1,6 @@
1'use strict'
2
3const db = require('../../../initializers/database') 1const db = require('../../../initializers/database')
4const logger = require('../../../helpers/logger') 2import { logger } from '../../../helpers'
5const friends = require('../../../lib/friends') 3import { addVideoToFriends } from '../../../lib'
6
7const VideoTranscoderHandler = {
8 process,
9 onError,
10 onSuccess
11}
12
13// ---------------------------------------------------------------------------
14 4
15function process (data, callback) { 5function process (data, callback) {
16 db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { 6 db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) {
@@ -34,10 +24,14 @@ function onSuccess (data, jobId, video, callback) {
34 if (err) return callback(err) 24 if (err) return callback(err)
35 25
36 // Now we'll add the video's meta data to our friends 26 // Now we'll add the video's meta data to our friends
37 friends.addVideoToFriends(remoteVideo, null, callback) 27 addVideoToFriends(remoteVideo, null, callback)
38 }) 28 })
39} 29}
40 30
41// --------------------------------------------------------------------------- 31// ---------------------------------------------------------------------------
42 32
43module.exports = VideoTranscoderHandler 33export {
34 process,
35 onError,
36 onSuccess
37}
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts
new file mode 100644
index 000000000..b18a3d845
--- /dev/null
+++ b/server/lib/jobs/index.ts
@@ -0,0 +1 @@
export * from './job-scheduler'
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
deleted file mode 100644
index 7b239577f..000000000
--- a/server/lib/jobs/job-scheduler.js
+++ /dev/null
@@ -1,129 +0,0 @@
1'use strict'
2
3const forever = require('async/forever')
4const queue = require('async/queue')
5
6const constants = require('../../initializers/constants')
7const db = require('../../initializers/database')
8const logger = require('../../helpers/logger')
9
10const jobHandlers = require('./handlers')
11
12const jobScheduler = {
13 activate,
14 createJob
15}
16
17function activate () {
18 const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
19
20 logger.info('Jobs scheduler activated.')
21
22 const jobsQueue = queue(processJob)
23
24 // Finish processing jobs from a previous start
25 const state = constants.JOB_STATES.PROCESSING
26 db.Job.listWithLimit(limit, state, function (err, jobs) {
27 enqueueJobs(err, jobsQueue, jobs)
28
29 forever(
30 function (next) {
31 if (jobsQueue.length() !== 0) {
32 // Finish processing the queue first
33 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
34 }
35
36 const state = constants.JOB_STATES.PENDING
37 db.Job.listWithLimit(limit, state, function (err, jobs) {
38 if (err) {
39 logger.error('Cannot list pending jobs.', { error: err })
40 } else {
41 jobs.forEach(function (job) {
42 jobsQueue.push(job)
43 })
44 }
45
46 // Optimization: we could use "drain" from queue object
47 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
48 })
49 }
50 )
51 })
52}
53
54// ---------------------------------------------------------------------------
55
56module.exports = jobScheduler
57
58// ---------------------------------------------------------------------------
59
60function enqueueJobs (err, jobsQueue, jobs) {
61 if (err) {
62 logger.error('Cannot list pending jobs.', { error: err })
63 } else {
64 jobs.forEach(function (job) {
65 jobsQueue.push(job)
66 })
67 }
68}
69
70function createJob (transaction, handlerName, handlerInputData, callback) {
71 const createQuery = {
72 state: constants.JOB_STATES.PENDING,
73 handlerName,
74 handlerInputData
75 }
76 const options = { transaction }
77
78 db.Job.create(createQuery, options).asCallback(callback)
79}
80
81function processJob (job, callback) {
82 const jobHandler = jobHandlers[job.handlerName]
83
84 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
85
86 job.state = constants.JOB_STATES.PROCESSING
87 job.save().asCallback(function (err) {
88 if (err) return cannotSaveJobError(err, callback)
89
90 if (jobHandler === undefined) {
91 logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
92 return callback()
93 }
94
95 return jobHandler.process(job.handlerInputData, function (err, result) {
96 if (err) {
97 logger.error('Error in job handler %s.', job.handlerName, { error: err })
98 return onJobError(jobHandler, job, result, callback)
99 }
100
101 return onJobSuccess(jobHandler, job, result, callback)
102 })
103 })
104}
105
106function onJobError (jobHandler, job, jobResult, callback) {
107 job.state = constants.JOB_STATES.ERROR
108
109 job.save().asCallback(function (err) {
110 if (err) return cannotSaveJobError(err, callback)
111
112 return jobHandler.onError(err, job.id, jobResult, callback)
113 })
114}
115
116function onJobSuccess (jobHandler, job, jobResult, callback) {
117 job.state = constants.JOB_STATES.SUCCESS
118
119 job.save().asCallback(function (err) {
120 if (err) return cannotSaveJobError(err, callback)
121
122 return jobHandler.onSuccess(err, job.id, jobResult, callback)
123 })
124}
125
126function cannotSaveJobError (err, callback) {
127 logger.error('Cannot save new job state.', { error: err })
128 return callback(err)
129}
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
new file mode 100644
index 000000000..7b8c6faf9
--- /dev/null
+++ b/server/lib/jobs/job-scheduler.ts
@@ -0,0 +1,137 @@
1import { forever, queue } from 'async'
2
3const db = require('../../initializers/database')
4import {
5 JOBS_FETCHING_INTERVAL,
6 JOBS_FETCH_LIMIT_PER_CYCLE,
7 JOB_STATES
8} from '../../initializers'
9import { logger } from '../../helpers'
10import { jobHandlers } from './handlers'
11
12class JobScheduler {
13
14 private static instance: JobScheduler
15
16 private constructor () { }
17
18 static get Instance () {
19 return this.instance || (this.instance = new this())
20 }
21
22 activate () {
23 const limit = JOBS_FETCH_LIMIT_PER_CYCLE
24
25 logger.info('Jobs scheduler activated.')
26
27 const jobsQueue = queue(this.processJob)
28
29 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING
31 db.Job.listWithLimit(limit, state, (err, jobs) => {
32 this.enqueueJobs(err, jobsQueue, jobs)
33
34 forever(
35 next => {
36 if (jobsQueue.length() !== 0) {
37 // Finish processing the queue first
38 return setTimeout(next, JOBS_FETCHING_INTERVAL)
39 }
40
41 const state = JOB_STATES.PENDING
42 db.Job.listWithLimit(limit, state, (err, jobs) => {
43 if (err) {
44 logger.error('Cannot list pending jobs.', { error: err })
45 } else {
46 jobs.forEach(job => {
47 jobsQueue.push(job)
48 })
49 }
50
51 // Optimization: we could use "drain" from queue object
52 return setTimeout(next, JOBS_FETCHING_INTERVAL)
53 })
54 },
55
56 err => { logger.error('Error in job scheduler queue.', { error: err }) }
57 )
58 })
59 }
60
61 createJob (transaction, handlerName, handlerInputData, callback) {
62 const createQuery = {
63 state: JOB_STATES.PENDING,
64 handlerName,
65 handlerInputData
66 }
67 const options = { transaction }
68
69 db.Job.create(createQuery, options).asCallback(callback)
70 }
71
72 private enqueueJobs (err, jobsQueue, jobs) {
73 if (err) {
74 logger.error('Cannot list pending jobs.', { error: err })
75 } else {
76 jobs.forEach(job => {
77 jobsQueue.push(job)
78 })
79 }
80 }
81
82 private processJob (job, callback) {
83 const jobHandler = jobHandlers[job.handlerName]
84
85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
86
87 job.state = JOB_STATES.PROCESSING
88 job.save().asCallback(err => {
89 if (err) return this.cannotSaveJobError(err, callback)
90
91 if (jobHandler === undefined) {
92 logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
93 return callback()
94 }
95
96 return jobHandler.process(job.handlerInputData, (err, result) => {
97 if (err) {
98 logger.error('Error in job handler %s.', job.handlerName, { error: err })
99 return this.onJobError(jobHandler, job, result, callback)
100 }
101
102 return this.onJobSuccess(jobHandler, job, result, callback)
103 })
104 })
105 }
106
107 private onJobError (jobHandler, job, jobResult, callback) {
108 job.state = JOB_STATES.ERROR
109
110 job.save().asCallback(err => {
111 if (err) return this.cannotSaveJobError(err, callback)
112
113 return jobHandler.onError(err, job.id, jobResult, callback)
114 })
115 }
116
117 private onJobSuccess (jobHandler, job, jobResult, callback) {
118 job.state = JOB_STATES.SUCCESS
119
120 job.save().asCallback(err => {
121 if (err) return this.cannotSaveJobError(err, callback)
122
123 return jobHandler.onSuccess(err, job.id, jobResult, callback)
124 })
125 }
126
127 private cannotSaveJobError (err, callback) {
128 logger.error('Cannot save new job state.', { error: err })
129 return callback(err)
130 }
131}
132
133// ---------------------------------------------------------------------------
134
135export {
136 JobScheduler
137}
diff --git a/server/lib/oauth-model.js b/server/lib/oauth-model.ts
index 1c12f1b14..00b1afcf5 100644
--- a/server/lib/oauth-model.js
+++ b/server/lib/oauth-model.ts
@@ -1,15 +1,5 @@
1const db = require('../initializers/database') 1const db = require('../initializers/database')
2const logger = require('../helpers/logger') 2import { logger } from '../helpers'
3
4// See https://github.com/oauthjs/node-oauth2-server/wiki/Model-specification for the model specifications
5const OAuthModel = {
6 getAccessToken,
7 getClient,
8 getRefreshToken,
9 getUser,
10 revokeToken,
11 saveToken
12}
13 3
14// --------------------------------------------------------------------------- 4// ---------------------------------------------------------------------------
15 5
@@ -94,4 +84,12 @@ function saveToken (token, client, user) {
94 84
95// --------------------------------------------------------------------------- 85// ---------------------------------------------------------------------------
96 86
97module.exports = OAuthModel 87// See https://github.com/oauthjs/node-oauth2-server/wiki/Model-specification for the model specifications
88export {
89 getAccessToken,
90 getClient,
91 getRefreshToken,
92 getUser,
93 revokeToken,
94 saveToken
95}
diff --git a/server/lib/request/base-request-scheduler.js b/server/lib/request/base-request-scheduler.ts
index 782448340..7fc88b5f1 100644
--- a/server/lib/request/base-request-scheduler.js
+++ b/server/lib/request/base-request-scheduler.ts
@@ -1,19 +1,31 @@
1'use strict' 1import { eachLimit } from 'async/eachLimit'
2 2
3const eachLimit = require('async/eachLimit')
4
5const constants = require('../../initializers/constants')
6const db = require('../../initializers/database') 3const db = require('../../initializers/database')
7const logger = require('../../helpers/logger') 4import { logger, makeSecureRequest } from '../../helpers'
8const requests = require('../../helpers/requests') 5import {
9 6 API_VERSION,
10module.exports = class BaseRequestScheduler { 7 REQUESTS_IN_PARALLEL,
11 constructor (options) { 8 REQUESTS_INTERVAL
9} from '../../initializers'
10
11abstract class BaseRequestScheduler {
12 protected lastRequestTimestamp: number
13 protected timer: NodeJS.Timer
14 protected requestInterval: number
15 protected limitPods: number
16 protected limitPerPod: number
17 protected description: string
18
19 constructor () {
12 this.lastRequestTimestamp = 0 20 this.lastRequestTimestamp = 0
13 this.timer = null 21 this.timer = null
14 this.requestInterval = constants.REQUESTS_INTERVAL 22 this.requestInterval = REQUESTS_INTERVAL
15 } 23 }
16 24
25 abstract getRequestModel ()
26 abstract getRequestToPodModel ()
27 abstract buildRequestObjects (requests: any)
28
17 activate () { 29 activate () {
18 logger.info('Requests scheduler activated.') 30 logger.info('Requests scheduler activated.')
19 this.lastRequestTimestamp = Date.now() 31 this.lastRequestTimestamp = Date.now()
@@ -38,30 +50,34 @@ module.exports = class BaseRequestScheduler {
38 remainingMilliSeconds () { 50 remainingMilliSeconds () {
39 if (this.timer === null) return -1 51 if (this.timer === null) return -1
40 52
41 return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) 53 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
42 } 54 }
43 55
44 remainingRequestsCount (callback) { 56 remainingRequestsCount (callback) {
45 return this.getRequestModel().countTotalRequests(callback) 57 return this.getRequestModel().countTotalRequests(callback)
46 } 58 }
47 59
60 flush (callback) {
61 this.getRequestModel().removeAll(callback)
62 }
63
48 // --------------------------------------------------------------------------- 64 // ---------------------------------------------------------------------------
49 65
50 // Make a requests to friends of a certain type 66 // Make a requests to friends of a certain type
51 makeRequest (toPod, requestEndpoint, requestsToMake, callback) { 67 protected makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
52 if (!callback) callback = function () {} 68 if (!callback) callback = function () { /* empty */ }
53 69
54 const params = { 70 const params = {
55 toPod: toPod, 71 toPod: toPod,
56 sign: true, // Prove our identity 72 sign: true, // Prove our identity
57 method: 'POST', 73 method: 'POST',
58 path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, 74 path: '/api/' + API_VERSION + '/remote/' + requestEndpoint,
59 data: requestsToMake // Requests we need to make 75 data: requestsToMake // Requests we need to make
60 } 76 }
61 77
62 // Make multiple retry requests to all of pods 78 // Make multiple retry requests to all of pods
63 // The function fire some useful callbacks 79 // The function fire some useful callbacks
64 requests.makeSecureRequest(params, (err, res) => { 80 makeSecureRequest(params, (err, res) => {
65 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { 81 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
66 err = err ? err.message : 'Status code not 20x : ' + res.statusCode 82 err = err ? err.message : 'Status code not 20x : ' + res.statusCode
67 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) 83 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
@@ -74,7 +90,7 @@ module.exports = class BaseRequestScheduler {
74 } 90 }
75 91
76 // Make all the requests of the scheduler 92 // Make all the requests of the scheduler
77 makeRequests () { 93 protected makeRequests () {
78 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { 94 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
79 if (err) { 95 if (err) {
80 logger.error('Cannot get the list of "%s".', this.description, { err: err }) 96 logger.error('Cannot get the list of "%s".', this.description, { err: err })
@@ -95,7 +111,7 @@ module.exports = class BaseRequestScheduler {
95 const goodPods = [] 111 const goodPods = []
96 const badPods = [] 112 const badPods = []
97 113
98 eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { 114 eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
99 const requestToMake = requestsToMakeGrouped[hashKey] 115 const requestToMake = requestsToMakeGrouped[hashKey]
100 const toPod = requestToMake.toPod 116 const toPod = requestToMake.toPod
101 117
@@ -122,15 +138,17 @@ module.exports = class BaseRequestScheduler {
122 }) 138 })
123 } 139 }
124 140
125 flush (callback) { 141 protected afterRequestHook () {
126 this.getRequestModel().removeAll(callback)
127 }
128
129 afterRequestHook () {
130 // Nothing to do, let children reimplement it 142 // Nothing to do, let children reimplement it
131 } 143 }
132 144
133 afterRequestsHook () { 145 protected afterRequestsHook () {
134 // Nothing to do, let children reimplement it 146 // Nothing to do, let children reimplement it
135 } 147 }
136} 148}
149
150// ---------------------------------------------------------------------------
151
152export {
153 BaseRequestScheduler
154}
diff --git a/server/lib/request/index.ts b/server/lib/request/index.ts
new file mode 100644
index 000000000..c98f956db
--- /dev/null
+++ b/server/lib/request/index.ts
@@ -0,0 +1,3 @@
1export * from './request-scheduler'
2export * from './request-video-event-scheduler'
3export * from './request-video-qadu-scheduler'
diff --git a/server/lib/request/request-scheduler.js b/server/lib/request/request-scheduler.ts
index 555ec3e54..2006a6f03 100644
--- a/server/lib/request/request-scheduler.js
+++ b/server/lib/request/request-scheduler.ts
@@ -1,17 +1,18 @@
1'use strict'
2
3const constants = require('../../initializers/constants')
4const BaseRequestScheduler = require('./base-request-scheduler')
5const db = require('../../initializers/database') 1const db = require('../../initializers/database')
6const logger = require('../../helpers/logger') 2import { BaseRequestScheduler } from './base-request-scheduler'
7 3import { logger } from '../../helpers'
8module.exports = class RequestScheduler extends BaseRequestScheduler { 4import {
5 REQUESTS_LIMIT_PODS,
6 REQUESTS_LIMIT_PER_POD
7} from '../../initializers'
8
9class RequestScheduler extends BaseRequestScheduler {
9 constructor () { 10 constructor () {
10 super() 11 super()
11 12
12 // We limit the size of the requests 13 // We limit the size of the requests
13 this.limitPods = constants.REQUESTS_LIMIT_PODS 14 this.limitPods = REQUESTS_LIMIT_PODS
14 this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD 15 this.limitPerPod = REQUESTS_LIMIT_PER_POD
15 16
16 this.description = 'requests' 17 this.description = 'requests'
17 } 18 }
@@ -95,3 +96,9 @@ module.exports = class RequestScheduler extends BaseRequestScheduler {
95 }) 96 })
96 } 97 }
97} 98}
99
100// ---------------------------------------------------------------------------
101
102export {
103 RequestScheduler
104}
diff --git a/server/lib/request/request-video-event-scheduler.js b/server/lib/request/request-video-event-scheduler.ts
index e54d34f4a..6e5306c7d 100644
--- a/server/lib/request/request-video-event-scheduler.js
+++ b/server/lib/request/request-video-event-scheduler.ts
@@ -1,16 +1,18 @@
1'use strict'
2
3const BaseRequestScheduler = require('./base-request-scheduler')
4const constants = require('../../initializers/constants')
5const db = require('../../initializers/database') 1const db = require('../../initializers/database')
6 2import { BaseRequestScheduler } from './base-request-scheduler'
7module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler { 3import {
4 REQUESTS_VIDEO_EVENT_LIMIT_PODS,
5 REQUESTS_VIDEO_EVENT_LIMIT_PER_POD,
6 REQUEST_VIDEO_EVENT_ENDPOINT
7} from '../../initializers'
8
9class RequestVideoEventScheduler extends BaseRequestScheduler {
8 constructor () { 10 constructor () {
9 super() 11 super()
10 12
11 // We limit the size of the requests 13 // We limit the size of the requests
12 this.limitPods = constants.REQUESTS_VIDEO_EVENT_LIMIT_PODS 14 this.limitPods = REQUESTS_VIDEO_EVENT_LIMIT_PODS
13 this.limitPerPod = constants.REQUESTS_VIDEO_EVENT_LIMIT_PER_POD 15 this.limitPerPod = REQUESTS_VIDEO_EVENT_LIMIT_PER_POD
14 16
15 this.description = 'video event requests' 17 this.description = 'video event requests'
16 } 18 }
@@ -45,7 +47,7 @@ module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler {
45 if (!requestsToMakeGrouped[toPodId]) { 47 if (!requestsToMakeGrouped[toPodId]) {
46 requestsToMakeGrouped[toPodId] = { 48 requestsToMakeGrouped[toPodId] = {
47 toPod: eventToProcess.pod, 49 toPod: eventToProcess.pod,
48 endpoint: constants.REQUEST_VIDEO_EVENT_ENDPOINT, 50 endpoint: REQUEST_VIDEO_EVENT_ENDPOINT,
49 ids: [], // request ids, to delete them from the DB in the future 51 ids: [], // request ids, to delete them from the DB in the future
50 datas: [] // requests data 52 datas: [] // requests data
51 } 53 }
@@ -94,7 +96,7 @@ module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler {
94 96
95 if (count === undefined) count = 1 97 if (count === undefined) count = 1
96 98
97 const dbRequestOptions = {} 99 const dbRequestOptions: { transaction?: any } = {}
98 if (transaction) dbRequestOptions.transaction = transaction 100 if (transaction) dbRequestOptions.transaction = transaction
99 101
100 const createQuery = { 102 const createQuery = {
@@ -106,3 +108,9 @@ module.exports = class RequestVideoEventScheduler extends BaseRequestScheduler {
106 return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) 108 return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback)
107 } 109 }
108} 110}
111
112// ---------------------------------------------------------------------------
113
114export {
115 RequestVideoEventScheduler
116}
diff --git a/server/lib/request/request-video-qadu-scheduler.js b/server/lib/request/request-video-qadu-scheduler.ts
index 17402b556..d81822723 100644
--- a/server/lib/request/request-video-qadu-scheduler.js
+++ b/server/lib/request/request-video-qadu-scheduler.ts
@@ -1,17 +1,20 @@
1'use strict'
2
3const BaseRequestScheduler = require('./base-request-scheduler')
4const constants = require('../../initializers/constants')
5const db = require('../../initializers/database') 1const db = require('../../initializers/database')
6const logger = require('../../helpers/logger') 2import { BaseRequestScheduler } from './base-request-scheduler'
7 3import { logger } from '../../helpers'
8module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler { 4import {
5 REQUESTS_VIDEO_QADU_LIMIT_PODS,
6 REQUESTS_VIDEO_QADU_LIMIT_PER_POD,
7 REQUEST_VIDEO_QADU_ENDPOINT,
8 REQUEST_VIDEO_QADU_TYPES
9} from '../../initializers'
10
11class RequestVideoQaduScheduler extends BaseRequestScheduler {
9 constructor () { 12 constructor () {
10 super() 13 super()
11 14
12 // We limit the size of the requests 15 // We limit the size of the requests
13 this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS 16 this.limitPods = REQUESTS_VIDEO_QADU_LIMIT_PODS
14 this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PER_POD 17 this.limitPerPod = REQUESTS_VIDEO_QADU_LIMIT_PER_POD
15 18
16 this.description = 'video QADU requests' 19 this.description = 'video QADU requests'
17 } 20 }
@@ -37,7 +40,7 @@ module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
37 if (!requestsToMakeGrouped[hashKey]) { 40 if (!requestsToMakeGrouped[hashKey]) {
38 requestsToMakeGrouped[hashKey] = { 41 requestsToMakeGrouped[hashKey] = {
39 toPod: pod, 42 toPod: pod,
40 endpoint: constants.REQUEST_VIDEO_QADU_ENDPOINT, 43 endpoint: REQUEST_VIDEO_QADU_ENDPOINT,
41 ids: [], // request ids, to delete them from the DB in the future 44 ids: [], // request ids, to delete them from the DB in the future
42 datas: [], // requests data 45 datas: [], // requests data
43 videos: {} 46 videos: {}
@@ -49,15 +52,15 @@ module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
49 if (!videoData) videoData = {} 52 if (!videoData) videoData = {}
50 53
51 switch (request.type) { 54 switch (request.type) {
52 case constants.REQUEST_VIDEO_QADU_TYPES.LIKES: 55 case REQUEST_VIDEO_QADU_TYPES.LIKES:
53 videoData.likes = video.likes 56 videoData.likes = video.likes
54 break 57 break
55 58
56 case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES: 59 case REQUEST_VIDEO_QADU_TYPES.DISLIKES:
57 videoData.dislikes = video.dislikes 60 videoData.dislikes = video.dislikes
58 break 61 break
59 62
60 case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS: 63 case REQUEST_VIDEO_QADU_TYPES.VIEWS:
61 videoData.views = video.views 64 videoData.views = video.views
62 break 65 break
63 66
@@ -99,7 +102,7 @@ module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
99 const videoId = options.videoId 102 const videoId = options.videoId
100 const transaction = options.transaction 103 const transaction = options.transaction
101 104
102 const dbRequestOptions = {} 105 const dbRequestOptions: { transaction?: any } = {}
103 if (transaction) dbRequestOptions.transaction = transaction 106 if (transaction) dbRequestOptions.transaction = transaction
104 107
105 // Send the update to all our friends 108 // Send the update to all our friends
@@ -115,3 +118,9 @@ module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
115 }) 118 })
116 } 119 }
117} 120}
121
122// ---------------------------------------------------------------------------
123
124export {
125 RequestVideoQaduScheduler
126}