aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-10-25 16:03:33 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-10-26 09:11:38 +0200
commitf5028693a896a3076dd286ac0030e3d8f78f5ebf (patch)
tree09144ed6357e49ea575fb110247f933283ad235e /server
parenteb08047657e739bcd9e592d76307befa3998482b (diff)
downloadPeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.gz
PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.zst
PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.zip
Use async/await in lib and initializers
Diffstat (limited to 'server')
-rw-r--r--server/helpers/peertube-crypto.ts103
-rw-r--r--server/helpers/requests.ts2
-rw-r--r--server/helpers/utils.ts20
-rw-r--r--server/initializers/checker.ts46
-rw-r--r--server/initializers/database.ts90
-rw-r--r--server/initializers/index.ts2
-rw-r--r--server/initializers/installer.ts141
-rw-r--r--server/initializers/migrator.ts107
-rw-r--r--server/lib/cache/videos-preview-cache.ts15
-rw-r--r--server/lib/friends.ts368
-rw-r--r--server/lib/jobs/handlers/video-file-optimizer.ts88
-rw-r--r--server/lib/jobs/handlers/video-file-transcoder.ts21
-rw-r--r--server/lib/jobs/job-scheduler.ts118
-rw-r--r--server/lib/oauth-model.ts54
-rw-r--r--server/lib/request/abstract-request-scheduler.ts121
-rw-r--r--server/lib/request/request-scheduler.ts16
-rw-r--r--server/lib/request/request-video-event-scheduler.ts20
-rw-r--r--server/lib/request/request-video-qadu-scheduler.ts34
-rw-r--r--server/lib/user.ts52
-rw-r--r--server/middlewares/secure.ts59
-rw-r--r--server/models/pod/pod.ts33
21 files changed, 740 insertions, 770 deletions
diff --git a/server/helpers/peertube-crypto.ts b/server/helpers/peertube-crypto.ts
index 89aef99c4..47f0243e7 100644
--- a/server/helpers/peertube-crypto.ts
+++ b/server/helpers/peertube-crypto.ts
@@ -1,5 +1,4 @@
1import * as crypto from 'crypto' 1import * as crypto from 'crypto'
2import * as Promise from 'bluebird'
3import { join } from 'path' 2import { join } from 'path'
4 3
5import { 4import {
@@ -41,7 +40,7 @@ function checkSignature (publicKey: string, data: string, hexSignature: string)
41 return isValid 40 return isValid
42} 41}
43 42
44function sign (data: string|Object) { 43async function sign (data: string|Object) {
45 const sign = crypto.createSign(SIGNATURE_ALGORITHM) 44 const sign = crypto.createSign(SIGNATURE_ALGORITHM)
46 45
47 let dataString: string 46 let dataString: string
@@ -52,33 +51,33 @@ function sign (data: string|Object) {
52 dataString = JSON.stringify(data) 51 dataString = JSON.stringify(data)
53 } catch (err) { 52 } catch (err) {
54 logger.error('Cannot sign data.', err) 53 logger.error('Cannot sign data.', err)
55 return Promise.resolve('') 54 return ''
56 } 55 }
57 } 56 }
58 57
59 sign.update(dataString, 'utf8') 58 sign.update(dataString, 'utf8')
60 59
61 return getMyPrivateCert().then(myKey => { 60 const myKey = await getMyPrivateCert()
62 return sign.sign(myKey, SIGNATURE_ENCODING) 61 return await sign.sign(myKey, SIGNATURE_ENCODING)
63 })
64} 62}
65 63
66function comparePassword (plainPassword: string, hashPassword: string) { 64function comparePassword (plainPassword: string, hashPassword: string) {
67 return bcryptComparePromise(plainPassword, hashPassword) 65 return bcryptComparePromise(plainPassword, hashPassword)
68} 66}
69 67
70function createCertsIfNotExist () { 68async function createCertsIfNotExist () {
71 return certsExist().then(exist => { 69 const exist = await certsExist()
72 if (exist === true) { 70 if (exist === true) {
73 return undefined 71 return undefined
74 } 72 }
75 73
76 return createCerts() 74 return await createCerts()
77 })
78} 75}
79 76
80function cryptPassword (password: string) { 77async function cryptPassword (password: string) {
81 return bcryptGenSaltPromise(BCRYPT_SALT_SIZE).then(salt => bcryptHashPromise(password, salt)) 78 const salt = await bcryptGenSaltPromise(BCRYPT_SALT_SIZE)
79
80 return await bcryptHashPromise(password, salt)
82} 81}
83 82
84function getMyPrivateCert () { 83function getMyPrivateCert () {
@@ -105,51 +104,45 @@ export {
105 104
106// --------------------------------------------------------------------------- 105// ---------------------------------------------------------------------------
107 106
108function certsExist () { 107async function certsExist () {
109 const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) 108 const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
110 109
111 // If there is an error the certificates do not exist 110 // If there is an error the certificates do not exist
112 return accessPromise(certPath) 111 try {
113 .then(() => true) 112 await accessPromise(certPath)
114 .catch(() => false) 113
114 return true
115 } catch {
116 return false
117 }
115} 118}
116 119
117function createCerts () { 120async function createCerts () {
118 return certsExist().then(exist => { 121 const exist = await certsExist()
119 if (exist === true) { 122 if (exist === true) {
120 const errorMessage = 'Certs already exist.' 123 const errorMessage = 'Certs already exist.'
121 logger.warning(errorMessage) 124 logger.warning(errorMessage)
122 throw new Error(errorMessage) 125 throw new Error(errorMessage)
123 } 126 }
124 127
125 logger.info('Generating a RSA key...') 128 logger.info('Generating a RSA key...')
126 129
127 const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) 130 const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
128 const genRsaOptions = { 131 const genRsaOptions = {
129 'out': privateCertPath, 132 'out': privateCertPath,
130 '2048': false 133 '2048': false
131 } 134 }
132 return opensslExecPromise('genrsa', genRsaOptions) 135
133 .then(() => { 136 await opensslExecPromise('genrsa', genRsaOptions)
134 logger.info('RSA key generated.') 137 logger.info('RSA key generated.')
135 logger.info('Managing public key...') 138 logger.info('Managing public key...')
136 139
137 const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub') 140 const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub')
138 const rsaOptions = { 141 const rsaOptions = {
139 'in': privateCertPath, 142 'in': privateCertPath,
140 'pubout': true, 143 'pubout': true,
141 'out': publicCertPath 144 'out': publicCertPath
142 } 145 }
143 return opensslExecPromise('rsa', rsaOptions) 146
144 .then(() => logger.info('Public key managed.')) 147 await opensslExecPromise('rsa', rsaOptions)
145 .catch(err => {
146 logger.error('Cannot create public key on this pod.')
147 throw err
148 })
149 })
150 .catch(err => {
151 logger.error('Cannot create private key on this pod.')
152 throw err
153 })
154 })
155} 148}
diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts
index d67d46044..8c5d848f3 100644
--- a/server/helpers/requests.ts
+++ b/server/helpers/requests.ts
@@ -73,7 +73,7 @@ function makeSecureRequest (params: MakeSecureRequestParams) {
73 signature 73 signature
74 } 74 }
75 75
76 // If there are data informations 76 // If there are data information
77 if (params.data) { 77 if (params.data) {
78 requestParams.json.data = params.data 78 requestParams.json.data = params.data
79 } 79 }
diff --git a/server/helpers/utils.ts b/server/helpers/utils.ts
index 6cabe117c..8b81a61e1 100644
--- a/server/helpers/utils.ts
+++ b/server/helpers/utils.ts
@@ -8,11 +8,13 @@ import { ResultList } from '../../shared'
8import { VideoResolution } from '../../shared/models/videos/video-resolution.enum' 8import { VideoResolution } from '../../shared/models/videos/video-resolution.enum'
9 9
10function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) { 10function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) {
11 res.type('json').status(400).end() 11 return res.type('json').status(400).end()
12} 12}
13 13
14function generateRandomString (size: number) { 14async function generateRandomString (size: number) {
15 return pseudoRandomBytesPromise(size).then(raw => raw.toString('hex')) 15 const raw = await pseudoRandomBytesPromise(size)
16
17 return raw.toString('hex')
16} 18}
17 19
18interface FormattableToJSON { 20interface FormattableToJSON {
@@ -34,19 +36,19 @@ function getFormattedObjects<U, T extends FormattableToJSON> (objects: T[], obje
34 return res 36 return res
35} 37}
36 38
37function isSignupAllowed () { 39async function isSignupAllowed () {
38 if (CONFIG.SIGNUP.ENABLED === false) { 40 if (CONFIG.SIGNUP.ENABLED === false) {
39 return Promise.resolve(false) 41 return false
40 } 42 }
41 43
42 // No limit and signup is enabled 44 // No limit and signup is enabled
43 if (CONFIG.SIGNUP.LIMIT === -1) { 45 if (CONFIG.SIGNUP.LIMIT === -1) {
44 return Promise.resolve(true) 46 return true
45 } 47 }
46 48
47 return db.User.countTotal().then(totalUsers => { 49 const totalUsers = await db.User.countTotal()
48 return totalUsers < CONFIG.SIGNUP.LIMIT 50
49 }) 51 return totalUsers < CONFIG.SIGNUP.LIMIT
50} 52}
51 53
52function computeResolutionsToTranscode (videoFileHeight: number) { 54function computeResolutionsToTranscode (videoFileHeight: number) {
diff --git a/server/initializers/checker.ts b/server/initializers/checker.ts
index eb9e9e280..9eaef1695 100644
--- a/server/initializers/checker.ts
+++ b/server/initializers/checker.ts
@@ -37,39 +37,37 @@ function checkMissedConfig () {
37 37
38// Check the available codecs 38// Check the available codecs
39// We get CONFIG by param to not import it in this file (import orders) 39// We get CONFIG by param to not import it in this file (import orders)
40function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) { 40async function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) {
41 const Ffmpeg = require('fluent-ffmpeg') 41 const Ffmpeg = require('fluent-ffmpeg')
42 const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs) 42 const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs)
43 43
44 getAvailableCodecsPromise() 44 const codecs = await getAvailableCodecsPromise()
45 .then(codecs => { 45 if (CONFIG.TRANSCODING.ENABLED === false) return undefined
46 if (CONFIG.TRANSCODING.ENABLED === false) return undefined 46
47 47 const canEncode = [ 'libx264' ]
48 const canEncode = [ 'libx264' ] 48 for (const codec of canEncode) {
49 canEncode.forEach(codec => { 49 if (codecs[codec] === undefined) {
50 if (codecs[codec] === undefined) { 50 throw new Error('Unknown codec ' + codec + ' in FFmpeg.')
51 throw new Error('Unknown codec ' + codec + ' in FFmpeg.') 51 }
52 } 52
53 53 if (codecs[codec].canEncode !== true) {
54 if (codecs[codec].canEncode !== true) { 54 throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg')
55 throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg') 55 }
56 } 56 }
57 })
58 })
59} 57}
60 58
61// We get db by param to not import it in this file (import orders) 59// We get db by param to not import it in this file (import orders)
62function clientsExist (OAuthClient: OAuthClientModel) { 60async function clientsExist (OAuthClient: OAuthClientModel) {
63 return OAuthClient.countTotal().then(totalClients => { 61 const totalClients = await OAuthClient.countTotal()
64 return totalClients !== 0 62
65 }) 63 return totalClients !== 0
66} 64}
67 65
68// We get db by param to not import it in this file (import orders) 66// We get db by param to not import it in this file (import orders)
69function usersExist (User: UserModel) { 67async function usersExist (User: UserModel) {
70 return User.countTotal().then(totalUsers => { 68 const totalUsers = await User.countTotal()
71 return totalUsers !== 0 69
72 }) 70 return totalUsers !== 0
73} 71}
74 72
75// --------------------------------------------------------------------------- 73// ---------------------------------------------------------------------------
diff --git a/server/initializers/database.ts b/server/initializers/database.ts
index ea2b68f59..ade72b62f 100644
--- a/server/initializers/database.ts
+++ b/server/initializers/database.ts
@@ -2,7 +2,7 @@ import { join } from 'path'
2import { flattenDepth } from 'lodash' 2import { flattenDepth } from 'lodash'
3require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string 3require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string
4import * as Sequelize from 'sequelize' 4import * as Sequelize from 'sequelize'
5import * as Promise from 'bluebird' 5import * as Bluebird from 'bluebird'
6 6
7import { CONFIG } from './constants' 7import { CONFIG } from './constants'
8// Do not use barrel, we need to load database first 8// Do not use barrel, we need to load database first
@@ -77,26 +77,26 @@ const sequelize = new Sequelize(dbname, username, password, {
77 77
78database.sequelize = sequelize 78database.sequelize = sequelize
79 79
80database.init = (silent: boolean) => { 80database.init = async (silent: boolean) => {
81 const modelDirectory = join(__dirname, '..', 'models') 81 const modelDirectory = join(__dirname, '..', 'models')
82 82
83 return getModelFiles(modelDirectory).then(filePaths => { 83 const filePaths = await getModelFiles(modelDirectory)
84 filePaths.forEach(filePath => {
85 const model = sequelize.import(filePath)
86 84
87 database[model['name']] = model 85 for (const filePath of filePaths) {
88 }) 86 const model = sequelize.import(filePath)
89 87
90 Object.keys(database).forEach(modelName => { 88 database[model['name']] = model
91 if ('associate' in database[modelName]) { 89 }
92 database[modelName].associate(database)
93 }
94 })
95 90
96 if (!silent) logger.info('Database %s is ready.', dbname) 91 for (const modelName of Object.keys(database)) {
92 if ('associate' in database[modelName]) {
93 database[modelName].associate(database)
94 }
95 }
97 96
98 return undefined 97 if (!silent) logger.info('Database %s is ready.', dbname)
99 }) 98
99 return undefined
100} 100}
101 101
102// --------------------------------------------------------------------------- 102// ---------------------------------------------------------------------------
@@ -107,31 +107,29 @@ export {
107 107
108// --------------------------------------------------------------------------- 108// ---------------------------------------------------------------------------
109 109
110function getModelFiles (modelDirectory: string) { 110async function getModelFiles (modelDirectory: string) {
111 return readdirPromise(modelDirectory) 111 const files = await readdirPromise(modelDirectory)
112 .then(files => { 112 const directories = files.filter(directory => {
113 const directories: string[] = files.filter(directory => { 113 // Find directories
114 // Find directories 114 if (
115 if ( 115 directory.endsWith('.js.map') ||
116 directory.endsWith('.js.map') || 116 directory === 'index.js' || directory === 'index.ts' ||
117 directory === 'index.js' || directory === 'index.ts' || 117 directory === 'utils.js' || directory === 'utils.ts'
118 directory === 'utils.js' || directory === 'utils.ts' 118 ) return false
119 ) return false 119
120 120 return true
121 return true 121 })
122 })
123 122
124 return directories 123 const tasks: Bluebird<any>[] = []
125 })
126 .then(directories => {
127 const tasks = []
128 124
129 // For each directory we read it and append model in the modelFilePaths array 125 // For each directory we read it and append model in the modelFilePaths array
130 directories.forEach(directory => { 126 for (const directory of directories) {
131 const modelDirectoryPath = join(modelDirectory, directory) 127 const modelDirectoryPath = join(modelDirectory, directory)
132 128
133 const promise = readdirPromise(modelDirectoryPath).then(files => { 129 const promise = readdirPromise(modelDirectoryPath)
134 const filteredFiles = files.filter(file => { 130 .then(files => {
131 const filteredFiles = files
132 .filter(file => {
135 if ( 133 if (
136 file === 'index.js' || file === 'index.ts' || 134 file === 'index.js' || file === 'index.ts' ||
137 file === 'utils.js' || file === 'utils.ts' || 135 file === 'utils.js' || file === 'utils.ts' ||
@@ -140,17 +138,15 @@ function getModelFiles (modelDirectory: string) {
140 ) return false 138 ) return false
141 139
142 return true 140 return true
143 }).map(file => join(modelDirectoryPath, file)) 141 })
144 142 .map(file => join(modelDirectoryPath, file))
145 return filteredFiles
146 })
147 143
148 tasks.push(promise) 144 return filteredFiles
149 }) 145 })
150 146
151 return Promise.all(tasks) 147 tasks.push(promise)
152 }) 148 }
153 .then((filteredFiles: string[][]) => { 149
154 return flattenDepth<string>(filteredFiles, 1) 150 const filteredFilesArray: string[][] = await Promise.all(tasks)
155 }) 151 return flattenDepth<string>(filteredFilesArray, 1)
156} 152}
diff --git a/server/initializers/index.ts b/server/initializers/index.ts
index b8400ff84..332702774 100644
--- a/server/initializers/index.ts
+++ b/server/initializers/index.ts
@@ -1,4 +1,4 @@
1// Constants first, databse in second! 1// Constants first, database in second!
2export * from './constants' 2export * from './constants'
3export * from './database' 3export * from './database'
4export * from './checker' 4export * from './checker'
diff --git a/server/initializers/installer.ts b/server/initializers/installer.ts
index b997de07f..4c04290fc 100644
--- a/server/initializers/installer.ts
+++ b/server/initializers/installer.ts
@@ -1,5 +1,5 @@
1import * as passwordGenerator from 'password-generator' 1import * as passwordGenerator from 'password-generator'
2import * as Promise from 'bluebird' 2import * as Bluebird from 'bluebird'
3 3
4import { database as db } from './database' 4import { database as db } from './database'
5import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants' 5import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants'
@@ -7,13 +7,13 @@ import { clientsExist, usersExist } from './checker'
7import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers' 7import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers'
8import { createUserAuthorAndChannel } from '../lib' 8import { createUserAuthorAndChannel } from '../lib'
9 9
10function installApplication () { 10async function installApplication () {
11 return db.sequelize.sync() 11 await db.sequelize.sync()
12 .then(() => removeCacheDirectories()) 12 await removeCacheDirectories()
13 .then(() => createDirectoriesIfNotExist()) 13 await createDirectoriesIfNotExist()
14 .then(() => createCertsIfNotExist()) 14 await createCertsIfNotExist()
15 .then(() => createOAuthClientIfNotExist()) 15 await createOAuthClientIfNotExist()
16 .then(() => createOAuthAdminIfNotExist()) 16 await createOAuthAdminIfNotExist()
17} 17}
18 18
19// --------------------------------------------------------------------------- 19// ---------------------------------------------------------------------------
@@ -27,13 +27,13 @@ export {
27function removeCacheDirectories () { 27function removeCacheDirectories () {
28 const cacheDirectories = CACHE.DIRECTORIES 28 const cacheDirectories = CACHE.DIRECTORIES
29 29
30 const tasks = [] 30 const tasks: Bluebird<any>[] = []
31 31
32 // Cache directories 32 // Cache directories
33 Object.keys(cacheDirectories).forEach(key => { 33 for (const key of Object.keys(cacheDirectories)) {
34 const dir = cacheDirectories[key] 34 const dir = cacheDirectories[key]
35 tasks.push(rimrafPromise(dir)) 35 tasks.push(rimrafPromise(dir))
36 }) 36 }
37 37
38 return Promise.all(tasks) 38 return Promise.all(tasks)
39} 39}
@@ -43,88 +43,83 @@ function createDirectoriesIfNotExist () {
43 const cacheDirectories = CACHE.DIRECTORIES 43 const cacheDirectories = CACHE.DIRECTORIES
44 44
45 const tasks = [] 45 const tasks = []
46 Object.keys(storage).forEach(key => { 46 for (const key of Object.keys(storage)) {
47 const dir = storage[key] 47 const dir = storage[key]
48 tasks.push(mkdirpPromise(dir)) 48 tasks.push(mkdirpPromise(dir))
49 }) 49 }
50 50
51 // Cache directories 51 // Cache directories
52 Object.keys(cacheDirectories).forEach(key => { 52 for (const key of Object.keys(cacheDirectories)) {
53 const dir = cacheDirectories[key] 53 const dir = cacheDirectories[key]
54 tasks.push(mkdirpPromise(dir)) 54 tasks.push(mkdirpPromise(dir))
55 }) 55 }
56 56
57 return Promise.all(tasks) 57 return Promise.all(tasks)
58} 58}
59 59
60function createOAuthClientIfNotExist () { 60async function createOAuthClientIfNotExist () {
61 return clientsExist(db.OAuthClient).then(exist => { 61 const exist = await clientsExist(db.OAuthClient)
62 // Nothing to do, clients already exist 62 // Nothing to do, clients already exist
63 if (exist === true) return undefined 63 if (exist === true) return undefined
64
65 logger.info('Creating a default OAuth Client.')
66
67 const id = passwordGenerator(32, false, /[a-z0-9]/)
68 const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/)
69 const client = db.OAuthClient.build({
70 clientId: id,
71 clientSecret: secret,
72 grants: [ 'password', 'refresh_token' ],
73 redirectUris: null
74 })
75 64
76 return client.save().then(createdClient => { 65 logger.info('Creating a default OAuth Client.')
77 logger.info('Client id: ' + createdClient.clientId)
78 logger.info('Client secret: ' + createdClient.clientSecret)
79 66
80 return undefined 67 const id = passwordGenerator(32, false, /[a-z0-9]/)
81 }) 68 const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/)
69 const client = db.OAuthClient.build({
70 clientId: id,
71 clientSecret: secret,
72 grants: [ 'password', 'refresh_token' ],
73 redirectUris: null
82 }) 74 })
83}
84 75
85function createOAuthAdminIfNotExist () { 76 const createdClient = await client.save()
86 return usersExist(db.User).then(exist => { 77 logger.info('Client id: ' + createdClient.clientId)
87 // Nothing to do, users already exist 78 logger.info('Client secret: ' + createdClient.clientSecret)
88 if (exist === true) return undefined
89 79
90 logger.info('Creating the administrator.') 80 return undefined
81}
91 82
92 const username = 'root' 83async function createOAuthAdminIfNotExist () {
93 const role = USER_ROLES.ADMIN 84 const exist = await usersExist(db.User)
94 const email = CONFIG.ADMIN.EMAIL 85 // Nothing to do, users already exist
95 let validatePassword = true 86 if (exist === true) return undefined
96 let password = ''
97 87
98 // Do not generate a random password for tests 88 logger.info('Creating the administrator.')
99 if (process.env.NODE_ENV === 'test') {
100 password = 'test'
101 89
102 if (process.env.NODE_APP_INSTANCE) { 90 const username = 'root'
103 password += process.env.NODE_APP_INSTANCE 91 const role = USER_ROLES.ADMIN
104 } 92 const email = CONFIG.ADMIN.EMAIL
93 let validatePassword = true
94 let password = ''
105 95
106 // Our password is weak so do not validate it 96 // Do not generate a random password for tests
107 validatePassword = false 97 if (process.env.NODE_ENV === 'test') {
108 } else { 98 password = 'test'
109 password = passwordGenerator(8, true)
110 }
111 99
112 const userData = { 100 if (process.env.NODE_APP_INSTANCE) {
113 username, 101 password += process.env.NODE_APP_INSTANCE
114 email,
115 password,
116 role,
117 videoQuota: -1
118 } 102 }
119 const user = db.User.build(userData)
120 103
121 return createUserAuthorAndChannel(user, validatePassword) 104 // Our password is weak so do not validate it
122 .then(({ user }) => { 105 validatePassword = false
123 logger.info('Username: ' + username) 106 } else {
124 logger.info('User password: ' + password) 107 password = passwordGenerator(8, true)
125 108 }
126 logger.info('Creating Application table.') 109
127 return db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION }) 110 const userData = {
128 }) 111 username,
129 }) 112 email,
113 password,
114 role,
115 videoQuota: -1
116 }
117 const user = db.User.build(userData)
118
119 await createUserAuthorAndChannel(user, validatePassword)
120 logger.info('Username: ' + username)
121 logger.info('User password: ' + password)
122
123 logger.info('Creating Application table.')
124 await db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION })
130} 125}
diff --git a/server/initializers/migrator.ts b/server/initializers/migrator.ts
index 7b535aea9..4fbe1cf5b 100644
--- a/server/initializers/migrator.ts
+++ b/server/initializers/migrator.ts
@@ -1,52 +1,35 @@
1import * as path from 'path' 1import * as path from 'path'
2import * as Promise from 'bluebird'
3 2
4import { database as db } from './database' 3import { database as db } from './database'
5import { LAST_MIGRATION_VERSION } from './constants' 4import { LAST_MIGRATION_VERSION } from './constants'
6import { logger, readdirPromise } from '../helpers' 5import { logger, readdirPromise } from '../helpers'
7 6
8function migrate () { 7async function migrate () {
9 const p = db.sequelize.getQueryInterface().showAllTables() 8 const tables = await db.sequelize.getQueryInterface().showAllTables()
10 .then(tables => {
11 // No tables, we don't need to migrate anything
12 // The installer will do that
13 if (tables.length === 0) throw null
14 })
15 .then(() => {
16 return db.Application.loadMigrationVersion()
17 })
18 .then(actualVersion => {
19 if (actualVersion === null) {
20 return db.Application.create({ migrationVersion: 0 }).then(() => 0)
21 }
22 9
23 return actualVersion 10 // No tables, we don't need to migrate anything
24 }) 11 // The installer will do that
25 .then(actualVersion => { 12 if (tables.length === 0) return
26 // No need migrations, abort
27 if (actualVersion >= LAST_MIGRATION_VERSION) throw null
28 13
29 return actualVersion 14 let actualVersion = await db.Application.loadMigrationVersion()
30 }) 15 if (actualVersion === null) {
31 .then(actualVersion => { 16 await db.Application.create({ migrationVersion: 0 })
32 // If there are a new migration scripts 17 actualVersion = 0
33 logger.info('Begin migrations.') 18 }
34 19
35 return getMigrationScripts().then(migrationScripts => ({ actualVersion, migrationScripts })) 20 // No need migrations, abort
36 }) 21 if (actualVersion >= LAST_MIGRATION_VERSION) return
37 .then(({ actualVersion, migrationScripts }) => {
38 return Promise.each(migrationScripts, entity => executeMigration(actualVersion, entity))
39 })
40 .then(() => {
41 logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION)
42 })
43 .catch(err => {
44 if (err === null) return undefined
45 22
46 throw err 23 // If there are a new migration scripts
47 }) 24 logger.info('Begin migrations.')
25
26 const migrationScripts = await getMigrationScripts()
48 27
49 return p 28 for (const migrationScript of migrationScripts) {
29 await executeMigration(actualVersion, migrationScript)
30 }
31
32 logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION)
50} 33}
51 34
52// --------------------------------------------------------------------------- 35// ---------------------------------------------------------------------------
@@ -57,29 +40,28 @@ export {
57 40
58// --------------------------------------------------------------------------- 41// ---------------------------------------------------------------------------
59 42
60function getMigrationScripts () { 43async function getMigrationScripts () {
61 return readdirPromise(path.join(__dirname, 'migrations')).then(files => { 44 const files = await readdirPromise(path.join(__dirname, 'migrations'))
62 const filesToMigrate: { 45 const filesToMigrate: {
63 version: string, 46 version: string,
64 script: string 47 script: string
65 }[] = [] 48 }[] = []
66 49
67 files 50 files
68 .filter(file => file.endsWith('.js.map') === false) 51 .filter(file => file.endsWith('.js.map') === false)
69 .forEach(file => { 52 .forEach(file => {
70 // Filename is something like 'version-blabla.js' 53 // Filename is something like 'version-blabla.js'
71 const version = file.split('-')[0] 54 const version = file.split('-')[0]
72 filesToMigrate.push({ 55 filesToMigrate.push({
73 version, 56 version,
74 script: file 57 script: file
75 })
76 }) 58 })
59 })
77 60
78 return filesToMigrate 61 return filesToMigrate
79 })
80} 62}
81 63
82function executeMigration (actualVersion: number, entity: { version: string, script: string }) { 64async function executeMigration (actualVersion: number, entity: { version: string, script: string }) {
83 const versionScript = parseInt(entity.version, 10) 65 const versionScript = parseInt(entity.version, 10)
84 66
85 // Do not execute old migration scripts 67 // Do not execute old migration scripts
@@ -91,7 +73,7 @@ function executeMigration (actualVersion: number, entity: { version: string, scr
91 73
92 const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName)) 74 const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName))
93 75
94 return db.sequelize.transaction(t => { 76 await db.sequelize.transaction(async t => {
95 const options = { 77 const options = {
96 transaction: t, 78 transaction: t,
97 queryInterface: db.sequelize.getQueryInterface(), 79 queryInterface: db.sequelize.getQueryInterface(),
@@ -99,10 +81,9 @@ function executeMigration (actualVersion: number, entity: { version: string, scr
99 db 81 db
100 } 82 }
101 83
102 return migrationScript.up(options) 84 await migrationScript.up(options)
103 .then(() => { 85
104 // Update the new migration version 86 // Update the new migration version
105 return db.Application.updateMigrationVersion(versionScript, t) 87 await db.Application.updateMigrationVersion(versionScript, t)
106 })
107 }) 88 })
108} 89}
diff --git a/server/lib/cache/videos-preview-cache.ts b/server/lib/cache/videos-preview-cache.ts
index fecdca6ef..0fe4d2f78 100644
--- a/server/lib/cache/videos-preview-cache.ts
+++ b/server/lib/cache/videos-preview-cache.ts
@@ -1,7 +1,6 @@
1import * as asyncLRU from 'async-lru' 1import * as asyncLRU from 'async-lru'
2import { join } from 'path' 2import { join } from 'path'
3import { createWriteStream } from 'fs' 3import { createWriteStream } from 'fs'
4import * as Promise from 'bluebird'
5 4
6import { database as db, CONFIG, CACHE } from '../../initializers' 5import { database as db, CONFIG, CACHE } from '../../initializers'
7import { logger, unlinkPromise } from '../../helpers' 6import { logger, unlinkPromise } from '../../helpers'
@@ -43,15 +42,15 @@ class VideosPreviewCache {
43 }) 42 })
44 } 43 }
45 44
46 private loadPreviews (key: string) { 45 private async loadPreviews (key: string) {
47 return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) 46 const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key)
48 .then(video => { 47 if (!video) return undefined
49 if (!video) return undefined
50 48
51 if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) 49 if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName())
52 50
53 return this.saveRemotePreviewAndReturnPath(video) 51 const res = await this.saveRemotePreviewAndReturnPath(video)
54 }) 52
53 return res
55 } 54 }
56 55
57 private saveRemotePreviewAndReturnPath (video: VideoInstance) { 56 private saveRemotePreviewAndReturnPath (video: VideoInstance) {
diff --git a/server/lib/friends.ts b/server/lib/friends.ts
index f035b099b..a33432dc1 100644
--- a/server/lib/friends.ts
+++ b/server/lib/friends.ts
@@ -1,6 +1,6 @@
1import * as request from 'request' 1import * as request from 'request'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import * as Promise from 'bluebird' 3import * as Bluebird from 'bluebird'
4import { join } from 'path' 4import { join } from 'path'
5 5
6import { database as db } from '../initializers/database' 6import { database as db } from '../initializers/database'
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.
188function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { 188function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
189 const tasks = [] 189 const tasks = []
190 190
191 eventsParams.forEach(eventParams => { 191 for (const eventParams of eventsParams) {
192 tasks.push(addEventToRemoteVideo(eventParams, transaction)) 192 tasks.push(addEventToRemoteVideo(eventParams, transaction))
193 }) 193 }
194 194
195 return Promise.all(tasks) 195 return Promise.all(tasks)
196} 196}
197 197
198function hasFriends () { 198async function hasFriends () {
199 return db.Pod.countAll().then(count => count !== 0) 199 const count = await db.Pod.countAll()
200
201 return count !== 0
200} 202}
201 203
202function makeFriends (hosts: string[]) { 204async function makeFriends (hosts: string[]) {
203 const podsScore = {} 205 const podsScore = {}
204 206
205 logger.info('Make friends!') 207 logger.info('Make friends!')
206 return getMyPublicCert() 208 const cert = await getMyPublicCert()
207 .then(cert => {
208 return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
209 })
210 .then(cert => {
211 logger.debug('Pods scores computed.', { podsScore: podsScore })
212 const podsList = computeWinningPods(hosts, podsScore)
213 logger.debug('Pods that we keep.', { podsToKeep: podsList })
214 209
215 return makeRequestsToWinningPods(cert, podsList) 210 for (const host of hosts) {
216 }) 211 await computeForeignPodsList(host, podsScore)
212 }
213
214 logger.debug('Pods scores computed.', { podsScore: podsScore })
215
216 const podsList = computeWinningPods(hosts, podsScore)
217 logger.debug('Pods that we keep.', { podsToKeep: podsList })
218
219 return makeRequestsToWinningPods(cert, podsList)
217} 220}
218 221
219function quitFriends () { 222async function quitFriends () {
220 // Stop pool requests 223 // Stop pool requests
221 requestScheduler.deactivate() 224 requestScheduler.deactivate()
222 225
223 return requestScheduler.flush() 226 try {
224 .then(() => { 227 await requestScheduler.flush()
225 return requestVideoQaduScheduler.flush() 228
226 }) 229 await requestVideoQaduScheduler.flush()
227 .then(() => { 230
228 return db.Pod.list() 231 const pods = await db.Pod.list()
229 }) 232 const requestParams = {
230 .then(pods => { 233 method: 'POST' as 'POST',
231 const requestParams = { 234 path: '/api/' + API_VERSION + '/remote/pods/remove',
232 method: 'POST' as 'POST', 235 toPod: null
233 path: '/api/' + API_VERSION + '/remote/pods/remove', 236 }
234 toPod: null
235 }
236 237
237 // Announce we quit them 238 // Announce we quit them
238 // We don't care if the request fails 239 // We don't care if the request fails
239 // The other pod will exclude us automatically after a while 240 // The other pod will exclude us automatically after a while
240 return Promise.map(pods, pod => { 241 try {
242 await Bluebird.map(pods, pod => {
241 requestParams.toPod = pod 243 requestParams.toPod = pod
242 244
243 return makeSecureRequest(requestParams) 245 return makeSecureRequest(requestParams)
244 }, { concurrency: REQUESTS_IN_PARALLEL }) 246 }, { concurrency: REQUESTS_IN_PARALLEL })
245 .then(() => pods) 247 } catch (err) { // Don't stop the process
246 .catch(err => { 248 logger.error('Some errors while quitting friends.', err)
247 logger.error('Some errors while quitting friends.', err) 249 }
248 // Don't stop the process
249 return pods
250 })
251 })
252 .then(pods => {
253 const tasks = []
254 pods.forEach(pod => tasks.push(pod.destroy()))
255 250
256 return Promise.all(pods) 251 const tasks = []
257 }) 252 for (const pod of pods) {
258 .then(() => { 253 tasks.push(pod.destroy())
259 logger.info('Removed all remote videos.') 254 }
260 // Don't forget to re activate the scheduler, even if there was an error 255 await Promise.all(pods)
261 return requestScheduler.activate() 256
262 }) 257 logger.info('Removed all remote videos.')
263 .finally(() => requestScheduler.activate()) 258
259 requestScheduler.activate()
260 } catch (err) {
261 // Don't forget to re activate the scheduler, even if there was an error
262 requestScheduler.activate()
263
264 throw err
265 }
264} 266}
265 267
266function sendOwnedDataToPod (podId: number) { 268async function sendOwnedDataToPod (podId: number) {
267 // First send authors 269 // First send authors
268 return sendOwnedAuthorsToPod(podId) 270 await sendOwnedAuthorsToPod(podId)
269 .then(() => sendOwnedChannelsToPod(podId)) 271 await sendOwnedChannelsToPod(podId)
270 .then(() => sendOwnedVideosToPod(podId)) 272 await sendOwnedVideosToPod(podId)
273}
274
275async function sendOwnedChannelsToPod (podId: number) {
276 const videoChannels = await db.VideoChannel.listOwned()
277
278 const tasks: Promise<any>[] = []
279 for (const videoChannel of videoChannels) {
280 const remoteVideoChannel = videoChannel.toAddRemoteJSON()
281 const options = {
282 type: 'add-channel' as 'add-channel',
283 endpoint: REQUEST_ENDPOINTS.VIDEOS,
284 data: remoteVideoChannel,
285 toIds: [ podId ],
286 transaction: null
287 }
288
289 const p = createRequest(options)
290 tasks.push(p)
291 }
292
293 await Promise.all(tasks)
271} 294}
272 295
273function sendOwnedChannelsToPod (podId: number) { 296async function sendOwnedAuthorsToPod (podId: number) {
274 return db.VideoChannel.listOwned() 297 const authors = await db.Author.listOwned()
275 .then(videoChannels => { 298 const tasks: Promise<any>[] = []
276 const tasks = []
277 videoChannels.forEach(videoChannel => {
278 const remoteVideoChannel = videoChannel.toAddRemoteJSON()
279 const options = {
280 type: 'add-channel' as 'add-channel',
281 endpoint: REQUEST_ENDPOINTS.VIDEOS,
282 data: remoteVideoChannel,
283 toIds: [ podId ],
284 transaction: null
285 }
286 299
287 const p = createRequest(options) 300 for (const author of authors) {
288 tasks.push(p) 301 const remoteAuthor = author.toAddRemoteJSON()
289 }) 302 const options = {
303 type: 'add-author' as 'add-author',
304 endpoint: REQUEST_ENDPOINTS.VIDEOS,
305 data: remoteAuthor,
306 toIds: [ podId ],
307 transaction: null
308 }
290 309
291 return Promise.all(tasks) 310 const p = createRequest(options)
292 }) 311 tasks.push(p)
312 }
313
314 await Promise.all(tasks)
293} 315}
294 316
295function sendOwnedAuthorsToPod (podId: number) { 317async function sendOwnedVideosToPod (podId: number) {
296 return db.Author.listOwned() 318 const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
297 .then(authors => { 319 const tasks: Bluebird<any>[] = []
298 const tasks = [] 320
299 authors.forEach(author => { 321 for (const video of videosList) {
300 const remoteAuthor = author.toAddRemoteJSON() 322 const promise = video.toAddRemoteJSON()
323 .then(remoteVideo => {
301 const options = { 324 const options = {
302 type: 'add-author' as 'add-author', 325 type: 'add-video' as 'add-video',
303 endpoint: REQUEST_ENDPOINTS.VIDEOS, 326 endpoint: REQUEST_ENDPOINTS.VIDEOS,
304 data: remoteAuthor, 327 data: remoteVideo,
305 toIds: [ podId ], 328 toIds: [ podId ],
306 transaction: null 329 transaction: null
307 } 330 }
308 331 return createRequest(options)
309 const p = createRequest(options)
310 tasks.push(p)
311 }) 332 })
312 333 .catch(err => {
313 return Promise.all(tasks) 334 logger.error('Cannot convert video to remote.', err)
314 }) 335 // Don't break the process
315} 336 return undefined
316
317function sendOwnedVideosToPod (podId: number) {
318 return db.Video.listOwnedAndPopulateAuthorAndTags()
319 .then(videosList => {
320 const tasks = []
321 videosList.forEach(video => {
322 const promise = video.toAddRemoteJSON()
323 .then(remoteVideo => {
324 const options = {
325 type: 'add-video' as 'add-video',
326 endpoint: REQUEST_ENDPOINTS.VIDEOS,
327 data: remoteVideo,
328 toIds: [ podId ],
329 transaction: null
330 }
331 return createRequest(options)
332 })
333 .catch(err => {
334 logger.error('Cannot convert video to remote.', err)
335 // Don't break the process
336 return undefined
337 })
338
339 tasks.push(promise)
340 }) 337 })
341 338
342 return Promise.all(tasks) 339 tasks.push(promise)
343 }) 340 }
341
342 await Promise.all(tasks)
344} 343}
345 344
346function fetchRemotePreview (video: VideoInstance) { 345function fetchRemotePreview (video: VideoInstance) {
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) {
350 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) 349 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
351} 350}
352 351
353function removeFriend (pod: PodInstance) { 352async function removeFriend (pod: PodInstance) {
354 const requestParams = { 353 const requestParams = {
355 method: 'POST' as 'POST', 354 method: 'POST' as 'POST',
356 path: '/api/' + API_VERSION + '/remote/pods/remove', 355 path: '/api/' + API_VERSION + '/remote/pods/remove',
357 toPod: pod 356 toPod: pod
358 } 357 }
359 358
360 return makeSecureRequest(requestParams) 359 try {
361 .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) 360 await makeSecureRequest(requestParams)
362 .then(() => pod.destroy()) 361 } catch (err) {
363 .then(() => logger.info('Removed friend %s.', pod.host)) 362 logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
364 .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) 363 }
364
365 try {
366 await pod.destroy()
367
368 logger.info('Removed friend %s.', pod.host)
369 } catch (err) {
370 logger.error('Cannot destroy friend %s.', pod.host, err)
371 }
365} 372}
366 373
367function getRequestScheduler () { 374function getRequestScheduler () {
@@ -406,23 +413,21 @@ export {
406 413
407// --------------------------------------------------------------------------- 414// ---------------------------------------------------------------------------
408 415
409function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { 416async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
410 // TODO: type res 417 const result = await getForeignPodsList(host)
411 return getForeignPodsList(host).then(res => { 418 const foreignPodsList: { host: string }[] = result.data
412 const foreignPodsList: { host: string }[] = res.data
413 419
414 // Let's give 1 point to the pod we ask the friends list 420 // Let's give 1 point to the pod we ask the friends list
415 foreignPodsList.push({ host }) 421 foreignPodsList.push({ host })
416 422
417 foreignPodsList.forEach(foreignPod => { 423 for (const foreignPod of foreignPodsList) {
418 const foreignPodHost = foreignPod.host 424 const foreignPodHost = foreignPod.host
419 425
420 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ 426 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
421 else podsScore[foreignPodHost] = 1 427 else podsScore[foreignPodHost] = 1
422 }) 428 }
423 429
424 return undefined 430 return undefined
425 })
426} 431}
427 432
428function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { 433function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num
431 const podsList = [] 436 const podsList = []
432 const baseScore = hosts.length / 2 437 const baseScore = hosts.length / 2
433 438
434 Object.keys(podsScore).forEach(podHost => { 439 for (const podHost of Object.keys(podsScore)) {
435 // If the pod is not me and with a good score we add it 440 // If the pod is not me and with a good score we add it
436 if (isMe(podHost) === false && podsScore[podHost] > baseScore) { 441 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
437 podsList.push({ host: podHost }) 442 podsList.push({ host: podHost })
438 } 443 }
439 }) 444 }
440 445
441 return podsList 446 return podsList
442} 447}
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) {
449 if (err) return rej(err) 454 if (err) return rej(err)
450 455
451 try { 456 try {
452 const json = JSON.parse(body) 457 const json: ResultList<FormattedPod> = JSON.parse(body)
453 return res(json) 458 return res(json)
454 } catch (err) { 459 } catch (err) {
455 return rej(err) 460 return rej(err)
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) {
458 }) 463 })
459} 464}
460 465
461function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { 466async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
462 // Stop pool requests 467 // Stop pool requests
463 requestScheduler.deactivate() 468 requestScheduler.deactivate()
464 // Flush pool requests 469 // Flush pool requests
465 requestScheduler.forceSend() 470 requestScheduler.forceSend()
466 471
467 return Promise.map(podsList, pod => { 472 try {
468 const params = { 473 await Bluebird.map(podsList, async pod => {
469 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', 474 const params = {
470 method: 'POST' as 'POST', 475 url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
471 json: { 476 method: 'POST' as 'POST',
472 host: CONFIG.WEBSERVER.HOST, 477 json: {
473 email: CONFIG.ADMIN.EMAIL, 478 host: CONFIG.WEBSERVER.HOST,
474 publicKey: cert 479 email: CONFIG.ADMIN.EMAIL,
480 publicKey: cert
481 }
475 } 482 }
476 }
477 483
478 return makeRetryRequest(params) 484 const { response, body } = await makeRetryRequest(params)
479 .then(({ response, body }) => { 485 const typedBody = body as { cert: string, email: string }
480 body = body as { cert: string, email: string } 486
481 487 if (response.statusCode === 200) {
482 if (response.statusCode === 200) { 488 const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
483 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) 489
484 return podObj.save() 490 let podCreated: PodInstance
485 .then(podCreated => { 491 try {
486 492 podCreated = await podObj.save()
487 // Add our videos to the request scheduler 493 } catch (err) {
488 sendOwnedDataToPod(podCreated.id) 494 logger.error('Cannot add friend %s pod.', pod.host, err)
489 })
490 .catch(err => {
491 logger.error('Cannot add friend %s pod.', pod.host, err)
492 })
493 } else {
494 logger.error('Status not 200 for %s pod.', pod.host)
495 } 495 }
496 }) 496
497 .catch(err => { 497 // Add our videos to the request scheduler
498 logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) 498 sendOwnedDataToPod(podCreated.id)
499 // Don't break the process 499 .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
500 }) 500 } else {
501 }, { concurrency: REQUESTS_IN_PARALLEL }) 501 logger.error('Status not 200 for %s pod.', pod.host)
502 .then(() => logger.debug('makeRequestsToWinningPods finished.')) 502 }
503 .finally(() => { 503 }, { concurrency: REQUESTS_IN_PARALLEL })
504
505 logger.debug('makeRequestsToWinningPods finished.')
506
507 requestScheduler.activate()
508 } catch (err) {
504 // Final callback, we've ended all the requests 509 // Final callback, we've ended all the requests
505 // Now we made new friends, we can re activate the pool of requests 510 // Now we made new friends, we can re activate the pool of requests
506 requestScheduler.activate() 511 requestScheduler.activate()
507 }) 512 }
508} 513}
509 514
510// Wrapper that populate "toIds" argument with all our friends if it is not specified 515// Wrapper that populate "toIds" argument with all our friends if it is not specified
@@ -515,14 +520,19 @@ type CreateRequestOptions = {
515 toIds?: number[] 520 toIds?: number[]
516 transaction: Sequelize.Transaction 521 transaction: Sequelize.Transaction
517} 522}
518function createRequest (options: CreateRequestOptions) { 523async function createRequest (options: CreateRequestOptions) {
519 if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) 524 if (options.toIds !== undefined) {
525 await requestScheduler.createRequest(options as RequestSchedulerOptions)
526 return undefined
527 }
520 528
521 // If the "toIds" pods is not specified, we send the request to all our friends 529 // If the "toIds" pods is not specified, we send the request to all our friends
522 return db.Pod.listAllIds(options.transaction).then(podIds => { 530 const podIds = await db.Pod.listAllIds(options.transaction)
523 const newOptions = Object.assign(options, { toIds: podIds }) 531
524 return requestScheduler.createRequest(newOptions) 532 const newOptions = Object.assign(options, { toIds: podIds })
525 }) 533 await requestScheduler.createRequest(newOptions)
534
535 return undefined
526} 536}
527 537
528function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { 538function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {
diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/handlers/video-file-optimizer.ts
index 63a51064c..799ba8b01 100644
--- a/server/lib/jobs/handlers/video-file-optimizer.ts
+++ b/server/lib/jobs/handlers/video-file-optimizer.ts
@@ -1,4 +1,4 @@
1import * as Promise from 'bluebird' 1import * as Bluebird from 'bluebird'
2 2
3import { database as db } from '../../../initializers/database' 3import { database as db } from '../../../initializers/database'
4import { logger, computeResolutionsToTranscode } from '../../../helpers' 4import { logger, computeResolutionsToTranscode } from '../../../helpers'
@@ -6,16 +6,17 @@ import { VideoInstance } from '../../../models'
6import { addVideoToFriends } from '../../friends' 6import { addVideoToFriends } from '../../friends'
7import { JobScheduler } from '../job-scheduler' 7import { JobScheduler } from '../job-scheduler'
8 8
9function process (data: { videoUUID: string }, jobId: number) { 9async function process (data: { videoUUID: string }, jobId: number) {
10 return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { 10 const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
11 // No video, maybe deleted? 11 // No video, maybe deleted?
12 if (!video) { 12 if (!video) {
13 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 13 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
14 return undefined 14 return undefined
15 } 15 }
16
17 await video.optimizeOriginalVideofile()
16 18
17 return video.optimizeOriginalVideofile().then(() => video) 19 return video
18 })
19} 20}
20 21
21function onError (err: Error, jobId: number) { 22function onError (err: Error, jobId: number) {
@@ -23,33 +24,31 @@ function onError (err: Error, jobId: number) {
23 return Promise.resolve() 24 return Promise.resolve()
24} 25}
25 26
26function onSuccess (jobId: number, video: VideoInstance) { 27async function onSuccess (jobId: number, video: VideoInstance) {
27 if (video === undefined) return undefined 28 if (video === undefined) return undefined
28 29
29 logger.info('Job %d is a success.', jobId) 30 logger.info('Job %d is a success.', jobId)
30 31
31 video.toAddRemoteJSON() 32 const remoteVideo = await video.toAddRemoteJSON()
32 .then(remoteVideo => { 33
33 // Now we'll add the video's meta data to our friends 34 // Now we'll add the video's meta data to our friends
34 return addVideoToFriends(remoteVideo, null) 35 await addVideoToFriends(remoteVideo, null)
35 }) 36
36 .then(() => { 37 const originalFileHeight = await video.getOriginalFileHeight()
37 return video.getOriginalFileHeight() 38 // Create transcoding jobs if there are enabled resolutions
38 }) 39
39 .then(originalFileHeight => { 40 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
40 // Create transcoding jobs if there are enabled resolutions 41 logger.info(
41 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) 42 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight,
42 logger.info( 43 { resolutions: resolutionsEnabled }
43 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, 44 )
44 { resolutions: resolutionsEnabled } 45
45 ) 46 if (resolutionsEnabled.length !== 0) {
46 47 try {
47 if (resolutionsEnabled.length === 0) return undefined 48 await db.sequelize.transaction(async t => {
48 49 const tasks: Bluebird<any>[] = []
49 return db.sequelize.transaction(t => { 50
50 const tasks: Promise<any>[] = [] 51 for (const resolution of resolutionsEnabled) {
51
52 resolutionsEnabled.forEach(resolution => {
53 const dataInput = { 52 const dataInput = {
54 videoUUID: video.uuid, 53 videoUUID: video.uuid,
55 resolution 54 resolution
@@ -57,24 +56,19 @@ function onSuccess (jobId: number, video: VideoInstance) {
57 56
58 const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) 57 const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput)
59 tasks.push(p) 58 tasks.push(p)
60 }) 59 }
61 60
62 return Promise.all(tasks).then(() => resolutionsEnabled) 61 await Promise.all(tasks)
63 }) 62 })
64 })
65 .then(resolutionsEnabled => {
66 if (resolutionsEnabled === undefined) {
67 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
68 return undefined
69 }
70 63
71 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) 64 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
72 }) 65 } catch (err) {
73 .catch((err: Error) => { 66 logger.warn('Cannot transcode the video.', err)
74 logger.debug('Cannot transcode the video.', err) 67 }
75 throw err 68 } else {
76 }) 69 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
77 70 return undefined
71 }
78} 72}
79 73
80// --------------------------------------------------------------------------- 74// ---------------------------------------------------------------------------
diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/handlers/video-file-transcoder.ts
index 0dafee566..b240ff58a 100644
--- a/server/lib/jobs/handlers/video-file-transcoder.ts
+++ b/server/lib/jobs/handlers/video-file-transcoder.ts
@@ -4,16 +4,17 @@ import { logger } from '../../../helpers'
4import { VideoInstance } from '../../../models' 4import { VideoInstance } from '../../../models'
5import { VideoResolution } from '../../../../shared' 5import { VideoResolution } from '../../../../shared'
6 6
7function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { 7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { 8 const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
9 // No video, maybe deleted? 9 // No video, maybe deleted?
10 if (!video) { 10 if (!video) {
11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
12 return undefined 12 return undefined
13 } 13 }
14 14
15 return video.transcodeOriginalVideofile(data.resolution).then(() => video) 15 await video.transcodeOriginalVideofile(data.resolution)
16 }) 16
17 return video
17} 18}
18 19
19function onError (err: Error, jobId: number) { 20function onError (err: Error, jobId: number) {
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index c2409d20c..61d483268 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -23,7 +23,7 @@ class JobScheduler {
23 return this.instance || (this.instance = new this()) 23 return this.instance || (this.instance = new this())
24 } 24 }
25 25
26 activate () { 26 async activate () {
27 const limit = JOBS_FETCH_LIMIT_PER_CYCLE 27 const limit = JOBS_FETCH_LIMIT_PER_CYCLE
28 28
29 logger.info('Jobs scheduler activated.') 29 logger.info('Jobs scheduler activated.')
@@ -32,32 +32,36 @@ class JobScheduler {
32 32
33 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
35 db.Job.listWithLimit(limit, state) 35 try {
36 .then(jobs => { 36 const jobs = await db.Job.listWithLimit(limit, state)
37 this.enqueueJobs(jobsQueue, jobs) 37
38 38 this.enqueueJobs(jobsQueue, jobs)
39 forever( 39 } catch (err) {
40 next => { 40 logger.error('Cannot list pending jobs.', err)
41 if (jobsQueue.length() !== 0) { 41 }
42 // Finish processing the queue first 42
43 return setTimeout(next, JOBS_FETCHING_INTERVAL) 43 forever(
44 } 44 async next => {
45 45 if (jobsQueue.length() !== 0) {
46 const state = JOB_STATES.PENDING 46 // Finish processing the queue first
47 db.Job.listWithLimit(limit, state) 47 return setTimeout(next, JOBS_FETCHING_INTERVAL)
48 .then(jobs => { 48 }
49 this.enqueueJobs(jobsQueue, jobs) 49
50 50 const state = JOB_STATES.PENDING
51 // Optimization: we could use "drain" from queue object 51 try {
52 return setTimeout(next, JOBS_FETCHING_INTERVAL) 52 const jobs = await db.Job.listWithLimit(limit, state)
53 }) 53
54 .catch(err => logger.error('Cannot list pending jobs.', err)) 54 this.enqueueJobs(jobsQueue, jobs)
55 }, 55 } catch (err) {
56 56 logger.error('Cannot list pending jobs.', err)
57 err => logger.error('Error in job scheduler queue.', err) 57 }
58 ) 58
59 }) 59 // Optimization: we could use "drain" from queue object
60 .catch(err => logger.error('Cannot list pending jobs.', err)) 60 return setTimeout(next, JOBS_FETCHING_INTERVAL)
61 },
62
63 err => logger.error('Error in job scheduler queue.', err)
64 )
61 } 65 }
62 66
63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { 67 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
@@ -75,7 +79,7 @@ class JobScheduler {
75 jobs.forEach(job => jobsQueue.push(job)) 79 jobs.forEach(job => jobsQueue.push(job))
76 } 80 }
77 81
78 private processJob (job: JobInstance, callback: (err: Error) => void) { 82 private async processJob (job: JobInstance, callback: (err: Error) => void) {
79 const jobHandler = jobHandlers[job.handlerName] 83 const jobHandler = jobHandlers[job.handlerName]
80 if (jobHandler === undefined) { 84 if (jobHandler === undefined) {
81 logger.error('Unknown job handler for job %s.', job.handlerName) 85 logger.error('Unknown job handler for job %s.', job.handlerName)
@@ -85,41 +89,45 @@ class JobScheduler {
85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
86 90
87 job.state = JOB_STATES.PROCESSING 91 job.state = JOB_STATES.PROCESSING
88 return job.save() 92 await job.save()
89 .then(() => { 93
90 return jobHandler.process(job.handlerInputData, job.id) 94 try {
91 }) 95 const result = await jobHandler.process(job.handlerInputData, job.id)
92 .then( 96 await this.onJobSuccess(jobHandler, job, result)
93 result => { 97 } catch (err) {
94 return this.onJobSuccess(jobHandler, job, result) 98 logger.error('Error in job handler %s.', job.handlerName, err)
95 }, 99
96 100 try {
97 err => { 101 await this.onJobError(jobHandler, job, err)
98 logger.error('Error in job handler %s.', job.handlerName, err) 102 } catch (innerErr) {
99 return this.onJobError(jobHandler, job, err) 103 this.cannotSaveJobError(innerErr)
100 } 104 return callback(innerErr)
101 ) 105 }
102 .then(() => callback(null)) 106 }
103 .catch(err => { 107
104 this.cannotSaveJobError(err) 108 callback(null)
105 return callback(err)
106 })
107 } 109 }
108 110
109 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { 111 private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
110 job.state = JOB_STATES.ERROR 112 job.state = JOB_STATES.ERROR
111 113
112 return job.save() 114 try {
113 .then(() => jobHandler.onError(err, job.id)) 115 await job.save()
114 .catch(err => this.cannotSaveJobError(err)) 116 await jobHandler.onError(err, job.id)
117 } catch (err) {
118 this.cannotSaveJobError(err)
119 }
115 } 120 }
116 121
117 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { 122 private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
118 job.state = JOB_STATES.SUCCESS 123 job.state = JOB_STATES.SUCCESS
119 124
120 return job.save() 125 try {
121 .then(() => jobHandler.onSuccess(job.id, jobResult)) 126 await job.save()
122 .catch(err => this.cannotSaveJobError(err)) 127 jobHandler.onSuccess(job.id, jobResult)
128 } catch (err) {
129 this.cannotSaveJobError(err)
130 }
123 } 131 }
124 132
125 private cannotSaveJobError (err: Error) { 133 private cannotSaveJobError (err: Error) {
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts
index 9aa3ea52f..d91b00c55 100644
--- a/server/lib/oauth-model.ts
+++ b/server/lib/oauth-model.ts
@@ -24,39 +24,36 @@ function getRefreshToken (refreshToken: string) {
24 return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) 24 return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken)
25} 25}
26 26
27function getUser (username: string, password: string) { 27async function getUser (username: string, password: string) {
28 logger.debug('Getting User (username: ' + username + ', password: ******).') 28 logger.debug('Getting User (username: ' + username + ', password: ******).')
29 29
30 return db.User.getByUsername(username).then(user => { 30 const user = await db.User.getByUsername(username)
31 if (!user) return null 31 if (!user) return null
32 32
33 return user.isPasswordMatch(password).then(passwordMatch => { 33 const passwordMatch = await user.isPasswordMatch(password)
34 if (passwordMatch === false) return null 34 if (passwordMatch === false) return null
35 35
36 return user 36 return user
37 })
38 })
39} 37}
40 38
41function revokeToken (token: TokenInfo) { 39async function revokeToken (tokenInfo: TokenInfo) {
42 return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => { 40 const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken)
43 if (tokenDB) tokenDB.destroy() 41 if (token) token.destroy()
44 42
45 /* 43 /*
46 * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js 44 * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js
47 * "As per the discussion we need set older date 45 * "As per the discussion we need set older date
48 * revokeToken will expected return a boolean in future version 46 * revokeToken will expected return a boolean in future version
49 * https://github.com/oauthjs/node-oauth2-server/pull/274 47 * https://github.com/oauthjs/node-oauth2-server/pull/274
50 * https://github.com/oauthjs/node-oauth2-server/issues/290" 48 * https://github.com/oauthjs/node-oauth2-server/issues/290"
51 */ 49 */
52 const expiredToken = tokenDB 50 const expiredToken = token
53 expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') 51 expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z')
54 52
55 return expiredToken 53 return expiredToken
56 })
57} 54}
58 55
59function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { 56async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) {
60 logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') 57 logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.')
61 58
62 const tokenToCreate = { 59 const tokenToCreate = {
@@ -68,11 +65,10 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns
68 userId: user.id 65 userId: user.id
69 } 66 }
70 67
71 return db.OAuthToken.create(tokenToCreate).then(tokenCreated => { 68 const tokenCreated = await db.OAuthToken.create(tokenToCreate)
72 const tokenToReturn = Object.assign(tokenCreated, { client, user }) 69 const tokenToReturn = Object.assign(tokenCreated, { client, user })
73 70
74 return tokenToReturn 71 return tokenToReturn
75 })
76} 72}
77 73
78// --------------------------------------------------------------------------- 74// ---------------------------------------------------------------------------
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts
index ce4e2ffd2..f46682824 100644
--- a/server/lib/request/abstract-request-scheduler.ts
+++ b/server/lib/request/abstract-request-scheduler.ts
@@ -1,5 +1,5 @@
1import { isEmpty } from 'lodash' 1import { isEmpty } from 'lodash'
2import * as Promise from 'bluebird' 2import * as Bluebird from 'bluebird'
3 3
4import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
5import { logger, makeSecureRequest } from '../../helpers' 5import { logger, makeSecureRequest } from '../../helpers'
@@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> {
76 // --------------------------------------------------------------------------- 76 // ---------------------------------------------------------------------------
77 77
78 // Make a requests to friends of a certain type 78 // Make a requests to friends of a certain type
79 protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { 79 protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
80 const params = { 80 const params = {
81 toPod: toPod, 81 toPod: toPod,
82 method: 'POST' as 'POST', 82 method: 'POST' as 'POST',
@@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> {
86 86
87 // Make multiple retry requests to all of pods 87 // Make multiple retry requests to all of pods
88 // The function fire some useful callbacks 88 // The function fire some useful callbacks
89 return makeSecureRequest(params) 89 try {
90 .then(({ response, body }) => { 90 const { response } = await makeSecureRequest(params)
91 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { 91 if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
92 throw new Error('Status code not 20x : ' + response.statusCode) 92 throw new Error('Status code not 20x : ' + response.statusCode)
93 } 93 }
94 }) 94 } catch (err) {
95 .catch(err => { 95 logger.error('Error sending secure request to %s pod.', toPod.host, err)
96 logger.error('Error sending secure request to %s pod.', toPod.host, err) 96
97 97 throw err
98 throw err 98 }
99 })
100 } 99 }
101 100
102 // Make all the requests of the scheduler 101 // Make all the requests of the scheduler
103 protected makeRequests () { 102 protected async makeRequests () {
104 return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) 103 let requestsGrouped: T
105 .then((requestsGrouped: T) => { 104
106 // We want to group requests by destinations pod and endpoint 105 try {
107 const requestsToMake = this.buildRequestsObjects(requestsGrouped) 106 requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
108 107 } catch (err) {
109 // If there are no requests, abort 108 logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })
110 if (isEmpty(requestsToMake) === true) { 109 throw err
111 logger.info('No "%s" to make.', this.description) 110 }
112 return { goodPods: [], badPods: [] } 111
113 } 112 // We want to group requests by destinations pod and endpoint
114 113 const requestsToMake = this.buildRequestsObjects(requestsGrouped)
115 logger.info('Making "%s" to friends.', this.description) 114
116 115 // If there are no requests, abort
117 const goodPods: number[] = [] 116 if (isEmpty(requestsToMake) === true) {
118 const badPods: number[] = [] 117 logger.info('No "%s" to make.', this.description)
119 118 return { goodPods: [], badPods: [] }
120 return Promise.map(Object.keys(requestsToMake), hashKey => { 119 }
121 const requestToMake = requestsToMake[hashKey] 120
122 const toPod: PodInstance = requestToMake.toPod 121 logger.info('Making "%s" to friends.', this.description)
123 122
124 return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) 123 const goodPods: number[] = []
125 .then(() => { 124 const badPods: number[] = []
126 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) 125
127 goodPods.push(requestToMake.toPod.id) 126 await Bluebird.map(Object.keys(requestsToMake), async hashKey => {
128 127 const requestToMake = requestsToMake[hashKey]
129 this.afterRequestHook() 128 const toPod: PodInstance = requestToMake.toPod
130 129
131 // Remove the pod id of these request ids 130 try {
132 return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) 131 await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
133 }) 132 logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
134 .catch(err => { 133 goodPods.push(requestToMake.toPod.id)
135 badPods.push(requestToMake.toPod.id) 134
136 logger.info('Cannot make request to %s.', toPod.host, err) 135 this.afterRequestHook()
137 }) 136
138 }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) 137 // Remove the pod id of these request ids
139 }) 138 await this.getRequestToPodModel()
140 .then(({ goodPods, badPods }) => { 139 .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
141 this.afterRequestsHook() 140 } catch (err) {
142 141 badPods.push(requestToMake.toPod.id)
143 // All the requests were made, we update the pods score 142 logger.info('Cannot make request to %s.', toPod.host, err)
144 return db.Pod.updatePodsScore(goodPods, badPods) 143 }
145 }) 144 }, { concurrency: REQUESTS_IN_PARALLEL })
146 .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) 145
146 this.afterRequestsHook()
147
148 // All the requests were made, we update the pods score
149 await db.Pod.updatePodsScore(goodPods, badPods)
147 } 150 }
148 151
149 protected afterRequestHook () { 152 protected afterRequestHook () {
150 // Nothing to do, let children reimplement it 153 // Nothing to do, let children re-implement it
151 } 154 }
152 155
153 protected afterRequestsHook () { 156 protected afterRequestsHook () {
154 // Nothing to do, let children reimplement it 157 // Nothing to do, let children re-implement it
155 } 158 }
156} 159}
157 160
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts
index 696875dcf..c3f7f6429 100644
--- a/server/lib/request/request-scheduler.ts
+++ b/server/lib/request/request-scheduler.ts
@@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
37 buildRequestsObjects (requestsGrouped: RequestsGrouped) { 37 buildRequestsObjects (requestsGrouped: RequestsGrouped) {
38 const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} 38 const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {}
39 39
40 Object.keys(requestsGrouped).forEach(toPodId => { 40 for (const toPodId of Object.keys(requestsGrouped)) {
41 requestsGrouped[toPodId].forEach(data => { 41 for (const data of requestsGrouped[toPodId]) {
42 const request = data.request 42 const request = data.request
43 const pod = data.pod 43 const pod = data.pod
44 const hashKey = toPodId + request.endpoint 44 const hashKey = toPodId + request.endpoint
@@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
54 54
55 requestsToMakeGrouped[hashKey].ids.push(request.id) 55 requestsToMakeGrouped[hashKey].ids.push(request.id)
56 requestsToMakeGrouped[hashKey].datas.push(request.request) 56 requestsToMakeGrouped[hashKey].datas.push(request.request)
57 }) 57 }
58 }) 58 }
59 59
60 return requestsToMakeGrouped 60 return requestsToMakeGrouped
61 } 61 }
62 62
63 createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { 63 async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
64 // If there are no destination pods abort 64 // If there are no destination pods abort
65 if (toIds.length === 0) return undefined 65 if (toIds.length === 0) return undefined
66 66
@@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> {
76 transaction 76 transaction
77 } 77 }
78 78
79 return db.Request.create(createQuery, dbRequestOptions) 79 const request = await db.Request.create(createQuery, dbRequestOptions)
80 .then(request => { 80 await request.setPods(toIds, dbRequestOptions)
81 return request.setPods(toIds, dbRequestOptions)
82 })
83 } 81 }
84 82
85 // --------------------------------------------------------------------------- 83 // ---------------------------------------------------------------------------
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts
index 680232732..5f21287f0 100644
--- a/server/lib/request/request-video-event-scheduler.ts
+++ b/server/lib/request/request-video-event-scheduler.ts
@@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
59 59
60 // We group video events per video and per pod 60 // We group video events per video and per pod
61 // We add the counts of the same event types 61 // We add the counts of the same event types
62 Object.keys(eventRequests).forEach(toPodId => { 62 for (const toPodId of Object.keys(eventRequests)) {
63 eventRequests[toPodId].forEach(eventToProcess => { 63 for (const eventToProcess of eventRequests[toPodId]) {
64 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} 64 if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
65 65
66 if (!requestsToMakeGrouped[toPodId]) { 66 if (!requestsToMakeGrouped[toPodId]) {
@@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
81 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 81 if (!events[eventToProcess.type]) events[eventToProcess.type] = 0
82 82
83 events[eventToProcess.type] += eventToProcess.count 83 events[eventToProcess.type] += eventToProcess.count
84 }) 84 }
85 }) 85 }
86 86
87 // Now we build our requests array per pod 87 // Now we build our requests array per pod
88 Object.keys(eventsPerVideoPerPod).forEach(toPodId => { 88 for (const toPodId of Object.keys(eventsPerVideoPerPod)) {
89 const eventsForPod = eventsPerVideoPerPod[toPodId] 89 const eventsForPod = eventsPerVideoPerPod[toPodId]
90 90
91 Object.keys(eventsForPod).forEach(uuid => { 91 for (const uuid of Object.keys(eventsForPod)) {
92 const eventsForVideo = eventsForPod[uuid] 92 const eventsForVideo = eventsForPod[uuid]
93 93
94 Object.keys(eventsForVideo).forEach(eventType => { 94 for (const eventType of Object.keys(eventsForVideo)) {
95 requestsToMakeGrouped[toPodId].datas.push({ 95 requestsToMakeGrouped[toPodId].datas.push({
96 data: { 96 data: {
97 uuid, 97 uuid,
@@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE
99 count: +eventsForVideo[eventType] 99 count: +eventsForVideo[eventType]
100 } 100 }
101 }) 101 })
102 }) 102 }
103 }) 103 }
104 }) 104 }
105 105
106 return requestsToMakeGrouped 106 return requestsToMakeGrouped
107 } 107 }
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts
index d7e1d5e31..a54efc111 100644
--- a/server/lib/request/request-video-qadu-scheduler.ts
+++ b/server/lib/request/request-video-qadu-scheduler.ts
@@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
59 buildRequestsObjects (requests: RequestsVideoQaduGrouped) { 59 buildRequestsObjects (requests: RequestsVideoQaduGrouped) {
60 const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} 60 const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {}
61 61
62 Object.keys(requests).forEach(toPodId => { 62 for (const toPodId of Object.keys(requests)) {
63 requests[toPodId].forEach(data => { 63 for (const data of requests[toPodId]) {
64 const request = data.request 64 const request = data.request
65 const video = data.video 65 const video = data.video
66 const pod = data.pod 66 const pod = data.pod
@@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa
105 // Maybe there are multiple quick and dirty update for the same video 105 // Maybe there are multiple quick and dirty update for the same video
106 // We use this hash map to dedupe them 106 // We use this hash map to dedupe them
107 requestsToMakeGrouped[hashKey].videos[video.id] = videoData 107 requestsToMakeGrouped[hashKey].videos[video.id] = videoData
108 }) 108 }
109 }) 109 }
110 110
111 // Now we deduped similar quick and dirty updates, we can build our requests data 111 // Now we deduped similar quick and dirty updates, we can build our requests data
112 Object.keys(requestsToMakeGrouped).forEach(hashKey => { 112 for (const hashKey of Object.keys(requestsToMakeGrouped)) {
113 Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { 113 for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) {
114 const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] 114 const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID]
115 115
116 requestsToMakeGrouped[hashKey].datas.push({ 116 requestsToMakeGrouped[hashKey].datas.push({
117 data: videoData 117 data: videoData
118 }) 118 })
119 }) 119 }
120 120
121 // We don't need it anymore, it was just to build our data array 121 // We don't need it anymore, it was just to build our data array
122 delete requestsToMakeGrouped[hashKey].videos 122 delete requestsToMakeGrouped[hashKey].videos
123 }) 123 }
124 124
125 return requestsToMakeGrouped 125 return requestsToMakeGrouped
126 } 126 }
127 127
128 createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { 128 async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
129 const dbRequestOptions: Sequelize.BulkCreateOptions = {} 129 const dbRequestOptions: Sequelize.BulkCreateOptions = {}
130 if (transaction) dbRequestOptions.transaction = transaction 130 if (transaction) dbRequestOptions.transaction = transaction
131 131
132 // Send the update to all our friends 132 // Send the update to all our friends
133 return db.Pod.listAllIds(transaction).then(podIds => { 133 const podIds = await db.Pod.listAllIds(transaction)
134 const queries = [] 134 const queries = []
135 podIds.forEach(podId => { 135 for (const podId of podIds) {
136 queries.push({ type, videoId, podId }) 136 queries.push({ type, videoId, podId })
137 }) 137 }
138 138
139 return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) 139 await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
140 }) 140 return undefined
141 } 141 }
142} 142}
143 143
diff --git a/server/lib/user.ts b/server/lib/user.ts
index 8609e72d8..a92f4777b 100644
--- a/server/lib/user.ts
+++ b/server/lib/user.ts
@@ -3,40 +3,36 @@ import { UserInstance } from '../models'
3import { addVideoAuthorToFriends } from './friends' 3import { addVideoAuthorToFriends } from './friends'
4import { createVideoChannel } from './video-channel' 4import { createVideoChannel } from './video-channel'
5 5
6function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { 6async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) {
7 return db.sequelize.transaction(t => { 7 const res = await db.sequelize.transaction(async t => {
8 const userOptions = { 8 const userOptions = {
9 transaction: t, 9 transaction: t,
10 validate: validateUser 10 validate: validateUser
11 } 11 }
12 12
13 return user.save(userOptions) 13 const userCreated = await user.save(userOptions)
14 .then(user => { 14 const authorInstance = db.Author.build({
15 const author = db.Author.build({ 15 name: userCreated.username,
16 name: user.username, 16 podId: null, // It is our pod
17 podId: null, // It is our pod 17 userId: userCreated.id
18 userId: user.id 18 })
19 }) 19
20 20 const authorCreated = await authorInstance.save({ transaction: t })
21 return author.save({ transaction: t }) 21
22 .then(author => ({ author, user })) 22 const remoteVideoAuthor = authorCreated.toAddRemoteJSON()
23 }) 23
24 .then(({ author, user }) => { 24 // Now we'll add the video channel's meta data to our friends
25 const remoteVideoAuthor = author.toAddRemoteJSON() 25 const author = await addVideoAuthorToFriends(remoteVideoAuthor, t)
26 26
27 // Now we'll add the video channel's meta data to our friends 27 const videoChannelInfo = {
28 return addVideoAuthorToFriends(remoteVideoAuthor, t) 28 name: `Default ${userCreated.username} channel`
29 .then(() => ({ author, user })) 29 }
30 }) 30 const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t)
31 .then(({ author, user }) => { 31
32 const videoChannelInfo = { 32 return { author, videoChannel }
33 name: `Default ${user.username} channel`
34 }
35
36 return createVideoChannel(videoChannelInfo, author, t)
37 .then(videoChannel => ({ author, user, videoChannel }))
38 })
39 }) 33 })
34
35 return res
40} 36}
41 37
42// --------------------------------------------------------------------------- 38// ---------------------------------------------------------------------------
diff --git a/server/middlewares/secure.ts b/server/middlewares/secure.ts
index f7424c9c3..5dd809f15 100644
--- a/server/middlewares/secure.ts
+++ b/server/middlewares/secure.ts
@@ -8,45 +8,44 @@ import {
8} from '../helpers' 8} from '../helpers'
9import { PodSignature } from '../../shared' 9import { PodSignature } from '../../shared'
10 10
11function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) { 11async function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) {
12 const signatureObject: PodSignature = req.body.signature 12 const signatureObject: PodSignature = req.body.signature
13 const host = signatureObject.host 13 const host = signatureObject.host
14 14
15 db.Pod.loadByHost(host) 15 try {
16 .then(pod => { 16 const pod = await db.Pod.loadByHost(host)
17 if (pod === null) { 17 if (pod === null) {
18 logger.error('Unknown pod %s.', host) 18 logger.error('Unknown pod %s.', host)
19 return res.sendStatus(403) 19 return res.sendStatus(403)
20 } 20 }
21
22 logger.debug('Checking signature from %s.', host)
23 21
24 let signatureShouldBe 22 logger.debug('Checking signature from %s.', host)
25 // If there is data in the body the sender used it for its signature
26 // If there is no data we just use its host as signature
27 if (req.body.data) {
28 signatureShouldBe = req.body.data
29 } else {
30 signatureShouldBe = host
31 }
32 23
33 const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature) 24 let signatureShouldBe
25 // If there is data in the body the sender used it for its signature
26 // If there is no data we just use its host as signature
27 if (req.body.data) {
28 signatureShouldBe = req.body.data
29 } else {
30 signatureShouldBe = host
31 }
34 32
35 if (signatureOk === true) { 33 const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature)
36 res.locals.secure = {
37 pod
38 }
39 34
40 return next() 35 if (signatureOk === true) {
36 res.locals.secure = {
37 pod
41 } 38 }
42 39
43 logger.error('Signature is not okay in body for %s.', signatureObject.host) 40 return next()
44 return res.sendStatus(403) 41 }
45 }) 42
46 .catch(err => { 43 logger.error('Signature is not okay in body for %s.', signatureObject.host)
47 logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature }) 44 return res.sendStatus(403)
48 return res.sendStatus(500) 45 } catch (err) {
49 }) 46 logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature })
47 return res.sendStatus(500)
48 }
50} 49}
51 50
52// --------------------------------------------------------------------------- 51// ---------------------------------------------------------------------------
diff --git a/server/models/pod/pod.ts b/server/models/pod/pod.ts
index e4d7db48a..6619726af 100644
--- a/server/models/pod/pod.ts
+++ b/server/models/pod/pod.ts
@@ -247,20 +247,21 @@ updatePodsScore = function (goodPods: number[], badPods: number[]) {
247// --------------------------------------------------------------------------- 247// ---------------------------------------------------------------------------
248 248
249// Remove pods with a score of 0 (too many requests where they were unreachable) 249// Remove pods with a score of 0 (too many requests where they were unreachable)
250function removeBadPods () { 250async function removeBadPods () {
251 return listBadPods() 251 try {
252 .then(pods => { 252 const pods = await listBadPods()
253 const podsRemovePromises = pods.map(pod => pod.destroy()) 253
254 return Promise.all(podsRemovePromises).then(() => pods.length) 254 const podsRemovePromises = pods.map(pod => pod.destroy())
255 }) 255 await Promise.all(podsRemovePromises)
256 .then(numberOfPodsRemoved => { 256
257 if (numberOfPodsRemoved) { 257 const numberOfPodsRemoved = pods.length
258 logger.info('Removed %d pods.', numberOfPodsRemoved) 258
259 } else { 259 if (numberOfPodsRemoved) {
260 logger.info('No need to remove bad pods.') 260 logger.info('Removed %d pods.', numberOfPodsRemoved)
261 } 261 } else {
262 }) 262 logger.info('No need to remove bad pods.')
263 .catch(err => { 263 }
264 logger.error('Cannot remove bad pods.', err) 264 } catch (err) {
265 }) 265 logger.error('Cannot remove bad pods.', err)
266 }
266} 267}