diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-10-25 16:03:33 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-10-26 09:11:38 +0200 |
commit | f5028693a896a3076dd286ac0030e3d8f78f5ebf (patch) | |
tree | 09144ed6357e49ea575fb110247f933283ad235e /server | |
parent | eb08047657e739bcd9e592d76307befa3998482b (diff) | |
download | PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.gz PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.tar.zst PeerTube-f5028693a896a3076dd286ac0030e3d8f78f5ebf.zip |
Use async/await in lib and initializers
Diffstat (limited to 'server')
-rw-r--r-- | server/helpers/peertube-crypto.ts | 103 | ||||
-rw-r--r-- | server/helpers/requests.ts | 2 | ||||
-rw-r--r-- | server/helpers/utils.ts | 20 | ||||
-rw-r--r-- | server/initializers/checker.ts | 46 | ||||
-rw-r--r-- | server/initializers/database.ts | 90 | ||||
-rw-r--r-- | server/initializers/index.ts | 2 | ||||
-rw-r--r-- | server/initializers/installer.ts | 141 | ||||
-rw-r--r-- | server/initializers/migrator.ts | 107 | ||||
-rw-r--r-- | server/lib/cache/videos-preview-cache.ts | 15 | ||||
-rw-r--r-- | server/lib/friends.ts | 368 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-file-optimizer.ts | 88 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-file-transcoder.ts | 21 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 118 | ||||
-rw-r--r-- | server/lib/oauth-model.ts | 54 | ||||
-rw-r--r-- | server/lib/request/abstract-request-scheduler.ts | 121 | ||||
-rw-r--r-- | server/lib/request/request-scheduler.ts | 16 | ||||
-rw-r--r-- | server/lib/request/request-video-event-scheduler.ts | 20 | ||||
-rw-r--r-- | server/lib/request/request-video-qadu-scheduler.ts | 34 | ||||
-rw-r--r-- | server/lib/user.ts | 52 | ||||
-rw-r--r-- | server/middlewares/secure.ts | 59 | ||||
-rw-r--r-- | server/models/pod/pod.ts | 33 |
21 files changed, 740 insertions, 770 deletions
diff --git a/server/helpers/peertube-crypto.ts b/server/helpers/peertube-crypto.ts index 89aef99c4..47f0243e7 100644 --- a/server/helpers/peertube-crypto.ts +++ b/server/helpers/peertube-crypto.ts | |||
@@ -1,5 +1,4 @@ | |||
1 | import * as crypto from 'crypto' | 1 | import * as crypto from 'crypto' |
2 | import * as Promise from 'bluebird' | ||
3 | import { join } from 'path' | 2 | import { join } from 'path' |
4 | 3 | ||
5 | import { | 4 | import { |
@@ -41,7 +40,7 @@ function checkSignature (publicKey: string, data: string, hexSignature: string) | |||
41 | return isValid | 40 | return isValid |
42 | } | 41 | } |
43 | 42 | ||
44 | function sign (data: string|Object) { | 43 | async function sign (data: string|Object) { |
45 | const sign = crypto.createSign(SIGNATURE_ALGORITHM) | 44 | const sign = crypto.createSign(SIGNATURE_ALGORITHM) |
46 | 45 | ||
47 | let dataString: string | 46 | let dataString: string |
@@ -52,33 +51,33 @@ function sign (data: string|Object) { | |||
52 | dataString = JSON.stringify(data) | 51 | dataString = JSON.stringify(data) |
53 | } catch (err) { | 52 | } catch (err) { |
54 | logger.error('Cannot sign data.', err) | 53 | logger.error('Cannot sign data.', err) |
55 | return Promise.resolve('') | 54 | return '' |
56 | } | 55 | } |
57 | } | 56 | } |
58 | 57 | ||
59 | sign.update(dataString, 'utf8') | 58 | sign.update(dataString, 'utf8') |
60 | 59 | ||
61 | return getMyPrivateCert().then(myKey => { | 60 | const myKey = await getMyPrivateCert() |
62 | return sign.sign(myKey, SIGNATURE_ENCODING) | 61 | return await sign.sign(myKey, SIGNATURE_ENCODING) |
63 | }) | ||
64 | } | 62 | } |
65 | 63 | ||
66 | function comparePassword (plainPassword: string, hashPassword: string) { | 64 | function comparePassword (plainPassword: string, hashPassword: string) { |
67 | return bcryptComparePromise(plainPassword, hashPassword) | 65 | return bcryptComparePromise(plainPassword, hashPassword) |
68 | } | 66 | } |
69 | 67 | ||
70 | function createCertsIfNotExist () { | 68 | async function createCertsIfNotExist () { |
71 | return certsExist().then(exist => { | 69 | const exist = await certsExist() |
72 | if (exist === true) { | 70 | if (exist === true) { |
73 | return undefined | 71 | return undefined |
74 | } | 72 | } |
75 | 73 | ||
76 | return createCerts() | 74 | return await createCerts() |
77 | }) | ||
78 | } | 75 | } |
79 | 76 | ||
80 | function cryptPassword (password: string) { | 77 | async function cryptPassword (password: string) { |
81 | return bcryptGenSaltPromise(BCRYPT_SALT_SIZE).then(salt => bcryptHashPromise(password, salt)) | 78 | const salt = await bcryptGenSaltPromise(BCRYPT_SALT_SIZE) |
79 | |||
80 | return await bcryptHashPromise(password, salt) | ||
82 | } | 81 | } |
83 | 82 | ||
84 | function getMyPrivateCert () { | 83 | function getMyPrivateCert () { |
@@ -105,51 +104,45 @@ export { | |||
105 | 104 | ||
106 | // --------------------------------------------------------------------------- | 105 | // --------------------------------------------------------------------------- |
107 | 106 | ||
108 | function certsExist () { | 107 | async function certsExist () { |
109 | const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) | 108 | const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) |
110 | 109 | ||
111 | // If there is an error the certificates do not exist | 110 | // If there is an error the certificates do not exist |
112 | return accessPromise(certPath) | 111 | try { |
113 | .then(() => true) | 112 | await accessPromise(certPath) |
114 | .catch(() => false) | 113 | |
114 | return true | ||
115 | } catch { | ||
116 | return false | ||
117 | } | ||
115 | } | 118 | } |
116 | 119 | ||
117 | function createCerts () { | 120 | async function createCerts () { |
118 | return certsExist().then(exist => { | 121 | const exist = await certsExist() |
119 | if (exist === true) { | 122 | if (exist === true) { |
120 | const errorMessage = 'Certs already exist.' | 123 | const errorMessage = 'Certs already exist.' |
121 | logger.warning(errorMessage) | 124 | logger.warning(errorMessage) |
122 | throw new Error(errorMessage) | 125 | throw new Error(errorMessage) |
123 | } | 126 | } |
124 | 127 | ||
125 | logger.info('Generating a RSA key...') | 128 | logger.info('Generating a RSA key...') |
126 | 129 | ||
127 | const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) | 130 | const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) |
128 | const genRsaOptions = { | 131 | const genRsaOptions = { |
129 | 'out': privateCertPath, | 132 | 'out': privateCertPath, |
130 | '2048': false | 133 | '2048': false |
131 | } | 134 | } |
132 | return opensslExecPromise('genrsa', genRsaOptions) | 135 | |
133 | .then(() => { | 136 | await opensslExecPromise('genrsa', genRsaOptions) |
134 | logger.info('RSA key generated.') | 137 | logger.info('RSA key generated.') |
135 | logger.info('Managing public key...') | 138 | logger.info('Managing public key...') |
136 | 139 | ||
137 | const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub') | 140 | const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub') |
138 | const rsaOptions = { | 141 | const rsaOptions = { |
139 | 'in': privateCertPath, | 142 | 'in': privateCertPath, |
140 | 'pubout': true, | 143 | 'pubout': true, |
141 | 'out': publicCertPath | 144 | 'out': publicCertPath |
142 | } | 145 | } |
143 | return opensslExecPromise('rsa', rsaOptions) | 146 | |
144 | .then(() => logger.info('Public key managed.')) | 147 | await opensslExecPromise('rsa', rsaOptions) |
145 | .catch(err => { | ||
146 | logger.error('Cannot create public key on this pod.') | ||
147 | throw err | ||
148 | }) | ||
149 | }) | ||
150 | .catch(err => { | ||
151 | logger.error('Cannot create private key on this pod.') | ||
152 | throw err | ||
153 | }) | ||
154 | }) | ||
155 | } | 148 | } |
diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts index d67d46044..8c5d848f3 100644 --- a/server/helpers/requests.ts +++ b/server/helpers/requests.ts | |||
@@ -73,7 +73,7 @@ function makeSecureRequest (params: MakeSecureRequestParams) { | |||
73 | signature | 73 | signature |
74 | } | 74 | } |
75 | 75 | ||
76 | // If there are data informations | 76 | // If there are data information |
77 | if (params.data) { | 77 | if (params.data) { |
78 | requestParams.json.data = params.data | 78 | requestParams.json.data = params.data |
79 | } | 79 | } |
diff --git a/server/helpers/utils.ts b/server/helpers/utils.ts index 6cabe117c..8b81a61e1 100644 --- a/server/helpers/utils.ts +++ b/server/helpers/utils.ts | |||
@@ -8,11 +8,13 @@ import { ResultList } from '../../shared' | |||
8 | import { VideoResolution } from '../../shared/models/videos/video-resolution.enum' | 8 | import { VideoResolution } from '../../shared/models/videos/video-resolution.enum' |
9 | 9 | ||
10 | function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) { | 10 | function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) { |
11 | res.type('json').status(400).end() | 11 | return res.type('json').status(400).end() |
12 | } | 12 | } |
13 | 13 | ||
14 | function generateRandomString (size: number) { | 14 | async function generateRandomString (size: number) { |
15 | return pseudoRandomBytesPromise(size).then(raw => raw.toString('hex')) | 15 | const raw = await pseudoRandomBytesPromise(size) |
16 | |||
17 | return raw.toString('hex') | ||
16 | } | 18 | } |
17 | 19 | ||
18 | interface FormattableToJSON { | 20 | interface FormattableToJSON { |
@@ -34,19 +36,19 @@ function getFormattedObjects<U, T extends FormattableToJSON> (objects: T[], obje | |||
34 | return res | 36 | return res |
35 | } | 37 | } |
36 | 38 | ||
37 | function isSignupAllowed () { | 39 | async function isSignupAllowed () { |
38 | if (CONFIG.SIGNUP.ENABLED === false) { | 40 | if (CONFIG.SIGNUP.ENABLED === false) { |
39 | return Promise.resolve(false) | 41 | return false |
40 | } | 42 | } |
41 | 43 | ||
42 | // No limit and signup is enabled | 44 | // No limit and signup is enabled |
43 | if (CONFIG.SIGNUP.LIMIT === -1) { | 45 | if (CONFIG.SIGNUP.LIMIT === -1) { |
44 | return Promise.resolve(true) | 46 | return true |
45 | } | 47 | } |
46 | 48 | ||
47 | return db.User.countTotal().then(totalUsers => { | 49 | const totalUsers = await db.User.countTotal() |
48 | return totalUsers < CONFIG.SIGNUP.LIMIT | 50 | |
49 | }) | 51 | return totalUsers < CONFIG.SIGNUP.LIMIT |
50 | } | 52 | } |
51 | 53 | ||
52 | function computeResolutionsToTranscode (videoFileHeight: number) { | 54 | function computeResolutionsToTranscode (videoFileHeight: number) { |
diff --git a/server/initializers/checker.ts b/server/initializers/checker.ts index eb9e9e280..9eaef1695 100644 --- a/server/initializers/checker.ts +++ b/server/initializers/checker.ts | |||
@@ -37,39 +37,37 @@ function checkMissedConfig () { | |||
37 | 37 | ||
38 | // Check the available codecs | 38 | // Check the available codecs |
39 | // We get CONFIG by param to not import it in this file (import orders) | 39 | // We get CONFIG by param to not import it in this file (import orders) |
40 | function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) { | 40 | async function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) { |
41 | const Ffmpeg = require('fluent-ffmpeg') | 41 | const Ffmpeg = require('fluent-ffmpeg') |
42 | const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs) | 42 | const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs) |
43 | 43 | ||
44 | getAvailableCodecsPromise() | 44 | const codecs = await getAvailableCodecsPromise() |
45 | .then(codecs => { | 45 | if (CONFIG.TRANSCODING.ENABLED === false) return undefined |
46 | if (CONFIG.TRANSCODING.ENABLED === false) return undefined | 46 | |
47 | 47 | const canEncode = [ 'libx264' ] | |
48 | const canEncode = [ 'libx264' ] | 48 | for (const codec of canEncode) { |
49 | canEncode.forEach(codec => { | 49 | if (codecs[codec] === undefined) { |
50 | if (codecs[codec] === undefined) { | 50 | throw new Error('Unknown codec ' + codec + ' in FFmpeg.') |
51 | throw new Error('Unknown codec ' + codec + ' in FFmpeg.') | 51 | } |
52 | } | 52 | |
53 | 53 | if (codecs[codec].canEncode !== true) { | |
54 | if (codecs[codec].canEncode !== true) { | 54 | throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg') |
55 | throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg') | 55 | } |
56 | } | 56 | } |
57 | }) | ||
58 | }) | ||
59 | } | 57 | } |
60 | 58 | ||
61 | // We get db by param to not import it in this file (import orders) | 59 | // We get db by param to not import it in this file (import orders) |
62 | function clientsExist (OAuthClient: OAuthClientModel) { | 60 | async function clientsExist (OAuthClient: OAuthClientModel) { |
63 | return OAuthClient.countTotal().then(totalClients => { | 61 | const totalClients = await OAuthClient.countTotal() |
64 | return totalClients !== 0 | 62 | |
65 | }) | 63 | return totalClients !== 0 |
66 | } | 64 | } |
67 | 65 | ||
68 | // We get db by param to not import it in this file (import orders) | 66 | // We get db by param to not import it in this file (import orders) |
69 | function usersExist (User: UserModel) { | 67 | async function usersExist (User: UserModel) { |
70 | return User.countTotal().then(totalUsers => { | 68 | const totalUsers = await User.countTotal() |
71 | return totalUsers !== 0 | 69 | |
72 | }) | 70 | return totalUsers !== 0 |
73 | } | 71 | } |
74 | 72 | ||
75 | // --------------------------------------------------------------------------- | 73 | // --------------------------------------------------------------------------- |
diff --git a/server/initializers/database.ts b/server/initializers/database.ts index ea2b68f59..ade72b62f 100644 --- a/server/initializers/database.ts +++ b/server/initializers/database.ts | |||
@@ -2,7 +2,7 @@ import { join } from 'path' | |||
2 | import { flattenDepth } from 'lodash' | 2 | import { flattenDepth } from 'lodash' |
3 | require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string | 3 | require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string |
4 | import * as Sequelize from 'sequelize' | 4 | import * as Sequelize from 'sequelize' |
5 | import * as Promise from 'bluebird' | 5 | import * as Bluebird from 'bluebird' |
6 | 6 | ||
7 | import { CONFIG } from './constants' | 7 | import { CONFIG } from './constants' |
8 | // Do not use barrel, we need to load database first | 8 | // Do not use barrel, we need to load database first |
@@ -77,26 +77,26 @@ const sequelize = new Sequelize(dbname, username, password, { | |||
77 | 77 | ||
78 | database.sequelize = sequelize | 78 | database.sequelize = sequelize |
79 | 79 | ||
80 | database.init = (silent: boolean) => { | 80 | database.init = async (silent: boolean) => { |
81 | const modelDirectory = join(__dirname, '..', 'models') | 81 | const modelDirectory = join(__dirname, '..', 'models') |
82 | 82 | ||
83 | return getModelFiles(modelDirectory).then(filePaths => { | 83 | const filePaths = await getModelFiles(modelDirectory) |
84 | filePaths.forEach(filePath => { | ||
85 | const model = sequelize.import(filePath) | ||
86 | 84 | ||
87 | database[model['name']] = model | 85 | for (const filePath of filePaths) { |
88 | }) | 86 | const model = sequelize.import(filePath) |
89 | 87 | ||
90 | Object.keys(database).forEach(modelName => { | 88 | database[model['name']] = model |
91 | if ('associate' in database[modelName]) { | 89 | } |
92 | database[modelName].associate(database) | ||
93 | } | ||
94 | }) | ||
95 | 90 | ||
96 | if (!silent) logger.info('Database %s is ready.', dbname) | 91 | for (const modelName of Object.keys(database)) { |
92 | if ('associate' in database[modelName]) { | ||
93 | database[modelName].associate(database) | ||
94 | } | ||
95 | } | ||
97 | 96 | ||
98 | return undefined | 97 | if (!silent) logger.info('Database %s is ready.', dbname) |
99 | }) | 98 | |
99 | return undefined | ||
100 | } | 100 | } |
101 | 101 | ||
102 | // --------------------------------------------------------------------------- | 102 | // --------------------------------------------------------------------------- |
@@ -107,31 +107,29 @@ export { | |||
107 | 107 | ||
108 | // --------------------------------------------------------------------------- | 108 | // --------------------------------------------------------------------------- |
109 | 109 | ||
110 | function getModelFiles (modelDirectory: string) { | 110 | async function getModelFiles (modelDirectory: string) { |
111 | return readdirPromise(modelDirectory) | 111 | const files = await readdirPromise(modelDirectory) |
112 | .then(files => { | 112 | const directories = files.filter(directory => { |
113 | const directories: string[] = files.filter(directory => { | 113 | // Find directories |
114 | // Find directories | 114 | if ( |
115 | if ( | 115 | directory.endsWith('.js.map') || |
116 | directory.endsWith('.js.map') || | 116 | directory === 'index.js' || directory === 'index.ts' || |
117 | directory === 'index.js' || directory === 'index.ts' || | 117 | directory === 'utils.js' || directory === 'utils.ts' |
118 | directory === 'utils.js' || directory === 'utils.ts' | 118 | ) return false |
119 | ) return false | 119 | |
120 | 120 | return true | |
121 | return true | 121 | }) |
122 | }) | ||
123 | 122 | ||
124 | return directories | 123 | const tasks: Bluebird<any>[] = [] |
125 | }) | ||
126 | .then(directories => { | ||
127 | const tasks = [] | ||
128 | 124 | ||
129 | // For each directory we read it and append model in the modelFilePaths array | 125 | // For each directory we read it and append model in the modelFilePaths array |
130 | directories.forEach(directory => { | 126 | for (const directory of directories) { |
131 | const modelDirectoryPath = join(modelDirectory, directory) | 127 | const modelDirectoryPath = join(modelDirectory, directory) |
132 | 128 | ||
133 | const promise = readdirPromise(modelDirectoryPath).then(files => { | 129 | const promise = readdirPromise(modelDirectoryPath) |
134 | const filteredFiles = files.filter(file => { | 130 | .then(files => { |
131 | const filteredFiles = files | ||
132 | .filter(file => { | ||
135 | if ( | 133 | if ( |
136 | file === 'index.js' || file === 'index.ts' || | 134 | file === 'index.js' || file === 'index.ts' || |
137 | file === 'utils.js' || file === 'utils.ts' || | 135 | file === 'utils.js' || file === 'utils.ts' || |
@@ -140,17 +138,15 @@ function getModelFiles (modelDirectory: string) { | |||
140 | ) return false | 138 | ) return false |
141 | 139 | ||
142 | return true | 140 | return true |
143 | }).map(file => join(modelDirectoryPath, file)) | 141 | }) |
144 | 142 | .map(file => join(modelDirectoryPath, file)) | |
145 | return filteredFiles | ||
146 | }) | ||
147 | 143 | ||
148 | tasks.push(promise) | 144 | return filteredFiles |
149 | }) | 145 | }) |
150 | 146 | ||
151 | return Promise.all(tasks) | 147 | tasks.push(promise) |
152 | }) | 148 | } |
153 | .then((filteredFiles: string[][]) => { | 149 | |
154 | return flattenDepth<string>(filteredFiles, 1) | 150 | const filteredFilesArray: string[][] = await Promise.all(tasks) |
155 | }) | 151 | return flattenDepth<string>(filteredFilesArray, 1) |
156 | } | 152 | } |
diff --git a/server/initializers/index.ts b/server/initializers/index.ts index b8400ff84..332702774 100644 --- a/server/initializers/index.ts +++ b/server/initializers/index.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | // Constants first, databse in second! | 1 | // Constants first, database in second! |
2 | export * from './constants' | 2 | export * from './constants' |
3 | export * from './database' | 3 | export * from './database' |
4 | export * from './checker' | 4 | export * from './checker' |
diff --git a/server/initializers/installer.ts b/server/initializers/installer.ts index b997de07f..4c04290fc 100644 --- a/server/initializers/installer.ts +++ b/server/initializers/installer.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import * as passwordGenerator from 'password-generator' | 1 | import * as passwordGenerator from 'password-generator' |
2 | import * as Promise from 'bluebird' | 2 | import * as Bluebird from 'bluebird' |
3 | 3 | ||
4 | import { database as db } from './database' | 4 | import { database as db } from './database' |
5 | import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants' | 5 | import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants' |
@@ -7,13 +7,13 @@ import { clientsExist, usersExist } from './checker' | |||
7 | import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers' | 7 | import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers' |
8 | import { createUserAuthorAndChannel } from '../lib' | 8 | import { createUserAuthorAndChannel } from '../lib' |
9 | 9 | ||
10 | function installApplication () { | 10 | async function installApplication () { |
11 | return db.sequelize.sync() | 11 | await db.sequelize.sync() |
12 | .then(() => removeCacheDirectories()) | 12 | await removeCacheDirectories() |
13 | .then(() => createDirectoriesIfNotExist()) | 13 | await createDirectoriesIfNotExist() |
14 | .then(() => createCertsIfNotExist()) | 14 | await createCertsIfNotExist() |
15 | .then(() => createOAuthClientIfNotExist()) | 15 | await createOAuthClientIfNotExist() |
16 | .then(() => createOAuthAdminIfNotExist()) | 16 | await createOAuthAdminIfNotExist() |
17 | } | 17 | } |
18 | 18 | ||
19 | // --------------------------------------------------------------------------- | 19 | // --------------------------------------------------------------------------- |
@@ -27,13 +27,13 @@ export { | |||
27 | function removeCacheDirectories () { | 27 | function removeCacheDirectories () { |
28 | const cacheDirectories = CACHE.DIRECTORIES | 28 | const cacheDirectories = CACHE.DIRECTORIES |
29 | 29 | ||
30 | const tasks = [] | 30 | const tasks: Bluebird<any>[] = [] |
31 | 31 | ||
32 | // Cache directories | 32 | // Cache directories |
33 | Object.keys(cacheDirectories).forEach(key => { | 33 | for (const key of Object.keys(cacheDirectories)) { |
34 | const dir = cacheDirectories[key] | 34 | const dir = cacheDirectories[key] |
35 | tasks.push(rimrafPromise(dir)) | 35 | tasks.push(rimrafPromise(dir)) |
36 | }) | 36 | } |
37 | 37 | ||
38 | return Promise.all(tasks) | 38 | return Promise.all(tasks) |
39 | } | 39 | } |
@@ -43,88 +43,83 @@ function createDirectoriesIfNotExist () { | |||
43 | const cacheDirectories = CACHE.DIRECTORIES | 43 | const cacheDirectories = CACHE.DIRECTORIES |
44 | 44 | ||
45 | const tasks = [] | 45 | const tasks = [] |
46 | Object.keys(storage).forEach(key => { | 46 | for (const key of Object.keys(storage)) { |
47 | const dir = storage[key] | 47 | const dir = storage[key] |
48 | tasks.push(mkdirpPromise(dir)) | 48 | tasks.push(mkdirpPromise(dir)) |
49 | }) | 49 | } |
50 | 50 | ||
51 | // Cache directories | 51 | // Cache directories |
52 | Object.keys(cacheDirectories).forEach(key => { | 52 | for (const key of Object.keys(cacheDirectories)) { |
53 | const dir = cacheDirectories[key] | 53 | const dir = cacheDirectories[key] |
54 | tasks.push(mkdirpPromise(dir)) | 54 | tasks.push(mkdirpPromise(dir)) |
55 | }) | 55 | } |
56 | 56 | ||
57 | return Promise.all(tasks) | 57 | return Promise.all(tasks) |
58 | } | 58 | } |
59 | 59 | ||
60 | function createOAuthClientIfNotExist () { | 60 | async function createOAuthClientIfNotExist () { |
61 | return clientsExist(db.OAuthClient).then(exist => { | 61 | const exist = await clientsExist(db.OAuthClient) |
62 | // Nothing to do, clients already exist | 62 | // Nothing to do, clients already exist |
63 | if (exist === true) return undefined | 63 | if (exist === true) return undefined |
64 | |||
65 | logger.info('Creating a default OAuth Client.') | ||
66 | |||
67 | const id = passwordGenerator(32, false, /[a-z0-9]/) | ||
68 | const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/) | ||
69 | const client = db.OAuthClient.build({ | ||
70 | clientId: id, | ||
71 | clientSecret: secret, | ||
72 | grants: [ 'password', 'refresh_token' ], | ||
73 | redirectUris: null | ||
74 | }) | ||
75 | 64 | ||
76 | return client.save().then(createdClient => { | 65 | logger.info('Creating a default OAuth Client.') |
77 | logger.info('Client id: ' + createdClient.clientId) | ||
78 | logger.info('Client secret: ' + createdClient.clientSecret) | ||
79 | 66 | ||
80 | return undefined | 67 | const id = passwordGenerator(32, false, /[a-z0-9]/) |
81 | }) | 68 | const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/) |
69 | const client = db.OAuthClient.build({ | ||
70 | clientId: id, | ||
71 | clientSecret: secret, | ||
72 | grants: [ 'password', 'refresh_token' ], | ||
73 | redirectUris: null | ||
82 | }) | 74 | }) |
83 | } | ||
84 | 75 | ||
85 | function createOAuthAdminIfNotExist () { | 76 | const createdClient = await client.save() |
86 | return usersExist(db.User).then(exist => { | 77 | logger.info('Client id: ' + createdClient.clientId) |
87 | // Nothing to do, users already exist | 78 | logger.info('Client secret: ' + createdClient.clientSecret) |
88 | if (exist === true) return undefined | ||
89 | 79 | ||
90 | logger.info('Creating the administrator.') | 80 | return undefined |
81 | } | ||
91 | 82 | ||
92 | const username = 'root' | 83 | async function createOAuthAdminIfNotExist () { |
93 | const role = USER_ROLES.ADMIN | 84 | const exist = await usersExist(db.User) |
94 | const email = CONFIG.ADMIN.EMAIL | 85 | // Nothing to do, users already exist |
95 | let validatePassword = true | 86 | if (exist === true) return undefined |
96 | let password = '' | ||
97 | 87 | ||
98 | // Do not generate a random password for tests | 88 | logger.info('Creating the administrator.') |
99 | if (process.env.NODE_ENV === 'test') { | ||
100 | password = 'test' | ||
101 | 89 | ||
102 | if (process.env.NODE_APP_INSTANCE) { | 90 | const username = 'root' |
103 | password += process.env.NODE_APP_INSTANCE | 91 | const role = USER_ROLES.ADMIN |
104 | } | 92 | const email = CONFIG.ADMIN.EMAIL |
93 | let validatePassword = true | ||
94 | let password = '' | ||
105 | 95 | ||
106 | // Our password is weak so do not validate it | 96 | // Do not generate a random password for tests |
107 | validatePassword = false | 97 | if (process.env.NODE_ENV === 'test') { |
108 | } else { | 98 | password = 'test' |
109 | password = passwordGenerator(8, true) | ||
110 | } | ||
111 | 99 | ||
112 | const userData = { | 100 | if (process.env.NODE_APP_INSTANCE) { |
113 | username, | 101 | password += process.env.NODE_APP_INSTANCE |
114 | email, | ||
115 | password, | ||
116 | role, | ||
117 | videoQuota: -1 | ||
118 | } | 102 | } |
119 | const user = db.User.build(userData) | ||
120 | 103 | ||
121 | return createUserAuthorAndChannel(user, validatePassword) | 104 | // Our password is weak so do not validate it |
122 | .then(({ user }) => { | 105 | validatePassword = false |
123 | logger.info('Username: ' + username) | 106 | } else { |
124 | logger.info('User password: ' + password) | 107 | password = passwordGenerator(8, true) |
125 | 108 | } | |
126 | logger.info('Creating Application table.') | 109 | |
127 | return db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION }) | 110 | const userData = { |
128 | }) | 111 | username, |
129 | }) | 112 | email, |
113 | password, | ||
114 | role, | ||
115 | videoQuota: -1 | ||
116 | } | ||
117 | const user = db.User.build(userData) | ||
118 | |||
119 | await createUserAuthorAndChannel(user, validatePassword) | ||
120 | logger.info('Username: ' + username) | ||
121 | logger.info('User password: ' + password) | ||
122 | |||
123 | logger.info('Creating Application table.') | ||
124 | await db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION }) | ||
130 | } | 125 | } |
diff --git a/server/initializers/migrator.ts b/server/initializers/migrator.ts index 7b535aea9..4fbe1cf5b 100644 --- a/server/initializers/migrator.ts +++ b/server/initializers/migrator.ts | |||
@@ -1,52 +1,35 @@ | |||
1 | import * as path from 'path' | 1 | import * as path from 'path' |
2 | import * as Promise from 'bluebird' | ||
3 | 2 | ||
4 | import { database as db } from './database' | 3 | import { database as db } from './database' |
5 | import { LAST_MIGRATION_VERSION } from './constants' | 4 | import { LAST_MIGRATION_VERSION } from './constants' |
6 | import { logger, readdirPromise } from '../helpers' | 5 | import { logger, readdirPromise } from '../helpers' |
7 | 6 | ||
8 | function migrate () { | 7 | async function migrate () { |
9 | const p = db.sequelize.getQueryInterface().showAllTables() | 8 | const tables = await db.sequelize.getQueryInterface().showAllTables() |
10 | .then(tables => { | ||
11 | // No tables, we don't need to migrate anything | ||
12 | // The installer will do that | ||
13 | if (tables.length === 0) throw null | ||
14 | }) | ||
15 | .then(() => { | ||
16 | return db.Application.loadMigrationVersion() | ||
17 | }) | ||
18 | .then(actualVersion => { | ||
19 | if (actualVersion === null) { | ||
20 | return db.Application.create({ migrationVersion: 0 }).then(() => 0) | ||
21 | } | ||
22 | 9 | ||
23 | return actualVersion | 10 | // No tables, we don't need to migrate anything |
24 | }) | 11 | // The installer will do that |
25 | .then(actualVersion => { | 12 | if (tables.length === 0) return |
26 | // No need migrations, abort | ||
27 | if (actualVersion >= LAST_MIGRATION_VERSION) throw null | ||
28 | 13 | ||
29 | return actualVersion | 14 | let actualVersion = await db.Application.loadMigrationVersion() |
30 | }) | 15 | if (actualVersion === null) { |
31 | .then(actualVersion => { | 16 | await db.Application.create({ migrationVersion: 0 }) |
32 | // If there are a new migration scripts | 17 | actualVersion = 0 |
33 | logger.info('Begin migrations.') | 18 | } |
34 | 19 | ||
35 | return getMigrationScripts().then(migrationScripts => ({ actualVersion, migrationScripts })) | 20 | // No need migrations, abort |
36 | }) | 21 | if (actualVersion >= LAST_MIGRATION_VERSION) return |
37 | .then(({ actualVersion, migrationScripts }) => { | ||
38 | return Promise.each(migrationScripts, entity => executeMigration(actualVersion, entity)) | ||
39 | }) | ||
40 | .then(() => { | ||
41 | logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION) | ||
42 | }) | ||
43 | .catch(err => { | ||
44 | if (err === null) return undefined | ||
45 | 22 | ||
46 | throw err | 23 | // If there are a new migration scripts |
47 | }) | 24 | logger.info('Begin migrations.') |
25 | |||
26 | const migrationScripts = await getMigrationScripts() | ||
48 | 27 | ||
49 | return p | 28 | for (const migrationScript of migrationScripts) { |
29 | await executeMigration(actualVersion, migrationScript) | ||
30 | } | ||
31 | |||
32 | logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION) | ||
50 | } | 33 | } |
51 | 34 | ||
52 | // --------------------------------------------------------------------------- | 35 | // --------------------------------------------------------------------------- |
@@ -57,29 +40,28 @@ export { | |||
57 | 40 | ||
58 | // --------------------------------------------------------------------------- | 41 | // --------------------------------------------------------------------------- |
59 | 42 | ||
60 | function getMigrationScripts () { | 43 | async function getMigrationScripts () { |
61 | return readdirPromise(path.join(__dirname, 'migrations')).then(files => { | 44 | const files = await readdirPromise(path.join(__dirname, 'migrations')) |
62 | const filesToMigrate: { | 45 | const filesToMigrate: { |
63 | version: string, | 46 | version: string, |
64 | script: string | 47 | script: string |
65 | }[] = [] | 48 | }[] = [] |
66 | 49 | ||
67 | files | 50 | files |
68 | .filter(file => file.endsWith('.js.map') === false) | 51 | .filter(file => file.endsWith('.js.map') === false) |
69 | .forEach(file => { | 52 | .forEach(file => { |
70 | // Filename is something like 'version-blabla.js' | 53 | // Filename is something like 'version-blabla.js' |
71 | const version = file.split('-')[0] | 54 | const version = file.split('-')[0] |
72 | filesToMigrate.push({ | 55 | filesToMigrate.push({ |
73 | version, | 56 | version, |
74 | script: file | 57 | script: file |
75 | }) | ||
76 | }) | 58 | }) |
59 | }) | ||
77 | 60 | ||
78 | return filesToMigrate | 61 | return filesToMigrate |
79 | }) | ||
80 | } | 62 | } |
81 | 63 | ||
82 | function executeMigration (actualVersion: number, entity: { version: string, script: string }) { | 64 | async function executeMigration (actualVersion: number, entity: { version: string, script: string }) { |
83 | const versionScript = parseInt(entity.version, 10) | 65 | const versionScript = parseInt(entity.version, 10) |
84 | 66 | ||
85 | // Do not execute old migration scripts | 67 | // Do not execute old migration scripts |
@@ -91,7 +73,7 @@ function executeMigration (actualVersion: number, entity: { version: string, scr | |||
91 | 73 | ||
92 | const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName)) | 74 | const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName)) |
93 | 75 | ||
94 | return db.sequelize.transaction(t => { | 76 | await db.sequelize.transaction(async t => { |
95 | const options = { | 77 | const options = { |
96 | transaction: t, | 78 | transaction: t, |
97 | queryInterface: db.sequelize.getQueryInterface(), | 79 | queryInterface: db.sequelize.getQueryInterface(), |
@@ -99,10 +81,9 @@ function executeMigration (actualVersion: number, entity: { version: string, scr | |||
99 | db | 81 | db |
100 | } | 82 | } |
101 | 83 | ||
102 | return migrationScript.up(options) | 84 | await migrationScript.up(options) |
103 | .then(() => { | 85 | |
104 | // Update the new migration version | 86 | // Update the new migration version |
105 | return db.Application.updateMigrationVersion(versionScript, t) | 87 | await db.Application.updateMigrationVersion(versionScript, t) |
106 | }) | ||
107 | }) | 88 | }) |
108 | } | 89 | } |
diff --git a/server/lib/cache/videos-preview-cache.ts b/server/lib/cache/videos-preview-cache.ts index fecdca6ef..0fe4d2f78 100644 --- a/server/lib/cache/videos-preview-cache.ts +++ b/server/lib/cache/videos-preview-cache.ts | |||
@@ -1,7 +1,6 @@ | |||
1 | import * as asyncLRU from 'async-lru' | 1 | import * as asyncLRU from 'async-lru' |
2 | import { join } from 'path' | 2 | import { join } from 'path' |
3 | import { createWriteStream } from 'fs' | 3 | import { createWriteStream } from 'fs' |
4 | import * as Promise from 'bluebird' | ||
5 | 4 | ||
6 | import { database as db, CONFIG, CACHE } from '../../initializers' | 5 | import { database as db, CONFIG, CACHE } from '../../initializers' |
7 | import { logger, unlinkPromise } from '../../helpers' | 6 | import { logger, unlinkPromise } from '../../helpers' |
@@ -43,15 +42,15 @@ class VideosPreviewCache { | |||
43 | }) | 42 | }) |
44 | } | 43 | } |
45 | 44 | ||
46 | private loadPreviews (key: string) { | 45 | private async loadPreviews (key: string) { |
47 | return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) | 46 | const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) |
48 | .then(video => { | 47 | if (!video) return undefined |
49 | if (!video) return undefined | ||
50 | 48 | ||
51 | if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) | 49 | if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) |
52 | 50 | ||
53 | return this.saveRemotePreviewAndReturnPath(video) | 51 | const res = await this.saveRemotePreviewAndReturnPath(video) |
54 | }) | 52 | |
53 | return res | ||
55 | } | 54 | } |
56 | 55 | ||
57 | private saveRemotePreviewAndReturnPath (video: VideoInstance) { | 56 | private saveRemotePreviewAndReturnPath (video: VideoInstance) { |
diff --git a/server/lib/friends.ts b/server/lib/friends.ts index f035b099b..a33432dc1 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import * as request from 'request' | 1 | import * as request from 'request' |
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | import * as Promise from 'bluebird' | 3 | import * as Bluebird from 'bluebird' |
4 | import { join } from 'path' | 4 | import { join } from 'path' |
5 | 5 | ||
6 | import { database as db } from '../initializers/database' | 6 | import { database as db } from '../initializers/database' |
@@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize. | |||
188 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { | 188 | function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { |
189 | const tasks = [] | 189 | const tasks = [] |
190 | 190 | ||
191 | eventsParams.forEach(eventParams => { | 191 | for (const eventParams of eventsParams) { |
192 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) | 192 | tasks.push(addEventToRemoteVideo(eventParams, transaction)) |
193 | }) | 193 | } |
194 | 194 | ||
195 | return Promise.all(tasks) | 195 | return Promise.all(tasks) |
196 | } | 196 | } |
197 | 197 | ||
198 | function hasFriends () { | 198 | async function hasFriends () { |
199 | return db.Pod.countAll().then(count => count !== 0) | 199 | const count = await db.Pod.countAll() |
200 | |||
201 | return count !== 0 | ||
200 | } | 202 | } |
201 | 203 | ||
202 | function makeFriends (hosts: string[]) { | 204 | async function makeFriends (hosts: string[]) { |
203 | const podsScore = {} | 205 | const podsScore = {} |
204 | 206 | ||
205 | logger.info('Make friends!') | 207 | logger.info('Make friends!') |
206 | return getMyPublicCert() | 208 | const cert = await getMyPublicCert() |
207 | .then(cert => { | ||
208 | return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert) | ||
209 | }) | ||
210 | .then(cert => { | ||
211 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | ||
212 | const podsList = computeWinningPods(hosts, podsScore) | ||
213 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | ||
214 | 209 | ||
215 | return makeRequestsToWinningPods(cert, podsList) | 210 | for (const host of hosts) { |
216 | }) | 211 | await computeForeignPodsList(host, podsScore) |
212 | } | ||
213 | |||
214 | logger.debug('Pods scores computed.', { podsScore: podsScore }) | ||
215 | |||
216 | const podsList = computeWinningPods(hosts, podsScore) | ||
217 | logger.debug('Pods that we keep.', { podsToKeep: podsList }) | ||
218 | |||
219 | return makeRequestsToWinningPods(cert, podsList) | ||
217 | } | 220 | } |
218 | 221 | ||
219 | function quitFriends () { | 222 | async function quitFriends () { |
220 | // Stop pool requests | 223 | // Stop pool requests |
221 | requestScheduler.deactivate() | 224 | requestScheduler.deactivate() |
222 | 225 | ||
223 | return requestScheduler.flush() | 226 | try { |
224 | .then(() => { | 227 | await requestScheduler.flush() |
225 | return requestVideoQaduScheduler.flush() | 228 | |
226 | }) | 229 | await requestVideoQaduScheduler.flush() |
227 | .then(() => { | 230 | |
228 | return db.Pod.list() | 231 | const pods = await db.Pod.list() |
229 | }) | 232 | const requestParams = { |
230 | .then(pods => { | 233 | method: 'POST' as 'POST', |
231 | const requestParams = { | 234 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
232 | method: 'POST' as 'POST', | 235 | toPod: null |
233 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 236 | } |
234 | toPod: null | ||
235 | } | ||
236 | 237 | ||
237 | // Announce we quit them | 238 | // Announce we quit them |
238 | // We don't care if the request fails | 239 | // We don't care if the request fails |
239 | // The other pod will exclude us automatically after a while | 240 | // The other pod will exclude us automatically after a while |
240 | return Promise.map(pods, pod => { | 241 | try { |
242 | await Bluebird.map(pods, pod => { | ||
241 | requestParams.toPod = pod | 243 | requestParams.toPod = pod |
242 | 244 | ||
243 | return makeSecureRequest(requestParams) | 245 | return makeSecureRequest(requestParams) |
244 | }, { concurrency: REQUESTS_IN_PARALLEL }) | 246 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
245 | .then(() => pods) | 247 | } catch (err) { // Don't stop the process |
246 | .catch(err => { | 248 | logger.error('Some errors while quitting friends.', err) |
247 | logger.error('Some errors while quitting friends.', err) | 249 | } |
248 | // Don't stop the process | ||
249 | return pods | ||
250 | }) | ||
251 | }) | ||
252 | .then(pods => { | ||
253 | const tasks = [] | ||
254 | pods.forEach(pod => tasks.push(pod.destroy())) | ||
255 | 250 | ||
256 | return Promise.all(pods) | 251 | const tasks = [] |
257 | }) | 252 | for (const pod of pods) { |
258 | .then(() => { | 253 | tasks.push(pod.destroy()) |
259 | logger.info('Removed all remote videos.') | 254 | } |
260 | // Don't forget to re activate the scheduler, even if there was an error | 255 | await Promise.all(pods) |
261 | return requestScheduler.activate() | 256 | |
262 | }) | 257 | logger.info('Removed all remote videos.') |
263 | .finally(() => requestScheduler.activate()) | 258 | |
259 | requestScheduler.activate() | ||
260 | } catch (err) { | ||
261 | // Don't forget to re activate the scheduler, even if there was an error | ||
262 | requestScheduler.activate() | ||
263 | |||
264 | throw err | ||
265 | } | ||
264 | } | 266 | } |
265 | 267 | ||
266 | function sendOwnedDataToPod (podId: number) { | 268 | async function sendOwnedDataToPod (podId: number) { |
267 | // First send authors | 269 | // First send authors |
268 | return sendOwnedAuthorsToPod(podId) | 270 | await sendOwnedAuthorsToPod(podId) |
269 | .then(() => sendOwnedChannelsToPod(podId)) | 271 | await sendOwnedChannelsToPod(podId) |
270 | .then(() => sendOwnedVideosToPod(podId)) | 272 | await sendOwnedVideosToPod(podId) |
273 | } | ||
274 | |||
275 | async function sendOwnedChannelsToPod (podId: number) { | ||
276 | const videoChannels = await db.VideoChannel.listOwned() | ||
277 | |||
278 | const tasks: Promise<any>[] = [] | ||
279 | for (const videoChannel of videoChannels) { | ||
280 | const remoteVideoChannel = videoChannel.toAddRemoteJSON() | ||
281 | const options = { | ||
282 | type: 'add-channel' as 'add-channel', | ||
283 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
284 | data: remoteVideoChannel, | ||
285 | toIds: [ podId ], | ||
286 | transaction: null | ||
287 | } | ||
288 | |||
289 | const p = createRequest(options) | ||
290 | tasks.push(p) | ||
291 | } | ||
292 | |||
293 | await Promise.all(tasks) | ||
271 | } | 294 | } |
272 | 295 | ||
273 | function sendOwnedChannelsToPod (podId: number) { | 296 | async function sendOwnedAuthorsToPod (podId: number) { |
274 | return db.VideoChannel.listOwned() | 297 | const authors = await db.Author.listOwned() |
275 | .then(videoChannels => { | 298 | const tasks: Promise<any>[] = [] |
276 | const tasks = [] | ||
277 | videoChannels.forEach(videoChannel => { | ||
278 | const remoteVideoChannel = videoChannel.toAddRemoteJSON() | ||
279 | const options = { | ||
280 | type: 'add-channel' as 'add-channel', | ||
281 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
282 | data: remoteVideoChannel, | ||
283 | toIds: [ podId ], | ||
284 | transaction: null | ||
285 | } | ||
286 | 299 | ||
287 | const p = createRequest(options) | 300 | for (const author of authors) { |
288 | tasks.push(p) | 301 | const remoteAuthor = author.toAddRemoteJSON() |
289 | }) | 302 | const options = { |
303 | type: 'add-author' as 'add-author', | ||
304 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
305 | data: remoteAuthor, | ||
306 | toIds: [ podId ], | ||
307 | transaction: null | ||
308 | } | ||
290 | 309 | ||
291 | return Promise.all(tasks) | 310 | const p = createRequest(options) |
292 | }) | 311 | tasks.push(p) |
312 | } | ||
313 | |||
314 | await Promise.all(tasks) | ||
293 | } | 315 | } |
294 | 316 | ||
295 | function sendOwnedAuthorsToPod (podId: number) { | 317 | async function sendOwnedVideosToPod (podId: number) { |
296 | return db.Author.listOwned() | 318 | const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags() |
297 | .then(authors => { | 319 | const tasks: Bluebird<any>[] = [] |
298 | const tasks = [] | 320 | |
299 | authors.forEach(author => { | 321 | for (const video of videosList) { |
300 | const remoteAuthor = author.toAddRemoteJSON() | 322 | const promise = video.toAddRemoteJSON() |
323 | .then(remoteVideo => { | ||
301 | const options = { | 324 | const options = { |
302 | type: 'add-author' as 'add-author', | 325 | type: 'add-video' as 'add-video', |
303 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | 326 | endpoint: REQUEST_ENDPOINTS.VIDEOS, |
304 | data: remoteAuthor, | 327 | data: remoteVideo, |
305 | toIds: [ podId ], | 328 | toIds: [ podId ], |
306 | transaction: null | 329 | transaction: null |
307 | } | 330 | } |
308 | 331 | return createRequest(options) | |
309 | const p = createRequest(options) | ||
310 | tasks.push(p) | ||
311 | }) | 332 | }) |
312 | 333 | .catch(err => { | |
313 | return Promise.all(tasks) | 334 | logger.error('Cannot convert video to remote.', err) |
314 | }) | 335 | // Don't break the process |
315 | } | 336 | return undefined |
316 | |||
317 | function sendOwnedVideosToPod (podId: number) { | ||
318 | return db.Video.listOwnedAndPopulateAuthorAndTags() | ||
319 | .then(videosList => { | ||
320 | const tasks = [] | ||
321 | videosList.forEach(video => { | ||
322 | const promise = video.toAddRemoteJSON() | ||
323 | .then(remoteVideo => { | ||
324 | const options = { | ||
325 | type: 'add-video' as 'add-video', | ||
326 | endpoint: REQUEST_ENDPOINTS.VIDEOS, | ||
327 | data: remoteVideo, | ||
328 | toIds: [ podId ], | ||
329 | transaction: null | ||
330 | } | ||
331 | return createRequest(options) | ||
332 | }) | ||
333 | .catch(err => { | ||
334 | logger.error('Cannot convert video to remote.', err) | ||
335 | // Don't break the process | ||
336 | return undefined | ||
337 | }) | ||
338 | |||
339 | tasks.push(promise) | ||
340 | }) | 337 | }) |
341 | 338 | ||
342 | return Promise.all(tasks) | 339 | tasks.push(promise) |
343 | }) | 340 | } |
341 | |||
342 | await Promise.all(tasks) | ||
344 | } | 343 | } |
345 | 344 | ||
346 | function fetchRemotePreview (video: VideoInstance) { | 345 | function fetchRemotePreview (video: VideoInstance) { |
@@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) { | |||
350 | return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) | 349 | return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) |
351 | } | 350 | } |
352 | 351 | ||
353 | function removeFriend (pod: PodInstance) { | 352 | async function removeFriend (pod: PodInstance) { |
354 | const requestParams = { | 353 | const requestParams = { |
355 | method: 'POST' as 'POST', | 354 | method: 'POST' as 'POST', |
356 | path: '/api/' + API_VERSION + '/remote/pods/remove', | 355 | path: '/api/' + API_VERSION + '/remote/pods/remove', |
357 | toPod: pod | 356 | toPod: pod |
358 | } | 357 | } |
359 | 358 | ||
360 | return makeSecureRequest(requestParams) | 359 | try { |
361 | .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) | 360 | await makeSecureRequest(requestParams) |
362 | .then(() => pod.destroy()) | 361 | } catch (err) { |
363 | .then(() => logger.info('Removed friend %s.', pod.host)) | 362 | logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err) |
364 | .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) | 363 | } |
364 | |||
365 | try { | ||
366 | await pod.destroy() | ||
367 | |||
368 | logger.info('Removed friend %s.', pod.host) | ||
369 | } catch (err) { | ||
370 | logger.error('Cannot destroy friend %s.', pod.host, err) | ||
371 | } | ||
365 | } | 372 | } |
366 | 373 | ||
367 | function getRequestScheduler () { | 374 | function getRequestScheduler () { |
@@ -406,23 +413,21 @@ export { | |||
406 | 413 | ||
407 | // --------------------------------------------------------------------------- | 414 | // --------------------------------------------------------------------------- |
408 | 415 | ||
409 | function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { | 416 | async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { |
410 | // TODO: type res | 417 | const result = await getForeignPodsList(host) |
411 | return getForeignPodsList(host).then(res => { | 418 | const foreignPodsList: { host: string }[] = result.data |
412 | const foreignPodsList: { host: string }[] = res.data | ||
413 | 419 | ||
414 | // Let's give 1 point to the pod we ask the friends list | 420 | // Let's give 1 point to the pod we ask the friends list |
415 | foreignPodsList.push({ host }) | 421 | foreignPodsList.push({ host }) |
416 | 422 | ||
417 | foreignPodsList.forEach(foreignPod => { | 423 | for (const foreignPod of foreignPodsList) { |
418 | const foreignPodHost = foreignPod.host | 424 | const foreignPodHost = foreignPod.host |
419 | 425 | ||
420 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ | 426 | if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ |
421 | else podsScore[foreignPodHost] = 1 | 427 | else podsScore[foreignPodHost] = 1 |
422 | }) | 428 | } |
423 | 429 | ||
424 | return undefined | 430 | return undefined |
425 | }) | ||
426 | } | 431 | } |
427 | 432 | ||
428 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { | 433 | function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { |
@@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num | |||
431 | const podsList = [] | 436 | const podsList = [] |
432 | const baseScore = hosts.length / 2 | 437 | const baseScore = hosts.length / 2 |
433 | 438 | ||
434 | Object.keys(podsScore).forEach(podHost => { | 439 | for (const podHost of Object.keys(podsScore)) { |
435 | // If the pod is not me and with a good score we add it | 440 | // If the pod is not me and with a good score we add it |
436 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { | 441 | if (isMe(podHost) === false && podsScore[podHost] > baseScore) { |
437 | podsList.push({ host: podHost }) | 442 | podsList.push({ host: podHost }) |
438 | } | 443 | } |
439 | }) | 444 | } |
440 | 445 | ||
441 | return podsList | 446 | return podsList |
442 | } | 447 | } |
@@ -449,7 +454,7 @@ function getForeignPodsList (host: string) { | |||
449 | if (err) return rej(err) | 454 | if (err) return rej(err) |
450 | 455 | ||
451 | try { | 456 | try { |
452 | const json = JSON.parse(body) | 457 | const json: ResultList<FormattedPod> = JSON.parse(body) |
453 | return res(json) | 458 | return res(json) |
454 | } catch (err) { | 459 | } catch (err) { |
455 | return rej(err) | 460 | return rej(err) |
@@ -458,53 +463,53 @@ function getForeignPodsList (host: string) { | |||
458 | }) | 463 | }) |
459 | } | 464 | } |
460 | 465 | ||
461 | function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { | 466 | async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { |
462 | // Stop pool requests | 467 | // Stop pool requests |
463 | requestScheduler.deactivate() | 468 | requestScheduler.deactivate() |
464 | // Flush pool requests | 469 | // Flush pool requests |
465 | requestScheduler.forceSend() | 470 | requestScheduler.forceSend() |
466 | 471 | ||
467 | return Promise.map(podsList, pod => { | 472 | try { |
468 | const params = { | 473 | await Bluebird.map(podsList, async pod => { |
469 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', | 474 | const params = { |
470 | method: 'POST' as 'POST', | 475 | url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', |
471 | json: { | 476 | method: 'POST' as 'POST', |
472 | host: CONFIG.WEBSERVER.HOST, | 477 | json: { |
473 | email: CONFIG.ADMIN.EMAIL, | 478 | host: CONFIG.WEBSERVER.HOST, |
474 | publicKey: cert | 479 | email: CONFIG.ADMIN.EMAIL, |
480 | publicKey: cert | ||
481 | } | ||
475 | } | 482 | } |
476 | } | ||
477 | 483 | ||
478 | return makeRetryRequest(params) | 484 | const { response, body } = await makeRetryRequest(params) |
479 | .then(({ response, body }) => { | 485 | const typedBody = body as { cert: string, email: string } |
480 | body = body as { cert: string, email: string } | 486 | |
481 | 487 | if (response.statusCode === 200) { | |
482 | if (response.statusCode === 200) { | 488 | const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email }) |
483 | const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) | 489 | |
484 | return podObj.save() | 490 | let podCreated: PodInstance |
485 | .then(podCreated => { | 491 | try { |
486 | 492 | podCreated = await podObj.save() | |
487 | // Add our videos to the request scheduler | 493 | } catch (err) { |
488 | sendOwnedDataToPod(podCreated.id) | 494 | logger.error('Cannot add friend %s pod.', pod.host, err) |
489 | }) | ||
490 | .catch(err => { | ||
491 | logger.error('Cannot add friend %s pod.', pod.host, err) | ||
492 | }) | ||
493 | } else { | ||
494 | logger.error('Status not 200 for %s pod.', pod.host) | ||
495 | } | 495 | } |
496 | }) | 496 | |
497 | .catch(err => { | 497 | // Add our videos to the request scheduler |
498 | logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) | 498 | sendOwnedDataToPod(podCreated.id) |
499 | // Don't break the process | 499 | .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err)) |
500 | }) | 500 | } else { |
501 | }, { concurrency: REQUESTS_IN_PARALLEL }) | 501 | logger.error('Status not 200 for %s pod.', pod.host) |
502 | .then(() => logger.debug('makeRequestsToWinningPods finished.')) | 502 | } |
503 | .finally(() => { | 503 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
504 | |||
505 | logger.debug('makeRequestsToWinningPods finished.') | ||
506 | |||
507 | requestScheduler.activate() | ||
508 | } catch (err) { | ||
504 | // Final callback, we've ended all the requests | 509 | // Final callback, we've ended all the requests |
505 | // Now we made new friends, we can re activate the pool of requests | 510 | // Now we made new friends, we can re activate the pool of requests |
506 | requestScheduler.activate() | 511 | requestScheduler.activate() |
507 | }) | 512 | } |
508 | } | 513 | } |
509 | 514 | ||
510 | // Wrapper that populate "toIds" argument with all our friends if it is not specified | 515 | // Wrapper that populate "toIds" argument with all our friends if it is not specified |
@@ -515,14 +520,19 @@ type CreateRequestOptions = { | |||
515 | toIds?: number[] | 520 | toIds?: number[] |
516 | transaction: Sequelize.Transaction | 521 | transaction: Sequelize.Transaction |
517 | } | 522 | } |
518 | function createRequest (options: CreateRequestOptions) { | 523 | async function createRequest (options: CreateRequestOptions) { |
519 | if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) | 524 | if (options.toIds !== undefined) { |
525 | await requestScheduler.createRequest(options as RequestSchedulerOptions) | ||
526 | return undefined | ||
527 | } | ||
520 | 528 | ||
521 | // If the "toIds" pods is not specified, we send the request to all our friends | 529 | // If the "toIds" pods is not specified, we send the request to all our friends |
522 | return db.Pod.listAllIds(options.transaction).then(podIds => { | 530 | const podIds = await db.Pod.listAllIds(options.transaction) |
523 | const newOptions = Object.assign(options, { toIds: podIds }) | 531 | |
524 | return requestScheduler.createRequest(newOptions) | 532 | const newOptions = Object.assign(options, { toIds: podIds }) |
525 | }) | 533 | await requestScheduler.createRequest(newOptions) |
534 | |||
535 | return undefined | ||
526 | } | 536 | } |
527 | 537 | ||
528 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { | 538 | function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { |
diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/handlers/video-file-optimizer.ts index 63a51064c..799ba8b01 100644 --- a/server/lib/jobs/handlers/video-file-optimizer.ts +++ b/server/lib/jobs/handlers/video-file-optimizer.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as Promise from 'bluebird' | 1 | import * as Bluebird from 'bluebird' |
2 | 2 | ||
3 | import { database as db } from '../../../initializers/database' | 3 | import { database as db } from '../../../initializers/database' |
4 | import { logger, computeResolutionsToTranscode } from '../../../helpers' | 4 | import { logger, computeResolutionsToTranscode } from '../../../helpers' |
@@ -6,16 +6,17 @@ import { VideoInstance } from '../../../models' | |||
6 | import { addVideoToFriends } from '../../friends' | 6 | import { addVideoToFriends } from '../../friends' |
7 | import { JobScheduler } from '../job-scheduler' | 7 | import { JobScheduler } from '../job-scheduler' |
8 | 8 | ||
9 | function process (data: { videoUUID: string }, jobId: number) { | 9 | async function process (data: { videoUUID: string }, jobId: number) { |
10 | return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { | 10 | const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) |
11 | // No video, maybe deleted? | 11 | // No video, maybe deleted? |
12 | if (!video) { | 12 | if (!video) { |
13 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 13 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) |
14 | return undefined | 14 | return undefined |
15 | } | 15 | } |
16 | |||
17 | await video.optimizeOriginalVideofile() | ||
16 | 18 | ||
17 | return video.optimizeOriginalVideofile().then(() => video) | 19 | return video |
18 | }) | ||
19 | } | 20 | } |
20 | 21 | ||
21 | function onError (err: Error, jobId: number) { | 22 | function onError (err: Error, jobId: number) { |
@@ -23,33 +24,31 @@ function onError (err: Error, jobId: number) { | |||
23 | return Promise.resolve() | 24 | return Promise.resolve() |
24 | } | 25 | } |
25 | 26 | ||
26 | function onSuccess (jobId: number, video: VideoInstance) { | 27 | async function onSuccess (jobId: number, video: VideoInstance) { |
27 | if (video === undefined) return undefined | 28 | if (video === undefined) return undefined |
28 | 29 | ||
29 | logger.info('Job %d is a success.', jobId) | 30 | logger.info('Job %d is a success.', jobId) |
30 | 31 | ||
31 | video.toAddRemoteJSON() | 32 | const remoteVideo = await video.toAddRemoteJSON() |
32 | .then(remoteVideo => { | 33 | |
33 | // Now we'll add the video's meta data to our friends | 34 | // Now we'll add the video's meta data to our friends |
34 | return addVideoToFriends(remoteVideo, null) | 35 | await addVideoToFriends(remoteVideo, null) |
35 | }) | 36 | |
36 | .then(() => { | 37 | const originalFileHeight = await video.getOriginalFileHeight() |
37 | return video.getOriginalFileHeight() | 38 | // Create transcoding jobs if there are enabled resolutions |
38 | }) | 39 | |
39 | .then(originalFileHeight => { | 40 | const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) |
40 | // Create transcoding jobs if there are enabled resolutions | 41 | logger.info( |
41 | const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) | 42 | 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, |
42 | logger.info( | 43 | { resolutions: resolutionsEnabled } |
43 | 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, | 44 | ) |
44 | { resolutions: resolutionsEnabled } | 45 | |
45 | ) | 46 | if (resolutionsEnabled.length !== 0) { |
46 | 47 | try { | |
47 | if (resolutionsEnabled.length === 0) return undefined | 48 | await db.sequelize.transaction(async t => { |
48 | 49 | const tasks: Bluebird<any>[] = [] | |
49 | return db.sequelize.transaction(t => { | 50 | |
50 | const tasks: Promise<any>[] = [] | 51 | for (const resolution of resolutionsEnabled) { |
51 | |||
52 | resolutionsEnabled.forEach(resolution => { | ||
53 | const dataInput = { | 52 | const dataInput = { |
54 | videoUUID: video.uuid, | 53 | videoUUID: video.uuid, |
55 | resolution | 54 | resolution |
@@ -57,24 +56,19 @@ function onSuccess (jobId: number, video: VideoInstance) { | |||
57 | 56 | ||
58 | const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) | 57 | const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) |
59 | tasks.push(p) | 58 | tasks.push(p) |
60 | }) | 59 | } |
61 | 60 | ||
62 | return Promise.all(tasks).then(() => resolutionsEnabled) | 61 | await Promise.all(tasks) |
63 | }) | 62 | }) |
64 | }) | ||
65 | .then(resolutionsEnabled => { | ||
66 | if (resolutionsEnabled === undefined) { | ||
67 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') | ||
68 | return undefined | ||
69 | } | ||
70 | 63 | ||
71 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) | 64 | logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) |
72 | }) | 65 | } catch (err) { |
73 | .catch((err: Error) => { | 66 | logger.warn('Cannot transcode the video.', err) |
74 | logger.debug('Cannot transcode the video.', err) | 67 | } |
75 | throw err | 68 | } else { |
76 | }) | 69 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') |
77 | 70 | return undefined | |
71 | } | ||
78 | } | 72 | } |
79 | 73 | ||
80 | // --------------------------------------------------------------------------- | 74 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/handlers/video-file-transcoder.ts index 0dafee566..b240ff58a 100644 --- a/server/lib/jobs/handlers/video-file-transcoder.ts +++ b/server/lib/jobs/handlers/video-file-transcoder.ts | |||
@@ -4,16 +4,17 @@ import { logger } from '../../../helpers' | |||
4 | import { VideoInstance } from '../../../models' | 4 | import { VideoInstance } from '../../../models' |
5 | import { VideoResolution } from '../../../../shared' | 5 | import { VideoResolution } from '../../../../shared' |
6 | 6 | ||
7 | function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { | 7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { |
8 | return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { | 8 | const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) |
9 | // No video, maybe deleted? | 9 | // No video, maybe deleted? |
10 | if (!video) { | 10 | if (!video) { |
11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) |
12 | return undefined | 12 | return undefined |
13 | } | 13 | } |
14 | 14 | ||
15 | return video.transcodeOriginalVideofile(data.resolution).then(() => video) | 15 | await video.transcodeOriginalVideofile(data.resolution) |
16 | }) | 16 | |
17 | return video | ||
17 | } | 18 | } |
18 | 19 | ||
19 | function onError (err: Error, jobId: number) { | 20 | function onError (err: Error, jobId: number) { |
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index c2409d20c..61d483268 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -23,7 +23,7 @@ class JobScheduler { | |||
23 | return this.instance || (this.instance = new this()) | 23 | return this.instance || (this.instance = new this()) |
24 | } | 24 | } |
25 | 25 | ||
26 | activate () { | 26 | async activate () { |
27 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE | 27 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE |
28 | 28 | ||
29 | logger.info('Jobs scheduler activated.') | 29 | logger.info('Jobs scheduler activated.') |
@@ -32,32 +32,36 @@ class JobScheduler { | |||
32 | 32 | ||
33 | // Finish processing jobs from a previous start | 33 | // Finish processing jobs from a previous start |
34 | const state = JOB_STATES.PROCESSING | 34 | const state = JOB_STATES.PROCESSING |
35 | db.Job.listWithLimit(limit, state) | 35 | try { |
36 | .then(jobs => { | 36 | const jobs = await db.Job.listWithLimit(limit, state) |
37 | this.enqueueJobs(jobsQueue, jobs) | 37 | |
38 | 38 | this.enqueueJobs(jobsQueue, jobs) | |
39 | forever( | 39 | } catch (err) { |
40 | next => { | 40 | logger.error('Cannot list pending jobs.', err) |
41 | if (jobsQueue.length() !== 0) { | 41 | } |
42 | // Finish processing the queue first | 42 | |
43 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | 43 | forever( |
44 | } | 44 | async next => { |
45 | 45 | if (jobsQueue.length() !== 0) { | |
46 | const state = JOB_STATES.PENDING | 46 | // Finish processing the queue first |
47 | db.Job.listWithLimit(limit, state) | 47 | return setTimeout(next, JOBS_FETCHING_INTERVAL) |
48 | .then(jobs => { | 48 | } |
49 | this.enqueueJobs(jobsQueue, jobs) | 49 | |
50 | 50 | const state = JOB_STATES.PENDING | |
51 | // Optimization: we could use "drain" from queue object | 51 | try { |
52 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | 52 | const jobs = await db.Job.listWithLimit(limit, state) |
53 | }) | 53 | |
54 | .catch(err => logger.error('Cannot list pending jobs.', err)) | 54 | this.enqueueJobs(jobsQueue, jobs) |
55 | }, | 55 | } catch (err) { |
56 | 56 | logger.error('Cannot list pending jobs.', err) | |
57 | err => logger.error('Error in job scheduler queue.', err) | 57 | } |
58 | ) | 58 | |
59 | }) | 59 | // Optimization: we could use "drain" from queue object |
60 | .catch(err => logger.error('Cannot list pending jobs.', err)) | 60 | return setTimeout(next, JOBS_FETCHING_INTERVAL) |
61 | }, | ||
62 | |||
63 | err => logger.error('Error in job scheduler queue.', err) | ||
64 | ) | ||
61 | } | 65 | } |
62 | 66 | ||
63 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { | 67 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { |
@@ -75,7 +79,7 @@ class JobScheduler { | |||
75 | jobs.forEach(job => jobsQueue.push(job)) | 79 | jobs.forEach(job => jobsQueue.push(job)) |
76 | } | 80 | } |
77 | 81 | ||
78 | private processJob (job: JobInstance, callback: (err: Error) => void) { | 82 | private async processJob (job: JobInstance, callback: (err: Error) => void) { |
79 | const jobHandler = jobHandlers[job.handlerName] | 83 | const jobHandler = jobHandlers[job.handlerName] |
80 | if (jobHandler === undefined) { | 84 | if (jobHandler === undefined) { |
81 | logger.error('Unknown job handler for job %s.', job.handlerName) | 85 | logger.error('Unknown job handler for job %s.', job.handlerName) |
@@ -85,41 +89,45 @@ class JobScheduler { | |||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | 89 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) |
86 | 90 | ||
87 | job.state = JOB_STATES.PROCESSING | 91 | job.state = JOB_STATES.PROCESSING |
88 | return job.save() | 92 | await job.save() |
89 | .then(() => { | 93 | |
90 | return jobHandler.process(job.handlerInputData, job.id) | 94 | try { |
91 | }) | 95 | const result = await jobHandler.process(job.handlerInputData, job.id) |
92 | .then( | 96 | await this.onJobSuccess(jobHandler, job, result) |
93 | result => { | 97 | } catch (err) { |
94 | return this.onJobSuccess(jobHandler, job, result) | 98 | logger.error('Error in job handler %s.', job.handlerName, err) |
95 | }, | 99 | |
96 | 100 | try { | |
97 | err => { | 101 | await this.onJobError(jobHandler, job, err) |
98 | logger.error('Error in job handler %s.', job.handlerName, err) | 102 | } catch (innerErr) { |
99 | return this.onJobError(jobHandler, job, err) | 103 | this.cannotSaveJobError(innerErr) |
100 | } | 104 | return callback(innerErr) |
101 | ) | 105 | } |
102 | .then(() => callback(null)) | 106 | } |
103 | .catch(err => { | 107 | |
104 | this.cannotSaveJobError(err) | 108 | callback(null) |
105 | return callback(err) | ||
106 | }) | ||
107 | } | 109 | } |
108 | 110 | ||
109 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { | 111 | private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { |
110 | job.state = JOB_STATES.ERROR | 112 | job.state = JOB_STATES.ERROR |
111 | 113 | ||
112 | return job.save() | 114 | try { |
113 | .then(() => jobHandler.onError(err, job.id)) | 115 | await job.save() |
114 | .catch(err => this.cannotSaveJobError(err)) | 116 | await jobHandler.onError(err, job.id) |
117 | } catch (err) { | ||
118 | this.cannotSaveJobError(err) | ||
119 | } | ||
115 | } | 120 | } |
116 | 121 | ||
117 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { | 122 | private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { |
118 | job.state = JOB_STATES.SUCCESS | 123 | job.state = JOB_STATES.SUCCESS |
119 | 124 | ||
120 | return job.save() | 125 | try { |
121 | .then(() => jobHandler.onSuccess(job.id, jobResult)) | 126 | await job.save() |
122 | .catch(err => this.cannotSaveJobError(err)) | 127 | jobHandler.onSuccess(job.id, jobResult) |
128 | } catch (err) { | ||
129 | this.cannotSaveJobError(err) | ||
130 | } | ||
123 | } | 131 | } |
124 | 132 | ||
125 | private cannotSaveJobError (err: Error) { | 133 | private cannotSaveJobError (err: Error) { |
diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts index 9aa3ea52f..d91b00c55 100644 --- a/server/lib/oauth-model.ts +++ b/server/lib/oauth-model.ts | |||
@@ -24,39 +24,36 @@ function getRefreshToken (refreshToken: string) { | |||
24 | return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) | 24 | return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) |
25 | } | 25 | } |
26 | 26 | ||
27 | function getUser (username: string, password: string) { | 27 | async function getUser (username: string, password: string) { |
28 | logger.debug('Getting User (username: ' + username + ', password: ******).') | 28 | logger.debug('Getting User (username: ' + username + ', password: ******).') |
29 | 29 | ||
30 | return db.User.getByUsername(username).then(user => { | 30 | const user = await db.User.getByUsername(username) |
31 | if (!user) return null | 31 | if (!user) return null |
32 | 32 | ||
33 | return user.isPasswordMatch(password).then(passwordMatch => { | 33 | const passwordMatch = await user.isPasswordMatch(password) |
34 | if (passwordMatch === false) return null | 34 | if (passwordMatch === false) return null |
35 | 35 | ||
36 | return user | 36 | return user |
37 | }) | ||
38 | }) | ||
39 | } | 37 | } |
40 | 38 | ||
41 | function revokeToken (token: TokenInfo) { | 39 | async function revokeToken (tokenInfo: TokenInfo) { |
42 | return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => { | 40 | const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken) |
43 | if (tokenDB) tokenDB.destroy() | 41 | if (token) token.destroy() |
44 | 42 | ||
45 | /* | 43 | /* |
46 | * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js | 44 | * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js |
47 | * "As per the discussion we need set older date | 45 | * "As per the discussion we need set older date |
48 | * revokeToken will expected return a boolean in future version | 46 | * revokeToken will expected return a boolean in future version |
49 | * https://github.com/oauthjs/node-oauth2-server/pull/274 | 47 | * https://github.com/oauthjs/node-oauth2-server/pull/274 |
50 | * https://github.com/oauthjs/node-oauth2-server/issues/290" | 48 | * https://github.com/oauthjs/node-oauth2-server/issues/290" |
51 | */ | 49 | */ |
52 | const expiredToken = tokenDB | 50 | const expiredToken = token |
53 | expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') | 51 | expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') |
54 | 52 | ||
55 | return expiredToken | 53 | return expiredToken |
56 | }) | ||
57 | } | 54 | } |
58 | 55 | ||
59 | function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { | 56 | async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { |
60 | logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') | 57 | logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') |
61 | 58 | ||
62 | const tokenToCreate = { | 59 | const tokenToCreate = { |
@@ -68,11 +65,10 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns | |||
68 | userId: user.id | 65 | userId: user.id |
69 | } | 66 | } |
70 | 67 | ||
71 | return db.OAuthToken.create(tokenToCreate).then(tokenCreated => { | 68 | const tokenCreated = await db.OAuthToken.create(tokenToCreate) |
72 | const tokenToReturn = Object.assign(tokenCreated, { client, user }) | 69 | const tokenToReturn = Object.assign(tokenCreated, { client, user }) |
73 | 70 | ||
74 | return tokenToReturn | 71 | return tokenToReturn |
75 | }) | ||
76 | } | 72 | } |
77 | 73 | ||
78 | // --------------------------------------------------------------------------- | 74 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index ce4e2ffd2..f46682824 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import { isEmpty } from 'lodash' | 1 | import { isEmpty } from 'lodash' |
2 | import * as Promise from 'bluebird' | 2 | import * as Bluebird from 'bluebird' |
3 | 3 | ||
4 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
5 | import { logger, makeSecureRequest } from '../../helpers' | 5 | import { logger, makeSecureRequest } from '../../helpers' |
@@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler <T> { | |||
76 | // --------------------------------------------------------------------------- | 76 | // --------------------------------------------------------------------------- |
77 | 77 | ||
78 | // Make a requests to friends of a certain type | 78 | // Make a requests to friends of a certain type |
79 | protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { | 79 | protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { |
80 | const params = { | 80 | const params = { |
81 | toPod: toPod, | 81 | toPod: toPod, |
82 | method: 'POST' as 'POST', | 82 | method: 'POST' as 'POST', |
@@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler <T> { | |||
86 | 86 | ||
87 | // Make multiple retry requests to all of pods | 87 | // Make multiple retry requests to all of pods |
88 | // The function fire some useful callbacks | 88 | // The function fire some useful callbacks |
89 | return makeSecureRequest(params) | 89 | try { |
90 | .then(({ response, body }) => { | 90 | const { response } = await makeSecureRequest(params) |
91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { | 91 | if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { |
92 | throw new Error('Status code not 20x : ' + response.statusCode) | 92 | throw new Error('Status code not 20x : ' + response.statusCode) |
93 | } | 93 | } |
94 | }) | 94 | } catch (err) { |
95 | .catch(err => { | 95 | logger.error('Error sending secure request to %s pod.', toPod.host, err) |
96 | logger.error('Error sending secure request to %s pod.', toPod.host, err) | 96 | |
97 | 97 | throw err | |
98 | throw err | 98 | } |
99 | }) | ||
100 | } | 99 | } |
101 | 100 | ||
102 | // Make all the requests of the scheduler | 101 | // Make all the requests of the scheduler |
103 | protected makeRequests () { | 102 | protected async makeRequests () { |
104 | return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) | 103 | let requestsGrouped: T |
105 | .then((requestsGrouped: T) => { | 104 | |
106 | // We want to group requests by destinations pod and endpoint | 105 | try { |
107 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | 106 | requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) |
108 | 107 | } catch (err) { | |
109 | // If there are no requests, abort | 108 | logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }) |
110 | if (isEmpty(requestsToMake) === true) { | 109 | throw err |
111 | logger.info('No "%s" to make.', this.description) | 110 | } |
112 | return { goodPods: [], badPods: [] } | 111 | |
113 | } | 112 | // We want to group requests by destinations pod and endpoint |
114 | 113 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | |
115 | logger.info('Making "%s" to friends.', this.description) | 114 | |
116 | 115 | // If there are no requests, abort | |
117 | const goodPods: number[] = [] | 116 | if (isEmpty(requestsToMake) === true) { |
118 | const badPods: number[] = [] | 117 | logger.info('No "%s" to make.', this.description) |
119 | 118 | return { goodPods: [], badPods: [] } | |
120 | return Promise.map(Object.keys(requestsToMake), hashKey => { | 119 | } |
121 | const requestToMake = requestsToMake[hashKey] | 120 | |
122 | const toPod: PodInstance = requestToMake.toPod | 121 | logger.info('Making "%s" to friends.', this.description) |
123 | 122 | ||
124 | return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) | 123 | const goodPods: number[] = [] |
125 | .then(() => { | 124 | const badPods: number[] = [] |
126 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | 125 | |
127 | goodPods.push(requestToMake.toPod.id) | 126 | await Bluebird.map(Object.keys(requestsToMake), async hashKey => { |
128 | 127 | const requestToMake = requestsToMake[hashKey] | |
129 | this.afterRequestHook() | 128 | const toPod: PodInstance = requestToMake.toPod |
130 | 129 | ||
131 | // Remove the pod id of these request ids | 130 | try { |
132 | return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) | 131 | await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) |
133 | }) | 132 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) |
134 | .catch(err => { | 133 | goodPods.push(requestToMake.toPod.id) |
135 | badPods.push(requestToMake.toPod.id) | 134 | |
136 | logger.info('Cannot make request to %s.', toPod.host, err) | 135 | this.afterRequestHook() |
137 | }) | 136 | |
138 | }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) | 137 | // Remove the pod id of these request ids |
139 | }) | 138 | await this.getRequestToPodModel() |
140 | .then(({ goodPods, badPods }) => { | 139 | .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) |
141 | this.afterRequestsHook() | 140 | } catch (err) { |
142 | 141 | badPods.push(requestToMake.toPod.id) | |
143 | // All the requests were made, we update the pods score | 142 | logger.info('Cannot make request to %s.', toPod.host, err) |
144 | return db.Pod.updatePodsScore(goodPods, badPods) | 143 | } |
145 | }) | 144 | }, { concurrency: REQUESTS_IN_PARALLEL }) |
146 | .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) | 145 | |
146 | this.afterRequestsHook() | ||
147 | |||
148 | // All the requests were made, we update the pods score | ||
149 | await db.Pod.updatePodsScore(goodPods, badPods) | ||
147 | } | 150 | } |
148 | 151 | ||
149 | protected afterRequestHook () { | 152 | protected afterRequestHook () { |
150 | // Nothing to do, let children reimplement it | 153 | // Nothing to do, let children re-implement it |
151 | } | 154 | } |
152 | 155 | ||
153 | protected afterRequestsHook () { | 156 | protected afterRequestsHook () { |
154 | // Nothing to do, let children reimplement it | 157 | // Nothing to do, let children re-implement it |
155 | } | 158 | } |
156 | } | 159 | } |
157 | 160 | ||
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 696875dcf..c3f7f6429 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts | |||
@@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { | 37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { |
38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} | 38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} |
39 | 39 | ||
40 | Object.keys(requestsGrouped).forEach(toPodId => { | 40 | for (const toPodId of Object.keys(requestsGrouped)) { |
41 | requestsGrouped[toPodId].forEach(data => { | 41 | for (const data of requestsGrouped[toPodId]) { |
42 | const request = data.request | 42 | const request = data.request |
43 | const pod = data.pod | 43 | const pod = data.pod |
44 | const hashKey = toPodId + request.endpoint | 44 | const hashKey = toPodId + request.endpoint |
@@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
54 | 54 | ||
55 | requestsToMakeGrouped[hashKey].ids.push(request.id) | 55 | requestsToMakeGrouped[hashKey].ids.push(request.id) |
56 | requestsToMakeGrouped[hashKey].datas.push(request.request) | 56 | requestsToMakeGrouped[hashKey].datas.push(request.request) |
57 | }) | 57 | } |
58 | }) | 58 | } |
59 | 59 | ||
60 | return requestsToMakeGrouped | 60 | return requestsToMakeGrouped |
61 | } | 61 | } |
62 | 62 | ||
63 | createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { | 63 | async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { |
64 | // If there are no destination pods abort | 64 | // If there are no destination pods abort |
65 | if (toIds.length === 0) return undefined | 65 | if (toIds.length === 0) return undefined |
66 | 66 | ||
@@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | |||
76 | transaction | 76 | transaction |
77 | } | 77 | } |
78 | 78 | ||
79 | return db.Request.create(createQuery, dbRequestOptions) | 79 | const request = await db.Request.create(createQuery, dbRequestOptions) |
80 | .then(request => { | 80 | await request.setPods(toIds, dbRequestOptions) |
81 | return request.setPods(toIds, dbRequestOptions) | ||
82 | }) | ||
83 | } | 81 | } |
84 | 82 | ||
85 | // --------------------------------------------------------------------------- | 83 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 680232732..5f21287f0 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts | |||
@@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
59 | 59 | ||
60 | // We group video events per video and per pod | 60 | // We group video events per video and per pod |
61 | // We add the counts of the same event types | 61 | // We add the counts of the same event types |
62 | Object.keys(eventRequests).forEach(toPodId => { | 62 | for (const toPodId of Object.keys(eventRequests)) { |
63 | eventRequests[toPodId].forEach(eventToProcess => { | 63 | for (const eventToProcess of eventRequests[toPodId]) { |
64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | 64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} |
65 | 65 | ||
66 | if (!requestsToMakeGrouped[toPodId]) { | 66 | if (!requestsToMakeGrouped[toPodId]) { |
@@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 | 81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 |
82 | 82 | ||
83 | events[eventToProcess.type] += eventToProcess.count | 83 | events[eventToProcess.type] += eventToProcess.count |
84 | }) | 84 | } |
85 | }) | 85 | } |
86 | 86 | ||
87 | // Now we build our requests array per pod | 87 | // Now we build our requests array per pod |
88 | Object.keys(eventsPerVideoPerPod).forEach(toPodId => { | 88 | for (const toPodId of Object.keys(eventsPerVideoPerPod)) { |
89 | const eventsForPod = eventsPerVideoPerPod[toPodId] | 89 | const eventsForPod = eventsPerVideoPerPod[toPodId] |
90 | 90 | ||
91 | Object.keys(eventsForPod).forEach(uuid => { | 91 | for (const uuid of Object.keys(eventsForPod)) { |
92 | const eventsForVideo = eventsForPod[uuid] | 92 | const eventsForVideo = eventsForPod[uuid] |
93 | 93 | ||
94 | Object.keys(eventsForVideo).forEach(eventType => { | 94 | for (const eventType of Object.keys(eventsForVideo)) { |
95 | requestsToMakeGrouped[toPodId].datas.push({ | 95 | requestsToMakeGrouped[toPodId].datas.push({ |
96 | data: { | 96 | data: { |
97 | uuid, | 97 | uuid, |
@@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoE | |||
99 | count: +eventsForVideo[eventType] | 99 | count: +eventsForVideo[eventType] |
100 | } | 100 | } |
101 | }) | 101 | }) |
102 | }) | 102 | } |
103 | }) | 103 | } |
104 | }) | 104 | } |
105 | 105 | ||
106 | return requestsToMakeGrouped | 106 | return requestsToMakeGrouped |
107 | } | 107 | } |
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts index d7e1d5e31..a54efc111 100644 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ b/server/lib/request/request-video-qadu-scheduler.ts | |||
@@ -59,8 +59,8 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa | |||
59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { | 59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { |
60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} | 60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} |
61 | 61 | ||
62 | Object.keys(requests).forEach(toPodId => { | 62 | for (const toPodId of Object.keys(requests)) { |
63 | requests[toPodId].forEach(data => { | 63 | for (const data of requests[toPodId]) { |
64 | const request = data.request | 64 | const request = data.request |
65 | const video = data.video | 65 | const video = data.video |
66 | const pod = data.pod | 66 | const pod = data.pod |
@@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQa | |||
105 | // Maybe there are multiple quick and dirty update for the same video | 105 | // Maybe there are multiple quick and dirty update for the same video |
106 | // We use this hash map to dedupe them | 106 | // We use this hash map to dedupe them |
107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData | 107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData |
108 | }) | 108 | } |
109 | }) | 109 | } |
110 | 110 | ||
111 | // Now we deduped similar quick and dirty updates, we can build our requests data | 111 | // Now we deduped similar quick and dirty updates, we can build our requests data |
112 | Object.keys(requestsToMakeGrouped).forEach(hashKey => { | 112 | for (const hashKey of Object.keys(requestsToMakeGrouped)) { |
113 | Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { | 113 | for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) { |
114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] | 114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] |
115 | 115 | ||
116 | requestsToMakeGrouped[hashKey].datas.push({ | 116 | requestsToMakeGrouped[hashKey].datas.push({ |
117 | data: videoData | 117 | data: videoData |
118 | }) | 118 | }) |
119 | }) | 119 | } |
120 | 120 | ||
121 | // We don't need it anymore, it was just to build our data array | 121 | // We don't need it anymore, it was just to build our data array |
122 | delete requestsToMakeGrouped[hashKey].videos | 122 | delete requestsToMakeGrouped[hashKey].videos |
123 | }) | 123 | } |
124 | 124 | ||
125 | return requestsToMakeGrouped | 125 | return requestsToMakeGrouped |
126 | } | 126 | } |
127 | 127 | ||
128 | createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { | 128 | async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { |
129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} | 129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} |
130 | if (transaction) dbRequestOptions.transaction = transaction | 130 | if (transaction) dbRequestOptions.transaction = transaction |
131 | 131 | ||
132 | // Send the update to all our friends | 132 | // Send the update to all our friends |
133 | return db.Pod.listAllIds(transaction).then(podIds => { | 133 | const podIds = await db.Pod.listAllIds(transaction) |
134 | const queries = [] | 134 | const queries = [] |
135 | podIds.forEach(podId => { | 135 | for (const podId of podIds) { |
136 | queries.push({ type, videoId, podId }) | 136 | queries.push({ type, videoId, podId }) |
137 | }) | 137 | } |
138 | 138 | ||
139 | return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) | 139 | await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) |
140 | }) | 140 | return undefined |
141 | } | 141 | } |
142 | } | 142 | } |
143 | 143 | ||
diff --git a/server/lib/user.ts b/server/lib/user.ts index 8609e72d8..a92f4777b 100644 --- a/server/lib/user.ts +++ b/server/lib/user.ts | |||
@@ -3,40 +3,36 @@ import { UserInstance } from '../models' | |||
3 | import { addVideoAuthorToFriends } from './friends' | 3 | import { addVideoAuthorToFriends } from './friends' |
4 | import { createVideoChannel } from './video-channel' | 4 | import { createVideoChannel } from './video-channel' |
5 | 5 | ||
6 | function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { | 6 | async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { |
7 | return db.sequelize.transaction(t => { | 7 | const res = await db.sequelize.transaction(async t => { |
8 | const userOptions = { | 8 | const userOptions = { |
9 | transaction: t, | 9 | transaction: t, |
10 | validate: validateUser | 10 | validate: validateUser |
11 | } | 11 | } |
12 | 12 | ||
13 | return user.save(userOptions) | 13 | const userCreated = await user.save(userOptions) |
14 | .then(user => { | 14 | const authorInstance = db.Author.build({ |
15 | const author = db.Author.build({ | 15 | name: userCreated.username, |
16 | name: user.username, | 16 | podId: null, // It is our pod |
17 | podId: null, // It is our pod | 17 | userId: userCreated.id |
18 | userId: user.id | 18 | }) |
19 | }) | 19 | |
20 | 20 | const authorCreated = await authorInstance.save({ transaction: t }) | |
21 | return author.save({ transaction: t }) | 21 | |
22 | .then(author => ({ author, user })) | 22 | const remoteVideoAuthor = authorCreated.toAddRemoteJSON() |
23 | }) | 23 | |
24 | .then(({ author, user }) => { | 24 | // Now we'll add the video channel's meta data to our friends |
25 | const remoteVideoAuthor = author.toAddRemoteJSON() | 25 | const author = await addVideoAuthorToFriends(remoteVideoAuthor, t) |
26 | 26 | ||
27 | // Now we'll add the video channel's meta data to our friends | 27 | const videoChannelInfo = { |
28 | return addVideoAuthorToFriends(remoteVideoAuthor, t) | 28 | name: `Default ${userCreated.username} channel` |
29 | .then(() => ({ author, user })) | 29 | } |
30 | }) | 30 | const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t) |
31 | .then(({ author, user }) => { | 31 | |
32 | const videoChannelInfo = { | 32 | return { author, videoChannel } |
33 | name: `Default ${user.username} channel` | ||
34 | } | ||
35 | |||
36 | return createVideoChannel(videoChannelInfo, author, t) | ||
37 | .then(videoChannel => ({ author, user, videoChannel })) | ||
38 | }) | ||
39 | }) | 33 | }) |
34 | |||
35 | return res | ||
40 | } | 36 | } |
41 | 37 | ||
42 | // --------------------------------------------------------------------------- | 38 | // --------------------------------------------------------------------------- |
diff --git a/server/middlewares/secure.ts b/server/middlewares/secure.ts index f7424c9c3..5dd809f15 100644 --- a/server/middlewares/secure.ts +++ b/server/middlewares/secure.ts | |||
@@ -8,45 +8,44 @@ import { | |||
8 | } from '../helpers' | 8 | } from '../helpers' |
9 | import { PodSignature } from '../../shared' | 9 | import { PodSignature } from '../../shared' |
10 | 10 | ||
11 | function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) { | 11 | async function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) { |
12 | const signatureObject: PodSignature = req.body.signature | 12 | const signatureObject: PodSignature = req.body.signature |
13 | const host = signatureObject.host | 13 | const host = signatureObject.host |
14 | 14 | ||
15 | db.Pod.loadByHost(host) | 15 | try { |
16 | .then(pod => { | 16 | const pod = await db.Pod.loadByHost(host) |
17 | if (pod === null) { | 17 | if (pod === null) { |
18 | logger.error('Unknown pod %s.', host) | 18 | logger.error('Unknown pod %s.', host) |
19 | return res.sendStatus(403) | 19 | return res.sendStatus(403) |
20 | } | 20 | } |
21 | |||
22 | logger.debug('Checking signature from %s.', host) | ||
23 | 21 | ||
24 | let signatureShouldBe | 22 | logger.debug('Checking signature from %s.', host) |
25 | // If there is data in the body the sender used it for its signature | ||
26 | // If there is no data we just use its host as signature | ||
27 | if (req.body.data) { | ||
28 | signatureShouldBe = req.body.data | ||
29 | } else { | ||
30 | signatureShouldBe = host | ||
31 | } | ||
32 | 23 | ||
33 | const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature) | 24 | let signatureShouldBe |
25 | // If there is data in the body the sender used it for its signature | ||
26 | // If there is no data we just use its host as signature | ||
27 | if (req.body.data) { | ||
28 | signatureShouldBe = req.body.data | ||
29 | } else { | ||
30 | signatureShouldBe = host | ||
31 | } | ||
34 | 32 | ||
35 | if (signatureOk === true) { | 33 | const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature) |
36 | res.locals.secure = { | ||
37 | pod | ||
38 | } | ||
39 | 34 | ||
40 | return next() | 35 | if (signatureOk === true) { |
36 | res.locals.secure = { | ||
37 | pod | ||
41 | } | 38 | } |
42 | 39 | ||
43 | logger.error('Signature is not okay in body for %s.', signatureObject.host) | 40 | return next() |
44 | return res.sendStatus(403) | 41 | } |
45 | }) | 42 | |
46 | .catch(err => { | 43 | logger.error('Signature is not okay in body for %s.', signatureObject.host) |
47 | logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature }) | 44 | return res.sendStatus(403) |
48 | return res.sendStatus(500) | 45 | } catch (err) { |
49 | }) | 46 | logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature }) |
47 | return res.sendStatus(500) | ||
48 | } | ||
50 | } | 49 | } |
51 | 50 | ||
52 | // --------------------------------------------------------------------------- | 51 | // --------------------------------------------------------------------------- |
diff --git a/server/models/pod/pod.ts b/server/models/pod/pod.ts index e4d7db48a..6619726af 100644 --- a/server/models/pod/pod.ts +++ b/server/models/pod/pod.ts | |||
@@ -247,20 +247,21 @@ updatePodsScore = function (goodPods: number[], badPods: number[]) { | |||
247 | // --------------------------------------------------------------------------- | 247 | // --------------------------------------------------------------------------- |
248 | 248 | ||
249 | // Remove pods with a score of 0 (too many requests where they were unreachable) | 249 | // Remove pods with a score of 0 (too many requests where they were unreachable) |
250 | function removeBadPods () { | 250 | async function removeBadPods () { |
251 | return listBadPods() | 251 | try { |
252 | .then(pods => { | 252 | const pods = await listBadPods() |
253 | const podsRemovePromises = pods.map(pod => pod.destroy()) | 253 | |
254 | return Promise.all(podsRemovePromises).then(() => pods.length) | 254 | const podsRemovePromises = pods.map(pod => pod.destroy()) |
255 | }) | 255 | await Promise.all(podsRemovePromises) |
256 | .then(numberOfPodsRemoved => { | 256 | |
257 | if (numberOfPodsRemoved) { | 257 | const numberOfPodsRemoved = pods.length |
258 | logger.info('Removed %d pods.', numberOfPodsRemoved) | 258 | |
259 | } else { | 259 | if (numberOfPodsRemoved) { |
260 | logger.info('No need to remove bad pods.') | 260 | logger.info('Removed %d pods.', numberOfPodsRemoved) |
261 | } | 261 | } else { |
262 | }) | 262 | logger.info('No need to remove bad pods.') |
263 | .catch(err => { | 263 | } |
264 | logger.error('Cannot remove bad pods.', err) | 264 | } catch (err) { |
265 | }) | 265 | logger.error('Cannot remove bad pods.', err) |
266 | } | ||
266 | } | 267 | } |