import * as crypto from 'crypto'
-import * as Promise from 'bluebird'
import { join } from 'path'
import {
return isValid
}
-function sign (data: string|Object) {
+async function sign (data: string|Object) {
const sign = crypto.createSign(SIGNATURE_ALGORITHM)
let dataString: string
dataString = JSON.stringify(data)
} catch (err) {
logger.error('Cannot sign data.', err)
- return Promise.resolve('')
+ return ''
}
}
sign.update(dataString, 'utf8')
- return getMyPrivateCert().then(myKey => {
- return sign.sign(myKey, SIGNATURE_ENCODING)
- })
+ const myKey = await getMyPrivateCert()
+ return await sign.sign(myKey, SIGNATURE_ENCODING)
}
function comparePassword (plainPassword: string, hashPassword: string) {
return bcryptComparePromise(plainPassword, hashPassword)
}
-function createCertsIfNotExist () {
- return certsExist().then(exist => {
- if (exist === true) {
- return undefined
- }
+async function createCertsIfNotExist () {
+ const exist = await certsExist()
+ if (exist === true) {
+ return undefined
+ }
- return createCerts()
- })
+ return await createCerts()
}
-function cryptPassword (password: string) {
- return bcryptGenSaltPromise(BCRYPT_SALT_SIZE).then(salt => bcryptHashPromise(password, salt))
+async function cryptPassword (password: string) {
+ const salt = await bcryptGenSaltPromise(BCRYPT_SALT_SIZE)
+
+ return await bcryptHashPromise(password, salt)
}
function getMyPrivateCert () {
// ---------------------------------------------------------------------------
-function certsExist () {
+async function certsExist () {
const certPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
// If there is an error the certificates do not exist
- return accessPromise(certPath)
- .then(() => true)
- .catch(() => false)
+ try {
+ await accessPromise(certPath)
+
+ return true
+ } catch {
+ return false
+ }
}
-function createCerts () {
- return certsExist().then(exist => {
- if (exist === true) {
- const errorMessage = 'Certs already exist.'
- logger.warning(errorMessage)
- throw new Error(errorMessage)
- }
+async function createCerts () {
+ const exist = await certsExist()
+ if (exist === true) {
+ const errorMessage = 'Certs already exist.'
+ logger.warning(errorMessage)
+ throw new Error(errorMessage)
+ }
- logger.info('Generating a RSA key...')
+ logger.info('Generating a RSA key...')
- const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
- const genRsaOptions = {
- 'out': privateCertPath,
- '2048': false
- }
- return opensslExecPromise('genrsa', genRsaOptions)
- .then(() => {
- logger.info('RSA key generated.')
- logger.info('Managing public key...')
-
- const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub')
- const rsaOptions = {
- 'in': privateCertPath,
- 'pubout': true,
- 'out': publicCertPath
- }
- return opensslExecPromise('rsa', rsaOptions)
- .then(() => logger.info('Public key managed.'))
- .catch(err => {
- logger.error('Cannot create public key on this pod.')
- throw err
- })
- })
- .catch(err => {
- logger.error('Cannot create private key on this pod.')
- throw err
- })
- })
+ const privateCertPath = join(CONFIG.STORAGE.CERT_DIR, PRIVATE_CERT_NAME)
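+  // Options are forwarded to the openssl CLI; '2048' is presumably the requested key size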
+ const genRsaOptions = {
+ 'out': privateCertPath,
+ '2048': false
+ }
+
+ await opensslExecPromise('genrsa', genRsaOptions)
+ logger.info('RSA key generated.')
+ logger.info('Managing public key...')
+
+ const publicCertPath = join(CONFIG.STORAGE.CERT_DIR, 'peertube.pub')
+ const rsaOptions = {
+ 'in': privateCertPath,
+ 'pubout': true,
+ 'out': publicCertPath
+ }
+
+ await opensslExecPromise('rsa', rsaOptions)
}
signature
}
- // If there are data informations
+  // If there is data
if (params.data) {
requestParams.json.data = params.data
}
import { VideoResolution } from '../../shared/models/videos/video-resolution.enum'
function badRequest (req: express.Request, res: express.Response, next: express.NextFunction) {
- res.type('json').status(400).end()
+ return res.type('json').status(400).end()
}
-function generateRandomString (size: number) {
- return pseudoRandomBytesPromise(size).then(raw => raw.toString('hex'))
+async function generateRandomString (size: number) {
+ const raw = await pseudoRandomBytesPromise(size)
+
+ return raw.toString('hex')
}
interface FormattableToJSON {
return res
}
-function isSignupAllowed () {
+async function isSignupAllowed () {
if (CONFIG.SIGNUP.ENABLED === false) {
- return Promise.resolve(false)
+ return false
}
// No limit and signup is enabled
if (CONFIG.SIGNUP.LIMIT === -1) {
- return Promise.resolve(true)
+ return true
}
- return db.User.countTotal().then(totalUsers => {
- return totalUsers < CONFIG.SIGNUP.LIMIT
- })
+ const totalUsers = await db.User.countTotal()
+
+ return totalUsers < CONFIG.SIGNUP.LIMIT
}
function computeResolutionsToTranscode (videoFileHeight: number) {
// Check the available codecs
// We get CONFIG by param to not import it in this file (import orders)
-function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) {
+async function checkFFmpeg (CONFIG: { TRANSCODING: { ENABLED: boolean } }) {
const Ffmpeg = require('fluent-ffmpeg')
const getAvailableCodecsPromise = promisify0(Ffmpeg.getAvailableCodecs)
- getAvailableCodecsPromise()
- .then(codecs => {
- if (CONFIG.TRANSCODING.ENABLED === false) return undefined
-
- const canEncode = [ 'libx264' ]
- canEncode.forEach(codec => {
- if (codecs[codec] === undefined) {
- throw new Error('Unknown codec ' + codec + ' in FFmpeg.')
- }
-
- if (codecs[codec].canEncode !== true) {
- throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg')
- }
- })
- })
+ const codecs = await getAvailableCodecsPromise()
+ if (CONFIG.TRANSCODING.ENABLED === false) return undefined
+
+ const canEncode = [ 'libx264' ]
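+  // Every codec listed here must be known to FFmpeg and able to encode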
+ for (const codec of canEncode) {
+ if (codecs[codec] === undefined) {
+ throw new Error('Unknown codec ' + codec + ' in FFmpeg.')
+ }
+
+ if (codecs[codec].canEncode !== true) {
+ throw new Error('Unavailable encode codec ' + codec + ' in FFmpeg')
+ }
+ }
}
// We get db by param to not import it in this file (import orders)
-function clientsExist (OAuthClient: OAuthClientModel) {
- return OAuthClient.countTotal().then(totalClients => {
- return totalClients !== 0
- })
+async function clientsExist (OAuthClient: OAuthClientModel) {
+ const totalClients = await OAuthClient.countTotal()
+
+ return totalClients !== 0
}
// We get db by param to not import it in this file (import orders)
-function usersExist (User: UserModel) {
- return User.countTotal().then(totalUsers => {
- return totalUsers !== 0
- })
+async function usersExist (User: UserModel) {
+ const totalUsers = await User.countTotal()
+
+ return totalUsers !== 0
}
// ---------------------------------------------------------------------------
import { flattenDepth } from 'lodash'
require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string
import * as Sequelize from 'sequelize'
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
import { CONFIG } from './constants'
// Do not use barrel, we need to load database first
database.sequelize = sequelize
-database.init = (silent: boolean) => {
+database.init = async (silent: boolean) => {
const modelDirectory = join(__dirname, '..', 'models')
- return getModelFiles(modelDirectory).then(filePaths => {
- filePaths.forEach(filePath => {
- const model = sequelize.import(filePath)
+ const filePaths = await getModelFiles(modelDirectory)
- database[model['name']] = model
- })
+ for (const filePath of filePaths) {
+ const model = sequelize.import(filePath)
- Object.keys(database).forEach(modelName => {
- if ('associate' in database[modelName]) {
- database[modelName].associate(database)
- }
- })
+ database[model['name']] = model
+ }
- if (!silent) logger.info('Database %s is ready.', dbname)
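+  // Once every model is imported, set up the Sequelize associations between them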
+ for (const modelName of Object.keys(database)) {
+ if ('associate' in database[modelName]) {
+ database[modelName].associate(database)
+ }
+ }
- return undefined
- })
+ if (!silent) logger.info('Database %s is ready.', dbname)
+
+ return undefined
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
-function getModelFiles (modelDirectory: string) {
- return readdirPromise(modelDirectory)
- .then(files => {
- const directories: string[] = files.filter(directory => {
- // Find directories
- if (
- directory.endsWith('.js.map') ||
- directory === 'index.js' || directory === 'index.ts' ||
- directory === 'utils.js' || directory === 'utils.ts'
- ) return false
-
- return true
- })
+async function getModelFiles (modelDirectory: string) {
+ const files = await readdirPromise(modelDirectory)
+ const directories = files.filter(directory => {
+ // Find directories
+ if (
+ directory.endsWith('.js.map') ||
+ directory === 'index.js' || directory === 'index.ts' ||
+ directory === 'utils.js' || directory === 'utils.ts'
+ ) return false
+
+ return true
+ })
- return directories
- })
- .then(directories => {
- const tasks = []
+ const tasks: Bluebird<any>[] = []
- // For each directory we read it and append model in the modelFilePaths array
- directories.forEach(directory => {
- const modelDirectoryPath = join(modelDirectory, directory)
+  // For each directory, read it and collect the model file paths it contains
+ for (const directory of directories) {
+ const modelDirectoryPath = join(modelDirectory, directory)
- const promise = readdirPromise(modelDirectoryPath).then(files => {
- const filteredFiles = files.filter(file => {
+ const promise = readdirPromise(modelDirectoryPath)
+ .then(files => {
+ const filteredFiles = files
+ .filter(file => {
if (
file === 'index.js' || file === 'index.ts' ||
          file === 'utils.js' || file === 'utils.ts' ||
          file.endsWith('.js.map')
        ) return false
return true
- }).map(file => join(modelDirectoryPath, file))
-
- return filteredFiles
- })
+ })
+ .map(file => join(modelDirectoryPath, file))
- tasks.push(promise)
+ return filteredFiles
})
- return Promise.all(tasks)
- })
- .then((filteredFiles: string[][]) => {
- return flattenDepth<string>(filteredFiles, 1)
- })
+ tasks.push(promise)
+ }
+
+ const filteredFilesArray: string[][] = await Promise.all(tasks)
+ return flattenDepth<string>(filteredFilesArray, 1)
}
-// Constants first, databse in second!
+// Constants first, database in second!
export * from './constants'
export * from './database'
export * from './checker'
import * as passwordGenerator from 'password-generator'
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
import { database as db } from './database'
import { USER_ROLES, CONFIG, LAST_MIGRATION_VERSION, CACHE } from './constants'
import { logger, createCertsIfNotExist, mkdirpPromise, rimrafPromise } from '../helpers'
import { createUserAuthorAndChannel } from '../lib'
-function installApplication () {
- return db.sequelize.sync()
- .then(() => removeCacheDirectories())
- .then(() => createDirectoriesIfNotExist())
- .then(() => createCertsIfNotExist())
- .then(() => createOAuthClientIfNotExist())
- .then(() => createOAuthAdminIfNotExist())
+async function installApplication () {
+ await db.sequelize.sync()
+ await removeCacheDirectories()
+ await createDirectoriesIfNotExist()
+ await createCertsIfNotExist()
+ await createOAuthClientIfNotExist()
+ await createOAuthAdminIfNotExist()
}
// ---------------------------------------------------------------------------
function removeCacheDirectories () {
const cacheDirectories = CACHE.DIRECTORIES
- const tasks = []
+ const tasks: Bluebird<any>[] = []
// Cache directories
- Object.keys(cacheDirectories).forEach(key => {
+ for (const key of Object.keys(cacheDirectories)) {
const dir = cacheDirectories[key]
tasks.push(rimrafPromise(dir))
- })
+ }
return Promise.all(tasks)
}
const cacheDirectories = CACHE.DIRECTORIES
const tasks = []
- Object.keys(storage).forEach(key => {
+ for (const key of Object.keys(storage)) {
const dir = storage[key]
tasks.push(mkdirpPromise(dir))
- })
+ }
// Cache directories
- Object.keys(cacheDirectories).forEach(key => {
+ for (const key of Object.keys(cacheDirectories)) {
const dir = cacheDirectories[key]
tasks.push(mkdirpPromise(dir))
- })
+ }
return Promise.all(tasks)
}
-function createOAuthClientIfNotExist () {
- return clientsExist(db.OAuthClient).then(exist => {
- // Nothing to do, clients already exist
- if (exist === true) return undefined
-
- logger.info('Creating a default OAuth Client.')
-
- const id = passwordGenerator(32, false, /[a-z0-9]/)
- const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/)
- const client = db.OAuthClient.build({
- clientId: id,
- clientSecret: secret,
- grants: [ 'password', 'refresh_token' ],
- redirectUris: null
- })
+async function createOAuthClientIfNotExist () {
+ const exist = await clientsExist(db.OAuthClient)
+ // Nothing to do, clients already exist
+ if (exist === true) return undefined
- return client.save().then(createdClient => {
- logger.info('Client id: ' + createdClient.clientId)
- logger.info('Client secret: ' + createdClient.clientSecret)
+ logger.info('Creating a default OAuth Client.')
- return undefined
- })
+ const id = passwordGenerator(32, false, /[a-z0-9]/)
+ const secret = passwordGenerator(32, false, /[a-zA-Z0-9]/)
+ const client = db.OAuthClient.build({
+ clientId: id,
+ clientSecret: secret,
+ grants: [ 'password', 'refresh_token' ],
+ redirectUris: null
})
-}
-function createOAuthAdminIfNotExist () {
- return usersExist(db.User).then(exist => {
- // Nothing to do, users already exist
- if (exist === true) return undefined
+ const createdClient = await client.save()
+ logger.info('Client id: ' + createdClient.clientId)
+ logger.info('Client secret: ' + createdClient.clientSecret)
- logger.info('Creating the administrator.')
+ return undefined
+}
- const username = 'root'
- const role = USER_ROLES.ADMIN
- const email = CONFIG.ADMIN.EMAIL
- let validatePassword = true
- let password = ''
+async function createOAuthAdminIfNotExist () {
+ const exist = await usersExist(db.User)
+ // Nothing to do, users already exist
+ if (exist === true) return undefined
- // Do not generate a random password for tests
- if (process.env.NODE_ENV === 'test') {
- password = 'test'
+ logger.info('Creating the administrator.')
- if (process.env.NODE_APP_INSTANCE) {
- password += process.env.NODE_APP_INSTANCE
- }
+ const username = 'root'
+ const role = USER_ROLES.ADMIN
+ const email = CONFIG.ADMIN.EMAIL
+ let validatePassword = true
+ let password = ''
- // Our password is weak so do not validate it
- validatePassword = false
- } else {
- password = passwordGenerator(8, true)
- }
+ // Do not generate a random password for tests
+ if (process.env.NODE_ENV === 'test') {
+ password = 'test'
- const userData = {
- username,
- email,
- password,
- role,
- videoQuota: -1
+ if (process.env.NODE_APP_INSTANCE) {
+ password += process.env.NODE_APP_INSTANCE
}
- const user = db.User.build(userData)
- return createUserAuthorAndChannel(user, validatePassword)
- .then(({ user }) => {
- logger.info('Username: ' + username)
- logger.info('User password: ' + password)
-
- logger.info('Creating Application table.')
- return db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION })
- })
- })
+ // Our password is weak so do not validate it
+ validatePassword = false
+ } else {
+ password = passwordGenerator(8, true)
+ }
+
+ const userData = {
+ username,
+ email,
+ password,
+ role,
+ videoQuota: -1
+ }
+ const user = db.User.build(userData)
+
+ await createUserAuthorAndChannel(user, validatePassword)
+ logger.info('Username: ' + username)
+ logger.info('User password: ' + password)
+
+ logger.info('Creating Application table.')
+ await db.Application.create({ migrationVersion: LAST_MIGRATION_VERSION })
}
import * as path from 'path'
-import * as Promise from 'bluebird'
import { database as db } from './database'
import { LAST_MIGRATION_VERSION } from './constants'
import { logger, readdirPromise } from '../helpers'
-function migrate () {
- const p = db.sequelize.getQueryInterface().showAllTables()
- .then(tables => {
- // No tables, we don't need to migrate anything
- // The installer will do that
- if (tables.length === 0) throw null
- })
- .then(() => {
- return db.Application.loadMigrationVersion()
- })
- .then(actualVersion => {
- if (actualVersion === null) {
- return db.Application.create({ migrationVersion: 0 }).then(() => 0)
- }
+async function migrate () {
+ const tables = await db.sequelize.getQueryInterface().showAllTables()
- return actualVersion
- })
- .then(actualVersion => {
- // No need migrations, abort
- if (actualVersion >= LAST_MIGRATION_VERSION) throw null
+ // No tables, we don't need to migrate anything
+ // The installer will do that
+ if (tables.length === 0) return
- return actualVersion
- })
- .then(actualVersion => {
- // If there are a new migration scripts
- logger.info('Begin migrations.')
+ let actualVersion = await db.Application.loadMigrationVersion()
+ if (actualVersion === null) {
+ await db.Application.create({ migrationVersion: 0 })
+ actualVersion = 0
+ }
- return getMigrationScripts().then(migrationScripts => ({ actualVersion, migrationScripts }))
- })
- .then(({ actualVersion, migrationScripts }) => {
- return Promise.each(migrationScripts, entity => executeMigration(actualVersion, entity))
- })
- .then(() => {
- logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION)
- })
- .catch(err => {
- if (err === null) return undefined
+  // No migrations needed, abort
+ if (actualVersion >= LAST_MIGRATION_VERSION) return
- throw err
- })
+  // If there are new migration scripts
+ logger.info('Begin migrations.')
+
+ const migrationScripts = await getMigrationScripts()
- return p
+ for (const migrationScript of migrationScripts) {
+ await executeMigration(actualVersion, migrationScript)
+ }
+
+ logger.info('Migrations finished. New migration version schema: %s', LAST_MIGRATION_VERSION)
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
-function getMigrationScripts () {
- return readdirPromise(path.join(__dirname, 'migrations')).then(files => {
- const filesToMigrate: {
- version: string,
- script: string
- }[] = []
-
- files
- .filter(file => file.endsWith('.js.map') === false)
- .forEach(file => {
- // Filename is something like 'version-blabla.js'
- const version = file.split('-')[0]
- filesToMigrate.push({
- version,
- script: file
- })
+async function getMigrationScripts () {
+ const files = await readdirPromise(path.join(__dirname, 'migrations'))
+ const filesToMigrate: {
+ version: string,
+ script: string
+ }[] = []
+
+ files
+ .filter(file => file.endsWith('.js.map') === false)
+ .forEach(file => {
+ // Filename is something like 'version-blabla.js'
+ const version = file.split('-')[0]
+ filesToMigrate.push({
+ version,
+ script: file
})
+ })
- return filesToMigrate
- })
+ return filesToMigrate
}
-function executeMigration (actualVersion: number, entity: { version: string, script: string }) {
+async function executeMigration (actualVersion: number, entity: { version: string, script: string }) {
const versionScript = parseInt(entity.version, 10)
// Do not execute old migration scripts
const migrationScript = require(path.join(__dirname, 'migrations', migrationScriptName))
- return db.sequelize.transaction(t => {
+ await db.sequelize.transaction(async t => {
const options = {
transaction: t,
queryInterface: db.sequelize.getQueryInterface(),
db
}
- return migrationScript.up(options)
- .then(() => {
- // Update the new migration version
- return db.Application.updateMigrationVersion(versionScript, t)
- })
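+    // Both the migration and the version bump run inside the transaction opened above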
+ await migrationScript.up(options)
+
+ // Update the new migration version
+ await db.Application.updateMigrationVersion(versionScript, t)
})
}
import * as asyncLRU from 'async-lru'
import { join } from 'path'
import { createWriteStream } from 'fs'
-import * as Promise from 'bluebird'
import { database as db, CONFIG, CACHE } from '../../initializers'
import { logger, unlinkPromise } from '../../helpers'
})
}
- private loadPreviews (key: string) {
- return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key)
- .then(video => {
- if (!video) return undefined
+ private async loadPreviews (key: string) {
+ const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(key)
+ if (!video) return undefined
- if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName())
+ if (video.isOwned()) return join(CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName())
- return this.saveRemotePreviewAndReturnPath(video)
- })
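+    // The video is remote: fetch its preview from the origin pod, save it and return the local path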
+ const res = await this.saveRemotePreviewAndReturnPath(video)
+
+ return res
}
private saveRemotePreviewAndReturnPath (video: VideoInstance) {
import * as request from 'request'
import * as Sequelize from 'sequelize'
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
import { join } from 'path'
import { database as db } from '../initializers/database'
function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
const tasks = []
- eventsParams.forEach(eventParams => {
+ for (const eventParams of eventsParams) {
tasks.push(addEventToRemoteVideo(eventParams, transaction))
- })
+ }
return Promise.all(tasks)
}
-function hasFriends () {
- return db.Pod.countAll().then(count => count !== 0)
+async function hasFriends () {
+ const count = await db.Pod.countAll()
+
+ return count !== 0
}
-function makeFriends (hosts: string[]) {
+async function makeFriends (hosts: string[]) {
const podsScore = {}
logger.info('Make friends!')
- return getMyPublicCert()
- .then(cert => {
- return Promise.each(hosts, host => computeForeignPodsList(host, podsScore)).then(() => cert)
- })
- .then(cert => {
- logger.debug('Pods scores computed.', { podsScore: podsScore })
- const podsList = computeWinningPods(hosts, podsScore)
- logger.debug('Pods that we keep.', { podsToKeep: podsList })
+ const cert = await getMyPublicCert()
- return makeRequestsToWinningPods(cert, podsList)
- })
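+  // Ask each host for its pod list and update the scores, one host at a time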
+ for (const host of hosts) {
+ await computeForeignPodsList(host, podsScore)
+ }
+
+ logger.debug('Pods scores computed.', { podsScore: podsScore })
+
+ const podsList = computeWinningPods(hosts, podsScore)
+ logger.debug('Pods that we keep.', { podsToKeep: podsList })
+
+ return makeRequestsToWinningPods(cert, podsList)
}
-function quitFriends () {
+async function quitFriends () {
// Stop pool requests
requestScheduler.deactivate()
- return requestScheduler.flush()
- .then(() => {
- return requestVideoQaduScheduler.flush()
- })
- .then(() => {
- return db.Pod.list()
- })
- .then(pods => {
- const requestParams = {
- method: 'POST' as 'POST',
- path: '/api/' + API_VERSION + '/remote/pods/remove',
- toPod: null
- }
+ try {
+ await requestScheduler.flush()
+
+ await requestVideoQaduScheduler.flush()
+
+ const pods = await db.Pod.list()
+ const requestParams = {
+ method: 'POST' as 'POST',
+ path: '/api/' + API_VERSION + '/remote/pods/remove',
+ toPod: null
+ }
- // Announce we quit them
- // We don't care if the request fails
- // The other pod will exclude us automatically after a while
- return Promise.map(pods, pod => {
+ // Announce we quit them
+ // We don't care if the request fails
+ // The other pod will exclude us automatically after a while
+ try {
+ await Bluebird.map(pods, pod => {
requestParams.toPod = pod
return makeSecureRequest(requestParams)
}, { concurrency: REQUESTS_IN_PARALLEL })
- .then(() => pods)
- .catch(err => {
- logger.error('Some errors while quitting friends.', err)
- // Don't stop the process
- return pods
- })
- })
- .then(pods => {
- const tasks = []
- pods.forEach(pod => tasks.push(pod.destroy()))
+ } catch (err) { // Don't stop the process
+ logger.error('Some errors while quitting friends.', err)
+ }
- return Promise.all(pods)
- })
- .then(() => {
- logger.info('Removed all remote videos.')
- // Don't forget to re activate the scheduler, even if there was an error
- return requestScheduler.activate()
- })
- .finally(() => requestScheduler.activate())
+ const tasks = []
+ for (const pod of pods) {
+ tasks.push(pod.destroy())
+ }
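+  // Wait until every pod row has actually been destroyed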
+  await Promise.all(tasks)
+
+ logger.info('Removed all remote videos.')
+
+ requestScheduler.activate()
+ } catch (err) {
+    // Don't forget to re-activate the scheduler, even if there was an error
+ requestScheduler.activate()
+
+ throw err
+ }
}
-function sendOwnedDataToPod (podId: number) {
+async function sendOwnedDataToPod (podId: number) {
// First send authors
- return sendOwnedAuthorsToPod(podId)
- .then(() => sendOwnedChannelsToPod(podId))
- .then(() => sendOwnedVideosToPod(podId))
+ await sendOwnedAuthorsToPod(podId)
+ await sendOwnedChannelsToPod(podId)
+ await sendOwnedVideosToPod(podId)
+}
+
+async function sendOwnedChannelsToPod (podId: number) {
+ const videoChannels = await db.VideoChannel.listOwned()
+
+ const tasks: Promise<any>[] = []
+ for (const videoChannel of videoChannels) {
+ const remoteVideoChannel = videoChannel.toAddRemoteJSON()
+ const options = {
+ type: 'add-channel' as 'add-channel',
+ endpoint: REQUEST_ENDPOINTS.VIDEOS,
+ data: remoteVideoChannel,
+ toIds: [ podId ],
+ transaction: null
+ }
+
+ const p = createRequest(options)
+ tasks.push(p)
+ }
+
+ await Promise.all(tasks)
}
-function sendOwnedChannelsToPod (podId: number) {
- return db.VideoChannel.listOwned()
- .then(videoChannels => {
- const tasks = []
- videoChannels.forEach(videoChannel => {
- const remoteVideoChannel = videoChannel.toAddRemoteJSON()
- const options = {
- type: 'add-channel' as 'add-channel',
- endpoint: REQUEST_ENDPOINTS.VIDEOS,
- data: remoteVideoChannel,
- toIds: [ podId ],
- transaction: null
- }
+async function sendOwnedAuthorsToPod (podId: number) {
+ const authors = await db.Author.listOwned()
+ const tasks: Promise<any>[] = []
- const p = createRequest(options)
- tasks.push(p)
- })
+ for (const author of authors) {
+ const remoteAuthor = author.toAddRemoteJSON()
+ const options = {
+ type: 'add-author' as 'add-author',
+ endpoint: REQUEST_ENDPOINTS.VIDEOS,
+ data: remoteAuthor,
+ toIds: [ podId ],
+ transaction: null
+ }
- return Promise.all(tasks)
- })
+ const p = createRequest(options)
+ tasks.push(p)
+ }
+
+ await Promise.all(tasks)
}
-function sendOwnedAuthorsToPod (podId: number) {
- return db.Author.listOwned()
- .then(authors => {
- const tasks = []
- authors.forEach(author => {
- const remoteAuthor = author.toAddRemoteJSON()
+async function sendOwnedVideosToPod (podId: number) {
+ const videosList = await db.Video.listOwnedAndPopulateAuthorAndTags()
+ const tasks: Bluebird<any>[] = []
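+  // Each task builds the remote representation of a video and creates the corresponding request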
+
+ for (const video of videosList) {
+ const promise = video.toAddRemoteJSON()
+ .then(remoteVideo => {
const options = {
- type: 'add-author' as 'add-author',
+ type: 'add-video' as 'add-video',
endpoint: REQUEST_ENDPOINTS.VIDEOS,
- data: remoteAuthor,
+ data: remoteVideo,
toIds: [ podId ],
transaction: null
}
-
- const p = createRequest(options)
- tasks.push(p)
+ return createRequest(options)
})
-
- return Promise.all(tasks)
- })
-}
-
-function sendOwnedVideosToPod (podId: number) {
- return db.Video.listOwnedAndPopulateAuthorAndTags()
- .then(videosList => {
- const tasks = []
- videosList.forEach(video => {
- const promise = video.toAddRemoteJSON()
- .then(remoteVideo => {
- const options = {
- type: 'add-video' as 'add-video',
- endpoint: REQUEST_ENDPOINTS.VIDEOS,
- data: remoteVideo,
- toIds: [ podId ],
- transaction: null
- }
- return createRequest(options)
- })
- .catch(err => {
- logger.error('Cannot convert video to remote.', err)
- // Don't break the process
- return undefined
- })
-
- tasks.push(promise)
+ .catch(err => {
+ logger.error('Cannot convert video to remote.', err)
+ // Don't break the process
+ return undefined
})
- return Promise.all(tasks)
- })
+ tasks.push(promise)
+ }
+
+ await Promise.all(tasks)
}
function fetchRemotePreview (video: VideoInstance) {
return request.get(REMOTE_SCHEME.HTTP + '://' + host + path)
}
-function removeFriend (pod: PodInstance) {
+async function removeFriend (pod: PodInstance) {
const requestParams = {
method: 'POST' as 'POST',
path: '/api/' + API_VERSION + '/remote/pods/remove',
toPod: pod
}
- return makeSecureRequest(requestParams)
- .catch(err => logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err))
- .then(() => pod.destroy())
- .then(() => logger.info('Removed friend %s.', pod.host))
- .catch(err => logger.error('Cannot destroy friend %s.', pod.host, err))
+ try {
+ await makeSecureRequest(requestParams)
+ } catch (err) {
+ logger.warn('Cannot notify friends %s we are quitting him.', pod.host, err)
+ }
+
+ try {
+ await pod.destroy()
+
+ logger.info('Removed friend %s.', pod.host)
+ } catch (err) {
+ logger.error('Cannot destroy friend %s.', pod.host, err)
+ }
}
function getRequestScheduler () {
// ---------------------------------------------------------------------------
-function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
- // TODO: type res
- return getForeignPodsList(host).then(res => {
- const foreignPodsList: { host: string }[] = res.data
+async function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
+ const result = await getForeignPodsList(host)
+ const foreignPodsList: { host: string }[] = result.data
- // Let's give 1 point to the pod we ask the friends list
- foreignPodsList.push({ host })
+  // Let's give 1 point to the pod we asked for the friends list
+ foreignPodsList.push({ host })
- foreignPodsList.forEach(foreignPod => {
- const foreignPodHost = foreignPod.host
+ for (const foreignPod of foreignPodsList) {
+ const foreignPodHost = foreignPod.host
- if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
- else podsScore[foreignPodHost] = 1
- })
+ if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
+ else podsScore[foreignPodHost] = 1
+ }
- return undefined
- })
+ return undefined
}
function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
const podsList = []
const baseScore = hosts.length / 2
- Object.keys(podsScore).forEach(podHost => {
+ for (const podHost of Object.keys(podsScore)) {
// If the pod is not me and with a good score we add it
if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
podsList.push({ host: podHost })
}
- })
+ }
return podsList
}
if (err) return rej(err)
try {
- const json = JSON.parse(body)
+ const json: ResultList<FormattedPod> = JSON.parse(body)
return res(json)
} catch (err) {
return rej(err)
      }
    })
}
-function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
+async function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
// Stop pool requests
requestScheduler.deactivate()
// Flush pool requests
requestScheduler.forceSend()
- return Promise.map(podsList, pod => {
- const params = {
- url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
- method: 'POST' as 'POST',
- json: {
- host: CONFIG.WEBSERVER.HOST,
- email: CONFIG.ADMIN.EMAIL,
- publicKey: cert
+ try {
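+    // Contact the winning pods in parallel, capped at REQUESTS_IN_PARALLEL concurrent requests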
+ await Bluebird.map(podsList, async pod => {
+ const params = {
+ url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/remote/pods/add',
+ method: 'POST' as 'POST',
+ json: {
+ host: CONFIG.WEBSERVER.HOST,
+ email: CONFIG.ADMIN.EMAIL,
+ publicKey: cert
+ }
}
- }
- return makeRetryRequest(params)
- .then(({ response, body }) => {
- body = body as { cert: string, email: string }
-
- if (response.statusCode === 200) {
- const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
- return podObj.save()
- .then(podCreated => {
-
- // Add our videos to the request scheduler
- sendOwnedDataToPod(podCreated.id)
- })
- .catch(err => {
- logger.error('Cannot add friend %s pod.', pod.host, err)
- })
- } else {
- logger.error('Status not 200 for %s pod.', pod.host)
+ const { response, body } = await makeRetryRequest(params)
+ const typedBody = body as { cert: string, email: string }
+
+ if (response.statusCode === 200) {
+ const podObj = db.Pod.build({ host: pod.host, publicKey: typedBody.cert, email: typedBody.email })
+
+ let podCreated: PodInstance
+ try {
+ podCreated = await podObj.save()
+ } catch (err) {
+          logger.error('Cannot add friend %s pod.', pod.host, err)
+          return
}
- })
- .catch(err => {
- logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
- // Don't break the process
- })
- }, { concurrency: REQUESTS_IN_PARALLEL })
- .then(() => logger.debug('makeRequestsToWinningPods finished.'))
- .finally(() => {
+
+ // Add our videos to the request scheduler
+ sendOwnedDataToPod(podCreated.id)
+ .catch(err => logger.warn('Cannot send owned data to pod %d.', podCreated.id, err))
+ } else {
+ logger.error('Status not 200 for %s pod.', pod.host)
+ }
+ }, { concurrency: REQUESTS_IN_PARALLEL })
+
+ logger.debug('makeRequestsToWinningPods finished.')
+
+ requestScheduler.activate()
+ } catch (err) {
// Final callback, we've ended all the requests
// Now we made new friends, we can re activate the pool of requests
requestScheduler.activate()
- })
+ }
}
// Wrapper that populate "toIds" argument with all our friends if it is not specified
toIds?: number[]
transaction: Sequelize.Transaction
}
-function createRequest (options: CreateRequestOptions) {
- if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
+async function createRequest (options: CreateRequestOptions) {
+ if (options.toIds !== undefined) {
+ await requestScheduler.createRequest(options as RequestSchedulerOptions)
+ return undefined
+ }
// If the "toIds" pods is not specified, we send the request to all our friends
- return db.Pod.listAllIds(options.transaction).then(podIds => {
- const newOptions = Object.assign(options, { toIds: podIds })
- return requestScheduler.createRequest(newOptions)
- })
+ const podIds = await db.Pod.listAllIds(options.transaction)
+
+ const newOptions = Object.assign(options, { toIds: podIds })
+ await requestScheduler.createRequest(newOptions)
+
+ return undefined
}
function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
import { database as db } from '../../../initializers/database'
import { logger, computeResolutionsToTranscode } from '../../../helpers'
import { addVideoToFriends } from '../../friends'
import { JobScheduler } from '../job-scheduler'
-function process (data: { videoUUID: string }, jobId: number) {
- return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => {
- // No video, maybe deleted?
- if (!video) {
- logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
- return undefined
- }
+async function process (data: { videoUUID: string }, jobId: number) {
+ const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
+ // No video, maybe deleted?
+ if (!video) {
+ logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
+ return undefined
+ }
+
+ await video.optimizeOriginalVideofile()
- return video.optimizeOriginalVideofile().then(() => video)
- })
+ return video
}
function onError (err: Error, jobId: number) {
return Promise.resolve()
}
-function onSuccess (jobId: number, video: VideoInstance) {
+async function onSuccess (jobId: number, video: VideoInstance) {
if (video === undefined) return undefined
logger.info('Job %d is a success.', jobId)
- video.toAddRemoteJSON()
- .then(remoteVideo => {
- // Now we'll add the video's meta data to our friends
- return addVideoToFriends(remoteVideo, null)
- })
- .then(() => {
- return video.getOriginalFileHeight()
- })
- .then(originalFileHeight => {
- // Create transcoding jobs if there are enabled resolutions
- const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
- logger.info(
- 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight,
- { resolutions: resolutionsEnabled }
- )
-
- if (resolutionsEnabled.length === 0) return undefined
-
- return db.sequelize.transaction(t => {
- const tasks: Promise<any>[] = []
-
- resolutionsEnabled.forEach(resolution => {
+ const remoteVideo = await video.toAddRemoteJSON()
+
+ // Now we'll add the video's meta data to our friends
+ await addVideoToFriends(remoteVideo, null)
+
+ const originalFileHeight = await video.getOriginalFileHeight()
+
+  // Create transcoding jobs if there are enabled resolutions
+  const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
+ logger.info(
+ 'Resolutions computed for video %s and origin file height of %d.', video.uuid, originalFileHeight,
+ { resolutions: resolutionsEnabled }
+ )
+
+ if (resolutionsEnabled.length !== 0) {
+ try {
+ await db.sequelize.transaction(async t => {
+ const tasks: Bluebird<any>[] = []
+
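+        // Create one transcoding job per enabled resolution inside this transaction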
+ for (const resolution of resolutionsEnabled) {
const dataInput = {
videoUUID: video.uuid,
resolution
      }
      const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput)
tasks.push(p)
- })
+ }
- return Promise.all(tasks).then(() => resolutionsEnabled)
+ await Promise.all(tasks)
})
- })
- .then(resolutionsEnabled => {
- if (resolutionsEnabled === undefined) {
- logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
- return undefined
- }
logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
- })
- .catch((err: Error) => {
- logger.debug('Cannot transcode the video.', err)
- throw err
- })
-
+ } catch (err) {
+ logger.warn('Cannot transcode the video.', err)
+ }
+ } else {
+    logger.info('No transcoding jobs created for video %s (no resolutions enabled).', video.uuid)
+ return undefined
+ }
}
// ---------------------------------------------------------------------------
import { VideoInstance } from '../../../models'
import { VideoResolution } from '../../../../shared'
-function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
- return db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID).then(video => {
- // No video, maybe deleted?
- if (!video) {
- logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
- return undefined
- }
-
- return video.transcodeOriginalVideofile(data.resolution).then(() => video)
- })
+async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
+ const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID)
+ // No video, maybe deleted?
+ if (!video) {
+ logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
+ return undefined
+ }
+
+ await video.transcodeOriginalVideofile(data.resolution)
+
+ return video
}
function onError (err: Error, jobId: number) {
return this.instance || (this.instance = new this())
}
- activate () {
+ async activate () {
const limit = JOBS_FETCH_LIMIT_PER_CYCLE
logger.info('Jobs scheduler activated.')
// Finish processing jobs from a previous start
const state = JOB_STATES.PROCESSING
- db.Job.listWithLimit(limit, state)
- .then(jobs => {
- this.enqueueJobs(jobsQueue, jobs)
-
- forever(
- next => {
- if (jobsQueue.length() !== 0) {
- // Finish processing the queue first
- return setTimeout(next, JOBS_FETCHING_INTERVAL)
- }
-
- const state = JOB_STATES.PENDING
- db.Job.listWithLimit(limit, state)
- .then(jobs => {
- this.enqueueJobs(jobsQueue, jobs)
-
- // Optimization: we could use "drain" from queue object
- return setTimeout(next, JOBS_FETCHING_INTERVAL)
- })
- .catch(err => logger.error('Cannot list pending jobs.', err))
- },
-
- err => logger.error('Error in job scheduler queue.', err)
- )
- })
- .catch(err => logger.error('Cannot list pending jobs.', err))
+ try {
+ const jobs = await db.Job.listWithLimit(limit, state)
+
+ this.enqueueJobs(jobsQueue, jobs)
+ } catch (err) {
+ logger.error('Cannot list pending jobs.', err)
+ }
+
+ forever(
+ async next => {
+ if (jobsQueue.length() !== 0) {
+ // Finish processing the queue first
+ return setTimeout(next, JOBS_FETCHING_INTERVAL)
+ }
+
+ const state = JOB_STATES.PENDING
+ try {
+ const jobs = await db.Job.listWithLimit(limit, state)
+
+ this.enqueueJobs(jobsQueue, jobs)
+ } catch (err) {
+ logger.error('Cannot list pending jobs.', err)
+ }
+
+ // Optimization: we could use "drain" from queue object
+ return setTimeout(next, JOBS_FETCHING_INTERVAL)
+ },
+
+ err => logger.error('Error in job scheduler queue.', err)
+ )
}
createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
jobs.forEach(job => jobsQueue.push(job))
}
- private processJob (job: JobInstance, callback: (err: Error) => void) {
+ private async processJob (job: JobInstance, callback: (err: Error) => void) {
const jobHandler = jobHandlers[job.handlerName]
if (jobHandler === undefined) {
logger.error('Unknown job handler for job %s.', job.handlerName)
logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
job.state = JOB_STATES.PROCESSING
- return job.save()
- .then(() => {
- return jobHandler.process(job.handlerInputData, job.id)
- })
- .then(
- result => {
- return this.onJobSuccess(jobHandler, job, result)
- },
-
- err => {
- logger.error('Error in job handler %s.', job.handlerName, err)
- return this.onJobError(jobHandler, job, err)
- }
- )
- .then(() => callback(null))
- .catch(err => {
- this.cannotSaveJobError(err)
- return callback(err)
- })
+ await job.save()
+
+ try {
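+      // Let the handler process the job; failures are routed through its onError hook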
+ const result = await jobHandler.process(job.handlerInputData, job.id)
+ await this.onJobSuccess(jobHandler, job, result)
+ } catch (err) {
+ logger.error('Error in job handler %s.', job.handlerName, err)
+
+ try {
+ await this.onJobError(jobHandler, job, err)
+ } catch (innerErr) {
+ this.cannotSaveJobError(innerErr)
+ return callback(innerErr)
+ }
+ }
+
+ callback(null)
}
- private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
+ private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
job.state = JOB_STATES.ERROR
- return job.save()
- .then(() => jobHandler.onError(err, job.id))
- .catch(err => this.cannotSaveJobError(err))
+ try {
+ await job.save()
+ await jobHandler.onError(err, job.id)
+ } catch (err) {
+ this.cannotSaveJobError(err)
+ }
}
- private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
+ private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
job.state = JOB_STATES.SUCCESS
- return job.save()
- .then(() => jobHandler.onSuccess(job.id, jobResult))
- .catch(err => this.cannotSaveJobError(err))
+ try {
+ await job.save()
+      await jobHandler.onSuccess(job.id, jobResult)
+ } catch (err) {
+ this.cannotSaveJobError(err)
+ }
}
private cannotSaveJobError (err: Error) {
return db.OAuthToken.getByRefreshTokenAndPopulateClient(refreshToken)
}
-function getUser (username: string, password: string) {
+async function getUser (username: string, password: string) {
logger.debug('Getting User (username: ' + username + ', password: ******).')
- return db.User.getByUsername(username).then(user => {
- if (!user) return null
+ const user = await db.User.getByUsername(username)
+ if (!user) return null
- return user.isPasswordMatch(password).then(passwordMatch => {
- if (passwordMatch === false) return null
+ const passwordMatch = await user.isPasswordMatch(password)
+ if (passwordMatch === false) return null
- return user
- })
- })
+ return user
}
-function revokeToken (token: TokenInfo) {
- return db.OAuthToken.getByRefreshTokenAndPopulateUser(token.refreshToken).then(tokenDB => {
- if (tokenDB) tokenDB.destroy()
-
- /*
- * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js
- * "As per the discussion we need set older date
- * revokeToken will expected return a boolean in future version
- * https://github.com/oauthjs/node-oauth2-server/pull/274
- * https://github.com/oauthjs/node-oauth2-server/issues/290"
- */
- const expiredToken = tokenDB
- expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z')
-
- return expiredToken
- })
+async function revokeToken (tokenInfo: TokenInfo) {
+ const token = await db.OAuthToken.getByRefreshTokenAndPopulateUser(tokenInfo.refreshToken)
+ if (token) token.destroy()
+
+ /*
+ * Thanks to https://github.com/manjeshpv/node-oauth2-server-implementation/blob/master/components/oauth/mongo-models.js
+ * "As per the discussion we need set older date
+ * revokeToken will expected return a boolean in future version
+ * https://github.com/oauthjs/node-oauth2-server/pull/274
+ * https://github.com/oauthjs/node-oauth2-server/issues/290"
+ */
+ const expiredToken = token
+ expiredToken.refreshTokenExpiresAt = new Date('2015-05-28T06:59:53.000Z')
+
+ return expiredToken
}
-function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) {
+async function saveToken (token: TokenInfo, client: OAuthClientInstance, user: UserInstance) {
logger.debug('Saving token ' + token.accessToken + ' for client ' + client.id + ' and user ' + user.id + '.')
const tokenToCreate = {
userId: user.id
}
- return db.OAuthToken.create(tokenToCreate).then(tokenCreated => {
- const tokenToReturn = Object.assign(tokenCreated, { client, user })
+ const tokenCreated = await db.OAuthToken.create(tokenToCreate)
+ const tokenToReturn = Object.assign(tokenCreated, { client, user })
- return tokenToReturn
- })
+ return tokenToReturn
}
// ---------------------------------------------------------------------------
import { isEmpty } from 'lodash'
-import * as Promise from 'bluebird'
+import * as Bluebird from 'bluebird'
import { database as db } from '../../initializers/database'
import { logger, makeSecureRequest } from '../../helpers'
// ---------------------------------------------------------------------------
// Make a requests to friends of a certain type
- protected makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
+ protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) {
const params = {
toPod: toPod,
method: 'POST' as 'POST',
// Make multiple retry requests to all of pods
// The function fire some useful callbacks
- return makeSecureRequest(params)
- .then(({ response, body }) => {
- if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
- throw new Error('Status code not 20x : ' + response.statusCode)
- }
- })
- .catch(err => {
- logger.error('Error sending secure request to %s pod.', toPod.host, err)
-
- throw err
- })
+ try {
+ const { response } = await makeSecureRequest(params)
+ if (response.statusCode !== 200 && response.statusCode !== 201 && response.statusCode !== 204) {
+ throw new Error('Status code not 20x : ' + response.statusCode)
+ }
+ } catch (err) {
+ logger.error('Error sending secure request to %s pod.', toPod.host, err)
+
+ throw err
+ }
}
// Make all the requests of the scheduler
- protected makeRequests () {
- return this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
- .then((requestsGrouped: T) => {
- // We want to group requests by destinations pod and endpoint
- const requestsToMake = this.buildRequestsObjects(requestsGrouped)
-
- // If there are no requests, abort
- if (isEmpty(requestsToMake) === true) {
- logger.info('No "%s" to make.', this.description)
- return { goodPods: [], badPods: [] }
- }
-
- logger.info('Making "%s" to friends.', this.description)
-
- const goodPods: number[] = []
- const badPods: number[] = []
-
- return Promise.map(Object.keys(requestsToMake), hashKey => {
- const requestToMake = requestsToMake[hashKey]
- const toPod: PodInstance = requestToMake.toPod
-
- return this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
- .then(() => {
- logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
- goodPods.push(requestToMake.toPod.id)
-
- this.afterRequestHook()
-
- // Remove the pod id of these request ids
- return this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
- })
- .catch(err => {
- badPods.push(requestToMake.toPod.id)
- logger.info('Cannot make request to %s.', toPod.host, err)
- })
- }, { concurrency: REQUESTS_IN_PARALLEL }).then(() => ({ goodPods, badPods }))
- })
- .then(({ goodPods, badPods }) => {
- this.afterRequestsHook()
-
- // All the requests were made, we update the pods score
- return db.Pod.updatePodsScore(goodPods, badPods)
- })
- .catch(err => logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }))
+ protected async makeRequests () {
+ let requestsGrouped: T
+
+ try {
+ requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod)
+ } catch (err) {
+ logger.error('Cannot get the list of "%s".', this.description, { error: err.stack })
+ throw err
+ }
+
+ // We want to group requests by destinations pod and endpoint
+ const requestsToMake = this.buildRequestsObjects(requestsGrouped)
+
+ // If there are no requests, abort
+ if (isEmpty(requestsToMake) === true) {
+ logger.info('No "%s" to make.', this.description)
+ return { goodPods: [], badPods: [] }
+ }
+
+ logger.info('Making "%s" to friends.', this.description)
+
+ const goodPods: number[] = []
+ const badPods: number[] = []
+
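+    // Fire the grouped requests pod by pod, at most REQUESTS_IN_PARALLEL at a time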
+ await Bluebird.map(Object.keys(requestsToMake), async hashKey => {
+ const requestToMake = requestsToMake[hashKey]
+ const toPod: PodInstance = requestToMake.toPod
+
+ try {
+ await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas)
+ logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
+ goodPods.push(requestToMake.toPod.id)
+
+ this.afterRequestHook()
+
+ // Remove the pod id of these request ids
+ await this.getRequestToPodModel()
+ .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id)
+ } catch (err) {
+ badPods.push(requestToMake.toPod.id)
+ logger.info('Cannot make request to %s.', toPod.host, err)
+ }
+ }, { concurrency: REQUESTS_IN_PARALLEL })
+
+ this.afterRequestsHook()
+
+ // All the requests were made, we update the pods score
+ await db.Pod.updatePodsScore(goodPods, badPods)
}
protected afterRequestHook () {
- // Nothing to do, let children reimplement it
+ // Nothing to do, let children re-implement it
}
protected afterRequestsHook () {
- // Nothing to do, let children reimplement it
+ // Nothing to do, let children re-implement it
}
}
buildRequestsObjects (requestsGrouped: RequestsGrouped) {
const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {}
- Object.keys(requestsGrouped).forEach(toPodId => {
- requestsGrouped[toPodId].forEach(data => {
+ for (const toPodId of Object.keys(requestsGrouped)) {
+ for (const data of requestsGrouped[toPodId]) {
const request = data.request
const pod = data.pod
const hashKey = toPodId + request.endpoint
requestsToMakeGrouped[hashKey].ids.push(request.id)
requestsToMakeGrouped[hashKey].datas.push(request.request)
- })
- })
+ }
+ }
return requestsToMakeGrouped
}
- createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
+ async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) {
// If there are no destination pods abort
if (toIds.length === 0) return undefined
transaction
}
- return db.Request.create(createQuery, dbRequestOptions)
- .then(request => {
- return request.setPods(toIds, dbRequestOptions)
- })
+ const request = await db.Request.create(createQuery, dbRequestOptions)
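+    // Associate the new request with every destination pod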
+ await request.setPods(toIds, dbRequestOptions)
}
// ---------------------------------------------------------------------------
// We group video events per video and per pod
// We add the counts of the same event types
- Object.keys(eventRequests).forEach(toPodId => {
- eventRequests[toPodId].forEach(eventToProcess => {
+ for (const toPodId of Object.keys(eventRequests)) {
+ for (const eventToProcess of eventRequests[toPodId]) {
if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {}
if (!requestsToMakeGrouped[toPodId]) {
if (!events[eventToProcess.type]) events[eventToProcess.type] = 0
events[eventToProcess.type] += eventToProcess.count
- })
- })
+ }
+ }
// Now we build our requests array per pod
- Object.keys(eventsPerVideoPerPod).forEach(toPodId => {
+ for (const toPodId of Object.keys(eventsPerVideoPerPod)) {
const eventsForPod = eventsPerVideoPerPod[toPodId]
- Object.keys(eventsForPod).forEach(uuid => {
+ for (const uuid of Object.keys(eventsForPod)) {
const eventsForVideo = eventsForPod[uuid]
- Object.keys(eventsForVideo).forEach(eventType => {
+ for (const eventType of Object.keys(eventsForVideo)) {
requestsToMakeGrouped[toPodId].datas.push({
data: {
uuid,
count: +eventsForVideo[eventType]
}
})
- })
- })
- })
+ }
+ }
+ }
return requestsToMakeGrouped
}
buildRequestsObjects (requests: RequestsVideoQaduGrouped) {
const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {}
- Object.keys(requests).forEach(toPodId => {
- requests[toPodId].forEach(data => {
+ for (const toPodId of Object.keys(requests)) {
+ for (const data of requests[toPodId]) {
const request = data.request
const video = data.video
const pod = data.pod
// Maybe there are multiple quick and dirty update for the same video
// We use this hash map to dedupe them
requestsToMakeGrouped[hashKey].videos[video.id] = videoData
- })
- })
+ }
+ }
// Now we deduped similar quick and dirty updates, we can build our requests data
- Object.keys(requestsToMakeGrouped).forEach(hashKey => {
- Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoUUID => {
+ for (const hashKey of Object.keys(requestsToMakeGrouped)) {
+ for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) {
const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID]
requestsToMakeGrouped[hashKey].datas.push({
data: videoData
})
- })
+ }
// We don't need it anymore, it was just to build our data array
delete requestsToMakeGrouped[hashKey].videos
- })
+ }
return requestsToMakeGrouped
}
- createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
+ async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) {
const dbRequestOptions: Sequelize.BulkCreateOptions = {}
if (transaction) dbRequestOptions.transaction = transaction
// Send the update to all our friends
- return db.Pod.listAllIds(transaction).then(podIds => {
- const queries = []
- podIds.forEach(podId => {
- queries.push({ type, videoId, podId })
- })
-
- return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
- })
+ const podIds = await db.Pod.listAllIds(transaction)
+ const queries = []
+ for (const podId of podIds) {
+ queries.push({ type, videoId, podId })
+ }
+
+ await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions)
+ return undefined
}
}
import { addVideoAuthorToFriends } from './friends'
import { createVideoChannel } from './video-channel'
-function createUserAuthorAndChannel (user: UserInstance, validateUser = true) {
- return db.sequelize.transaction(t => {
+async function createUserAuthorAndChannel (user: UserInstance, validateUser = true) {
+ const res = await db.sequelize.transaction(async t => {
const userOptions = {
transaction: t,
validate: validateUser
}
- return user.save(userOptions)
- .then(user => {
- const author = db.Author.build({
- name: user.username,
- podId: null, // It is our pod
- userId: user.id
- })
-
- return author.save({ transaction: t })
- .then(author => ({ author, user }))
- })
- .then(({ author, user }) => {
- const remoteVideoAuthor = author.toAddRemoteJSON()
-
- // Now we'll add the video channel's meta data to our friends
- return addVideoAuthorToFriends(remoteVideoAuthor, t)
- .then(() => ({ author, user }))
- })
- .then(({ author, user }) => {
- const videoChannelInfo = {
- name: `Default ${user.username} channel`
- }
-
- return createVideoChannel(videoChannelInfo, author, t)
- .then(videoChannel => ({ author, user, videoChannel }))
- })
+ const userCreated = await user.save(userOptions)
+ const authorInstance = db.Author.build({
+ name: userCreated.username,
+ podId: null, // It is our pod
+ userId: userCreated.id
+ })
+
+ const authorCreated = await authorInstance.save({ transaction: t })
+
+ const remoteVideoAuthor = authorCreated.toAddRemoteJSON()
+
+  // Now we'll add the author's meta data to our friends
+  await addVideoAuthorToFriends(remoteVideoAuthor, t)
+
+ const videoChannelInfo = {
+ name: `Default ${userCreated.username} channel`
+ }
+ const videoChannel = await createVideoChannel(videoChannelInfo, authorCreated, t)
+
+    return { author: authorCreated, videoChannel }
})
+
+ return res
}
// ---------------------------------------------------------------------------
} from '../helpers'
import { PodSignature } from '../../shared'
-function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) {
+async function checkSignature (req: express.Request, res: express.Response, next: express.NextFunction) {
const signatureObject: PodSignature = req.body.signature
const host = signatureObject.host
- db.Pod.loadByHost(host)
- .then(pod => {
- if (pod === null) {
- logger.error('Unknown pod %s.', host)
- return res.sendStatus(403)
- }
-
- logger.debug('Checking signature from %s.', host)
+ try {
+ const pod = await db.Pod.loadByHost(host)
+ if (pod === null) {
+ logger.error('Unknown pod %s.', host)
+ return res.sendStatus(403)
+ }
- let signatureShouldBe
- // If there is data in the body the sender used it for its signature
- // If there is no data we just use its host as signature
- if (req.body.data) {
- signatureShouldBe = req.body.data
- } else {
- signatureShouldBe = host
- }
+ logger.debug('Checking signature from %s.', host)
- const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature)
+ let signatureShouldBe
+ // If there is data in the body the sender used it for its signature
+ // If there is no data we just use its host as signature
+ if (req.body.data) {
+ signatureShouldBe = req.body.data
+ } else {
+ signatureShouldBe = host
+ }
- if (signatureOk === true) {
- res.locals.secure = {
- pod
- }
+ const signatureOk = peertubeCryptoCheckSignature(pod.publicKey, signatureShouldBe, signatureObject.signature)
- return next()
+ if (signatureOk === true) {
+ res.locals.secure = {
+ pod
}
- logger.error('Signature is not okay in body for %s.', signatureObject.host)
- return res.sendStatus(403)
- })
- .catch(err => {
- logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature })
- return res.sendStatus(500)
- })
+ return next()
+ }
+
+ logger.error('Signature is not okay in body for %s.', signatureObject.host)
+ return res.sendStatus(403)
+ } catch (err) {
+ logger.error('Cannot get signed host in body.', { error: err.stack, signature: signatureObject.signature })
+ return res.sendStatus(500)
+ }
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// Remove pods with a score of 0 (too many requests where they were unreachable)
-function removeBadPods () {
- return listBadPods()
- .then(pods => {
- const podsRemovePromises = pods.map(pod => pod.destroy())
- return Promise.all(podsRemovePromises).then(() => pods.length)
- })
- .then(numberOfPodsRemoved => {
- if (numberOfPodsRemoved) {
- logger.info('Removed %d pods.', numberOfPodsRemoved)
- } else {
- logger.info('No need to remove bad pods.')
- }
- })
- .catch(err => {
- logger.error('Cannot remove bad pods.', err)
- })
+async function removeBadPods () {
+ try {
+ const pods = await listBadPods()
+
+ const podsRemovePromises = pods.map(pod => pod.destroy())
+ await Promise.all(podsRemovePromises)
+
+ const numberOfPodsRemoved = pods.length
+
+ if (numberOfPodsRemoved) {
+ logger.info('Removed %d pods.', numberOfPodsRemoved)
+ } else {
+ logger.info('No need to remove bad pods.')
+ }
+ } catch (err) {
+ logger.error('Cannot remove bad pods.', err)
+ }
}