From f5028693a896a3076dd286ac0030e3d8f78f5ebf Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 25 Oct 2017 16:03:33 +0200 Subject: [PATCH] Use async/await in lib and initializers --- server/helpers/peertube-crypto.ts | 103 +++-- server/helpers/requests.ts | 2 +- server/helpers/utils.ts | 20 +- server/initializers/checker.ts | 46 ++- server/initializers/database.ts | 90 ++--- server/initializers/index.ts | 2 +- server/initializers/installer.ts | 141 ++++--- server/initializers/migrator.ts | 107 +++-- server/lib/cache/videos-preview-cache.ts | 15 +- server/lib/friends.ts | 368 +++++++++--------- .../lib/jobs/handlers/video-file-optimizer.ts | 88 ++--- .../jobs/handlers/video-file-transcoder.ts | 21 +- server/lib/jobs/job-scheduler.ts | 118 +++--- server/lib/oauth-model.ts | 54 ++- .../lib/request/abstract-request-scheduler.ts | 121 +++--- server/lib/request/request-scheduler.ts | 16 +- .../request/request-video-event-scheduler.ts | 20 +- .../request/request-video-qadu-scheduler.ts | 34 +- server/lib/user.ts | 52 ++- server/middlewares/secure.ts | 59 ++- server/models/pod/pod.ts | 33 +- 21 files changed, 740 insertions(+), 770 deletions(-) diff --git a/server/helpers/peertube-crypto.ts b/server/helpers/peertube-crypto.ts index 89aef99c4..47f0243e7 100644 --- a/server/helpers/peertube-crypto.ts +++ b/server/helpers/peertube-crypto.ts @@ -1,5 +1,4 @@ import * as crypto from 'crypto' -import * as Promise from 'bluebird' import { join } from 'path' import { @@ -41,7 +40,7 @@ function checkSignature (publicKey: string, data: string, hexSignature: string) return isValid } -function sign (data: string|Object) { +async function sign (data: string|Object) { const sign = crypto.createSign(SIGNATURE_ALGORITHM) let dataString: string @@ -52,33 +51,33 @@ function sign (data: string|Object) { dataString = JSON.stringify(data) } catch (err) { logger.error('Cannot sign data.', err) - return Promise.resolve('') + return '' } } sign.update(dataString, 'utf8') - return getMyPrivateCert().then(myKey => { - return sign.sign(myKey, SIGNATURE_ENCODING) - }) + const myKey = await getMyPrivateCert() + return await sign.sign(myKey, SIGNATURE_ENCODING) } function comparePassword (plainPassword: string, hashPassword: string) { return bcryptComparePromise(plainPassword, hashPassword) } -function createCertsIfNotExist () { - return certsExist().then(exist => { - if (exist === true) { - return undefined - } +async function createCertsIfNotExist () { + const exist = await certsExist() + if (exist === true) { + return undefined + } - return createCerts() - }) + return await createCerts() } -function cryptPassword (password: string) { - return bcryptGenSaltPromise(BCRYPT_SALT_SIZE).then(salt => bcryptHashPromise(password, salt)) +async function cryptPassword (password: string) { + const salt = await bcryptGenSaltPromise(BCRYPT_SALT_SIZE) + + return await bcryptHashPromise(password, salt) } function getMyPrivateCert () { @@ -105,51 +104,45 @@ export { // --------------------------------------------------------------------------- -function certsExist () { +async function certsExist () { const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) // If there is an error the certificates do not exist - return accessPromise(certPath) - .then(() => true) - .catch(() => false) + try { + await accessPromise(certPath) + + return true + } catch { + return false + } } -function createCerts () { - return certsExist().then(exist => { - if (exist === true) { - const errorMessage = 'Certs already exist.' 
- logger.warning(errorMessage) - throw new Error(errorMessage) - } +async function createCerts () { + const exist = await certsExist() + if (exist === true) { + const errorMessage = 'Certs already exist.' + logger.warning(errorMessage) + throw new Error(errorMessage) + } - logger.info('Generating a RSA key...') + logger.info('Generating a RSA key...') - const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) - const genRsaOptions = { - 'out': privateCertPath, - '2048': false - } - return opensslExecPromise('genrsa', genRsaOptions) - .then(() => { - logger.info('RSA key generated.') - logger.info('Managing public key...') - - const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub') - const rsaOptions = { - 'in': privateCertPath, - 'pubout': true, - 'out': publicCertPath - } - return opensslExecPromise('rsa', rsaOptions) - .then(() => logger.info('Public key managed.')) - .catch(err => { - logger.error('Cannot create public key on this pod.') - throw err - }) - }) - .catch(err => { - logger.error('Cannot create private key on this pod.') - throw err - }) - }) + const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME) + const genRsaOptions = { + 'out': privateCertPath, + '2048': false + } + + await opensslExecPromise('genrsa', genRsaOptions) + logger.info('RSA key generated.') + logger.info('Managing public key...') + + const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub') + const rsaOptions = { + 'in': privateCertPath, + 'pubout': true, + 'out': publicCertPath + } + + await opensslExecPromise('rsa', rsaOptions) } diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts index d67d46044..8c5d848f3 100644 --- a/server/helpers/requests.ts +++ b/server/helpers/requests.ts @@ -73,7 +73,7 @@ function makeSecureRequest (params: MakeSecureRequestParams) { signature } - // If there are data informations + // If there are data information if (params.data) { requestParams.json.data = params.data } diff --git a/server/helpers/utils.ts b/server/helpers/utils.ts index 6cabe117c..8b81a61e1 100644 --- a/server/helpers/utils.ts +++ b/server/helpers/utils.ts @@ -8,11 +8,13 @@ import { ResultList } from '../../shared' import { VideoResolution } from '../../shared/models/videos/video-resolution.enum' function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) { - res.type('json').status(400).end() + return res.type('json').status(400).end() } -function generateRandomString (size: number) { - return pseudoRandomBytesPromise(size).then(raw => raw.toString('hex')) +async function generateRandomString (size: number) { + const raw = await pseudoRandomBytesPromise(size) + + return raw.toString('hex') } interface FormattableToJSON { @@ -34,19 +36,19 @@ function getFormattedObjects (objects: T[], obje return res } -function isSignupAllowed () { +async function isSignupAllowed () { if (CONFIG.SIGNUP.ENABLED === false) { - return Promise.resolve(false) + return false } // No limit and signup is enabled if (CONFIG.SIGNUP.LIMIT === -1) { - return Promise.resolve(true) + return true } - return db.User.countTotal().then(totalUsers => { - return totalUsers < CONFIG.SIGNUP.LIMIT - }) + const totalUsers = await db.User.countTotal() + + return totalUsers < CONFIG.SIGNUP.LIMIT } function computeResolutionsToTranscode (videoFileHeight: number) { diff --git a/server/initializers/checker.ts b/server/initializers/checker.ts index eb9e9e280..9eaef1695 100644 --- a/server/initializers/checker.ts +++ b/server/initializers/checker.ts 
@@ -37,39 +37,37 @@ function checkMissedConfig () { // Check the available codecs // We get CONFIG by param to not import it in this file (import orders) -function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) { +async function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) { const Ffmpeg = require('fluent-ffmpeg') const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs) - getAvailableCodecsPromise() - .then(codecs => { - if (CONFIG.TRANSCODING.ENABLED === false) return undefined - - const canEncode = [ 'libx264' ] - canEncode.forEach(codec => { - if (codecs[codec] === undefined) { - throw new Error('Unknown codec ' + codec + ' in FFmpeg.') - } - - if (codecs[codec].canEncode !== true) { - throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg') - } - }) - }) + const codecs = await getAvailableCodecsPromise() + if (CONFIG.TRANSCODING.ENABLED === false) return undefined + + const canEncode = [ 'libx264' ] + for (const codec of canEncode) { + if (codecs[codec] === undefined) { + throw new Error('Unknown codec ' + codec + ' in FFmpeg.') + } + + if (codecs[codec].canEncode !== true) { + throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg') + } + } } // We get db by param to not import it in this file (import orders) -function clientsExist (OAuthClient: OAuthClientModel) { - return OAuthClient.countTotal().then(totalClients => { - return totalClients !== 0 - }) +async function clientsExist (OAuthClient: OAuthClientModel) { + const totalClients = await OAuthClient.countTotal() + + return totalClients !== 0 } // We get db by param to not import it in this file (import orders) -function usersExist (User: UserModel) { - return User.countTotal().then(totalUsers => { - return totalUsers !== 0 - }) +async function usersExist (User: UserModel) { + const totalUsers = await User.countTotal() + + return totalUsers !== 0 } // --------------------------------------------------------------------------- diff --git a/server/initializers/database.ts b/server/initializers/database.ts index ea2b68f59..ade72b62f 100644 --- a/server/initializers/database.ts +++ b/server/initializers/database.ts @@ -2,7 +2,7 @@ import { join } from 'path' import { flattenDepth } from 'lodash' require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string import * as Sequelize from 'sequelize' -import * as Promise from 'bluebird' +import * as Bluebird from 'bluebird' import { CONFIG } from './constants' // Do not use barrel, we need to load database first @@ -77,26 +77,26 @@ const sequelize = new Sequelize(dbname, username, password, { database.sequelize = sequelize -database.init = (silent: boolean) => { +database.init = async (silent: boolean) => { const modelDirectory = join(__dirname, '..', 'models') - return getModelFiles(modelDirectory).then(filePaths => { - filePaths.forEach(filePath => { - const model = sequelize.import(filePath) + const filePaths = await getModelFiles(modelDirectory) - database[model['name']] = model - }) + for (const filePath of filePaths) { + const model = sequelize.import(filePath) - Object.keys(database).forEach(modelName => { - if ('associate' in database[modelName]) { - database[modelName].associate(database) - } - }) + database[model['name']] = model + } - if (!silent) logger.info('Database %s is ready.', dbname) + for (const modelName of Object.keys(database)) { + if ('associate' in database[modelName]) { + database[modelName].associate(database) + } + } - return undefined - }) + if (!silent) logger.info('Database 
%s is ready.', dbname) + + return undefined } // --------------------------------------------------------------------------- @@ -107,31 +107,29 @@ export { // --------------------------------------------------------------------------- -function getModelFiles (modelDirectory: string) { - return readdirPromise(modelDirectory) - .then(files => { - const directories: string[] = files.filter(directory => { - // Find directories - if ( - directory.endsWith('.js.map') || - directory === 'index.js' || directory === 'index.ts' || - directory === 'utils.js' || directory === 'utils.ts' - ) return false - - return true - }) +async function getModelFiles (modelDirectory: string) { + const files = await readdirPromise(modelDirectory) + const directories = files.filter(directory => { + // Find directories + if ( + directory.endsWith('.js.map') || + directory === 'index.js' || directory === 'index.ts' || + directory === 'utils.js' || directory === 'utils.ts' + ) return false + + return true + }) - return directories - }) - .then(directories => { - const tasks = [] + const tasks: Bluebird[] = [] - // For each directory we read it and append model in the modelFilePaths array - directories.forEach(directory => { - const modelDirectoryPath = join(modelDirectory, directory) + // For each directory we read it and append model in the modelFilePaths array + for (const directory of directories) { + const modelDirectoryPath = join(modelDirectory, directory) - const promise = readdirPromise(modelDirectoryPath).then(files => { - const filteredFiles = files.filter(file => { + const promise = readdirPromise(modelDirectoryPath) + .then(files => { + const filteredFiles = files + .filter(file => { if ( file === 'index.js' || file === 'index.ts' || file === 'utils.js' || file === 'utils.ts' || @@ -140,17 +138,15 @@ function getModelFiles (modelDirectory: string) { ) return false return true - }).map(file => join(modelDirectoryPath, file)) - - return filteredFiles - }) + }) + .map(file => join(modelDirectoryPath, file)) - tasks.push(promise) + return filteredFiles }) - return Promise.all(tasks) - }) - .then((filteredFiles: string[][]) => { - return flattenDepth(filteredFiles, 1) - }) + tasks.push(promise) + } + + const filteredFilesArray: string[][] = await Promise.all(tasks) + return flattenDepth(filteredFilesArray, 1) } diff --git a/server/initializers/index.ts b/server/initializers/index.ts index b8400ff84..332702774 100644 --- a/server/initializers/index.ts +++ b/server/initializers/index.ts @@ -1,4 +1,4 @@ -// Constants first, databse in second! +// Constants first, database in second! 
export * from './constants' export * from './database' export * from './checker' diff --git a/server/initializers/installer.ts b/server/initializers/installer.ts index b997de07f..4c04290fc 100644 --- a/server/initializers/installer.ts +++ b/server/initializers/installer.ts @@ -1,5 +1,5 @@ import * as passwordGenerator from 'password-generator' -import * as Promise from 'bluebird' +import * as Bluebird from 'bluebird' import { database as db } from './database' import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants' @@ -7,13 +7,13 @@ import { clientsExist, usersExist } from './checker' import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers' import { createUserAuthorAndChannel } from '../lib' -function installApplication () { - return db.sequelize.sync() - .then(() => removeCacheDirectories()) - .then(() => createDirectoriesIfNotExist()) - .then(() => createCertsIfNotExist()) - .then(() => createOAuthClientIfNotExist()) - .then(() => createOAuthAdminIfNotExist()) +async function installApplication () { + await db.sequelize.sync() + await removeCacheDirectories() + await createDirectoriesIfNotExist() + await createCertsIfNotExist() + await createOAuthClientIfNotExist() + await createOAuthAdminIfNotExist() } // --------------------------------------------------------------------------- @@ -27,13 +27,13 @@ export { function removeCacheDirectories () { const cacheDirectories = CACHE.DIRECTORIES - const tasks = [] + const tasks: Bluebird[] = [] // Cache directories - Object.keys(cacheDirectories).forEach(key => { + for (const key of Object.keys(cacheDirectories)) { const dir = cacheDirectories[key] tasks.push(rimrafPromise(dir)) - }) + } return Promise.all(tasks) } @@ -43,88 +43,83 @@ function createDirectoriesIfNotExist () { const cacheDirectories = CACHE.DIRECTORIES const tasks = [] - Object.keys(storage).forEach(key => { + for (const key of Object.keys(storage)) { const dir = storage[key] tasks.push(mkdirpPromise(dir)) - }) + } // Cache directories - Object.keys(cacheDirectories).forEach(key => { + for (const key of Object.keys(cacheDirectories)) { const dir = cacheDirectories[key] tasks.push(mkdirpPromise(dir)) - }) + } return Promise.all(tasks) } -function createOAuthClientIfNotExist () { - return clientsExist(db.OAuthClient).then(exist => { - // Nothing to do, clients already exist - if (exist === true) return undefined - - logger.info('Creating a default OAuth Client.') - - const id = passwordGenerator(32, false, /[a-z0-9]/) - const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/) - const client = db.OAuthClient.build({ - clientId: id, - clientSecret: secret, - grants: [ 'password', 'refresh_token' ], - redirectUris: null - }) +async function createOAuthClientIfNotExist () { + const exist = await clientsExist(db.OAuthClient) + // Nothing to do, clients already exist + if (exist === true) return undefined - return client.save().then(createdClient => { - logger.info('Client id: ' + createdClient.clientId) - logger.info('Client secret: ' + createdClient.clientSecret) + logger.info('Creating a default OAuth Client.') - return undefined - }) + const id = passwordGenerator(32, false, /[a-z0-9]/) + const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/) + const client = db.OAuthClient.build({ + clientId: id, + clientSecret: secret, + grants: [ 'password', 'refresh_token' ], + redirectUris: null }) -} -function createOAuthAdminIfNotExist () { - return usersExist(db.User).then(exist => { - // Nothing to do, users already exist - if 
(exist === true) return undefined + const createdClient = await client.save() + logger.info('Client id: ' + createdClient.clientId) + logger.info('Client secret: ' + createdClient.clientSecret) - logger.info('Creating the administrator.') + return undefined +} - const username = 'root' - const role = USER_ROLES.ADMIN - const email = CONFIG.ADMIN.EMAIL - let validatePassword = true - let password = '' +async function createOAuthAdminIfNotExist () { + const exist = await usersExist(db.User) + // Nothing to do, users already exist + if (exist === true) return undefined - // Do not generate a random password for tests - if (process.env.NODE_ENV === 'test') { - password = 'test' + logger.info('Creating the administrator.') - if (process.env.NODE_APP_INSTANCE) { - password += process.env.NODE_APP_INSTANCE - } + const username = 'root' + const role = USER_ROLES.ADMIN + const email = CONFIG.ADMIN.EMAIL + let validatePassword = true + let password = '' - // Our password is weak so do not validate it - validatePassword = false - } else { - password = passwordGenerator(8, true) - } + // Do not generate a random password for tests + if (process.env.NODE_ENV === 'test') { + password = 'test' - const userData = { - username, - email, - password, - role, - videoQuota: -1 + if (process.env.NODE_APP_INSTANCE) { + password += process.env.NODE_APP_INSTANCE } - const user = db.User.build(userData) - return createUserAuthorAndChannel(user, validatePassword) - .then(({ user }) => { - logger.info('Username: ' + username) - logger.info('User password: ' + password) - - logger.info('Creating Application table.') - return db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION }) - }) - }) + // Our password is weak so do not validate it + validatePassword = false + } else { + password = passwordGenerator(8, true) + } + + const userData = { + username, + email, + password, + role, + videoQuota: -1 + } + const user = db.User.build(userData) + + await createUserAuthorAndChannel(user, validatePassword) + logger.info('Username: ' + username) + logger.info('User password: ' + password) + + logger.info('Creating Application table.') + await db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION }) } diff --git a/server/initializers/migrator.ts b/server/initializers/migrator.ts index 7b535aea9..4fbe1cf5b 100644 --- a/server/initializers/migrator.ts +++ b/server/initializers/migrator.ts @@ -1,52 +1,35 @@ import * as path from 'path' -import * as Promise from 'bluebird' import { database as db } from './database' import { LAST_MIGRATION_VERSION } from './constants' import { logger, readdirPromise } from '../helpers' -function migrate () { - const p = db.sequelize.getQueryInterface().showAllTables() - .then(tables => { - // No tables, we don't need to migrate anything - // The installer will do that - if (tables.length === 0) throw null - }) - .then(() => { - return db.Application.loadMigrationVersion() - }) - .then(actualVersion => { - if (actualVersion === null) { - return db.Application.create({ migrationVersion: 0 }).then(() => 0) - } +async function migrate () { + const tables = await db.sequelize.getQueryInterface().showAllTables() - return actualVersion - }) - .then(actualVersion => { - // No need migrations, abort - if (actualVersion >= LAST_MIGRATION_VERSION) throw null + // No tables, we don't need to migrate anything + // The installer will do that + if (tables.length === 0) return - return actualVersion - }) - .then(actualVersion => { - // If there are a new migration scripts - 
logger.info('Begin migrations.') + let actualVersion = await db.Application.loadMigrationVersion() + if (actualVersion === null) { + await db.Application.create({ migrationVersion: 0 }) + actualVersion = 0 + } - return getMigrationScripts().then(migrationScripts => ({ actualVersion, migrationScripts })) - }) - .then(({ actualVersion, migrationScripts }) => { - return Promise.each(migrationScripts, entity => executeMigration(actualVersion, entity)) - }) - .then(() => { - logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION) - }) - .catch(err => { - if (err === null) return undefined + // No need migrations, abort + if (actualVersion >= LAST_MIGRATION_VERSION) return - throw err - }) + // If there are a new migration scripts + logger.info('Begin migrations.') + + const migrationScripts = await getMigrationScripts() - return p + for (const migrationScript of migrationScripts) { + await executeMigration(actualVersion, migrationScript) + } + + logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION) } // --------------------------------------------------------------------------- @@ -57,29 +40,28 @@ export { // --------------------------------------------------------------------------- -function getMigrationScripts () { - return readdirPromise(path.join(__dirname, 'migrations')).then(files => { - const filesToMigrate: { - version: string, - script: string - }[] = [] - - files - .filter(file => file.endsWith('.js.map') === false) - .forEach(file => { - // Filename is something like 'version-blabla.js' - const version = file.split('-')[0] - filesToMigrate.push({ - version, - script: file - }) +async function getMigrationScripts () { + const files = await readdirPromise(path.join(__dirname, 'migrations')) + const filesToMigrate: { + version: string, + script: string + }[] = [] + + files + .filter(file => file.endsWith('.js.map') === false) + .forEach(file => { + // Filename is something like 'version-blabla.js' + const version = file.split('-')[0] + filesToMigrate.push({ + version, + script: file }) + }) - return filesToMigrate - }) + return filesToMigrate } -function executeMigration (actualVersion: number, entity: { version: string, script: string }) { +async function executeMigration (actualVersion: number, entity: { version: string, script: string }) { const versionScript = parseInt(entity.version, 10) // Do not execute old migration scripts @@ -91,7 +73,7 @@ function executeMigration (actualVersion: number, entity: { version: string, scr const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName)) - return db.sequelize.transaction(t => { + await db.sequelize.transaction(async t => { const options = { transaction: t, queryInterface: db.sequelize.getQueryInterface(), @@ -99,10 +81,9 @@ function executeMigration (actualVersion: number, entity: { version: string, scr db } - return migrationScript.up(options) - .then(() => { - // Update the new migration version - return db.Application.updateMigrationVersion(versionScript, t) - }) + await migrationScript.up(options) + + // Update the new migration version + await db.Application.updateMigrationVersion(versionScript, t) }) } diff --git a/server/lib/cache/videos-preview-cache.ts b/server/lib/cache/videos-preview-cache.ts index fecdca6ef..0fe4d2f78 100644 --- a/server/lib/cache/videos-preview-cache.ts +++ b/server/lib/cache/videos-preview-cache.ts @@ -1,7 +1,6 @@ import * as asyncLRU from 'async-lru' import { join } from 'path' import { 
createWriteStream } from 'fs' -import * as Promise from 'bluebird' import { database as db, CONFIG, CACHE } from '../../initializers' import { logger, unlinkPromise } from '../../helpers' @@ -43,15 +42,15 @@ class VideosPreviewCache { }) } - private loadPreviews (key: string) { - return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) - .then(video => { - if (!video) return undefined + private async loadPreviews (key: string) { + const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key) + if (!video) return undefined - if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) + if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName()) - return this.saveRemotePreviewAndReturnPath(video) - }) + const res = await this.saveRemotePreviewAndReturnPath(video) + + return res } private saveRemotePreviewAndReturnPath (video: VideoInstance) { diff --git a/server/lib/friends.ts b/server/lib/friends.ts index f035b099b..a33432dc1 100644 --- a/server/lib/friends.ts +++ b/server/lib/friends.ts @@ -1,6 +1,6 @@ import * as request from 'request' import * as Sequelize from 'sequelize' -import * as Promise from 'bluebird' +import * as Bluebird from 'bluebird' import { join } from 'path' import { database as db } from '../initializers/database' @@ -188,159 +188,158 @@ function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize. function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) { const tasks = [] - eventsParams.forEach(eventParams => { + for (const eventParams of eventsParams) { tasks.push(addEventToRemoteVideo(eventParams, transaction)) - }) + } return Promise.all(tasks) } -function hasFriends () { - return db.Pod.countAll().then(count => count !== 0) +async function hasFriends () { + const count = await db.Pod.countAll() + + return count !== 0 } -function makeFriends (hosts: string[]) { +async function makeFriends (hosts: string[]) { const podsScore = {} logger.info('Make friends!') - return getMyPublicCert() - .then(cert => { - return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert) - }) - .then(cert => { - logger.debug('Pods scores computed.', { podsScore: podsScore }) - const podsList = computeWinningPods(hosts, podsScore) - logger.debug('Pods that we keep.', { podsToKeep: podsList }) + const cert = await getMyPublicCert() - return makeRequestsToWinningPods(cert, podsList) - }) + for (const host of hosts) { + await computeForeignPodsList(host, podsScore) + } + + logger.debug('Pods scores computed.', { podsScore: podsScore }) + + const podsList = computeWinningPods(hosts, podsScore) + logger.debug('Pods that we keep.', { podsToKeep: podsList }) + + return makeRequestsToWinningPods(cert, podsList) } -function quitFriends () { +async function quitFriends () { // Stop pool requests requestScheduler.deactivate() - return requestScheduler.flush() - .then(() => { - return requestVideoQaduScheduler.flush() - }) - .then(() => { - return db.Pod.list() - }) - .then(pods => { - const requestParams = { - method: 'POST' as 'POST', - path: '/api/' + API_VERSION + '/remote/pods/remove', - toPod: null - } + try { + await requestScheduler.flush() + + await requestVideoQaduScheduler.flush() + + const pods = await db.Pod.list() + const requestParams = { + method: 'POST' as 'POST', + path: '/api/' + API_VERSION + '/remote/pods/remove', + toPod: null + } - // Announce we quit them - // We don't care if the request fails - // The other pod will 
exclude us automatically after a while - return Promise.map(pods, pod => { + // Announce we quit them + // We don't care if the request fails + // The other pod will exclude us automatically after a while + try { + await Bluebird.map(pods, pod => { requestParams.toPod = pod return makeSecureRequest(requestParams) }, { concurrency: REQUESTS_IN_PARALLEL }) - .then(() => pods) - .catch(err => { - logger.error('Some errors while quitting friends.', err) - // Don't stop the process - return pods - }) - }) - .then(pods => { - const tasks = [] - pods.forEach(pod => tasks.push(pod.destroy())) + } catch (err) { // Don't stop the process + logger.error('Some errors while quitting friends.', err) + } - return Promise.all(pods) - }) - .then(() => { - logger.info('Removed all remote videos.') - // Don't forget to re activate the scheduler, even if there was an error - return requestScheduler.activate() - }) - .finally(() => requestScheduler.activate()) + const tasks = [] + for (const pod of pods) { + tasks.push(pod.destroy()) + } + await Promise.all(pods) + + logger.info('Removed all remote videos.') + + requestScheduler.activate() + } catch (err) { + // Don't forget to re activate the scheduler, even if there was an error + requestScheduler.activate() + + throw err + } } -function sendOwnedDataToPod (podId: number) { +async function sendOwnedDataToPod (podId: number) { // First send authors - return sendOwnedAuthorsToPod(podId) - .then(() => sendOwnedChannelsToPod(podId)) - .then(() => sendOwnedVideosToPod(podId)) + await sendOwnedAuthorsToPod(podId) + await sendOwnedChannelsToPod(podId) + await sendOwnedVideosToPod(podId) +} + +async function sendOwnedChannelsToPod (podId: number) { + const videoChannels = await db.VideoChannel.listOwned() + + const tasks: Promise[] = [] + for (const videoChannel of videoChannels) { + const remoteVideoChannel = videoChannel.toAddRemoteJSON() + const options = { + type: 'add-channel' as 'add-channel', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteVideoChannel, + toIds: [ podId ], + transaction: null + } + + const p = createRequest(options) + tasks.push(p) + } + + await Promise.all(tasks) } -function sendOwnedChannelsToPod (podId: number) { - return db.VideoChannel.listOwned() - .then(videoChannels => { - const tasks = [] - videoChannels.forEach(videoChannel => { - const remoteVideoChannel = videoChannel.toAddRemoteJSON() - const options = { - type: 'add-channel' as 'add-channel', - endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideoChannel, - toIds: [ podId ], - transaction: null - } +async function sendOwnedAuthorsToPod (podId: number) { + const authors = await db.Author.listOwned() + const tasks: Promise[] = [] - const p = createRequest(options) - tasks.push(p) - }) + for (const author of authors) { + const remoteAuthor = author.toAddRemoteJSON() + const options = { + type: 'add-author' as 'add-author', + endpoint: REQUEST_ENDPOINTS.VIDEOS, + data: remoteAuthor, + toIds: [ podId ], + transaction: null + } - return Promise.all(tasks) - }) + const p = createRequest(options) + tasks.push(p) + } + + await Promise.all(tasks) } -function sendOwnedAuthorsToPod (podId: number) { - return db.Author.listOwned() - .then(authors => { - const tasks = [] - authors.forEach(author => { - const remoteAuthor = author.toAddRemoteJSON() +async function sendOwnedVideosToPod (podId: number) { + const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags() + const tasks: Bluebird[] = [] + + for (const video of videosList) { + const promise = video.toAddRemoteJSON() + 
.then(remoteVideo => { const options = { - type: 'add-author' as 'add-author', + type: 'add-video' as 'add-video', endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteAuthor, + data: remoteVideo, toIds: [ podId ], transaction: null } - - const p = createRequest(options) - tasks.push(p) + return createRequest(options) }) - - return Promise.all(tasks) - }) -} - -function sendOwnedVideosToPod (podId: number) { - return db.Video.listOwnedAndPopulateAuthorAndTags() - .then(videosList => { - const tasks = [] - videosList.forEach(video => { - const promise = video.toAddRemoteJSON() - .then(remoteVideo => { - const options = { - type: 'add-video' as 'add-video', - endpoint: REQUEST_ENDPOINTS.VIDEOS, - data: remoteVideo, - toIds: [ podId ], - transaction: null - } - return createRequest(options) - }) - .catch(err => { - logger.error('Cannot convert video to remote.', err) - // Don't break the process - return undefined - }) - - tasks.push(promise) + .catch(err => { + logger.error('Cannot convert video to remote.', err) + // Don't break the process + return undefined }) - return Promise.all(tasks) - }) + tasks.push(promise) + } + + await Promise.all(tasks) } function fetchRemotePreview (video: VideoInstance) { @@ -350,18 +349,26 @@ function fetchRemotePreview (video: VideoInstance) { return request.get(REMOTE_SCHEME.HTTP + '://' + host + path) } -function removeFriend (pod: PodInstance) { +async function removeFriend (pod: PodInstance) { const requestParams = { method: 'POST' as 'POST', path: '/api/' + API_VERSION + '/remote/pods/remove', toPod: pod } - return makeSecureRequest(requestParams) - .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)) - .then(() => pod.destroy()) - .then(() => logger.info('Removed friend %s.', pod.host)) - .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err)) + try { + await makeSecureRequest(requestParams) + } catch (err) { + logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err) + } + + try { + await pod.destroy() + + logger.info('Removed friend %s.', pod.host) + } catch (err) { + logger.error('Cannot destroy friend %s.', pod.host, err) + } } function getRequestScheduler () { @@ -406,23 +413,21 @@ export { // --------------------------------------------------------------------------- -function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { - // TODO: type res - return getForeignPodsList(host).then(res => { - const foreignPodsList: { host: string }[] = res.data +async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) { + const result = await getForeignPodsList(host) + const foreignPodsList: { host: string }[] = result.data - // Let's give 1 point to the pod we ask the friends list - foreignPodsList.push({ host }) + // Let's give 1 point to the pod we ask the friends list + foreignPodsList.push({ host }) - foreignPodsList.forEach(foreignPod => { - const foreignPodHost = foreignPod.host + for (const foreignPod of foreignPodsList) { + const foreignPodHost = foreignPod.host - if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ - else podsScore[foreignPodHost] = 1 - }) + if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++ + else podsScore[foreignPodHost] = 1 + } - return undefined - }) + return undefined } function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) { @@ -431,12 +436,12 @@ function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: num const podsList = [] 
const baseScore = hosts.length / 2 - Object.keys(podsScore).forEach(podHost => { + for (const podHost of Object.keys(podsScore)) { // If the pod is not me and with a good score we add it if (isMe(podHost) === false && podsScore[podHost] > baseScore) { podsList.push({ host: podHost }) } - }) + } return podsList } @@ -449,7 +454,7 @@ function getForeignPodsList (host: string) { if (err) return rej(err) try { - const json = JSON.parse(body) + const json: ResultList = JSON.parse(body) return res(json) } catch (err) { return rej(err) @@ -458,53 +463,53 @@ function getForeignPodsList (host: string) { }) } -function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { +async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) { // Stop pool requests requestScheduler.deactivate() // Flush pool requests requestScheduler.forceSend() - return Promise.map(podsList, pod => { - const params = { - url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', - method: 'POST' as 'POST', - json: { - host: CONFIG.WEBSERVER.HOST, - email: CONFIG.ADMIN.EMAIL, - publicKey: cert + try { + await Bluebird.map(podsList, async pod => { + const params = { + url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add', + method: 'POST' as 'POST', + json: { + host: CONFIG.WEBSERVER.HOST, + email: CONFIG.ADMIN.EMAIL, + publicKey: cert + } } - } - return makeRetryRequest(params) - .then(({ response, body }) => { - body = body as { cert: string, email: string } - - if (response.statusCode === 200) { - const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email }) - return podObj.save() - .then(podCreated => { - - // Add our videos to the request scheduler - sendOwnedDataToPod(podCreated.id) - }) - .catch(err => { - logger.error('Cannot add friend %s pod.', pod.host, err) - }) - } else { - logger.error('Status not 200 for %s pod.', pod.host) + const { response, body } = await makeRetryRequest(params) + const typedBody = body as { cert: string, email: string } + + if (response.statusCode === 200) { + const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email }) + + let podCreated: PodInstance + try { + podCreated = await podObj.save() + } catch (err) { + logger.error('Cannot add friend %s pod.', pod.host, err) } - }) - .catch(err => { - logger.error('Error with adding %s pod.', pod.host, { error: err.stack }) - // Don't break the process - }) - }, { concurrency: REQUESTS_IN_PARALLEL }) - .then(() => logger.debug('makeRequestsToWinningPods finished.')) - .finally(() => { + + // Add our videos to the request scheduler + sendOwnedDataToPod(podCreated.id) + .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err)) + } else { + logger.error('Status not 200 for %s pod.', pod.host) + } + }, { concurrency: REQUESTS_IN_PARALLEL }) + + logger.debug('makeRequestsToWinningPods finished.') + + requestScheduler.activate() + } catch (err) { // Final callback, we've ended all the requests // Now we made new friends, we can re activate the pool of requests requestScheduler.activate() - }) + } } // Wrapper that populate "toIds" argument with all our friends if it is not specified @@ -515,14 +520,19 @@ type CreateRequestOptions = { toIds?: number[] transaction: Sequelize.Transaction } -function createRequest (options: CreateRequestOptions) { - if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions) +async function 
createRequest (options: CreateRequestOptions) { + if (options.toIds !== undefined) { + await requestScheduler.createRequest(options as RequestSchedulerOptions) + return undefined + } // If the "toIds" pods is not specified, we send the request to all our friends - return db.Pod.listAllIds(options.transaction).then(podIds => { - const newOptions = Object.assign(options, { toIds: podIds }) - return requestScheduler.createRequest(newOptions) - }) + const podIds = await db.Pod.listAllIds(options.transaction) + + const newOptions = Object.assign(options, { toIds: podIds }) + await requestScheduler.createRequest(newOptions) + + return undefined } function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) { diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/handlers/video-file-optimizer.ts index 63a51064c..799ba8b01 100644 --- a/server/lib/jobs/handlers/video-file-optimizer.ts +++ b/server/lib/jobs/handlers/video-file-optimizer.ts @@ -1,4 +1,4 @@ -import * as Promise from 'bluebird' +import * as Bluebird from 'bluebird' import { database as db } from '../../../initializers/database' import { logger, computeResolutionsToTranscode } from '../../../helpers' @@ -6,16 +6,17 @@ import { VideoInstance } from '../../../models' import { addVideoToFriends } from '../../friends' import { JobScheduler } from '../job-scheduler' -function process (data: { videoUUID: string }, jobId: number) { - return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } +async function process (data: { videoUUID: string }, jobId: number) { + const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) + // No video, maybe deleted? 
+ if (!video) { + logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) + return undefined + } + + await video.optimizeOriginalVideofile() - return video.optimizeOriginalVideofile().then(() => video) - }) + return video } function onError (err: Error, jobId: number) { @@ -23,33 +24,31 @@ function onError (err: Error, jobId: number) { return Promise.resolve() } -function onSuccess (jobId: number, video: VideoInstance) { +async function onSuccess (jobId: number, video: VideoInstance) { if (video === undefined) return undefined logger.info('Job %d is a success.', jobId) - video.toAddRemoteJSON() - .then(remoteVideo => { - // Now we'll add the video's meta data to our friends - return addVideoToFriends(remoteVideo, null) - }) - .then(() => { - return video.getOriginalFileHeight() - }) - .then(originalFileHeight => { - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, - { resolutions: resolutionsEnabled } - ) - - if (resolutionsEnabled.length === 0) return undefined - - return db.sequelize.transaction(t => { - const tasks: Promise[] = [] - - resolutionsEnabled.forEach(resolution => { + const remoteVideo = await video.toAddRemoteJSON() + + // Now we'll add the video's meta data to our friends + await addVideoToFriends(remoteVideo, null) + + const originalFileHeight = await video.getOriginalFileHeight() + // Create transcoding jobs if there are enabled resolutions + + const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length !== 0) { + try { + await db.sequelize.transaction(async t => { + const tasks: Bluebird[] = [] + + for (const resolution of resolutionsEnabled) { const dataInput = { videoUUID: video.uuid, resolution @@ -57,24 +56,19 @@ function onSuccess (jobId: number, video: VideoInstance) { const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) tasks.push(p) - }) + } - return Promise.all(tasks).then(() => resolutionsEnabled) + await Promise.all(tasks) }) - }) - .then(resolutionsEnabled => { - if (resolutionsEnabled === undefined) { - logger.info('No transcoding jobs created for video %s (no resolutions enabled).') - return undefined - } logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) - }) - .catch((err: Error) => { - logger.debug('Cannot transcode the video.', err) - throw err - }) - + } catch (err) { + logger.warn('Cannot transcode the video.', err) + } + } else { + logger.info('No transcoding jobs created for video %s (no resolutions enabled).') + return undefined + } } // --------------------------------------------------------------------------- diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/handlers/video-file-transcoder.ts index 0dafee566..b240ff58a 100644 --- a/server/lib/jobs/handlers/video-file-transcoder.ts +++ b/server/lib/jobs/handlers/video-file-transcoder.ts @@ -4,16 +4,17 @@ import { logger } from '../../../helpers' import { VideoInstance } from '../../../models' import { VideoResolution } from '../../../../shared' -function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { - return 
db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => { - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - return video.transcodeOriginalVideofile(data.resolution).then(() => video) - }) +async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { + const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) + return undefined + } + + await video.transcodeOriginalVideofile(data.resolution) + + return video } function onError (err: Error, jobId: number) { diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index c2409d20c..61d483268 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts @@ -23,7 +23,7 @@ class JobScheduler { return this.instance || (this.instance = new this()) } - activate () { + async activate () { const limit = JOBS_FETCH_LIMIT_PER_CYCLE logger.info('Jobs scheduler activated.') @@ -32,32 +32,36 @@ class JobScheduler { // Finish processing jobs from a previous start const state = JOB_STATES.PROCESSING - db.Job.listWithLimit(limit, state) - .then(jobs => { - this.enqueueJobs(jobsQueue, jobs) - - forever( - next => { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, JOBS_FETCHING_INTERVAL) - } - - const state = JOB_STATES.PENDING - db.Job.listWithLimit(limit, state) - .then(jobs => { - this.enqueueJobs(jobsQueue, jobs) - - // Optimization: we could use "drain" from queue object - return setTimeout(next, JOBS_FETCHING_INTERVAL) - }) - .catch(err => logger.error('Cannot list pending jobs.', err)) - }, - - err => logger.error('Error in job scheduler queue.', err) - ) - }) - .catch(err => logger.error('Cannot list pending jobs.', err)) + try { + const jobs = await db.Job.listWithLimit(limit, state) + + this.enqueueJobs(jobsQueue, jobs) + } catch (err) { + logger.error('Cannot list pending jobs.', err) + } + + forever( + async next => { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, JOBS_FETCHING_INTERVAL) + } + + const state = JOB_STATES.PENDING + try { + const jobs = await db.Job.listWithLimit(limit, state) + + this.enqueueJobs(jobsQueue, jobs) + } catch (err) { + logger.error('Cannot list pending jobs.', err) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, JOBS_FETCHING_INTERVAL) + }, + + err => logger.error('Error in job scheduler queue.', err) + ) } createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { @@ -75,7 +79,7 @@ class JobScheduler { jobs.forEach(job => jobsQueue.push(job)) } - private processJob (job: JobInstance, callback: (err: Error) => void) { + private async processJob (job: JobInstance, callback: (err: Error) => void) { const jobHandler = jobHandlers[job.handlerName] if (jobHandler === undefined) { logger.error('Unknown job handler for job %s.', job.handlerName) @@ -85,41 +89,45 @@ class JobScheduler { logger.info('Processing job %d with handler %s.', job.id, job.handlerName) job.state = JOB_STATES.PROCESSING - return job.save() - .then(() => { - return jobHandler.process(job.handlerInputData, job.id) - }) - .then( - result => { - return this.onJobSuccess(jobHandler, job, 
result) - }, - - err => { - logger.error('Error in job handler %s.', job.handlerName, err) - return this.onJobError(jobHandler, job, err) - } - ) - .then(() => callback(null)) - .catch(err => { - this.cannotSaveJobError(err) - return callback(err) - }) + await job.save() + + try { + const result = await jobHandler.process(job.handlerInputData, job.id) + await this.onJobSuccess(jobHandler, job, result) + } catch (err) { + logger.error('Error in job handler %s.', job.handlerName, err) + + try { + await this.onJobError(jobHandler, job, err) + } catch (innerErr) { + this.cannotSaveJobError(innerErr) + return callback(innerErr) + } + } + + callback(null) } - private onJobError (jobHandler: JobHandler, job: JobInstance, err: Error) { + private async onJobError (jobHandler: JobHandler, job: JobInstance, err: Error) { job.state = JOB_STATES.ERROR - return job.save() - .then(() => jobHandler.onError(err, job.id)) - .catch(err => this.cannotSaveJobError(err)) + try { + await job.save() + await jobHandler.onError(err, job.id) + } catch (err) { + this.cannotSaveJobError(err) + } } - private onJobSuccess (jobHandler: JobHandler, job: JobInstance, jobResult: any) { + private async onJobSuccess (jobHandler: JobHandler, job: JobInstance, jobResult: any) { job.state = JOB_STATES.SUCCESS - return job.save() - .then(() => jobHandler.onSuccess(job.id, jobResult)) - .catch(err => this.cannotSaveJobError(err)) + try { + await job.save() + jobHandler.onSuccess(job.id, jobResult) + } catch (err) { + this.cannotSaveJobError(err) + } } private cannotSaveJobError (err: Error) { diff --git a/server/lib/oauth-model.ts b/server/lib/oauth-model.ts index 9aa3ea52f..d91b00c55 100644 --- a/server/lib/oauth-model.ts +++ b/server/lib/oauth-model.ts @@ -24,39 +24,36 @@ function getRefreshToken (refreshToken: string) { return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken) } -function getUser (username: string, password: string) { +async function getUser (username: string, password: string) { logger.debug('Getting User (username: ' + username + ', password: ******).') - return db.User.getByUsername(username).then(user => { - if (!user) return null + const user = await db.User.getByUsername(username) + if (!user) return null - return user.isPasswordMatch(password).then(passwordMatch => { - if (passwordMatch === false) return null + const passwordMatch = await user.isPasswordMatch(password) + if (passwordMatch === false) return null - return user - }) - }) + return user } -function revokeToken (token: TokenInfo) { - return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => { - if (tokenDB) tokenDB.destroy() - - /* - * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js - * "As per the discussion we need set older date - * revokeToken will expected return a boolean in future version - * https://github.com/oauthjs/node-oauth2-server/pull/274 - * https://github.com/oauthjs/node-oauth2-server/issues/290" - */ - const expiredToken = tokenDB - expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') - - return expiredToken - }) +async function revokeToken (tokenInfo: TokenInfo) { + const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken) + if (token) token.destroy() + + /* + * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js + * "As per the discussion we need set older date + * revokeToken will expected 
return a boolean in future version + * https://github.com/oauthjs/node-oauth2-server/pull/274 + * https://github.com/oauthjs/node-oauth2-server/issues/290" + */ + const expiredToken = token + expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z') + + return expiredToken } -function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { +async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) { logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.') const tokenToCreate = { @@ -68,11 +65,10 @@ function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserIns userId: user.id } - return db.OAuthToken.create(tokenToCreate).then(tokenCreated => { - const tokenToReturn = Object.assign(tokenCreated, { client, user }) + const tokenCreated = await db.OAuthToken.create(tokenToCreate) + const tokenToReturn = Object.assign(tokenCreated, { client, user }) - return tokenToReturn - }) + return tokenToReturn } // --------------------------------------------------------------------------- diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts index ce4e2ffd2..f46682824 100644 --- a/server/lib/request/abstract-request-scheduler.ts +++ b/server/lib/request/abstract-request-scheduler.ts @@ -1,5 +1,5 @@ import { isEmpty } from 'lodash' -import * as Promise from 'bluebird' +import * as Bluebird from 'bluebird' import { database as db } from '../../initializers/database' import { logger, makeSecureRequest } from '../../helpers' @@ -76,7 +76,7 @@ abstract class AbstractRequestScheduler { // --------------------------------------------------------------------------- // Make a requests to friends of a certain type - protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { + protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { const params = { toPod: toPod, method: 'POST' as 'POST', @@ -86,72 +86,75 @@ abstract class AbstractRequestScheduler { // Make multiple retry requests to all of pods // The function fire some useful callbacks - return makeSecureRequest(params) - .then(({ response, body }) => { - if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { - throw new Error('Status code not 20x : ' + response.statusCode) - } - }) - .catch(err => { - logger.error('Error sending secure request to %s pod.', toPod.host, err) - - throw err - }) + try { + const { response } = await makeSecureRequest(params) + if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) { + throw new Error('Status code not 20x : ' + response.statusCode) + } + } catch (err) { + logger.error('Error sending secure request to %s pod.', toPod.host, err) + + throw err + } } // Make all the requests of the scheduler - protected makeRequests () { - return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) - .then((requestsGrouped: T) => { - // We want to group requests by destinations pod and endpoint - const requestsToMake = this.buildRequestsObjects(requestsGrouped) - - // If there are no requests, abort - if (isEmpty(requestsToMake) === true) { - logger.info('No "%s" to make.', this.description) - return { goodPods: [], badPods: [] } - } - - logger.info('Making "%s" to friends.', this.description) - - const goodPods: number[] = [] - const badPods: number[] = [] - 
- return Promise.map(Object.keys(requestsToMake), hashKey => { - const requestToMake = requestsToMake[hashKey] - const toPod: PodInstance = requestToMake.toPod - - return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) - .then(() => { - logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) - goodPods.push(requestToMake.toPod.id) - - this.afterRequestHook() - - // Remove the pod id of these request ids - return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) - }) - .catch(err => { - badPods.push(requestToMake.toPod.id) - logger.info('Cannot make request to %s.', toPod.host, err) - }) - }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods })) - }) - .then(({ goodPods, badPods }) => { - this.afterRequestsHook() - - // All the requests were made, we update the pods score - return db.Pod.updatePodsScore(goodPods, badPods) - }) - .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })) + protected async makeRequests () { + let requestsGrouped: T + + try { + requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) + } catch (err) { + logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }) + throw err + } + + // We want to group requests by destinations pod and endpoint + const requestsToMake = this.buildRequestsObjects(requestsGrouped) + + // If there are no requests, abort + if (isEmpty(requestsToMake) === true) { + logger.info('No "%s" to make.', this.description) + return { goodPods: [], badPods: [] } + } + + logger.info('Making "%s" to friends.', this.description) + + const goodPods: number[] = [] + const badPods: number[] = [] + + await Bluebird.map(Object.keys(requestsToMake), async hashKey => { + const requestToMake = requestsToMake[hashKey] + const toPod: PodInstance = requestToMake.toPod + + try { + await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) + logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) + goodPods.push(requestToMake.toPod.id) + + this.afterRequestHook() + + // Remove the pod id of these request ids + await this.getRequestToPodModel() + .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) + } catch (err) { + badPods.push(requestToMake.toPod.id) + logger.info('Cannot make request to %s.', toPod.host, err) + } + }, { concurrency: REQUESTS_IN_PARALLEL }) + + this.afterRequestsHook() + + // All the requests were made, we update the pods score + await db.Pod.updatePodsScore(goodPods, badPods) } protected afterRequestHook () { - // Nothing to do, let children reimplement it + // Nothing to do, let children re-implement it } protected afterRequestsHook () { - // Nothing to do, let children reimplement it + // Nothing to do, let children re-implement it } } diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts index 696875dcf..c3f7f6429 100644 --- a/server/lib/request/request-scheduler.ts +++ b/server/lib/request/request-scheduler.ts @@ -37,8 +37,8 @@ class RequestScheduler extends AbstractRequestScheduler { buildRequestsObjects (requestsGrouped: RequestsGrouped) { const requestsToMakeGrouped: RequestsObjects = {} - Object.keys(requestsGrouped).forEach(toPodId => { - requestsGrouped[toPodId].forEach(data => { + for (const toPodId of Object.keys(requestsGrouped)) { + for (const data of 
requestsGrouped[toPodId]) { const request = data.request const pod = data.pod const hashKey = toPodId + request.endpoint @@ -54,13 +54,13 @@ class RequestScheduler extends AbstractRequestScheduler { requestsToMakeGrouped[hashKey].ids.push(request.id) requestsToMakeGrouped[hashKey].datas.push(request.request) - }) - }) + } + } return requestsToMakeGrouped } - createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { + async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { // If there are no destination pods abort if (toIds.length === 0) return undefined @@ -76,10 +76,8 @@ class RequestScheduler extends AbstractRequestScheduler { transaction } - return db.Request.create(createQuery, dbRequestOptions) - .then(request => { - return request.setPods(toIds, dbRequestOptions) - }) + const request = await db.Request.create(createQuery, dbRequestOptions) + await request.setPods(toIds, dbRequestOptions) } // --------------------------------------------------------------------------- diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts index 680232732..5f21287f0 100644 --- a/server/lib/request/request-video-event-scheduler.ts +++ b/server/lib/request/request-video-event-scheduler.ts @@ -59,8 +59,8 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { - eventRequests[toPodId].forEach(eventToProcess => { + for (const toPodId of Object.keys(eventRequests)) { + for (const eventToProcess of eventRequests[toPodId]) { if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} if (!requestsToMakeGrouped[toPodId]) { @@ -81,17 +81,17 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler { + for (const toPodId of Object.keys(eventsPerVideoPerPod)) { const eventsForPod = eventsPerVideoPerPod[toPodId] - Object.keys(eventsForPod).forEach(uuid => { + for (const uuid of Object.keys(eventsForPod)) { const eventsForVideo = eventsForPod[uuid] - Object.keys(eventsForVideo).forEach(eventType => { + for (const eventType of Object.keys(eventsForVideo)) { requestsToMakeGrouped[toPodId].datas.push({ data: { uuid, @@ -99,9 +99,9 @@ class RequestVideoEventScheduler extends AbstractRequestScheduler = {} - Object.keys(requests).forEach(toPodId => { - requests[toPodId].forEach(data => { + for (const toPodId of Object.keys(requests)) { + for (const data of requests[toPodId]) { const request = data.request const video = data.video const pod = data.pod @@ -105,39 +105,39 @@ class RequestVideoQaduScheduler extends AbstractRequestScheduler { - Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => { + for (const hashKey of Object.keys(requestsToMakeGrouped)) { + for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) { const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] requestsToMakeGrouped[hashKey].datas.push({ data: videoData }) - }) + } // We don't need it anymore, it was just to build our data array delete requestsToMakeGrouped[hashKey].videos - }) + } return requestsToMakeGrouped } - createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { + async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { const dbRequestOptions: Sequelize.BulkCreateOptions = {} if (transaction) dbRequestOptions.transaction = transaction // Send the update to all our friends - return db.Pod.listAllIds(transaction).then(podIds => { - const queries = [] - 
podIds.forEach(podId => { - queries.push({ type, videoId, podId }) - }) - - return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) - }) + const podIds = await db.Pod.listAllIds(transaction) + const queries = [] + for (const podId of podIds) { + queries.push({ type, videoId, podId }) + } + + await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) + return undefined } } diff --git a/server/lib/user.ts b/server/lib/user.ts index 8609e72d8..a92f4777b 100644 --- a/server/lib/user.ts +++ b/server/lib/user.ts @@ -3,40 +3,36 @@ import { UserInstance } from '../models' import { addVideoAuthorToFriends } from './friends' import { createVideoChannel } from './video-channel' -function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { - return db.sequelize.transaction(t => { +async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) { + const res = await db.sequelize.transaction(async t => { const userOptions = { transaction: t, validate: validateUser } - return user.save(userOptions) - .then(user => { - const author = db.Author.build({ - name: user.username, - podId: null, // It is our pod - userId: user.id - }) - - return author.save({ transaction: t }) - .then(author => ({ author, user })) - }) - .then(({ author, user }) => { - const remoteVideoAuthor = author.toAddRemoteJSON() - - // Now we'll add the video channel's meta data to our friends - return addVideoAuthorToFriends(remoteVideoAuthor, t) - .then(() => ({ author, user })) - }) - .then(({ author, user }) => { - const videoChannelInfo = { - name: `Default ${user.username} channel` - } - - return createVideoChannel(videoChannelInfo, author, t) - .then(videoChannel => ({ author, user, videoChannel })) - }) + const userCreated = await user.save(userOptions) + const authorInstance = db.Author.build({ + name: userCreated.username, + podId: null, // It is our pod + userId: userCreated.id + }) + + const authorCreated = await authorInstance.save({ transaction: t }) + + const remoteVideoAuthor = authorCreated.toAddRemoteJSON() + + // Now we'll add the video channel's meta data to our friends + const author = await addVideoAuthorToFriends(remoteVideoAuthor, t) + + const videoChannelInfo = { + name: `Default ${userCreated.username} channel` + } + const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t) + + return { author, videoChannel } }) + + return res } // --------------------------------------------------------------------------- diff --git a/server/middlewares/secure.ts b/server/middlewares/secure.ts index f7424c9c3..5dd809f15 100644 --- a/server/middlewares/secure.ts +++ b/server/middlewares/secure.ts @@ -8,45 +8,44 @@ import { } from '../helpers' import { PodSignature } from '../../shared' -function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) { +async function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) { const signatureObject: PodSignature = req.body.signature const host = signatureObject.host - db.Pod.loadByHost(host) - .then(pod => { - if (pod === null) { - logger.error('Unknown pod %s.', host) - return res.sendStatus(403) - } - - logger.debug('Checking signature from %s.', host) + try { + const pod = await db.Pod.loadByHost(host) + if (pod === null) { + logger.error('Unknown pod %s.', host) + return res.sendStatus(403) + } - let signatureShouldBe - // If there is data in the body the sender used it for its signature - // If there is no data we just use its host as 
signature - if (req.body.data) { - signatureShouldBe = req.body.data - } else { - signatureShouldBe = host - } + logger.debug('Checking signature from %s.', host) - const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature) + let signatureShouldBe + // If there is data in the body the sender used it for its signature + // If there is no data we just use its host as signature + if (req.body.data) { + signatureShouldBe = req.body.data + } else { + signatureShouldBe = host + } - if (signatureOk === true) { - res.locals.secure = { - pod - } + const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature) - return next() + if (signatureOk === true) { + res.locals.secure = { + pod } - logger.error('Signature is not okay in body for %s.', signatureObject.host) - return res.sendStatus(403) - }) - .catch(err => { - logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature }) - return res.sendStatus(500) - }) + return next() + } + + logger.error('Signature is not okay in body for %s.', signatureObject.host) + return res.sendStatus(403) + } catch (err) { + logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature }) + return res.sendStatus(500) + } } // --------------------------------------------------------------------------- diff --git a/server/models/pod/pod.ts b/server/models/pod/pod.ts index e4d7db48a..6619726af 100644 --- a/server/models/pod/pod.ts +++ b/server/models/pod/pod.ts @@ -247,20 +247,21 @@ updatePodsScore = function (goodPods: number[], badPods: number[]) { // --------------------------------------------------------------------------- // Remove pods with a score of 0 (too many requests where they were unreachable) -function removeBadPods () { - return listBadPods() - .then(pods => { - const podsRemovePromises = pods.map(pod => pod.destroy()) - return Promise.all(podsRemovePromises).then(() => pods.length) - }) - .then(numberOfPodsRemoved => { - if (numberOfPodsRemoved) { - logger.info('Removed %d pods.', numberOfPodsRemoved) - } else { - logger.info('No need to remove bad pods.') - } - }) - .catch(err => { - logger.error('Cannot remove bad pods.', err) - }) +async function removeBadPods () { + try { + const pods = await listBadPods() + + const podsRemovePromises = pods.map(pod => pod.destroy()) + await Promise.all(podsRemovePromises) + + const numberOfPodsRemoved = pods.length + + if (numberOfPodsRemoved) { + logger.info('Removed %d pods.', numberOfPodsRemoved) + } else { + logger.info('No need to remove bad pods.') + } + } catch (err) { + logger.error('Cannot remove bad pods.', err) + } } -- 2.41.0
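
A minimal sketch of the conversion pattern applied to the schedulers above, for reviewers who want the shape without the surrounding PeerTube types. The names fetchWork, sendToPod and WORK_CONCURRENCY are placeholders invented for this sketch, not actual PeerTube helpers; the real code uses listWithLimitAndRandom, makeSecureRequest and REQUESTS_IN_PARALLEL.

import * as Bluebird from 'bluebird'

const WORK_CONCURRENCY = 10

// Placeholder for listWithLimitAndRandom(): fetch the batch of work to process
async function fetchWork (): Promise<string[]> {
  return [ 'pod-1', 'pod-2', 'pod-3' ]
}

// Placeholder for makeSecureRequest(): may throw on network or HTTP errors
async function sendToPod (host: string): Promise<void> {
  // no-op in this sketch
}

// Before: fetchWork().then(items => Promise.map(items, ...)).then(...)
// After: the awaits read top to bottom, and per-item failures are isolated by
// try/catch so one bad pod does not reject the whole batch
async function makeRequests () {
  const goodPods: string[] = []
  const badPods: string[] = []

  const items = await fetchWork()

  await Bluebird.map(items, async item => {
    try {
      await sendToPod(item)
      goodPods.push(item)
    } catch (err) {
      badPods.push(item)
    }
  }, { concurrency: WORK_CONCURRENCY })

  return { goodPods, badPods }
}

Keeping Bluebird.map (imported as Bluebird rather than shadowing the global Promise) preserves the bounded concurrency the old Promise.map call provided, which a plain Promise.all would lose.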