aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-10-25 16:03:33 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-10-26 09:11:38 +0200
commitf5028693a896a3076dd286ac0030e3d8f78f5ebf (patch)
tree09144ed6357e49ea575fb110247f933283ad235e /server/lib
parenteb08047657e739bcd9e592d76307befa3998482b (diff)
downloadPeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.gz
PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.zst
PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.zip
Use async/await in lib and initializers
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/cache/videos-preview-cache.ts15
-rw-r--r--server/lib/friends.ts368
-rw-r--r--server/lib/jobs/handlers/video-file-optimizer.ts88
-rw-r--r--server/lib/jobs/handlers/video-file-transcoder.ts21
-rw-r--r--server/lib/jobs/job-scheduler.ts118
-rw-r--r--server/lib/oauth-model.ts54
-rw-r--r--server/lib/request/abstract-request-scheduler.ts121
-rw-r--r--server/lib/request/request-scheduler.ts16
-rw-r--r--server/lib/request/request-video-event-scheduler.ts20
-rw-r--r--server/lib/request/request-video-qadu-scheduler.ts34
-rw-r--r--server/lib/user.ts52
11 files changed, 456 insertions, 451 deletions
diff --git a/server/lib/cache/videos-preview-cache.ts b/server/lib/cache/videos-preview-cache.ts
index fecdca6ef..0fe4d2f78 100644
--- a/server/lib/cache/videos-preview-cache.ts
+++ b/server/lib/cache/videos-preview-cache.ts
@@ -1,7 +1,6 @@
1import * as asyncLRU from 'async-lru' 1import * as asyncLRU from 'async-lru'
2import { join } from 'path' 2import { join } from 'path'
3import { createWriteStream } from 'fs' 3import { createWriteStream } from 'fs'
4import * as Promise from 'bluebird'
5 4
6import { database as db, CONFIG, CACHE } from '../../initializers' 5import { database as db, CONFIG, CACHE } from '../../initializers'
7import { logger, unlinkPromise } from '../../helpers' 6import { logger, unlinkPromise } from '../../helpers'
@@ -43,15 +42,15 @@ class VideosPreviewCache {
43 }) 42 })
44 } 43 }
45 44
46 private loadPreviews (key: string) { 45 private async loadPreviews (key: string) {
47 return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) 46 const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key)
48 .then(video => { 47 if (!video) return undefined
49 if (!video) return undefined
50 48
51 if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) 49 if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName())
52 50
53 return this.saveRemotePreviewAndReturnPath(video) 51 const res = await this.saveRemotePreviewAndReturnPath(video)
54 }) 52
53 return res
55 } 54 }
56 55
57 private saveRemotePreviewAndReturnPath (video: VideoInstance) { 56 private saveRemotePreviewAndReturnPath (video: VideoInstance) {
diff --git a/server/lib/friends.ts b/server/lib/friends.ts
index f035b099b..a33432dc1 100644
--- a/server/lib/friends.ts
+++ b/server/lib/friends.ts
@@ -1,6 +1,6 @@
1import * as request from 'request' 1import * as request from 'request'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import * as Promise from 'bluebird' 3import * as Bluebird from 'bluebird'
4import { join } from 'path' 4import { join } from 'path'
5 5
6import { database as db } from '../initializers/database' 6import { database as db } from '../initializers/database'
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.
188function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { 188function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
189 const tasks = [] 189 const tasks = []
190 190
191 eventsParams.forEach(eventParams => { 191 for (const eventParams of eventsParams) {
192 tasks.push(addEventToRemoteVideo(eventParams, transaction)) 192 tasks.push(addEventToRemoteVideo(eventParams, transaction))
193 }) 193 }
194 194
195 return Promise.all(tasks) 195 return Promise.all(tasks)
196} 196}
197 197
198function hasFriends () { 198async function hasFriends () {
199 return db.Pod.countAll().then(count => count !== 0) 199 const count = await db.Pod.countAll()
200
201 return count !== 0
200} 202}
201 203
202function makeFriends (hosts: string[]) { 204async function makeFriends (hosts: string[]) {
203 const podsScore = {} 205 const podsScore = {}
204 206
205 logger.info('Make friends!') 207 logger.info('Make friends!')
206 return getMyPublicCert() 208 const cert = await getMyPublicCert()
207 .then(cert => {
208 return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
209 })
210 .then(cert => {
211 logger.debug('Pods scores computed.', { podsScore: podsScore })
212 const podsList = computeWinningPods(hosts, podsScore)
213 logger.debug('Pods that we keep.', { podsToKeep: podsList })
214 209
215 return makeRequestsToWinningPods(cert, podsList) 210 for (const host of hosts) {
216 }) 211 await computeForeignPodsList(host, podsScore)
212 }
213
214 logger.debug('Pods scores computed.', { podsScore: podsScore })
215
216 const podsList = computeWinningPods(hosts, podsScore)
217 logger.debug('Pods that we keep.', { podsToKeep: podsList })
218
219 return makeRequestsToWinningPods(cert, podsList)
217} 220}
218 221
219function quitFriends () { 222async function quitFriends () {
220 // Stop pool requests 223 // Stop pool requests
221 requestScheduler.deactivate() 224 requestScheduler.deactivate()
222 225
223 return requestScheduler.flush() 226 try {
224 .then(() => { 227 await requestScheduler.flush()
225 return requestVideoQaduScheduler.flush() 228
226 }) 229 await requestVideoQaduScheduler.flush()
227 .then(() => { 230
228 return db.Pod.list() 231 const pods = await db.Pod.list()
229 }) 232 const requestParams = {
230 .then(pods => { 233 method: 'POST' as 'POST',
231 const requestParams = { 234 path: '/api/' + API_VERSION + '/remote/pods/remove',
232 method: 'POST' as 'POST', 235 toPod: null
233 path: '/api/' + API_VERSION + '/remote/pods/remove', 236 }
234 toPod: null
235 }
236 237
237 // Announce we quit them 238 // Announce we quit them
238 // We don't care if the request fails 239 // We don't care if the request fails
239 // The other pod will exclude us automatically after a while 240 // The other pod will exclude us automatically after a while
240 return Promise.map(pods, pod => { 241 try {
242 await Bluebird.map(pods, pod => {
241 requestParams.toPod = pod 243 requestParams.toPod = pod
242 244
243 return makeSecureRequest(requestParams) 245 return makeSecureRequest(requestParams)
244 }, { concurrency: REQUESTS_IN_PARALLEL }) 246 }, { concurrency: REQUESTS_IN_PARALLEL })
245 .then(() => pods) 247 } catch (err) { // Don't stop the process
246 .catch(err => { 248 logger.error('Some errors while quitting friends.', err)
247 logger.error('Some errors while quitting friends.', err) 249 }
248 // Don't stop the process
249 return pods
250 })
251 })
252 .then(pods => {
253 const tasks = []
254 pods.forEach(pod => tasks.push(pod.destroy()))
255 250
256 return Promise.all(pods) 251 const tasks = []
257 }) 252 for (const pod of pods) {
258 .then(() => { 253 tasks.push(pod.destroy())
259 logger.info('Removed all remote videos.') 254 }
260 // Don't forget to re activate the scheduler, even if there was an error 255 await Promise.all(pods)
261 return requestScheduler.activate() 256
262 }) 257 logger.info('Removed all remote videos.')
263 .finally(() => requestScheduler.activate()) 258
259 requestScheduler.activate()
260 } catch (err) {
261 // Don't forget to re activate the scheduler, even if there was an error
262 requestScheduler.activate()
263
264 throw err
265 }
264} 266}
265 267
266function sendOwnedDataToPod (podId: number) { 268async function sendOwnedDataToPod (podId: number) {
267 // First send authors 269 // First send authors
268 return sendOwnedAuthorsToPod(podId) 270 await sendOwnedAuthorsToPod(podId)
269 .then(() => sendOwnedChannelsToPod(podId)) 271 await sendOwnedChannelsToPod(podId)
270 .then(() => sendOwnedVideosToPod(podId)) 272 await sendOwnedVideosToPod(podId)
273}
274
275async function sendOwnedChannelsToPod (podId: number) {
276 const videoChannels = await db.VideoChannel.listOwned()
277
278 const tasks: Promise<any>[] = []
279 for (const videoChannel of videoChannels) {
280 const remoteVideoChannel = videoChannel.toAddRemoteJSON()
281 const options = {
282 type: 'add-channel' as 'add-channel',
283 endpoint: REQUEST_ENDPOINTS.VIDEOS,
284 data: remoteVideoChannel,
285 toIds: [ podId ],
286 transaction: null
287 }
288
289 const p = createRequest(options)
290 tasks.push(p)
291 }
292
293 await Promise.all(tasks)
271} 294}
272 295
273function sendOwnedChannelsToPod (podId: number) { 296async function sendOwnedAuthorsToPod (podId: number) {
274 return db.VideoChannel.listOwned() 297 const authors = await db.Author.listOwned()
275 .then(videoChannels => { 298 const tasks: Promise<any>[] = []
276 const tasks = []
277 videoChannels.forEach(videoChannel => {
278 const remoteVideoChannel = videoChannel.toAddRemoteJSON()
279 const options = {
280 type: 'add-channel' as 'add-channel',
281 endpoint: REQUEST_ENDPOINTS.VIDEOS,
282 data: remoteVideoChannel,
283 toIds: [ podId ],
284 transaction: null
285 }
286 299
287 const p = createRequest(options) 300 for (const author of authors) {
288 tasks.push(p) 301 const remoteAuthor = author.toAddRemoteJSON()
289 }) 302 const options = {
303 type: 'add-author' as 'add-author',
304 endpoint: REQUEST_ENDPOINTS.VIDEOS,
305 data: remoteAuthor,
306 toIds: [ podId ],
307 transaction: null
308 }
290 309
291 return Promise.all(tasks) 310 const p = createRequest(options)
292 }) 311 tasks.push(p)
312 }
313
314 await Promise.all(tasks)
293} 315}
294 316
295function sendOwnedAuthorsToPod (podId: number) { 317async function sendOwnedVideosToPod (podId: number) {
296 return db.Author.listOwned() 318 const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
297 .then(authors => { 319 const tasks: Bluebird<any>[] = []
298 const tasks = [] 320
299 authors.forEach(author => { 321 for (const video of videosList) {
300 const remoteAuthor = author.toAddRemoteJSON() 322 const promise = video.toAddRemoteJSON()
323 .then(remoteVideo => {
301 const options = { 324 const options = {
302 type: 'add-author' as 'add-author', 325 type: 'add-video' as 'add-video',
303 endpoint: REQUEST_ENDPOINTS.VIDEOS, 326 endpoint: REQUEST_ENDPOINTS.VIDEOS,
304 data: remoteAuthor, 327 data: remoteVideo,
305 toIds: [ podId ], 328 toIds: [ podId ],
306 transaction: null 329 transaction: null
307 } 330 }
308 331 return createRequest(options)
309 const p = createRequest(options)
310 tasks.push(p)
311 }) 332 })
312 333 .catch(err => {
313 return Promise.all(tasks) 334 logger.error('Cannot convert video to remote.', err)
314 }) 335 // Don't break the process
315} 336 return undefined
316
317function sendOwnedVideosToPod (podId: number) {
318 return db.Video.listOwnedAndPopulateAuthorAndTags()
319 .then(videosList => {
320 const tasks = []
321 videosList.forEach(video => {
322 const promise = video.toAddRemoteJSON()
323 .then(remoteVideo => {
324 const options = {
325 type: 'add-video' as 'add-video',
326 endpoint: REQUEST_ENDPOINTS.VIDEOS,
327 data: remoteVideo,
328 toIds: [ podId ],
329 transaction: null
330 }
331 return createRequest(options)
332 })
333 .catch(err => {
334 logger.error('Cannot convert video to remote.', err)
335 // Don't break the process
336 return undefined
337 })
338
339 tasks.push(promise)
340 }) 337 })
341 338
342 return Promise.all(tasks) 339 tasks.push(promise)
343 }) 340 }
341
342 await Promise.all(tasks)
344} 343}
345 344
346function fetchRemotePreview (video: VideoInstance) { 345function fetchRemotePreview (video: VideoInstance) {
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) {
350 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) 349 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
351} 350}
352 351
353function removeFriend (pod: PodInstance) { 352async function removeFriend (pod: PodInstance) {
354 const requestParams = { 353 const requestParams = {
355 method: 'POST' as 'POST', 354 method: 'POST' as 'POST',
356 path: '/api/' + API_VERSION + '/remote/pods/remove', 355 path: '/api/' + API_VERSION + '/remote/pods/remove',
357 toPod: pod 356 toPod: pod
358 } 357 }
359 358
360 return makeSecureRequest(requestParams) 359 try {
361 .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) 360 await makeSecureRequest(requestParams)
362 .then(() => pod.destroy()) 361 } catch (err) {
363 .then(() => logger.info('Removed friend %s.', pod.host)) 362 logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
364 .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) 363 }
364
365 try {
366 await pod.destroy()
367
368 logger.info('Removed friend %s.', pod.host)
369 } catch (err) {
370 logger.error('Cannot destroy friend %s.', pod.host, err)
371 }
365} 372}
366 373
367function getRequestScheduler () { 374function getRequestScheduler () {
@@ -406,23 +413,21 @@ export {
406 413
407// --------------------------------------------------------------------------- 414// ---------------------------------------------------------------------------
408 415
409function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { 416async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
410 // TODO: type res 417 const result = await getForeignPodsList(host)
411 return getForeignPodsList(host).then(res => { 418 const foreignPodsList: { host: string }[] = result.data
412 const foreignPodsList: { host: string }[] = res.data
413 419
414 // Let's give 1 point to the pod we ask the friends list 420 // Let's give 1 point to the pod we ask the friends list
415 foreignPodsList.push({ host }) 421 foreignPodsList.push({ host })
416 422
417 foreignPodsList.forEach(foreignPod => { 423 for (const foreignPod of foreignPodsList) {
418 const foreignPodHost = foreignPod.host 424 const foreignPodHost = foreignPod.host
419 425
420 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ 426 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
421 else podsScore[foreignPodHost] = 1 427 else podsScore[foreignPodHost] = 1
422 }) 428 }
423 429
424 return undefined 430 return undefined
425 })
426} 431}
427 432
428function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { 433function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
431 const podsList = [] 436 const podsList = []
432 const baseScore = hosts.length / 2 437 const baseScore = hosts.length / 2
433 438
434 Object.keys(podsScore).forEach(podHost => { 439 for (const podHost of Object.keys(podsScore)) {
435 // If the pod is not me and with a good score we add it 440 // If the pod is not me and with a good score we add it
436 if (isMe(podHost) === false && podsScore[podHost] > baseScore) { 441 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
437 podsList.push({ host: podHost }) 442 podsList.push({ host: podHost })
438 } 443 }
439 }) 444 }
440 445
441 return podsList 446 return podsList
442} 447}
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) {
449 if (err) return rej(err) 454 if (err) return rej(err)
450 455
451 try { 456 try {
452 const json = JSON.parse(body) 457 const json: ResultList<FormattedPod> = JSON.parse(body)
453 return res(json) 458 return res(json)
454 } catch (err) { 459 } catch (err) {
455 return rej(err) 460 return rej(err)
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) {
458 }) 463 })
459} 464}
460 465
461function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { 466async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
462 // Stop pool requests 467 // Stop pool requests
463 requestScheduler.deactivate() 468 requestScheduler.deactivate()
464 // Flush pool requests 469 // Flush pool requests
465 requestScheduler.forceSend() 470 requestScheduler.forceSend()
466 471
467 return Promise.map(podsList, pod => { 472 try {
468 const params = { 473 await Bluebird.map(podsList, async pod => {
469 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', 474 const params = {
470 method: 'POST' as 'POST', 475 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
471 json: { 476 method: 'POST' as 'POST',
472 host: CONFIG.WEBSERVER.HOST, 477 json: {
473 email: CONFIG.ADMIN.EMAIL, 478 host: CONFIG.WEBSERVER.HOST,
474 publicKey: cert 479 email: CONFIG.ADMIN.EMAIL,
480 publicKey: cert
481 }
475 } 482 }
476 }
477 483
478 return makeRetryRequest(params) 484 const { response, body } = await makeRetryRequest(params)
479 .then(({ response, body }) => { 485 const typedBody = body as { cert: string, email: string }
480 body = body as { cert: string, email: string } 486
481 487 if (response.statusCode === 200) {
482 if (response.statusCode === 200) { 488 const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
483 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) 489
484 return podObj.save() 490 let podCreated: PodInstance
485 .then(podCreated => { 491 try {
486 492 podCreated = await podObj.save()
487 // Add our videos to the request scheduler 493 } catch (err) {
488 sendOwnedDataToPod(podCreated.id) 494 logger.error('Cannot add friend %s pod.', pod.host, err)
489 })
490 .catch(err => {
491 logger.error('Cannot add friend %s pod.', pod.host, err)
492 })
493 } else {
494 logger.error('Status not 200 for %s pod.', pod.host)
495 } 495 }
496 }) 496
497 .catch(err => { 497 // Add our videos to the request scheduler
498 logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) 498 sendOwnedDataToPod(podCreated.id)
499 // Don't break the process 499 .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
500 }) 500 } else {
501 }, { concurrency: REQUESTS_IN_PARALLEL }) 501 logger.error('Status not 200 for %s pod.', pod.host)
502 .then(() => logger.debug('makeRequestsToWinningPods finished.')) 502 }
503 .finally(() => { 503 }, { concurrency: REQUESTS_IN_PARALLEL })
504
505 logger.debug('makeRequestsToWinningPods finished.')
506
507 requestScheduler.activate()
508 } catch (err) {
504 // Final callback, we've ended all the requests 509 // Final callback, we've ended all the requests
505 // Now we made new friends, we can re activate the pool of requests 510 // Now we made new friends, we can re activate the pool of requests
506 requestScheduler.activate() 511 requestScheduler.activate()
507 }) 512 }
508} 513}
509 514
510// Wrapper that populate "toIds" argument with all our friends if it is not specified 515// Wrapper that populate "toIds" argument with all our friends if it is not specified
@@ -515,14 +520,19 @@ type CreateRequestOptions = {
515 toIds?: number[] 520 toIds?: number[]
516 transaction: Sequelize.Transaction 521 transaction: Sequelize.Transaction
517} 522}
518function createRequest (options: CreateRequestOptions) { 523async function createRequest (options: CreateRequestOptions) {
519 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) 524 if (options.toIds !== undefined) {
525 await requestScheduler.createRequest(options as RequestSchedulerOptions)
526 return undefined
527 }
520 528
521 // If the "toIds" pods is not specified, we send the request to all our friends 529 // If the "toIds" pods is not specified, we send the request to all our friends
522 return db.Pod.listAllIds(options.transaction).then(podIds => { 530 const podIds = await db.Pod.listAllIds(options.transaction)
523 const newOptions = Object.assign(options, { toIds: podIds }) 531
524 return requestScheduler.createRequest(newOptions) 532 const newOptions = Object.assign(options, { toIds: podIds })
525 }) 533 await requestScheduler.createRequest(newOptions)
534
535 return undefined
526} 536}
527 537
528function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { 538function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {
diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/handlers/video-file-optimizer.ts
index 63a51064c..799ba8b01 100644
--- a/server/lib/jobs/handlers/video-file-optimizer.ts
+++ b/server/lib/jobs/handlers/video-file-optimizer.ts
@@ -1,4 +1,4 @@
1import * as Promise from 'bluebird' 1import * as Bluebird from 'bluebird'
2 2
3import { database as db } from '../../../initializers/database' 3import { database as db } from '../../../initializers/database'
4import { logger, computeResolutionsToTranscode } from '../../../helpers' 4import { logger, computeResolutionsToTranscode } from '../../../helpers'
@@ -6,16 +6,17 @@ import { VideoInstance } from '../../../models'
6import { addVideoToFriends } from '../../friends' 6import { addVideoToFriends } from '../../friends'
7import { JobScheduler } from '../job-scheduler' 7import { JobScheduler } from '../job-scheduler'
8 8
9function process (data: { videoUUID: string }, jobId: number) { 9async function process (data: { videoUUID: string }, jobId: number) {
10 return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { 10 const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
11 // No video, maybe deleted? 11 // No video, maybe deleted?
12 if (!video) { 12 if (!video) {
13 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 13 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
14 return undefined 14 return undefined
15 } 15 }
16
17 await video.optimizeOriginalVideofile()
16 18
17 return video.optimizeOriginalVideofile().then(() => video) 19 return video
18 })
19} 20}
20 21
21function onError (err: Error, jobId: number) { 22function onError (err: Error, jobId: number) {
@@ -23,33 +24,31 @@ function onError (err: Error, jobId: number) {
23 return Promise.resolve() 24 return Promise.resolve()
24} 25}
25 26
26function onSuccess (jobId: number, video: VideoInstance) { 27async function onSuccess (jobId: number, video: VideoInstance) {
27 if (video === undefined) return undefined 28 if (video === undefined) return undefined
28 29
29 logger.info('Job %d is a success.', jobId) 30 logger.info('Job %d is a success.', jobId)
30 31
31 video.toAddRemoteJSON() 32 const remoteVideo = await video.toAddRemoteJSON()
32 .then(remoteVideo => { 33
33 // Now we'll add the video's meta data to our friends 34 // Now we'll add the video's meta data to our friends
34 return addVideoToFriends(remoteVideo, null) 35 await addVideoToFriends(remoteVideo, null)
35 }) 36
36 .then(() => { 37 const originalFileHeight = await video.getOriginalFileHeight()
37 return video.getOriginalFileHeight() 38 // Create transcoding jobs if there are enabled resolutions
38 }) 39
39 .then(originalFileHeight => { 40 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
40 // Create transcoding jobs if there are enabled resolutions 41 logger.info(
41 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) 42 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight,
42 logger.info( 43 { resolutions: resolutionsEnabled }
43 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, 44 )
44 { resolutions: resolutionsEnabled } 45
45 ) 46 if (resolutionsEnabled.length !== 0) {
46 47 try {
47 if (resolutionsEnabled.length === 0) return undefined 48 await db.sequelize.transaction(async t => {
48 49 const tasks: Bluebird<any>[] = []
49 return db.sequelize.transaction(t => { 50
50 const tasks: Promise<any>[] = [] 51 for (const resolution of resolutionsEnabled) {
51
52 resolutionsEnabled.forEach(resolution => {
53 const dataInput = { 52 const dataInput = {
54 videoUUID: video.uuid, 53 videoUUID: video.uuid,
55 resolution 54 resolution
@@ -57,24 +56,19 @@ function onSuccess (jobId: number, video: VideoInstance) {
57 56
58 const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) 57 const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput)
59 tasks.push(p) 58 tasks.push(p)
60 }) 59 }
61 60
62 return Promise.all(tasks).then(() => resolutionsEnabled) 61 await Promise.all(tasks)
63 }) 62 })
64 })
65 .then(resolutionsEnabled => {
66 if (resolutionsEnabled === undefined) {
67 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
68 return undefined
69 }
70 63
71 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) 64 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
72 }) 65 } catch (err) {
73 .catch((err: Error) => { 66 logger.warn('Cannot transcode the video.', err)
74 logger.debug('Cannot transcode the video.', err) 67 }
75 throw err 68 } else {
76 }) 69 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
77 70 return undefined
71 }
78} 72}
79 73
80// --------------------------------------------------------------------------- 74// ---------------------------------------------------------------------------
diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/handlers/video-file-transcoder.ts
index 0dafee566..b240ff58a 100644
--- a/server/lib/jobs/handlers/video-file-transcoder.ts
+++ b/server/lib/jobs/handlers/video-file-transcoder.ts
@@ -4,16 +4,17 @@ import { logger } from '../../../helpers'
4import { VideoInstance } from '../../../models' 4import { VideoInstance } from '../../../models'
5import { VideoResolution } from '../../../../shared' 5import { VideoResolution } from '../../../../shared'
6 6
7function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { 7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { 8 const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
9 // No video, maybe deleted? 9 // No video, maybe deleted?
10 if (!video) { 10 if (!video) {
11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
12 return undefined 12 return undefined
13 } 13 }
14 14
15 return video.transcodeOriginalVideofile(data.resolution).then(() => video) 15 await video.transcodeOriginalVideofile(data.resolution)
16 }) 16
17 return video
17} 18}
18 19
19function onError (err: Error, jobId: number) { 20function onError (err: Error, jobId: number) {
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index c2409d20c..61d483268 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -23,7 +23,7 @@ class JobScheduler {
23 return this.instance || (this.instance = new this()) 23 return this.instance || (this.instance = new this())
24 } 24 }
25 25
26 activate () { 26 async activate () {
27 const limit = JOBS_FETCH_LIMIT_PER_CYCLE 27 const limit = JOBS_FETCH_LIMIT_PER_CYCLE
28 28
29 logger.info('Jobs scheduler activated.') 29 logger.info('Jobs scheduler activated.')
@@ -32,32 +32,36 @@ class JobScheduler {
32 32
33 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
35 db.Job.listWithLimit(limit, state) 35 try {
36 .then(jobs => { 36 const jobs = await db.Job.listWithLimit(limit, state)
37 this.enqueueJobs(jobsQueue, jobs) 37
38 38 this.enqueueJobs(jobsQueue, jobs)
39 forever( 39 } catch (err) {
40 next => { 40 logger.error('Cannot list pending jobs.', err)
41 if (jobsQueue.length() !== 0) { 41 }
42 // Finish processing the queue first 42
43 return setTimeout(next, JOBS_FETCHING_INTERVAL) 43 forever(
44 } 44 async next => {
45 45 if (jobsQueue.length() !== 0) {
46 const state = JOB_STATES.PENDING 46 // Finish processing the queue first
47 db.Job.listWithLimit(limit, state) 47 return setTimeout(next, JOBS_FETCHING_INTERVAL)
48 .then(jobs => { 48 }
49 this.enqueueJobs(jobsQueue, jobs) 49
50 50 const state = JOB_STATES.PENDING
51 // Optimization: we could use "drain" from queue object 51 try {
52 return setTimeout(next, JOBS_FETCHING_INTERVAL) 52 const jobs = await db.Job.listWithLimit(limit, state)
53 }) 53
54 .catch(err => logger.error('Cannot list pending jobs.', err)) 54 this.enqueueJobs(jobsQueue, jobs)
55 }, 55 } catch (err) {
56 56 logger.error('Cannot list pending jobs.', err)
57 err => logger.error('Error in job scheduler queue.', err) 57 }
58 ) 58
59 }) 59 // Optimization: we could use "drain" from queue object
60 .catch(err => logger.error('Cannot list pending jobs.', err)) 60 return setTimeout(next, JOBS_FETCHING_INTERVAL)
61 },
62
63 err => logger.error('Error in job scheduler queue.', err)
64 )
61 } 65 }
62 66
63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { 67 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
@@ -75,7 +79,7 @@ class JobScheduler {
75 jobs.forEach(job => jobsQueue.push(job)) 79 jobs.forEach(job => jobsQueue.push(job))
76 } 80 }
77 81
78 private processJob (job: JobInstance, callback: (err: Error) => void) { 82 private async processJob (job: JobInstance, callback: (err: Error) => void) {
79 const jobHandler = jobHandlers[job.handlerName] 83 const jobHandler = jobHandlers[job.handlerName]
80 if (jobHandler === undefined) { 84 if (jobHandler === undefined) {
81 logger.error('Unknown job handler for job %s.', job.handlerName) 85 logger.error('Unknown job handler for job %s.', job.handlerName)
@@ -85,41 +89,45 @@ class JobScheduler {
85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
86 90
87 job.state = JOB_STATES.PROCESSING 91 job.state = JOB_STATES.PROCESSING
88 return job.save() 92 await job.save()
89 .then(() => { 93
90 return jobHandler.process(job.handlerInputData, job.id) 94 try {
91 }) 95 const result = await jobHandler.process(job.handlerInputData, job.id)
92 .then( 96 await this.onJobSuccess(jobHandler, job, result)
93 result => { 97 } catch (err) {
94 return this.onJobSuccess(jobHandler, job, result) 98 logger.error('Error in job handler %s.', job.handlerName, err)
95 }, 99
96 100 try {
97 err => { 101 await this.onJobError(jobHandler, job, err)
98 logger.error('Error in job handler %s.', job.handlerName, err) 102 } catch (innerErr) {
99 return this.onJobError(jobHandler, job, err) 103 this.cannotSaveJobError(innerErr)
100 } 104 return callback(innerErr)
101 ) 105 }
102 .then(() => callback(null)) 106 }
103 .catch(err => { 107
104 this.cannotSaveJobError(err) 108 callback(null)
105 return callback(err)
106 })
107 } 109 }
108 110
109 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { 111 private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
110 job.state = JOB_STATES.ERROR 112 job.state = JOB_STATES.ERROR
111 113
112 return job.save() 114 try {
113 .then(() => jobHandler.onError(err, job.id)) 115 await job.save()
114 .catch(err => this.cannotSaveJobError(err)) 116 await jobHandler.onError(err, job.id)
117 } catch (err) {
118 this.cannotSaveJobError(err)
119 }
115 } 120 }
116 121
117 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { 122 private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
118 job.state = JOB_STATES.SUCCESS 123 job.state = JOB_STATES.SUCCESS
119 124
120 return job.save() 125 try {
121 .then(() => jobHandler.onSuccess(job.id, jobResult)) 126 await job.save()
122 .catch(err => this.cannotSaveJobError(err)) 127 jobHandler.onSuccess(job.id, jobResult)
128 } catch (err) {
129 this.cannotSaveJobError(err)
130 }
123 } 131 }
124 132
125 private cannotSaveJobError (err: Error) { 133 private cannotSaveJobError (err: Error) {
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts
index 9aa3ea52f..d91b00c55 100644
--- a/server/lib/oauth-model.ts
+++ b/server/lib/oauth-model.ts
@@ -24,39 +24,36 @@ function getRefreshToken (refreshToken: string) {
24 return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) 24 return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken)
25} 25}
26 26
27function getUser (username: string, password: string) { 27async function getUser (username: string, password: string) {
28 logger.debug('Getting User (username: ' + username + ', password: ******).') 28 logger.debug('Getting User (username: ' + username + ', password: ******).')
29 29
30 return db.User.getByUsername(username).then(user => { 30 const user = await db.User.getByUsername(username)
31 if (!user) return null 31 if (!user) return null
32 32
33 return user.isPasswordMatch(password).then(passwordMatch => { 33 const passwordMatch = await user.isPasswordMatch(password)
34 if (passwordMatch === false) return null 34 if (passwordMatch === false) return null
35 35
36 return user 36 return user
37 })
38 })
39} 37}
40 38
41function revokeToken (token: TokenInfo) { 39async function revokeToken (tokenInfo: TokenInfo) {
42 return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => { 40 const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken)
43 if (tokenDB) tokenDB.destroy() 41 if (token) token.destroy()
44 42
45 /* 43 /*
46 * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js 44 * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js
47 * "As per the discussion we need set older date 45 * "As per the discussion we need set older date
48 * revokeToken will expected return a boolean in future version 46 * revokeToken will expected return a boolean in future version
49 * https://github.com/oauthjs/node-oauth2-server/pull/274 47 * https://github.com/oauthjs/node-oauth2-server/pull/274
50 * https://github.com/oauthjs/node-oauth2-server/issues/290" 48 * https://github.com/oauthjs/node-oauth2-server/issues/290"
51 */ 49 */
52 const expiredToken = tokenDB 50 const expiredToken = token
53 expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') 51 expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z')
54 52
55 return expiredToken 53 return expiredToken
56 })
57} 54}
58 55
59function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { 56async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) {
60 logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') 57 logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.')
61 58
62 const tokenToCreate = { 59 const tokenToCreate = {
@@ -68,11 +65,10 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns
68 userId: user.id 65 userId: user.id
69 } 66 }
70 67
71 return db.OAuthToken.create(tokenToCreate).then(tokenCreated => { 68 const tokenCreated = await db.OAuthToken.create(tokenToCreate)
72 const tokenToReturn = Object.assign(tokenCreated, { client, user }) 69 const tokenToReturn = Object.assign(tokenCreated, { client, user })
73 70
74 return tokenToReturn 71 return tokenToReturn
75 })
76} 72}
77 73
78// --------------------------------------------------------------------------- 74// ---------------------------------------------------------------------------
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts
index ce4e2ffd2..f46682824 100644
--- a/server/lib/request/abstract-request-scheduler.ts
+++ b/server/lib/request/abstract-request-scheduler.ts
@@ -1,5 +1,5 @@
1import { isEmpty } from 'lodash' 1import { isEmpty } from 'lodash'
2import * as Promise from 'bluebird' 2import * as Bluebird from 'bluebird'
3 3
4import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
5import { logger, makeSecureRequest } from '../../helpers' 5import { logger, makeSecureRequest } from '../../helpers'
@@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> {
76 // --------------------------------------------------------------------------- 76 // ---------------------------------------------------------------------------
77 77
78 // Make a requests to friends of a certain type 78 // Make a requests to friends of a certain type
79 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { 79 protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
80 const params = { 80 const params = {
81 toPod: toPod, 81 toPod: toPod,
82 method: 'POST' as 'POST', 82 method: 'POST' as 'POST',
@@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> {
86 86
87 // Make multiple retry requests to all of pods 87 // Make multiple retry requests to all of pods
88 // The function fire some useful callbacks 88 // The function fire some useful callbacks
89 return makeSecureRequest(params) 89 try {
90 .then(({ response, body }) => { 90 const { response } = await makeSecureRequest(params)
91 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { 91 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
92 throw new Error('Status code not 20x : ' + response.statusCode) 92 throw new Error('Status code not 20x : ' + response.statusCode)
93 } 93 }
94 }) 94 } catch (err) {
95 .catch(err => { 95 logger.error('Error sending secure request to %s pod.', toPod.host, err)
96 logger.error('Error sending secure request to %s pod.', toPod.host, err) 96
97 97 throw err
98 throw err 98 }
99 })
100 } 99 }
101 100
102 // Make all the requests of the scheduler 101 // Make all the requests of the scheduler
103 protected makeRequests () { 102 protected async makeRequests () {
104 return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) 103 let requestsGrouped: T
105 .then((requestsGrouped: T) => { 104
106 // We want to group requests by destinations pod and endpoint 105 try {
107 const requestsToMake = this.buildRequestsObjects(requestsGrouped) 106 requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
108 107 } catch (err) {
109 // If there are no requests, abort 108 logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })
110 if (isEmpty(requestsToMake) === true) { 109 throw err
111 logger.info('No "%s" to make.', this.description) 110 }
112 return { goodPods: [], badPods: [] } 111
113 } 112 // We want to group requests by destinations pod and endpoint
114 113 const requestsToMake = this.buildRequestsObjects(requestsGrouped)
115 logger.info('Making "%s" to friends.', this.description) 114
116 115 // If there are no requests, abort
117 const goodPods: number[] = [] 116 if (isEmpty(requestsToMake) === true) {
118 const badPods: number[] = [] 117 logger.info('No "%s" to make.', this.description)
119 118 return { goodPods: [], badPods: [] }
120 return Promise.map(Object.keys(requestsToMake), hashKey => { 119 }
121 const requestToMake = requestsToMake[hashKey] 120
122 const toPod: PodInstance = requestToMake.toPod 121 logger.info('Making "%s" to friends.', this.description)
123 122
124 return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) 123 const goodPods: number[] = []
125 .then(() => { 124 const badPods: number[] = []
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) 125
127 goodPods.push(requestToMake.toPod.id) 126 await Bluebird.map(Object.keys(requestsToMake), async hashKey => {
128 127 const requestToMake = requestsToMake[hashKey]
129 this.afterRequestHook() 128 const toPod: PodInstance = requestToMake.toPod
130 129
131 // Remove the pod id of these request ids 130 try {
132 return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) 131 await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
133 }) 132 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
134 .catch(err => { 133 goodPods.push(requestToMake.toPod.id)
135 badPods.push(requestToMake.toPod.id) 134
136 logger.info('Cannot make request to %s.', toPod.host, err) 135 this.afterRequestHook()
137 }) 136
138 }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) 137 // Remove the pod id of these request ids
139 }) 138 await this.getRequestToPodModel()
140 .then(({ goodPods, badPods }) => { 139 .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
141 this.afterRequestsHook() 140 } catch (err) {
142 141 badPods.push(requestToMake.toPod.id)
143 // All the requests were made, we update the pods score 142 logger.info('Cannot make request to %s.', toPod.host, err)
144 return db.Pod.updatePodsScore(goodPods, badPods) 143 }
145 }) 144 }, { concurrency: REQUESTS_IN_PARALLEL })
146 .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) 145
146 this.afterRequestsHook()
147
148 // All the requests were made, we update the pods score
149 await db.Pod.updatePodsScore(goodPods, badPods)
147 } 150 }
148 151
149 protected afterRequestHook () { 152 protected afterRequestHook () {
150 // Nothing to do, let children reimplement it 153 // Nothing to do, let children re-implement it
151 } 154 }
152 155
153 protected afterRequestsHook () { 156 protected afterRequestsHook () {
154 // Nothing to do, let children reimplement it 157 // Nothing to do, let children re-implement it
155 } 158 }
156} 159}
157 160
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts
index 696875dcf..c3f7f6429 100644
--- a/server/lib/request/request-scheduler.ts
+++ b/server/lib/request/request-scheduler.ts
@@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
37 buildRequestsObjects (requestsGrouped: RequestsGrouped) { 37 buildRequestsObjects (requestsGrouped: RequestsGrouped) {
38 const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} 38 const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {}
39 39
40 Object.keys(requestsGrouped).forEach(toPodId => { 40 for (const toPodId of Object.keys(requestsGrouped)) {
41 requestsGrouped[toPodId].forEach(data => { 41 for (const data of requestsGrouped[toPodId]) {
42 const request = data.request 42 const request = data.request
43 const pod = data.pod 43 const pod = data.pod
44 const hashKey = toPodId + request.endpoint 44 const hashKey = toPodId + request.endpoint
@@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
54 54
55 requestsToMakeGrouped[hashKey].ids.push(request.id) 55 requestsToMakeGrouped[hashKey].ids.push(request.id)
56 requestsToMakeGrouped[hashKey].datas.push(request.request) 56 requestsToMakeGrouped[hashKey].datas.push(request.request)
57 }) 57 }
58 }) 58 }
59 59
60 return requestsToMakeGrouped 60 return requestsToMakeGrouped
61 } 61 }
62 62
63 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { 63 async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
64 // If there are no destination pods abort 64 // If there are no destination pods abort
65 if (toIds.length === 0) return undefined 65 if (toIds.length === 0) return undefined
66 66
@@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
76 transaction 76 transaction
77 } 77 }
78 78
79 return db.Request.create(createQuery, dbRequestOptions) 79 const request = await db.Request.create(createQuery, dbRequestOptions)
80 .then(request => { 80 await request.setPods(toIds, dbRequestOptions)
81 return request.setPods(toIds, dbRequestOptions)
82 })
83 } 81 }
84 82
85 // --------------------------------------------------------------------------- 83 // ---------------------------------------------------------------------------
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts
index 680232732..5f21287f0 100644
--- a/server/lib/request/request-video-event-scheduler.ts
+++ b/server/lib/request/request-video-event-scheduler.ts
@@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
59 59
60 // We group video events per video and per pod 60 // We group video events per video and per pod
61 // We add the counts of the same event types 61 // We add the counts of the same event types
62 Object.keys(eventRequests).forEach(toPodId => { 62 for (const toPodId of Object.keys(eventRequests)) {
63 eventRequests[toPodId].forEach(eventToProcess => { 63 for (const eventToProcess of eventRequests[toPodId]) {
64 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} 64 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
65 65
66 if (!requestsToMakeGrouped[toPodId]) { 66 if (!requestsToMakeGrouped[toPodId]) {
@@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
81 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 81 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0
82 82
83 events[eventToProcess.type] += eventToProcess.count 83 events[eventToProcess.type] += eventToProcess.count
84 }) 84 }
85 }) 85 }
86 86
87 // Now we build our requests array per pod 87 // Now we build our requests array per pod
88 Object.keys(eventsPerVideoPerPod).forEach(toPodId => { 88 for (const toPodId of Object.keys(eventsPerVideoPerPod)) {
89 const eventsForPod = eventsPerVideoPerPod[toPodId] 89 const eventsForPod = eventsPerVideoPerPod[toPodId]
90 90
91 Object.keys(eventsForPod).forEach(uuid => { 91 for (const uuid of Object.keys(eventsForPod)) {
92 const eventsForVideo = eventsForPod[uuid] 92 const eventsForVideo = eventsForPod[uuid]
93 93
94 Object.keys(eventsForVideo).forEach(eventType => { 94 for (const eventType of Object.keys(eventsForVideo)) {
95 requestsToMakeGrouped[toPodId].datas.push({ 95 requestsToMakeGrouped[toPodId].datas.push({
96 data: { 96 data: {
97 uuid, 97 uuid,
@@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
99 count: +eventsForVideo[eventType] 99 count: +eventsForVideo[eventType]
100 } 100 }
101 }) 101 })
102 }) 102 }
103 }) 103 }
104 }) 104 }
105 105
106 return requestsToMakeGrouped 106 return requestsToMakeGrouped
107 } 107 }
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts
index d7e1d5e31..a54efc111 100644
--- a/server/lib/request/request-video-qadu-scheduler.ts
+++ b/server/lib/request/request-video-qadu-scheduler.ts
@@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
59 buildRequestsObjects (requests: RequestsVideoQaduGrouped) { 59 buildRequestsObjects (requests: RequestsVideoQaduGrouped) {
60 const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} 60 const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {}
61 61
62 Object.keys(requests).forEach(toPodId => { 62 for (const toPodId of Object.keys(requests)) {
63 requests[toPodId].forEach(data => { 63 for (const data of requests[toPodId]) {
64 const request = data.request 64 const request = data.request
65 const video = data.video 65 const video = data.video
66 const pod = data.pod 66 const pod = data.pod
@@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
105 // Maybe there are multiple quick and dirty update for the same video 105 // Maybe there are multiple quick and dirty update for the same video
106 // We use this hash map to dedupe them 106 // We use this hash map to dedupe them
107 requestsToMakeGrouped[hashKey].videos[video.id] = videoData 107 requestsToMakeGrouped[hashKey].videos[video.id] = videoData
108 }) 108 }
109 }) 109 }
110 110
111 // Now we deduped similar quick and dirty updates, we can build our requests data 111 // Now we deduped similar quick and dirty updates, we can build our requests data
112 Object.keys(requestsToMakeGrouped).forEach(hashKey => { 112 for (const hashKey of Object.keys(requestsToMakeGrouped)) {
113 Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { 113 for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) {
114 const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] 114 const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID]
115 115
116 requestsToMakeGrouped[hashKey].datas.push({ 116 requestsToMakeGrouped[hashKey].datas.push({
117 data: videoData 117 data: videoData
118 }) 118 })
119 }) 119 }
120 120
121 // We don't need it anymore, it was just to build our data array 121 // We don't need it anymore, it was just to build our data array
122 delete requestsToMakeGrouped[hashKey].videos 122 delete requestsToMakeGrouped[hashKey].videos
123 }) 123 }
124 124
125 return requestsToMakeGrouped 125 return requestsToMakeGrouped
126 } 126 }
127 127
128 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { 128 async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
129 const dbRequestOptions: Sequelize.BulkCreateOptions = {} 129 const dbRequestOptions: Sequelize.BulkCreateOptions = {}
130 if (transaction) dbRequestOptions.transaction = transaction 130 if (transaction) dbRequestOptions.transaction = transaction
131 131
132 // Send the update to all our friends 132 // Send the update to all our friends
133 return db.Pod.listAllIds(transaction).then(podIds => { 133 const podIds = await db.Pod.listAllIds(transaction)
134 const queries = [] 134 const queries = []
135 podIds.forEach(podId => { 135 for (const podId of podIds) {
136 queries.push({ type, videoId, podId }) 136 queries.push({ type, videoId, podId })
137 }) 137 }
138 138
139 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) 139 await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
140 }) 140 return undefined
141 } 141 }
142} 142}
143 143
diff --git a/server/lib/user.ts b/server/lib/user.ts
index 8609e72d8..a92f4777b 100644
--- a/server/lib/user.ts
+++ b/server/lib/user.ts
@@ -3,40 +3,36 @@ import { UserInstance } from '../models'
3import { addVideoAuthorToFriends } from './friends' 3import { addVideoAuthorToFriends } from './friends'
4import { createVideoChannel } from './video-channel' 4import { createVideoChannel } from './video-channel'
5 5
6function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { 6async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) {
7 return db.sequelize.transaction(t => { 7 const res = await db.sequelize.transaction(async t => {
8 const userOptions = { 8 const userOptions = {
9 transaction: t, 9 transaction: t,
10 validate: validateUser 10 validate: validateUser
11 } 11 }
12 12
13 return user.save(userOptions) 13 const userCreated = await user.save(userOptions)
14 .then(user => { 14 const authorInstance = db.Author.build({
15 const author = db.Author.build({ 15 name: userCreated.username,
16 name: user.username, 16 podId: null, // It is our pod
17 podId: null, // It is our pod 17 userId: userCreated.id
18 userId: user.id 18 })
19 }) 19
20 20 const authorCreated = await authorInstance.save({ transaction: t })
21 return author.save({ transaction: t }) 21
22 .then(author => ({ author, user })) 22 const remoteVideoAuthor = authorCreated.toAddRemoteJSON()
23 }) 23
24 .then(({ author, user }) => { 24 // Now we'll add the video channel's meta data to our friends
25 const remoteVideoAuthor = author.toAddRemoteJSON() 25 const author = await addVideoAuthorToFriends(remoteVideoAuthor, t)
26 26
27 // Now we'll add the video channel's meta data to our friends 27 const videoChannelInfo = {
28 return addVideoAuthorToFriends(remoteVideoAuthor, t) 28 name: `Default ${userCreated.username} channel`
29 .then(() => ({ author, user })) 29 }
30 }) 30 const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t)
31 .then(({ author, user }) => { 31
32 const videoChannelInfo = { 32 return { author, videoChannel }
33 name: `Default ${user.username} channel`
34 }
35
36 return createVideoChannel(videoChannelInfo, author, t)
37 .then(videoChannel => ({ author, user, videoChannel }))
38 })
39 }) 33 })
34
35 return res
40} 36}
41 37
42// --------------------------------------------------------------------------- 38// ---------------------------------------------------------------------------