aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-07-05 13:26:25 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-07-05 14:14:16 +0200
commit6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch)
tree3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib
parent5fe7e898316e18369c3e1aba307b55077adc7bfb (diff)
downloadPeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/friends.ts318
-rw-r--r--server/lib/jobs/handlers/index.ts8
-rw-r--r--server/lib/jobs/handlers/video-transcoder.ts22
-rw-r--r--server/lib/jobs/job-scheduler.ts117
-rw-r--r--server/lib/oauth-model.ts15
-rw-r--r--server/lib/request/abstract-request-scheduler.ts124
-rw-r--r--server/lib/request/request-scheduler.ts32
-rw-r--r--server/lib/request/request-video-event-scheduler.ts13
-rw-r--r--server/lib/request/request-video-qadu-scheduler.ts13
9 files changed, 293 insertions, 369 deletions
diff --git a/server/lib/friends.ts b/server/lib/friends.ts
index 522cb82b3..498144318 100644
--- a/server/lib/friends.ts
+++ b/server/lib/friends.ts
@@ -1,6 +1,6 @@
1import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2import * as request from 'request' 1import * as request from 'request'
3import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import * as Promise from 'bluebird'
4 4
5import { database as db } from '../initializers/database' 5import { database as db } from '../initializers/database'
6import { 6import {
@@ -15,8 +15,7 @@ import {
15 logger, 15 logger,
16 getMyPublicCert, 16 getMyPublicCert,
17 makeSecureRequest, 17 makeSecureRequest,
18 makeRetryRequest, 18 makeRetryRequest
19 createEmptyCallback
20} from '../helpers' 19} from '../helpers'
21import { 20import {
22 RequestScheduler, 21 RequestScheduler,
@@ -53,24 +52,24 @@ function activateSchedulers () {
53 requestVideoEventScheduler.activate() 52 requestVideoEventScheduler.activate()
54} 53}
55 54
56function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { 55function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) {
57 const options = { 56 const options = {
58 type: ENDPOINT_ACTIONS.ADD, 57 type: ENDPOINT_ACTIONS.ADD,
59 endpoint: REQUEST_ENDPOINTS.VIDEOS, 58 endpoint: REQUEST_ENDPOINTS.VIDEOS,
60 data: videoData, 59 data: videoData,
61 transaction 60 transaction
62 } 61 }
63 createRequest(options, callback) 62 return createRequest(options)
64} 63}
65 64
66function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) { 65function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) {
67 const options = { 66 const options = {
68 type: ENDPOINT_ACTIONS.UPDATE, 67 type: ENDPOINT_ACTIONS.UPDATE,
69 endpoint: REQUEST_ENDPOINTS.VIDEOS, 68 endpoint: REQUEST_ENDPOINTS.VIDEOS,
70 data: videoData, 69 data: videoData,
71 transaction 70 transaction
72 } 71 }
73 createRequest(options, callback) 72 return createRequest(options)
74} 73}
75 74
76function removeVideoToFriends (videoParams: Object) { 75function removeVideoToFriends (videoParams: Object) {
@@ -80,121 +79,93 @@ function removeVideoToFriends (videoParams: Object) {
80 data: videoParams, 79 data: videoParams,
81 transaction: null 80 transaction: null
82 } 81 }
83 createRequest(options) 82 return createRequest(options)
84} 83}
85 84
86function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) { 85function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance, transaction: Sequelize.Transaction) {
87 const options = { 86 const options = {
88 type: ENDPOINT_ACTIONS.REPORT_ABUSE, 87 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
89 endpoint: REQUEST_ENDPOINTS.VIDEOS, 88 endpoint: REQUEST_ENDPOINTS.VIDEOS,
90 data: reportData, 89 data: reportData,
91 toIds: [ video.Author.podId ], 90 toIds: [ video.Author.podId ],
92 transaction: null 91 transaction
93 } 92 }
94 createRequest(options) 93 return createRequest(options)
95} 94}
96 95
97function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { 96function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction) {
98 const options = { 97 const options = {
99 videoId: qaduParam.videoId, 98 videoId: qaduParam.videoId,
100 type: qaduParam.type, 99 type: qaduParam.type,
101 transaction 100 transaction
102 } 101 }
103 return createVideoQaduRequest(options, callback) 102 return createVideoQaduRequest(options)
104} 103}
105 104
106function quickAndDirtyUpdatesVideoToFriends ( 105function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction) {
107 qadusParams: QaduParam[],
108 transaction: Sequelize.Transaction,
109 finalCallback: (err: Error) => void
110) {
111 const tasks = [] 106 const tasks = []
112 107
113 qadusParams.forEach(function (qaduParams) { 108 qadusParams.forEach(function (qaduParams) {
114 const fun = function (callback) { 109 tasks.push(quickAndDirtyUpdateVideoToFriends(qaduParams, transaction))
115 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
116 }
117
118 tasks.push(fun)
119 }) 110 })
120 111
121 series(tasks, finalCallback) 112 return Promise.all(tasks)
122} 113}
123 114
124function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) { 115function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction) {
125 const options = { 116 const options = {
126 videoId: eventParam.videoId, 117 videoId: eventParam.videoId,
127 type: eventParam.type, 118 type: eventParam.type,
128 transaction 119 transaction
129 } 120 }
130 createVideoEventRequest(options, callback) 121 return createVideoEventRequest(options)
131} 122}
132 123
133function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) { 124function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
134 const tasks = [] 125 const tasks = []
135 126
136 eventsParams.forEach(function (eventParams) { 127 eventsParams.forEach(function (eventParams) {
137 const fun = function (callback) { 128 tasks.push(addEventToRemoteVideo(eventParams, transaction))
138 addEventToRemoteVideo(eventParams, transaction, callback)
139 }
140
141 tasks.push(fun)
142 }) 129 })
143 130
144 series(tasks, finalCallback) 131 return Promise.all(tasks)
145} 132}
146 133
147function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) { 134function hasFriends () {
148 db.Pod.countAll(function (err, count) { 135 return db.Pod.countAll().then(count => count !== 0)
149 if (err) return callback(err)
150
151 const hasFriends = (count !== 0)
152 callback(null, hasFriends)
153 })
154} 136}
155 137
156function makeFriends (hosts: string[], callback: (err: Error) => void) { 138function makeFriends (hosts: string[]) {
157 const podsScore = {} 139 const podsScore = {}
158 140
159 logger.info('Make friends!') 141 logger.info('Make friends!')
160 getMyPublicCert(function (err, cert) { 142 return getMyPublicCert()
161 if (err) { 143 .then(cert => {
162 logger.error('Cannot read public cert.') 144 return Promise.mapSeries(hosts, host => {
163 return callback(err) 145 return computeForeignPodsList(host, podsScore)
164 } 146 }).then(() => cert)
165 147 })
166 eachSeries(hosts, function (host, callbackEach) { 148 .then(cert => {
167 computeForeignPodsList(host, podsScore, callbackEach)
168 }, function (err: Error) {
169 if (err) return callback(err)
170
171 logger.debug('Pods scores computed.', { podsScore: podsScore }) 149 logger.debug('Pods scores computed.', { podsScore: podsScore })
172 const podsList = computeWinningPods(hosts, podsScore) 150 const podsList = computeWinningPods(hosts, podsScore)
173 logger.debug('Pods that we keep.', { podsToKeep: podsList }) 151 logger.debug('Pods that we keep.', { podsToKeep: podsList })
174 152
175 makeRequestsToWinningPods(cert, podsList, callback) 153 return makeRequestsToWinningPods(cert, podsList)
176 }) 154 })
177 })
178} 155}
179 156
180function quitFriends (callback: (err: Error) => void) { 157function quitFriends () {
181 // Stop pool requests 158 // Stop pool requests
182 requestScheduler.deactivate() 159 requestScheduler.deactivate()
183 160
184 waterfall([ 161 return requestScheduler.flush()
185 function flushRequests (callbackAsync) { 162 .then(() => {
186 requestScheduler.flush(err => callbackAsync(err)) 163 return requestVideoQaduScheduler.flush()
187 }, 164 })
188 165 .then(() => {
189 function flushVideoQaduRequests (callbackAsync) { 166 return db.Pod.list()
190 requestVideoQaduScheduler.flush(err => callbackAsync(err)) 167 })
191 }, 168 .then(pods => {
192
193 function getPodsList (callbackAsync) {
194 return db.Pod.list(callbackAsync)
195 },
196
197 function announceIQuitMyFriends (pods, callbackAsync) {
198 const requestParams = { 169 const requestParams = {
199 method: 'POST' as 'POST', 170 method: 'POST' as 'POST',
200 path: '/api/' + API_VERSION + '/remote/pods/remove', 171 path: '/api/' + API_VERSION + '/remote/pods/remove',
@@ -205,61 +176,57 @@ function quitFriends (callback: (err: Error) => void) {
205 // Announce we quit them 176 // Announce we quit them
206 // We don't care if the request fails 177 // We don't care if the request fails
207 // The other pod will exclude us automatically after a while 178 // The other pod will exclude us automatically after a while
208 eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) { 179 return Promise.map(pods, pod => {
209 requestParams.toPod = pod 180 requestParams.toPod = pod
210 makeSecureRequest(requestParams, callbackEach) 181 return makeSecureRequest(requestParams)
211 }, function (err) { 182 }, { concurrency: REQUESTS_IN_PARALLEL })
212 if (err) { 183 .then(() => pods)
213 logger.error('Some errors while quitting friends.', { err: err }) 184 .catch(err => {
214 // Don't stop the process 185 logger.error('Some errors while quitting friends.', { err: err })
215 } 186 // Don't stop the process
216
217 return callbackAsync(null, pods)
218 }) 187 })
219 }, 188 })
220 189 .then(pods => {
221 function removePodsFromDB (pods, callbackAsync) { 190 const tasks = []
222 each(pods, function (pod: any, callbackEach) { 191 pods.forEach(pod => tasks.push(pod.destroy()))
223 pod.destroy().asCallback(callbackEach)
224 }, callbackAsync)
225 }
226 ], function (err: Error) {
227 // Don't forget to re activate the scheduler, even if there was an error
228 requestScheduler.activate()
229
230 if (err) return callback(err)
231 192
232 logger.info('Removed all remote videos.') 193 return Promise.all(pods)
233 return callback(null) 194 })
234 }) 195 .then(() => {
196 logger.info('Removed all remote videos.')
197 // Don't forget to re activate the scheduler, even if there was an error
198 return requestScheduler.activate()
199 })
200 .finally(() => requestScheduler.activate())
235} 201}
236 202
237function sendOwnedVideosToPod (podId: number) { 203function sendOwnedVideosToPod (podId: number) {
238 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) { 204 db.Video.listOwnedAndPopulateAuthorAndTags()
239 if (err) { 205 .then(videosList => {
240 logger.error('Cannot get the list of videos we own.') 206 const tasks = []
241 return 207 videosList.forEach(video => {
242 } 208 const promise = video.toAddRemoteJSON()
243 209 .then(remoteVideo => {
244 videosList.forEach(function (video) { 210 const options = {
245 video.toAddRemoteJSON(function (err, remoteVideo) { 211 type: 'add',
246 if (err) { 212 endpoint: REQUEST_ENDPOINTS.VIDEOS,
247 logger.error('Cannot convert video to remote.', { error: err }) 213 data: remoteVideo,
248 // Don't break the process 214 toIds: [ podId ],
249 return 215 transaction: null
250 } 216 }
251 217 return createRequest(options)
252 const options = { 218 })
253 type: 'add', 219 .catch(err => {
254 endpoint: REQUEST_ENDPOINTS.VIDEOS, 220 logger.error('Cannot convert video to remote.', { error: err })
255 data: remoteVideo, 221 // Don't break the process
256 toIds: [ podId ], 222 return undefined
257 transaction: null 223 })
258 } 224
259 createRequest(options) 225 tasks.push(promise)
260 }) 226 })
227
228 return Promise.all(tasks)
261 }) 229 })
262 })
263} 230}
264 231
265function getRequestScheduler () { 232function getRequestScheduler () {
@@ -297,23 +264,22 @@ export {
297 264
298// --------------------------------------------------------------------------- 265// ---------------------------------------------------------------------------
299 266
300function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) { 267function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
301 getForeignPodsList(host, function (err, res) { 268 // TODO: type res
302 if (err) return callback(err) 269 return getForeignPodsList(host).then((res: any) => {
303
304 const foreignPodsList = res.data 270 const foreignPodsList = res.data
305 271
306 // Let's give 1 point to the pod we ask the friends list 272 // Let's give 1 point to the pod we ask the friends list
307 foreignPodsList.push({ host }) 273 foreignPodsList.push({ host })
308 274
309 foreignPodsList.forEach(function (foreignPod) { 275 foreignPodsList.forEach(foreignPod => {
310 const foreignPodHost = foreignPod.host 276 const foreignPodHost = foreignPod.host
311 277
312 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ 278 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
313 else podsScore[foreignPodHost] = 1 279 else podsScore[foreignPodHost] = 1
314 }) 280 })
315 281
316 return callback(null) 282 return undefined
317 }) 283 })
318} 284}
319 285
@@ -323,7 +289,7 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
323 const podsList = [] 289 const podsList = []
324 const baseScore = hosts.length / 2 290 const baseScore = hosts.length / 2
325 291
326 Object.keys(podsScore).forEach(function (podHost) { 292 Object.keys(podsScore).forEach(podHost => {
327 // If the pod is not me and with a good score we add it 293 // If the pod is not me and with a good score we add it
328 if (isMe(podHost) === false && podsScore[podHost] > baseScore) { 294 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
329 podsList.push({ host: podHost }) 295 podsList.push({ host: podHost })
@@ -333,28 +299,30 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
333 return podsList 299 return podsList
334} 300}
335 301
336function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) { 302function getForeignPodsList (host: string) {
337 const path = '/api/' + API_VERSION + '/pods' 303 return new Promise((res, rej) => {
304 const path = '/api/' + API_VERSION + '/pods'
338 305
339 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) { 306 request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
340 if (err) return callback(err) 307 if (err) return rej(err)
341 308
342 try { 309 try {
343 const json = JSON.parse(body) 310 const json = JSON.parse(body)
344 return callback(null, json) 311 return res(json)
345 } catch (err) { 312 } catch (err) {
346 return callback(err) 313 return rej(err)
347 } 314 }
315 })
348 }) 316 })
349} 317}
350 318
351function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) { 319function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
352 // Stop pool requests 320 // Stop pool requests
353 requestScheduler.deactivate() 321 requestScheduler.deactivate()
354 // Flush pool requests 322 // Flush pool requests
355 requestScheduler.forceSend() 323 requestScheduler.forceSend()
356 324
357 eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) { 325 return Promise.map(podsList, pod => {
358 const params = { 326 const params = {
359 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/', 327 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
360 method: 'POST' as 'POST', 328 method: 'POST' as 'POST',
@@ -365,38 +333,35 @@ function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callb
365 } 333 }
366 } 334 }
367 335
368 makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) { 336 return makeRetryRequest(params)
369 if (err) { 337 .then(({ response, body }) => {
370 logger.error('Error with adding %s pod.', pod.host, { error: err }) 338 body = body as { cert: string, email: string }
339
340 if (response.statusCode === 200) {
341 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
342 return podObj.save()
343 .then(podCreated => {
344
345 // Add our videos to the request scheduler
346 sendOwnedVideosToPod(podCreated.id)
347 })
348 .catch(err => {
349 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
350 })
351 } else {
352 logger.error('Status not 200 for %s pod.', pod.host)
353 }
354 })
355 .catch(err => {
356 logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
371 // Don't break the process 357 // Don't break the process
372 return callbackEach() 358 })
373 } 359 }, { concurrency: REQUESTS_IN_PARALLEL })
374 360 .then(() => logger.debug('makeRequestsToWinningPods finished.'))
375 if (res.statusCode === 200) { 361 .finally(() => {
376 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
377 podObj.save().asCallback(function (err, podCreated) {
378 if (err) {
379 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
380 return callbackEach()
381 }
382
383 // Add our videos to the request scheduler
384 sendOwnedVideosToPod(podCreated.id)
385
386 return callbackEach()
387 })
388 } else {
389 logger.error('Status not 200 for %s pod.', pod.host)
390 return callbackEach()
391 }
392 })
393 }, function endRequests () {
394 // Final callback, we've ended all the requests 362 // Final callback, we've ended all the requests
395 // Now we made new friends, we can re activate the pool of requests 363 // Now we made new friends, we can re activate the pool of requests
396 requestScheduler.activate() 364 requestScheduler.activate()
397
398 logger.debug('makeRequestsToWinningPods finished.')
399 return callback(null)
400 }) 365 })
401} 366}
402 367
@@ -408,33 +373,22 @@ type CreateRequestOptions = {
408 toIds?: number[] 373 toIds?: number[]
409 transaction: Sequelize.Transaction 374 transaction: Sequelize.Transaction
410} 375}
411function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) { 376function createRequest (options: CreateRequestOptions) {
412 if (!callback) callback = function () { /* empty */ } 377 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
413
414 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)
415 378
416 // If the "toIds" pods is not specified, we send the request to all our friends 379 // If the "toIds" pods is not specified, we send the request to all our friends
417 db.Pod.listAllIds(options.transaction, function (err, podIds) { 380 return db.Pod.listAllIds(options.transaction).then(podIds => {
418 if (err) {
419 logger.error('Cannot get pod ids', { error: err })
420 return
421 }
422
423 const newOptions = Object.assign(options, { toIds: podIds }) 381 const newOptions = Object.assign(options, { toIds: podIds })
424 return requestScheduler.createRequest(newOptions, callback) 382 return requestScheduler.createRequest(newOptions)
425 }) 383 })
426} 384}
427 385
428function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { 386function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {
429 if (!callback) callback = createEmptyCallback() 387 return requestVideoQaduScheduler.createRequest(options)
430
431 requestVideoQaduScheduler.createRequest(options, callback)
432} 388}
433 389
434function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { 390function createVideoEventRequest (options: RequestVideoEventSchedulerOptions) {
435 if (!callback) callback = createEmptyCallback() 391 return requestVideoEventScheduler.createRequest(options)
436
437 requestVideoEventScheduler.createRequest(options, callback)
438} 392}
439 393
440function isMe (host: string) { 394function isMe (host: string) {
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts
index 7d0263b15..8abddae35 100644
--- a/server/lib/jobs/handlers/index.ts
+++ b/server/lib/jobs/handlers/index.ts
@@ -1,11 +1,9 @@
1import * as videoTranscoder from './video-transcoder' 1import * as videoTranscoder from './video-transcoder'
2 2
3import { VideoInstance } from '../../../models'
4
5export interface JobHandler<T> { 3export interface JobHandler<T> {
6 process (data: object, callback: (err: Error, videoInstance?: T) => void) 4 process (data: object): T
7 onError (err: Error, jobId: number, video: T, callback: (err: Error) => void) 5 onError (err: Error, jobId: number)
8 onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void) 6 onSuccess (jobId: number, jobResult: T)
9} 7}
10 8
11const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { 9const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = {
diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts
index 6f606a7d3..e829ca813 100644
--- a/server/lib/jobs/handlers/video-transcoder.ts
+++ b/server/lib/jobs/handlers/video-transcoder.ts
@@ -3,29 +3,23 @@ import { logger } from '../../../helpers'
3import { addVideoToFriends } from '../../../lib' 3import { addVideoToFriends } from '../../../lib'
4import { VideoInstance } from '../../../models' 4import { VideoInstance } from '../../../models'
5 5
6function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) { 6function process (data: { id: string }) {
7 db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { 7 return db.Video.loadAndPopulateAuthorAndPodAndTags(data.id).then(video => {
8 if (err) return callback(err) 8 return video.transcodeVideofile().then(() => video)
9
10 video.transcodeVideofile(function (err) {
11 return callback(err, video)
12 })
13 }) 9 })
14} 10}
15 11
16function onError (err: Error, jobId: number, video: VideoInstance, callback: (err: Error) => void) { 12function onError (err: Error, jobId: number) {
17 logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) 13 logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
18 return callback(null) 14 return Promise.resolve()
19} 15}
20 16
21function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) { 17function onSuccess (jobId: number, video: VideoInstance) {
22 logger.info('Job %d is a success.', jobId) 18 logger.info('Job %d is a success.', jobId)
23 19
24 video.toAddRemoteJSON(function (err, remoteVideo) { 20 video.toAddRemoteJSON().then(remoteVideo => {
25 if (err) return callback(err)
26
27 // Now we'll add the video's meta data to our friends 21 // Now we'll add the video's meta data to our friends
28 addVideoToFriends(remoteVideo, null, callback) 22 return addVideoToFriends(remoteVideo, null)
29 }) 23 })
30} 24}
31 25
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 2f01387e7..248dc7978 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -32,37 +32,35 @@ class JobScheduler {
32 32
33 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
35 db.Job.listWithLimit(limit, state, (err, jobs) => { 35 db.Job.listWithLimit(limit, state)
36 this.enqueueJobs(err, jobsQueue, jobs) 36 .then(jobs => {
37 37 this.enqueueJobs(jobsQueue, jobs)
38 forever( 38
39 next => { 39 forever(
40 if (jobsQueue.length() !== 0) { 40 next => {
41 // Finish processing the queue first 41 if (jobsQueue.length() !== 0) {
42 return setTimeout(next, JOBS_FETCHING_INTERVAL) 42 // Finish processing the queue first
43 } 43 return setTimeout(next, JOBS_FETCHING_INTERVAL)
44
45 const state = JOB_STATES.PENDING
46 db.Job.listWithLimit(limit, state, (err, jobs) => {
47 if (err) {
48 logger.error('Cannot list pending jobs.', { error: err })
49 } else {
50 jobs.forEach(job => {
51 jobsQueue.push(job)
52 })
53 } 44 }
54 45
55 // Optimization: we could use "drain" from queue object 46 const state = JOB_STATES.PENDING
56 return setTimeout(next, JOBS_FETCHING_INTERVAL) 47 db.Job.listWithLimit(limit, state)
57 }) 48 .then(jobs => {
58 }, 49 this.enqueueJobs(jobsQueue, jobs)
59 50
60 err => { logger.error('Error in job scheduler queue.', { error: err }) } 51 // Optimization: we could use "drain" from queue object
61 ) 52 return setTimeout(next, JOBS_FETCHING_INTERVAL)
62 }) 53 })
54 .catch(err => logger.error('Cannot list pending jobs.', { error: err }))
55 },
56
57 err => logger.error('Error in job scheduler queue.', { error: err })
58 )
59 })
60 .catch(err => logger.error('Cannot list pending jobs.', { error: err }))
63 } 61 }
64 62
65 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { 63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
66 const createQuery = { 64 const createQuery = {
67 state: JOB_STATES.PENDING, 65 state: JOB_STATES.PENDING,
68 handlerName, 66 handlerName,
@@ -70,67 +68,62 @@ class JobScheduler {
70 } 68 }
71 const options = { transaction } 69 const options = { transaction }
72 70
73 db.Job.create(createQuery, options).asCallback(callback) 71 return db.Job.create(createQuery, options)
74 } 72 }
75 73
76 private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { 74 private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
77 if (err) { 75 jobs.forEach(job => jobsQueue.push(job))
78 logger.error('Cannot list pending jobs.', { error: err })
79 } else {
80 jobs.forEach(job => {
81 jobsQueue.push(job)
82 })
83 }
84 } 76 }
85 77
86 private processJob (job: JobInstance, callback: (err: Error) => void) { 78 private processJob (job: JobInstance, callback: (err: Error) => void) {
87 const jobHandler = jobHandlers[job.handlerName] 79 const jobHandler = jobHandlers[job.handlerName]
80 if (jobHandler === undefined) {
81 logger.error('Unknown job handler for job %s.', job.handlerName)
82 return callback(null)
83 }
88 84
89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
90 86
91 job.state = JOB_STATES.PROCESSING 87 job.state = JOB_STATES.PROCESSING
92 job.save().asCallback(err => { 88 return job.save()
93 if (err) return this.cannotSaveJobError(err, callback) 89 .then(() => {
94 90 return jobHandler.process(job.handlerInputData)
95 if (jobHandler === undefined) { 91 })
96 logger.error('Unknown job handler for job %s.', job.handlerName) 92 .then(
97 return callback(null) 93 result => {
98 } 94 return this.onJobSuccess(jobHandler, job, result)
95 },
99 96
100 return jobHandler.process(job.handlerInputData, (err, result) => { 97 err => {
101 if (err) {
102 logger.error('Error in job handler %s.', job.handlerName, { error: err }) 98 logger.error('Error in job handler %s.', job.handlerName, { error: err })
103 return this.onJobError(jobHandler, job, result, callback) 99 return this.onJobError(jobHandler, job, err)
104 } 100 }
105 101 )
106 return this.onJobSuccess(jobHandler, job, result, callback) 102 .then(() => callback(null))
103 .catch(err => {
104 this.cannotSaveJobError(err)
105 return callback(err)
107 }) 106 })
108 })
109 } 107 }
110 108
111 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { 109 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
112 job.state = JOB_STATES.ERROR 110 job.state = JOB_STATES.ERROR
113 111
114 job.save().asCallback(err => { 112 return job.save()
115 if (err) return this.cannotSaveJobError(err, callback) 113 .then(() => jobHandler.onError(err, job.id))
116 114 .catch(err => this.cannotSaveJobError(err))
117 return jobHandler.onError(err, job.id, jobResult, callback)
118 })
119 } 115 }
120 116
121 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { 117 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
122 job.state = JOB_STATES.SUCCESS 118 job.state = JOB_STATES.SUCCESS
123 119
124 job.save().asCallback(err => { 120 return job.save()
125 if (err) return this.cannotSaveJobError(err, callback) 121 .then(() => jobHandler.onSuccess(job.id, jobResult))
126 122 .catch(err => this.cannotSaveJobError(err))
127 return jobHandler.onSuccess(err, job.id, jobResult, callback)
128 })
129 } 123 }
130 124
131 private cannotSaveJobError (err: Error, callback: (err: Error) => void) { 125 private cannotSaveJobError (err: Error) {
132 logger.error('Cannot save new job state.', { error: err }) 126 logger.error('Cannot save new job state.', { error: err })
133 return callback(err)
134 } 127 }
135} 128}
136 129
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts
index 7cf42e94c..f34c9c667 100644
--- a/server/lib/oauth-model.ts
+++ b/server/lib/oauth-model.ts
@@ -30,17 +30,10 @@ function getUser (username: string, password: string) {
30 return db.User.getByUsername(username).then(function (user) { 30 return db.User.getByUsername(username).then(function (user) {
31 if (!user) return null 31 if (!user) return null
32 32
33 // We need to return a promise 33 return user.isPasswordMatch(password).then(passwordMatch => {
34 return new Promise(function (resolve, reject) { 34 if (passwordMatch === false) return null
35 return user.isPasswordMatch(password, function (err, isPasswordMatch) {
36 if (err) return reject(err)
37 35
38 if (isPasswordMatch === true) { 36 return user
39 return resolve(user)
40 }
41
42 return resolve(null)
43 })
44 }) 37 })
45 }) 38 })
46} 39}
@@ -80,8 +73,6 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns
80 tokenCreated.user = user 73 tokenCreated.user = user
81 74
82 return tokenCreated 75 return tokenCreated
83 }).catch(function (err) {
84 throw err
85 }) 76 })
86} 77}
87 78
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts
index e81ab9c36..dd77fddb7 100644
--- a/server/lib/request/abstract-request-scheduler.ts
+++ b/server/lib/request/abstract-request-scheduler.ts
@@ -1,15 +1,16 @@
1import * as eachLimit from 'async/eachLimit' 1import { isEmpty } from 'lodash'
2import * as Promise from 'bluebird'
2 3
3import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
4import { logger, makeSecureRequest } from '../../helpers' 5import { logger, makeSecureRequest } from '../../helpers'
5import { PodInstance } from '../../models' 6import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models'
6import { 7import {
7 API_VERSION, 8 API_VERSION,
8 REQUESTS_IN_PARALLEL, 9 REQUESTS_IN_PARALLEL,
9 REQUESTS_INTERVAL 10 REQUESTS_INTERVAL
10} from '../../initializers' 11} from '../../initializers'
11 12
12abstract class AbstractRequestScheduler { 13abstract class AbstractRequestScheduler <T> {
13 requestInterval: number 14 requestInterval: number
14 limitPods: number 15 limitPods: number
15 limitPerPod: number 16 limitPerPod: number
@@ -24,9 +25,9 @@ abstract class AbstractRequestScheduler {
24 this.requestInterval = REQUESTS_INTERVAL 25 this.requestInterval = REQUESTS_INTERVAL
25 } 26 }
26 27
27 abstract getRequestModel () 28 abstract getRequestModel (): AbstractRequestClass<T>
28 abstract getRequestToPodModel () 29 abstract getRequestToPodModel (): AbstractRequestToPodClass
29 abstract buildRequestObjects (requests: any) 30 abstract buildRequestObjects (requestsGrouped: T): {}
30 31
31 activate () { 32 activate () {
32 logger.info('Requests scheduler activated.') 33 logger.info('Requests scheduler activated.')
@@ -55,20 +56,18 @@ abstract class AbstractRequestScheduler {
55 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) 56 return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
56 } 57 }
57 58
58 remainingRequestsCount (callback: (err: Error, total: number) => void) { 59 remainingRequestsCount () {
59 return this.getRequestModel().countTotalRequests(callback) 60 return this.getRequestModel().countTotalRequests()
60 } 61 }
61 62
62 flush (callback: (err: Error) => void) { 63 flush () {
63 this.getRequestModel().removeAll(callback) 64 return this.getRequestModel().removeAll()
64 } 65 }
65 66
66 // --------------------------------------------------------------------------- 67 // ---------------------------------------------------------------------------
67 68
68 // Make a requests to friends of a certain type 69 // Make a requests to friends of a certain type
69 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object, callback) { 70 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: Object) {
70 if (!callback) callback = function () { /* empty */ }
71
72 const params = { 71 const params = {
73 toPod: toPod, 72 toPod: toPod,
74 sign: true, // Prove our identity 73 sign: true, // Prove our identity
@@ -79,65 +78,64 @@ abstract class AbstractRequestScheduler {
79 78
80 // Make multiple retry requests to all of pods 79 // Make multiple retry requests to all of pods
81 // The function fire some useful callbacks 80 // The function fire some useful callbacks
82 makeSecureRequest(params, (err, res) => { 81 return makeSecureRequest(params)
83 if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) { 82 .then(({ response, body }) => {
84 err = err ? err.message : 'Status code not 20x : ' + res.statusCode 83 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
84 throw new Error('Status code not 20x : ' + response.statusCode)
85 }
86 })
87 .catch(err => {
85 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err }) 88 logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
86 89
87 return callback(err) 90 throw err
88 } 91 })
89
90 return callback(null)
91 })
92 } 92 }
93 93
94 // Make all the requests of the scheduler 94 // Make all the requests of the scheduler
95 protected makeRequests () { 95 protected makeRequests () {
96 this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => { 96 return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
97 if (err) { 97 .then((requestsGrouped: T) => {
98 logger.error('Cannot get the list of "%s".', this.description, { err: err }) 98 // We want to group requests by destinations pod and endpoint
99 return // Abort 99 const requestsToMake = this.buildRequestObjects(requestsGrouped)
100 } 100
101 101 // If there are no requests, abort
102 // If there are no requests, abort 102 if (isEmpty(requestsToMake) === true) {
103 if (requests.length === 0) { 103 logger.info('No "%s" to make.', this.description)
104 logger.info('No "%s" to make.', this.description) 104 return { goodPods: [], badPods: [] }
105 return 105 }
106 } 106
107 107 logger.info('Making "%s" to friends.', this.description)
108 // We want to group requests by destinations pod and endpoint 108
109 const requestsToMakeGrouped = this.buildRequestObjects(requests) 109 const goodPods = []
110 110 const badPods = []
111 logger.info('Making "%s" to friends.', this.description) 111
112 112 return Promise.map(Object.keys(requestsToMake), hashKey => {
113 const goodPods = [] 113 const requestToMake = requestsToMake[hashKey]
114 const badPods = [] 114 const toPod: PodInstance = requestToMake.toPod
115 115
116 eachLimit(Object.keys(requestsToMakeGrouped), REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => { 116 return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
117 const requestToMake = requestsToMakeGrouped[hashKey] 117 .then(() => {
118 const toPod = requestToMake.toPod 118 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
119 119 goodPods.push(requestToMake.toPod.id)
120 this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (err) => { 120
121 if (err) { 121 this.afterRequestHook()
122 badPods.push(requestToMake.toPod.id) 122
123 return callbackEach() 123 // Remove the pod id of these request ids
124 } 124 return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
125 125 })
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) 126 .catch(err => {
127 goodPods.push(requestToMake.toPod.id) 127 badPods.push(requestToMake.toPod.id)
128 128 logger.info('Cannot make request to %s.', toPod.host, { error: err })
129 // Remove the pod id of these request ids 129 })
130 this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach) 130 }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
131 })
132 .then(({ goodPods, badPods }) => {
133 this.afterRequestsHook()
131 134
132 this.afterRequestHook()
133 })
134 }, () => {
135 // All the requests were made, we update the pods score 135 // All the requests were made, we update the pods score
136 db.Pod.updatePodsScore(goodPods, badPods) 136 return db.Pod.updatePodsScore(goodPods, badPods)
137
138 this.afterRequestsHook()
139 }) 137 })
140 }) 138 .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
141 } 139 }
142 140
143 protected afterRequestHook () { 141 protected afterRequestHook () {
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts
index 575e0227c..0dd796fb0 100644
--- a/server/lib/request/request-scheduler.ts
+++ b/server/lib/request/request-scheduler.ts
@@ -3,10 +3,8 @@ import * as Sequelize from 'sequelize'
3import { database as db } from '../../initializers/database' 3import { database as db } from '../../initializers/database'
4import { AbstractRequestScheduler } from './abstract-request-scheduler' 4import { AbstractRequestScheduler } from './abstract-request-scheduler'
5import { logger } from '../../helpers' 5import { logger } from '../../helpers'
6import { 6import { REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PER_POD } from '../../initializers'
7 REQUESTS_LIMIT_PODS, 7import { RequestsGrouped } from '../../models'
8 REQUESTS_LIMIT_PER_POD
9} from '../../initializers'
10import { RequestEndpoint } from '../../../shared' 8import { RequestEndpoint } from '../../../shared'
11 9
12export type RequestSchedulerOptions = { 10export type RequestSchedulerOptions = {
@@ -17,7 +15,7 @@ export type RequestSchedulerOptions = {
17 transaction: Sequelize.Transaction 15 transaction: Sequelize.Transaction
18} 16}
19 17
20class RequestScheduler extends AbstractRequestScheduler { 18class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
21 constructor () { 19 constructor () {
22 super() 20 super()
23 21
@@ -36,11 +34,11 @@ class RequestScheduler extends AbstractRequestScheduler {
36 return db.RequestToPod 34 return db.RequestToPod
37 } 35 }
38 36
39 buildRequestObjects (requests: { [ toPodId: number ]: any }) { 37 buildRequestObjects (requestsGrouped: RequestsGrouped) {
40 const requestsToMakeGrouped = {} 38 const requestsToMakeGrouped = {}
41 39
42 Object.keys(requests).forEach(toPodId => { 40 Object.keys(requestsGrouped).forEach(toPodId => {
43 requests[toPodId].forEach(data => { 41 requestsGrouped[toPodId].forEach(data => {
44 const request = data.request 42 const request = data.request
45 const pod = data.pod 43 const pod = data.pod
46 const hashKey = toPodId + request.endpoint 44 const hashKey = toPodId + request.endpoint
@@ -62,12 +60,12 @@ class RequestScheduler extends AbstractRequestScheduler {
62 return requestsToMakeGrouped 60 return requestsToMakeGrouped
63 } 61 }
64 62
65 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions, callback: (err: Error) => void) { 63 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
66 // TODO: check the setPods works 64 // TODO: check the setPods works
67 const podIds = [] 65 const podIds = []
68 66
69 // If there are no destination pods abort 67 // If there are no destination pods abort
70 if (toIds.length === 0) return callback(null) 68 if (toIds.length === 0) return undefined
71 69
72 toIds.forEach(toPod => { 70 toIds.forEach(toPod => {
73 podIds.push(toPod) 71 podIds.push(toPod)
@@ -85,20 +83,18 @@ class RequestScheduler extends AbstractRequestScheduler {
85 transaction 83 transaction
86 } 84 }
87 85
88 return db.Request.create(createQuery, dbRequestOptions).asCallback((err, request) => { 86 return db.Request.create(createQuery, dbRequestOptions)
89 if (err) return callback(err) 87 .then(request => {
90 88 return request.setPods(podIds, dbRequestOptions)
91 return request.setPods(podIds, dbRequestOptions).asCallback(callback) 89 })
92 })
93 } 90 }
94 91
95 // --------------------------------------------------------------------------- 92 // ---------------------------------------------------------------------------
96 93
97 afterRequestsHook () { 94 afterRequestsHook () {
98 // Flush requests with no pod 95 // Flush requests with no pod
99 this.getRequestModel().removeWithEmptyTo(err => { 96 this.getRequestModel().removeWithEmptyTo()
100 if (err) logger.error('Error when removing requests with no pods.', { error: err }) 97 .catch(err => logger.error('Error when removing requests with no pods.', { error: err }))
101 })
102 } 98 }
103} 99}
104 100
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts
index 4bb76f4c9..d4d714c02 100644
--- a/server/lib/request/request-video-event-scheduler.ts
+++ b/server/lib/request/request-video-event-scheduler.ts
@@ -7,6 +7,7 @@ import {
7 REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, 7 REQUESTS_VIDEO_EVENT_LIMIT_PER_POD,
8 REQUEST_VIDEO_EVENT_ENDPOINT 8 REQUEST_VIDEO_EVENT_ENDPOINT
9} from '../../initializers' 9} from '../../initializers'
10import { RequestsVideoEventGrouped } from '../../models'
10import { RequestVideoEventType } from '../../../shared' 11import { RequestVideoEventType } from '../../../shared'
11 12
12export type RequestVideoEventSchedulerOptions = { 13export type RequestVideoEventSchedulerOptions = {
@@ -16,7 +17,7 @@ export type RequestVideoEventSchedulerOptions = {
16 transaction?: Sequelize.Transaction 17 transaction?: Sequelize.Transaction
17} 18}
18 19
19class RequestVideoEventScheduler extends AbstractRequestScheduler { 20class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoEventGrouped> {
20 constructor () { 21 constructor () {
21 super() 22 super()
22 23
@@ -35,7 +36,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
35 return db.RequestVideoEvent 36 return db.RequestVideoEvent
36 } 37 }
37 38
38 buildRequestObjects (eventsToProcess: { [ toPodId: number ]: any }[]) { 39 buildRequestObjects (eventRequests: RequestsVideoEventGrouped) {
39 const requestsToMakeGrouped = {} 40 const requestsToMakeGrouped = {}
40 41
41 /* Example: 42 /* Example:
@@ -50,8 +51,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
50 51
51 // We group video events per video and per pod 52 // We group video events per video and per pod
52 // We add the counts of the same event types 53 // We add the counts of the same event types
53 Object.keys(eventsToProcess).forEach(toPodId => { 54 Object.keys(eventRequests).forEach(toPodId => {
54 eventsToProcess[toPodId].forEach(eventToProcess => { 55 eventRequests[toPodId].forEach(eventToProcess => {
55 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} 56 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
56 57
57 if (!requestsToMakeGrouped[toPodId]) { 58 if (!requestsToMakeGrouped[toPodId]) {
@@ -97,7 +98,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
97 return requestsToMakeGrouped 98 return requestsToMakeGrouped
98 } 99 }
99 100
100 createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) { 101 createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions) {
101 if (count === undefined) count = 1 102 if (count === undefined) count = 1
102 103
103 const dbRequestOptions: Sequelize.CreateOptions = {} 104 const dbRequestOptions: Sequelize.CreateOptions = {}
@@ -109,7 +110,7 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler {
109 videoId 110 videoId
110 } 111 }
111 112
112 return db.RequestVideoEvent.create(createQuery, dbRequestOptions).asCallback(callback) 113 return db.RequestVideoEvent.create(createQuery, dbRequestOptions)
113 } 114 }
114} 115}
115 116
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts
index d7169cc81..5ec7de9c2 100644
--- a/server/lib/request/request-video-qadu-scheduler.ts
+++ b/server/lib/request/request-video-qadu-scheduler.ts
@@ -9,6 +9,7 @@ import {
9 REQUEST_VIDEO_QADU_ENDPOINT, 9 REQUEST_VIDEO_QADU_ENDPOINT,
10 REQUEST_VIDEO_QADU_TYPES 10 REQUEST_VIDEO_QADU_TYPES
11} from '../../initializers' 11} from '../../initializers'
12import { RequestsVideoQaduGrouped } from '../../models'
12import { RequestVideoQaduType } from '../../../shared' 13import { RequestVideoQaduType } from '../../../shared'
13 14
14export type RequestVideoQaduSchedulerOptions = { 15export type RequestVideoQaduSchedulerOptions = {
@@ -17,7 +18,7 @@ export type RequestVideoQaduSchedulerOptions = {
17 transaction?: Sequelize.Transaction 18 transaction?: Sequelize.Transaction
18} 19}
19 20
20class RequestVideoQaduScheduler extends AbstractRequestScheduler { 21class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQaduGrouped> {
21 constructor () { 22 constructor () {
22 super() 23 super()
23 24
@@ -36,7 +37,7 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler {
36 return db.RequestVideoQadu 37 return db.RequestVideoQadu
37 } 38 }
38 39
39 buildRequestObjects (requests: { [ toPodId: number ]: any }[]) { 40 buildRequestObjects (requests: RequestsVideoQaduGrouped) {
40 const requestsToMakeGrouped = {} 41 const requestsToMakeGrouped = {}
41 42
42 Object.keys(requests).forEach(toPodId => { 43 Object.keys(requests).forEach(toPodId => {
@@ -105,20 +106,18 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler {
105 return requestsToMakeGrouped 106 return requestsToMakeGrouped
106 } 107 }
107 108
108 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) { 109 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
109 const dbRequestOptions: Sequelize.BulkCreateOptions = {} 110 const dbRequestOptions: Sequelize.BulkCreateOptions = {}
110 if (transaction) dbRequestOptions.transaction = transaction 111 if (transaction) dbRequestOptions.transaction = transaction
111 112
112 // Send the update to all our friends 113 // Send the update to all our friends
113 db.Pod.listAllIds(transaction, function (err, podIds) { 114 return db.Pod.listAllIds(transaction).then(podIds => {
114 if (err) return callback(err)
115
116 const queries = [] 115 const queries = []
117 podIds.forEach(podId => { 116 podIds.forEach(podId => {
118 queries.push({ type, videoId, podId }) 117 queries.push({ type, videoId, podId })
119 }) 118 })
120 119
121 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback) 120 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
122 }) 121 })
123 } 122 }
124} 123}