import * as express from 'express'
-import * as Sequelize from 'sequelize'
-import { eachSeries, waterfall } from 'async'
+import * as Promise from 'bluebird'
import { database as db } from '../../../initializers/database'
import {
remoteQaduVideosValidator,
remoteEventsVideosValidator
} from '../../../middlewares'
-import {
- logger,
- commitTransaction,
- retryTransactionWrapper,
- rollbackTransaction,
- startSerializableTransaction
-} from '../../../helpers'
+import { logger, retryTransactionWrapper } from '../../../helpers'
import { quickAndDirtyUpdatesVideoToFriends } from '../../../lib'
import { PodInstance, VideoInstance } from '../../../models'
const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
// Functions to call when processing a remote request
-const functionsHash = {}
+const functionsHash: { [ id: string ]: (...args) => Promise<any> } = {}
functionsHash[ENDPOINT_ACTIONS.ADD] = addRemoteVideoRetryWrapper
functionsHash[ENDPOINT_ACTIONS.UPDATE] = updateRemoteVideoRetryWrapper
functionsHash[ENDPOINT_ACTIONS.REMOVE] = removeRemoteVideo
// We need to process in the same order to keep consistency
// TODO: optimization
- eachSeries(requests, function (request: any, callbackEach) {
+ Promise.mapSeries(requests, (request: any) => {
const data = request.data
// Get the function we need to call in order to process the request
const fun = functionsHash[request.type]
if (fun === undefined) {
logger.error('Unkown remote request type %s.', request.type)
- return callbackEach(null)
+ return
}
- fun.call(this, data, fromPod, callbackEach)
- }, function (err) {
- if (err) logger.error('Error managing remote videos.', { error: err })
+ return fun.call(this, data, fromPod)
})
+ .catch(err => logger.error('Error managing remote videos.', { error: err }))
// We don't need to keep the other pod waiting
return res.type('json').status(204).end()
const requests = req.body.data
const fromPod = res.locals.secure.pod
- eachSeries(requests, function (request: any, callbackEach) {
+ Promise.mapSeries(requests, (request: any) => {
const videoData = request.data
- quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod, callbackEach)
- }, function (err) {
- if (err) logger.error('Error managing remote videos.', { error: err })
+ return quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod)
})
+ .catch(err => logger.error('Error managing remote videos.', { error: err }))
return res.type('json').status(204).end()
}
const requests = req.body.data
const fromPod = res.locals.secure.pod
- eachSeries(requests, function (request: any, callbackEach) {
+ Promise.mapSeries(requests, (request: any) => {
const eventData = request.data
- processVideosEventsRetryWrapper(eventData, fromPod, callbackEach)
- }, function (err) {
- if (err) logger.error('Error managing remote videos.', { error: err })
+ return processVideosEventsRetryWrapper(eventData, fromPod)
})
+ .catch(err => logger.error('Error managing remote videos.', { error: err }))
return res.type('json').status(204).end()
}
-function processVideosEventsRetryWrapper (eventData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function processVideosEventsRetryWrapper (eventData: any, fromPod: PodInstance) {
const options = {
arguments: [ eventData, fromPod ],
errorMessage: 'Cannot process videos events with many retries.'
}
- retryTransactionWrapper(processVideosEvents, options, finalCallback)
+ return retryTransactionWrapper(processVideosEvents, options)
}
-function processVideosEvents (eventData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
- waterfall([
- startSerializableTransaction,
-
- function findVideo (t, callback) {
- fetchOwnedVideo(eventData.remoteId, function (err, videoInstance) {
- return callback(err, t, videoInstance)
- })
- },
+function processVideosEvents (eventData: any, fromPod: PodInstance) {
- function updateVideoIntoDB (t, videoInstance, callback) {
- const options = { transaction: t }
+ return db.sequelize.transaction(t => {
+ return fetchOwnedVideo(eventData.remoteId)
+ .then(videoInstance => {
+ const options = { transaction: t }
- let columnToUpdate
- let qaduType
+ let columnToUpdate
+ let qaduType
- switch (eventData.eventType) {
- case REQUEST_VIDEO_EVENT_TYPES.VIEWS:
- columnToUpdate = 'views'
- qaduType = REQUEST_VIDEO_QADU_TYPES.VIEWS
- break
+ switch (eventData.eventType) {
+ case REQUEST_VIDEO_EVENT_TYPES.VIEWS:
+ columnToUpdate = 'views'
+ qaduType = REQUEST_VIDEO_QADU_TYPES.VIEWS
+ break
- case REQUEST_VIDEO_EVENT_TYPES.LIKES:
- columnToUpdate = 'likes'
- qaduType = REQUEST_VIDEO_QADU_TYPES.LIKES
- break
+ case REQUEST_VIDEO_EVENT_TYPES.LIKES:
+ columnToUpdate = 'likes'
+ qaduType = REQUEST_VIDEO_QADU_TYPES.LIKES
+ break
- case REQUEST_VIDEO_EVENT_TYPES.DISLIKES:
- columnToUpdate = 'dislikes'
- qaduType = REQUEST_VIDEO_QADU_TYPES.DISLIKES
- break
+ case REQUEST_VIDEO_EVENT_TYPES.DISLIKES:
+ columnToUpdate = 'dislikes'
+ qaduType = REQUEST_VIDEO_QADU_TYPES.DISLIKES
+ break
- default:
- return callback(new Error('Unknown video event type.'))
- }
+ default:
+ throw new Error('Unknown video event type.')
+ }
- const query = {}
- query[columnToUpdate] = eventData.count
+ const query = {}
+ query[columnToUpdate] = eventData.count
- videoInstance.increment(query, options).asCallback(function (err) {
- return callback(err, t, videoInstance, qaduType)
+ return videoInstance.increment(query, options).then(() => ({ videoInstance, qaduType }))
})
- },
-
- function sendQaduToFriends (t, videoInstance, qaduType, callback) {
- const qadusParams = [
- {
- videoId: videoInstance.id,
- type: qaduType
- }
- ]
-
- quickAndDirtyUpdatesVideoToFriends(qadusParams, t, function (err) {
- return callback(err, t)
+ .then(({ videoInstance, qaduType }) => {
+ const qadusParams = [
+ {
+ videoId: videoInstance.id,
+ type: qaduType
+ }
+ ]
+
+ return quickAndDirtyUpdatesVideoToFriends(qadusParams, t)
})
- },
-
- commitTransaction
-
- ], function (err: Error, t: Sequelize.Transaction) {
- if (err) {
- logger.debug('Cannot process a video event.', { error: err })
- return rollbackTransaction(err, t, finalCallback)
- }
-
- logger.info('Remote video event processed for video %s.', eventData.remoteId)
- return finalCallback(null)
+ })
+ .then(() => logger.info('Remote video event processed for video %s.', eventData.remoteId))
+ .catch(err => {
+ logger.debug('Cannot process a video event.', { error: err })
+ throw err
})
}
-function quickAndDirtyUpdateVideoRetryWrapper (videoData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function quickAndDirtyUpdateVideoRetryWrapper (videoData: any, fromPod: PodInstance) {
const options = {
arguments: [ videoData, fromPod ],
errorMessage: 'Cannot update quick and dirty the remote video with many retries.'
}
- retryTransactionWrapper(quickAndDirtyUpdateVideo, options, finalCallback)
+ return retryTransactionWrapper(quickAndDirtyUpdateVideo, options)
}
-function quickAndDirtyUpdateVideo (videoData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function quickAndDirtyUpdateVideo (videoData: any, fromPod: PodInstance) {
let videoName
- waterfall([
- startSerializableTransaction,
-
- function findVideo (t, callback) {
- fetchRemoteVideo(fromPod.host, videoData.remoteId, function (err, videoInstance) {
- return callback(err, t, videoInstance)
- })
- },
-
- function updateVideoIntoDB (t, videoInstance, callback) {
- const options = { transaction: t }
+ return db.sequelize.transaction(t => {
+ return fetchRemoteVideo(fromPod.host, videoData.remoteId)
+ .then(videoInstance => {
+ const options = { transaction: t }
- videoName = videoInstance.name
+ videoName = videoInstance.name
- if (videoData.views) {
- videoInstance.set('views', videoData.views)
- }
+ if (videoData.views) {
+ videoInstance.set('views', videoData.views)
+ }
- if (videoData.likes) {
- videoInstance.set('likes', videoData.likes)
- }
+ if (videoData.likes) {
+ videoInstance.set('likes', videoData.likes)
+ }
- if (videoData.dislikes) {
- videoInstance.set('dislikes', videoData.dislikes)
- }
+ if (videoData.dislikes) {
+ videoInstance.set('dislikes', videoData.dislikes)
+ }
- videoInstance.save(options).asCallback(function (err) {
- return callback(err, t)
+ return videoInstance.save(options)
})
- },
-
- commitTransaction
-
- ], function (err: Error, t: Sequelize.Transaction) {
- if (err) {
- logger.debug('Cannot quick and dirty update the remote video.', { error: err })
- return rollbackTransaction(err, t, finalCallback)
- }
-
- logger.info('Remote video %s quick and dirty updated', videoName)
- return finalCallback(null)
})
+ .then(() => logger.info('Remote video %s quick and dirty updated', videoName))
+ .catch(err => logger.debug('Cannot quick and dirty update the remote video.', { error: err }))
}
// Handle retries on fail
-function addRemoteVideoRetryWrapper (videoToCreateData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function addRemoteVideoRetryWrapper (videoToCreateData: any, fromPod: PodInstance) {
const options = {
arguments: [ videoToCreateData, fromPod ],
errorMessage: 'Cannot insert the remote video with many retries.'
}
- retryTransactionWrapper(addRemoteVideo, options, finalCallback)
+ return retryTransactionWrapper(addRemoteVideo, options)
}
-function addRemoteVideo (videoToCreateData: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function addRemoteVideo (videoToCreateData: any, fromPod: PodInstance) {
logger.debug('Adding remote video "%s".', videoToCreateData.remoteId)
- waterfall([
-
- startSerializableTransaction,
-
- function assertRemoteIdAndHostUnique (t, callback) {
- db.Video.loadByHostAndRemoteId(fromPod.host, videoToCreateData.remoteId, function (err, video) {
- if (err) return callback(err)
+ return db.sequelize.transaction(t => {
+ return db.Video.loadByHostAndRemoteId(fromPod.host, videoToCreateData.remoteId)
+ .then(video => {
+ if (video) throw new Error('RemoteId and host pair is not unique.')
- if (video) return callback(new Error('RemoteId and host pair is not unique.'))
-
- return callback(null, t)
+ return undefined
})
- },
-
- function findOrCreateAuthor (t, callback) {
- const name = videoToCreateData.author
- const podId = fromPod.id
- // This author is from another pod so we do not associate a user
- const userId = null
+ .then(() => {
+ const name = videoToCreateData.author
+ const podId = fromPod.id
+ // This author is from another pod so we do not associate a user
+ const userId = null
- db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
- return callback(err, t, authorInstance)
+ return db.Author.findOrCreateAuthor(name, podId, userId, t)
})
- },
+ .then(author => {
+ const tags = videoToCreateData.tags
- function findOrCreateTags (t, author, callback) {
- const tags = videoToCreateData.tags
-
- db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
- return callback(err, t, author, tagInstances)
+ return db.Tag.findOrCreateTags(tags, t).then(tagInstances => ({ author, tagInstances }))
})
- },
-
- function createVideoObject (t, author, tagInstances, callback) {
- const videoData = {
- name: videoToCreateData.name,
- remoteId: videoToCreateData.remoteId,
- extname: videoToCreateData.extname,
- infoHash: videoToCreateData.infoHash,
- category: videoToCreateData.category,
- licence: videoToCreateData.licence,
- language: videoToCreateData.language,
- nsfw: videoToCreateData.nsfw,
- description: videoToCreateData.description,
- authorId: author.id,
- duration: videoToCreateData.duration,
- createdAt: videoToCreateData.createdAt,
- // FIXME: updatedAt does not seems to be considered by Sequelize
- updatedAt: videoToCreateData.updatedAt,
- views: videoToCreateData.views,
- likes: videoToCreateData.likes,
- dislikes: videoToCreateData.dislikes
- }
-
- const video = db.Video.build(videoData)
-
- return callback(null, t, tagInstances, video)
- },
-
- function generateThumbnail (t, tagInstances, video, callback) {
- db.Video.generateThumbnailFromData(video, videoToCreateData.thumbnailData, function (err) {
- if (err) {
- logger.error('Cannot generate thumbnail from data.', { error: err })
- return callback(err)
+ .then(({ author, tagInstances }) => {
+ const videoData = {
+ name: videoToCreateData.name,
+ remoteId: videoToCreateData.remoteId,
+ extname: videoToCreateData.extname,
+ infoHash: videoToCreateData.infoHash,
+ category: videoToCreateData.category,
+ licence: videoToCreateData.licence,
+ language: videoToCreateData.language,
+ nsfw: videoToCreateData.nsfw,
+ description: videoToCreateData.description,
+ authorId: author.id,
+ duration: videoToCreateData.duration,
+ createdAt: videoToCreateData.createdAt,
+ // FIXME: updatedAt does not seems to be considered by Sequelize
+ updatedAt: videoToCreateData.updatedAt,
+ views: videoToCreateData.views,
+ likes: videoToCreateData.likes,
+ dislikes: videoToCreateData.dislikes
}
- return callback(err, t, tagInstances, video)
+ const video = db.Video.build(videoData)
+ return { tagInstances, video }
})
- },
-
- function insertVideoIntoDB (t, tagInstances, video, callback) {
- const options = {
- transaction: t
- }
-
- video.save(options).asCallback(function (err, videoCreated) {
- return callback(err, t, tagInstances, videoCreated)
+ .then(({ tagInstances, video }) => {
+ return db.Video.generateThumbnailFromData(video, videoToCreateData.thumbnailData).then(() => ({ tagInstances, video }))
})
- },
-
- function associateTagsToVideo (t, tagInstances, video, callback) {
- const options = {
- transaction: t
- }
+ .then(({ tagInstances, video }) => {
+ const options = {
+ transaction: t
+ }
- video.setTags(tagInstances, options).asCallback(function (err) {
- return callback(err, t)
+ return video.save(options).then(videoCreated => ({ tagInstances, videoCreated }))
})
- },
-
- commitTransaction
-
- ], function (err: Error, t: Sequelize.Transaction) {
- if (err) {
- // This is just a debug because we will retry the insert
- logger.debug('Cannot insert the remote video.', { error: err })
- return rollbackTransaction(err, t, finalCallback)
- }
+ .then(({ tagInstances, videoCreated }) => {
+ const options = {
+ transaction: t
+ }
- logger.info('Remote video %s inserted.', videoToCreateData.name)
- return finalCallback(null)
+ return videoCreated.setTags(tagInstances, options)
+ })
+ })
+ .then(() => logger.info('Remote video %s inserted.', videoToCreateData.name))
+ .catch(err => {
+ logger.debug('Cannot insert the remote video.', { error: err })
+ throw err
})
}
// Handle retries on fail
-function updateRemoteVideoRetryWrapper (videoAttributesToUpdate: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function updateRemoteVideoRetryWrapper (videoAttributesToUpdate: any, fromPod: PodInstance) {
const options = {
arguments: [ videoAttributesToUpdate, fromPod ],
errorMessage: 'Cannot update the remote video with many retries'
}
- retryTransactionWrapper(updateRemoteVideo, options, finalCallback)
+ return retryTransactionWrapper(updateRemoteVideo, options)
}
-function updateRemoteVideo (videoAttributesToUpdate: any, fromPod: PodInstance, finalCallback: (err: Error) => void) {
+function updateRemoteVideo (videoAttributesToUpdate: any, fromPod: PodInstance) {
logger.debug('Updating remote video "%s".', videoAttributesToUpdate.remoteId)
- waterfall([
+ return db.sequelize.transaction(t => {
+ return fetchRemoteVideo(fromPod.host, videoAttributesToUpdate.remoteId)
+ .then(videoInstance => {
+ const tags = videoAttributesToUpdate.tags
- startSerializableTransaction,
-
- function findVideo (t, callback) {
- fetchRemoteVideo(fromPod.host, videoAttributesToUpdate.remoteId, function (err, videoInstance) {
- return callback(err, t, videoInstance)
+ return db.Tag.findOrCreateTags(tags, t).then(tagInstances => ({ videoInstance, tagInstances }))
})
- },
-
- function findOrCreateTags (t, videoInstance, callback) {
- const tags = videoAttributesToUpdate.tags
-
- db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
- return callback(err, t, videoInstance, tagInstances)
- })
- },
-
- function updateVideoIntoDB (t, videoInstance, tagInstances, callback) {
- const options = { transaction: t }
-
- videoInstance.set('name', videoAttributesToUpdate.name)
- videoInstance.set('category', videoAttributesToUpdate.category)
- videoInstance.set('licence', videoAttributesToUpdate.licence)
- videoInstance.set('language', videoAttributesToUpdate.language)
- videoInstance.set('nsfw', videoAttributesToUpdate.nsfw)
- videoInstance.set('description', videoAttributesToUpdate.description)
- videoInstance.set('infoHash', videoAttributesToUpdate.infoHash)
- videoInstance.set('duration', videoAttributesToUpdate.duration)
- videoInstance.set('createdAt', videoAttributesToUpdate.createdAt)
- videoInstance.set('updatedAt', videoAttributesToUpdate.updatedAt)
- videoInstance.set('extname', videoAttributesToUpdate.extname)
- videoInstance.set('views', videoAttributesToUpdate.views)
- videoInstance.set('likes', videoAttributesToUpdate.likes)
- videoInstance.set('dislikes', videoAttributesToUpdate.dislikes)
-
- videoInstance.save(options).asCallback(function (err) {
- return callback(err, t, videoInstance, tagInstances)
+ .then(({ videoInstance, tagInstances }) => {
+ const options = { transaction: t }
+
+ videoInstance.set('name', videoAttributesToUpdate.name)
+ videoInstance.set('category', videoAttributesToUpdate.category)
+ videoInstance.set('licence', videoAttributesToUpdate.licence)
+ videoInstance.set('language', videoAttributesToUpdate.language)
+ videoInstance.set('nsfw', videoAttributesToUpdate.nsfw)
+ videoInstance.set('description', videoAttributesToUpdate.description)
+ videoInstance.set('infoHash', videoAttributesToUpdate.infoHash)
+ videoInstance.set('duration', videoAttributesToUpdate.duration)
+ videoInstance.set('createdAt', videoAttributesToUpdate.createdAt)
+ videoInstance.set('updatedAt', videoAttributesToUpdate.updatedAt)
+ videoInstance.set('extname', videoAttributesToUpdate.extname)
+ videoInstance.set('views', videoAttributesToUpdate.views)
+ videoInstance.set('likes', videoAttributesToUpdate.likes)
+ videoInstance.set('dislikes', videoAttributesToUpdate.dislikes)
+
+ return videoInstance.save(options).then(() => ({ videoInstance, tagInstances }))
})
- },
-
- function associateTagsToVideo (t, videoInstance, tagInstances, callback) {
- const options = { transaction: t }
+ .then(({ videoInstance, tagInstances }) => {
+ const options = { transaction: t }
- videoInstance.setTags(tagInstances, options).asCallback(function (err) {
- return callback(err, t)
+ return videoInstance.setTags(tagInstances, options)
})
- },
-
- commitTransaction
-
- ], function (err: Error, t: Sequelize.Transaction) {
- if (err) {
- // This is just a debug because we will retry the insert
- logger.debug('Cannot update the remote video.', { error: err })
- return rollbackTransaction(err, t, finalCallback)
- }
-
- logger.info('Remote video %s updated', videoAttributesToUpdate.name)
- return finalCallback(null)
+ })
+ .then(() => logger.info('Remote video %s updated', videoAttributesToUpdate.name))
+ .catch(err => {
+ // This is just a debug because we will retry the insert
+ logger.debug('Cannot update the remote video.', { error: err })
+ throw err
})
}
-function removeRemoteVideo (videoToRemoveData: any, fromPod: PodInstance, callback: (err: Error) => void) {
+function removeRemoteVideo (videoToRemoveData: any, fromPod: PodInstance) {
// We need the instance because we have to remove some other stuffs (thumbnail etc)
- fetchRemoteVideo(fromPod.host, videoToRemoveData.remoteId, function (err, video) {
- // Do not return the error, continue the process
- if (err) return callback(null)
-
- logger.debug('Removing remote video %s.', video.remoteId)
- video.destroy().asCallback(function (err) {
- // Do not return the error, continue the process
- if (err) {
- logger.error('Cannot remove remote video with id %s.', videoToRemoveData.remoteId, { error: err })
- }
-
- return callback(null)
+ return fetchRemoteVideo(fromPod.host, videoToRemoveData.remoteId)
+ .then(video => {
+ logger.debug('Removing remote video %s.', video.remoteId)
+ return video.destroy()
+ })
+ .catch(err => {
+ logger.debug('Could not fetch remote video.', { host: fromPod.host, remoteId: videoToRemoveData.remoteId, error: err })
})
- })
}
-function reportAbuseRemoteVideo (reportData: any, fromPod: PodInstance, callback: (err: Error) => void) {
- fetchOwnedVideo(reportData.videoRemoteId, function (err, video) {
- if (err || !video) {
- if (!err) err = new Error('video not found')
-
- logger.error('Cannot load video from id.', { error: err, id: reportData.videoRemoteId })
- // Do not return the error, continue the process
- return callback(null)
- }
-
- logger.debug('Reporting remote abuse for video %s.', video.id)
-
- const videoAbuseData = {
- reporterUsername: reportData.reporterUsername,
- reason: reportData.reportReason,
- reporterPodId: fromPod.id,
- videoId: video.id
- }
+function reportAbuseRemoteVideo (reportData: any, fromPod: PodInstance) {
+ return fetchOwnedVideo(reportData.videoRemoteId)
+ .then(video => {
+ logger.debug('Reporting remote abuse for video %s.', video.id)
- db.VideoAbuse.create(videoAbuseData).asCallback(function (err) {
- if (err) {
- logger.error('Cannot create remote abuse video.', { error: err })
+ const videoAbuseData = {
+ reporterUsername: reportData.reporterUsername,
+ reason: reportData.reportReason,
+ reporterPodId: fromPod.id,
+ videoId: video.id
}
- return callback(null)
+ return db.VideoAbuse.create(videoAbuseData)
})
- })
+ .catch(err => logger.error('Cannot create remote abuse video.', { error: err }))
}
-function fetchOwnedVideo (id: string, callback: (err: Error, video?: VideoInstance) => void) {
- db.Video.load(id, function (err, video) {
- if (err || !video) {
- if (!err) err = new Error('video not found')
+function fetchOwnedVideo (id: string) {
+ return db.Video.load(id)
+ .then(video => {
+ if (!video) throw new Error('Video not found')
+ return video
+ })
+ .catch(err => {
logger.error('Cannot load owned video from id.', { error: err, id })
- return callback(err)
- }
-
- return callback(null, video)
- })
+ throw err
+ })
}
-function fetchRemoteVideo (podHost: string, remoteId: string, callback: (err: Error, video?: VideoInstance) => void) {
- db.Video.loadByHostAndRemoteId(podHost, remoteId, function (err, video) {
- if (err || !video) {
- if (!err) err = new Error('video not found')
+function fetchRemoteVideo (podHost: string, remoteId: string) {
+ return db.Video.loadByHostAndRemoteId(podHost, remoteId)
+ .then(video => {
+ if (!video) throw new Error('Video not found')
+ return video
+ })
+ .catch(err => {
logger.error('Cannot load video from host and remote id.', { error: err, podHost, remoteId })
- return callback(err)
- }
-
- return callback(null, video)
- })
+ throw err
+ })
}