diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-10-25 16:03:33 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-10-26 09:11:38 +0200 |
commit | f5028693a896a3076dd286ac0030e3d8f78f5ebf (patch) | |
tree | 09144ed6357e49ea575fb110247f933283ad235e /server/lib | |
parent | eb08047657e739bcd9e592d76307befa3998482b (diff) | |
download | PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.gz PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.zst PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.zip |
Use async/await in lib and initializers
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/cache/videos-preview-cache.ts | 15 | ||||
-rw-r--r-- | server/lib/friends.ts | 368 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-file-optimizer.ts | 88 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-file-transcoder.ts | 21 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 118 | ||||
-rw-r--r-- | server/lib/oauth-model.ts | 54 | ||||
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 121 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.ts | 16 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.ts | 20 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.ts | 34 | ||||
-rw-r--r-- | server/lib/user.ts | 52 |
11 files changed, 456 insertions, 451 deletions
diff --git a/server/lib/cache/videos-preview-cache.ts b/server/lib/cache/videos-preview-cache.ts index fecdca6ef..0fe4d2f78 100644 --- a/server/lib/cache/videos-preview-cache.ts +++ b/server/lib/cache/videos-preview-cache.ts | |||
@@ -1,7 +1,6 @@ | |||
1 | import * as asyncLRU from 'async-lru' | 1 | import * as asyncLRU from 'async-lru' |
2 | import { join } from 'path' | 2 | import { join } from 'path' |
3 | import { createWriteStream } from 'fs' | 3 | import { createWriteStream } from 'fs' |
4 | import * as Promise from 'bluebird' | ||
5 | 4 | ||
6 | import { database as db, CONFIG, CACHE } from '../../initializers' | 5 | import { database as db, CONFIG, CACHE } from '../../initializers' |
7 | import { logger, unlinkPromise } from '../../helpers' | 6 | import { logger, unlinkPromise } from '../../helpers' |
@@ -43,15 +42,15 @@ class VideosPreviewCache { | |||
43 | }) | 42 | }) |
44 | } | 43 | } |
45 | 44 | ||
46 | private loadPreviews (key: string) { | 45 | private async loadPreviews (key: string) { |
47 | return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) | 46 | const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) |
48 | .then(video => { | 47 | if (!video) return undefined |
49 | if (!video) return undefined | ||
50 | 48 | ||
51 | if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) | 49 | if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) |
52 | 50 | ||
53 | return this.saveRemotePreviewAndReturnPath(video) | 51 | const res = await this.saveRemotePreviewAndReturnPath(video) |
54 | }) | 52 | |
53 | return res | ||
55 | } | 54 | } |
56 | 55 | ||
57 | private saveRemotePreviewAndReturnPath (video: VideoInstance) { | 56 | private saveRemotePreviewAndReturnPath (video: VideoInstance) { |
diff --git a/server/lib/friends.ts b/server/lib/friends.ts index f035b099b..a33432dc1 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import * as request from 'request' | 1 | import * as request from 'request' |
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | import * as Promise from 'bluebird' | 3 | import * as Bluebird from 'bluebird' |
4 | import { join } from 'path' | 4 | import { join } from 'path' |
5 | 5 | ||
6 | import { database as db } from '../initializers/database' | 6 | import { database as db } from '../initializers/database' |
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize. | |||
188 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { | 188 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { |
189 | const tasks = [] | 189 | const tasks = [] |
190 | 190 | ||
191 | eventsParams.forEach(eventParams => { | 191 | for (const eventParams of eventsParams) { |
192 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) | 192 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) |
193 | }) | 193 | } |
194 | 194 | ||
195 | return Promise.all(tasks) | 195 | return Promise.all(tasks) |
196 | } | 196 | } |
197 | 197 | ||
198 | function hasFriends () { | 198 | async function hasFriends () { |
199 | return db.Pod.countAll().then(count => count !== 0) | 199 | const count = await db.Pod.countAll() |
200 | |||
201 | return count !== 0 | ||
200 | } | 202 | } |
201 | 203 | ||
202 | function makeFriends (hosts: string[]) { | 204 | async function makeFriends (hosts: string[]) { |
203 | const podsScore = {} | 205 | const podsScore = {} |
204 | 206 | ||
205 | logger.info('Make friends!') | 207 | logger.info('Make friends!') |
206 | return getMyPublicCert() | 208 | const cert = await getMyPublicCert() |
207 | .then(cert => { | ||
208 | return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert) | ||
209 | }) | ||
210 | .then(cert => { | ||
211 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | ||
212 | const podsList = computeWinningPods(hosts, podsScore) | ||
213 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | ||
214 | 209 | ||
215 | return makeRequestsToWinningPods(cert, podsList) | 210 | for (const host of hosts) { |
216 | }) | 211 | await computeForeignPodsList(host, podsScore) |
212 | } | ||
213 | |||
214 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | ||
215 | |||
216 | const podsList = computeWinningPods(hosts, podsScore) | ||
217 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | ||
218 | |||
219 | return makeRequestsToWinningPods(cert, podsList) | ||
217 | } | 220 | } |
218 | 221 | ||
219 | function quitFriends () { | 222 | async function quitFriends () { |
220 | // Stop pool requests | 223 | // Stop pool requests |
221 | requestScheduler.deactivate() | 224 | requestScheduler.deactivate() |
222 | 225 | ||
223 | return requestScheduler.flush() | 226 | try { |
224 | .then(() => { | 227 | await requestScheduler.flush() |
225 | return requestVideoQaduScheduler.flush() | 228 | |
226 | }) | 229 | await requestVideoQaduScheduler.flush() |
227 | .then(() => { | 230 | |
228 | return db.Pod.list() | 231 | const pods = await db.Pod.list() |
229 | }) | 232 | const requestParams = { |
230 | .then(pods => { | 233 | method: 'POST' as 'POST', |
231 | const requestParams = { | 234 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
232 | method: 'POST' as 'POST', | 235 | toPod: null |
233 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 236 | } |
234 | toPod: null | ||
235 | } | ||
236 | 237 | ||
237 | // Announce we quit them | 238 | // Announce we quit them |
238 | // We don't care if the request fails | 239 | // We don't care if the request fails |
239 | // The other pod will exclude us automatically after a while | 240 | // The other pod will exclude us automatically after a while |
240 | return Promise.map(pods, pod => { | 241 | try { |
242 | await Bluebird.map(pods, pod => { | ||
241 | requestParams.toPod = pod | 243 | requestParams.toPod = pod |
242 | 244 | ||
243 | return makeSecureRequest(requestParams) | 245 | return makeSecureRequest(requestParams) |
244 | }, { concurrency: REQUESTS_IN_PARALLEL }) | 246 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
245 | .then(() => pods) | 247 | } catch (err) { // Don't stop the process |
246 | .catch(err => { | 248 | logger.error('Some errors while quitting friends.', err) |
247 | logger.error('Some errors while quitting friends.', err) | 249 | } |
248 | // Don't stop the process | ||
249 | return pods | ||
250 | }) | ||
251 | }) | ||
252 | .then(pods => { | ||
253 | const tasks = [] | ||
254 | pods.forEach(pod => tasks.push(pod.destroy())) | ||
255 | 250 | ||
256 | return Promise.all(pods) | 251 | const tasks = [] |
257 | }) | 252 | for (const pod of pods) { |
258 | .then(() => { | 253 | tasks.push(pod.destroy()) |
259 | logger.info('Removed all remote videos.') | 254 | } |
260 | // Don't forget to re activate the scheduler, even if there was an error | 255 | await Promise.all(pods) |
261 | return requestScheduler.activate() | 256 | |
262 | }) | 257 | logger.info('Removed all remote videos.') |
263 | .finally(() => requestScheduler.activate()) | 258 | |
259 | requestScheduler.activate() | ||
260 | } catch (err) { | ||
261 | // Don't forget to re activate the scheduler, even if there was an error | ||
262 | requestScheduler.activate() | ||
263 | |||
264 | throw err | ||
265 | } | ||
264 | } | 266 | } |
265 | 267 | ||
266 | function sendOwnedDataToPod (podId: number) { | 268 | async function sendOwnedDataToPod (podId: number) { |
267 | // First send authors | 269 | // First send authors |
268 | return sendOwnedAuthorsToPod(podId) | 270 | await sendOwnedAuthorsToPod(podId) |
269 | .then(() => sendOwnedChannelsToPod(podId)) | 271 | await sendOwnedChannelsToPod(podId) |
270 | .then(() => sendOwnedVideosToPod(podId)) | 272 | await sendOwnedVideosToPod(podId) |
273 | } | ||
274 | |||
275 | async function sendOwnedChannelsToPod (podId: number) { | ||
276 | const videoChannels = await db.VideoChannel.listOwned() | ||
277 | |||
278 | const tasks: Promise<any>[] = [] | ||
279 | for (const videoChannel of videoChannels) { | ||
280 | const remoteVideoChannel = videoChannel.toAddRemoteJSON() | ||
281 | const options = { | ||
282 | type: 'add-channel' as 'add-channel', | ||
283 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
284 | data: remoteVideoChannel, | ||
285 | toIds: [ podId ], | ||
286 | transaction: null | ||
287 | } | ||
288 | |||
289 | const p = createRequest(options) | ||
290 | tasks.push(p) | ||
291 | } | ||
292 | |||
293 | await Promise.all(tasks) | ||
271 | } | 294 | } |
272 | 295 | ||
273 | function sendOwnedChannelsToPod (podId: number) { | 296 | async function sendOwnedAuthorsToPod (podId: number) { |
274 | return db.VideoChannel.listOwned() | 297 | const authors = await db.Author.listOwned() |
275 | .then(videoChannels => { | 298 | const tasks: Promise<any>[] = [] |
276 | const tasks = [] | ||
277 | videoChannels.forEach(videoChannel => { | ||
278 | const remoteVideoChannel = videoChannel.toAddRemoteJSON() | ||
279 | const options = { | ||
280 | type: 'add-channel' as 'add-channel', | ||
281 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
282 | data: remoteVideoChannel, | ||
283 | toIds: [ podId ], | ||
284 | transaction: null | ||
285 | } | ||
286 | 299 | ||
287 | const p = createRequest(options) | 300 | for (const author of authors) { |
288 | tasks.push(p) | 301 | const remoteAuthor = author.toAddRemoteJSON() |
289 | }) | 302 | const options = { |
303 | type: 'add-author' as 'add-author', | ||
304 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
305 | data: remoteAuthor, | ||
306 | toIds: [ podId ], | ||
307 | transaction: null | ||
308 | } | ||
290 | 309 | ||
291 | return Promise.all(tasks) | 310 | const p = createRequest(options) |
292 | }) | 311 | tasks.push(p) |
312 | } | ||
313 | |||
314 | await Promise.all(tasks) | ||
293 | } | 315 | } |
294 | 316 | ||
295 | function sendOwnedAuthorsToPod (podId: number) { | 317 | async function sendOwnedVideosToPod (podId: number) { |
296 | return db.Author.listOwned() | 318 | const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags() |
297 | .then(authors => { | 319 | const tasks: Bluebird<any>[] = [] |
298 | const tasks = [] | 320 | |
299 | authors.forEach(author => { | 321 | for (const video of videosList) { |
300 | const remoteAuthor = author.toAddRemoteJSON() | 322 | const promise = video.toAddRemoteJSON() |
323 | .then(remoteVideo => { | ||
301 | const options = { | 324 | const options = { |
302 | type: 'add-author' as 'add-author', | 325 | type: 'add-video' as 'add-video', |
303 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 326 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
304 | data: remoteAuthor, | 327 | data: remoteVideo, |
305 | toIds: [ podId ], | 328 | toIds: [ podId ], |
306 | transaction: null | 329 | transaction: null |
307 | } | 330 | } |
308 | 331 | return createRequest(options) | |
309 | const p = createRequest(options) | ||
310 | tasks.push(p) | ||
311 | }) | 332 | }) |
312 | 333 | .catch(err => { | |
313 | return Promise.all(tasks) | 334 | logger.error('Cannot convert video to remote.', err) |
314 | }) | 335 | // Don't break the process |
315 | } | 336 | return undefined |
316 | |||
317 | function sendOwnedVideosToPod (podId: number) { | ||
318 | return db.Video.listOwnedAndPopulateAuthorAndTags() | ||
319 | .then(videosList => { | ||
320 | const tasks = [] | ||
321 | videosList.forEach(video => { | ||
322 | const promise = video.toAddRemoteJSON() | ||
323 | .then(remoteVideo => { | ||
324 | const options = { | ||
325 | type: 'add-video' as 'add-video', | ||
326 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
327 | data: remoteVideo, | ||
328 | toIds: [ podId ], | ||
329 | transaction: null | ||
330 | } | ||
331 | return createRequest(options) | ||
332 | }) | ||
333 | .catch(err => { | ||
334 | logger.error('Cannot convert video to remote.', err) | ||
335 | // Don't break the process | ||
336 | return undefined | ||
337 | }) | ||
338 | |||
339 | tasks.push(promise) | ||
340 | }) | 337 | }) |
341 | 338 | ||
342 | return Promise.all(tasks) | 339 | tasks.push(promise) |
343 | }) | 340 | } |
341 | |||
342 | await Promise.all(tasks) | ||
344 | } | 343 | } |
345 | 344 | ||
346 | function fetchRemotePreview (video: VideoInstance) { | 345 | function fetchRemotePreview (video: VideoInstance) { |
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) { | |||
350 | return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) | 349 | return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) |
351 | } | 350 | } |
352 | 351 | ||
353 | function removeFriend (pod: PodInstance) { | 352 | async function removeFriend (pod: PodInstance) { |
354 | const requestParams = { | 353 | const requestParams = { |
355 | method: 'POST' as 'POST', | 354 | method: 'POST' as 'POST', |
356 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 355 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
357 | toPod: pod | 356 | toPod: pod |
358 | } | 357 | } |
359 | 358 | ||
360 | return makeSecureRequest(requestParams) | 359 | try { |
361 | .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) | 360 | await makeSecureRequest(requestParams) |
362 | .then(() => pod.destroy()) | 361 | } catch (err) { |
363 | .then(() => logger.info('Removed friend %s.', pod.host)) | 362 | logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err) |
364 | .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) | 363 | } |
364 | |||
365 | try { | ||
366 | await pod.destroy() | ||
367 | |||
368 | logger.info('Removed friend %s.', pod.host) | ||
369 | } catch (err) { | ||
370 | logger.error('Cannot destroy friend %s.', pod.host, err) | ||
371 | } | ||
365 | } | 372 | } |
366 | 373 | ||
367 | function getRequestScheduler () { | 374 | function getRequestScheduler () { |
@@ -406,23 +413,21 @@ export { | |||
406 | 413 | ||
407 | // --------------------------------------------------------------------------- | 414 | // --------------------------------------------------------------------------- |
408 | 415 | ||
409 | function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { | 416 | async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { |
410 | // TODO: type res | 417 | const result = await getForeignPodsList(host) |
411 | return getForeignPodsList(host).then(res => { | 418 | const foreignPodsList: { host: string }[] = result.data |
412 | const foreignPodsList: { host: string }[] = res.data | ||
413 | 419 | ||
414 | // Let's give 1 point to the pod we ask the friends list | 420 | // Let's give 1 point to the pod we ask the friends list |
415 | foreignPodsList.push({ host }) | 421 | foreignPodsList.push({ host }) |
416 | 422 | ||
417 | foreignPodsList.forEach(foreignPod => { | 423 | for (const foreignPod of foreignPodsList) { |
418 | const foreignPodHost = foreignPod.host | 424 | const foreignPodHost = foreignPod.host |
419 | 425 | ||
420 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ | 426 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ |
421 | else podsScore[foreignPodHost] = 1 | 427 | else podsScore[foreignPodHost] = 1 |
422 | }) | 428 | } |
423 | 429 | ||
424 | return undefined | 430 | return undefined |
425 | }) | ||
426 | } | 431 | } |
427 | 432 | ||
428 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { | 433 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { |
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num | |||
431 | const podsList = [] | 436 | const podsList = [] |
432 | const baseScore = hosts.length / 2 | 437 | const baseScore = hosts.length / 2 |
433 | 438 | ||
434 | Object.keys(podsScore).forEach(podHost => { | 439 | for (const podHost of Object.keys(podsScore)) { |
435 | // If the pod is not me and with a good score we add it | 440 | // If the pod is not me and with a good score we add it |
436 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { | 441 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { |
437 | podsList.push({ host: podHost }) | 442 | podsList.push({ host: podHost }) |
438 | } | 443 | } |
439 | }) | 444 | } |
440 | 445 | ||
441 | return podsList | 446 | return podsList |
442 | } | 447 | } |
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) { | |||
449 | if (err) return rej(err) | 454 | if (err) return rej(err) |
450 | 455 | ||
451 | try { | 456 | try { |
452 | const json = JSON.parse(body) | 457 | const json: ResultList<FormattedPod> = JSON.parse(body) |
453 | return res(json) | 458 | return res(json) |
454 | } catch (err) { | 459 | } catch (err) { |
455 | return rej(err) | 460 | return rej(err) |
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) { | |||
458 | }) | 463 | }) |
459 | } | 464 | } |
460 | 465 | ||
461 | function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { | 466 | async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { |
462 | // Stop pool requests | 467 | // Stop pool requests |
463 | requestScheduler.deactivate() | 468 | requestScheduler.deactivate() |
464 | // Flush pool requests | 469 | // Flush pool requests |
465 | requestScheduler.forceSend() | 470 | requestScheduler.forceSend() |
466 | 471 | ||
467 | return Promise.map(podsList, pod => { | 472 | try { |
468 | const params = { | 473 | await Bluebird.map(podsList, async pod => { |
469 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', | 474 | const params = { |
470 | method: 'POST' as 'POST', | 475 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', |
471 | json: { | 476 | method: 'POST' as 'POST', |
472 | host: CONFIG.WEBSERVER.HOST, | 477 | json: { |
473 | email: CONFIG.ADMIN.EMAIL, | 478 | host: CONFIG.WEBSERVER.HOST, |
474 | publicKey: cert | 479 | email: CONFIG.ADMIN.EMAIL, |
480 | publicKey: cert | ||
481 | } | ||
475 | } | 482 | } |
476 | } | ||
477 | 483 | ||
478 | return makeRetryRequest(params) | 484 | const { response, body } = await makeRetryRequest(params) |
479 | .then(({ response, body }) => { | 485 | const typedBody = body as { cert: string, email: string } |
480 | body = body as { cert: string, email: string } | 486 | |
481 | 487 | if (response.statusCode === 200) { | |
482 | if (response.statusCode === 200) { | 488 | const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email }) |
483 | const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) | 489 | |
484 | return podObj.save() | 490 | let podCreated: PodInstance |
485 | .then(podCreated => { | 491 | try { |
486 | 492 | podCreated = await podObj.save() | |
487 | // Add our videos to the request scheduler | 493 | } catch (err) { |
488 | sendOwnedDataToPod(podCreated.id) | 494 | logger.error('Cannot add friend %s pod.', pod.host, err) |
489 | }) | ||
490 | .catch(err => { | ||
491 | logger.error('Cannot add friend %s pod.', pod.host, err) | ||
492 | }) | ||
493 | } else { | ||
494 | logger.error('Status not 200 for %s pod.', pod.host) | ||
495 | } | 495 | } |
496 | }) | 496 | |
497 | .catch(err => { | 497 | // Add our videos to the request scheduler |
498 | logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) | 498 | sendOwnedDataToPod(podCreated.id) |
499 | // Don't break the process | 499 | .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err)) |
500 | }) | 500 | } else { |
501 | }, { concurrency: REQUESTS_IN_PARALLEL }) | 501 | logger.error('Status not 200 for %s pod.', pod.host) |
502 | .then(() => logger.debug('makeRequestsToWinningPods finished.')) | 502 | } |
503 | .finally(() => { | 503 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
504 | |||
505 | logger.debug('makeRequestsToWinningPods finished.') | ||
506 | |||
507 | requestScheduler.activate() | ||
508 | } catch (err) { | ||
504 | // Final callback, we've ended all the requests | 509 | // Final callback, we've ended all the requests |
505 | // Now we made new friends, we can re activate the pool of requests | 510 | // Now we made new friends, we can re activate the pool of requests |
506 | requestScheduler.activate() | 511 | requestScheduler.activate() |
507 | }) | 512 | } |
508 | } | 513 | } |
509 | 514 | ||
510 | // Wrapper that populate "toIds" argument with all our friends if it is not specified | 515 | // Wrapper that populate "toIds" argument with all our friends if it is not specified |
@@ -515,14 +520,19 @@ type CreateRequestOptions = { | |||
515 | toIds?: number[] | 520 | toIds?: number[] |
516 | transaction: Sequelize.Transaction | 521 | transaction: Sequelize.Transaction |
517 | } | 522 | } |
518 | function createRequest (options: CreateRequestOptions) { | 523 | async function createRequest (options: CreateRequestOptions) { |
519 | if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) | 524 | if (options.toIds !== undefined) { |
525 | await requestScheduler.createRequest(options as RequestSchedulerOptions) | ||
526 | return undefined | ||
527 | } | ||
520 | 528 | ||
521 | // If the "toIds" pods is not specified, we send the request to all our friends | 529 | // If the "toIds" pods is not specified, we send the request to all our friends |
522 | return db.Pod.listAllIds(options.transaction).then(podIds => { | 530 | const podIds = await db.Pod.listAllIds(options.transaction) |
523 | const newOptions = Object.assign(options, { toIds: podIds }) | 531 | |
524 | return requestScheduler.createRequest(newOptions) | 532 | const newOptions = Object.assign(options, { toIds: podIds }) |
525 | }) | 533 | await requestScheduler.createRequest(newOptions) |
534 | |||
535 | return undefined | ||
526 | } | 536 | } |
527 | 537 | ||
528 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { | 538 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { |
diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/handlers/video-file-optimizer.ts index 63a51064c..799ba8b01 100644 --- a/server/lib/jobs/handlers/video-file-optimizer.ts +++ b/server/lib/jobs/handlers/video-file-optimizer.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Promise from 'bluebird' | 1 | import * as Bluebird from 'bluebird' |
2 | 2 | ||
3 | import { database as db } from '../../../initializers/database' | 3 | import { database as db } from '../../../initializers/database' |
4 | import { logger, computeResolutionsToTranscode } from '../../../helpers' | 4 | import { logger, computeResolutionsToTranscode } from '../../../helpers' |
@@ -6,16 +6,17 @@ import { VideoInstance } from '../../../models' | |||
6 | import { addVideoToFriends } from '../../friends' | 6 | import { addVideoToFriends } from '../../friends' |
7 | import { JobScheduler } from '../job-scheduler' | 7 | import { JobScheduler } from '../job-scheduler' |
8 | 8 | ||
9 | function process (data: { videoUUID: string }, jobId: number) { | 9 | async function process (data: { videoUUID: string }, jobId: number) { |
10 | return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { | 10 | const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) |
11 | // No video, maybe deleted? | 11 | // No video, maybe deleted? |
12 | if (!video) { | 12 | if (!video) { |
13 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 13 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) |
14 | return undefined | 14 | return undefined |
15 | } | 15 | } |
16 | |||
17 | await video.optimizeOriginalVideofile() | ||
16 | 18 | ||
17 | return video.optimizeOriginalVideofile().then(() => video) | 19 | return video |
18 | }) | ||
19 | } | 20 | } |
20 | 21 | ||
21 | function onError (err: Error, jobId: number) { | 22 | function onError (err: Error, jobId: number) { |
@@ -23,33 +24,31 @@ function onError (err: Error, jobId: number) { | |||
23 | return Promise.resolve() | 24 | return Promise.resolve() |
24 | } | 25 | } |
25 | 26 | ||
26 | function onSuccess (jobId: number, video: VideoInstance) { | 27 | async function onSuccess (jobId: number, video: VideoInstance) { |
27 | if (video === undefined) return undefined | 28 | if (video === undefined) return undefined |
28 | 29 | ||
29 | logger.info('Job %d is a success.', jobId) | 30 | logger.info('Job %d is a success.', jobId) |
30 | 31 | ||
31 | video.toAddRemoteJSON() | 32 | const remoteVideo = await video.toAddRemoteJSON() |
32 | .then(remoteVideo => { | 33 | |
33 | // Now we'll add the video's meta data to our friends | 34 | // Now we'll add the video's meta data to our friends |
34 | return addVideoToFriends(remoteVideo, null) | 35 | await addVideoToFriends(remoteVideo, null) |
35 | }) | 36 | |
36 | .then(() => { | 37 | const originalFileHeight = await video.getOriginalFileHeight() |
37 | return video.getOriginalFileHeight() | 38 | // Create transcoding jobs if there are enabled resolutions |
38 | }) | 39 | |
39 | .then(originalFileHeight => { | 40 | const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) |
40 | // Create transcoding jobs if there are enabled resolutions | 41 | logger.info( |
41 | const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) | 42 | 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, |
42 | logger.info( | 43 | { resolutions: resolutionsEnabled } |
43 | 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, | 44 | ) |
44 | { resolutions: resolutionsEnabled } | 45 | |
45 | ) | 46 | if (resolutionsEnabled.length !== 0) { |
46 | 47 | try { | |
47 | if (resolutionsEnabled.length === 0) return undefined | 48 | await db.sequelize.transaction(async t => { |
48 | 49 | const tasks: Bluebird<any>[] = [] | |
49 | return db.sequelize.transaction(t => { | 50 | |
50 | const tasks: Promise<any>[] = [] | 51 | for (const resolution of resolutionsEnabled) { |
51 | |||
52 | resolutionsEnabled.forEach(resolution => { | ||
53 | const dataInput = { | 52 | const dataInput = { |
54 | videoUUID: video.uuid, | 53 | videoUUID: video.uuid, |
55 | resolution | 54 | resolution |
@@ -57,24 +56,19 @@ function onSuccess (jobId: number, video: VideoInstance) { | |||
57 | 56 | ||
58 | const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) | 57 | const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) |
59 | tasks.push(p) | 58 | tasks.push(p) |
60 | }) | 59 | } |
61 | 60 | ||
62 | return Promise.all(tasks).then(() => resolutionsEnabled) | 61 | await Promise.all(tasks) |
63 | }) | 62 | }) |
64 | }) | ||
65 | .then(resolutionsEnabled => { | ||
66 | if (resolutionsEnabled === undefined) { | ||
67 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') | ||
68 | return undefined | ||
69 | } | ||
70 | 63 | ||
71 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) | 64 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) |
72 | }) | 65 | } catch (err) { |
73 | .catch((err: Error) => { | 66 | logger.warn('Cannot transcode the video.', err) |
74 | logger.debug('Cannot transcode the video.', err) | 67 | } |
75 | throw err | 68 | } else { |
76 | }) | 69 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') |
77 | 70 | return undefined | |
71 | } | ||
78 | } | 72 | } |
79 | 73 | ||
80 | // --------------------------------------------------------------------------- | 74 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/handlers/video-file-transcoder.ts index 0dafee566..b240ff58a 100644 --- a/server/lib/jobs/handlers/video-file-transcoder.ts +++ b/server/lib/jobs/handlers/video-file-transcoder.ts | |||
@@ -4,16 +4,17 @@ import { logger } from '../../../helpers' | |||
4 | import { VideoInstance } from '../../../models' | 4 | import { VideoInstance } from '../../../models' |
5 | import { VideoResolution } from '../../../../shared' | 5 | import { VideoResolution } from '../../../../shared' |
6 | 6 | ||
7 | function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { | 7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { |
8 | return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { | 8 | const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) |
9 | // No video, maybe deleted? | 9 | // No video, maybe deleted? |
10 | if (!video) { | 10 | if (!video) { |
11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) |
12 | return undefined | 12 | return undefined |
13 | } | 13 | } |
14 | 14 | ||
15 | return video.transcodeOriginalVideofile(data.resolution).then(() => video) | 15 | await video.transcodeOriginalVideofile(data.resolution) |
16 | }) | 16 | |
17 | return video | ||
17 | } | 18 | } |
18 | 19 | ||
19 | function onError (err: Error, jobId: number) { | 20 | function onError (err: Error, jobId: number) { |
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index c2409d20c..61d483268 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -23,7 +23,7 @@ class JobScheduler { | |||
23 | return this.instance || (this.instance = new this()) | 23 | return this.instance || (this.instance = new this()) |
24 | } | 24 | } |
25 | 25 | ||
26 | activate () { | 26 | async activate () { |
27 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE | 27 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE |
28 | 28 | ||
29 | logger.info('Jobs scheduler activated.') | 29 | logger.info('Jobs scheduler activated.') |
@@ -32,32 +32,36 @@ class JobScheduler { | |||
32 | 32 | ||
33 | // Finish processing jobs from a previous start | 33 | // Finish processing jobs from a previous start |
34 | const state = JOB_STATES.PROCESSING | 34 | const state = JOB_STATES.PROCESSING |
35 | db.Job.listWithLimit(limit, state) | 35 | try { |
36 | .then(jobs => { | 36 | const jobs = await db.Job.listWithLimit(limit, state) |
37 | this.enqueueJobs(jobsQueue, jobs) | 37 | |
38 | 38 | this.enqueueJobs(jobsQueue, jobs) | |
39 | forever( | 39 | } catch (err) { |
40 | next => { | 40 | logger.error('Cannot list pending jobs.', err) |
41 | if (jobsQueue.length() !== 0) { | 41 | } |
42 | // Finish processing the queue first | 42 | |
43 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | 43 | forever( |
44 | } | 44 | async next => { |
45 | 45 | if (jobsQueue.length() !== 0) { | |
46 | const state = JOB_STATES.PENDING | 46 | // Finish processing the queue first |
47 | db.Job.listWithLimit(limit, state) | 47 | return setTimeout(next, JOBS_FETCHING_INTERVAL) |
48 | .then(jobs => { | 48 | } |
49 | this.enqueueJobs(jobsQueue, jobs) | 49 | |
50 | 50 | const state = JOB_STATES.PENDING | |
51 | // Optimization: we could use "drain" from queue object | 51 | try { |
52 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | 52 | const jobs = await db.Job.listWithLimit(limit, state) |
53 | }) | 53 | |
54 | .catch(err => logger.error('Cannot list pending jobs.', err)) | 54 | this.enqueueJobs(jobsQueue, jobs) |
55 | }, | 55 | } catch (err) { |
56 | 56 | logger.error('Cannot list pending jobs.', err) | |
57 | err => logger.error('Error in job scheduler queue.', err) | 57 | } |
58 | ) | 58 | |
59 | }) | 59 | // Optimization: we could use "drain" from queue object |
60 | .catch(err => logger.error('Cannot list pending jobs.', err)) | 60 | return setTimeout(next, JOBS_FETCHING_INTERVAL) |
61 | }, | ||
62 | |||
63 | err => logger.error('Error in job scheduler queue.', err) | ||
64 | ) | ||
61 | } | 65 | } |
62 | 66 | ||
63 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { | 67 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { |
@@ -75,7 +79,7 @@ class JobScheduler { | |||
75 | jobs.forEach(job => jobsQueue.push(job)) | 79 | jobs.forEach(job => jobsQueue.push(job)) |
76 | } | 80 | } |
77 | 81 | ||
78 | private processJob (job: JobInstance, callback: (err: Error) => void) { | 82 | private async processJob (job: JobInstance, callback: (err: Error) => void) { |
79 | const jobHandler = jobHandlers[job.handlerName] | 83 | const jobHandler = jobHandlers[job.handlerName] |
80 | if (jobHandler === undefined) { | 84 | if (jobHandler === undefined) { |
81 | logger.error('Unknown job handler for job %s.', job.handlerName) | 85 | logger.error('Unknown job handler for job %s.', job.handlerName) |
@@ -85,41 +89,45 @@ class JobScheduler { | |||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | 89 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) |
86 | 90 | ||
87 | job.state = JOB_STATES.PROCESSING | 91 | job.state = JOB_STATES.PROCESSING |
88 | return job.save() | 92 | await job.save() |
89 | .then(() => { | 93 | |
90 | return jobHandler.process(job.handlerInputData, job.id) | 94 | try { |
91 | }) | 95 | const result = await jobHandler.process(job.handlerInputData, job.id) |
92 | .then( | 96 | await this.onJobSuccess(jobHandler, job, result) |
93 | result => { | 97 | } catch (err) { |
94 | return this.onJobSuccess(jobHandler, job, result) | 98 | logger.error('Error in job handler %s.', job.handlerName, err) |
95 | }, | 99 | |
96 | 100 | try { | |
97 | err => { | 101 | await this.onJobError(jobHandler, job, err) |
98 | logger.error('Error in job handler %s.', job.handlerName, err) | 102 | } catch (innerErr) { |
99 | return this.onJobError(jobHandler, job, err) | 103 | this.cannotSaveJobError(innerErr) |
100 | } | 104 | return callback(innerErr) |
101 | ) | 105 | } |
102 | .then(() => callback(null)) | 106 | } |
103 | .catch(err => { | 107 | |
104 | this.cannotSaveJobError(err) | 108 | callback(null) |
105 | return callback(err) | ||
106 | }) | ||
107 | } | 109 | } |
108 | 110 | ||
109 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { | 111 | private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { |
110 | job.state = JOB_STATES.ERROR | 112 | job.state = JOB_STATES.ERROR |
111 | 113 | ||
112 | return job.save() | 114 | try { |
113 | .then(() => jobHandler.onError(err, job.id)) | 115 | await job.save() |
114 | .catch(err => this.cannotSaveJobError(err)) | 116 | await jobHandler.onError(err, job.id) |
117 | } catch (err) { | ||
118 | this.cannotSaveJobError(err) | ||
119 | } | ||
115 | } | 120 | } |
116 | 121 | ||
117 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { | 122 | private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { |
118 | job.state = JOB_STATES.SUCCESS | 123 | job.state = JOB_STATES.SUCCESS |
119 | 124 | ||
120 | return job.save() | 125 | try { |
121 | .then(() => jobHandler.onSuccess(job.id, jobResult)) | 126 | await job.save() |
122 | .catch(err => this.cannotSaveJobError(err)) | 127 | jobHandler.onSuccess(job.id, jobResult) |
128 | } catch (err) { | ||
129 | this.cannotSaveJobError(err) | ||
130 | } | ||
123 | } | 131 | } |
124 | 132 | ||
125 | private cannotSaveJobError (err: Error) { | 133 | private cannotSaveJobError (err: Error) { |
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts index 9aa3ea52f..d91b00c55 100644 --- a/server/lib/oauth-model.ts +++ b/server/lib/oauth-model.ts | |||
@@ -24,39 +24,36 @@ function getRefreshToken (refreshToken: string) { | |||
24 | return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) | 24 | return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) |
25 | } | 25 | } |
26 | 26 | ||
27 | function getUser (username: string, password: string) { | 27 | async function getUser (username: string, password: string) { |
28 | logger.debug('Getting User (username: ' + username + ', password: ******).') | 28 | logger.debug('Getting User (username: ' + username + ', password: ******).') |
29 | 29 | ||
30 | return db.User.getByUsername(username).then(user => { | 30 | const user = await db.User.getByUsername(username) |
31 | if (!user) return null | 31 | if (!user) return null |
32 | 32 | ||
33 | return user.isPasswordMatch(password).then(passwordMatch => { | 33 | const passwordMatch = await user.isPasswordMatch(password) |
34 | if (passwordMatch === false) return null | 34 | if (passwordMatch === false) return null |
35 | 35 | ||
36 | return user | 36 | return user |
37 | }) | ||
38 | }) | ||
39 | } | 37 | } |
40 | 38 | ||
41 | function revokeToken (token: TokenInfo) { | 39 | async function revokeToken (tokenInfo: TokenInfo) { |
42 | return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => { | 40 | const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken) |
43 | if (tokenDB) tokenDB.destroy() | 41 | if (token) token.destroy() |
44 | 42 | ||
45 | /* | 43 | /* |
46 | * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js | 44 | * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js |
47 | * "As per the discussion we need set older date | 45 | * "As per the discussion we need set older date |
48 | * revokeToken will expected return a boolean in future version | 46 | * revokeToken will expected return a boolean in future version |
49 | * https://github.com/oauthjs/node-oauth2-server/pull/274 | 47 | * https://github.com/oauthjs/node-oauth2-server/pull/274 |
50 | * https://github.com/oauthjs/node-oauth2-server/issues/290" | 48 | * https://github.com/oauthjs/node-oauth2-server/issues/290" |
51 | */ | 49 | */ |
52 | const expiredToken = tokenDB | 50 | const expiredToken = token |
53 | expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') | 51 | expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') |
54 | 52 | ||
55 | return expiredToken | 53 | return expiredToken |
56 | }) | ||
57 | } | 54 | } |
58 | 55 | ||
59 | function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { | 56 | async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { |
60 | logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') | 57 | logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') |
61 | 58 | ||
62 | const tokenToCreate = { | 59 | const tokenToCreate = { |
@@ -68,11 +65,10 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns | |||
68 | userId: user.id | 65 | userId: user.id |
69 | } | 66 | } |
70 | 67 | ||
71 | return db.OAuthToken.create(tokenToCreate).then(tokenCreated => { | 68 | const tokenCreated = await db.OAuthToken.create(tokenToCreate) |
72 | const tokenToReturn = Object.assign(tokenCreated, { client, user }) | 69 | const tokenToReturn = Object.assign(tokenCreated, { client, user }) |
73 | 70 | ||
74 | return tokenToReturn | 71 | return tokenToReturn |
75 | }) | ||
76 | } | 72 | } |
77 | 73 | ||
78 | // --------------------------------------------------------------------------- | 74 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index ce4e2ffd2..f46682824 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import { isEmpty } from 'lodash' | 1 | import { isEmpty } from 'lodash' |
2 | import * as Promise from 'bluebird' | 2 | import * as Bluebird from 'bluebird' |
3 | 3 | ||
4 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
5 | import { logger, makeSecureRequest } from '../../helpers' | 5 | import { logger, makeSecureRequest } from '../../helpers' |
@@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> { | |||
76 | // --------------------------------------------------------------------------- | 76 | // --------------------------------------------------------------------------- |
77 | 77 | ||
78 | // Make a requests to friends of a certain type | 78 | // Make a requests to friends of a certain type |
79 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { | 79 | protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { |
80 | const params = { | 80 | const params = { |
81 | toPod: toPod, | 81 | toPod: toPod, |
82 | method: 'POST' as 'POST', | 82 | method: 'POST' as 'POST', |
@@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> { | |||
86 | 86 | ||
87 | // Make multiple retry requests to all of pods | 87 | // Make multiple retry requests to all of pods |
88 | // The function fire some useful callbacks | 88 | // The function fire some useful callbacks |
89 | return makeSecureRequest(params) | 89 | try { |
90 | .then(({ response, body }) => { | 90 | const { response } = await makeSecureRequest(params) |
91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { | 91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { |
92 | throw new Error('Status code not 20x : ' + response.statusCode) | 92 | throw new Error('Status code not 20x : ' + response.statusCode) |
93 | } | 93 | } |
94 | }) | 94 | } catch (err) { |
95 | .catch(err => { | 95 | logger.error('Error sending secure request to %s pod.', toPod.host, err) |
96 | logger.error('Error sending secure request to %s pod.', toPod.host, err) | 96 | |
97 | 97 | throw err | |
98 | throw err | 98 | } |
99 | }) | ||
100 | } | 99 | } |
101 | 100 | ||
102 | // Make all the requests of the scheduler | 101 | // Make all the requests of the scheduler |
103 | protected makeRequests () { | 102 | protected async makeRequests () { |
104 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) | 103 | let requestsGrouped: T |
105 | .then((requestsGrouped: T) => { | 104 | |
106 | // We want to group requests by destinations pod and endpoint | 105 | try { |
107 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | 106 | requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) |
108 | 107 | } catch (err) { | |
109 | // If there are no requests, abort | 108 | logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }) |
110 | if (isEmpty(requestsToMake) === true) { | 109 | throw err |
111 | logger.info('No "%s" to make.', this.description) | 110 | } |
112 | return { goodPods: [], badPods: [] } | 111 | |
113 | } | 112 | // We want to group requests by destinations pod and endpoint |
114 | 113 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | |
115 | logger.info('Making "%s" to friends.', this.description) | 114 | |
116 | 115 | // If there are no requests, abort | |
117 | const goodPods: number[] = [] | 116 | if (isEmpty(requestsToMake) === true) { |
118 | const badPods: number[] = [] | 117 | logger.info('No "%s" to make.', this.description) |
119 | 118 | return { goodPods: [], badPods: [] } | |
120 | return Promise.map(Object.keys(requestsToMake), hashKey => { | 119 | } |
121 | const requestToMake = requestsToMake[hashKey] | 120 | |
122 | const toPod: PodInstance = requestToMake.toPod | 121 | logger.info('Making "%s" to friends.', this.description) |
123 | 122 | ||
124 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) | 123 | const goodPods: number[] = [] |
125 | .then(() => { | 124 | const badPods: number[] = [] |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | 125 | |
127 | goodPods.push(requestToMake.toPod.id) | 126 | await Bluebird.map(Object.keys(requestsToMake), async hashKey => { |
128 | 127 | const requestToMake = requestsToMake[hashKey] | |
129 | this.afterRequestHook() | 128 | const toPod: PodInstance = requestToMake.toPod |
130 | 129 | ||
131 | // Remove the pod id of these request ids | 130 | try { |
132 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) | 131 | await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) |
133 | }) | 132 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
134 | .catch(err => { | 133 | goodPods.push(requestToMake.toPod.id) |
135 | badPods.push(requestToMake.toPod.id) | 134 | |
136 | logger.info('Cannot make request to %s.', toPod.host, err) | 135 | this.afterRequestHook() |
137 | }) | 136 | |
138 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) | 137 | // Remove the pod id of these request ids |
139 | }) | 138 | await this.getRequestToPodModel() |
140 | .then(({ goodPods, badPods }) => { | 139 | .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) |
141 | this.afterRequestsHook() | 140 | } catch (err) { |
142 | 141 | badPods.push(requestToMake.toPod.id) | |
143 | // All the requests were made, we update the pods score | 142 | logger.info('Cannot make request to %s.', toPod.host, err) |
144 | return db.Pod.updatePodsScore(goodPods, badPods) | 143 | } |
145 | }) | 144 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
146 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) | 145 | |
146 | this.afterRequestsHook() | ||
147 | |||
148 | // All the requests were made, we update the pods score | ||
149 | await db.Pod.updatePodsScore(goodPods, badPods) | ||
147 | } | 150 | } |
148 | 151 | ||
149 | protected afterRequestHook () { | 152 | protected afterRequestHook () { |
150 | // Nothing to do, let children reimplement it | 153 | // Nothing to do, let children re-implement it |
151 | } | 154 | } |
152 | 155 | ||
153 | protected afterRequestsHook () { | 156 | protected afterRequestsHook () { |
154 | // Nothing to do, let children reimplement it | 157 | // Nothing to do, let children re-implement it |
155 | } | 158 | } |
156 | } | 159 | } |
157 | 160 | ||
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 696875dcf..c3f7f6429 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts | |||
@@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { | 37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { |
38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} | 38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} |
39 | 39 | ||
40 | Object.keys(requestsGrouped).forEach(toPodId => { | 40 | for (const toPodId of Object.keys(requestsGrouped)) { |
41 | requestsGrouped[toPodId].forEach(data => { | 41 | for (const data of requestsGrouped[toPodId]) { |
42 | const request = data.request | 42 | const request = data.request |
43 | const pod = data.pod | 43 | const pod = data.pod |
44 | const hashKey = toPodId + request.endpoint | 44 | const hashKey = toPodId + request.endpoint |
@@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
54 | 54 | ||
55 | requestsToMakeGrouped[hashKey].ids.push(request.id) | 55 | requestsToMakeGrouped[hashKey].ids.push(request.id) |
56 | requestsToMakeGrouped[hashKey].datas.push(request.request) | 56 | requestsToMakeGrouped[hashKey].datas.push(request.request) |
57 | }) | 57 | } |
58 | }) | 58 | } |
59 | 59 | ||
60 | return requestsToMakeGrouped | 60 | return requestsToMakeGrouped |
61 | } | 61 | } |
62 | 62 | ||
63 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { | 63 | async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { |
64 | // If there are no destination pods abort | 64 | // If there are no destination pods abort |
65 | if (toIds.length === 0) return undefined | 65 | if (toIds.length === 0) return undefined |
66 | 66 | ||
@@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
76 | transaction | 76 | transaction |
77 | } | 77 | } |
78 | 78 | ||
79 | return db.Request.create(createQuery, dbRequestOptions) | 79 | const request = await db.Request.create(createQuery, dbRequestOptions) |
80 | .then(request => { | 80 | await request.setPods(toIds, dbRequestOptions) |
81 | return request.setPods(toIds, dbRequestOptions) | ||
82 | }) | ||
83 | } | 81 | } |
84 | 82 | ||
85 | // --------------------------------------------------------------------------- | 83 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 680232732..5f21287f0 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts | |||
@@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
59 | 59 | ||
60 | // We group video events per video and per pod | 60 | // We group video events per video and per pod |
61 | // We add the counts of the same event types | 61 | // We add the counts of the same event types |
62 | Object.keys(eventRequests).forEach(toPodId => { | 62 | for (const toPodId of Object.keys(eventRequests)) { |
63 | eventRequests[toPodId].forEach(eventToProcess => { | 63 | for (const eventToProcess of eventRequests[toPodId]) { |
64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | 64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} |
65 | 65 | ||
66 | if (!requestsToMakeGrouped[toPodId]) { | 66 | if (!requestsToMakeGrouped[toPodId]) { |
@@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 | 81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 |
82 | 82 | ||
83 | events[eventToProcess.type] += eventToProcess.count | 83 | events[eventToProcess.type] += eventToProcess.count |
84 | }) | 84 | } |
85 | }) | 85 | } |
86 | 86 | ||
87 | // Now we build our requests array per pod | 87 | // Now we build our requests array per pod |
88 | Object.keys(eventsPerVideoPerPod).forEach(toPodId => { | 88 | for (const toPodId of Object.keys(eventsPerVideoPerPod)) { |
89 | const eventsForPod = eventsPerVideoPerPod[toPodId] | 89 | const eventsForPod = eventsPerVideoPerPod[toPodId] |
90 | 90 | ||
91 | Object.keys(eventsForPod).forEach(uuid => { | 91 | for (const uuid of Object.keys(eventsForPod)) { |
92 | const eventsForVideo = eventsForPod[uuid] | 92 | const eventsForVideo = eventsForPod[uuid] |
93 | 93 | ||
94 | Object.keys(eventsForVideo).forEach(eventType => { | 94 | for (const eventType of Object.keys(eventsForVideo)) { |
95 | requestsToMakeGrouped[toPodId].datas.push({ | 95 | requestsToMakeGrouped[toPodId].datas.push({ |
96 | data: { | 96 | data: { |
97 | uuid, | 97 | uuid, |
@@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
99 | count: +eventsForVideo[eventType] | 99 | count: +eventsForVideo[eventType] |
100 | } | 100 | } |
101 | }) | 101 | }) |
102 | }) | 102 | } |
103 | }) | 103 | } |
104 | }) | 104 | } |
105 | 105 | ||
106 | return requestsToMakeGrouped | 106 | return requestsToMakeGrouped |
107 | } | 107 | } |
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index d7e1d5e31..a54efc111 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts | |||
@@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa | |||
59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { | 59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { |
60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} | 60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} |
61 | 61 | ||
62 | Object.keys(requests).forEach(toPodId => { | 62 | for (const toPodId of Object.keys(requests)) { |
63 | requests[toPodId].forEach(data => { | 63 | for (const data of requests[toPodId]) { |
64 | const request = data.request | 64 | const request = data.request |
65 | const video = data.video | 65 | const video = data.video |
66 | const pod = data.pod | 66 | const pod = data.pod |
@@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa | |||
105 | // Maybe there are multiple quick and dirty update for the same video | 105 | // Maybe there are multiple quick and dirty update for the same video |
106 | // We use this hash map to dedupe them | 106 | // We use this hash map to dedupe them |
107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData | 107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData |
108 | }) | 108 | } |
109 | }) | 109 | } |
110 | 110 | ||
111 | // Now we deduped similar quick and dirty updates, we can build our requests data | 111 | // Now we deduped similar quick and dirty updates, we can build our requests data |
112 | Object.keys(requestsToMakeGrouped).forEach(hashKey => { | 112 | for (const hashKey of Object.keys(requestsToMakeGrouped)) { |
113 | Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { | 113 | for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) { |
114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] | 114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] |
115 | 115 | ||
116 | requestsToMakeGrouped[hashKey].datas.push({ | 116 | requestsToMakeGrouped[hashKey].datas.push({ |
117 | data: videoData | 117 | data: videoData |
118 | }) | 118 | }) |
119 | }) | 119 | } |
120 | 120 | ||
121 | // We don't need it anymore, it was just to build our data array | 121 | // We don't need it anymore, it was just to build our data array |
122 | delete requestsToMakeGrouped[hashKey].videos | 122 | delete requestsToMakeGrouped[hashKey].videos |
123 | }) | 123 | } |
124 | 124 | ||
125 | return requestsToMakeGrouped | 125 | return requestsToMakeGrouped |
126 | } | 126 | } |
127 | 127 | ||
128 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { | 128 | async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { |
129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} | 129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} |
130 | if (transaction) dbRequestOptions.transaction = transaction | 130 | if (transaction) dbRequestOptions.transaction = transaction |
131 | 131 | ||
132 | // Send the update to all our friends | 132 | // Send the update to all our friends |
133 | return db.Pod.listAllIds(transaction).then(podIds => { | 133 | const podIds = await db.Pod.listAllIds(transaction) |
134 | const queries = [] | 134 | const queries = [] |
135 | podIds.forEach(podId => { | 135 | for (const podId of podIds) { |
136 | queries.push({ type, videoId, podId }) | 136 | queries.push({ type, videoId, podId }) |
137 | }) | 137 | } |
138 | 138 | ||
139 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) | 139 | await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) |
140 | }) | 140 | return undefined |
141 | } | 141 | } |
142 | } | 142 | } |
143 | 143 | ||
diff --git a/server/lib/user.ts b/server/lib/user.ts index 8609e72d8..a92f4777b 100644 --- a/server/lib/user.ts +++ b/server/lib/user.ts | |||
@@ -3,40 +3,36 @@ import { UserInstance } from '../models' | |||
3 | import { addVideoAuthorToFriends } from './friends' | 3 | import { addVideoAuthorToFriends } from './friends' |
4 | import { createVideoChannel } from './video-channel' | 4 | import { createVideoChannel } from './video-channel' |
5 | 5 | ||
6 | function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { | 6 | async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { |
7 | return db.sequelize.transaction(t => { | 7 | const res = await db.sequelize.transaction(async t => { |
8 | const userOptions = { | 8 | const userOptions = { |
9 | transaction: t, | 9 | transaction: t, |
10 | validate: validateUser | 10 | validate: validateUser |
11 | } | 11 | } |
12 | 12 | ||
13 | return user.save(userOptions) | 13 | const userCreated = await user.save(userOptions) |
14 | .then(user => { | 14 | const authorInstance = db.Author.build({ |
15 | const author = db.Author.build({ | 15 | name: userCreated.username, |
16 | name: user.username, | 16 | podId: null, // It is our pod |
17 | podId: null, // It is our pod | 17 | userId: userCreated.id |
18 | userId: user.id | 18 | }) |
19 | }) | 19 | |
20 | 20 | const authorCreated = await authorInstance.save({ transaction: t }) | |
21 | return author.save({ transaction: t }) | 21 | |
22 | .then(author => ({ author, user })) | 22 | const remoteVideoAuthor = authorCreated.toAddRemoteJSON() |
23 | }) | 23 | |
24 | .then(({ author, user }) => { | 24 | // Now we'll add the video channel's meta data to our friends |
25 | const remoteVideoAuthor = author.toAddRemoteJSON() | 25 | const author = await addVideoAuthorToFriends(remoteVideoAuthor, t) |
26 | 26 | ||
27 | // Now we'll add the video channel's meta data to our friends | 27 | const videoChannelInfo = { |
28 | return addVideoAuthorToFriends(remoteVideoAuthor, t) | 28 | name: `Default ${userCreated.username} channel` |
29 | .then(() => ({ author, user })) | 29 | } |
30 | }) | 30 | const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t) |
31 | .then(({ author, user }) => { | 31 | |
32 | const videoChannelInfo = { | 32 | return { author, videoChannel } |
33 | name: `Default ${user.username} channel` | ||
34 | } | ||
35 | |||
36 | return createVideoChannel(videoChannelInfo, author, t) | ||
37 | .then(videoChannel => ({ author, user, videoChannel })) | ||
38 | }) | ||
39 | }) | 33 | }) |
34 | |||
35 | return res | ||
40 | } | 36 | } |
41 | 37 | ||
42 | // --------------------------------------------------------------------------- | 38 | // --------------------------------------------------------------------------- |